// Marsyas 0.6.0-alpha
/*
** Copyright (C) 2013-2014 George Tzanetakis <gtzan@cs.uvic.ca>
**
** This program is free software; you can redistribute it and/or modify
** it under the terms of the GNU General Public License as published by
** the Free Software Foundation; either version 2 of the License, or
** (at your option) any later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program; if not, write to the Free Software
** Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/

#ifndef MARSYAS_REALTIME_QUEUE_INCLUDED
#define MARSYAS_REALTIME_QUEUE_INCLUDED

#include <atomic>
#include <vector>
#include <cstddef>
#include <cassert>

namespace Marsyas {
namespace RealTime {

using std::atomic;
using std::vector;
using std::size_t;

template<typename T> class queue_producer;
template<typename T> class queue_consumer;

/**
 * @brief Fixed-size ring buffer with one atomic write index and one atomic
 * read index.
 *
 * The write index is only advanced by push() / queue_producer and the read
 * index only by pop() / queue_consumer, each with a plain atomic store (no
 * compare-and-swap). The queue is therefore only safe with at most one
 * thread writing and at most one thread reading at any time.
 *
 * One slot is always kept empty to tell a full ring from an empty one, so a
 * queue constructed with @c size slots holds at most <tt>size - 1</tt>
 * elements.
 */
template<typename T>
class queue
{
  typedef atomic<size_t> atomic_size_t;
  typedef vector<T> vector_t;

  vector_t m_vector;
  atomic_size_t m_write_index;
  atomic_size_t m_read_index;

  /**
   * Copy @p count elements from @p source into the ring, starting at slot
   * @p index and wrapping at the end of the storage.
   * Requires <tt>index < capacity()</tt> whenever <tt>count > 0</tt>.
   * @return the slot index following the last element written.
   */
  size_t copy_in( size_t index, const T * source, size_t count )
  {
    const size_t ring_size = m_vector.size();
    for (size_t i = 0; i < count; ++i)
    {
      m_vector[index] = source[i];
      if (++index == ring_size)
        index = 0;
    }
    return index;
  }

  /**
   * Copy @p count elements out of the ring into @p destination, starting at
   * slot @p index and wrapping at the end of the storage.
   * Requires <tt>index < capacity()</tt> whenever <tt>count > 0</tt>.
   * @return the slot index following the last element read.
   */
  size_t copy_out( size_t index, T * destination, size_t count ) const
  {
    const size_t ring_size = m_vector.size();
    for (size_t i = 0; i < count; ++i)
    {
      destination[i] = m_vector[index];
      if (++index == ring_size)
        index = 0;
    }
    return index;
  }

public:
  friend class queue_producer<T>;
  friend class queue_consumer<T>;

  /**
   * @param size Number of slots to allocate; the usable capacity is
   * <tt>size - 1</tt> elements (0 when @p size is 0 or 1).
   */
  queue( size_t size ):
    m_vector(size),
    m_write_index(0),
    m_read_index(0)
  {}

  /**
   * Reset both indices to 0, discarding any queued elements.
   * The two indices are reset by separate stores, so this must not run
   * concurrently with pushing or popping.
   */
  void clear()
  {
    m_write_index = 0;
    m_read_index = 0;
  }

  /**
   * Append a copy of @p source.
   * @return false (leaving the queue untouched) if there is no free slot.
   */
  bool push( const T & source )
  {
    if (write_capacity() < 1)
      return false;

    const size_t write_index = m_write_index.load( std::memory_order_relaxed );
    m_vector[write_index] = source;
    // Release: the element must be visible before the new index is.
    m_write_index.store( (write_index + 1) % m_vector.size(),
                         std::memory_order_release );
    return true;
  }

  /**
   * Append @p count elements from @p source, all or nothing.
   * @return false (leaving the queue untouched) if fewer than @p count
   * slots are free.
   */
  bool push( const T * source, size_t count )
  {
    if (write_capacity() < count)
      return false;

    const size_t write_index = m_write_index.load( std::memory_order_relaxed );
    const size_t new_write_index = copy_in( write_index, source, count );
    m_write_index.store( new_write_index, std::memory_order_release );
    return true;
  }

  /**
   * Remove the oldest element and copy it into @p destination.
   * @return false (leaving @p destination untouched) if the queue is empty.
   */
  bool pop( T & destination )
  {
    if (read_capacity() < 1)
      return false;

    const size_t read_index = m_read_index.load( std::memory_order_relaxed );
    destination = m_vector[read_index];
    // Release: the slot must have been read before the writer is allowed
    // to see it as free (pairs with the acquire load in write_capacity()).
    m_read_index.store( (read_index + 1) % m_vector.size(),
                        std::memory_order_release );
    return true;
  }

  /**
   * Remove the @p count oldest elements into @p destination, all or nothing.
   * @return false (leaving @p destination untouched) if fewer than @p count
   * elements are queued.
   */
  bool pop( T * destination, size_t count )
  {
    if (read_capacity() < count)
      return false;

    const size_t read_index = m_read_index.load( std::memory_order_relaxed );
    const size_t new_read_index = copy_out( read_index, destination, count );
    // Release for the same reason as in the single-element pop().
    m_read_index.store( new_read_index, std::memory_order_release );
    return true;
  }

  /** @return the number of allocated slots (one more than can be queued). */
  size_t capacity() const
  {
    return m_vector.size();
  }

