pion  5.0.6
src/tcp_server.cpp
00001 // ---------------------------------------------------------------------
00002 // pion:  a Boost C++ framework for building lightweight HTTP interfaces
00003 // ---------------------------------------------------------------------
00004 // Copyright (C) 2007-2014 Splunk Inc.  (https://github.com/splunk/pion)
00005 //
00006 // Distributed under the Boost Software License, Version 1.0.
00007 // See http://www.boost.org/LICENSE_1_0.txt
00008 //
00009 
00010 #include <boost/asio.hpp>
00011 #include <boost/bind.hpp>
00012 #include <boost/thread/mutex.hpp>
00013 #include <pion/admin_rights.hpp>
00014 #include <pion/tcp/server.hpp>
00015 
00016 
00017 namespace pion {    // begin namespace pion
00018 namespace tcp {     // begin namespace tcp
00019 
00020     
00021 // tcp::server member functions
00022 
00023 server::server(scheduler& sched, const unsigned int tcp_port)
00024     : m_logger(PION_GET_LOGGER("pion.tcp.server")),
00025     m_active_scheduler(sched),
00026     m_tcp_acceptor(m_active_scheduler.get_io_service()),
00027 #ifdef PION_HAVE_SSL
00028     m_ssl_context(m_active_scheduler.get_io_service(), boost::asio::ssl::context::sslv23),
00029 #else
00030     m_ssl_context(0),
00031 #endif
00032     m_endpoint(boost::asio::ip::tcp::v4(), tcp_port), m_ssl_flag(false), m_is_listening(false)
00033 {}
00034     
00035 server::server(scheduler& sched, const boost::asio::ip::tcp::endpoint& endpoint)
00036     : m_logger(PION_GET_LOGGER("pion.tcp.server")),
00037     m_active_scheduler(sched),
00038     m_tcp_acceptor(m_active_scheduler.get_io_service()),
00039 #ifdef PION_HAVE_SSL
00040     m_ssl_context(m_active_scheduler.get_io_service(), boost::asio::ssl::context::sslv23),
00041 #else
00042     m_ssl_context(0),
00043 #endif
00044     m_endpoint(endpoint), m_ssl_flag(false), m_is_listening(false)
00045 {}
00046 
00047 server::server(const unsigned int tcp_port)
00048     : m_logger(PION_GET_LOGGER("pion.tcp.server")),
00049     m_default_scheduler(), m_active_scheduler(m_default_scheduler),
00050     m_tcp_acceptor(m_active_scheduler.get_io_service()),
00051 #ifdef PION_HAVE_SSL
00052     m_ssl_context(m_active_scheduler.get_io_service(), boost::asio::ssl::context::sslv23),
00053 #else
00054     m_ssl_context(0),
00055 #endif
00056     m_endpoint(boost::asio::ip::tcp::v4(), tcp_port), m_ssl_flag(false), m_is_listening(false)
00057 {}
00058 
00059 server::server(const boost::asio::ip::tcp::endpoint& endpoint)
00060     : m_logger(PION_GET_LOGGER("pion.tcp.server")),
00061     m_default_scheduler(), m_active_scheduler(m_default_scheduler),
00062     m_tcp_acceptor(m_active_scheduler.get_io_service()),
00063 #ifdef PION_HAVE_SSL
00064     m_ssl_context(m_active_scheduler.get_io_service(), boost::asio::ssl::context::sslv23),
00065 #else
00066     m_ssl_context(0),
00067 #endif
00068     m_endpoint(endpoint), m_ssl_flag(false), m_is_listening(false)
00069 {}
00070     
00071 void server::start(void)
00072 {
00073     // lock mutex for thread safety
00074     boost::mutex::scoped_lock server_lock(m_mutex);
00075 
00076     if (! m_is_listening) {
00077         PION_LOG_INFO(m_logger, "Starting server on port " << get_port());
00078         
00079         before_starting();
00080 
00081         // configure the acceptor service
00082         try {
00083             // get admin permissions in case we're binding to a privileged port
00084             pion::admin_rights use_admin_rights(get_port() > 0 && get_port() < 1024);
00085             m_tcp_acceptor.open(m_endpoint.protocol());
00086             // allow the acceptor to reuse the address (i.e. SO_REUSEADDR)
00087             // ...except when running not on Windows - see http://msdn.microsoft.com/en-us/library/ms740621%28VS.85%29.aspx
00088 #ifndef _MSC_VER
00089             m_tcp_acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
00090 #endif
00091             m_tcp_acceptor.bind(m_endpoint);
00092             if (m_endpoint.port() == 0) {
00093                 // update the endpoint to reflect the port chosen by bind
00094                 m_endpoint = m_tcp_acceptor.local_endpoint();
00095             }
00096             m_tcp_acceptor.listen();
00097         } catch (std::exception& e) {
00098             PION_LOG_ERROR(m_logger, "Unable to bind to port " << get_port() << ": " << e.what());
00099             throw;
00100         }
00101 
00102         m_is_listening = true;
00103 
00104         // unlock the mutex since listen() requires its own lock
00105         server_lock.unlock();
00106         listen();
00107         
00108         // notify the thread scheduler that we need it now
00109         m_active_scheduler.add_active_user();
00110     }
00111 }
00112 
00113 void server::stop(bool wait_until_finished)
00114 {
00115     // lock mutex for thread safety
00116     boost::mutex::scoped_lock server_lock(m_mutex);
00117 
00118     if (m_is_listening) {
00119         PION_LOG_INFO(m_logger, "Shutting down server on port " << get_port());
00120     
00121         m_is_listening = false;
00122 
00123         // this terminates any connections waiting to be accepted
00124         m_tcp_acceptor.close();
00125         
00126         if (! wait_until_finished) {
00127             // this terminates any other open connections
00128             std::for_each(m_conn_pool.begin(), m_conn_pool.end(),
00129                           boost::bind(&connection::close, _1));
00130         }
00131     
00132         // wait for all pending connections to complete
00133         while (! m_conn_pool.empty()) {
00134             // try to prun connections that didn't finish cleanly
00135             if (prune_connections() == 0)
00136                 break;  // if no more left, then we can stop waiting
00137             // sleep for up to a quarter second to give open connections a chance to finish
00138             PION_LOG_INFO(m_logger, "Waiting for open connections to finish");
00139             scheduler::sleep(m_no_more_connections, server_lock, 0, 250000000);
00140         }
00141         
00142         // notify the thread scheduler that we no longer need it
00143         m_active_scheduler.remove_active_user();
00144         
00145         // all done!
00146         after_stopping();
00147         m_server_has_stopped.notify_all();
00148     }
00149 }
00150 
00151 void server::join(void)
00152 {
00153     boost::mutex::scoped_lock server_lock(m_mutex);
00154     while (m_is_listening) {
00155         // sleep until server_has_stopped condition is signaled
00156         m_server_has_stopped.wait(server_lock);
00157     }
00158 }
00159 
00160 void server::set_ssl_key_file(const std::string& pem_key_file)
00161 {
00162     // configure server for SSL
00163     set_ssl_flag(true);
00164 #ifdef PION_HAVE_SSL
00165     m_ssl_context.set_options(boost::asio::ssl::context::default_workarounds
00166                               | boost::asio::ssl::context::no_sslv2
00167                               | boost::asio::ssl::context::single_dh_use);
00168     m_ssl_context.use_certificate_file(pem_key_file, boost::asio::ssl::context::pem);
00169     m_ssl_context.use_private_key_file(pem_key_file, boost::asio::ssl::context::pem);
00170 #endif
00171 }
00172 
00173 void server::listen(void)
00174 {
00175     // lock mutex for thread safety
00176     boost::mutex::scoped_lock server_lock(m_mutex);
00177     
00178     if (m_is_listening) {
00179         // create a new TCP connection object
00180         tcp::connection_ptr new_connection(connection::create(get_io_service(),
00181                                                               m_ssl_context, m_ssl_flag,
00182                                                               boost::bind(&server::finish_connection,
00183                                                                           this, _1)));
00184         
00185         // prune connections that finished uncleanly
00186         prune_connections();
00187 
00188         // keep track of the object in the server's connection pool
00189         m_conn_pool.insert(new_connection);
00190         
00191         // use the object to accept a new connection
00192         new_connection->async_accept(m_tcp_acceptor,
00193                                      boost::bind(&server::handle_accept,
00194                                                  this, new_connection,
00195                                                  boost::asio::placeholders::error));
00196     }
00197 }
00198 
00199 void server::handle_accept(tcp::connection_ptr& tcp_conn,
00200                              const boost::system::error_code& accept_error)
00201 {
00202     if (accept_error) {
00203         // an error occured while trying to a accept a new connection
00204         // this happens when the server is being shut down
00205         if (m_is_listening) {
00206             listen();   // schedule acceptance of another connection
00207             PION_LOG_WARN(m_logger, "Accept error on port " << get_port() << ": " << accept_error.message());
00208         }
00209         finish_connection(tcp_conn);
00210     } else {
00211         // got a new TCP connection
00212         PION_LOG_DEBUG(m_logger, "New" << (tcp_conn->get_ssl_flag() ? " SSL " : " ")
00213                        << "connection on port " << get_port());
00214 
00215         // schedule the acceptance of another new connection
00216         // (this returns immediately since it schedules it as an event)
00217         if (m_is_listening) listen();
00218         
00219         // handle the new connection
00220 #ifdef PION_HAVE_SSL
00221         if (tcp_conn->get_ssl_flag()) {
00222             tcp_conn->async_handshake_server(boost::bind(&server::handle_ssl_handshake,
00223                                                          this, tcp_conn,
00224                                                          boost::asio::placeholders::error));
00225         } else
00226 #endif
00227             // not SSL -> call the handler immediately
00228             handle_connection(tcp_conn);
00229     }
00230 }
00231 
00232 void server::handle_ssl_handshake(tcp::connection_ptr& tcp_conn,
00233                                    const boost::system::error_code& handshake_error)
00234 {
00235     if (handshake_error) {
00236         // an error occured while trying to establish the SSL connection
00237         PION_LOG_WARN(m_logger, "SSL handshake failed on port " << get_port()
00238                       << " (" << handshake_error.message() << ')');
00239         finish_connection(tcp_conn);
00240     } else {
00241         // handle the new connection
00242         PION_LOG_DEBUG(m_logger, "SSL handshake succeeded on port " << get_port());
00243         handle_connection(tcp_conn);
00244     }
00245 }
00246 
00247 void server::finish_connection(tcp::connection_ptr& tcp_conn)
00248 {
00249     boost::mutex::scoped_lock server_lock(m_mutex);
00250     if (m_is_listening && tcp_conn->get_keep_alive()) {
00251         
00252         // keep the connection alive
00253         handle_connection(tcp_conn);
00254 
00255     } else {
00256         PION_LOG_DEBUG(m_logger, "Closing connection on port " << get_port());
00257         
00258         // remove the connection from the server's management pool
00259         ConnectionPool::iterator conn_itr = m_conn_pool.find(tcp_conn);
00260         if (conn_itr != m_conn_pool.end())
00261             m_conn_pool.erase(conn_itr);
00262 
00263         // trigger the no more connections condition if we're waiting to stop
00264         if (!m_is_listening && m_conn_pool.empty())
00265             m_no_more_connections.notify_all();
00266     }
00267 }
00268 
00269 std::size_t server::prune_connections(void)
00270 {
00271     // assumes that a server lock has already been acquired
00272     ConnectionPool::iterator conn_itr = m_conn_pool.begin();
00273     while (conn_itr != m_conn_pool.end()) {
00274         if (conn_itr->unique()) {
00275             PION_LOG_WARN(m_logger, "Closing orphaned connection on port " << get_port());
00276             ConnectionPool::iterator erase_itr = conn_itr;
00277             ++conn_itr;
00278             (*erase_itr)->close();
00279             m_conn_pool.erase(erase_itr);
00280         } else {
00281             ++conn_itr;
00282         }
00283     }
00284 
00285     // return the number of connections remaining
00286     return m_conn_pool.size();
00287 }
00288 
00289 std::size_t server::get_connections(void) const
00290 {
00291     boost::mutex::scoped_lock server_lock(m_mutex);
00292     return (m_is_listening ? (m_conn_pool.size() - 1) : m_conn_pool.size());
00293 }
00294 
00295 }   // end namespace tcp
00296 }   // end namespace pion