Marsyas  0.6.0-alpha
/usr/src/RPM/BUILD/marsyas-0.6.0/src/marsyas/realtime/udp_receiver.cpp
Go to the documentation of this file.
00001 /*
00002 ** Copyright (C) 2014 George Tzanetakis <gtzan@cs.uvic.ca>
00003 **
00004 ** This program is free software; you can redistribute it and/or modify
00005 ** it under the terms of the GNU General Public License as published by
00006 ** the Free Software Foundation; either version 2 of the License, or
00007 ** (at your option) any later version.
00008 **
00009 ** This program is distributed in the hope that it will be useful,
00010 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
00011 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012 ** GNU General Public License for more details.
00013 **
00014 ** You should have received a copy of the GNU General Public License
00015 ** along with this program; if not, write to the Free Software
00016 ** Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
00017 */
00018 
00019 #include "udp_receiver.h"
00020 #include "packet_queue.h"
00021 #include <marsyas/common_source.h>
00022 
00023 #include <oscpack/ip/UdpSocket.h>
00024 #include <oscpack/ip/PacketListener.h>
00025 
00026 #include <memory>
00027 #include <cassert>
00028 #include <iostream>
00029 
00030 using namespace std;
00031 
00032 namespace Marsyas {
00033 namespace RealTime {
00034 
00035 class UdpReceiver::Implementation : public PacketListener
00036 {
00037   packet_queue *m_queue;
00038   SocketReceiveMultiplexer m_mux;
00039 
00040 public:
00041   Implementation( packet_queue *queue ):
00042     m_queue(queue)
00043   {
00044   }
00045 
00046   void run( const std::string & address, int port )
00047   {
00048     std::unique_ptr<UdpSocket> socket;
00049     try {
00050       socket.reset(new UdpSocket);
00051       socket->Bind( IpEndpointName( address.c_str(), port ) );
00052     }
00053     catch (std::exception & e)
00054     {
00055       MRSERR("UDP receiver: Failed to set up UDP socket. Port already in use?");
00056       return;
00057     }
00058 
00059     m_mux.AttachSocketListener( socket.get(), this );
00060     m_mux.Run();
00061     m_mux.DetachSocketListener( socket.get(), this );
00062   }
00063 
00064   void stop()
00065   {
00066     m_mux.AsynchronousBreak();
00067   }
00068 
00069 private:
00070 
00071   void ProcessPacket( const char *data, int size,
00072                       const IpEndpointName& )
00073   {
00074     m_queue->push(data, size);
00075   }
00076 };
00077 
00078 UdpReceiver::UdpReceiver( const std::string & address, int port,
00079                           size_t queue_size ):
00080   OscQueueProvider(&m_queue),
00081   m_address(address),
00082   m_port(port),
00083   m_queue(queue_size),
00084   m_implementation(nullptr)
00085 {
00086 }
00087 
00088 UdpReceiver::~UdpReceiver()
00089 {
00090   if (running())
00091     stop();
00092 }
00093 
00094 void UdpReceiver::start()
00095 {
00096   if (m_implementation)
00097   {
00098     MRSERR("UdpReceiver: Can not start: already running.");
00099     return;
00100   }
00101   m_implementation = new Implementation(&m_queue);
00102   m_thread = thread(&Implementation::run, m_implementation, m_address, m_port);
00103 }
00104 
00105 void UdpReceiver::stop()
00106 {
00107   if (!m_implementation)
00108   {
00109     MRSERR("UdpReceiver: Can not stop: not running.");
00110     return;
00111   }
00112   m_implementation->stop();
00113   m_thread.join();
00114   delete m_implementation;
00115   m_implementation = nullptr;
00116 }
00117 
00118 }
00119 }