websocketpp  0.3.0
C++/Boost Asio based websocket client/server library
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Pages
connection.hpp
1 /*
2  * Copyright (c) 2013, Peter Thorson. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are met:
6  * * Redistributions of source code must retain the above copyright
7  * notice, this list of conditions and the following disclaimer.
8  * * Redistributions in binary form must reproduce the above copyright
9  * notice, this list of conditions and the following disclaimer in the
10  * documentation and/or other materials provided with the distribution.
11  * * Neither the name of the WebSocket++ Project nor the
12  * names of its contributors may be used to endorse or promote products
13  * derived from this software without specific prior written permission.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED. IN NO EVENT SHALL PETER THORSON BE LIABLE FOR ANY
19  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
20  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
21  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
22  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
24  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25  *
26  */
27 
28 #ifndef WEBSOCKETPP_TRANSPORT_IOSTREAM_CON_HPP
29 #define WEBSOCKETPP_TRANSPORT_IOSTREAM_CON_HPP
30 
31 #include <websocketpp/common/connection_hdl.hpp>
32 #include <websocketpp/common/memory.hpp>
33 #include <websocketpp/common/platforms.hpp>
34 #include <websocketpp/logger/levels.hpp>
35 
36 #include <websocketpp/transport/base/connection.hpp>
37 #include <websocketpp/transport/iostream/base.hpp>
38 
39 #include <sstream>
40 #include <vector>
41 
42 namespace websocketpp {
43 namespace transport {
44 namespace iostream {
45 
/// Stand-in timer type for the iostream transport.
/**
 * Exposes a single cancel() member whose body is empty.
 */
struct timer {
    /// Cancel the timer. Intentionally does nothing.
    void cancel() {
    }
};
51 
52 template <typename config>
53 class connection : public lib::enable_shared_from_this< connection<config> > {
54 public:
58  typedef lib::shared_ptr<type> ptr;
59 
61  typedef typename config::concurrency_type concurrency_type;
63  typedef typename config::alog_type alog_type;
65  typedef typename config::elog_type elog_type;
66 
67  // Concurrency policy types
68  typedef typename concurrency_type::scoped_lock_type scoped_lock_type;
69  typedef typename concurrency_type::mutex_type mutex_type;
70 
71  typedef lib::shared_ptr<timer> timer_ptr;
72 
73  explicit connection(bool is_server, alog_type & alog, elog_type & elog)
74  : m_output_stream(NULL)
75  , m_reading(false)
76  , m_is_server(is_server)
77  , m_is_secure(false)
78  , m_alog(alog)
79  , m_elog(elog)
80  , m_remote_endpoint("iostream transport")
81  {
82  m_alog.write(log::alevel::devel,"iostream con transport constructor");
83  }
84 
86  ptr get_shared() {
87  return type::shared_from_this();
88  }
89 
91 
97  void register_ostream(std::ostream * o) {
98  // TODO: lock transport state?
99  scoped_lock_type lock(m_read_mutex);
100  m_output_stream = o;
101  }
102 
104 
122  friend std::istream & operator>> (std::istream & in, type & t) {
123  // this serializes calls to external read.
124  scoped_lock_type lock(t.m_read_mutex);
125 
126  t.read(in);
127 
128  return in;
129  }
130 
132 
145  size_t read_some(char const * buf, size_t len) {
146  // this serializes calls to external read.
147  scoped_lock_type lock(m_read_mutex);
148 
149  return this->read_some_impl(buf,len);
150  }
151 
153 
168  size_t read_all(char const * buf, size_t len) {
169  // this serializes calls to external read.
170  scoped_lock_type lock(m_read_mutex);
171 
172  size_t total_read = 0;
173  size_t read = 0;
174 
175  do {
176  read = this->read_some_impl(buf+total_read,len-total_read);
177  total_read += read;
178  } while (read != 0 && total_read < len);
179 
180  return total_read;
181  }
182 
184 
188  size_t readsome(char const * buf, size_t len) {
189  return this->read_some(buf,len);
190  }
191 
193 
199  void eof() {
200  // this serializes calls to external read.
201  scoped_lock_type lock(m_read_mutex);
202 
203  if (m_reading) {
204  complete_read(make_error_code(transport::error::eof));
205  }
206  }
207 
209 
215  void fatal_error() {
216  // this serializes calls to external read.
217  scoped_lock_type lock(m_read_mutex);
218 
219  if (m_reading) {
220  complete_read(make_error_code(transport::error::pass_through));
221  }
222  }
223 
225 
237  void set_secure(bool value) {
238  m_is_secure = value;
239  }
240 
242 
251  bool is_secure() const {
252  return m_is_secure;
253  }
254 
256 
269  void set_remote_endpoint(std::string value) {
270  m_remote_endpoint = value;
271  }
272 
274 
285  std::string get_remote_endpoint() const {
286  return m_remote_endpoint;
287  }
288 
290 
294  return m_connection_hdl;
295  }
296 
298 
307  timer_ptr set_timer(long duration, timer_handler handler) {
308  return timer_ptr();
309  }
310 protected:
312 
317  void init(init_handler handler) {
318  m_alog.write(log::alevel::devel,"iostream connection init");
319  handler(lib::error_code());
320  }
321 
323 
346  void async_read_at_least(size_t num_bytes, char *buf, size_t len,
347  read_handler handler)
348  {
349  std::stringstream s;
350  s << "iostream_con async_read_at_least: " << num_bytes;
351  m_alog.write(log::alevel::devel,s.str());
352 
353  if (num_bytes > len) {
354  handler(make_error_code(error::invalid_num_bytes),size_t(0));
355  return;
356  }
357 
358  if (m_reading == true) {
359  handler(make_error_code(error::double_read),size_t(0));
360  return;
361  }
362 
363  if (num_bytes == 0 || len == 0) {
364  handler(lib::error_code(),size_t(0));
365  return;
366  }
367 
368  m_buf = buf;
369  m_len = len;
370  m_bytes_needed = num_bytes;
371  m_read_handler = handler;
372  m_cursor = 0;
373  m_reading = true;
374  }
375 
377 
390  void async_write(char const * buf, size_t len, write_handler handler) {
391  m_alog.write(log::alevel::devel,"iostream_con async_write");
392  // TODO: lock transport state?
393 
394  if (!m_output_stream) {
395  handler(make_error_code(error::output_stream_required));
396  return;
397  }
398 
399  m_output_stream->write(buf,len);
400 
401  if (m_output_stream->bad()) {
402  handler(make_error_code(error::bad_stream));
403  } else {
404  handler(lib::error_code());
405  }
406  }
407 
409 
421  void async_write(std::vector<buffer> const & bufs, write_handler handler) {
422  m_alog.write(log::alevel::devel,"iostream_con async_write buffer list");
423  // TODO: lock transport state?
424 
425  if (!m_output_stream) {
426  handler(make_error_code(error::output_stream_required));
427  return;
428  }
429 
430  std::vector<buffer>::const_iterator it;
431  for (it = bufs.begin(); it != bufs.end(); it++) {
432  m_output_stream->write((*it).buf,(*it).len);
433 
434  if (m_output_stream->bad()) {
435  handler(make_error_code(error::bad_stream));
436  }
437  }
438 
439  handler(lib::error_code());
440  }
441 
443 
447  m_connection_hdl = hdl;
448  }
449 
451 
461  lib::error_code dispatch(dispatch_handler handler) {
462  handler();
463  return lib::error_code();
464  }
465 
467 
471  handler(lib::error_code());
472  }
473 private:
474  void read(std::istream &in) {
475  m_alog.write(log::alevel::devel,"iostream_con read");
476 
477  while (in.good()) {
478  if (!m_reading) {
479  m_elog.write(log::elevel::devel,"write while not reading");
480  break;
481  }
482 
483  in.read(m_buf+m_cursor,static_cast<std::streamsize>(m_len-m_cursor));
484 
485  if (in.gcount() == 0) {
486  m_elog.write(log::elevel::devel,"read zero bytes");
487  break;
488  }
489 
490  m_cursor += static_cast<size_t>(in.gcount());
491 
492  // TODO: error handling
493  if (in.bad()) {
494  m_reading = false;
495  complete_read(make_error_code(error::bad_stream));
496  }
497 
498  if (m_cursor >= m_bytes_needed) {
499  m_reading = false;
500  complete_read(lib::error_code());
501  }
502  }
503  }
504 
505  size_t read_some_impl(char const * buf, size_t len) {
506  m_alog.write(log::alevel::devel,"iostream_con read_some");
507 
508  if (!m_reading) {
509  m_elog.write(log::elevel::devel,"write while not reading");
510  return 0;
511  }
512 
513  size_t bytes_to_copy = std::min(len,m_len-m_cursor);
514 
515  std::copy(buf,buf+bytes_to_copy,m_buf+m_cursor);
516 
517  m_cursor += bytes_to_copy;
518 
519  if (m_cursor >= m_bytes_needed) {
520  complete_read(lib::error_code());
521  }
522 
523  return bytes_to_copy;
524  }
525 
527 
542  void complete_read(lib::error_code const & ec) {
543  m_reading = false;
544 
545  read_handler handler = m_read_handler;
546  m_read_handler = read_handler();
547 
548  handler(ec,m_cursor);
549  }
550 
551  // Read space (Protected by m_read_mutex)
552  char * m_buf;
553  size_t m_len;
554  size_t m_bytes_needed;
555  read_handler m_read_handler;
556  size_t m_cursor;
557 
558  // transport resources
559  std::ostream * m_output_stream;
560  connection_hdl m_connection_hdl;
561 
562  bool m_reading;
563  bool const m_is_server;
564  bool m_is_secure;
565  alog_type & m_alog;
566  elog_type & m_elog;
567  std::string m_remote_endpoint;
568 
569  // This lock ensures that only one thread can edit read data for this
570  // connection. This is a very coarse lock that is basically locked all the
571  // time. The nature of the connection is such that it cannot be
572  // parallelized, the locking is here to prevent intra-connection concurrency
573  // in order to allow inter-connection concurrency.
574  mutex_type m_read_mutex;
575 };
576 
577 
578 } // namespace iostream
579 } // namespace transport
580 } // namespace websocketpp
581 
582 #endif // WEBSOCKETPP_TRANSPORT_IOSTREAM_CON_HPP
void register_ostream(std::ostream *o)
Register a std::ostream with the transport for writing output.
Definition: connection.hpp:97
uint16_t value
The type of a close code value.
Definition: close.hpp:49
void set_handle(connection_hdl hdl)
Set Connection Handle.
Definition: connection.hpp:446
friend std::istream & operator>>(std::istream &in, type &t)
Overloaded stream input operator.
Definition: connection.hpp:122
lib::error_code dispatch(dispatch_handler handler)
Call given handler back within the transport's event system (if present)
Definition: connection.hpp:461
void set_remote_endpoint(std::string value)
Set human readable remote endpoint address.
Definition: connection.hpp:269
lib::function< void(lib::error_code const &)> write_handler
The type and signature of the callback passed to the write method.
Definition: connection.hpp:122
static level const devel
Low level debugging information (warning: very chatty)
Definition: levels.hpp:44
lib::weak_ptr< void > connection_hdl
A handle to uniquely identify a connection.
bool is_secure() const
Tests whether or not the underlying transport is secure.
Definition: connection.hpp:251
size_t readsome(char const *buf, size_t len)
Manual input supply (DEPRECATED)
Definition: connection.hpp:188
async_read called while another async_read was in progress
Definition: base.hpp:52
lib::function< void(lib::error_code const &, size_t)> read_handler
The type and signature of the callback passed to the read method.
Definition: connection.hpp:119
size_t read_all(char const *buf, size_t len)
Manual input supply (read all)
Definition: connection.hpp:168
underlying transport pass through
Definition: connection.hpp:152
std::string get_remote_endpoint() const
Get human readable remote endpoint address.
Definition: connection.hpp:285
static level const devel
Development messages (warning: very chatty)
Definition: levels.hpp:122
void async_write(char const *buf, size_t len, write_handler handler)
Asynchronous Transport Write.
Definition: connection.hpp:390
size_t read_some(char const *buf, size_t len)
Manual input supply (read some)
Definition: connection.hpp:145
lib::function< void()> dispatch_handler
The type and signature of the callback passed to the dispatch method.
Definition: connection.hpp:134
async_read_at_least call requested more bytes than buffer can store
Definition: base.hpp:49
timer_ptr set_timer(long duration, timer_handler handler)
Call back a function after a period of time.
Definition: connection.hpp:307
config::alog_type alog_type
Type of this transport's access logging policy.
Definition: connection.hpp:63
lib::function< void(lib::error_code const &)> timer_handler
The type and signature of the callback passed to the set_timer method.
Definition: connection.hpp:125
lib::function< void(lib::error_code const &)> shutdown_handler
The type and signature of the callback passed to the shutdown method.
Definition: connection.hpp:128
Namespace for the WebSocket++ project.
Definition: base64.hpp:41
config::elog_type elog_type
Type of this transport's error logging policy.
Definition: connection.hpp:65
lib::function< void(lib::error_code const &)> init_handler
The type and signature of the callback passed to the init hook.
Definition: connection.hpp:116
connection< config > type
Type of this connection transport component.
Definition: connection.hpp:56
ptr get_shared()
Get a shared pointer to this component.
Definition: connection.hpp:86
void init(init_handler handler)
Initialize the connection transport.
Definition: connection.hpp:317
connection_hdl get_handle() const
Get the connection handle.
Definition: connection.hpp:293
void async_shutdown(shutdown_handler handler)
Perform cleanup on socket shutdown_handler.
Definition: connection.hpp:470
void set_secure(bool value)
Set whether or not this connection is secure.
Definition: connection.hpp:237
void async_write(std::vector< buffer > const &bufs, write_handler handler)
Asynchronous Transport Write (scatter-gather)
Definition: connection.hpp:421
void fatal_error()
Signal transport error.
Definition: connection.hpp:215
void async_read_at_least(size_t num_bytes, char *buf, size_t len, read_handler handler)
Initiate an async_read for at least num_bytes bytes into buf.
Definition: connection.hpp:346
config::concurrency_type concurrency_type
transport concurrency policy
Definition: connection.hpp:61
lib::shared_ptr< type > ptr
Type of a shared pointer to this connection transport component.
Definition: connection.hpp:58