SHOGUN  v3.2.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines
InputParser.h
Go to the documentation of this file.
00001 /*
00002  * This program is free software; you can redistribute it and/or modify
00003  * it under the terms of the GNU General Public License as published by
00004  * the Free Software Foundation; either version 3 of the License, or
00005  * (at your option) any later version.
00006  *
00007  * Written (W) 2011 Shashwat Lal Das
00008  * Copyright (C) 2011 Berlin Institute of Technology and Max-Planck-Society
00009  */
00010 
00011 #ifndef __INPUTPARSER_H__
00012 #define __INPUTPARSER_H__
00013 
00014 #include <shogun/lib/common.h>
00015 #ifdef HAVE_PTHREAD
00016 
00017 #include <shogun/io/SGIO.h>
00018 #include <shogun/io/streaming/StreamingFile.h>
00019 #include <shogun/io/streaming/ParseBuffer.h>
00020 #include <pthread.h>
00021 
00022 #define PARSER_DEFAULT_BUFFSIZE 100
00023 
00024 namespace shogun
00025 {
00028     enum E_EXAMPLE_TYPE
00029     {
00030         E_LABELLED = 1,
00031         E_UNLABELLED = 2
00032     };
00033 
00082 template <class T> class CInputParser
00083 {
00084 public:
00085 
00090     CInputParser();
00091 
00096     ~CInputParser();
00097 
00109     void init(CStreamingFile* input_file, bool is_labelled = true, int32_t size = PARSER_DEFAULT_BUFFSIZE);
00110 
00116     bool is_running();
00117 
00124     int32_t get_number_of_features() { return number_of_features; }
00125 
00137     void set_read_vector(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len));
00138 
00150     void set_read_vector_and_label(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len, float64_t &label));
00151 
00163     int32_t get_vector_and_label(T* &feature_vector,
00164                      int32_t &length,
00165                      float64_t &label);
00166 
00177     int32_t get_vector_only(T* &feature_vector, int32_t &length);
00178 
00185     void set_free_vector_after_release(bool free_vec);
00186 
00193     void set_free_vectors_on_destruct(bool destroy);
00194 
00200     void start_parser();
00201 
00210     void* main_parse_loop(void* params);
00211 
00212 
00218     void copy_example_into_buffer(Example<T>* ex);
00219 
00226     Example<T>* retrieve_example();
00227 
00240     int32_t get_next_example(T* &feature_vector,
00241                  int32_t &length,
00242                  float64_t &label);
00243 
00252     int32_t get_next_example(T* &feature_vector,
00253                  int32_t &length);
00254 
00262     void finalize_example();
00263 
00268     void end_parser();
00269 
00272     void exit_parser();
00273 
00279     int32_t get_ring_size() { return ring_size; }
00280 
00281 private:
00289     static void* parse_loop_entry_point(void* params);
00290 
00291 public:
00292     bool parsing_done;  
00293     bool reading_done;  
00295     E_EXAMPLE_TYPE example_type; 
00297 protected:
00304     void (CStreamingFile::*read_vector) (T* &vec, int32_t &len);
00305 
00312     void (CStreamingFile::*read_vector_and_label) (T* &vec, int32_t &len, float64_t &label);
00313 
00315     CStreamingFile* input_source;
00316 
00318     pthread_t parse_thread;
00319 
00321     CParseBuffer<T>* examples_ring;
00322 
00324     int32_t number_of_features;
00325 
00327     int32_t number_of_vectors_parsed;
00328 
00330     int32_t number_of_vectors_read;
00331 
00333     Example<T>* current_example;
00334 
00336     T* current_feature_vector;
00337 
00339     float64_t current_label;
00340 
00342     int32_t current_len;
00343 
00345     bool free_after_release;
00346 
00348     int32_t ring_size;
00349 
00351     pthread_mutex_t examples_state_lock;
00352 
00354     pthread_cond_t examples_state_changed;
00355 
00356 };
00357 
00358 template <class T>
00359     void CInputParser<T>::set_read_vector(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len))
00360 {
00361     // Set read_vector to point to the function passed as arg
00362     read_vector=func_ptr;
00363 }
00364 
00365 template <class T>
00366     void CInputParser<T>::set_read_vector_and_label(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len, float64_t &label))
00367 {
00368     // Set read_vector_and_label to point to the function passed as arg
00369     read_vector_and_label=func_ptr;
00370 }
00371 
00372 template <class T>
00373     CInputParser<T>::CInputParser()
00374 {
00375     /* this line was commented out when I found it. However, the mutex locks
00376      * have to be initialised. Otherwise uninitialised memory error */
00377     //init(NULL, true, PARSER_DEFAULT_BUFFSIZE);
00378     pthread_mutex_init(&examples_state_lock, NULL);
00379     pthread_cond_init(&examples_state_changed, NULL);
00380     examples_ring=NULL;
00381     parsing_done=true;
00382     reading_done=true;
00383 }
00384 
00385 template <class T>
00386     CInputParser<T>::~CInputParser()
00387 {
00388     pthread_mutex_destroy(&examples_state_lock);
00389     pthread_cond_destroy(&examples_state_changed);
00390 
00391     SG_UNREF(examples_ring);
00392 }
00393 
00394 template <class T>
00395     void CInputParser<T>::init(CStreamingFile* input_file, bool is_labelled, int32_t size)
00396 {
00397     input_source = input_file;
00398 
00399     if (is_labelled == true)
00400         example_type = E_LABELLED;
00401     else
00402         example_type = E_UNLABELLED;
00403 
00404     examples_ring = new CParseBuffer<T>(size);
00405     SG_REF(examples_ring);
00406 
00407     parsing_done = false;
00408     reading_done = false;
00409     number_of_vectors_parsed = 0;
00410     number_of_vectors_read = 0;
00411 
00412     current_len = -1;
00413     current_label = -1;
00414     current_feature_vector = NULL;
00415 
00416     free_after_release=true;
00417     ring_size=size;
00418 }
00419 
00420 template <class T>
00421     void CInputParser<T>::set_free_vector_after_release(bool free_vec)
00422 {
00423     free_after_release=free_vec;
00424 }
00425 
00426 template <class T>
00427     void CInputParser<T>::set_free_vectors_on_destruct(bool destroy)
00428 {
00429     examples_ring->set_free_vectors_on_destruct(destroy);
00430 }
00431 
00432 template <class T>
00433     void CInputParser<T>::start_parser()
00434 {
00435     SG_SDEBUG("entering CInputParser::start_parser()\n")
00436     if (is_running())
00437     {
00438         SG_SERROR("Parser thread is already running! Multiple parse threads not supported.\n")
00439     }
00440 
00441     SG_SDEBUG("creating parse thread\n")
00442     pthread_create(&parse_thread, NULL, parse_loop_entry_point, this);
00443 
00444     SG_SDEBUG("leaving CInputParser::start_parser()\n")
00445 }
00446 
00447 template <class T>
00448     void* CInputParser<T>::parse_loop_entry_point(void* params)
00449 {
00450     ((CInputParser *) params)->main_parse_loop(params);
00451 
00452     return NULL;
00453 }
00454 
00455 template <class T>
00456     bool CInputParser<T>::is_running()
00457 {
00458     SG_SDEBUG("entering CInputParser::is_running()\n")
00459     bool ret;
00460 
00461     pthread_mutex_lock(&examples_state_lock);
00462 
00463     if (parsing_done)
00464         if (reading_done)
00465             ret = false;
00466         else
00467             ret = true;
00468     else
00469         ret = false;
00470 
00471     pthread_mutex_unlock(&examples_state_lock);
00472 
00473     SG_SDEBUG("leaving CInputParser::is_running(), returning %d\n", ret)
00474     return ret;
00475 }
00476 
00477 template <class T>
00478     int32_t CInputParser<T>::get_vector_and_label(T* &feature_vector,
00479                               int32_t &length,
00480                               float64_t &label)
00481 {
00482     (input_source->*read_vector_and_label)(feature_vector, length, label);
00483 
00484     if (length < 1)
00485     {
00486         // Problem reading the example
00487         return 0;
00488     }
00489 
00490     return 1;
00491 }
00492 
00493 template <class T>
00494     int32_t CInputParser<T>::get_vector_only(T* &feature_vector,
00495                          int32_t &length)
00496 {
00497     (input_source->*read_vector)(feature_vector, length);
00498 
00499     if (length < 1)
00500     {
00501         // Problem reading the example
00502         return 0;
00503     }
00504 
00505     return 1;
00506 }
00507 
00508 template <class T>
00509     void CInputParser<T>::copy_example_into_buffer(Example<T>* ex)
00510 {
00511     examples_ring->copy_example(ex);
00512 }
00513 
00514 template <class T> void* CInputParser<T>::main_parse_loop(void* params)
00515 {
00516     // Read the examples into current_* objects
00517     // Instead of allocating mem for new objects each time
00518 #ifdef HAVE_PTHREAD
00519     CInputParser* this_obj = (CInputParser *) params;
00520     this->input_source = this_obj->input_source;
00521 
00522     while (1)
00523     {
00524         pthread_mutex_lock(&examples_state_lock);
00525         if (parsing_done)
00526         {
00527             pthread_mutex_unlock(&examples_state_lock);
00528             return NULL;
00529         }
00530         pthread_mutex_unlock(&examples_state_lock);
00531 
00532         pthread_testcancel();
00533 
00534         current_example = examples_ring->get_free_example();
00535         current_feature_vector = current_example->fv;
00536         current_len = current_example->length;
00537         current_label = current_example->label;
00538 
00539         if (example_type == E_LABELLED)
00540             get_vector_and_label(current_feature_vector, current_len, current_label);
00541         else
00542             get_vector_only(current_feature_vector, current_len);
00543 
00544         if (current_len < 0)
00545         {
00546             pthread_mutex_lock(&examples_state_lock);
00547             parsing_done = true;
00548             pthread_cond_signal(&examples_state_changed);
00549             pthread_mutex_unlock(&examples_state_lock);
00550             return NULL;
00551         }
00552 
00553         current_example->label = current_label;
00554         current_example->fv = current_feature_vector;
00555         current_example->length = current_len;
00556 
00557         examples_ring->copy_example(current_example);
00558 
00559         pthread_mutex_lock(&examples_state_lock);
00560         number_of_vectors_parsed++;
00561         pthread_cond_signal(&examples_state_changed);
00562         pthread_mutex_unlock(&examples_state_lock);
00563     }
00564 #endif /* HAVE_PTHREAD */
00565     return NULL;
00566 }
00567 
00568 template <class T> Example<T>* CInputParser<T>::retrieve_example()
00569 {
00570     /* This function should be guarded by mutexes while calling  */
00571     Example<T> *ex;
00572 
00573     if (parsing_done)
00574     {
00575         if (number_of_vectors_read == number_of_vectors_parsed)
00576         {
00577             reading_done = true;
00578             /* Signal to waiting threads that no more examples are left */
00579             pthread_cond_signal(&examples_state_changed);
00580             return NULL;
00581         }
00582     }
00583 
00584     if (number_of_vectors_parsed <= 0)
00585         return NULL;
00586 
00587     if (number_of_vectors_read == number_of_vectors_parsed)
00588     {
00589         return NULL;
00590     }
00591 
00592     ex = examples_ring->get_unused_example();
00593     number_of_vectors_read++;
00594 
00595     return ex;
00596 }
00597 
00598 template <class T> int32_t CInputParser<T>::get_next_example(T* &fv,
00599         int32_t &length, float64_t &label)
00600 {
00601     /* if reading is done, no more examples can be fetched. return 0
00602        else, if example can be read, get the example and return 1.
00603        otherwise, wait for further parsing, get the example and
00604        return 1 */
00605 
00606     Example<T> *ex;
00607 
00608     while (1)
00609     {
00610         if (reading_done)
00611             return 0;
00612 
00613         pthread_mutex_lock(&examples_state_lock);
00614         ex = retrieve_example();
00615 
00616         if (ex == NULL)
00617         {
00618             if (reading_done)
00619             {
00620                 /* No more examples left, return */
00621                 pthread_mutex_unlock(&examples_state_lock);
00622                 return 0;
00623             }
00624             else
00625             {
00626                 /* Examples left, wait for one to become ready */
00627                 pthread_cond_wait(&examples_state_changed, &examples_state_lock);
00628                 pthread_mutex_unlock(&examples_state_lock);
00629                 continue;
00630             }
00631         }
00632         else
00633         {
00634             /* Example ready, return the example */
00635             pthread_mutex_unlock(&examples_state_lock);
00636             break;
00637         }
00638     }
00639 
00640     fv = ex->fv;
00641     length = ex->length;
00642     label = ex->label;
00643 
00644     return 1;
00645 }
00646 
00647 template <class T>
00648     int32_t CInputParser<T>::get_next_example(T* &fv, int32_t &length)
00649 {
00650     float64_t label_dummy;
00651 
00652     return get_next_example(fv, length, label_dummy);
00653 }
00654 
00655 template <class T>
00656     void CInputParser<T>::finalize_example()
00657 {
00658     examples_ring->finalize_example(free_after_release);
00659 }
00660 
00661 template <class T> void CInputParser<T>::end_parser()
00662 {
00663     SG_SDEBUG("entering CInputParser::end_parser\n")
00664     SG_SDEBUG("joining parse thread\n")
00665     pthread_join(parse_thread, NULL);
00666     SG_SDEBUG("leaving CInputParser::end_parser\n")
00667 }
00668 
00669 template <class T> void CInputParser<T>::exit_parser()
00670 {
00671     SG_SDEBUG("cancelling parse thread\n")
00672     pthread_cancel(parse_thread);
00673 }
00674 }
00675 
00676 #endif /* HAVE_PTHREAD */
00677 
00678 #endif // __INPUTPARSER_H__
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

SHOGUN Machine Learning Toolbox - Documentation