pion
5.0.6
|
00001 // --------------------------------------------------------------------- 00002 // pion: a Boost C++ framework for building lightweight HTTP interfaces 00003 // --------------------------------------------------------------------- 00004 // Copyright (C) 2007-2014 Splunk Inc. (https://github.com/splunk/pion) 00005 // 00006 // Distributed under the Boost Software License, Version 1.0. 00007 // See http://www.boost.org/LICENSE_1_0.txt 00008 // 00009 00010 #ifndef __PION_SCHEDULER_HEADER__ 00011 #define __PION_SCHEDULER_HEADER__ 00012 00013 #include <vector> 00014 #include <boost/asio.hpp> 00015 #include <boost/assert.hpp> 00016 #include <boost/bind.hpp> 00017 #include <boost/function/function0.hpp> 00018 #include <boost/cstdint.hpp> 00019 #include <boost/shared_ptr.hpp> 00020 #include <boost/noncopyable.hpp> 00021 #include <boost/thread/thread.hpp> 00022 #include <boost/thread/mutex.hpp> 00023 #include <boost/thread/xtime.hpp> 00024 #include <boost/thread/condition.hpp> 00025 #include <pion/config.hpp> 00026 #include <pion/logger.hpp> 00027 00028 00029 namespace pion { // begin namespace pion 00030 00034 class PION_API scheduler : 00035 private boost::noncopyable 00036 { 00037 public: 00038 00040 scheduler(void) 00041 : m_logger(PION_GET_LOGGER("pion.scheduler")), 00042 m_num_threads(DEFAULT_NUM_THREADS), m_active_users(0), m_is_running(false) 00043 {} 00044 00046 virtual ~scheduler() {} 00047 00049 virtual void startup(void) {} 00050 00052 virtual void shutdown(void); 00053 00055 void join(void); 00056 00060 void add_active_user(void); 00061 00063 void remove_active_user(void); 00064 00066 inline bool is_running(void) const { return m_is_running; } 00067 00069 inline void set_num_threads(const boost::uint32_t n) { m_num_threads = n; } 00070 00072 inline boost::uint32_t get_num_threads(void) const { return m_num_threads; } 00073 00075 inline void set_logger(logger log_ptr) { m_logger = log_ptr; } 00076 00078 inline logger get_logger(void) { return m_logger; } 00079 00081 virtual boost::asio::io_service& get_io_service(void) = 0; 00082 00088 virtual void post(boost::function0<void> work_func) { 00089 get_io_service().post(work_func); 00090 } 00091 00098 void keep_running(boost::asio::io_service& my_service, 00099 boost::asio::deadline_timer& my_timer); 00100 00107 inline static void sleep(boost::uint32_t sleep_sec, boost::uint32_t sleep_nsec) { 00108 boost::system_time wakeup_time(get_wakeup_time(sleep_sec, sleep_nsec)); 00109 boost::thread::sleep(wakeup_time); 00110 } 00111 00121 template <typename ConditionType, typename LockType> 00122 inline static void sleep(ConditionType& wakeup_condition, LockType& wakeup_lock, 00123 boost::uint32_t sleep_sec, boost::uint32_t sleep_nsec) 00124 { 00125 boost::system_time wakeup_time(get_wakeup_time(sleep_sec, sleep_nsec)); 00126 wakeup_condition.timed_wait(wakeup_lock, wakeup_time); 00127 } 00128 00129 00131 void process_service_work(boost::asio::io_service& service); 00132 00133 00134 protected: 00135 00144 static boost::system_time get_wakeup_time(boost::uint32_t sleep_sec, 00145 boost::uint32_t sleep_nsec); 00146 00148 virtual void stop_services(void) {} 00149 00151 virtual void stop_threads(void) {} 00152 00154 virtual void finish_services(void) {} 00155 00157 virtual void finish_threads(void) {} 00158 00159 00161 static const boost::uint32_t DEFAULT_NUM_THREADS; 00162 00164 static const boost::uint32_t NSEC_IN_SECOND; 00165 00167 static const boost::uint32_t MICROSEC_IN_SECOND; 00168 00170 static const boost::uint32_t KEEP_RUNNING_TIMER_SECONDS; 00171 00172 00174 boost::mutex m_mutex; 00175 00177 logger m_logger; 00178 00180 boost::condition m_no_more_active_users; 00181 00183 boost::condition m_scheduler_has_stopped; 00184 00186 boost::uint32_t m_num_threads; 00187 00189 boost::uint32_t m_active_users; 00190 00192 bool m_is_running; 00193 }; 00194 00195 00199 class PION_API multi_thread_scheduler : 00200 public scheduler 00201 { 00202 public: 00203 00205 multi_thread_scheduler(void) {} 00206 00208 virtual ~multi_thread_scheduler() {} 00209 00210 00211 protected: 00212 00214 virtual void stop_threads(void) { 00215 if (! m_thread_pool.empty()) { 00216 PION_LOG_DEBUG(m_logger, "Waiting for threads to shutdown"); 00217 00218 // wait until all threads in the pool have stopped 00219 boost::thread current_thread; 00220 for (ThreadPool::iterator i = m_thread_pool.begin(); 00221 i != m_thread_pool.end(); ++i) 00222 { 00223 // make sure we do not call join() for the current thread, 00224 // since this may yield "undefined behavior" 00225 if (**i != current_thread) (*i)->join(); 00226 } 00227 } 00228 } 00229 00231 virtual void finish_threads(void) { m_thread_pool.clear(); } 00232 00233 00235 typedef std::vector<boost::shared_ptr<boost::thread> > ThreadPool; 00236 00237 00239 ThreadPool m_thread_pool; 00240 }; 00241 00242 00246 class PION_API single_service_scheduler : 00247 public multi_thread_scheduler 00248 { 00249 public: 00250 00252 single_service_scheduler(void) 00253 : m_service(), m_timer(m_service) 00254 {} 00255 00257 virtual ~single_service_scheduler() { shutdown(); } 00258 00260 virtual boost::asio::io_service& get_io_service(void) { return m_service; } 00261 00263 virtual void startup(void); 00264 00265 00266 protected: 00267 00269 virtual void stop_services(void) { m_service.stop(); } 00270 00272 virtual void finish_services(void) { m_service.reset(); } 00273 00274 00276 boost::asio::io_service m_service; 00277 00279 boost::asio::deadline_timer m_timer; 00280 }; 00281 00282 00286 class PION_API one_to_one_scheduler : 00287 public multi_thread_scheduler 00288 { 00289 public: 00290 00292 one_to_one_scheduler(void) 00293 : m_service_pool(), m_next_service(0) 00294 {} 00295 00297 virtual ~one_to_one_scheduler() { shutdown(); } 00298 00300 virtual boost::asio::io_service& get_io_service(void) { 00301 boost::mutex::scoped_lock scheduler_lock(m_mutex); 00302 while (m_service_pool.size() < m_num_threads) { 00303 boost::shared_ptr<service_pair_type> service_ptr(new service_pair_type()); 00304 m_service_pool.push_back(service_ptr); 00305 } 00306 if (++m_next_service >= m_num_threads) 00307 m_next_service = 0; 00308 BOOST_ASSERT(m_next_service < m_num_threads); 00309 return m_service_pool[m_next_service]->first; 00310 } 00311 00318 virtual boost::asio::io_service& get_io_service(boost::uint32_t n) { 00319 BOOST_ASSERT(n < m_num_threads); 00320 BOOST_ASSERT(n < m_service_pool.size()); 00321 return m_service_pool[n]->first; 00322 } 00323 00325 virtual void startup(void); 00326 00327 00328 protected: 00329 00331 virtual void stop_services(void) { 00332 for (service_pool_type::iterator i = m_service_pool.begin(); i != m_service_pool.end(); ++i) { 00333 (*i)->first.stop(); 00334 } 00335 } 00336 00338 virtual void finish_services(void) { m_service_pool.clear(); } 00339 00340 00342 struct service_pair_type { 00343 service_pair_type(void) : first(), second(first) {} 00344 boost::asio::io_service first; 00345 boost::asio::deadline_timer second; 00346 }; 00347 00349 typedef std::vector<boost::shared_ptr<service_pair_type> > service_pool_type; 00350 00351 00353 service_pool_type m_service_pool; 00354 00356 boost::uint32_t m_next_service; 00357 }; 00358 00359 00360 } // end namespace pion 00361 00362 #endif