Marsyas
0.6.0-alpha
|
00001 /* 00002 ** Copyright (C) 2014 George Tzanetakis <gtzan@cs.uvic.ca> 00003 ** 00004 ** This program is free software; you can redistribute it and/or modify 00005 ** it under the terms of the GNU General Public License as published by 00006 ** the Free Software Foundation; either version 2 of the License, or 00007 ** (at your option) any later version. 00008 ** 00009 ** This program is distributed in the hope that it will be useful, 00010 ** but WITHOUT ANY WARRANTY; without even the implied warranty of 00011 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00012 ** GNU General Public License for more details. 00013 ** 00014 ** You should have received a copy of the GNU General Public License 00015 ** along with this program; if not, write to the Free Software 00016 ** Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 00017 */ 00018 00019 #include "udp_receiver.h" 00020 #include "packet_queue.h" 00021 #include <marsyas/common_source.h> 00022 00023 #include <oscpack/ip/UdpSocket.h> 00024 #include <oscpack/ip/PacketListener.h> 00025 00026 #include <memory> 00027 #include <cassert> 00028 #include <iostream> 00029 00030 using namespace std; 00031 00032 namespace Marsyas { 00033 namespace RealTime { 00034 00035 class UdpReceiver::Implementation : public PacketListener 00036 { 00037 packet_queue *m_queue; 00038 SocketReceiveMultiplexer m_mux; 00039 00040 public: 00041 Implementation( packet_queue *queue ): 00042 m_queue(queue) 00043 { 00044 } 00045 00046 void run( const std::string & address, int port ) 00047 { 00048 std::unique_ptr<UdpSocket> socket; 00049 try { 00050 socket.reset(new UdpSocket); 00051 socket->Bind( IpEndpointName( address.c_str(), port ) ); 00052 } 00053 catch (std::exception & e) 00054 { 00055 MRSERR("UDP receiver: Failed to set up UDP socket. Port already in use?"); 00056 return; 00057 } 00058 00059 m_mux.AttachSocketListener( socket.get(), this ); 00060 m_mux.Run(); 00061 m_mux.DetachSocketListener( socket.get(), this ); 00062 } 00063 00064 void stop() 00065 { 00066 m_mux.AsynchronousBreak(); 00067 } 00068 00069 private: 00070 00071 void ProcessPacket( const char *data, int size, 00072 const IpEndpointName& ) 00073 { 00074 m_queue->push(data, size); 00075 } 00076 }; 00077 00078 UdpReceiver::UdpReceiver( const std::string & address, int port, 00079 size_t queue_size ): 00080 OscQueueProvider(&m_queue), 00081 m_address(address), 00082 m_port(port), 00083 m_queue(queue_size), 00084 m_implementation(nullptr) 00085 { 00086 } 00087 00088 UdpReceiver::~UdpReceiver() 00089 { 00090 if (running()) 00091 stop(); 00092 } 00093 00094 void UdpReceiver::start() 00095 { 00096 if (m_implementation) 00097 { 00098 MRSERR("UdpReceiver: Can not start: already running."); 00099 return; 00100 } 00101 m_implementation = new Implementation(&m_queue); 00102 m_thread = thread(&Implementation::run, m_implementation, m_address, m_port); 00103 } 00104 00105 void UdpReceiver::stop() 00106 { 00107 if (!m_implementation) 00108 { 00109 MRSERR("UdpReceiver: Can not stop: not running."); 00110 return; 00111 } 00112 m_implementation->stop(); 00113 m_thread.join(); 00114 delete m_implementation; 00115 m_implementation = nullptr; 00116 } 00117 00118 } 00119 }