SHOGUN
v3.2.0
|
00001 /* 00002 * This program is free software; you can redistribute it and/or modify 00003 * it under the terms of the GNU General Public License as published by 00004 * the Free Software Foundation; either version 3 of the License, or 00005 * (at your option) any later version. 00006 * 00007 * Written (W) 2011 Shashwat Lal Das 00008 * Copyright (C) 2011 Berlin Institute of Technology and Max-Planck-Society 00009 */ 00010 00011 #ifndef __INPUTPARSER_H__ 00012 #define __INPUTPARSER_H__ 00013 00014 #include <shogun/lib/common.h> 00015 #ifdef HAVE_PTHREAD 00016 00017 #include <shogun/io/SGIO.h> 00018 #include <shogun/io/streaming/StreamingFile.h> 00019 #include <shogun/io/streaming/ParseBuffer.h> 00020 #include <pthread.h> 00021 00022 #define PARSER_DEFAULT_BUFFSIZE 100 00023 00024 namespace shogun 00025 { 00028 enum E_EXAMPLE_TYPE 00029 { 00030 E_LABELLED = 1, 00031 E_UNLABELLED = 2 00032 }; 00033 00082 template <class T> class CInputParser 00083 { 00084 public: 00085 00090 CInputParser(); 00091 00096 ~CInputParser(); 00097 00109 void init(CStreamingFile* input_file, bool is_labelled = true, int32_t size = PARSER_DEFAULT_BUFFSIZE); 00110 00116 bool is_running(); 00117 00124 int32_t get_number_of_features() { return number_of_features; } 00125 00137 void set_read_vector(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len)); 00138 00150 void set_read_vector_and_label(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len, float64_t &label)); 00151 00163 int32_t get_vector_and_label(T* &feature_vector, 00164 int32_t &length, 00165 float64_t &label); 00166 00177 int32_t get_vector_only(T* &feature_vector, int32_t &length); 00178 00185 void set_free_vector_after_release(bool free_vec); 00186 00193 void set_free_vectors_on_destruct(bool destroy); 00194 00200 void start_parser(); 00201 00210 void* main_parse_loop(void* params); 00211 00212 00218 void copy_example_into_buffer(Example<T>* ex); 00219 00226 Example<T>* retrieve_example(); 00227 00240 int32_t get_next_example(T* &feature_vector, 00241 int32_t &length, 00242 float64_t &label); 00243 00252 int32_t get_next_example(T* &feature_vector, 00253 int32_t &length); 00254 00262 void finalize_example(); 00263 00268 void end_parser(); 00269 00272 void exit_parser(); 00273 00279 int32_t get_ring_size() { return ring_size; } 00280 00281 private: 00289 static void* parse_loop_entry_point(void* params); 00290 00291 public: 00292 bool parsing_done; 00293 bool reading_done; 00295 E_EXAMPLE_TYPE example_type; 00297 protected: 00304 void (CStreamingFile::*read_vector) (T* &vec, int32_t &len); 00305 00312 void (CStreamingFile::*read_vector_and_label) (T* &vec, int32_t &len, float64_t &label); 00313 00315 CStreamingFile* input_source; 00316 00318 pthread_t parse_thread; 00319 00321 CParseBuffer<T>* examples_ring; 00322 00324 int32_t number_of_features; 00325 00327 int32_t number_of_vectors_parsed; 00328 00330 int32_t number_of_vectors_read; 00331 00333 Example<T>* current_example; 00334 00336 T* current_feature_vector; 00337 00339 float64_t current_label; 00340 00342 int32_t current_len; 00343 00345 bool free_after_release; 00346 00348 int32_t ring_size; 00349 00351 pthread_mutex_t examples_state_lock; 00352 00354 pthread_cond_t examples_state_changed; 00355 00356 }; 00357 00358 template <class T> 00359 void CInputParser<T>::set_read_vector(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len)) 00360 { 00361 // Set read_vector to point to the function passed as arg 00362 read_vector=func_ptr; 00363 } 00364 00365 template <class T> 00366 void CInputParser<T>::set_read_vector_and_label(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len, float64_t &label)) 00367 { 00368 // Set read_vector_and_label to point to the function passed as arg 00369 read_vector_and_label=func_ptr; 00370 } 00371 00372 template <class T> 00373 CInputParser<T>::CInputParser() 00374 { 00375 /* this line was commented out when I found it. However, the mutex locks 00376 * have to be initialised. Otherwise uninitialised memory error */ 00377 //init(NULL, true, PARSER_DEFAULT_BUFFSIZE); 00378 pthread_mutex_init(&examples_state_lock, NULL); 00379 pthread_cond_init(&examples_state_changed, NULL); 00380 examples_ring=NULL; 00381 parsing_done=true; 00382 reading_done=true; 00383 } 00384 00385 template <class T> 00386 CInputParser<T>::~CInputParser() 00387 { 00388 pthread_mutex_destroy(&examples_state_lock); 00389 pthread_cond_destroy(&examples_state_changed); 00390 00391 SG_UNREF(examples_ring); 00392 } 00393 00394 template <class T> 00395 void CInputParser<T>::init(CStreamingFile* input_file, bool is_labelled, int32_t size) 00396 { 00397 input_source = input_file; 00398 00399 if (is_labelled == true) 00400 example_type = E_LABELLED; 00401 else 00402 example_type = E_UNLABELLED; 00403 00404 examples_ring = new CParseBuffer<T>(size); 00405 SG_REF(examples_ring); 00406 00407 parsing_done = false; 00408 reading_done = false; 00409 number_of_vectors_parsed = 0; 00410 number_of_vectors_read = 0; 00411 00412 current_len = -1; 00413 current_label = -1; 00414 current_feature_vector = NULL; 00415 00416 free_after_release=true; 00417 ring_size=size; 00418 } 00419 00420 template <class T> 00421 void CInputParser<T>::set_free_vector_after_release(bool free_vec) 00422 { 00423 free_after_release=free_vec; 00424 } 00425 00426 template <class T> 00427 void CInputParser<T>::set_free_vectors_on_destruct(bool destroy) 00428 { 00429 examples_ring->set_free_vectors_on_destruct(destroy); 00430 } 00431 00432 template <class T> 00433 void CInputParser<T>::start_parser() 00434 { 00435 SG_SDEBUG("entering CInputParser::start_parser()\n") 00436 if (is_running()) 00437 { 00438 SG_SERROR("Parser thread is already running! Multiple parse threads not supported.\n") 00439 } 00440 00441 SG_SDEBUG("creating parse thread\n") 00442 pthread_create(&parse_thread, NULL, parse_loop_entry_point, this); 00443 00444 SG_SDEBUG("leaving CInputParser::start_parser()\n") 00445 } 00446 00447 template <class T> 00448 void* CInputParser<T>::parse_loop_entry_point(void* params) 00449 { 00450 ((CInputParser *) params)->main_parse_loop(params); 00451 00452 return NULL; 00453 } 00454 00455 template <class T> 00456 bool CInputParser<T>::is_running() 00457 { 00458 SG_SDEBUG("entering CInputParser::is_running()\n") 00459 bool ret; 00460 00461 pthread_mutex_lock(&examples_state_lock); 00462 00463 if (parsing_done) 00464 if (reading_done) 00465 ret = false; 00466 else 00467 ret = true; 00468 else 00469 ret = false; 00470 00471 pthread_mutex_unlock(&examples_state_lock); 00472 00473 SG_SDEBUG("leaving CInputParser::is_running(), returning %d\n", ret) 00474 return ret; 00475 } 00476 00477 template <class T> 00478 int32_t CInputParser<T>::get_vector_and_label(T* &feature_vector, 00479 int32_t &length, 00480 float64_t &label) 00481 { 00482 (input_source->*read_vector_and_label)(feature_vector, length, label); 00483 00484 if (length < 1) 00485 { 00486 // Problem reading the example 00487 return 0; 00488 } 00489 00490 return 1; 00491 } 00492 00493 template <class T> 00494 int32_t CInputParser<T>::get_vector_only(T* &feature_vector, 00495 int32_t &length) 00496 { 00497 (input_source->*read_vector)(feature_vector, length); 00498 00499 if (length < 1) 00500 { 00501 // Problem reading the example 00502 return 0; 00503 } 00504 00505 return 1; 00506 } 00507 00508 template <class T> 00509 void CInputParser<T>::copy_example_into_buffer(Example<T>* ex) 00510 { 00511 examples_ring->copy_example(ex); 00512 } 00513 00514 template <class T> void* CInputParser<T>::main_parse_loop(void* params) 00515 { 00516 // Read the examples into current_* objects 00517 // Instead of allocating mem for new objects each time 00518 #ifdef HAVE_PTHREAD 00519 CInputParser* this_obj = (CInputParser *) params; 00520 this->input_source = this_obj->input_source; 00521 00522 while (1) 00523 { 00524 pthread_mutex_lock(&examples_state_lock); 00525 if (parsing_done) 00526 { 00527 pthread_mutex_unlock(&examples_state_lock); 00528 return NULL; 00529 } 00530 pthread_mutex_unlock(&examples_state_lock); 00531 00532 pthread_testcancel(); 00533 00534 current_example = examples_ring->get_free_example(); 00535 current_feature_vector = current_example->fv; 00536 current_len = current_example->length; 00537 current_label = current_example->label; 00538 00539 if (example_type == E_LABELLED) 00540 get_vector_and_label(current_feature_vector, current_len, current_label); 00541 else 00542 get_vector_only(current_feature_vector, current_len); 00543 00544 if (current_len < 0) 00545 { 00546 pthread_mutex_lock(&examples_state_lock); 00547 parsing_done = true; 00548 pthread_cond_signal(&examples_state_changed); 00549 pthread_mutex_unlock(&examples_state_lock); 00550 return NULL; 00551 } 00552 00553 current_example->label = current_label; 00554 current_example->fv = current_feature_vector; 00555 current_example->length = current_len; 00556 00557 examples_ring->copy_example(current_example); 00558 00559 pthread_mutex_lock(&examples_state_lock); 00560 number_of_vectors_parsed++; 00561 pthread_cond_signal(&examples_state_changed); 00562 pthread_mutex_unlock(&examples_state_lock); 00563 } 00564 #endif /* HAVE_PTHREAD */ 00565 return NULL; 00566 } 00567 00568 template <class T> Example<T>* CInputParser<T>::retrieve_example() 00569 { 00570 /* This function should be guarded by mutexes while calling */ 00571 Example<T> *ex; 00572 00573 if (parsing_done) 00574 { 00575 if (number_of_vectors_read == number_of_vectors_parsed) 00576 { 00577 reading_done = true; 00578 /* Signal to waiting threads that no more examples are left */ 00579 pthread_cond_signal(&examples_state_changed); 00580 return NULL; 00581 } 00582 } 00583 00584 if (number_of_vectors_parsed <= 0) 00585 return NULL; 00586 00587 if (number_of_vectors_read == number_of_vectors_parsed) 00588 { 00589 return NULL; 00590 } 00591 00592 ex = examples_ring->get_unused_example(); 00593 number_of_vectors_read++; 00594 00595 return ex; 00596 } 00597 00598 template <class T> int32_t CInputParser<T>::get_next_example(T* &fv, 00599 int32_t &length, float64_t &label) 00600 { 00601 /* if reading is done, no more examples can be fetched. return 0 00602 else, if example can be read, get the example and return 1. 00603 otherwise, wait for further parsing, get the example and 00604 return 1 */ 00605 00606 Example<T> *ex; 00607 00608 while (1) 00609 { 00610 if (reading_done) 00611 return 0; 00612 00613 pthread_mutex_lock(&examples_state_lock); 00614 ex = retrieve_example(); 00615 00616 if (ex == NULL) 00617 { 00618 if (reading_done) 00619 { 00620 /* No more examples left, return */ 00621 pthread_mutex_unlock(&examples_state_lock); 00622 return 0; 00623 } 00624 else 00625 { 00626 /* Examples left, wait for one to become ready */ 00627 pthread_cond_wait(&examples_state_changed, &examples_state_lock); 00628 pthread_mutex_unlock(&examples_state_lock); 00629 continue; 00630 } 00631 } 00632 else 00633 { 00634 /* Example ready, return the example */ 00635 pthread_mutex_unlock(&examples_state_lock); 00636 break; 00637 } 00638 } 00639 00640 fv = ex->fv; 00641 length = ex->length; 00642 label = ex->label; 00643 00644 return 1; 00645 } 00646 00647 template <class T> 00648 int32_t CInputParser<T>::get_next_example(T* &fv, int32_t &length) 00649 { 00650 float64_t label_dummy; 00651 00652 return get_next_example(fv, length, label_dummy); 00653 } 00654 00655 template <class T> 00656 void CInputParser<T>::finalize_example() 00657 { 00658 examples_ring->finalize_example(free_after_release); 00659 } 00660 00661 template <class T> void CInputParser<T>::end_parser() 00662 { 00663 SG_SDEBUG("entering CInputParser::end_parser\n") 00664 SG_SDEBUG("joining parse thread\n") 00665 pthread_join(parse_thread, NULL); 00666 SG_SDEBUG("leaving CInputParser::end_parser\n") 00667 } 00668 00669 template <class T> void CInputParser<T>::exit_parser() 00670 { 00671 SG_SDEBUG("cancelling parse thread\n") 00672 pthread_cancel(parse_thread); 00673 } 00674 } 00675 00676 #endif /* HAVE_PTHREAD */ 00677 00678 #endif // __INPUTPARSER_H__