Marsyas
0.6.0-alpha
|
00001 /* 00002 ** Copyright (C) 2012 Nate Bogdanowicz <natezb@gmail.com> 00003 ** 00004 ** This program is free software; you can redistribute it and/or modify 00005 ** it under the terms of the GNU General Public License as published by 00006 ** the Free Software Foundation; either version 2 of the License, or 00007 ** (at your option) any later version. 00008 ** 00009 ** This program is distributed in the hope that it will be useful, 00010 ** but WITHOUT ANY WARRANTY; without even the implied warranty of 00011 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00012 ** GNU General Public License for more details. 00013 ** 00014 ** You should have received a copy of the GNU General Public License 00015 ** along with this program; if not, write to the Free Software 00016 ** Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 00017 */ 00018 00019 #include <marsyas/common_source.h> 00020 #include <marsyas/common_header.h> 00021 #include "GStreamerSource.h" 00022 00023 #include <glib.h> 00024 #include <gst/gst.h> 00025 #include <gst/app/gstappsink.h> 00026 00027 00028 using namespace Marsyas; 00029 00030 00031 /* This must be a free-standing function (i.e. not a member of a class), since 00032 * GStreamer is written in plain C. */ 00033 void 00034 on_pad_added(GstElement *dec, GstPad *pad, GstElement *element) 00035 { 00036 (void) dec; 00037 GstPad *sinkpad; 00038 00039 sinkpad = gst_element_get_static_pad(element, "sink"); 00040 if(!gst_pad_is_linked(sinkpad)) { 00041 if(gst_pad_link(pad, sinkpad) != GST_PAD_LINK_OK) { 00042 MRSERR("Failed to link pads"); 00043 } 00044 } 00045 00046 gst_object_unref(sinkpad); 00047 } 00048 00049 00050 GStreamerSource::GStreamerSource(mrs_string name):AbsSoundFileSource("GStreamerSource", name) 00051 { 00052 size_ = 0; 00053 pos_ = 0; 00054 hasData_ = false; 00055 lastTickWithData_ = false; 00056 playing_ = false; 00057 buffer_size_ = 0; 00058 buffer_left_ = 0; 00059 pipe_created_ = false; 00060 filename_ = "gst-source"; 00061 00062 addControls(); 00063 } 00064 00065 GStreamerSource::GStreamerSource(const GStreamerSource& a): AbsSoundFileSource(a) 00066 { 00067 size_ = 0; 00068 pos_ = 0; 00069 hasData_ = false; 00070 lastTickWithData_ = false; 00071 playing_ = false; 00072 buffer_size_ = 0; 00073 buffer_left_ = 0; 00074 pipe_created_ = false; 00075 filename_ = "gst-source"; 00076 00077 ctrl_pos_ = getctrl("mrs_natural/pos"); 00078 ctrl_currentlyPlaying_ = getctrl("mrs_string/currentlyPlaying"); 00079 ctrl_previouslyPlaying_ = getctrl("mrs_string/previouslyPlaying"); 00080 ctrl_regression_ = getctrl("mrs_bool/regression"); 00081 ctrl_currentLabel_ = getctrl("mrs_real/currentLabel"); 00082 ctrl_previousLabel_ = getctrl("mrs_real/previousLabel"); 00083 ctrl_labelNames_ = getctrl("mrs_string/labelNames"); 00084 ctrl_nLabels_ = getctrl("mrs_natural/nLabels"); 00085 ctrl_currentHasData_ = getctrl("mrs_bool/currentHasData"); 00086 ctrl_currentLastTickWithData_ = getctrl("mrs_bool/currentLastTickWithData"); 00087 ctrl_hasData_ = getctrl("mrs_bool/hasData"); 00088 ctrl_lastTickWithData_ = getctrl("mrs_bool/lastTickWithData"); 00089 } 00090 00091 GStreamerSource::~GStreamerSource() 00092 { 00093 /* Clean up the pipeline and unref the objects we explicitly un-floated */ 00094 if(pipe_ != NULL) { 00095 gst_element_set_state(pipe_, GST_STATE_NULL); 00096 gst_object_unref(pipe_); 00097 } 00098 00099 if(dec_ != NULL) 00100 gst_object_unref(dec_); 00101 00102 if(sink_ != NULL) 00103 gst_object_unref(sink_); 00104 } 00105 00106 00107 MarSystem* GStreamerSource::clone() const 00108 { 00109 return new GStreamerSource(*this); 00110 } 00111 00112 00113 void 00114 GStreamerSource::addControls() 00115 { 00116 addctrl("mrs_natural/size", 0); 00117 addctrl("mrs_natural/pos", 0, ctrl_pos_); 00118 setctrlState("mrs_natural/pos", true); 00119 00120 addctrl("mrs_string/filename", filename_); 00121 setctrlState("mrs_string/filename", true); 00122 00123 addctrl("mrs_bool/hasData", true, ctrl_hasData_); 00124 addctrl("mrs_bool/noteon", false); 00125 setctrlState("mrs_bool/noteon", true); 00126 00127 addctrl("mrs_string/filetype", "gst"); 00128 00129 addctrl("mrs_string/currentlyPlaying", "gst-source", ctrl_currentlyPlaying_); 00130 addctrl("mrs_string/previouslyPlaying", "gst-source", ctrl_previouslyPlaying_); 00131 addctrl("mrs_real/currentLabel", 0.0, ctrl_currentLabel_); 00132 addctrl("mrs_real/previousLabel", 0.0, ctrl_previousLabel_); 00133 addctrl("mrs_natural/nLabels", 0, ctrl_nLabels_); 00134 addctrl("mrs_string/labelNames", ",", ctrl_labelNames_); 00135 addctrl("mrs_bool/regression", false, ctrl_regression_); 00136 00137 addctrl("mrs_bool/lastTickWithData", false, ctrl_lastTickWithData_); 00138 00139 addctrl("mrs_real/repetitions", 1.0); 00140 setctrlState("mrs_real/repetitions", true); 00141 00142 addctrl("mrs_bool/shuffle", false); 00143 setctrlState("mrs_bool/shuffle", true); 00144 00145 addctrl("mrs_real/duration", -1.0); 00146 setctrlState("mrs_real/duration", true); 00147 00148 addctrl("mrs_natural/loopPos", (mrs_natural)0); 00149 setctrlState("mrs_natural/loopPos", true); 00150 00151 addctrl("mrs_natural/advance", 0); 00152 setctrlState("mrs_natural/advance", true); 00153 00154 addctrl("mrs_natural/cindex", 0); 00155 setctrlState("mrs_natural/cindex", true); 00156 00157 addctrl("mrs_string/allfilenames", ","); 00158 addctrl("mrs_natural/numFiles", 1); 00159 00160 addctrl("mrs_bool/currentHasData", true, ctrl_currentHasData_); 00161 addctrl("mrs_bool/currentLastTickWithData", false, ctrl_currentLastTickWithData_); 00162 } 00163 00164 00165 void 00166 GStreamerSource::init_pipeline() 00167 { 00168 GstElement *conv; 00169 00170 /* Initialize GStreamer */ 00171 if(!gst_is_initialized() && !gst_init_check(NULL, NULL, NULL)) { 00172 MRSERR("GStreamer could not be initialized!"); 00173 return; 00174 } 00175 00176 /* Create pipeline and elements */ 00177 pipe_ = gst_pipeline_new("pipeline"); 00178 dec_ = gst_element_factory_make("uridecodebin", NULL); 00179 conv = gst_element_factory_make("audioconvert", NULL); 00180 sink_ = gst_element_factory_make("appsink", NULL); 00181 00182 /* Unfloat floating references we wish to access later */ 00183 gst_object_ref_sink(pipe_); 00184 gst_object_ref_sink(dec_); 00185 gst_object_ref_sink(sink_); 00186 00187 /* Configure pipeline and elements */ 00188 gst_pipeline_use_clock(GST_PIPELINE(pipe_), NULL); 00189 00190 /* Set up the capabilities filtering */ 00191 GstCaps *caps = gst_caps_new_simple("audio/x-raw", 00192 "format", G_TYPE_STRING, g_strdup_printf("F%luLE", sizeof(mrs_real)*8), 00193 NULL); 00194 gst_app_sink_set_caps(GST_APP_SINK(sink_), caps); 00195 gst_caps_unref(caps); 00196 00197 /* Add everything to the pipeline and link up static pads */ 00198 gst_bin_add_many(GST_BIN(pipe_), dec_, conv, sink_, NULL); 00199 gst_element_link(conv, sink_); 00200 00201 /* Set up callback to link the decoder's dynamic pad to the converter */ 00202 g_signal_connect(dec_, "pad_added", G_CALLBACK(on_pad_added), conv); 00203 00204 pipe_created_ = true; 00205 } 00206 00207 00208 void 00209 GStreamerSource::getHeader(mrs_string filename) 00210 { 00211 if(!pipe_created_) { 00212 init_pipeline(); 00213 if(!pipe_created_) { 00214 // Pipe creation failed 00215 size_ = -1; 00216 return; 00217 } 00218 } 00219 00220 /* (Re)Set the uridecodebin's filename */ 00221 gst_element_set_state(pipe_, GST_STATE_NULL); 00222 if(gst_uri_is_valid(filename.c_str())) { 00223 /* "filename" is already a valid uri, just use it as-is */ 00224 g_object_set(G_OBJECT(dec_), "uri", filename.c_str(), NULL); 00225 } else { 00226 g_object_set(G_OBJECT(dec_), "uri", gst_filename_to_uri(filename.c_str(), NULL), NULL); 00227 } 00228 00229 /* Set to PAUSED so we can get a preroll buffer */ 00230 GstStateChangeReturn ret = gst_element_set_state(pipe_, GST_STATE_PAUSED); 00231 if(ret == GST_STATE_CHANGE_FAILURE) { 00232 MRSERR("GStreamer pipeline failed to change state. This could be due to an invalid filename"); 00233 } 00234 GstSample *sample = gst_app_sink_pull_preroll(GST_APP_SINK(sink_)); 00235 00236 /* Grab the sample's caps so we can get some useful info */ 00237 GstCaps *samp_caps = gst_sample_get_caps(sample); 00238 GstStructure *samp_struct = gst_caps_get_structure(samp_caps, 0); 00239 00240 /* Get sample rate from buffer */ 00241 gint rate; 00242 if(gst_structure_get_int(samp_struct, "rate", &rate)) { 00243 osrate_ = (mrs_real)rate; 00244 } 00245 00246 /* Get number of channels from buffer */ 00247 gint channels; 00248 if(gst_structure_get_int(samp_struct, "channels", &channels)) { 00249 onObservations_ = channels; 00250 } 00251 00252 /* Get the rough size of the audio. Use GST_FORMAT_TIME since it works with most file types */ 00253 gint64 duration; 00254 GstFormat format = GST_FORMAT_TIME; 00255 00256 // Force blocking on state change completion 00257 gst_element_get_state(pipe_, NULL, NULL, GST_SECOND); 00258 if (gst_element_query_duration(GST_ELEMENT(pipe_), format, &duration) && format==GST_FORMAT_TIME) { 00259 size_ = (mrs_natural)(duration*1e-9*osrate_ + 0.5); 00260 } else { 00261 /* GStreamer can't tell us the size of the stream */ 00262 MRSWARN("Query Duration failed:"); 00263 size_ = -1; 00264 } 00265 00266 /* Reset these in case this isn't our first song */ 00267 pos_ = 0; 00268 hasData_ = true; 00269 lastTickWithData_ = false; 00270 00271 /* Clean up */ 00272 gst_caps_unref(samp_caps); 00273 00274 /* Set Controls */ 00275 //setctrl("mrs_real/israte", osrate_); 00276 setctrl("mrs_real/osrate", osrate_); 00277 setctrl("mrs_natural/onObservations", onObservations_); 00278 //setctrl("mrs_natural/inObservations", onObservations_); 00279 setctrl("mrs_natural/size", size_); 00280 setctrl("mrs_natural/pos", pos_); 00281 ctrl_hasData_->setValue(hasData_); 00282 ctrl_lastTickWithData_->setValue(lastTickWithData_); 00283 ctrl_currentHasData_->setValue(hasData_); 00284 ctrl_currentLastTickWithData_->setValue(lastTickWithData_); 00285 00286 00287 /* Start playing, so queue fills with buffers [Should we do this?] */ 00288 gst_element_set_state(pipe_, GST_STATE_PLAYING); 00289 playing_ = true; // Should we check if set_state worked before doing this? 00290 } 00291 00292 void 00293 GStreamerSource::copyFromBuffer(GstBuffer *buf, 00294 mrs_natural buf_start, 00295 realvec& vec, 00296 mrs_natural vec_start, 00297 mrs_natural length) 00298 { 00299 mrs_natural i, ch; 00300 for(ch = 0; ch < onObservations_; ch++) { 00301 for(i = 0; i < length; i++) { 00302 mrs_real sample; 00303 gst_buffer_extract(buf, ((buf_start + i)*onObservations_ + ch)*sizeof(mrs_real), &sample, sizeof(mrs_real)); 00304 vec(ch, vec_start + i) = sample; 00305 } 00306 } 00307 00308 } 00309 00310 00311 mrs_bool 00312 GStreamerSource::pull_buffer() { 00313 GstSample *sample = gst_app_sink_pull_sample(GST_APP_SINK(sink_)); 00314 00315 if(sample == NULL) { 00316 if(gst_app_sink_is_eos(GST_APP_SINK(sink_))) { 00317 /* EOS, stop pulling buffers */ 00318 hasData_ = false; 00319 lastTickWithData_ = true; 00320 00321 ctrl_hasData_->setValue(hasData_); 00322 ctrl_lastTickWithData_->setValue(lastTickWithData_); 00323 ctrl_currentHasData_->setValue(hasData_); 00324 ctrl_currentLastTickWithData_->setValue(lastTickWithData_); 00325 playing_ = false; 00326 return false; 00327 } else { 00328 /* Something messed up. Should we try pulling again? */ 00329 MRSERR("GStreamer Error"); 00330 return false; 00331 } 00332 } else { 00333 buffer_ = gst_sample_get_buffer(sample); 00334 buffer_size_ = gst_buffer_get_size(buffer_) / sizeof(mrs_real) / onObservations_; 00335 buffer_left_ = buffer_size_; 00336 } 00337 00338 return true; 00339 } 00340 00341 00342 void 00343 GStreamerSource::myProcess(realvec& in, realvec& out) 00344 { 00345 (void)in; // Suppress warning from unused parameter 00346 00347 // TODO Should output_size be calculated from output.getCols() or in/onSamples_? 00348 mrs_natural output_size = inSamples_; 00349 mrs_natural output_left = output_size; 00350 //mrs_real *output = out.getData(); 00351 00352 while(output_left > 0) { 00353 if(buffer_left_ == 0) { 00354 /* Current buffer empty, so pull a new one */ 00355 if(!pull_buffer()) { 00356 break; 00357 } 00358 } 00359 00360 if(buffer_left_ >= output_left) { 00361 /* Copy enough to fill the output */ 00362 /* GSt_BUFFER_DATA() returns guint8*, i.e. a byte-pointer, while output it a double-pointer */ 00363 // memcpy(output + output_size - output_left, 00364 // GST_BUFFER_DATA(buffer_) + (buffer_size_ - buffer_left_)*sizeof(mrs_real), 00365 // output_left*sizeof(mrs_real)); 00366 copyFromBuffer(buffer_, buffer_size_-buffer_left_, out, output_size-output_left, output_left); 00367 buffer_left_ -= output_left; 00368 output_left = 0; 00369 } else { 00370 /* Copy what's left in the buffer */ 00371 // memcpy(output + (output_size - output_left), 00372 // GST_BUFFER_DATA(buffer_) + (buffer_size_ - buffer_left_)*sizeof(mrs_real), 00373 // buffer_left_*sizeof(mrs_real)); 00374 copyFromBuffer(buffer_, buffer_size_-buffer_left_, out, output_size-output_left, buffer_left_); 00375 output_left -= buffer_left_; 00376 buffer_left_ = 0; 00377 } 00378 00379 if(buffer_left_ == 0 && buffer_ != NULL) { 00380 /* Buffer empty, unref it */ 00381 gst_buffer_unref(buffer_); 00382 } 00383 } 00384 00385 // /* Make sure unfilled output is zeroed (TODO is this necessary?) */ 00386 // mrs_natural i; 00387 // for(i = output_size-1; i >= output_size-output_left; --i) { 00388 // out(0,i) = 0.0; 00389 // } 00390 00391 /* Update current position */ 00392 pos_ += output_size - output_left; 00393 setctrl("mrs_natural/pos", pos_); 00394 } 00395 00396 mrs_bool 00397 GStreamerSource::seek() 00398 { 00399 GstSeekFlags flags = (GstSeekFlags)(GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_KEY_UNIT); 00400 gint64 seek_to = (gint64)(newpos_/osrate_*1e9); 00401 if(seek_to >= 0) { 00402 gst_element_seek_simple(GST_ELEMENT(pipe_), GST_FORMAT_TIME, flags, seek_to); 00403 } else { 00404 MRSWARN("Tried to seek to negative position"); 00405 return false; 00406 } 00407 00408 GstStateChangeReturn res = gst_element_get_state(pipe_, NULL, NULL, GST_SECOND); 00409 if(res != GST_STATE_CHANGE_SUCCESS) { 00410 MRSERR("Seek failed!"); 00411 return false; 00412 } 00413 00414 gint64 cur; 00415 GstFormat format = GST_FORMAT_TIME; 00416 if(!gst_element_query_position(GST_ELEMENT(pipe_), format, &cur)) { 00417 MRSERR("Position query failed!"); 00418 return false; 00419 } 00420 00421 pos_ = cur*1e-9*osrate_; 00422 buffer_left_ = 0; 00423 return true; 00424 } 00425 00426 void 00427 GStreamerSource::myUpdate(MarControlPtr sender) 00428 { 00429 (void) sender; // Suppress warning from unused parameter 00430 00431 // This stuff is already done in MarSystem::update() 00432 //inSamples_ = getctrl("mrs_natural/inSamples")->to<mrs_natural>(); 00433 //inObservations_ = getctrl("mrs_natural/inObservations")->to<mrs_natural>(); 00434 //israte_ = getctrl("mrs_real/israte")->to<mrs_real>(); 00435 00436 /* This seems to be the convention, to ignore/overwrite the onSamples control */ 00437 setctrl("mrs_natural/onSamples", inSamples_); 00438 00439 tfilename_ = filename_; 00440 filename_ = getctrl("mrs_string/filename")->to<mrs_string>(); 00441 00442 /* Check if filename was changed */ 00443 if(g_strcmp0(tfilename_.c_str(), filename_.c_str()) != 0) { 00444 getHeader(filename_); 00445 } 00446 00447 /* Only deal with pos_ if there's a playing stream */ 00448 if(playing_) { 00449 newpos_ = getctrl("mrs_natural/pos")->to<mrs_natural>(); 00450 00451 if(newpos_ != pos_) { 00452 /* Position control was written to, try to seek */ 00453 seek(); 00454 } 00455 00456 /* Write current (possibly updated) position back to the control */ 00457 setctrl("mrs_natural/pos", pos_); 00458 } 00459 } 00460