pion  5.0.6
src/scheduler.cpp
00001 // ---------------------------------------------------------------------
00002 // pion:  a Boost C++ framework for building lightweight HTTP interfaces
00003 // ---------------------------------------------------------------------
00004 // Copyright (C) 2007-2014 Splunk Inc.  (https://github.com/splunk/pion)
00005 //
00006 // Distributed under the Boost Software License, Version 1.0.
00007 // See http://www.boost.org/LICENSE_1_0.txt
00008 //
00009 
00010 #include <boost/exception/diagnostic_information.hpp>
00011 #include <boost/date_time/posix_time/posix_time_duration.hpp>
00012 #include <pion/scheduler.hpp>
00013 
00014 namespace pion {    // begin namespace pion
00015 
00016 
00017 // static members of scheduler
00018     
00019 const boost::uint32_t   scheduler::DEFAULT_NUM_THREADS = 8;
00020 const boost::uint32_t   scheduler::NSEC_IN_SECOND = 1000000000; // (10^9)
00021 const boost::uint32_t   scheduler::MICROSEC_IN_SECOND = 1000000;    // (10^6)
00022 const boost::uint32_t   scheduler::KEEP_RUNNING_TIMER_SECONDS = 5;
00023 
00024 
00025 // scheduler member functions
00026 
00027 void scheduler::shutdown(void)
00028 {
00029     // lock mutex for thread safety
00030     boost::mutex::scoped_lock scheduler_lock(m_mutex);
00031     
00032     if (m_is_running) {
00033         
00034         PION_LOG_INFO(m_logger, "Shutting down the thread scheduler");
00035         
00036         while (m_active_users > 0) {
00037             // first, wait for any active users to exit
00038             PION_LOG_INFO(m_logger, "Waiting for " << m_active_users << " scheduler users to finish");
00039             m_no_more_active_users.wait(scheduler_lock);
00040         }
00041 
00042         // shut everything down
00043         m_is_running = false;
00044         stop_services();
00045         stop_threads();
00046         finish_services();
00047         finish_threads();
00048         
00049         PION_LOG_INFO(m_logger, "The thread scheduler has shutdown");
00050 
00051         // Make sure anyone waiting on shutdown gets notified
00052         m_scheduler_has_stopped.notify_all();
00053         
00054     } else {
00055         
00056         // stop and finish everything to be certain that no events are pending
00057         stop_services();
00058         stop_threads();
00059         finish_services();
00060         finish_threads();
00061         
00062         // Make sure anyone waiting on shutdown gets notified
00063         // even if the scheduler did not startup successfully
00064         m_scheduler_has_stopped.notify_all();
00065     }
00066 }
00067 
00068 void scheduler::join(void)
00069 {
00070     boost::mutex::scoped_lock scheduler_lock(m_mutex);
00071     while (m_is_running) {
00072         // sleep until scheduler_has_stopped condition is signaled
00073         m_scheduler_has_stopped.wait(scheduler_lock);
00074     }
00075 }
00076     
00077 void scheduler::keep_running(boost::asio::io_service& my_service,
00078                                 boost::asio::deadline_timer& my_timer)
00079 {
00080     if (m_is_running) {
00081         // schedule this again to make sure the service doesn't complete
00082         my_timer.expires_from_now(boost::posix_time::seconds(KEEP_RUNNING_TIMER_SECONDS));
00083         my_timer.async_wait(boost::bind(&scheduler::keep_running, this,
00084                                         boost::ref(my_service), boost::ref(my_timer)));
00085     }
00086 }
00087 
00088 void scheduler::add_active_user(void)
00089 {
00090     if (!m_is_running) startup();
00091     boost::mutex::scoped_lock scheduler_lock(m_mutex);
00092     ++m_active_users;
00093 }
00094 
00095 void scheduler::remove_active_user(void)
00096 {
00097     boost::mutex::scoped_lock scheduler_lock(m_mutex);
00098     if (--m_active_users == 0)
00099         m_no_more_active_users.notify_all();
00100 }
00101 
00102 boost::system_time scheduler::get_wakeup_time(boost::uint32_t sleep_sec,
00103     boost::uint32_t sleep_nsec)
00104 {
00105     return boost::get_system_time() + boost::posix_time::seconds(sleep_sec) + boost::posix_time::microseconds(sleep_nsec / 1000);
00106 }
00107                      
00108 void scheduler::process_service_work(boost::asio::io_service& service) {
00109     while (m_is_running) {
00110         try {
00111             service.run();
00112         } catch (std::exception& e) {
00113             PION_LOG_ERROR(m_logger, boost::diagnostic_information(e));
00114         } catch (...) {
00115             PION_LOG_ERROR(m_logger, "caught unrecognized exception");
00116         }
00117     }   
00118 }
00119                      
00120 
00121 // single_service_scheduler member functions
00122 
00123 void single_service_scheduler::startup(void)
00124 {
00125     // lock mutex for thread safety
00126     boost::mutex::scoped_lock scheduler_lock(m_mutex);
00127     
00128     if (! m_is_running) {
00129         PION_LOG_INFO(m_logger, "Starting thread scheduler");
00130         m_is_running = true;
00131         
00132         // schedule a work item to make sure that the service doesn't complete
00133         m_service.reset();
00134         keep_running(m_service, m_timer);
00135         
00136         // start multiple threads to handle async tasks
00137         for (boost::uint32_t n = 0; n < m_num_threads; ++n) {
00138             boost::shared_ptr<boost::thread> new_thread(new boost::thread( boost::bind(&scheduler::process_service_work,
00139                                                                                        this, boost::ref(m_service)) ));
00140             m_thread_pool.push_back(new_thread);
00141         }
00142     }
00143 }
00144 
00145     
00146 // one_to_one_scheduler member functions
00147 
00148 void one_to_one_scheduler::startup(void)
00149 {
00150     // lock mutex for thread safety
00151     boost::mutex::scoped_lock scheduler_lock(m_mutex);
00152     
00153     if (! m_is_running) {
00154         PION_LOG_INFO(m_logger, "Starting thread scheduler");
00155         m_is_running = true;
00156         
00157         // make sure there are enough services initialized
00158         while (m_service_pool.size() < m_num_threads) {
00159             boost::shared_ptr<service_pair_type>  service_ptr(new service_pair_type());
00160             m_service_pool.push_back(service_ptr);
00161         }
00162 
00163         // schedule a work item for each service to make sure that it doesn't complete
00164         for (service_pool_type::iterator i = m_service_pool.begin(); i != m_service_pool.end(); ++i) {
00165             keep_running((*i)->first, (*i)->second);
00166         }
00167         
00168         // start multiple threads to handle async tasks
00169         for (boost::uint32_t n = 0; n < m_num_threads; ++n) {
00170             boost::shared_ptr<boost::thread> new_thread(new boost::thread( boost::bind(&scheduler::process_service_work,
00171                                                                                        this, boost::ref(m_service_pool[n]->first)) ));
00172             m_thread_pool.push_back(new_thread);
00173         }
00174     }
00175 }
00176 
00177     
00178 }   // end namespace pion