  /** @return how many elements can currently be pushed. */
  size_t write_capacity() const
  {
    const size_t ring_size = m_vector.size();
    // An empty ring has no slot at all; without this guard the
    // "minus one reserved slot" below would wrap around to SIZE_MAX.
    if (ring_size == 0)
      return 0;

    // Acquire: pairs with the release store of the read index, so the
    // reader is done with every slot reported as free here.
    const size_t read_pos = m_read_index.load( std::memory_order_acquire );
    const size_t write_pos = m_write_index.load( std::memory_order_relaxed );

    const size_t unused = (write_pos >= read_pos)
        ? ring_size - (write_pos - read_pos)
        : read_pos - write_pos;
    // One slot always stays empty to distinguish "full" from "empty".
    return unused - 1;
  }

  /** @return how many elements can currently be popped. */
  size_t read_capacity() const
  {
    const size_t read_pos = m_read_index.load( std::memory_order_relaxed );
    // Acquire: pairs with the release store of the write index, so every
    // element reported as available here is fully written.
    const size_t write_pos = m_write_index.load( std::memory_order_acquire );

    if (write_pos >= read_pos)
      return write_pos - read_pos;
    return m_vector.size() - (read_pos - write_pos);
  }
};


/**
 * @brief Scoped in-place writer for a queue.
 *
 * Reserves a number of free slots at construction, lets the caller fill
 * them through operator[] or write(), and publishes them to the reader
 * when the object is destroyed. Call abort() to publish nothing.
 */
template <typename T>
class queue_producer
{
  queue<T> & m_queue;
  size_t m_capacity;
  size_t m_position;

public:
  /**
   * @param destination Queue to write into.
   * @param capacity Number of slots to reserve; if that many are not free,
   * the reservation is empty (capacity() returns 0).
   */
  queue_producer(queue<T> & destination, size_t capacity):
    m_queue( destination ),
    m_capacity( destination.write_capacity() < capacity ? 0 : capacity ),
    m_position( destination.m_write_index.load(std::memory_order_relaxed) )
  {}

  /** Publish the reserved slots by advancing the queue's write index. */
  ~queue_producer()
  {
    if (m_capacity > 0)
    {
      const size_t new_write_index =
          (m_position + m_capacity) % m_queue.capacity();
      m_queue.m_write_index.store( new_write_index, std::memory_order_release );
    }
  }

  /** @return the number of slots currently reserved. */
  size_t capacity() const { return m_capacity; }

  /**
   * Change the reservation to @p capacity slots.
   * @return false (keeping the previous reservation) if the queue does not
   * have that many free slots.
   */
  bool reserve( size_t capacity )
  {
    if (m_queue.write_capacity() < capacity)
      return false;
    m_capacity = capacity;
    return true;
  }

  /** Access reserved slot @p position (must be below capacity()). */
  T & operator [] ( size_t position )
  {
    assert(position < m_capacity);
    const size_t queue_position = (m_position + position) % m_queue.capacity();
    return m_queue.m_vector[queue_position];
  }

  /**
   * Copy @p count elements from @p source into the reserved slots starting
   * at reserved slot @p position. The range must lie within the reservation.
   */
  void write ( size_t position, const T * source, size_t count )
  {
    assert(position + count <= m_capacity);

    // Nothing to copy; also avoids the modulo below on an empty ring.
    if (count == 0)
      return;

    const size_t start = (m_position + position) % m_queue.capacity();
    m_queue.copy_in( start, source, count );
  }

  /** Drop the reservation so that nothing is published on destruction. */
  void abort()
  {
    reserve(0);
  }
};

/**
 * @brief Scoped in-place reader for a queue.
 *
 * Claims a number of queued elements at construction, lets the caller
 * inspect them through operator[] or read(), and releases their slots to
 * the writer when the object is destroyed. Call abort() to consume nothing.
 */
template<typename T>
class queue_consumer
{
  queue<T> & m_queue;
  size_t m_capacity;
  size_t m_position;

public:
  /**
   * @param source Queue to read from.
   * @param capacity Number of elements to claim; if that many are not
   * queued, the claim is empty (capacity() returns 0).
   */
  queue_consumer(queue<T> & source, size_t capacity):
    m_queue( source ),
    m_capacity( source.read_capacity() < capacity ? 0 : capacity ),
    m_position( source.m_read_index.load(std::memory_order_relaxed) )
  {}

  /** Consume the claimed elements by advancing the queue's read index. */
  ~queue_consumer()
  {
    if (m_capacity > 0)
    {
      const size_t new_read_index =
          (m_position + m_capacity) % m_queue.capacity();
      m_queue.m_read_index.store( new_read_index, std::memory_order_release );
    }
  }

  /** @return the number of elements currently claimed. */
  size_t capacity() const { return m_capacity; }

  /**
   * Change the claim to @p capacity elements.
   * @return false (keeping the previous claim) if the queue does not hold
   * that many elements.
   */
  bool reserve( size_t capacity )
  {
    if (m_queue.read_capacity() < capacity)
      return false;
    m_capacity = capacity;
    return true;
  }

  /** Access claimed element @p position (must be below capacity()). */
  T & operator[] ( size_t position )
  {
    assert(position < m_capacity);
    const size_t queue_position = (m_position + position) % m_queue.capacity();
    return m_queue.m_vector[queue_position];
  }

  /**
   * Copy @p count claimed elements, starting at claimed element
   * @p position, into @p destination. The range must lie within the claim.
   */
  void read ( size_t position, T * destination, size_t count )
  {
    assert(position + count <= m_capacity);

    // Nothing to copy; also avoids the modulo below on an empty ring.
    if (count == 0)
      return;

    const size_t start = (m_position + position) % m_queue.capacity();
    m_queue.copy_out( start, destination, count );
  }

  /** Drop the claim so that nothing is consumed on destruction. */
  void abort()
  {
    reserve(0);
  }
};

}
}

#endif