SHOGUN
v3.2.0
|
00001 /* 00002 * This program is free software; you can redistribute it and/or modify 00003 * it under the terms of the GNU General Public License as published by 00004 * the Free Software Foundation; either version 3 of the License, or 00005 * (at your option) any later version. 00006 * 00007 * Written (W) 2011 Shashwat Lal Das 00008 * Copyright (C) 2011 Berlin Institute of Technology and Max-Planck-Society 00009 */ 00010 #ifndef __PARSEBUFFER_H__ 00011 #define __PARSEBUFFER_H__ 00012 00013 #include <shogun/lib/common.h> 00014 #ifdef HAVE_PTHREAD 00015 00016 #include <shogun/lib/DataType.h> 00017 #include <pthread.h> 00018 00019 namespace shogun 00020 { 00021 00024 enum E_IS_EXAMPLE_USED 00025 { 00026 E_EMPTY = 1, 00027 E_NOT_USED = 2, 00028 E_USED = 3 00029 }; 00030 00040 template <class T> 00041 class Example 00042 { 00043 public: 00045 float64_t label; 00047 T* fv; 00048 index_t length; 00049 }; 00050 00067 template <class T> class CParseBuffer: public CSGObject 00068 { 00069 public: 00075 CParseBuffer(int32_t size = 1024); 00076 00081 ~CParseBuffer(); 00082 00089 Example<T>* get_free_example() 00090 { 00091 pthread_mutex_lock(write_lock); 00092 pthread_mutex_lock(&ex_in_use_mutex[ex_write_index]); 00093 while (ex_used[ex_write_index] == E_NOT_USED) 00094 pthread_cond_wait(&ex_in_use_cond[ex_write_index], &ex_in_use_mutex[ex_write_index]); 00095 Example<T>* ex=&ex_ring[ex_write_index]; 00096 pthread_mutex_unlock(&ex_in_use_mutex[ex_write_index]); 00097 pthread_mutex_unlock(write_lock); 00098 00099 return ex; 00100 } 00101 00110 int32_t write_example(Example<T>* ex); 00111 00117 Example<T>* return_example_to_read(); 00118 00124 Example<T>* get_unused_example(); 00125 00134 int32_t copy_example(Example<T>* ex); 00135 00143 void finalize_example(bool free_after_release); 00144 00154 void set_free_vectors_on_destruct(bool destroy) 00155 { 00156 free_vectors_on_destruct = destroy; 00157 } 00158 00163 bool get_free_vectors_on_destruct() 00164 { 00165 return free_vectors_on_destruct; 00166 } 00167 00173 virtual const char* get_name() const { return "ParseBuffer"; } 00174 00175 protected: 00180 virtual void inc_read_index() 00181 { 00182 ex_read_index=(ex_read_index + 1) % ring_size; 00183 } 00184 00189 virtual void inc_write_index() 00190 { 00191 ex_write_index=(ex_write_index + 1) % ring_size; 00192 } 00193 00194 protected: 00195 00197 int32_t ring_size; 00199 Example<T>* ex_ring; 00200 00202 E_IS_EXAMPLE_USED* ex_used; 00204 pthread_mutex_t* ex_in_use_mutex; 00206 pthread_cond_t* ex_in_use_cond; 00208 pthread_mutex_t* read_lock; 00210 pthread_mutex_t* write_lock; 00211 00213 int32_t ex_write_index; 00215 int32_t ex_read_index; 00216 00218 bool free_vectors_on_destruct; 00219 }; 00220 00221 template <class T> CParseBuffer<T>::CParseBuffer(int32_t size) 00222 { 00223 ring_size = size; 00224 ex_ring = SG_CALLOC(Example<T>, ring_size); 00225 ex_used = SG_MALLOC(E_IS_EXAMPLE_USED, ring_size); 00226 ex_in_use_mutex = SG_MALLOC(pthread_mutex_t, ring_size); 00227 ex_in_use_cond = SG_MALLOC(pthread_cond_t, ring_size); 00228 read_lock = SG_MALLOC(pthread_mutex_t, 1); 00229 write_lock = SG_MALLOC(pthread_mutex_t, 1); 00230 00231 SG_SINFO("Initialized with ring size: %d.\n", ring_size) 00232 00233 ex_write_index = 0; 00234 ex_read_index = 0; 00235 00236 for (int32_t i=0; i<ring_size; i++) 00237 { 00238 ex_used[i] = E_EMPTY; 00239 00240 /* this closes a memory leak, seems to have no bad consequences, 00241 * but I am not completely sure due to lack of any tests */ 00242 //ex_ring[i].fv = SG_MALLOC(T, 1); 00243 //ex_ring[i].length = 1; 00244 ex_ring[i].label = FLT_MAX; 00245 00246 pthread_cond_init(&ex_in_use_cond[i], NULL); 00247 pthread_mutex_init(&ex_in_use_mutex[i], NULL); 00248 } 00249 pthread_mutex_init(read_lock, NULL); 00250 pthread_mutex_init(write_lock, NULL); 00251 00252 free_vectors_on_destruct = true; 00253 } 00254 00255 template <class T> CParseBuffer<T>::~CParseBuffer() 00256 { 00257 for (int32_t i=0; i<ring_size; i++) 00258 { 00259 if (ex_ring[i].fv != NULL && free_vectors_on_destruct) 00260 { 00261 SG_DEBUG("%s::~%s(): destroying examples ring vector %d at %p\n", 00262 get_name(), get_name(), i, ex_ring[i].fv); 00263 SG_FREE(ex_ring[i].fv); 00264 } 00265 pthread_mutex_destroy(&ex_in_use_mutex[i]); 00266 pthread_cond_destroy(&ex_in_use_cond[i]); 00267 } 00268 SG_FREE(ex_ring); 00269 SG_FREE(ex_used); 00270 SG_FREE(ex_in_use_mutex); 00271 SG_FREE(ex_in_use_cond); 00272 00273 SG_FREE(read_lock); 00274 SG_FREE(write_lock); 00275 } 00276 00277 template <class T> 00278 int32_t CParseBuffer<T>::write_example(Example<T> *ex) 00279 { 00280 ex_ring[ex_write_index].label = ex->label; 00281 ex_ring[ex_write_index].fv = ex->fv; 00282 ex_ring[ex_write_index].length = ex->length; 00283 ex_used[ex_write_index] = E_NOT_USED; 00284 inc_write_index(); 00285 00286 return 1; 00287 } 00288 00289 template <class T> 00290 Example<T>* CParseBuffer<T>::return_example_to_read() 00291 { 00292 if (ex_read_index >= 0) 00293 return &ex_ring[ex_read_index]; 00294 else 00295 return NULL; 00296 } 00297 00298 template <class T> 00299 Example<T>* CParseBuffer<T>::get_unused_example() 00300 { 00301 pthread_mutex_lock(read_lock); 00302 00303 Example<T> *ex; 00304 int32_t current_index = ex_read_index; 00305 // Because read index will change after return_example_to_read 00306 00307 pthread_mutex_lock(&ex_in_use_mutex[current_index]); 00308 00309 if (ex_used[current_index] == E_NOT_USED) 00310 ex = return_example_to_read(); 00311 else 00312 ex = NULL; 00313 00314 pthread_mutex_unlock(&ex_in_use_mutex[current_index]); 00315 00316 pthread_mutex_unlock(read_lock); 00317 return ex; 00318 } 00319 00320 template <class T> 00321 int32_t CParseBuffer<T>::copy_example(Example<T> *ex) 00322 { 00323 pthread_mutex_lock(write_lock); 00324 int32_t ret; 00325 int32_t current_index = ex_write_index; 00326 00327 pthread_mutex_lock(&ex_in_use_mutex[current_index]); 00328 while (ex_used[ex_write_index] == E_NOT_USED) 00329 { 00330 pthread_cond_wait(&ex_in_use_cond[ex_write_index], &ex_in_use_mutex[ex_write_index]); 00331 } 00332 00333 ret = write_example(ex); 00334 00335 pthread_mutex_unlock(&ex_in_use_mutex[current_index]); 00336 pthread_mutex_unlock(write_lock); 00337 00338 return ret; 00339 } 00340 00341 template <class T> 00342 void CParseBuffer<T>::finalize_example(bool free_after_release) 00343 { 00344 pthread_mutex_lock(read_lock); 00345 pthread_mutex_lock(&ex_in_use_mutex[ex_read_index]); 00346 ex_used[ex_read_index] = E_USED; 00347 00348 if (free_after_release) 00349 { 00350 SG_DEBUG("Freeing object in ring at index %d and address: %p.\n", 00351 ex_read_index, ex_ring[ex_read_index].fv); 00352 00353 SG_FREE(ex_ring[ex_read_index].fv); 00354 ex_ring[ex_read_index].fv=NULL; 00355 } 00356 00357 pthread_cond_signal(&ex_in_use_cond[ex_read_index]); 00358 pthread_mutex_unlock(&ex_in_use_mutex[ex_read_index]); 00359 inc_read_index(); 00360 00361 pthread_mutex_unlock(read_lock); 00362 } 00363 00364 } 00365 #endif // HAVE_PTHREAD 00366 #endif // __PARSEBUFFER_H__