pion  5.0.6
include/pion/scheduler.hpp
00001 // ---------------------------------------------------------------------
00002 // pion:  a Boost C++ framework for building lightweight HTTP interfaces
00003 // ---------------------------------------------------------------------
00004 // Copyright (C) 2007-2014 Splunk Inc.  (https://github.com/splunk/pion)
00005 //
00006 // Distributed under the Boost Software License, Version 1.0.
00007 // See http://www.boost.org/LICENSE_1_0.txt
00008 //
00009 
00010 #ifndef __PION_SCHEDULER_HEADER__
00011 #define __PION_SCHEDULER_HEADER__
00012 
00013 #include <vector>
00014 #include <boost/asio.hpp>
00015 #include <boost/assert.hpp>
00016 #include <boost/bind.hpp>
00017 #include <boost/function/function0.hpp>
00018 #include <boost/cstdint.hpp>
00019 #include <boost/shared_ptr.hpp>
00020 #include <boost/noncopyable.hpp>
00021 #include <boost/thread/thread.hpp>
00022 #include <boost/thread/mutex.hpp>
00023 #include <boost/thread/xtime.hpp>
00024 #include <boost/thread/condition.hpp>
00025 #include <pion/config.hpp>
00026 #include <pion/logger.hpp>
00027 
00028 
00029 namespace pion {    // begin namespace pion
00030 
00034 class PION_API scheduler :
00035     private boost::noncopyable
00036 {
00037 public:
00038 
00040     scheduler(void)
00041         : m_logger(PION_GET_LOGGER("pion.scheduler")),
00042         m_num_threads(DEFAULT_NUM_THREADS), m_active_users(0), m_is_running(false)
00043     {}
00044     
00046     virtual ~scheduler() {}
00047 
00049     virtual void startup(void) {}
00050     
00052     virtual void shutdown(void);
00053 
00055     void join(void);
00056     
00060     void add_active_user(void);
00061 
00063     void remove_active_user(void);
00064     
00066     inline bool is_running(void) const { return m_is_running; }
00067     
00069     inline void set_num_threads(const boost::uint32_t n) { m_num_threads = n; }
00070     
00072     inline boost::uint32_t get_num_threads(void) const { return m_num_threads; }
00073 
00075     inline void set_logger(logger log_ptr) { m_logger = log_ptr; }
00076 
00078     inline logger get_logger(void) { return m_logger; }
00079     
00081     virtual boost::asio::io_service& get_io_service(void) = 0;
00082     
00088     virtual void post(boost::function0<void> work_func) {
00089         get_io_service().post(work_func);
00090     }
00091     
00098     void keep_running(boost::asio::io_service& my_service,
00099                      boost::asio::deadline_timer& my_timer);
00100     
00107     inline static void sleep(boost::uint32_t sleep_sec, boost::uint32_t sleep_nsec) {
00108         boost::system_time wakeup_time(get_wakeup_time(sleep_sec, sleep_nsec));
00109         boost::thread::sleep(wakeup_time);
00110     }
00111 
00121     template <typename ConditionType, typename LockType>
00122     inline static void sleep(ConditionType& wakeup_condition, LockType& wakeup_lock,
00123                              boost::uint32_t sleep_sec, boost::uint32_t sleep_nsec)
00124     {
00125         boost::system_time wakeup_time(get_wakeup_time(sleep_sec, sleep_nsec));
00126         wakeup_condition.timed_wait(wakeup_lock, wakeup_time);
00127     }
00128     
00129     
00131     void process_service_work(boost::asio::io_service& service);
00132 
00133 
00134 protected:
00135 
00144     static boost::system_time get_wakeup_time(boost::uint32_t sleep_sec,
00145         boost::uint32_t sleep_nsec);
00146 
00148     virtual void stop_services(void) {}
00149     
00151     virtual void stop_threads(void) {}
00152 
00154     virtual void finish_services(void) {}
00155 
00157     virtual void finish_threads(void) {}
00158     
00159     
00161     static const boost::uint32_t    DEFAULT_NUM_THREADS;
00162 
00164     static const boost::uint32_t    NSEC_IN_SECOND;
00165 
00167     static const boost::uint32_t    MICROSEC_IN_SECOND;
00168     
00170     static const boost::uint32_t    KEEP_RUNNING_TIMER_SECONDS;
00171 
00172 
00174     boost::mutex                    m_mutex;
00175     
00177     logger                          m_logger;
00178 
00180     boost::condition                m_no_more_active_users;
00181 
00183     boost::condition                m_scheduler_has_stopped;
00184 
00186     boost::uint32_t                 m_num_threads;
00187 
00189     boost::uint32_t                 m_active_users;
00190 
00192     bool                            m_is_running;
00193 };
00194 
00195     
00199 class PION_API multi_thread_scheduler :
00200     public scheduler
00201 {
00202 public:
00203     
00205     multi_thread_scheduler(void) {}
00206     
00208     virtual ~multi_thread_scheduler() {}
00209 
00210     
00211 protected:
00212     
00214     virtual void stop_threads(void) {
00215         if (! m_thread_pool.empty()) {
00216             PION_LOG_DEBUG(m_logger, "Waiting for threads to shutdown");
00217             
00218             // wait until all threads in the pool have stopped
00219             boost::thread current_thread;
00220             for (ThreadPool::iterator i = m_thread_pool.begin();
00221                  i != m_thread_pool.end(); ++i)
00222             {
00223                 // make sure we do not call join() for the current thread,
00224                 // since this may yield "undefined behavior"
00225                 if (**i != current_thread) (*i)->join();
00226             }
00227         }
00228     }
00229     
00231     virtual void finish_threads(void) { m_thread_pool.clear(); }
00232 
00233     
00235     typedef std::vector<boost::shared_ptr<boost::thread> >  ThreadPool;
00236     
00237     
00239     ThreadPool              m_thread_pool;
00240 };
00241     
00242     
00246 class PION_API single_service_scheduler :
00247     public multi_thread_scheduler
00248 {
00249 public:
00250     
00252     single_service_scheduler(void)
00253         : m_service(), m_timer(m_service)
00254     {}
00255     
00257     virtual ~single_service_scheduler() { shutdown(); }
00258     
00260     virtual boost::asio::io_service& get_io_service(void) { return m_service; }
00261     
00263     virtual void startup(void);
00264         
00265     
00266 protected:
00267     
00269     virtual void stop_services(void) { m_service.stop(); }
00270     
00272     virtual void finish_services(void) { m_service.reset(); }
00273 
00274     
00276     boost::asio::io_service         m_service;
00277     
00279     boost::asio::deadline_timer     m_timer;
00280 };
00281     
00282 
00286 class PION_API one_to_one_scheduler :
00287     public multi_thread_scheduler
00288 {
00289 public:
00290     
00292     one_to_one_scheduler(void)
00293         : m_service_pool(), m_next_service(0)
00294     {}
00295     
00297     virtual ~one_to_one_scheduler() { shutdown(); }
00298     
00300     virtual boost::asio::io_service& get_io_service(void) {
00301         boost::mutex::scoped_lock scheduler_lock(m_mutex);
00302         while (m_service_pool.size() < m_num_threads) {
00303             boost::shared_ptr<service_pair_type>  service_ptr(new service_pair_type());
00304             m_service_pool.push_back(service_ptr);
00305         }
00306         if (++m_next_service >= m_num_threads)
00307             m_next_service = 0;
00308         BOOST_ASSERT(m_next_service < m_num_threads);
00309         return m_service_pool[m_next_service]->first;
00310     }
00311     
00318     virtual boost::asio::io_service& get_io_service(boost::uint32_t n) {
00319         BOOST_ASSERT(n < m_num_threads);
00320         BOOST_ASSERT(n < m_service_pool.size());
00321         return m_service_pool[n]->first;
00322     }
00323 
00325     virtual void startup(void);
00326     
00327     
00328 protected:
00329     
00331     virtual void stop_services(void) {
00332         for (service_pool_type::iterator i = m_service_pool.begin(); i != m_service_pool.end(); ++i) {
00333             (*i)->first.stop();
00334         }
00335     }
00336         
00338     virtual void finish_services(void) { m_service_pool.clear(); }
00339     
00340 
00342     struct service_pair_type {
00343         service_pair_type(void) : first(), second(first) {}
00344         boost::asio::io_service         first;
00345         boost::asio::deadline_timer     second;
00346     };
00347     
00349     typedef std::vector<boost::shared_ptr<service_pair_type> >        service_pool_type;
00350 
00351     
00353     service_pool_type   m_service_pool;
00354 
00356     boost::uint32_t     m_next_service;
00357 };
00358     
00359     
00360 }   // end namespace pion
00361 
00362 #endif