// pion 5.0.6
00001 // --------------------------------------------------------------------- 00002 // pion: a Boost C++ framework for building lightweight HTTP interfaces 00003 // --------------------------------------------------------------------- 00004 // Copyright (C) 2007-2014 Splunk Inc. (https://github.com/splunk/pion) 00005 // 00006 // Distributed under the Boost Software License, Version 1.0. 00007 // See http://www.boost.org/LICENSE_1_0.txt 00008 // 00009 00010 #include <boost/exception/diagnostic_information.hpp> 00011 #include <boost/date_time/posix_time/posix_time_duration.hpp> 00012 #include <pion/scheduler.hpp> 00013 00014 namespace pion { // begin namespace pion 00015 00016 00017 // static members of scheduler 00018 00019 const boost::uint32_t scheduler::DEFAULT_NUM_THREADS = 8; 00020 const boost::uint32_t scheduler::NSEC_IN_SECOND = 1000000000; // (10^9) 00021 const boost::uint32_t scheduler::MICROSEC_IN_SECOND = 1000000; // (10^6) 00022 const boost::uint32_t scheduler::KEEP_RUNNING_TIMER_SECONDS = 5; 00023 00024 00025 // scheduler member functions 00026 00027 void scheduler::shutdown(void) 00028 { 00029 // lock mutex for thread safety 00030 boost::mutex::scoped_lock scheduler_lock(m_mutex); 00031 00032 if (m_is_running) { 00033 00034 PION_LOG_INFO(m_logger, "Shutting down the thread scheduler"); 00035 00036 while (m_active_users > 0) { 00037 // first, wait for any active users to exit 00038 PION_LOG_INFO(m_logger, "Waiting for " << m_active_users << " scheduler users to finish"); 00039 m_no_more_active_users.wait(scheduler_lock); 00040 } 00041 00042 // shut everything down 00043 m_is_running = false; 00044 stop_services(); 00045 stop_threads(); 00046 finish_services(); 00047 finish_threads(); 00048 00049 PION_LOG_INFO(m_logger, "The thread scheduler has shutdown"); 00050 00051 // Make sure anyone waiting on shutdown gets notified 00052 m_scheduler_has_stopped.notify_all(); 00053 00054 } else { 00055 00056 // stop and finish everything to be certain that no 
events are pending 00057 stop_services(); 00058 stop_threads(); 00059 finish_services(); 00060 finish_threads(); 00061 00062 // Make sure anyone waiting on shutdown gets notified 00063 // even if the scheduler did not startup successfully 00064 m_scheduler_has_stopped.notify_all(); 00065 } 00066 } 00067 00068 void scheduler::join(void) 00069 { 00070 boost::mutex::scoped_lock scheduler_lock(m_mutex); 00071 while (m_is_running) { 00072 // sleep until scheduler_has_stopped condition is signaled 00073 m_scheduler_has_stopped.wait(scheduler_lock); 00074 } 00075 } 00076 00077 void scheduler::keep_running(boost::asio::io_service& my_service, 00078 boost::asio::deadline_timer& my_timer) 00079 { 00080 if (m_is_running) { 00081 // schedule this again to make sure the service doesn't complete 00082 my_timer.expires_from_now(boost::posix_time::seconds(KEEP_RUNNING_TIMER_SECONDS)); 00083 my_timer.async_wait(boost::bind(&scheduler::keep_running, this, 00084 boost::ref(my_service), boost::ref(my_timer))); 00085 } 00086 } 00087 00088 void scheduler::add_active_user(void) 00089 { 00090 if (!m_is_running) startup(); 00091 boost::mutex::scoped_lock scheduler_lock(m_mutex); 00092 ++m_active_users; 00093 } 00094 00095 void scheduler::remove_active_user(void) 00096 { 00097 boost::mutex::scoped_lock scheduler_lock(m_mutex); 00098 if (--m_active_users == 0) 00099 m_no_more_active_users.notify_all(); 00100 } 00101 00102 boost::system_time scheduler::get_wakeup_time(boost::uint32_t sleep_sec, 00103 boost::uint32_t sleep_nsec) 00104 { 00105 return boost::get_system_time() + boost::posix_time::seconds(sleep_sec) + boost::posix_time::microseconds(sleep_nsec / 1000); 00106 } 00107 00108 void scheduler::process_service_work(boost::asio::io_service& service) { 00109 while (m_is_running) { 00110 try { 00111 service.run(); 00112 } catch (std::exception& e) { 00113 PION_LOG_ERROR(m_logger, boost::diagnostic_information(e)); 00114 } catch (...) 
{ 00115 PION_LOG_ERROR(m_logger, "caught unrecognized exception"); 00116 } 00117 } 00118 } 00119 00120 00121 // single_service_scheduler member functions 00122 00123 void single_service_scheduler::startup(void) 00124 { 00125 // lock mutex for thread safety 00126 boost::mutex::scoped_lock scheduler_lock(m_mutex); 00127 00128 if (! m_is_running) { 00129 PION_LOG_INFO(m_logger, "Starting thread scheduler"); 00130 m_is_running = true; 00131 00132 // schedule a work item to make sure that the service doesn't complete 00133 m_service.reset(); 00134 keep_running(m_service, m_timer); 00135 00136 // start multiple threads to handle async tasks 00137 for (boost::uint32_t n = 0; n < m_num_threads; ++n) { 00138 boost::shared_ptr<boost::thread> new_thread(new boost::thread( boost::bind(&scheduler::process_service_work, 00139 this, boost::ref(m_service)) )); 00140 m_thread_pool.push_back(new_thread); 00141 } 00142 } 00143 } 00144 00145 00146 // one_to_one_scheduler member functions 00147 00148 void one_to_one_scheduler::startup(void) 00149 { 00150 // lock mutex for thread safety 00151 boost::mutex::scoped_lock scheduler_lock(m_mutex); 00152 00153 if (! 
m_is_running) { 00154 PION_LOG_INFO(m_logger, "Starting thread scheduler"); 00155 m_is_running = true; 00156 00157 // make sure there are enough services initialized 00158 while (m_service_pool.size() < m_num_threads) { 00159 boost::shared_ptr<service_pair_type> service_ptr(new service_pair_type()); 00160 m_service_pool.push_back(service_ptr); 00161 } 00162 00163 // schedule a work item for each service to make sure that it doesn't complete 00164 for (service_pool_type::iterator i = m_service_pool.begin(); i != m_service_pool.end(); ++i) { 00165 keep_running((*i)->first, (*i)->second); 00166 } 00167 00168 // start multiple threads to handle async tasks 00169 for (boost::uint32_t n = 0; n < m_num_threads; ++n) { 00170 boost::shared_ptr<boost::thread> new_thread(new boost::thread( boost::bind(&scheduler::process_service_work, 00171 this, boost::ref(m_service_pool[n]->first)) )); 00172 m_thread_pool.push_back(new_thread); 00173 } 00174 } 00175 } 00176 00177 00178 } // end namespace pion