Marsyas  0.6.0-alpha
/usr/src/RPM/BUILD/marsyas-0.6.0/src/marsyas/marsystems/GStreamerSource.cpp
Go to the documentation of this file.
00001 /*
00002 ** Copyright (C) 2012 Nate Bogdanowicz <natezb@gmail.com>
00003 **
00004 ** This program is free software; you can redistribute it and/or modify
00005 ** it under the terms of the GNU General Public License as published by
00006 ** the Free Software Foundation; either version 2 of the License, or
00007 ** (at your option) any later version.
00008 **
00009 ** This program is distributed in the hope that it will be useful,
00010 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
00011 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012 ** GNU General Public License for more details.
00013 **
00014 ** You should have received a copy of the GNU General Public License
00015 ** along with this program; if not, write to the Free Software
00016 ** Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
00017 */
00018 
00019 #include <marsyas/common_source.h>
00020 #include <marsyas/common_header.h>
00021 #include "GStreamerSource.h"
00022 
00023 #include <glib.h>
00024 #include <gst/gst.h>
00025 #include <gst/app/gstappsink.h>
00026 
00027 
00028 using namespace Marsyas;
00029 
00030 
00031 /* This must be a free-standing function (i.e. not a member of a class), since
00032  * GStreamer is written in plain C. */
00033 void
00034 on_pad_added(GstElement *dec, GstPad *pad, GstElement *element)
00035 {
00036   (void) dec;
00037   GstPad *sinkpad;
00038 
00039   sinkpad = gst_element_get_static_pad(element, "sink");
00040   if(!gst_pad_is_linked(sinkpad)) {
00041     if(gst_pad_link(pad, sinkpad) != GST_PAD_LINK_OK) {
00042       MRSERR("Failed to link pads");
00043     }
00044   }
00045 
00046   gst_object_unref(sinkpad);
00047 }
00048 
00049 
00050 GStreamerSource::GStreamerSource(mrs_string name):AbsSoundFileSource("GStreamerSource", name)
00051 {
00052   size_ = 0;
00053   pos_ = 0;
00054   hasData_ = false;
00055   lastTickWithData_ = false;
00056   playing_ = false;
00057   buffer_size_ = 0;
00058   buffer_left_ = 0;
00059   pipe_created_ = false;
00060   filename_ = "gst-source";
00061 
00062   addControls();
00063 }
00064 
00065 GStreamerSource::GStreamerSource(const GStreamerSource& a): AbsSoundFileSource(a)
00066 {
00067   size_ = 0;
00068   pos_ = 0;
00069   hasData_ = false;
00070   lastTickWithData_ = false;
00071   playing_ = false;
00072   buffer_size_ = 0;
00073   buffer_left_ = 0;
00074   pipe_created_ = false;
00075   filename_ = "gst-source";
00076 
00077   ctrl_pos_ = getctrl("mrs_natural/pos");
00078   ctrl_currentlyPlaying_ = getctrl("mrs_string/currentlyPlaying");
00079   ctrl_previouslyPlaying_ = getctrl("mrs_string/previouslyPlaying");
00080   ctrl_regression_ = getctrl("mrs_bool/regression");
00081   ctrl_currentLabel_ = getctrl("mrs_real/currentLabel");
00082   ctrl_previousLabel_ = getctrl("mrs_real/previousLabel");
00083   ctrl_labelNames_ = getctrl("mrs_string/labelNames");
00084   ctrl_nLabels_ = getctrl("mrs_natural/nLabels");
00085   ctrl_currentHasData_ = getctrl("mrs_bool/currentHasData");
00086   ctrl_currentLastTickWithData_ = getctrl("mrs_bool/currentLastTickWithData");
00087   ctrl_hasData_ = getctrl("mrs_bool/hasData");
00088   ctrl_lastTickWithData_ = getctrl("mrs_bool/lastTickWithData");
00089 }
00090 
00091 GStreamerSource::~GStreamerSource()
00092 {
00093   /* Clean up the pipeline and unref the objects we explicitly un-floated */
00094   if(pipe_ != NULL) {
00095     gst_element_set_state(pipe_, GST_STATE_NULL);
00096     gst_object_unref(pipe_);
00097   }
00098 
00099   if(dec_ != NULL)
00100     gst_object_unref(dec_);
00101 
00102   if(sink_ != NULL)
00103     gst_object_unref(sink_);
00104 }
00105 
00106 
00107 MarSystem* GStreamerSource::clone() const
00108 {
00109   return new GStreamerSource(*this);
00110 }
00111 
00112 
00113 void
00114 GStreamerSource::addControls()
00115 {
00116   addctrl("mrs_natural/size", 0);
00117   addctrl("mrs_natural/pos", 0, ctrl_pos_);
00118   setctrlState("mrs_natural/pos", true);
00119 
00120   addctrl("mrs_string/filename", filename_);
00121   setctrlState("mrs_string/filename", true);
00122 
00123   addctrl("mrs_bool/hasData", true, ctrl_hasData_);
00124   addctrl("mrs_bool/noteon", false);
00125   setctrlState("mrs_bool/noteon", true);
00126 
00127   addctrl("mrs_string/filetype", "gst");
00128 
00129   addctrl("mrs_string/currentlyPlaying", "gst-source", ctrl_currentlyPlaying_);
00130   addctrl("mrs_string/previouslyPlaying", "gst-source", ctrl_previouslyPlaying_);
00131   addctrl("mrs_real/currentLabel", 0.0, ctrl_currentLabel_);
00132   addctrl("mrs_real/previousLabel", 0.0, ctrl_previousLabel_);
00133   addctrl("mrs_natural/nLabels", 0, ctrl_nLabels_);
00134   addctrl("mrs_string/labelNames", ",", ctrl_labelNames_);
00135   addctrl("mrs_bool/regression", false, ctrl_regression_);
00136 
00137   addctrl("mrs_bool/lastTickWithData", false, ctrl_lastTickWithData_);
00138 
00139   addctrl("mrs_real/repetitions", 1.0);
00140   setctrlState("mrs_real/repetitions", true);
00141 
00142   addctrl("mrs_bool/shuffle", false);
00143   setctrlState("mrs_bool/shuffle", true);
00144 
00145   addctrl("mrs_real/duration", -1.0);
00146   setctrlState("mrs_real/duration", true);
00147 
00148   addctrl("mrs_natural/loopPos", (mrs_natural)0);
00149   setctrlState("mrs_natural/loopPos", true);
00150 
00151   addctrl("mrs_natural/advance", 0);
00152   setctrlState("mrs_natural/advance", true);
00153 
00154   addctrl("mrs_natural/cindex", 0);
00155   setctrlState("mrs_natural/cindex", true);
00156 
00157   addctrl("mrs_string/allfilenames", ",");
00158   addctrl("mrs_natural/numFiles", 1);
00159 
00160   addctrl("mrs_bool/currentHasData", true, ctrl_currentHasData_);
00161   addctrl("mrs_bool/currentLastTickWithData", false, ctrl_currentLastTickWithData_);
00162 }
00163 
00164 
00165 void
00166 GStreamerSource::init_pipeline()
00167 {
00168   GstElement *conv;
00169 
00170   /* Initialize GStreamer */
00171   if(!gst_is_initialized() && !gst_init_check(NULL, NULL, NULL)) {
00172     MRSERR("GStreamer could not be initialized!");
00173     return;
00174   }
00175 
00176   /* Create pipeline and elements */
00177   pipe_ = gst_pipeline_new("pipeline");
00178   dec_ = gst_element_factory_make("uridecodebin", NULL);
00179   conv = gst_element_factory_make("audioconvert", NULL);
00180   sink_ = gst_element_factory_make("appsink", NULL);
00181 
00182   /* Unfloat floating references we wish to access later */
00183   gst_object_ref_sink(pipe_);
00184   gst_object_ref_sink(dec_);
00185   gst_object_ref_sink(sink_);
00186 
00187   /* Configure pipeline and elements */
00188   gst_pipeline_use_clock(GST_PIPELINE(pipe_), NULL);
00189 
00190   /* Set up the capabilities filtering */
00191   GstCaps *caps = gst_caps_new_simple("audio/x-raw",
00192                                       "format", G_TYPE_STRING, g_strdup_printf("F%luLE", sizeof(mrs_real)*8),
00193                                       NULL);
00194   gst_app_sink_set_caps(GST_APP_SINK(sink_), caps);
00195   gst_caps_unref(caps);
00196 
00197   /* Add everything to the pipeline and link up static pads */
00198   gst_bin_add_many(GST_BIN(pipe_), dec_, conv, sink_, NULL);
00199   gst_element_link(conv, sink_);
00200 
00201   /* Set up callback to link the decoder's dynamic pad to the converter */
00202   g_signal_connect(dec_, "pad_added", G_CALLBACK(on_pad_added), conv);
00203 
00204   pipe_created_ = true;
00205 }
00206 
00207 
00208 void
00209 GStreamerSource::getHeader(mrs_string filename)
00210 {
00211   if(!pipe_created_) {
00212     init_pipeline();
00213     if(!pipe_created_) {
00214       // Pipe creation failed
00215       size_ = -1;
00216       return;
00217     }
00218   }
00219 
00220   /* (Re)Set the uridecodebin's filename */
00221   gst_element_set_state(pipe_, GST_STATE_NULL);
00222   if(gst_uri_is_valid(filename.c_str())) {
00223     /* "filename" is already a valid uri, just use it as-is */
00224     g_object_set(G_OBJECT(dec_), "uri", filename.c_str(), NULL);
00225   } else {
00226     g_object_set(G_OBJECT(dec_), "uri", gst_filename_to_uri(filename.c_str(), NULL), NULL);
00227   }
00228 
00229   /* Set to PAUSED so we can get a preroll buffer */
00230   GstStateChangeReturn ret = gst_element_set_state(pipe_, GST_STATE_PAUSED);
00231   if(ret == GST_STATE_CHANGE_FAILURE) {
00232     MRSERR("GStreamer pipeline failed to change state. This could be due to an invalid filename");
00233   }
00234   GstSample *sample = gst_app_sink_pull_preroll(GST_APP_SINK(sink_));
00235 
00236   /* Grab the sample's caps so we can get some useful info */
00237   GstCaps *samp_caps = gst_sample_get_caps(sample);
00238   GstStructure *samp_struct = gst_caps_get_structure(samp_caps, 0);
00239 
00240   /* Get sample rate from buffer */
00241   gint rate;
00242   if(gst_structure_get_int(samp_struct, "rate", &rate)) {
00243     osrate_ = (mrs_real)rate;
00244   }
00245 
00246   /* Get number of channels from buffer */
00247   gint channels;
00248   if(gst_structure_get_int(samp_struct, "channels", &channels)) {
00249     onObservations_ = channels;
00250   }
00251 
00252   /* Get the rough size of the audio. Use GST_FORMAT_TIME since it works with most file types */
00253   gint64 duration;
00254   GstFormat format = GST_FORMAT_TIME;
00255 
00256   // Force blocking on state change completion
00257   gst_element_get_state(pipe_, NULL, NULL, GST_SECOND);
00258   if (gst_element_query_duration(GST_ELEMENT(pipe_), format, &duration) && format==GST_FORMAT_TIME) {
00259     size_ = (mrs_natural)(duration*1e-9*osrate_ + 0.5);
00260   } else {
00261     /* GStreamer can't tell us the size of the stream */
00262     MRSWARN("Query Duration failed:");
00263     size_ = -1;
00264   }
00265 
00266   /* Reset these in case this isn't our first song */
00267   pos_ = 0;
00268   hasData_ = true;
00269   lastTickWithData_ = false;
00270 
00271   /* Clean up */
00272   gst_caps_unref(samp_caps);
00273 
00274   /* Set Controls */
00275   //setctrl("mrs_real/israte", osrate_);
00276   setctrl("mrs_real/osrate", osrate_);
00277   setctrl("mrs_natural/onObservations", onObservations_);
00278   //setctrl("mrs_natural/inObservations", onObservations_);
00279   setctrl("mrs_natural/size", size_);
00280   setctrl("mrs_natural/pos", pos_);
00281   ctrl_hasData_->setValue(hasData_);
00282   ctrl_lastTickWithData_->setValue(lastTickWithData_);
00283   ctrl_currentHasData_->setValue(hasData_);
00284   ctrl_currentLastTickWithData_->setValue(lastTickWithData_);
00285 
00286 
00287   /* Start playing, so queue fills with buffers [Should we do this?] */
00288   gst_element_set_state(pipe_, GST_STATE_PLAYING);
00289   playing_ = true; // Should we check if set_state worked before doing this?
00290 }
00291 
00292 void
00293 GStreamerSource::copyFromBuffer(GstBuffer   *buf,
00294                                 mrs_natural  buf_start,
00295                                 realvec&     vec,
00296                                 mrs_natural  vec_start,
00297                                 mrs_natural  length)
00298 {
00299   mrs_natural i, ch;
00300   for(ch = 0; ch < onObservations_; ch++) {
00301     for(i = 0; i < length; i++) {
00302       mrs_real sample;
00303       gst_buffer_extract(buf, ((buf_start + i)*onObservations_ + ch)*sizeof(mrs_real), &sample, sizeof(mrs_real));
00304       vec(ch, vec_start + i) = sample;
00305     }
00306   }
00307 
00308 }
00309 
00310 
00311 mrs_bool
00312 GStreamerSource::pull_buffer() {
00313   GstSample *sample = gst_app_sink_pull_sample(GST_APP_SINK(sink_));
00314 
00315   if(sample == NULL) {
00316     if(gst_app_sink_is_eos(GST_APP_SINK(sink_))) {
00317       /* EOS, stop pulling buffers */
00318       hasData_ = false;
00319       lastTickWithData_ = true;
00320 
00321       ctrl_hasData_->setValue(hasData_);
00322       ctrl_lastTickWithData_->setValue(lastTickWithData_);
00323       ctrl_currentHasData_->setValue(hasData_);
00324       ctrl_currentLastTickWithData_->setValue(lastTickWithData_);
00325       playing_ = false;
00326       return false;
00327     } else {
00328       /* Something messed up. Should we try pulling again? */
00329       MRSERR("GStreamer Error");
00330       return false;
00331     }
00332   } else {
00333     buffer_ = gst_sample_get_buffer(sample);
00334     buffer_size_ = gst_buffer_get_size(buffer_) / sizeof(mrs_real) / onObservations_;
00335     buffer_left_ = buffer_size_;
00336   }
00337 
00338   return true;
00339 }
00340 
00341 
00342 void
00343 GStreamerSource::myProcess(realvec& in, realvec& out)
00344 {
00345   (void)in; // Suppress warning from unused parameter
00346 
00347   // TODO Should output_size be calculated from output.getCols() or in/onSamples_?
00348   mrs_natural output_size = inSamples_;
00349   mrs_natural output_left = output_size;
00350   //mrs_real *output = out.getData();
00351 
00352   while(output_left > 0) {
00353     if(buffer_left_ == 0) {
00354       /* Current buffer empty, so pull a new one */
00355       if(!pull_buffer()) {
00356         break;
00357       }
00358     }
00359 
00360     if(buffer_left_ >= output_left) {
00361       /* Copy enough to fill the output */
00362       /* GSt_BUFFER_DATA() returns guint8*, i.e. a byte-pointer, while output it a double-pointer */
00363       // memcpy(output + output_size - output_left,
00364       //        GST_BUFFER_DATA(buffer_) + (buffer_size_ - buffer_left_)*sizeof(mrs_real),
00365       //        output_left*sizeof(mrs_real));
00366       copyFromBuffer(buffer_, buffer_size_-buffer_left_, out, output_size-output_left, output_left);
00367       buffer_left_ -= output_left;
00368       output_left = 0;
00369     } else {
00370       /* Copy what's left in the buffer */
00371       // memcpy(output + (output_size - output_left),
00372       //        GST_BUFFER_DATA(buffer_) + (buffer_size_ - buffer_left_)*sizeof(mrs_real),
00373       //        buffer_left_*sizeof(mrs_real));
00374       copyFromBuffer(buffer_, buffer_size_-buffer_left_, out, output_size-output_left, buffer_left_);
00375       output_left -= buffer_left_;
00376       buffer_left_ = 0;
00377     }
00378 
00379     if(buffer_left_ == 0 && buffer_ != NULL) {
00380       /* Buffer empty, unref it */
00381       gst_buffer_unref(buffer_);
00382     }
00383   }
00384 
00385   // /* Make sure unfilled output is zeroed (TODO is this necessary?) */
00386   // mrs_natural i;
00387   // for(i = output_size-1; i >= output_size-output_left; --i) {
00388   //    out(0,i) = 0.0;
00389   // }
00390 
00391   /* Update current position */
00392   pos_ += output_size - output_left;
00393   setctrl("mrs_natural/pos", pos_);
00394 }
00395 
00396 mrs_bool
00397 GStreamerSource::seek()
00398 {
00399   GstSeekFlags flags = (GstSeekFlags)(GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_KEY_UNIT);
00400   gint64 seek_to = (gint64)(newpos_/osrate_*1e9);
00401   if(seek_to >= 0) {
00402     gst_element_seek_simple(GST_ELEMENT(pipe_), GST_FORMAT_TIME, flags, seek_to);
00403   } else {
00404     MRSWARN("Tried to seek to negative position");
00405     return false;
00406   }
00407 
00408   GstStateChangeReturn res = gst_element_get_state(pipe_, NULL, NULL, GST_SECOND);
00409   if(res != GST_STATE_CHANGE_SUCCESS) {
00410     MRSERR("Seek failed!");
00411     return false;
00412   }
00413 
00414   gint64 cur;
00415   GstFormat format = GST_FORMAT_TIME;
00416   if(!gst_element_query_position(GST_ELEMENT(pipe_), format, &cur)) {
00417     MRSERR("Position query failed!");
00418     return false;
00419   }
00420 
00421   pos_ = cur*1e-9*osrate_;
00422   buffer_left_ = 0;
00423   return true;
00424 }
00425 
00426 void
00427 GStreamerSource::myUpdate(MarControlPtr sender)
00428 {
00429   (void) sender; // Suppress warning from unused parameter
00430 
00431   // This stuff is already done in MarSystem::update()
00432   //inSamples_ = getctrl("mrs_natural/inSamples")->to<mrs_natural>();
00433   //inObservations_ = getctrl("mrs_natural/inObservations")->to<mrs_natural>();
00434   //israte_ = getctrl("mrs_real/israte")->to<mrs_real>();
00435 
00436   /* This seems to be the convention, to ignore/overwrite the onSamples control */
00437   setctrl("mrs_natural/onSamples", inSamples_);
00438 
00439   tfilename_ = filename_;
00440   filename_ = getctrl("mrs_string/filename")->to<mrs_string>();
00441 
00442   /* Check if filename was changed */
00443   if(g_strcmp0(tfilename_.c_str(), filename_.c_str()) != 0) {
00444     getHeader(filename_);
00445   }
00446 
00447   /* Only deal with pos_ if there's a playing stream */
00448   if(playing_) {
00449     newpos_ = getctrl("mrs_natural/pos")->to<mrs_natural>();
00450 
00451     if(newpos_ != pos_) {
00452       /* Position control was written to, try to seek */
00453       seek();
00454     }
00455 
00456     /* Write current (possibly updated) position back to the control */
00457     setctrl("mrs_natural/pos", pos_);
00458   }
00459 }
00460