SHOGUN  v3.2.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines
ParseBuffer.h
Go to the documentation of this file.
00001 /*
00002  * This program is free software; you can redistribute it and/or modify
00003  * it under the terms of the GNU General Public License as published by
00004  * the Free Software Foundation; either version 3 of the License, or
00005  * (at your option) any later version.
00006  *
00007  * Written (W) 2011 Shashwat Lal Das
00008  * Copyright (C) 2011 Berlin Institute of Technology and Max-Planck-Society
00009  */
00010 #ifndef __PARSEBUFFER_H__
00011 #define __PARSEBUFFER_H__
00012 
00013 #include <shogun/lib/common.h>
00014 #ifdef HAVE_PTHREAD
00015 
00016 #include <shogun/lib/DataType.h>
00017 #include <pthread.h>
00018 
00019 namespace shogun
00020 {
00021 
/** Usage state of one slot in the CParseBuffer ring. */
enum E_IS_EXAMPLE_USED
{
    /** initial state assigned to every slot by the CParseBuffer constructor */
    E_EMPTY = 1,

    /** slot has been written by write_example() and not yet finalized;
     * writers wait while the slot at the write index is in this state */
    E_NOT_USED = 2,

    /** slot has been passed through finalize_example() */
    E_USED = 3
};
00030 
00040 template <class T>
00041 class Example
00042 {
00043 public:
00045     float64_t label;
00047     T* fv;
00048     index_t length;
00049 };
00050 
00067 template <class T> class CParseBuffer: public CSGObject
00068 {
00069 public:
00075     CParseBuffer(int32_t size = 1024);
00076 
00081     ~CParseBuffer();
00082 
00089     Example<T>* get_free_example()
00090     {
00091         pthread_mutex_lock(write_lock);
00092         pthread_mutex_lock(&ex_in_use_mutex[ex_write_index]);
00093         while (ex_used[ex_write_index] == E_NOT_USED)
00094             pthread_cond_wait(&ex_in_use_cond[ex_write_index], &ex_in_use_mutex[ex_write_index]);
00095         Example<T>* ex=&ex_ring[ex_write_index];
00096         pthread_mutex_unlock(&ex_in_use_mutex[ex_write_index]);
00097         pthread_mutex_unlock(write_lock);
00098 
00099         return ex;
00100     }
00101 
00110     int32_t write_example(Example<T>* ex);
00111 
00117     Example<T>* return_example_to_read();
00118 
00124     Example<T>* get_unused_example();
00125 
00134     int32_t copy_example(Example<T>* ex);
00135 
00143     void finalize_example(bool free_after_release);
00144 
00154     void set_free_vectors_on_destruct(bool destroy)
00155     {
00156         free_vectors_on_destruct = destroy;
00157     }
00158 
00163     bool get_free_vectors_on_destruct()
00164     {
00165         return free_vectors_on_destruct;
00166     }
00167 
00173     virtual const char* get_name() const { return "ParseBuffer"; }
00174 
00175 protected:
00180     virtual void inc_read_index()
00181     {
00182         ex_read_index=(ex_read_index + 1) % ring_size;
00183     }
00184 
00189     virtual void inc_write_index()
00190     {
00191         ex_write_index=(ex_write_index + 1) % ring_size;
00192     }
00193 
00194 protected:
00195 
00197     int32_t ring_size;
00199     Example<T>* ex_ring;
00200 
00202     E_IS_EXAMPLE_USED* ex_used;
00204     pthread_mutex_t* ex_in_use_mutex;
00206     pthread_cond_t* ex_in_use_cond;
00208     pthread_mutex_t* read_lock;
00210     pthread_mutex_t* write_lock;
00211 
00213     int32_t ex_write_index;
00215     int32_t ex_read_index;
00216 
00218     bool free_vectors_on_destruct;
00219 };
00220 
00221 template <class T> CParseBuffer<T>::CParseBuffer(int32_t size)
00222 {
00223     ring_size = size;
00224     ex_ring = SG_CALLOC(Example<T>, ring_size);
00225     ex_used = SG_MALLOC(E_IS_EXAMPLE_USED, ring_size);
00226     ex_in_use_mutex = SG_MALLOC(pthread_mutex_t, ring_size);
00227     ex_in_use_cond = SG_MALLOC(pthread_cond_t, ring_size);
00228     read_lock = SG_MALLOC(pthread_mutex_t, 1);
00229     write_lock = SG_MALLOC(pthread_mutex_t, 1);
00230 
00231     SG_SINFO("Initialized with ring size: %d.\n", ring_size)
00232 
00233     ex_write_index = 0;
00234     ex_read_index = 0;
00235 
00236     for (int32_t i=0; i<ring_size; i++)
00237     {
00238         ex_used[i] = E_EMPTY;
00239 
00240         /* this closes a memory leak, seems to have no bad consequences,
00241          * but I am not completely sure due to lack of any tests */
00242         //ex_ring[i].fv = SG_MALLOC(T, 1);
00243         //ex_ring[i].length = 1;
00244         ex_ring[i].label = FLT_MAX;
00245 
00246         pthread_cond_init(&ex_in_use_cond[i], NULL);
00247         pthread_mutex_init(&ex_in_use_mutex[i], NULL);
00248     }
00249     pthread_mutex_init(read_lock, NULL);
00250     pthread_mutex_init(write_lock, NULL);
00251 
00252     free_vectors_on_destruct = true;
00253 }
00254 
00255 template <class T> CParseBuffer<T>::~CParseBuffer()
00256 {
00257     for (int32_t i=0; i<ring_size; i++)
00258     {
00259         if (ex_ring[i].fv != NULL && free_vectors_on_destruct)
00260         {
00261             SG_DEBUG("%s::~%s(): destroying examples ring vector %d at %p\n",
00262                     get_name(), get_name(), i, ex_ring[i].fv);
00263             SG_FREE(ex_ring[i].fv);
00264         }
00265         pthread_mutex_destroy(&ex_in_use_mutex[i]);
00266         pthread_cond_destroy(&ex_in_use_cond[i]);
00267     }
00268     SG_FREE(ex_ring);
00269     SG_FREE(ex_used);
00270     SG_FREE(ex_in_use_mutex);
00271     SG_FREE(ex_in_use_cond);
00272 
00273     SG_FREE(read_lock);
00274     SG_FREE(write_lock);
00275 }
00276 
00277 template <class T>
00278 int32_t CParseBuffer<T>::write_example(Example<T> *ex)
00279 {
00280     ex_ring[ex_write_index].label = ex->label;
00281     ex_ring[ex_write_index].fv = ex->fv;
00282     ex_ring[ex_write_index].length = ex->length;
00283     ex_used[ex_write_index] = E_NOT_USED;
00284     inc_write_index();
00285 
00286     return 1;
00287 }
00288 
00289 template <class T>
00290 Example<T>* CParseBuffer<T>::return_example_to_read()
00291 {
00292     if (ex_read_index >= 0)
00293         return &ex_ring[ex_read_index];
00294     else
00295         return NULL;
00296 }
00297 
00298 template <class T>
00299 Example<T>* CParseBuffer<T>::get_unused_example()
00300 {
00301     pthread_mutex_lock(read_lock);
00302 
00303     Example<T> *ex;
00304     int32_t current_index = ex_read_index;
00305     // Because read index will change after return_example_to_read
00306 
00307     pthread_mutex_lock(&ex_in_use_mutex[current_index]);
00308 
00309     if (ex_used[current_index] == E_NOT_USED)
00310         ex = return_example_to_read();
00311     else
00312         ex = NULL;
00313 
00314     pthread_mutex_unlock(&ex_in_use_mutex[current_index]);
00315 
00316     pthread_mutex_unlock(read_lock);
00317     return ex;
00318 }
00319 
00320 template <class T>
00321 int32_t CParseBuffer<T>::copy_example(Example<T> *ex)
00322 {
00323     pthread_mutex_lock(write_lock);
00324     int32_t ret;
00325     int32_t current_index = ex_write_index;
00326 
00327     pthread_mutex_lock(&ex_in_use_mutex[current_index]);
00328     while (ex_used[ex_write_index] == E_NOT_USED)
00329     {
00330         pthread_cond_wait(&ex_in_use_cond[ex_write_index], &ex_in_use_mutex[ex_write_index]);
00331     }
00332 
00333     ret = write_example(ex);
00334 
00335     pthread_mutex_unlock(&ex_in_use_mutex[current_index]);
00336     pthread_mutex_unlock(write_lock);
00337 
00338     return ret;
00339 }
00340 
00341 template <class T>
00342 void CParseBuffer<T>::finalize_example(bool free_after_release)
00343 {
00344     pthread_mutex_lock(read_lock);
00345     pthread_mutex_lock(&ex_in_use_mutex[ex_read_index]);
00346     ex_used[ex_read_index] = E_USED;
00347 
00348     if (free_after_release)
00349     {
00350         SG_DEBUG("Freeing object in ring at index %d and address: %p.\n",
00351              ex_read_index, ex_ring[ex_read_index].fv);
00352 
00353         SG_FREE(ex_ring[ex_read_index].fv);
00354         ex_ring[ex_read_index].fv=NULL;
00355     }
00356 
00357     pthread_cond_signal(&ex_in_use_cond[ex_read_index]);
00358     pthread_mutex_unlock(&ex_in_use_mutex[ex_read_index]);
00359     inc_read_index();
00360 
00361     pthread_mutex_unlock(read_lock);
00362 }
00363 
00364 }
00365 #endif // HAVE_PTHREAD
00366 #endif // __PARSEBUFFER_H__
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

SHOGUN Machine Learning Toolbox - Documentation