Marsyas  0.6.0-alpha
/usr/src/RPM/BUILD/marsyas-0.6.0/src/marsyas/realtime/queue.h
Go to the documentation of this file.
00001 /*
00002 ** Copyright (C) 2013-2014 George Tzanetakis <gtzan@cs.uvic.ca>
00003 **
00004 ** This program is free software; you can redistribute it and/or modify
00005 ** it under the terms of the GNU General Public License as published by
00006 ** the Free Software Foundation; either version 2 of the License, or
00007 ** (at your option) any later version.
00008 **
00009 ** This program is distributed in the hope that it will be useful,
00010 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
00011 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012 ** GNU General Public License for more details.
00013 **
00014 ** You should have received a copy of the GNU General Public License
00015 ** along with this program; if not, write to the Free Software
00016 ** Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
00017 */
00018 
00019 #ifndef MARSYAS_REALTIME_QUEUE_INCLUDED
00020 #define MARSYAS_REALTIME_QUEUE_INCLUDED
00021 
00022 #include <atomic>
00023 #include <vector>
00024 #include <cstddef>
00025 #include <cassert>
00026 
00027 namespace Marsyas {
00028 namespace RealTime {
00029 
00030 using std::atomic;
00031 using std::vector;
00032 using std::size_t;
00033 
template<typename T> class queue_producer;
template<typename T> class queue_consumer;

/**
 * @brief Fixed-size circular buffer with atomic read and write indices.
 *
 * Storage is a vector of @c size elements allocated at construction and never
 * resized. One slot is always kept unused so that "empty"
 * (read index == write index) can be told apart from "full"; at most
 * <tt>size - 1</tt> elements can therefore be stored at any time.
 *
 * Index handling: each side loads its own index relaxed, publishes it with a
 * release store and observes the other side's index with an acquire load.
 * NOTE(review): this scheme presumably targets exactly one pushing thread and
 * one popping thread - confirm against callers.
 */
template<typename T>
class queue
{
  typedef std::atomic<std::size_t> atomic_size_t;
  typedef std::vector<T> vector_t;

  vector_t m_vector;
  atomic_size_t m_write_index;
  atomic_size_t m_read_index;

public:
  friend class queue_producer<T>;
  friend class queue_consumer<T>;

  /**
   * @brief Creates a queue backed by @p size default-constructed elements.
   * @param size Number of storage slots; the usable capacity is one less.
   */
  queue( std::size_t size ):
    m_vector(size),
    m_write_index(0),
    m_read_index(0)
  {}

  /**
   * @brief Resets both indices to zero, making the queue appear empty.
   *
   * Stored elements are not destroyed or overwritten.
   * NOTE(review): both indices are written from the calling thread, so this
   * is presumably only safe while no push/pop is in flight - confirm.
   */
  void clear()
  {
    m_read_index = m_write_index = 0;
  }

  /**
   * @brief Appends a copy of @p source.
   * @return @c false (queue unchanged) when no free slot is available.
   */
  bool push( const T & source )
  {
    if (write_capacity() < 1)
      return false;

    const std::size_t write_index =
        m_write_index.load( std::memory_order_relaxed );
    m_vector[write_index] = source;
    // Release: the element must be visible before the new index is.
    m_write_index.store( (write_index + 1) % m_vector.size(),
                         std::memory_order_release );
    return true;
  }

  /**
   * @brief Appends @p count elements copied from @p source, all or nothing.
   * @param source Pointer to at least @p count readable elements.
   * @param count  Number of elements to append.
   * @return @c false (queue unchanged) when fewer than @p count slots are free.
   */
  bool push( const T * source, std::size_t count )
  {
    if (write_capacity() < count)
      return false;

    std::size_t write_index = m_write_index.load( std::memory_order_relaxed );
    const std::size_t queue_size = m_vector.size();
    std::size_t source_index = 0;

    // First segment: from the current write position up to the buffer end.
    for ( ; source_index < count && write_index < queue_size;
          ++source_index, ++write_index )
      m_vector[write_index] = source[source_index];

    if ( write_index == queue_size )
      write_index = 0;

    // Second segment: whatever is left continues from the buffer start.
    for ( ; source_index < count; ++source_index, ++write_index )
      m_vector[write_index] = source[source_index];

    m_write_index.store( write_index, std::memory_order_release );

    return true;
  }

  /**
   * @brief Removes the oldest element and copies it into @p destination.
   * @return @c false (destination untouched) when the queue is empty.
   */
  bool pop( T & destination )
  {
    if (read_capacity() < 1)
      return false;

    const std::size_t read_index =
        m_read_index.load( std::memory_order_relaxed );
    destination = m_vector[read_index];
    // Release: the slot must have been read before it is handed back to the
    // pushing side, which acquires this index in write_capacity().
    m_read_index.store( (read_index + 1) % m_vector.size(),
                        std::memory_order_release );
    return true;
  }

  /**
   * @brief Removes @p count elements into @p destination, all or nothing.
   * @param destination Pointer to at least @p count writable elements.
   * @param count       Number of elements to remove.
   * @return @c false (nothing removed) when fewer than @p count elements are
   *         stored.
   */
  bool pop( T * destination, std::size_t count )
  {
    if (read_capacity() < count)
      return false;

    std::size_t read_index = m_read_index.load( std::memory_order_relaxed );
    const std::size_t queue_size = m_vector.size();
    std::size_t dest_index = 0;

    // First segment: from the current read position up to the buffer end.
    for ( ; dest_index < count && read_index < queue_size;
          ++dest_index, ++read_index )
      destination[dest_index] = m_vector[read_index];

    if (read_index == queue_size)
      read_index = 0;

    // Second segment: the remainder continues from the buffer start.
    for ( ; dest_index < count; ++dest_index, ++read_index )
      destination[dest_index] = m_vector[read_index];

    // Release for the same reason as in the single-element pop().
    m_read_index.store( read_index, std::memory_order_release );

    return true;
  }

  /// @brief Number of storage slots (one more than can be filled at once).
  std::size_t capacity() const
  {
    return m_vector.size();
  }

  /**
   * @brief Number of elements that can currently be pushed.
   *
   * Always leaves one slot unused; returns 0 for a queue constructed with
   * size 0 instead of wrapping around to a huge value.
   */
  std::size_t write_capacity() const
  {
    const std::size_t size = m_vector.size();
    if (size == 0)
      return 0;

    // Acquire pairs with the release store made when elements are popped.
    const std::size_t read_pos =
        m_read_index.load(std::memory_order_acquire);
    const std::size_t write_pos =
        m_write_index.load(std::memory_order_relaxed);

    const std::size_t used = (write_pos >= read_pos)
        ? write_pos - read_pos
        : size - (read_pos - write_pos);

    return size - used - 1;
  }

  /// @brief Number of elements that can currently be popped.
  std::size_t read_capacity() const
  {
    const std::size_t read_pos =
        m_read_index.load(std::memory_order_relaxed);
    // Acquire pairs with the release store made when elements are pushed.
    const std::size_t write_pos =
        m_write_index.load(std::memory_order_acquire);

    if (write_pos >= read_pos)
      return write_pos - read_pos;

    return m_vector.size() - (read_pos - write_pos);
  }
};
00187 
00188 
00189 template <typename T>
00190 class queue_producer
00191 {
00192   queue<T> & m_queue;
00193   size_t m_capacity;
00194   size_t m_position;
00195 
00196 public:
00197   queue_producer(queue<T> & destination, size_t capacity):
00198     m_queue( destination ),
00199     m_position( destination.m_write_index.load(std::memory_order_relaxed) )
00200   {
00201     if (destination.write_capacity() < capacity)
00202       m_capacity = 0;
00203     else
00204       m_capacity = capacity;
00205   }
00206 
00207   ~queue_producer()
00208   {
00209     if (m_capacity > 0)
00210     {
00211       size_t new_write_index = (m_position + m_capacity) % m_queue.capacity();
00212       m_queue.m_write_index.store(new_write_index, std::memory_order_release);
00213     }
00214   }
00215 
00216   size_t capacity() { return m_capacity; }
00217 
00218   bool reserve( size_t capacity )
00219   {
00220     if (m_queue.write_capacity() < capacity)
00221       return false;
00222     m_capacity = capacity;
00223     return true;
00224   }
00225 
00226   T & operator [] ( size_t position )
00227   {
00228     assert(position < m_capacity);
00229     size_t queue_position = (m_position + position) % m_queue.capacity();
00230     return m_queue.m_vector[queue_position];
00231   }
00232 
00233   void write ( size_t position, const T * source, size_t count )
00234   {
00235     assert(position + count <= m_capacity);
00236 
00237     size_t queue_capacity = m_queue.capacity();
00238     size_t write_index = m_position + position;
00239     size_t source_index = 0;
00240 
00241     while( source_index < count && write_index < queue_capacity )
00242     {
00243       m_queue.m_vector[write_index] = source[source_index];
00244       ++source_index;
00245       ++write_index;
00246     }
00247 
00248     write_index = write_index % queue_capacity;
00249 
00250     while( source_index < count )
00251     {
00252       m_queue.m_vector[write_index] = source[source_index];
00253       ++source_index;
00254       ++write_index;
00255     }
00256   }
00257 
00258   void abort()
00259   {
00260     reserve(0);
00261   }
00262 };
00263 
00264 template<typename T>
00265 class queue_consumer
00266 {
00267   queue<T> & m_queue;
00268   size_t m_capacity;
00269   size_t m_position;
00270 
00271 public:
00272 
00273   queue_consumer(queue<T> & source, size_t capacity):
00274     m_queue(source),
00275     m_position( source.m_read_index.load(std::memory_order_relaxed) )
00276   {
00277     if (source.read_capacity() < capacity)
00278       m_capacity = 0;
00279     else
00280       m_capacity = capacity;
00281   }
00282 
00283   ~queue_consumer()
00284   {
00285     if (m_capacity > 0) {
00286       size_t new_read_index = (m_position + m_capacity) % m_queue.capacity();
00287       m_queue.m_read_index.store(new_read_index, std::memory_order_release);
00288     }
00289   }
00290 
00291   size_t capacity() { return m_capacity; }
00292 
00293   bool reserve( size_t capacity )
00294   {
00295     if (m_queue.read_capacity() < capacity)
00296       return false;
00297     m_capacity = capacity;
00298     return true;
00299   }
00300 
00301   T & operator[] ( size_t position )
00302   {
00303     assert(position < m_capacity);
00304     size_t queue_position = (m_position + position) % m_queue.capacity();
00305     return m_queue.m_vector[queue_position];
00306   }
00307 
00308   void read ( size_t position, T * destination, size_t count )
00309   {
00310     assert(position + count <= m_capacity);
00311 
00312     size_t queue_capacity = m_queue.capacity();
00313     size_t read_index = m_position + position;
00314     size_t dest_index = 0;
00315 
00316     while ( dest_index < count && read_index < queue_capacity )
00317     {
00318       destination[dest_index] = m_queue.m_vector[read_index];
00319       ++dest_index;
00320       ++read_index;
00321     }
00322 
00323     read_index = read_index % queue_capacity;
00324 
00325     while ( dest_index < count )
00326     {
00327       destination[dest_index] = m_queue.m_vector[read_index];
00328       ++dest_index;
00329       ++read_index;
00330     }
00331   }
00332 
00333   void abort()
00334   {
00335     reserve(0);
00336   }
00337 };
00338 
00339 }
00340 }
00341 
00342 #endif