Marsyas  0.6.0-alpha
/usr/src/RPM/BUILD/marsyas-0.6.0/src/marsyas/realtime/runner.cpp
Go to the documentation of this file.
00001 /*
00002 ** Copyright (C) 1998-2013 George Tzanetakis <gtzan@cs.uvic.ca>
00003 **
00004 ** This program is free software; you can redistribute it and/or modify
00005 ** it under the terms of the GNU General Public License as published by
00006 ** the Free Software Foundation; either version 2 of the License, or
00007 ** (at your option) any later version.
00008 **
00009 ** This program is distributed in the hope that it will be useful,
00010 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
00011 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012 ** GNU General Public License for more details.
00013 **
00014 ** You should have received a copy of the GNU General Public License
00015 ** along with this program; if not, write to the Free Software
00016 ** Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
00017 */
00018 
00019 #include "osc_transmitter.h"
00020 #include "osc_receiver.h"
00021 
00022 #include <marsyas/realtime/runner.h>
00023 #include <marsyas/realtime/atomic_control.h>
00024 #include <marsyas/realtime/packet_queue.h>
00025 #include <marsyas/common_header.h>
00026 
00027 #include "../common_source.h"
00028 
00029 #include <iostream>
00030 #include <algorithm>
00031 #include <functional>
00032 #include <memory>
00033 #include <map>
00034 #include <vector>
00035 #include <cstring>
00036 #include <cerrno>
00037 
00038 #if defined(MARSYAS_MACOSX) || defined(MARSYAS_LINUX) || defined(MARSYAS_MINGW)
00039 #  include <pthread.h>
00040 #elif defined(MARSYAS_WIN32)
00041 #  include <windows.h>
00042 #endif
00043 
00044 using namespace std;
00045 
00046 namespace Marsyas {
00047 namespace RealTime {
00048 
00049 class RunnerThread
00050 {
00051 public:
00052   RunnerThread( MarSystem * system, Runner::Shared * shared,
00053                 bool realtime_priority, unsigned int ticks ):
00054     m_system(system),
00055     m_shared(shared),
00056     m_ticks(ticks > 0 ? ticks : -1),
00057     m_stop(false),
00058     m_thread(&Marsyas::RealTime::RunnerThread::run, this)
00059   {
00060 #if defined(MARSYAS_MACOSX) || defined(MARSYAS_LINUX) || defined(MARSYAS_MINGW)
00061     int policy;
00062     sched_param param;
00063     pthread_getschedparam( m_thread.native_handle(), &policy, &param );
00064 
00065     policy = realtime_priority ? SCHED_RR : SCHED_OTHER;
00066     int min_priority = sched_get_priority_min( policy );
00067     int max_priority = sched_get_priority_max( policy );
00068     int relative_priority = (int) ((max_priority - min_priority) * 0.6);
00069     int priority = min_priority + relative_priority;
00070     param.sched_priority = priority;
00071 
00072     if (pthread_setschedparam( m_thread.native_handle(), policy, &param ))
00073     {
00074       MRSWARN("RunnerThread: Failed to set thread scheduling policy and priority: "
00075               << std::strerror(errno));
00076     }
00077 #elif defined(MARSYAS_WIN32)
00078     if (!SetThreadPriority( m_thread.native_handle(), THREAD_PRIORITY_HIGHEST ))
00079     {
00080       MRSWARN("RunnerThread: Failed to set thread priority.");
00081     }
00082 #else
00083     MRSWARN("RunnerThread: Increasing thread priority on this platform is not implemented yet.");
00084 #endif
00085   }
00086 
00087   void stop()
00088   {
00089     m_stop = true;
00090   }
00091 
00092   void join()
00093   {
00094     m_thread.join();
00095   }
00096 
00097 private:
00098   void run();
00099   void process_requests();
00100 
00101   Marsyas::MarSystem *m_system;
00102 
00103   Runner::Shared * m_shared;
00104 
00105   int m_ticks;
00106   std::atomic<bool> m_stop;
00107   std::thread m_thread;
00108 };
00109 
00110 static any get_control_value( const MarControlPtr & control )
00111 {
00112   std::string control_type = control->getType();
00113   if(control_type == "mrs_bool")
00114     return any( control->to<bool>() );
00115   else if(control_type == "mrs_real")
00116     return any( control->to<mrs_real>() );
00117   else if(control_type == "mrs_natural")
00118     return any( control->to<mrs_natural>() );
00119   else if(control_type == "mrs_string")
00120     return any( control->to<mrs_string>() );
00121   else if(control_type == "mrs_realvec")
00122     return any( control->to<mrs_realvec>() );
00123   else {
00124     MRSERR(
00125       "Marsyas::Thread::System:: Can not get control value - unsupported type: "
00126       << control_type.c_str()
00127     );
00128     return any();
00129   }
00130 }
00131 
00132 Runner::Runner(Marsyas::MarSystem * system):
00133   m_system(system),
00134   m_realtime_priority(false),
00135   m_osc_receiver(system),
00136   m_osc_transmitter(system),
00137   m_thread(0),
00138   m_shared(new Shared(&m_osc_receiver))
00139 {
00140 }
00141 
00142 Runner::~Runner()
00143 {
00144   stop();
00145 
00146   delete m_shared;
00147 }
00148 
00149 void Runner::start(unsigned int ticks)
00150 {
00151   if (!m_thread)
00152   {
00153     refit_realvec_controls();
00154 
00155     m_thread = new RunnerThread(m_system, m_shared, m_realtime_priority, ticks);
00156   }
00157 }
00158 
00159 void Runner::stop()
00160 {
00161   if (m_thread)
00162   {
00163     m_thread->stop();
00164     m_thread->join();
00165 
00166     delete m_thread;
00167     m_thread = 0;
00168   }
00169 }
00170 
00171 void Runner::wait()
00172 {
00173   if (m_thread)
00174   {
00175     m_thread->join();
00176 
00177     delete m_thread;
00178     m_thread = 0;
00179   }
00180 }
00181 
00182 void Runner::addController( OscProvider * controller )
00183 {
00184   if (isRunning())
00185   {
00186     MRSERR("Runner: can not add OSC controller while running.");
00187     return;
00188   }
00189 
00190   m_osc_receiver.addProvider(controller);
00191 }
00192 
00193 void Runner::removeController( OscProvider * controller )
00194 {
00195   if (isRunning())
00196   {
00197     MRSERR("Runner: can not remove OSC controller while running.");
00198     return;
00199   }
00200 
00201   m_osc_receiver.removeProvider(controller);
00202 }
00203 
00204 bool Runner::subscribe( const std::string & path, OscSubscriber *subscriber )
00205 {
00206   if (isRunning())
00207   {
00208     MRSERR("Runner: can not add OSC subscriptions while running.");
00209     return false;
00210   }
00211 
00212   return m_osc_transmitter.subscribe( path, subscriber );
00213 }
00214 
00215 void Runner::unsubscribe( const std::string & path,  OscSubscriber *subscriber )
00216 {
00217   if (isRunning())
00218   {
00219     MRSERR("Runner: can not remove OSC subscriptions while running.");
00220     return;
00221   }
00222 
00223   m_osc_transmitter.unsubscribe( path, subscriber );
00224 }
00225 
00226 Control * Runner::control( const std::string & path )
00227 {
00228   std::map<std::string, Control*>::iterator it = m_shared->controls.find(path);
00229   if (it != m_shared->controls.end())
00230     return it->second;
00231   else
00232     return create_control( path );
00233 }
00234 
00235 Control * Runner::create_control( const std::string & control_path )
00236 {
00237   if (isRunning()) {
00238     MRSERR("Marsyas::Thread::System:: can not start tracking controls while running.");
00239     return 0;
00240   }
00241 
00242   Marsyas::MarControlPtr sys_control = m_system->getControl( control_path );
00243   if (sys_control.isInvalid()) {
00244     MRSERR("Marsyas::Thread::System:: Can not track control - invalid path: " << control_path);
00245     return 0;
00246   }
00247 
00248   AtomicControl *atomic_control;
00249 
00250   std::string sys_control_type = sys_control->getType();
00251   if(sys_control_type == "mrs_bool")
00252     atomic_control = new AtomicControlT<bool>(sys_control);
00253   else if(sys_control_type == "mrs_real")
00254     atomic_control = new AtomicControlT<mrs_real>(sys_control);
00255   else if(sys_control_type == "mrs_natural")
00256     atomic_control =  new AtomicControlT<mrs_natural>(sys_control);
00257   else if(sys_control_type == "mrs_string")
00258     atomic_control = new AtomicControlT<mrs_string>(sys_control);
00259   else if(sys_control_type == "mrs_realvec")
00260     atomic_control = new AtomicControlT<mrs_realvec>(sys_control);
00261   else {
00262     MRSERR(
00263       "Marsyas::Thread::System:: Can not track control - unsupported type: "
00264       << sys_control_type.c_str()
00265     );
00266     return 0;
00267   }
00268 
00269   Control * control = new Control(this, control_path, atomic_control);
00270 
00271   m_shared->controls.insert(std::pair<std::string, Control*>(control_path, control));
00272 
00273   return control;
00274 }
00275 
00276 void Runner::refit_realvec_controls()
00277 {
00278 for (const auto & mapping : m_shared->controls)
00279   {
00280     Control *control = mapping.second;
00281     control->resizeToFit();
00282   }
00283 }
00284 
00286 
00287 static void init_audio_recursive( MarSystem * system )
00288 {
00289   MarControlPtr init_audio_control = system->getControl("mrs_bool/initAudio");
00290   if (!init_audio_control.isInvalid())
00291     init_audio_control->setValue(true);
00292   for ( MarSystem * child : system->children() )
00293     init_audio_recursive( child );
00294 }
00295 
00296 void RunnerThread::run()
00297 {
00298   //cout << "MarSystemThread: running" << endl;
00299 
00300   init_audio_recursive(m_system);
00301 
00302   m_system->updControl("mrs_bool/active", true);
00303 
00304   MarControlPtr done_control = m_system->getControl("mrs_bool/done");
00305   function<bool()> not_system_done;
00306   if (done_control.isInvalid())
00307     not_system_done = [](){return true;};
00308   else
00309     not_system_done = [&done_control](){return !done_control->to<mrs_bool>();};
00310 
00311   int ticks_done = 0;
00312   int ticks_left = m_ticks;
00313 
00314   while ( !m_stop &&
00315           ticks_left &&
00316           not_system_done() )
00317   {
00318     //cout << "tick" << endl;
00319 
00320     m_shared->controller->run();
00321 
00322     m_system->tick();
00323 
00324     for (const auto & mapping : m_shared->controls)
00325       mapping.second->push();
00326 
00327     if (ticks_left > 0)
00328       --ticks_left;
00329     ++ticks_done;
00330   }
00331 
00332 #if 0
00333   if (done_control->to<mrs_bool>())
00334   {
00335     cout << "done in " << ticks_done << " ticks " << endl;
00336   }
00337   else if (ticks_left == 0)
00338   {
00339     cout << "timeout!" << endl;
00340   }
00341 #endif
00342 
00343   m_system->updControl("mrs_bool/active", false);
00344 
00345   // make sure not to miss latest updates:
00346   m_shared->controller->run();
00347 }
00348 
00349 
00351 
00352 any Control::value() const
00353 {
00354   if (m_runner->isRunning())
00355     return m_atomic->value();
00356   else
00357     return get_control_value( m_atomic->systemControl() );
00358 }
00359 
00360 } // namespace RealTime
00361 } // namespace Marsyas