pion  5.0.6
stream.hpp
1 // ---------------------------------------------------------------------
2 // pion: a Boost C++ framework for building lightweight HTTP interfaces
3 // ---------------------------------------------------------------------
4 // Copyright (C) 2007-2014 Splunk Inc. (https://github.com/splunk/pion)
5 //
6 // Distributed under the Boost Software License, Version 1.0.
7 // See http://www.boost.org/LICENSE_1_0.txt
8 //
9 
10 #ifndef __PION_TCP_STREAM_HEADER__
11 #define __PION_TCP_STREAM_HEADER__
12 
13 #include <cstring>
14 #include <istream>
15 #include <streambuf>
16 #include <boost/bind.hpp>
17 #include <boost/thread/mutex.hpp>
18 #include <boost/thread/condition.hpp>
19 #include <pion/config.hpp>
20 #include <pion/tcp/connection.hpp>
21 
22 
23 namespace pion { // begin namespace pion
24 namespace tcp { // begin namespace tcp
25 
26 
33  : public std::basic_streambuf<char, std::char_traits<char> >
34 {
35 public:
36 
37  // data type definitions required for iostream compatability
38  typedef char char_type;
39  typedef std::char_traits<char>::int_type int_type;
40  typedef std::char_traits<char>::off_type off_type;
41  typedef std::char_traits<char>::pos_type pos_type;
42  typedef std::char_traits<char> traits_type;
43 
44  // some integer constants used within stream_buffer
45  enum {
46  PUT_BACK_MAX = 10, //< number of bytes that can be put back into the read buffer
47  WRITE_BUFFER_SIZE = 8192 //< size of the write buffer
48  };
49 
50 
56  explicit stream_buffer(const tcp::connection_ptr& conn_ptr)
57  : m_conn_ptr(conn_ptr), m_read_buf(m_conn_ptr->get_read_buffer().c_array())
58  {
59  setup_buffers();
60  }
61 
68  explicit stream_buffer(boost::asio::io_service& io_service,
69  const bool ssl_flag = false)
70  : m_conn_ptr(new connection(io_service, ssl_flag)),
71  m_read_buf(m_conn_ptr->get_read_buffer().c_array())
72  {
73  setup_buffers();
74  }
75 
82  stream_buffer(boost::asio::io_service& io_service,
83  connection::ssl_context_type& ssl_context)
84  : m_conn_ptr(new connection(io_service, ssl_context)),
85  m_read_buf(m_conn_ptr->get_read_buffer().c_array())
86  {
87  setup_buffers();
88  }
89 
91  virtual ~stream_buffer() { sync(); }
92 
94  connection& get_connection(void) { return *m_conn_ptr; }
95 
97  const connection& get_connection(void) const { return *m_conn_ptr; }
98 
99 
100 protected:
101 
103  inline void setup_buffers(void) {
104  // use the TCP connection's read buffer and allow for bytes to be put back
105  setg(m_read_buf+PUT_BACK_MAX, m_read_buf+PUT_BACK_MAX, m_read_buf+PUT_BACK_MAX);
106  // set write buffer size-1 so that we have an extra char avail for overflow
107  setp(m_write_buf, m_write_buf+(WRITE_BUFFER_SIZE-1));
108  }
109 
115  inline int_type flush_output(void) {
116  const std::streamsize bytes_to_send = std::streamsize(pptr() - pbase());
117  int_type bytes_sent = 0;
118  if (bytes_to_send > 0) {
119  boost::mutex::scoped_lock async_lock(m_async_mutex);
120  m_bytes_transferred = 0;
121  m_conn_ptr->async_write(boost::asio::buffer(pbase(), bytes_to_send),
122  boost::bind(&stream_buffer::operation_finished, this,
123  boost::asio::placeholders::error,
124  boost::asio::placeholders::bytes_transferred));
125  m_async_done.wait(async_lock);
126  bytes_sent = m_bytes_transferred;
127  pbump(-bytes_sent);
128  if (m_async_error)
129  bytes_sent = traits_type::eof();
130  }
131  return bytes_sent;
132  }
133 
139  virtual int_type underflow(void) {
140  // first check if we still have bytes available in the read buffer
141  if (gptr() < egptr())
142  return traits_type::to_int_type(*gptr());
143 
144  // calculate the number of bytes we will allow to be put back
145  std::streamsize put_back_num = std::streamsize(gptr() - eback());
146  if (put_back_num > PUT_BACK_MAX)
147  put_back_num = PUT_BACK_MAX;
148 
149  // copy the last bytes read to the beginning of the buffer (for put back)
150  if (put_back_num > 0)
151  memmove(m_read_buf+(PUT_BACK_MAX-put_back_num), gptr()-put_back_num, put_back_num);
152 
153  // read data from the TCP connection
154  // note that this has to be an ansynchronous call; otherwise, it cannot
155  // be cancelled by other threads and will block forever (such as during shutdown)
156  boost::mutex::scoped_lock async_lock(m_async_mutex);
157  m_bytes_transferred = 0;
158  m_conn_ptr->async_read_some(boost::asio::buffer(m_read_buf+PUT_BACK_MAX,
159  connection::READ_BUFFER_SIZE-PUT_BACK_MAX),
160  boost::bind(&stream_buffer::operation_finished, this,
161  boost::asio::placeholders::error,
162  boost::asio::placeholders::bytes_transferred));
163  m_async_done.wait(async_lock);
164  if (m_async_error)
165  return traits_type::eof();
166 
167  // reset buffer pointers now that data is available
168  setg(m_read_buf+(PUT_BACK_MAX-put_back_num), //< beginning of putback bytes
169  m_read_buf+PUT_BACK_MAX, //< read position
170  m_read_buf+PUT_BACK_MAX+m_bytes_transferred); //< end of buffer
171 
172  // return next character available
173  return traits_type::to_int_type(*gptr());
174  }
175 
182  virtual int_type overflow(int_type c) {
183  if (! traits_type::eq_int_type(c, traits_type::eof())) {
184  // character is not eof -> add it to the end of the write buffer
185  // we can push this to the back of the write buffer because we set
186  // the size of the write buffer to 1 less than the actual size using setp()
187  *pptr() = c;
188  pbump(1);
189  }
190  // flush data in the write buffer by sending it to the TCP connection
191  return ((flush_output() == traits_type::eof())
192  ? traits_type::eof() : traits_type::not_eof(c));
193  }
194 
203  virtual std::streamsize xsputn(const char_type *s, std::streamsize n) {
204  const std::streamsize bytes_available = std::streamsize(epptr() - pptr());
205  std::streamsize bytes_sent = 0;
206  if (bytes_available >= n) {
207  // there is enough room in the buffer -> just put it in there
208  memcpy(pptr(), s, n);
209  pbump(n);
210  bytes_sent = n;
211  } else {
212  // there is not enough room left in the buffer
213  if (bytes_available > 0) {
214  // fill up the buffer
215  memcpy(pptr(), s, bytes_available);
216  pbump(bytes_available);
217  }
218  // flush data in the write buffer by sending it to the TCP connection
219  if (flush_output() == traits_type::eof())
220  return 0;
221  if ((n-bytes_available) >= (WRITE_BUFFER_SIZE-1)) {
222  // the remaining data to send is larger than the buffer available
223  // send it all now rather than buffering
224  boost::mutex::scoped_lock async_lock(m_async_mutex);
225  m_bytes_transferred = 0;
226  m_conn_ptr->async_write(boost::asio::buffer(s+bytes_available,
227  n-bytes_available),
228  boost::bind(&stream_buffer::operation_finished, this,
229  boost::asio::placeholders::error,
230  boost::asio::placeholders::bytes_transferred));
231  m_async_done.wait(async_lock);
232  bytes_sent = bytes_available + m_bytes_transferred;
233  } else {
234  // the buffer is larger than the remaining data
235  // put remaining data to the beginning of the output buffer
236  memcpy(pbase(), s+bytes_available, n-bytes_available);
237  pbump(n-bytes_available);
238  bytes_sent = n;
239  }
240  }
241  return bytes_sent;
242  }
243 
252  virtual std::streamsize xsgetn(char_type *s, std::streamsize n) {
253  std::streamsize bytes_remaining = n;
254  while (bytes_remaining > 0) {
255  const std::streamsize bytes_available = std::streamsize(egptr() - gptr());
256  const std::streamsize bytes_next_read = ((bytes_available >= bytes_remaining)
257  ? bytes_remaining : bytes_available);
258  // copy available input data from buffer
259  if (bytes_next_read > 0) {
260  memcpy(s, gptr(), bytes_next_read);
261  gbump(bytes_next_read);
262  bytes_remaining -= bytes_next_read;
263  s += bytes_next_read;
264  }
265  if (bytes_remaining > 0) {
266  // call underflow() to read more data
267  if (traits_type::eq_int_type(underflow(), traits_type::eof()))
268  break;
269  }
270  }
271  return(n-bytes_remaining);
272  }
273 
279  virtual int_type sync(void) {
280  return ((flush_output() == traits_type::eof()) ? -1 : 0);
281  }
282 
283 
284 private:
285 
287  inline void operation_finished(const boost::system::error_code& error_code,
288  std::size_t bytes_transferred)
289  {
290  boost::mutex::scoped_lock async_lock(m_async_mutex);
291  m_async_error = error_code;
292  m_bytes_transferred = bytes_transferred;
293  m_async_done.notify_one();
294  }
295 
296 
298  tcp::connection_ptr m_conn_ptr;
299 
301  boost::mutex m_async_mutex;
302 
304  boost::condition m_async_done;
305 
307  boost::system::error_code m_async_error;
308 
310  std::size_t m_bytes_transferred;
311 
313  char_type * m_read_buf;
314 
316  char_type m_write_buf[WRITE_BUFFER_SIZE];
317 };
318 
319 
323 class stream
324  : public std::basic_iostream<char, std::char_traits<char> >
325 {
326 public:
327 
328  // data type definitions required for iostream compatability
329  typedef char char_type;
330  typedef std::char_traits<char>::int_type int_type;
331  typedef std::char_traits<char>::off_type off_type;
332  typedef std::char_traits<char>::pos_type pos_type;
333  typedef std::char_traits<char> traits_type;
334 
335 
341  explicit stream(const tcp::connection_ptr& conn_ptr)
342  : std::basic_iostream<char, std::char_traits<char> >(NULL), m_tcp_buf(conn_ptr)
343  {
344  // initialize basic_iostream with pointer to the stream buffer
345  std::basic_ios<char,std::char_traits<char> >::init(&m_tcp_buf);
346  }
347 
354  explicit stream(boost::asio::io_service& io_service,
355  const bool ssl_flag = false)
356  : std::basic_iostream<char, std::char_traits<char> >(NULL), m_tcp_buf(io_service, ssl_flag)
357  {
358  // initialize basic_iostream with pointer to the stream buffer
359  std::basic_ios<char,std::char_traits<char> >::init(&m_tcp_buf);
360  }
361 
368  stream(boost::asio::io_service& io_service,
369  connection::ssl_context_type& ssl_context)
370  : std::basic_iostream<char, std::char_traits<char> >(NULL), m_tcp_buf(io_service, ssl_context)
371  {
372  // initialize basic_iostream with pointer to the stream buffer
373  std::basic_ios<char,std::char_traits<char> >::init(&m_tcp_buf);
374  }
375 
384  inline boost::system::error_code accept(boost::asio::ip::tcp::acceptor& tcp_acceptor)
385  {
386  boost::system::error_code ec = m_tcp_buf.get_connection().accept(tcp_acceptor);
387  if (! ec && get_ssl_flag()) ec = m_tcp_buf.get_connection().handshake_server();
388  return ec;
389  }
390 
399  inline boost::system::error_code connect(boost::asio::ip::tcp::endpoint& tcp_endpoint)
400  {
401  boost::system::error_code ec = m_tcp_buf.get_connection().connect(tcp_endpoint);
402  if (! ec && get_ssl_flag()) ec = m_tcp_buf.get_connection().handshake_client();
403  return ec;
404  }
405 
415  inline boost::system::error_code connect(const boost::asio::ip::address& remote_addr,
416  const unsigned int remote_port)
417  {
418  boost::asio::ip::tcp::endpoint tcp_endpoint(remote_addr, remote_port);
419  boost::system::error_code ec = m_tcp_buf.get_connection().connect(tcp_endpoint);
420  if (! ec && get_ssl_flag()) ec = m_tcp_buf.get_connection().handshake_client();
421  return ec;
422  }
423 
425  inline void close(void) { m_tcp_buf.get_connection().close(); }
426 
427  /*
428  Use close instead; basic_socket::cancel is deprecated for Windows XP.
429 
431  inline void cancel(void) { m_tcp_buf.get_connection().cancel(); }
432  */
433 
435  inline bool is_open(void) const { return m_tcp_buf.get_connection().is_open(); }
436 
438  inline bool get_ssl_flag(void) const { return m_tcp_buf.get_connection().get_ssl_flag(); }
439 
441  inline boost::asio::ip::address get_remote_ip(void) const {
442  return m_tcp_buf.get_connection().get_remote_ip();
443  }
444 
446  stream_buffer *rdbuf(void) { return &m_tcp_buf; }
447 
448 
449 private:
450 
452  stream_buffer m_tcp_buf;
453 };
454 
455 
456 } // end namespace tcp
457 } // end namespace pion
458 
459 #endif
boost::asio::ip::address get_remote_ip(void) const
returns the client&#39;s IP address
Definition: stream.hpp:441
virtual int_type overflow(int_type c)
Definition: stream.hpp:182
int_type flush_output(void)
Definition: stream.hpp:115
bool get_ssl_flag(void) const
returns true if the connection is encrypted using SSL
Definition: stream.hpp:438
stream(const tcp::connection_ptr &conn_ptr)
Definition: stream.hpp:341
boost::system::error_code accept(boost::asio::ip::tcp::acceptor &tcp_acceptor)
Definition: stream.hpp:384
stream_buffer(boost::asio::io_service &io_service, const bool ssl_flag=false)
Definition: stream.hpp:68
void setup_buffers(void)
sets up the read and write buffers for input and output
Definition: stream.hpp:103
bool is_open(void) const
returns true if the connection is currently open
Definition: stream.hpp:435
stream(boost::asio::io_service &io_service, const bool ssl_flag=false)
Definition: stream.hpp:354
stream_buffer(boost::asio::io_service &io_service, connection::ssl_context_type &ssl_context)
Definition: stream.hpp:82
STL namespace.
stream(boost::asio::io_service &io_service, connection::ssl_context_type &ssl_context)
Definition: stream.hpp:368
const connection & get_connection(void) const
returns a const reference to the current TCP connection
Definition: stream.hpp:97
virtual std::streamsize xsputn(const char_type *s, std::streamsize n)
Definition: stream.hpp:203
virtual ~stream_buffer()
virtual destructor flushes the write buffer
Definition: stream.hpp:91
connection & get_connection(void)
returns a reference to the current TCP connection
Definition: stream.hpp:94
boost::system::error_code connect(boost::asio::ip::tcp::endpoint &tcp_endpoint)
Definition: stream.hpp:399
void close(void)
closes the tcp connection
Definition: stream.hpp:425
virtual int_type underflow(void)
Definition: stream.hpp:139
stream_buffer * rdbuf(void)
returns a pointer to the stream buffer in use
Definition: stream.hpp:446
virtual int_type sync(void)
Definition: stream.hpp:279
virtual std::streamsize xsgetn(char_type *s, std::streamsize n)
Definition: stream.hpp:252
boost::system::error_code connect(const boost::asio::ip::address &remote_addr, const unsigned int remote_port)
Definition: stream.hpp:415
stream_buffer(const tcp::connection_ptr &conn_ptr)
Definition: stream.hpp:56