pion
5.0.6
|
00001 // --------------------------------------------------------------------- 00002 // pion: a Boost C++ framework for building lightweight HTTP interfaces 00003 // --------------------------------------------------------------------- 00004 // Copyright (C) 2007-2014 Splunk Inc. (https://github.com/splunk/pion) 00005 // 00006 // Distributed under the Boost Software License, Version 1.0. 00007 // See http://www.boost.org/LICENSE_1_0.txt 00008 // 00009 00010 #ifndef __PION_TCP_STREAM_HEADER__ 00011 #define __PION_TCP_STREAM_HEADER__ 00012 00013 #include <cstring> 00014 #include <istream> 00015 #include <streambuf> 00016 #include <boost/bind.hpp> 00017 #include <boost/thread/mutex.hpp> 00018 #include <boost/thread/condition.hpp> 00019 #include <pion/config.hpp> 00020 #include <pion/tcp/connection.hpp> 00021 00022 00023 namespace pion { // begin namespace pion 00024 namespace tcp { // begin namespace tcp 00025 00026 00032 class stream_buffer 00033 : public std::basic_streambuf<char, std::char_traits<char> > 00034 { 00035 public: 00036 00037 // data type definitions required for iostream compatability 00038 typedef char char_type; 00039 typedef std::char_traits<char>::int_type int_type; 00040 typedef std::char_traits<char>::off_type off_type; 00041 typedef std::char_traits<char>::pos_type pos_type; 00042 typedef std::char_traits<char> traits_type; 00043 00044 // some integer constants used within stream_buffer 00045 enum { 00046 PUT_BACK_MAX = 10, //< number of bytes that can be put back into the read buffer 00047 WRITE_BUFFER_SIZE = 8192 //< size of the write buffer 00048 }; 00049 00050 00056 explicit stream_buffer(tcp::connection_ptr& conn_ptr) 00057 : m_conn_ptr(conn_ptr), m_read_buf(m_conn_ptr->get_read_buffer().c_array()) 00058 { 00059 setup_buffers(); 00060 } 00061 00068 explicit stream_buffer(boost::asio::io_service& io_service, 00069 const bool ssl_flag = false) 00070 : m_conn_ptr(new connection(io_service, ssl_flag)), 00071 m_read_buf(m_conn_ptr->get_read_buffer().c_array()) 00072 { 00073 setup_buffers(); 00074 } 00075 00082 stream_buffer(boost::asio::io_service& io_service, 00083 connection::ssl_context_type& ssl_context) 00084 : m_conn_ptr(new connection(io_service, ssl_context)), 00085 m_read_buf(m_conn_ptr->get_read_buffer().c_array()) 00086 { 00087 setup_buffers(); 00088 } 00089 00091 virtual ~stream_buffer() { sync(); } 00092 00094 connection& get_connection(void) { return *m_conn_ptr; } 00095 00097 const connection& get_connection(void) const { return *m_conn_ptr; } 00098 00099 00100 protected: 00101 00103 inline void setup_buffers(void) { 00104 // use the TCP connection's read buffer and allow for bytes to be put back 00105 setg(m_read_buf+PUT_BACK_MAX, m_read_buf+PUT_BACK_MAX, m_read_buf+PUT_BACK_MAX); 00106 // set write buffer size-1 so that we have an extra char avail for overflow 00107 setp(m_write_buf, m_write_buf+(WRITE_BUFFER_SIZE-1)); 00108 } 00109 00115 inline int_type flush_output(void) { 00116 const std::streamsize bytes_to_send = std::streamsize(pptr() - pbase()); 00117 int_type bytes_sent = 0; 00118 if (bytes_to_send > 0) { 00119 boost::mutex::scoped_lock async_lock(m_async_mutex); 00120 m_bytes_transferred = 0; 00121 m_conn_ptr->async_write(boost::asio::buffer(pbase(), bytes_to_send), 00122 boost::bind(&stream_buffer::operation_finished, this, 00123 boost::asio::placeholders::error, 00124 boost::asio::placeholders::bytes_transferred)); 00125 m_async_done.wait(async_lock); 00126 bytes_sent = m_bytes_transferred; 00127 pbump(-bytes_sent); 00128 if (m_async_error) 00129 bytes_sent = traits_type::eof(); 00130 } 00131 return bytes_sent; 00132 } 00133 00139 virtual int_type underflow(void) { 00140 // first check if we still have bytes available in the read buffer 00141 if (gptr() < egptr()) 00142 return traits_type::to_int_type(*gptr()); 00143 00144 // calculate the number of bytes we will allow to be put back 00145 std::streamsize put_back_num = std::streamsize(gptr() - eback()); 00146 if (put_back_num > PUT_BACK_MAX) 00147 put_back_num = PUT_BACK_MAX; 00148 00149 // copy the last bytes read to the beginning of the buffer (for put back) 00150 if (put_back_num > 0) 00151 memmove(m_read_buf+(PUT_BACK_MAX-put_back_num), gptr()-put_back_num, put_back_num); 00152 00153 // read data from the TCP connection 00154 // note that this has to be an ansynchronous call; otherwise, it cannot 00155 // be cancelled by other threads and will block forever (such as during shutdown) 00156 boost::mutex::scoped_lock async_lock(m_async_mutex); 00157 m_bytes_transferred = 0; 00158 m_conn_ptr->async_read_some(boost::asio::buffer(m_read_buf+PUT_BACK_MAX, 00159 connection::READ_BUFFER_SIZE-PUT_BACK_MAX), 00160 boost::bind(&stream_buffer::operation_finished, this, 00161 boost::asio::placeholders::error, 00162 boost::asio::placeholders::bytes_transferred)); 00163 m_async_done.wait(async_lock); 00164 if (m_async_error) 00165 return traits_type::eof(); 00166 00167 // reset buffer pointers now that data is available 00168 setg(m_read_buf+(PUT_BACK_MAX-put_back_num), //< beginning of putback bytes 00169 m_read_buf+PUT_BACK_MAX, //< read position 00170 m_read_buf+PUT_BACK_MAX+m_bytes_transferred); //< end of buffer 00171 00172 // return next character available 00173 return traits_type::to_int_type(*gptr()); 00174 } 00175 00182 virtual int_type overflow(int_type c) { 00183 if (! traits_type::eq_int_type(c, traits_type::eof())) { 00184 // character is not eof -> add it to the end of the write buffer 00185 // we can push this to the back of the write buffer because we set 00186 // the size of the write buffer to 1 less than the actual size using setp() 00187 *pptr() = c; 00188 pbump(1); 00189 } 00190 // flush data in the write buffer by sending it to the TCP connection 00191 return ((flush_output() == traits_type::eof()) 00192 ? traits_type::eof() : traits_type::not_eof(c)); 00193 } 00194 00203 virtual std::streamsize xsputn(const char_type *s, std::streamsize n) { 00204 const std::streamsize bytes_available = std::streamsize(epptr() - pptr()); 00205 std::streamsize bytes_sent = 0; 00206 if (bytes_available >= n) { 00207 // there is enough room in the buffer -> just put it in there 00208 memcpy(pptr(), s, n); 00209 pbump(n); 00210 bytes_sent = n; 00211 } else { 00212 // there is not enough room left in the buffer 00213 if (bytes_available > 0) { 00214 // fill up the buffer 00215 memcpy(pptr(), s, bytes_available); 00216 pbump(bytes_available); 00217 } 00218 // flush data in the write buffer by sending it to the TCP connection 00219 if (flush_output() == traits_type::eof()) 00220 return 0; 00221 if ((n-bytes_available) >= (WRITE_BUFFER_SIZE-1)) { 00222 // the remaining data to send is larger than the buffer available 00223 // send it all now rather than buffering 00224 boost::mutex::scoped_lock async_lock(m_async_mutex); 00225 m_bytes_transferred = 0; 00226 m_conn_ptr->async_write(boost::asio::buffer(s+bytes_available, 00227 n-bytes_available), 00228 boost::bind(&stream_buffer::operation_finished, this, 00229 boost::asio::placeholders::error, 00230 boost::asio::placeholders::bytes_transferred)); 00231 m_async_done.wait(async_lock); 00232 bytes_sent = bytes_available + m_bytes_transferred; 00233 } else { 00234 // the buffer is larger than the remaining data 00235 // put remaining data to the beginning of the output buffer 00236 memcpy(pbase(), s+bytes_available, n-bytes_available); 00237 pbump(n-bytes_available); 00238 bytes_sent = n; 00239 } 00240 } 00241 return bytes_sent; 00242 } 00243 00252 virtual std::streamsize xsgetn(char_type *s, std::streamsize n) { 00253 std::streamsize bytes_remaining = n; 00254 while (bytes_remaining > 0) { 00255 const std::streamsize bytes_available = std::streamsize(egptr() - gptr()); 00256 const std::streamsize bytes_next_read = ((bytes_available >= bytes_remaining) 00257 ? bytes_remaining : bytes_available); 00258 // copy available input data from buffer 00259 if (bytes_next_read > 0) { 00260 memcpy(s, gptr(), bytes_next_read); 00261 gbump(bytes_next_read); 00262 bytes_remaining -= bytes_next_read; 00263 s += bytes_next_read; 00264 } 00265 if (bytes_remaining > 0) { 00266 // call underflow() to read more data 00267 if (traits_type::eq_int_type(underflow(), traits_type::eof())) 00268 break; 00269 } 00270 } 00271 return(n-bytes_remaining); 00272 } 00273 00279 virtual int_type sync(void) { 00280 return ((flush_output() == traits_type::eof()) ? -1 : 0); 00281 } 00282 00283 00284 private: 00285 00287 inline void operation_finished(const boost::system::error_code& error_code, 00288 std::size_t bytes_transferred) 00289 { 00290 boost::mutex::scoped_lock async_lock(m_async_mutex); 00291 m_async_error = error_code; 00292 m_bytes_transferred = bytes_transferred; 00293 m_async_done.notify_one(); 00294 } 00295 00296 00298 tcp::connection_ptr m_conn_ptr; 00299 00301 boost::mutex m_async_mutex; 00302 00304 boost::condition m_async_done; 00305 00307 boost::system::error_code m_async_error; 00308 00310 std::size_t m_bytes_transferred; 00311 00313 char_type * m_read_buf; 00314 00316 char_type m_write_buf[WRITE_BUFFER_SIZE]; 00317 }; 00318 00319 00323 class stream 00324 : public std::basic_iostream<char, std::char_traits<char> > 00325 { 00326 public: 00327 00328 // data type definitions required for iostream compatability 00329 typedef char char_type; 00330 typedef std::char_traits<char>::int_type int_type; 00331 typedef std::char_traits<char>::off_type off_type; 00332 typedef std::char_traits<char>::pos_type pos_type; 00333 typedef std::char_traits<char> traits_type; 00334 00335 00341 explicit stream(tcp::connection_ptr& conn_ptr) 00342 : std::basic_iostream<char, std::char_traits<char> >(NULL), m_tcp_buf(conn_ptr) 00343 { 00344 // initialize basic_iostream with pointer to the stream buffer 00345 std::basic_ios<char,std::char_traits<char> >::init(&m_tcp_buf); 00346 } 00347 00354 explicit stream(boost::asio::io_service& io_service, 00355 const bool ssl_flag = false) 00356 : std::basic_iostream<char, std::char_traits<char> >(NULL), m_tcp_buf(io_service, ssl_flag) 00357 { 00358 // initialize basic_iostream with pointer to the stream buffer 00359 std::basic_ios<char,std::char_traits<char> >::init(&m_tcp_buf); 00360 } 00361 00368 stream(boost::asio::io_service& io_service, 00369 connection::ssl_context_type& ssl_context) 00370 : std::basic_iostream<char, std::char_traits<char> >(NULL), m_tcp_buf(io_service, ssl_context) 00371 { 00372 // initialize basic_iostream with pointer to the stream buffer 00373 std::basic_ios<char,std::char_traits<char> >::init(&m_tcp_buf); 00374 } 00375 00384 inline boost::system::error_code accept(boost::asio::ip::tcp::acceptor& tcp_acceptor) 00385 { 00386 boost::system::error_code ec = m_tcp_buf.get_connection().accept(tcp_acceptor); 00387 if (! ec && get_ssl_flag()) ec = m_tcp_buf.get_connection().handshake_server(); 00388 return ec; 00389 } 00390 00399 inline boost::system::error_code connect(boost::asio::ip::tcp::endpoint& tcp_endpoint) 00400 { 00401 boost::system::error_code ec = m_tcp_buf.get_connection().connect(tcp_endpoint); 00402 if (! ec && get_ssl_flag()) ec = m_tcp_buf.get_connection().handshake_client(); 00403 return ec; 00404 } 00405 00415 inline boost::system::error_code connect(const boost::asio::ip::address& remote_addr, 00416 const unsigned int remote_port) 00417 { 00418 boost::asio::ip::tcp::endpoint tcp_endpoint(remote_addr, remote_port); 00419 boost::system::error_code ec = m_tcp_buf.get_connection().connect(tcp_endpoint); 00420 if (! ec && get_ssl_flag()) ec = m_tcp_buf.get_connection().handshake_client(); 00421 return ec; 00422 } 00423 00425 inline void close(void) { m_tcp_buf.get_connection().close(); } 00426 00427 /* 00428 Use close instead; basic_socket::cancel is deprecated for Windows XP. 00429 00431 inline void cancel(void) { m_tcp_buf.get_connection().cancel(); } 00432 */ 00433 00435 inline bool is_open(void) const { return m_tcp_buf.get_connection().is_open(); } 00436 00438 inline bool get_ssl_flag(void) const { return m_tcp_buf.get_connection().get_ssl_flag(); } 00439 00441 inline boost::asio::ip::address get_remote_ip(void) const { 00442 return m_tcp_buf.get_connection().get_remote_ip(); 00443 } 00444 00446 stream_buffer *rdbuf(void) { return &m_tcp_buf; } 00447 00448 00449 private: 00450 00452 stream_buffer m_tcp_buf; 00453 }; 00454 00455 00456 } // end namespace tcp 00457 } // end namespace pion 00458 00459 #endif