websocketpp  0.3.0
C++/Boost Asio based websocket client/server library
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Pages
connection_impl.hpp
1 /*
2  * Copyright (c) 2014, Peter Thorson. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are met:
6  * * Redistributions of source code must retain the above copyright
7  * notice, this list of conditions and the following disclaimer.
8  * * Redistributions in binary form must reproduce the above copyright
9  * notice, this list of conditions and the following disclaimer in the
10  * documentation and/or other materials provided with the distribution.
11  * * Neither the name of the WebSocket++ Project nor the
12  * names of its contributors may be used to endorse or promote products
13  * derived from this software without specific prior written permission.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED. IN NO EVENT SHALL PETER THORSON BE LIABLE FOR ANY
19  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
20  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
21  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
22  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
24  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25  *
26  */
27 
28 #ifndef WEBSOCKETPP_CONNECTION_IMPL_HPP
29 #define WEBSOCKETPP_CONNECTION_IMPL_HPP
30 
31 #include <websocketpp/common/platforms.hpp>
32 #include <websocketpp/common/system_error.hpp>
33 
34 #include <websocketpp/processors/processor.hpp>
35 
36 #include <websocketpp/processors/hybi00.hpp>
37 #include <websocketpp/processors/hybi07.hpp>
38 #include <websocketpp/processors/hybi08.hpp>
39 #include <websocketpp/processors/hybi13.hpp>
40 
41 namespace websocketpp {
42 
43 namespace istate = session::internal_state;
44 
45 template <typename config>
47  termination_handler new_handler)
48 {
49  m_alog.write(log::alevel::devel,
50  "connection set_termination_handler");
51 
52  //scoped_lock_type lock(m_connection_state_lock);
53 
54  m_termination_handler = new_handler;
55 }
56 
57 template <typename config>
58 const std::string& connection<config>::get_origin() const {
59  //scoped_lock_type lock(m_connection_state_lock);
60  return m_processor->get_origin(m_request);
61 }
62 
63 template <typename config>
65  //scoped_lock_type lock(m_connection_state_lock);
66  return m_send_buffer_size;
67 }
68 
69 template <typename config>
70 session::state::value connection<config>::get_state() const {
71  //scoped_lock_type lock(m_connection_state_lock);
72  return m_state;
73 }
74 
75 template <typename config>
76 lib::error_code connection<config>::send(const std::string& payload,
77  frame::opcode::value op)
78 {
79  message_ptr msg = m_msg_manager->get_message(op,payload.size());
80  msg->append_payload(payload);
81 
82  return send(msg);
83 }
84 
85 template <typename config>
86 lib::error_code connection<config>::send(const void* payload, size_t len,
87  frame::opcode::value op)
88 {
89  message_ptr msg = m_msg_manager->get_message(op,len);
90  msg->append_payload(payload,len);
91 
92  return send(msg);
93 }
94 
95 template <typename config>
96 lib::error_code connection<config>::send(typename config::message_type::ptr msg)
97 {
98  if (m_alog.static_test(log::alevel::devel)) {
99  m_alog.write(log::alevel::devel,"connection send");
100  }
101  // TODO:
102 
103  if (m_state != session::state::open) {
104  return error::make_error_code(error::invalid_state);
105  }
106 
107  message_ptr outgoing_msg;
108  bool needs_writing = false;
109 
110  if (msg->get_prepared()) {
111  outgoing_msg = msg;
112 
113  scoped_lock_type lock(m_write_lock);
114  write_push(outgoing_msg);
115  needs_writing = !m_write_flag && !m_send_queue.empty();
116  } else {
117  outgoing_msg = m_msg_manager->get_message();
118 
119  if (!outgoing_msg) {
120  return error::make_error_code(error::no_outgoing_buffers);
121  }
122 
123  scoped_lock_type lock(m_write_lock);
124  lib::error_code ec = m_processor->prepare_data_frame(msg,outgoing_msg);
125 
126  if (ec) {
127  return ec;
128  }
129 
130  write_push(outgoing_msg);
131  needs_writing = !m_write_flag && !m_send_queue.empty();
132  }
133 
134  if (needs_writing) {
135  transport_con_type::dispatch(lib::bind(
136  &type::write_frame,
137  type::get_shared()
138  ));
139  }
140 
141  return lib::error_code();
142 }
143 
144 template <typename config>
145 void connection<config>::ping(const std::string& payload, lib::error_code& ec) {
146  if (m_alog.static_test(log::alevel::devel)) {
147  m_alog.write(log::alevel::devel,"connection ping");
148  }
149 
150  if (m_state != session::state::open) {
151  ec = error::make_error_code(error::invalid_state);
152  return;
153  }
154 
155  message_ptr msg = m_msg_manager->get_message();
156  if (!msg) {
157  ec = error::make_error_code(error::no_outgoing_buffers);
158  return;
159  }
160 
161  ec = m_processor->prepare_ping(payload,msg);
162  if (ec) {return;}
163 
164  // set ping timer if we are listening for one
165  if (m_pong_timeout_handler) {
166  // Cancel any existing timers
167  if (m_ping_timer) {
168  m_ping_timer->cancel();
169  }
170 
171  if (m_pong_timeout_dur > 0) {
172  m_ping_timer = transport_con_type::set_timer(
173  m_pong_timeout_dur,
174  lib::bind(
175  &type::handle_pong_timeout,
176  type::get_shared(),
177  payload,
178  lib::placeholders::_1
179  )
180  );
181  }
182 
183  if (!m_ping_timer) {
184  // Our transport doesn't support timers
185  m_elog.write(log::elevel::warn,"Warning: a pong_timeout_handler is \
186  set but the transport in use does not support timeouts.");
187  }
188  }
189 
190  bool needs_writing = false;
191  {
192  scoped_lock_type lock(m_write_lock);
193  write_push(msg);
194  needs_writing = !m_write_flag && !m_send_queue.empty();
195  }
196 
197  if (needs_writing) {
198  transport_con_type::dispatch(lib::bind(
199  &type::write_frame,
200  type::get_shared()
201  ));
202  }
203 
204  ec = lib::error_code();
205 }
206 
207 template<typename config>
208 void connection<config>::ping(const std::string& payload) {
209  lib::error_code ec;
210  ping(payload,ec);
211  if (ec) {
212  throw ec;
213  }
214 }
215 
216 template<typename config>
217 void connection<config>::handle_pong_timeout(std::string payload, const lib::error_code &
218  ec)
219 {
220  if (ec) {
222  // ignore, this is expected
223  return;
224  }
225 
226  m_elog.write(log::elevel::devel,"pong_timeout error: "+ec.message());
227  return;
228  }
229 
230  if (m_pong_timeout_handler) {
231  m_pong_timeout_handler(m_connection_hdl,payload);
232  }
233 }
234 
235 template <typename config>
236 void connection<config>::pong(const std::string& payload, lib::error_code& ec) {
237  if (m_alog.static_test(log::alevel::devel)) {
238  m_alog.write(log::alevel::devel,"connection pong");
239  }
240 
241  if (m_state != session::state::open) {
242  ec = error::make_error_code(error::invalid_state);
243  return;
244  }
245 
246  message_ptr msg = m_msg_manager->get_message();
247  if (!msg) {
248  ec = error::make_error_code(error::no_outgoing_buffers);
249  return;
250  }
251 
252  ec = m_processor->prepare_pong(payload,msg);
253  if (ec) {return;}
254 
255  bool needs_writing = false;
256  {
257  scoped_lock_type lock(m_write_lock);
258  write_push(msg);
259  needs_writing = !m_write_flag && !m_send_queue.empty();
260  }
261 
262  if (needs_writing) {
263  transport_con_type::dispatch(lib::bind(
264  &type::write_frame,
265  type::get_shared()
266  ));
267  }
268 
269  ec = lib::error_code();
270 }
271 
272 template<typename config>
273 void connection<config>::pong(const std::string& payload) {
274  lib::error_code ec;
275  pong(payload,ec);
276  if (ec) {
277  throw ec;
278  }
279 }
280 
281 template <typename config>
283  std::string const & reason, lib::error_code & ec)
284 {
285  if (m_alog.static_test(log::alevel::devel)) {
286  m_alog.write(log::alevel::devel,"connection close");
287  }
288 
289  if (m_state != session::state::open) {
290  ec = error::make_error_code(error::invalid_state);
291  return;
292  }
293 
294  // Truncate reason to maximum size allowable in a close frame.
295  std::string tr(reason,0,std::min<size_t>(reason.size(),
297 
298  ec = this->send_close_frame(code,tr,false,close::status::terminal(code));
299 }
300 
301 template<typename config>
303  std::string const & reason)
304 {
305  lib::error_code ec;
306  close(code,reason,ec);
307  if (ec) {
308  throw ec;
309  }
310 }
311 
313 
316 template <typename config>
317 lib::error_code connection<config>::interrupt() {
318  m_alog.write(log::alevel::devel,"connection connection::interrupt");
319  return transport_con_type::interrupt(
320  lib::bind(
321  &type::handle_interrupt,
322  type::get_shared()
323  )
324  );
325 }
326 
327 
328 template <typename config>
330  if (m_interrupt_handler) {
331  m_interrupt_handler(m_connection_hdl);
332  }
333 }
334 
335 template <typename config>
337  m_alog.write(log::alevel::devel,"connection connection::pause_reading");
338  return transport_con_type::dispatch(
339  lib::bind(
340  &type::handle_pause_reading,
341  type::get_shared()
342  )
343  );
344 }
345 
347 template <typename config>
349  m_alog.write(log::alevel::devel,"connection connection::handle_pause_reading");
350  m_read_flag = false;
351 }
352 
353 template <typename config>
355  m_alog.write(log::alevel::devel,"connection connection::resume_reading");
356  return transport_con_type::dispatch(
357  lib::bind(
358  &type::handle_resume_reading,
359  type::get_shared()
360  )
361  );
362 }
363 
365 template <typename config>
367  m_read_flag = true;
368  read_frame();
369 }
370 
371 
372 
373 
374 
375 
376 
377 
378 
379 
380 
381 template <typename config>
383  //scoped_lock_type lock(m_connection_state_lock);
384  return m_uri->get_secure();
385 }
386 
387 template <typename config>
388 const std::string& connection<config>::get_host() const {
389  //scoped_lock_type lock(m_connection_state_lock);
390  return m_uri->get_host();
391 }
392 
393 template <typename config>
394 const std::string& connection<config>::get_resource() const {
395  //scoped_lock_type lock(m_connection_state_lock);
396  return m_uri->get_resource();
397 }
398 
399 template <typename config>
401  //scoped_lock_type lock(m_connection_state_lock);
402  return m_uri->get_port();
403 }
404 
405 template <typename config>
407  //scoped_lock_type lock(m_connection_state_lock);
408  return m_uri;
409 }
410 
411 template <typename config>
413  //scoped_lock_type lock(m_connection_state_lock);
414  m_uri = uri;
415 }
416 
417 
418 
419 
420 
421 
422 template <typename config>
423 const std::string & connection<config>::get_subprotocol() const {
424  return m_subprotocol;
425 }
426 
427 template <typename config>
428 const std::vector<std::string> &
430  return m_requested_subprotocols;
431 }
432 
433 template <typename config>
435  lib::error_code & ec)
436 {
437  if (m_is_server) {
438  ec = error::make_error_code(error::client_only);
439  return;
440  }
441 
442  // If the value is empty or has a non-RFC2616 token character it is invalid.
443  if (value.empty() || std::find_if(value.begin(),value.end(),
444  http::is_not_token_char) != value.end())
445  {
446  ec = error::make_error_code(error::invalid_subprotocol);
447  return;
448  }
449 
450  m_requested_subprotocols.push_back(value);
451 }
452 
453 template <typename config>
454 void connection<config>::add_subprotocol(const std::string & value) {
455  lib::error_code ec;
456  this->add_subprotocol(value,ec);
457  if (ec) {
458  throw ec;
459  }
460 }
461 
462 
463 template <typename config>
465  lib::error_code & ec)
466 {
467  if (!m_is_server) {
468  ec = error::make_error_code(error::server_only);
469  return;
470  }
471 
472  if (value.empty()) {
473  ec = lib::error_code();
474  return;
475  }
476 
477  std::vector<std::string>::iterator it;
478 
479  it = std::find(m_requested_subprotocols.begin(),
480  m_requested_subprotocols.end(),
481  value);
482 
483  if (it == m_requested_subprotocols.end()) {
484  ec = error::make_error_code(error::unrequested_subprotocol);
485  return;
486  }
487 
488  m_subprotocol = value;
489 }
490 
491 template <typename config>
493  lib::error_code ec;
494  this->select_subprotocol(value,ec);
495  if (ec) {
496  throw ec;
497  }
498 }
499 
500 
501 template <typename config>
502 const std::string &
503 connection<config>::get_request_header(const std::string &key) {
504  return m_request.get_header(key);
505 }
506 
507 template <typename config>
508 const std::string &
510  return m_response.get_header(key);
511 }
512 
513 template <typename config>
514 void connection<config>::set_status(http::status_code::value code)
515 {
516  //scoped_lock_type lock(m_connection_state_lock);
517 
518  if (m_internal_state != istate::PROCESS_HTTP_REQUEST) {
519  throw error::make_error_code(error::invalid_state);
520  //throw exception("Call to set_status from invalid state",
521  // error::INVALID_STATE);
522  }
523 
524  m_response.set_status(code);
525 }
526 template <typename config>
527 void connection<config>::set_status(http::status_code::value code,
528  std::string const & msg)
529 {
530  //scoped_lock_type lock(m_connection_state_lock);
531 
532  if (m_internal_state != istate::PROCESS_HTTP_REQUEST) {
533  throw error::make_error_code(error::invalid_state);
534  //throw exception("Call to set_status from invalid state",
535  // error::INVALID_STATE);
536  }
537 
538  m_response.set_status(code,msg);
539 }
540 template <typename config>
541 void connection<config>::set_body(std::string const & value) {
542  //scoped_lock_type lock(m_connection_state_lock);
543 
544  if (m_internal_state != istate::PROCESS_HTTP_REQUEST) {
545  throw error::make_error_code(error::invalid_state);
546  //throw exception("Call to set_status from invalid state",
547  // error::INVALID_STATE);
548  }
549 
550  m_response.set_body(value);
551 }
552 
553 template <typename config>
554 void connection<config>::append_header(std::string const & key,
555  std::string const & val)
556 {
557  //scoped_lock_type lock(m_connection_state_lock);
558 
559  if (m_is_server) {
560  if (m_internal_state == istate::PROCESS_HTTP_REQUEST) {
561  // we are setting response headers for an incoming server connection
562  m_response.append_header(key,val);
563  } else {
564  throw error::make_error_code(error::invalid_state);
565  }
566  } else {
567  if (m_internal_state == istate::USER_INIT) {
568  // we are setting initial headers for an outgoing client connection
569  m_request.append_header(key,val);
570  } else {
571  throw error::make_error_code(error::invalid_state);
572  }
573  }
574 }
575 template <typename config>
576 void connection<config>::replace_header(std::string const & key,
577  std::string const & val)
578 {
579  // scoped_lock_type lock(m_connection_state_lock);
580 
581  if (m_is_server) {
582  if (m_internal_state == istate::PROCESS_HTTP_REQUEST) {
583  // we are setting response headers for an incoming server connection
584  m_response.replace_header(key,val);
585  } else {
586  throw error::make_error_code(error::invalid_state);
587  }
588  } else {
589  if (m_internal_state == istate::USER_INIT) {
590  // we are setting initial headers for an outgoing client connection
591  m_request.replace_header(key,val);
592  } else {
593  throw error::make_error_code(error::invalid_state);
594  }
595  }
596 }
597 template <typename config>
598 void connection<config>::remove_header(std::string const & key)
599 {
600  //scoped_lock_type lock(m_connection_state_lock);
601 
602  if (m_is_server) {
603  if (m_internal_state == istate::PROCESS_HTTP_REQUEST) {
604  // we are setting response headers for an incoming server connection
605  m_response.remove_header(key);
606  } else {
607  throw error::make_error_code(error::invalid_state);
608  }
609  } else {
610  if (m_internal_state == istate::USER_INIT) {
611  // we are setting initial headers for an outgoing client connection
612  m_request.remove_header(key);
613  } else {
614  throw error::make_error_code(error::invalid_state);
615  }
616  }
617 }
618 
619 
620 
621 
622 
623 
624 /******** logic thread ********/
625 
626 template <typename config>
628  m_alog.write(log::alevel::devel,"connection start");
629 
630  this->atomic_state_change(
631  istate::USER_INIT,
632  istate::TRANSPORT_INIT,
633  "Start must be called from user init state"
634  );
635 
636  // Depending on how the transport implements init this function may return
637  // immediately and call handle_transport_init later or call
638  // handle_transport_init from this function.
639  transport_con_type::init(
640  lib::bind(
641  &type::handle_transport_init,
642  type::get_shared(),
643  lib::placeholders::_1
644  )
645  );
646 }
647 
648 template <typename config>
649 void connection<config>::handle_transport_init(lib::error_code const & ec) {
650  m_alog.write(log::alevel::devel,"connection handle_transport_init");
651 
652  {
653  scoped_lock_type lock(m_connection_state_lock);
654 
655  if (m_internal_state != istate::TRANSPORT_INIT) {
656  throw error::make_error_code(error::invalid_state);
657  //throw exception("handle_transport_init must be called from transport init state",
658  // error::INVALID_STATE);
659  }
660 
661  if (!ec) {
662  // unless there was a transport error, advance internal state.
663  if (m_is_server) {
664  m_internal_state = istate::READ_HTTP_REQUEST;
665  } else {
666  m_internal_state = istate::WRITE_HTTP_REQUEST;
667  }
668  }
669  }
670 
671  if (ec) {
672  std::stringstream s;
673  s << "handle_transport_init received error: "<< ec.message();
674  m_elog.write(log::elevel::fatal,s.str());
675 
676  this->terminate(ec);
677  return;
678  }
679 
680  // At this point the transport is ready to read and write bytes.
681  if (m_is_server) {
682  this->read_handshake(1);
683  } else {
684  // We are a client. Set the processor to the version specified in the
685  // config file and send a handshake request.
686  m_processor = get_processor(config::client_version);
687  this->send_http_request();
688  }
689 }
690 
691 template <typename config>
692 void connection<config>::read_handshake(size_t num_bytes) {
693  m_alog.write(log::alevel::devel,"connection read");
694 
695  if (m_open_handshake_timeout_dur > 0) {
696  m_handshake_timer = transport_con_type::set_timer(
697  m_open_handshake_timeout_dur,
698  lib::bind(
699  &type::handle_open_handshake_timeout,
700  type::get_shared(),
701  lib::placeholders::_1
702  )
703  );
704  }
705 
706  transport_con_type::async_read_at_least(
707  num_bytes,
708  m_buf,
709  config::connection_read_buffer_size,
710  lib::bind(
711  &type::handle_read_handshake,
712  type::get_shared(),
713  lib::placeholders::_1,
714  lib::placeholders::_2
715  )
716  );
717 }
718 
719 // All exit paths for this function need to call send_http_response() or submit
720 // a new read request with this function as the handler.
721 template <typename config>
722 void connection<config>::handle_read_handshake(lib::error_code const & ec,
723  size_t bytes_transferred)
724 {
725  m_alog.write(log::alevel::devel,"connection handle_read_handshake");
726 
727  this->atomic_state_check(
728  istate::READ_HTTP_REQUEST,
729  "handle_read_handshake must be called from READ_HTTP_REQUEST state"
730  );
731 
732  if (ec) {
733  if (ec == transport::error::eof) {
734  // we expect to get eof if the connection is closed already
735  if (m_state == session::state::closed) {
736  m_alog.write(log::alevel::devel,"got eof from closed con");
737  return;
738  }
739  }
740 
741  std::stringstream s;
742  s << "error in handle_read_handshake: "<< ec.message();
743  m_elog.write(log::elevel::fatal,s.str());
744  this->terminate(ec);
745  return;
746  }
747 
748  // Boundaries checking. TODO: How much of this should be done?
749  if (bytes_transferred > config::connection_read_buffer_size) {
750  m_elog.write(log::elevel::fatal,"Fatal boundaries checking error.");
751  this->terminate(make_error_code(error::general));
752  return;
753  }
754 
755  size_t bytes_processed = 0;
756  try {
757  bytes_processed = m_request.consume(m_buf,bytes_transferred);
758  } catch (http::exception &e) {
759  // All HTTP exceptions will result in this request failing and an error
760  // response being returned. No more bytes will be read in this con.
761  m_response.set_status(e.m_error_code,e.m_error_msg);
762  this->send_http_response_error();
763  return;
764  }
765 
766  // More paranoid boundaries checking.
767  // TODO: Is this overkill?
768  if (bytes_processed > config::connection_read_buffer_size) {
769  m_elog.write(log::elevel::fatal,"Fatal boundaries checking error.");
770  this->terminate(make_error_code(error::general));
771  return;
772  }
773 
774  if (m_alog.static_test(log::alevel::devel)) {
775  std::stringstream s;
776  s << "bytes_transferred: " << bytes_transferred
777  << " bytes, bytes processed: " << bytes_processed << " bytes";
778  m_alog.write(log::alevel::devel,s.str());
779  }
780 
781  if (m_request.ready()) {
782  if (!this->initialize_processor()) {
783  this->send_http_response_error();
784  return;
785  }
786 
787  if (m_processor && m_processor->get_version() == 0) {
788  // Version 00 has an extra requirement to read some bytes after the
789  // handshake
790  if (bytes_transferred-bytes_processed >= 8) {
791  m_request.replace_header(
792  "Sec-WebSocket-Key3",
793  std::string(m_buf+bytes_processed,m_buf+bytes_processed+8)
794  );
795  bytes_processed += 8;
796  } else {
797  // TODO: need more bytes
798  m_alog.write(log::alevel::devel,"short key3 read");
799  m_response.set_status(http::status_code::internal_server_error);
800  this->send_http_response_error();
801  return;
802  }
803  }
804 
805  if (m_alog.static_test(log::alevel::devel)) {
806  m_alog.write(log::alevel::devel,m_request.raw());
807  if (m_request.get_header("Sec-WebSocket-Key3") != "") {
808  m_alog.write(log::alevel::devel,
809  utility::to_hex(m_request.get_header("Sec-WebSocket-Key3")));
810  }
811  }
812 
813  // The remaining bytes in m_buf are frame data. Copy them to the
814  // beginning of the buffer and note the length. They will be read after
815  // the handshake completes and before more bytes are read.
816  std::copy(m_buf+bytes_processed,m_buf+bytes_transferred,m_buf);
817  m_buf_cursor = bytes_transferred-bytes_processed;
818 
819  this->atomic_state_change(
820  istate::READ_HTTP_REQUEST,
821  istate::PROCESS_HTTP_REQUEST,
822  "send_http_response must be called from READ_HTTP_REQUEST state"
823  );
824 
825  // We have the complete request. Process it.
826  this->process_handshake_request();
827  this->send_http_response();
828  } else {
829  // read at least 1 more byte
830  transport_con_type::async_read_at_least(
831  1,
832  m_buf,
833  config::connection_read_buffer_size,
834  lib::bind(
835  &type::handle_read_handshake,
836  type::get_shared(),
837  lib::placeholders::_1,
838  lib::placeholders::_2
839  )
840  );
841  }
842 }
843 
844 // send_http_response requires the request to be fully read and the connection
845 // to be in the PROCESS_HTTP_REQUEST state. In some cases we can detect errors
846 // before the request is fully read (specifically at a point where we aren't
847 // sure if the hybi00 key3 bytes need to be read). This method sets the correct
848 // state and calls send_http_response
849 template <typename config>
850 void connection<config>::send_http_response_error() {
851  this->atomic_state_change(
852  istate::READ_HTTP_REQUEST,
853  istate::PROCESS_HTTP_REQUEST,
854  "send_http_response must be called from READ_HTTP_REQUEST state"
855  );
856  this->send_http_response();
857 }
858 
859 // All exit paths for this function need to call send_http_response() or submit
860 // a new read request with this function as the handler.
861 template <typename config>
862 void connection<config>::handle_read_frame(lib::error_code const & ec,
863  size_t bytes_transferred)
864 {
865  //m_alog.write(log::alevel::devel,"connection handle_read_frame");
866 
867  this->atomic_state_check(
868  istate::PROCESS_CONNECTION,
869  "handle_read_frame must be called from PROCESS_CONNECTION state"
870  );
871 
872  if (ec) {
873  log::level echannel = log::elevel::fatal;
874 
875  if (ec == transport::error::eof) {
876  if (m_state == session::state::closed) {
877  // we expect to get eof if the connection is closed already
878  // just ignore it
879  m_alog.write(log::alevel::devel,"got eof from closed con");
880  return;
881  } else if (m_state == session::state::closing && !m_is_server) {
882  // If we are a client we expect to get eof in the closing state,
883  // this is a signal to terminate our end of the connection after
884  // the closing handshake
885  terminate(lib::error_code());
886  return;
887  }
888  }
890  if (m_state == session::state::closed) {
891  // We expect to get a TLS short read if we try to read after the
892  // connection is closed. If this happens ignore and exit the
893  // read frame path.
894  terminate(lib::error_code());
895  return;
896  }
897  echannel = log::elevel::rerror;
898  } else if (ec == transport::error::action_after_shutdown) {
899  echannel = log::elevel::info;
900  }
901 
902  log_err(echannel, "handle_read_frame", ec);
903  this->terminate(ec);
904  return;
905  }
906 
907  // Boundaries checking. TODO: How much of this should be done?
908  /*if (bytes_transferred > config::connection_read_buffer_size) {
909  m_elog.write(log::elevel::fatal,"Fatal boundaries checking error");
910  this->terminate(make_error_code(error::general));
911  return;
912  }*/
913 
914  size_t p = 0;
915 
916  if (m_alog.static_test(log::alevel::devel)) {
917  std::stringstream s;
918  s << "p = " << p << " bytes transferred = " << bytes_transferred;
919  m_alog.write(log::alevel::devel,s.str());
920  }
921 
922  while (p < bytes_transferred) {
923  if (m_alog.static_test(log::alevel::devel)) {
924  std::stringstream s;
925  s << "calling consume with " << bytes_transferred-p << " bytes";
926  m_alog.write(log::alevel::devel,s.str());
927  }
928 
929  lib::error_code consume_ec;
930 
931  p += m_processor->consume(
932  reinterpret_cast<uint8_t*>(m_buf)+p,
933  bytes_transferred-p,
934  consume_ec
935  );
936 
937  if (m_alog.static_test(log::alevel::devel)) {
938  std::stringstream s;
939  s << "bytes left after consume: " << bytes_transferred-p;
940  m_alog.write(log::alevel::devel,s.str());
941  }
942  if (consume_ec) {
943  log_err(log::elevel::rerror, "consume", consume_ec);
944 
945  if (config::drop_on_protocol_error) {
946  this->terminate(consume_ec);
947  return;
948  } else {
949  lib::error_code close_ec;
950  this->close(
951  processor::error::to_ws(consume_ec),
952  consume_ec.message(),
953  close_ec
954  );
955 
956  if (close_ec) {
957  log_err(log::elevel::fatal, "Protocol error close frame ", close_ec);
958  this->terminate(close_ec);
959  return;
960  }
961  }
962  return;
963  }
964 
965  if (m_processor->ready()) {
966  if (m_alog.static_test(log::alevel::devel)) {
967  std::stringstream s;
968  s << "Complete message received. Dispatching";
969  m_alog.write(log::alevel::devel,s.str());
970  }
971 
972  message_ptr msg = m_processor->get_message();
973 
974  if (!msg) {
975  m_alog.write(log::alevel::devel, "null message from m_processor");
976  } else if (!is_control(msg->get_opcode())) {
977  // data message, dispatch to user
978  if (m_state != session::state::open) {
979  m_elog.write(log::elevel::warn, "got non-close frame while closing");
980  } else if (m_message_handler) {
981  m_message_handler(m_connection_hdl, msg);
982  }
983  } else {
984  process_control_frame(msg);
985  }
986  }
987  }
988 
989  read_frame();
990 }
991 
993 template <typename config>
995  if (!m_read_flag) {
996  return;
997  }
998 
999  transport_con_type::async_read_at_least(
1000  // std::min wont work with undefined static const values.
1001  // TODO: is there a more elegant way to do this?
1002  // Need to determine if requesting 1 byte or the exact number of bytes
1003  // is better here. 1 byte lets us be a bit more responsive at a
1004  // potential expense of additional runs through handle_read_frame
1005  /*(m_processor->get_bytes_needed() > config::connection_read_buffer_size ?
1006  config::connection_read_buffer_size : m_processor->get_bytes_needed())*/
1007  1,
1008  m_buf,
1009  config::connection_read_buffer_size,
1010  m_handle_read_frame
1011  );
1012 }
1013 
1014 template <typename config>
1016  m_alog.write(log::alevel::devel,"initialize_processor");
1017 
1018  // if it isn't a websocket handshake nothing to do.
1019  if (!processor::is_websocket_handshake(m_request)) {
1020  return true;
1021  }
1022 
1023  int version = processor::get_websocket_version(m_request);
1024 
1025  if (version < 0) {
1026  m_alog.write(log::alevel::devel, "BAD REQUEST: can't determine version");
1027  m_response.set_status(http::status_code::bad_request);
1028  return false;
1029  }
1030 
1031  m_processor = get_processor(version);
1032 
1033  // if the processor is not null we are done
1034  if (m_processor) {
1035  return true;
1036  }
1037 
1038  // We don't have a processor for this version. Return bad request
1039  // with Sec-WebSocket-Version header filled with values we do accept
1040  m_alog.write(log::alevel::devel, "BAD REQUEST: no processor for version");
1041  m_response.set_status(http::status_code::bad_request);
1042 
1043  std::stringstream ss;
1044  std::string sep = "";
1045  std::vector<int>::const_iterator it;
1046  for (it = versions_supported.begin(); it != versions_supported.end(); it++)
1047  {
1048  ss << sep << *it;
1049  sep = ",";
1050  }
1051 
1052  m_response.replace_header("Sec-WebSocket-Version",ss.str());
1053  return false;
1054 }
1055 
1056 template <typename config>
1058  m_alog.write(log::alevel::devel,"process handshake request");
1059 
1060  if (!processor::is_websocket_handshake(m_request)) {
1061  // this is not a websocket handshake. Process as plain HTTP
1062  m_alog.write(log::alevel::devel,"HTTP REQUEST");
1063 
1064  // extract URI from request
1066  m_request,
1067  (transport_con_type::is_secure() ? "https" : "http")
1068  );
1069 
1070  if (!m_uri->get_valid()) {
1071  m_alog.write(log::alevel::devel, "Bad request: failed to parse uri");
1072  m_response.set_status(http::status_code::bad_request);
1073  return false;
1074  }
1075 
1076  if (m_http_handler) {
1077  m_http_handler(m_connection_hdl);
1078  } else {
1079  set_status(http::status_code::upgrade_required);
1080  }
1081 
1082  return true;
1083  }
1084 
1085  lib::error_code ec = m_processor->validate_handshake(m_request);
1086 
1087  // Validate: make sure all required elements are present.
1088  if (ec){
1089  // Not a valid handshake request
1090  m_alog.write(log::alevel::devel, "Bad request " + ec.message());
1091  m_response.set_status(http::status_code::bad_request);
1092  return false;
1093  }
1094 
1095  // Read extension parameters and set up values necessary for the end user
1096  // to complete extension negotiation.
1097  std::pair<lib::error_code,std::string> neg_results;
1098  neg_results = m_processor->negotiate_extensions(m_request);
1099 
1100  if (neg_results.first) {
1101  // There was a fatal error in extension parsing that should result in
1102  // a failed connection attempt.
1103  m_alog.write(log::alevel::devel, "Bad request: " + neg_results.first.message());
1104  m_response.set_status(http::status_code::bad_request);
1105  return false;
1106  } else {
1107  // extension negotiation succeeded, set response header accordingly
1108  // we don't send an empty extensions header because it breaks many
1109  // clients.
1110  if (neg_results.second.size() > 0) {
1111  m_response.replace_header("Sec-WebSocket-Extensions",
1112  neg_results.second);
1113  }
1114  }
1115 
1116  // extract URI from request
1117  m_uri = m_processor->get_uri(m_request);
1118 
1119 
1120  if (!m_uri->get_valid()) {
1121  m_alog.write(log::alevel::devel, "Bad request: failed to parse uri");
1122  m_response.set_status(http::status_code::bad_request);
1123  return false;
1124  }
1125 
1126  // extract subprotocols
1127  lib::error_code subp_ec = m_processor->extract_subprotocols(m_request,
1128  m_requested_subprotocols);
1129 
1130  if (subp_ec) {
1131  // should we do anything?
1132  }
1133 
1134  // Ask application to validate the connection
1135  if (!m_validate_handler || m_validate_handler(m_connection_hdl)) {
1136  m_response.set_status(http::status_code::switching_protocols);
1137 
1138  // Write the appropriate response headers based on request and
1139  // processor version
1140  ec = m_processor->process_handshake(m_request,m_subprotocol,m_response);
1141 
1142  if (ec) {
1143  std::stringstream s;
1144  s << "Processing error: " << ec << "(" << ec.message() << ")";
1145  m_alog.write(log::alevel::devel, s.str());
1146 
1147  m_response.set_status(http::status_code::internal_server_error);
1148  return false;
1149  }
1150  } else {
1151  // User application has rejected the handshake
1152  m_alog.write(log::alevel::devel, "USER REJECT");
1153 
1154  // Use Bad Request if the user handler did not provide a more
1155  // specific http response error code.
1156  // TODO: is there a better default?
1157  if (m_response.get_status_code() == http::status_code::uninitialized) {
1158  m_response.set_status(http::status_code::bad_request);
1159  }
1160 
1161  return false;
1162  }
1163 
1164  return true;
1165 }
1166 
1167 template <typename config>
1169  m_alog.write(log::alevel::devel,"connection send_http_response");
1170 
1171  if (m_response.get_status_code() == http::status_code::uninitialized) {
1172  m_response.set_status(http::status_code::internal_server_error);
1173  }
1174 
1175  m_response.set_version("HTTP/1.1");
1176 
1177  // Set server header based on the user agent settings
1178  if (m_response.get_header("Server") == "") {
1179  if (!m_user_agent.empty()) {
1180  m_response.replace_header("Server",m_user_agent);
1181  } else {
1182  m_response.remove_header("Server");
1183  }
1184  }
1185 
1186  // have the processor generate the raw bytes for the wire (if it exists)
1187  if (m_processor) {
1188  m_handshake_buffer = m_processor->get_raw(m_response);
1189  } else {
1190  // a processor wont exist for raw HTTP responses.
1191  m_handshake_buffer = m_response.raw();
1192  }
1193 
1194  if (m_alog.static_test(log::alevel::devel)) {
1195  m_alog.write(log::alevel::devel,"Raw Handshake response:\n"+m_handshake_buffer);
1196  if (m_response.get_header("Sec-WebSocket-Key3") != "") {
1197  m_alog.write(log::alevel::devel,
1198  utility::to_hex(m_response.get_header("Sec-WebSocket-Key3")));
1199  }
1200  }
1201 
1202  // write raw bytes
1203  transport_con_type::async_write(
1204  m_handshake_buffer.data(),
1205  m_handshake_buffer.size(),
1206  lib::bind(
1207  &type::handle_send_http_response,
1208  type::get_shared(),
1209  lib::placeholders::_1
1210  )
1211  );
1212 }
1213 
1214 template <typename config>
1215 void connection<config>::handle_send_http_response(lib::error_code const & ec) {
1216  m_alog.write(log::alevel::devel,"handle_send_http_response");
1217 
1218  this->atomic_state_check(
1219  istate::PROCESS_HTTP_REQUEST,
1220  "handle_send_http_response must be called from PROCESS_HTTP_REQUEST state"
1221  );
1222 
1223  if (ec) {
1224  log_err(log::elevel::rerror,"handle_send_http_response",ec);
1225  this->terminate(ec);
1226  return;
1227  }
1228 
1229  this->log_open_result();
1230 
1231  if (m_handshake_timer) {
1232  m_handshake_timer->cancel();
1233  m_handshake_timer.reset();
1234  }
1235 
1236  if (m_response.get_status_code() != http::status_code::switching_protocols)
1237  {
1238  if (m_processor) {
1239  // this was a websocket connection that ended in an error
1240  std::stringstream s;
1241  s << "Handshake ended with HTTP error: "
1242  << m_response.get_status_code();
1243  m_elog.write(log::elevel::rerror,s.str());
1244  } else {
1245  // if this was not a websocket connection, we have written
1246  // the expected response and the connection can be closed.
1247  }
1248  this->terminate(make_error_code(error::http_connection_ended));
1249  return;
1250  }
1251 
1252  this->atomic_state_change(
1253  istate::PROCESS_HTTP_REQUEST,
1254  istate::PROCESS_CONNECTION,
1255  session::state::connecting,
1256  session::state::open,
1257  "handle_send_http_response must be called from PROCESS_HTTP_REQUEST state"
1258  );
1259 
1260  if (m_open_handler) {
1261  m_open_handler(m_connection_hdl);
1262  }
1263 
1264  this->handle_read_frame(lib::error_code(), m_buf_cursor);
1265 }
1266 
1267 template <typename config>
1268 void connection<config>::send_http_request() {
1269  m_alog.write(log::alevel::devel,"connection send_http_request");
1270 
1271  // TODO: origin header?
1272 
1273  // Have the protocol processor fill in the appropriate fields based on the
1274  // selected client version
1275  if (m_processor) {
1276  lib::error_code ec;
1277  ec = m_processor->client_handshake_request(m_request,m_uri,
1278  m_requested_subprotocols);
1279 
1280  if (ec) {
1281  log_err(log::elevel::fatal,"Internal library error: Processor",ec);
1282  return;
1283  }
1284  } else {
1285  m_elog.write(log::elevel::fatal,"Internal library error: missing processor");
1286  return;
1287  }
1288 
1289  // Unless the user has overridden the user agent, send generic WS++ UA.
1290  if (m_request.get_header("User-Agent") == "") {
1291  if (!m_user_agent.empty()) {
1292  m_request.replace_header("User-Agent",m_user_agent);
1293  } else {
1294  m_request.remove_header("User-Agent");
1295  }
1296  }
1297 
1298  m_handshake_buffer = m_request.raw();
1299 
1300  if (m_alog.static_test(log::alevel::devel)) {
1301  m_alog.write(log::alevel::devel,"Raw Handshake request:\n"+m_handshake_buffer);
1302  }
1303 
1304  if (m_open_handshake_timeout_dur > 0) {
1305  m_handshake_timer = transport_con_type::set_timer(
1306  m_open_handshake_timeout_dur,
1307  lib::bind(
1308  &type::handle_open_handshake_timeout,
1309  type::get_shared(),
1310  lib::placeholders::_1
1311  )
1312  );
1313  }
1314 
1315  transport_con_type::async_write(
1316  m_handshake_buffer.data(),
1317  m_handshake_buffer.size(),
1318  lib::bind(
1319  &type::handle_send_http_request,
1320  type::get_shared(),
1321  lib::placeholders::_1
1322  )
1323  );
1324 }
1325 
1326 template <typename config>
1327 void connection<config>::handle_send_http_request(lib::error_code const & ec) {
1328  m_alog.write(log::alevel::devel,"handle_send_http_request");
1329 
1330  this->atomic_state_check(
1331  istate::WRITE_HTTP_REQUEST,
1332  "handle_send_http_request must be called from WRITE_HTTP_REQUEST state"
1333  );
1334 
1335  if (ec) {
1336  log_err(log::elevel::rerror,"handle_send_http_request",ec);
1337  this->terminate(ec);
1338  return;
1339  }
1340 
1341  this->atomic_state_change(
1342  istate::WRITE_HTTP_REQUEST,
1343  istate::READ_HTTP_RESPONSE,
1344  "handle_send_http_request must be called from WRITE_HTTP_REQUEST state"
1345  );
1346 
1347  transport_con_type::async_read_at_least(
1348  1,
1349  m_buf,
1350  config::connection_read_buffer_size,
1351  lib::bind(
1352  &type::handle_read_http_response,
1353  type::get_shared(),
1354  lib::placeholders::_1,
1355  lib::placeholders::_2
1356  )
1357  );
1358 }
1359 
1360 template <typename config>
1361 void connection<config>::handle_read_http_response(lib::error_code const & ec,
1362  size_t bytes_transferred)
1363 {
1364  m_alog.write(log::alevel::devel,"handle_read_http_response");
1365 
1366  this->atomic_state_check(
1367  istate::READ_HTTP_RESPONSE,
1368  "handle_read_http_response must be called from READ_HTTP_RESPONSE state"
1369  );
1370 
1371  if (ec) {
1372  log_err(log::elevel::rerror,"handle_send_http_response",ec);
1373  this->terminate(ec);
1374  return;
1375  }
1376  size_t bytes_processed = 0;
1377  // TODO: refactor this to use error codes rather than exceptions
1378  try {
1379  bytes_processed = m_response.consume(m_buf,bytes_transferred);
1380  } catch (http::exception & e) {
1381  m_elog.write(log::elevel::rerror,
1382  std::string("error in handle_read_http_response: ")+e.what());
1383  this->terminate(make_error_code(error::general));
1384  return;
1385  }
1386 
1387  m_alog.write(log::alevel::devel,std::string("Raw response: ")+m_response.raw());
1388 
1389  if (m_response.headers_ready()) {
1390  if (m_handshake_timer) {
1391  m_handshake_timer->cancel();
1392  m_handshake_timer.reset();
1393  }
1394 
1395  lib::error_code validate_ec = m_processor->validate_server_handshake_response(
1396  m_request,
1397  m_response
1398  );
1399  if (validate_ec) {
1400  log_err(log::elevel::rerror,"Server handshake response",validate_ec);
1401  this->terminate(validate_ec);
1402  return;
1403  }
1404 
1405  // response is valid, connection can now be assumed to be open
1406  this->atomic_state_change(
1407  istate::READ_HTTP_RESPONSE,
1408  istate::PROCESS_CONNECTION,
1409  session::state::connecting,
1410  session::state::open,
1411  "handle_read_http_response must be called from READ_HTTP_RESPONSE state"
1412  );
1413 
1414  this->log_open_result();
1415 
1416  if (m_open_handler) {
1417  m_open_handler(m_connection_hdl);
1418  }
1419 
1420  // The remaining bytes in m_buf are frame data. Copy them to the
1421  // beginning of the buffer and note the length. They will be read after
1422  // the handshake completes and before more bytes are read.
1423  std::copy(m_buf+bytes_processed,m_buf+bytes_transferred,m_buf);
1424  m_buf_cursor = bytes_transferred-bytes_processed;
1425 
1426  this->handle_read_frame(lib::error_code(), m_buf_cursor);
1427  } else {
1428  transport_con_type::async_read_at_least(
1429  1,
1430  m_buf,
1431  config::connection_read_buffer_size,
1432  lib::bind(
1433  &type::handle_read_http_response,
1434  type::get_shared(),
1435  lib::placeholders::_1,
1436  lib::placeholders::_2
1437  )
1438  );
1439  }
1440 }
1441 
1442 template <typename config>
1443 void connection<config>::handle_open_handshake_timeout(
1444  lib::error_code const & ec)
1445 {
1447  m_alog.write(log::alevel::devel,"open handshake timer cancelled");
1448  } else if (ec) {
1449  m_alog.write(log::alevel::devel,
1450  "open handle_open_handshake_timeout error: "+ec.message());
1451  // TODO: ignore or fail here?
1452  } else {
1453  m_alog.write(log::alevel::devel,"open handshake timer expired");
1454  terminate(make_error_code(error::open_handshake_timeout));
1455  }
1456 }
1457 
1458 template <typename config>
1459 void connection<config>::handle_close_handshake_timeout(
1460  lib::error_code const & ec)
1461 {
1463  m_alog.write(log::alevel::devel,"asio close handshake timer cancelled");
1464  } else if (ec) {
1465  m_alog.write(log::alevel::devel,
1466  "asio open handle_close_handshake_timeout error: "+ec.message());
1467  // TODO: ignore or fail here?
1468  } else {
1469  m_alog.write(log::alevel::devel, "asio close handshake timer expired");
1470  terminate(make_error_code(error::close_handshake_timeout));
1471  }
1472 }
1473 
1474 template <typename config>
1475 void connection<config>::terminate(lib::error_code const & ec) {
1476  if (m_alog.static_test(log::alevel::devel)) {
1477  m_alog.write(log::alevel::devel,"connection terminate");
1478  }
1479 
1480  // Cancel close handshake timer
1481  if (m_handshake_timer) {
1482  m_handshake_timer->cancel();
1483  m_handshake_timer.reset();
1484  }
1485 
1486  terminate_status tstat = unknown;
1487  if (ec) {
1488  m_ec = ec;
1489  m_local_close_code = close::status::abnormal_close;
1490  m_local_close_reason = ec.message();
1491  }
1492 
1493  if (m_state == session::state::connecting) {
1494  m_state = session::state::closed;
1495  tstat = failed;
1496  } else if (m_state != session::state::closed) {
1497  m_state = session::state::closed;
1498  tstat = closed;
1499  } else {
1500  m_alog.write(log::alevel::devel,
1501  "terminate called on connection that was already terminated");
1502  return;
1503  }
1504 
1505  // TODO: choose between shutdown and close based on error code sent
1506 
1507  transport_con_type::async_shutdown(
1508  lib::bind(
1509  &type::handle_terminate,
1510  type::get_shared(),
1511  tstat,
1512  lib::placeholders::_1
1513  )
1514  );
1515 }
1516 
1517 template <typename config>
1518 void connection<config>::handle_terminate(terminate_status tstat,
1519  lib::error_code const & ec)
1520 {
1521  if (m_alog.static_test(log::alevel::devel)) {
1522  m_alog.write(log::alevel::devel,"connection handle_terminate");
1523  }
1524 
1525  if (ec) {
1526  // there was an error actually shutting down the connection
1527  log_err(log::elevel::devel,"handle_terminate",ec);
1528  }
1529 
1530  // clean shutdown
1531  if (tstat == failed) {
1532  if (m_fail_handler) {
1533  m_fail_handler(m_connection_hdl);
1534  }
1535  log_fail_result();
1536  } else if (tstat == closed) {
1537  if (m_close_handler) {
1538  m_close_handler(m_connection_hdl);
1539  }
1540  log_close_result();
1541  } else {
1542  m_elog.write(log::elevel::rerror,"Unknown terminate_status");
1543  }
1544 
1545  // call the termination handler if it exists
1546  // if it exists it might (but shouldn't) refer to a bad memory location.
1547  // If it does, we don't care and should catch and ignore it.
1548  if (m_termination_handler) {
1549  try {
1550  m_termination_handler(type::get_shared());
1551  } catch (std::exception const & e) {
1552  m_elog.write(log::elevel::warn,
1553  std::string("termination_handler call failed. Reason was: ")+e.what());
1554  }
1555  }
1556 }
1557 
1558 template <typename config>
1560  //m_alog.write(log::alevel::devel,"connection write_frame");
1561 
1562  {
1563  scoped_lock_type lock(m_write_lock);
1564 
1565  // Check the write flag. If true, there is an outstanding transport
1566  // write already. In this case we just return. The write handler will
1567  // start a new write if the write queue isn't empty. If false, we set
1568  // the write flag and proceed to initiate a transport write.
1569  if (m_write_flag) {
1570  return;
1571  }
1572 
1573  // pull off all the messages that are ready to write.
1574  // stop if we get a message marked terminal
1575  message_ptr next_message = write_pop();
1576  while (next_message) {
1577  m_current_msgs.push_back(next_message);
1578  if (!next_message->get_terminal()) {
1579  next_message = write_pop();
1580  } else {
1581  next_message = message_ptr();
1582  }
1583  }
1584 
1585  if (m_current_msgs.empty()) {
1586  // there was nothing to send
1587  return;
1588  } else {
1589  // At this point we own the next messages to be sent and are
1590  // responsible for holding the write flag until they are
1591  // successfully sent or there is some error
1592  m_write_flag = true;
1593  }
1594  }
1595 
1596  typename std::vector<message_ptr>::iterator it;
1597  for (it = m_current_msgs.begin(); it != m_current_msgs.end(); ++it) {
1598  std::string const & header = (*it)->get_header();
1599  std::string const & payload = (*it)->get_payload();
1600 
1601  m_send_buffer.push_back(transport::buffer(header.c_str(),header.size()));
1602  m_send_buffer.push_back(transport::buffer(payload.c_str(),payload.size()));
1603  }
1604 
1605  // Print detailed send stats if those log levels are enabled
1606  if (m_alog.static_test(log::alevel::frame_header)) {
1607  if (m_alog.dynamic_test(log::alevel::frame_header)) {
1608  std::stringstream general,header,payload;
1609 
1610  general << "Dispatching write containing " << m_current_msgs.size()
1611  <<" message(s) containing ";
1612  header << "Header Bytes: \n";
1613  payload << "Payload Bytes: \n";
1614 
1615  size_t hbytes = 0;
1616  size_t pbytes = 0;
1617 
1618  for (size_t i = 0; i < m_current_msgs.size(); i++) {
1619  hbytes += m_current_msgs[i]->get_header().size();
1620  pbytes += m_current_msgs[i]->get_payload().size();
1621 
1622 
1623  header << "[" << i << "] ("
1624  << m_current_msgs[i]->get_header().size() << ") "
1625  << utility::to_hex(m_current_msgs[i]->get_header()) << "\n";
1626 
1627  if (m_alog.static_test(log::alevel::frame_payload)) {
1628  if (m_alog.dynamic_test(log::alevel::frame_payload)) {
1629  payload << "[" << i << "] ("
1630  << m_current_msgs[i]->get_payload().size() << ") "
1631  << utility::to_hex(m_current_msgs[i]->get_payload())
1632  << "\n";
1633  }
1634  }
1635  }
1636 
1637  general << hbytes << " header bytes and " << pbytes << " payload bytes";
1638 
1639  m_alog.write(log::alevel::frame_header,general.str());
1640  m_alog.write(log::alevel::frame_header,header.str());
1641  m_alog.write(log::alevel::frame_payload,payload.str());
1642  }
1643  }
1644 
1645  transport_con_type::async_write(
1646  m_send_buffer,
1647  m_write_frame_handler
1648  );
1649 }
1650 
1651 template <typename config>
1652 void connection<config>::handle_write_frame(lib::error_code const & ec)
1653 {
1654  if (m_alog.static_test(log::alevel::devel)) {
1655  m_alog.write(log::alevel::devel,"connection handle_write_frame");
1656  }
1657 
1658  bool terminal = m_current_msgs.back()->get_terminal();
1659 
1660  m_send_buffer.clear();
1661  m_current_msgs.clear();
1662  // TODO: recycle instead of deleting
1663 
1664  if (ec) {
1665  log_err(log::elevel::fatal,"handle_write_frame",ec);
1666  this->terminate(ec);
1667  return;
1668  }
1669 
1670  if (terminal) {
1671  this->terminate(lib::error_code());
1672  return;
1673  }
1674 
1675  bool needs_writing = false;
1676  {
1677  scoped_lock_type lock(m_write_lock);
1678 
1679  // release write flag
1680  m_write_flag = false;
1681 
1682  needs_writing = !m_send_queue.empty();
1683  }
1684 
1685  if (needs_writing) {
1686  transport_con_type::dispatch(lib::bind(
1687  &type::write_frame,
1688  type::get_shared()
1689  ));
1690  }
1691 }
1692 
1693 template <typename config>
1694 void connection<config>::atomic_state_change(istate_type req, istate_type dest,
1695  std::string msg)
1696 {
1697  scoped_lock_type lock(m_connection_state_lock);
1698 
1699  if (m_internal_state != req) {
1700  throw error::make_error_code(error::invalid_state);
1701  //throw exception(msg,error::INVALID_STATE);
1702  }
1703 
1704  m_internal_state = dest;
1705 }
1706 
1707 template <typename config>
1708 void connection<config>::atomic_state_change(istate_type internal_req,
1709  istate_type internal_dest, session::state::value external_req,
1710  session::state::value external_dest, std::string msg)
1711 {
1712  scoped_lock_type lock(m_connection_state_lock);
1713 
1714  if (m_internal_state != internal_req || m_state != external_req) {
1715  throw error::make_error_code(error::invalid_state);
1716  //throw exception(msg,error::INVALID_STATE);
1717  }
1718 
1719  m_internal_state = internal_dest;
1720  m_state = external_dest;
1721 }
1722 
1723 template <typename config>
1724 void connection<config>::atomic_state_check(istate_type req, std::string msg)
1725 {
1726  scoped_lock_type lock(m_connection_state_lock);
1727 
1728  if (m_internal_state != req) {
1729  throw error::make_error_code(error::invalid_state);
1730  //throw exception(msg,error::INVALID_STATE);
1731  }
1732 }
1733 
1734 template <typename config>
1735 const std::vector<int>& connection<config>::get_supported_versions() const
1736 {
1737  return versions_supported;
1738 }
1739 
1740 template <typename config>
1741 void connection<config>::process_control_frame(typename config::message_type::ptr msg)
1742 {
1743  m_alog.write(log::alevel::devel,"process_control_frame");
1744 
1745  frame::opcode::value op = msg->get_opcode();
1746  lib::error_code ec;
1747 
1748  std::stringstream s;
1749  s << "Control frame received with opcode " << op;
1750  m_alog.write(log::alevel::control,s.str());
1751 
1752  if (m_state == session::state::closed) {
1753  m_elog.write(log::elevel::warn,"got frame in state closed");
1754  return;
1755  }
1756  if (op != frame::opcode::CLOSE && m_state != session::state::open) {
1757  m_elog.write(log::elevel::warn,"got non-close frame in state closing");
1758  return;
1759  }
1760 
1761  if (op == frame::opcode::PING) {
1762  bool should_reply = true;
1763 
1764  if (m_ping_handler) {
1765  should_reply = m_ping_handler(m_connection_hdl, msg->get_payload());
1766  }
1767 
1768  if (should_reply) {
1769  this->pong(msg->get_payload(),ec);
1770  if (ec) {
1771  log_err(log::elevel::devel,"Failed to send response pong",ec);
1772  }
1773  }
1774  } else if (op == frame::opcode::PONG) {
1775  if (m_pong_handler) {
1776  m_pong_handler(m_connection_hdl, msg->get_payload());
1777  }
1778  if (m_ping_timer) {
1779  m_ping_timer->cancel();
1780  }
1781  } else if (op == frame::opcode::CLOSE) {
1782  m_alog.write(log::alevel::devel,"got close frame");
1783  // record close code and reason somewhere
1784 
1785  m_remote_close_code = close::extract_code(msg->get_payload(),ec);
1786  if (ec) {
1787  s.str("");
1788  if (config::drop_on_protocol_error) {
1789  s << "Received invalid close code " << m_remote_close_code
1790  << " dropping connection per config.";
1791  m_elog.write(log::elevel::devel,s.str());
1792  this->terminate(ec);
1793  } else {
1794  s << "Received invalid close code " << m_remote_close_code
1795  << " sending acknowledgement and closing";
1796  m_elog.write(log::elevel::devel,s.str());
1797  ec = send_close_ack(close::status::protocol_error,
1798  "Invalid close code");
1799  if (ec) {
1800  log_err(log::elevel::devel,"send_close_ack",ec);
1801  }
1802  }
1803  return;
1804  }
1805 
1806  m_remote_close_reason = close::extract_reason(msg->get_payload(),ec);
1807  if (ec) {
1808  if (config::drop_on_protocol_error) {
1809  m_elog.write(log::elevel::devel,
1810  "Received invalid close reason. Dropping connection per config");
1811  this->terminate(ec);
1812  } else {
1813  m_elog.write(log::elevel::devel,
1814  "Received invalid close reason. Sending acknowledgement and closing");
1815  ec = send_close_ack(close::status::protocol_error,
1816  "Invalid close reason");
1817  if (ec) {
1818  log_err(log::elevel::devel,"send_close_ack",ec);
1819  }
1820  }
1821  return;
1822  }
1823 
1824  if (m_state == session::state::open) {
1825  s.str("");
1826  s << "Received close frame with code " << m_remote_close_code
1827  << " and reason " << m_remote_close_reason;
1828  m_alog.write(log::alevel::devel,s.str());
1829 
1830  ec = send_close_ack();
1831  if (ec) {
1832  log_err(log::elevel::devel,"send_close_ack",ec);
1833  }
1834  } else if (m_state == session::state::closing && !m_was_clean) {
1835  // ack of our close
1836  m_alog.write(log::alevel::devel, "Got acknowledgement of close");
1837 
1838  m_was_clean = true;
1839 
1840  // If we are a server terminate the connection now. Clients should
1841  // leave the connection open to give the server an opportunity to
1842  // initiate the TCP close. The client's timer will handle closing
1843  // its side of the connection if the server misbehaves.
1844  //
1845  // TODO: different behavior if the underlying transport doesn't
1846  // support timers?
1847  if (m_is_server) {
1848  terminate(lib::error_code());
1849  }
1850  } else {
1851  // spurious, ignore
1852  m_elog.write(log::elevel::devel, "Got close frame in wrong state");
1853  }
1854  } else {
1855  // got an invalid control opcode
1856  m_elog.write(log::elevel::devel, "Got control frame with invalid opcode");
1857  // initiate protocol error shutdown
1858  }
1859 }
1860 
1861 template <typename config>
1862 lib::error_code connection<config>::send_close_ack(close::status::value code,
1863  std::string const & reason)
1864 {
1865  return send_close_frame(code,reason,true,m_is_server);
1866 }
1867 
1868 template <typename config>
1869 lib::error_code connection<config>::send_close_frame(close::status::value code,
1870  std::string const & reason, bool ack, bool terminal)
1871 {
1872  m_alog.write(log::alevel::devel,"send_close_frame");
1873 
1874  // check for special codes
1875 
1876  // If silent close is set, respect it and blank out close information
1877  // Otherwise use whatever has been specified in the parameters. If
1878  // parameters specifies close::status::blank then determine what to do
1879  // based on whether or not this is an ack. If it is not an ack just
1880  // send blank info. If it is an ack then echo the close information from
1881  // the remote endpoint.
1882  if (config::silent_close) {
1883  m_alog.write(log::alevel::devel,"closing silently");
1884  m_local_close_code = close::status::no_status;
1885  m_local_close_reason = "";
1886  } else if (code != close::status::blank) {
1887  m_alog.write(log::alevel::devel,"closing with specified codes");
1888  m_local_close_code = code;
1889  m_local_close_reason = reason;
1890  } else if (!ack) {
1891  m_alog.write(log::alevel::devel,"closing with no status code");
1892  m_local_close_code = close::status::no_status;
1893  m_local_close_reason = "";
1894  } else if (m_remote_close_code == close::status::no_status) {
1895  m_alog.write(log::alevel::devel,
1896  "acknowledging a no-status close with normal code");
1897  m_local_close_code = close::status::normal;
1898  m_local_close_reason = "";
1899  } else {
1900  m_alog.write(log::alevel::devel,"acknowledging with remote codes");
1901  m_local_close_code = m_remote_close_code;
1902  m_local_close_reason = m_remote_close_reason;
1903  }
1904 
1905  std::stringstream s;
1906  s << "Closing with code: " << m_local_close_code << ", and reason: "
1907  << m_local_close_reason;
1908  m_alog.write(log::alevel::devel,s.str());
1909 
1910  message_ptr msg = m_msg_manager->get_message();
1911  if (!msg) {
1912  return error::make_error_code(error::no_outgoing_buffers);
1913  }
1914 
1915  lib::error_code ec = m_processor->prepare_close(m_local_close_code,
1916  m_local_close_reason,msg);
1917  if (ec) {
1918  return ec;
1919  }
1920 
1921  // Messages flagged terminal will result in the TCP connection being dropped
1922  // after the message has been written. This is typically used when servers
1923  // send an ack and when any endpoint encounters a protocol error
1924  if (terminal) {
1925  msg->set_terminal(true);
1926  }
1927 
1928  m_state = session::state::closing;
1929 
1930  if (ack) {
1931  m_was_clean = true;
1932  }
1933 
1934  // Start a timer so we don't wait forever for the acknowledgement close
1935  // frame
1936  if (m_close_handshake_timeout_dur > 0) {
1937  m_handshake_timer = transport_con_type::set_timer(
1938  m_close_handshake_timeout_dur,
1939  lib::bind(
1940  &type::handle_close_handshake_timeout,
1941  type::get_shared(),
1942  lib::placeholders::_1
1943  )
1944  );
1945  }
1946 
1947  bool needs_writing = false;
1948  {
1949  scoped_lock_type lock(m_write_lock);
1950  write_push(msg);
1951  needs_writing = !m_write_flag && !m_send_queue.empty();
1952  }
1953 
1954  if (needs_writing) {
1955  transport_con_type::dispatch(lib::bind(
1956  &type::write_frame,
1957  type::get_shared()
1958  ));
1959  }
1960 
1961  return lib::error_code();
1962 }
1963 
1964 template <typename config>
1965 typename connection<config>::processor_ptr
1966 connection<config>::get_processor(int version) const {
1967  // TODO: allow disabling certain versions
1968 
1969  processor_ptr p;
1970 
1971  switch (version) {
1972  case 0:
1973  p.reset(new processor::hybi00<config>(
1974  transport_con_type::is_secure(),
1975  m_is_server,
1976  m_msg_manager
1977  ));
1978  break;
1979  case 7:
1980  p.reset(new processor::hybi07<config>(
1981  transport_con_type::is_secure(),
1982  m_is_server,
1983  m_msg_manager,
1984  m_rng
1985  ));
1986  break;
1987  case 8:
1988  p.reset(new processor::hybi08<config>(
1989  transport_con_type::is_secure(),
1990  m_is_server,
1991  m_msg_manager,
1992  m_rng
1993  ));
1994  break;
1995  case 13:
1996  p.reset(new processor::hybi13<config>(
1997  transport_con_type::is_secure(),
1998  m_is_server,
1999  m_msg_manager,
2000  m_rng
2001  ));
2002  break;
2003  default:
2004  return p;
2005  }
2006 
2007  // Settings not configured by the constructor
2008  p->set_max_message_size(m_max_message_size);
2009 
2010  return p;
2011 }
2012 
2013 template <typename config>
2014 void connection<config>::write_push(typename config::message_type::ptr msg)
2015 {
2016  if (!msg) {
2017  return;
2018  }
2019 
2020  m_send_buffer_size += msg->get_payload().size();
2021  m_send_queue.push(msg);
2022 
2023  if (m_alog.static_test(log::alevel::devel)) {
2024  std::stringstream s;
2025  s << "write_push: message count: " << m_send_queue.size()
2026  << " buffer size: " << m_send_buffer_size;
2027  m_alog.write(log::alevel::devel,s.str());
2028  }
2029 }
2030 
2031 template <typename config>
2032 typename config::message_type::ptr connection<config>::write_pop()
2033 {
2034  message_ptr msg;
2035 
2036  if (m_send_queue.empty()) {
2037  return msg;
2038  }
2039 
2040  msg = m_send_queue.front();
2041 
2042  m_send_buffer_size -= msg->get_payload().size();
2043  m_send_queue.pop();
2044 
2045  if (m_alog.static_test(log::alevel::devel)) {
2046  std::stringstream s;
2047  s << "write_pop: message count: " << m_send_queue.size()
2048  << " buffer size: " << m_send_buffer_size;
2049  m_alog.write(log::alevel::devel,s.str());
2050  }
2051  return msg;
2052 }
2053 
2054 template <typename config>
2055 void connection<config>::log_open_result()
2056 {
2057  std::stringstream s;
2058 
2059  int version;
2060  if (!processor::is_websocket_handshake(m_request)) {
2061  version = -1;
2062  } else {
2063  version = processor::get_websocket_version(m_request);
2064  }
2065 
2066  // Connection Type
2067  s << (version == -1 ? "HTTP" : "WebSocket") << " Connection ";
2068 
2069  // Remote endpoint address
2070  s << transport_con_type::get_remote_endpoint() << " ";
2071 
2072  // Version string if WebSocket
2073  if (version != -1) {
2074  s << "v" << version << " ";
2075  }
2076 
2077  // User Agent
2078  std::string ua = m_request.get_header("User-Agent");
2079  if (ua == "") {
2080  s << "\"\" ";
2081  } else {
2082  // check if there are any quotes in the user agent
2083  s << "\"" << utility::string_replace_all(ua,"\"","\\\"") << "\" ";
2084  }
2085 
2086  // URI
2087  s << (m_uri ? m_uri->get_resource() : "NULL") << " ";
2088 
2089  // Status code
2090  s << m_response.get_status_code();
2091 
2092  m_alog.write(log::alevel::connect,s.str());
2093 }
2094 
2095 template <typename config>
2096 void connection<config>::log_close_result()
2097 {
2098  std::stringstream s;
2099 
2100  s << "Disconnect "
2101  << "close local:[" << m_local_close_code
2102  << (m_local_close_reason == "" ? "" : ","+m_local_close_reason)
2103  << "] remote:[" << m_remote_close_code
2104  << (m_remote_close_reason == "" ? "" : ","+m_remote_close_reason) << "]";
2105 
2106  m_alog.write(log::alevel::disconnect,s.str());
2107 }
2108 
2109 template <typename config>
2110 void connection<config>::log_fail_result()
2111 {
2112  // TODO: include more information about the connection?
2113  // should this be filed under connect rather than disconnect?
2114  m_alog.write(log::alevel::disconnect,"Failed: "+m_ec.message());
2115 }
2116 
2117 } // namespace websocketpp
2118 
2119 #endif // WEBSOCKETPP_CONNECTION_IMPL_HPP
bool is_control(value v)
Check if an opcode is for a control frame.
Definition: frame.hpp:138
void add_subprotocol(std::string const &request, lib::error_code &ec)
Adds the given subprotocol string to the request list (exception free)
static level const fatal
Definition: levels.hpp:59
void set_termination_handler(termination_handler new_handler)
void read_frame()
Issue a new transport read unless reading is paused.
std::string const & get_subprotocol() const
Gets the negotated subprotocol.
uint16_t value
The type of a close code value.
Definition: close.hpp:49
std::string const & get_origin() const
Return the same origin policy origin value from the opening request.
static level const control
One line per control frame.
Definition: levels.hpp:106
bool terminal(value code)
Determine if the code represents an unrecoverable error.
Definition: close.hpp:197
uri_ptr get_uri_from_host(request_type &request, std::string scheme)
Extract a URI ptr from the host header of the request.
Definition: processor.hpp:130
lib::error_code pause_reading()
Pause reading of new data.
bool is_websocket_handshake(request_type &r)
Determine whether or not a generic HTTP request is a WebSocket handshake.
Definition: processor.hpp:66
Attempted to use a client specific feature on a server endpoint.
Definition: error.hpp:103
session::state::value get_state() const
Return the connection state.
static std::vector< int > const versions_supported(helper, helper+4)
Container that stores the list of protocol versions supported.
Selected subprotocol was not requested by the client.
Definition: error.hpp:100
int get_websocket_version(request_type &r)
Extract the version from a WebSocket handshake request.
Definition: processor.hpp:105
uri_ptr get_uri() const
Gets the connection URI.
void replace_header(std::string const &key, std::string const &val)
Replace a header.
void ping(std::string const &payload)
Send a ping.
Represents an individual WebSocket connection.
Definition: connection.hpp:221
size_t get_buffered_amount() const
Get the size of the outgoing write buffer (in payload bytes)
std::string const & get_request_header(std::string const &key)
Retrieve a request header.
static level const frame_payload
One line per frame, includes the full message payload (warning: chatty)
Definition: levels.hpp:110
static value const protocol_error
A protocol error occurred.
Definition: close.hpp:83
static value const normal
Definition: close.hpp:76
static level const devel
Low level debugging information (warning: very chatty)
Definition: levels.hpp:44
std::string string_replace_all(std::string subject, const std::string &search, const std::string &replace)
Replace all occurrances of a substring with another.
status::value extract_code(std::string const &payload, lib::error_code &ec)
Extract a close code value from a close payload.
Definition: close.hpp:264
void select_subprotocol(std::string const &value, lib::error_code &ec)
Select a subprotocol to use (exception free)
std::string to_hex(const std::string &input)
Convert std::string to ascii printed string of hex digits.
void pong(std::string const &payload)
Send a pong.
bool is_not_token_char(unsigned char c)
Is the character a non-token.
Definition: constants.hpp:98
void atomic_state_check(istate_type req, std::string msg)
Atomically read and compared the internal state.
void handle_resume_reading()
Resume reading callback.
static level const frame_header
One line per frame, includes the full frame header.
Definition: levels.hpp:108
std::string const & get_resource() const
Returns the resource component of the connection URI.
static level const devel
Development messages (warning: very chatty)
Definition: levels.hpp:122
static level const disconnect
One line for each closed connection. Includes closing codes and reasons.
Definition: levels.hpp:104
const std::vector< int > & get_supported_versions() const
Get array of WebSocket protocol versions that this connection supports.
std::string const & get_host() const
Returns the host component of the connection URI.
lib::error_code resume_reading()
Resume reading of new data.
void close(close::status::value const code, std::string const &reason)
Close the connection.
void atomic_state_change(istate_type req, istate_type dest, std::string msg)
Atomically change the internal connection state.
void set_status(http::status_code::value code)
Set response status code and message.
void handle_pong_timeout(std::string payload, lib::error_code const &ec)
Utility method that gets called back when the ping timer expires.
close::status::value to_ws(lib::error_code ec)
Converts a processor error_code into a websocket close code.
Definition: base.hpp:256
void append_header(std::string const &key, std::string const &val)
Append a header.
lib::error_code send(std::string const &payload, frame::opcode::value op=frame::opcode::text)
Create a message and then add it to the outgoing send queue.
The connection was in the wrong state for this operation.
Definition: error.hpp:72
static level const info
Definition: levels.hpp:50
void write_frame()
Checks if there are frames in the send queue and if there are sends one.
void handle_write_frame(lib::error_code const &ec)
Process the results of a frame write operation and start the next write.
Namespace for the WebSocket++ project.
Definition: base64.hpp:41
uint16_t get_port() const
Returns the port component of the connection URI.
std::string const & get_response_header(std::string const &key)
Retrieve a response header.
A simple utility buffer class.
Definition: connection.hpp:137
The endpoint is out of outgoing message buffers.
Definition: error.hpp:66
lib::shared_ptr< uri > uri_ptr
Pointer to a URI.
Definition: uri.hpp:350
void handle_interrupt()
Transport inturrupt callback.
static value const no_status
A dummy value to indicate that no status code was received.
Definition: close.hpp:97
void handle_pause_reading()
Pause reading callback.
WebSocket close handshake timed out.
Definition: error.hpp:115
Catch-all library error.
Definition: error.hpp:45
std::vector< std::string > const & get_requested_subprotocols() const
Gets all of the subprotocols requested by the client.
bool get_secure() const
Returns the secure flag from the connection URI.
static value const abnormal_close
A dummy value to indicate that the connection was closed abnormally.
Definition: close.hpp:104
static uint8_t const close_reason_size
Maximum size of close frame reason.
Definition: frame.hpp:168
void set_uri(uri_ptr uri)
Sets the connection URI.
WebSocket opening handshake timed out.
Definition: error.hpp:112
Attempted to use a server specific feature on a client endpoint.
Definition: error.hpp:106
lib::error_code interrupt()
Asyncronously invoke handler::on_inturrupt.
std::string extract_reason(std::string const &payload, lib::error_code &ec)
Extract the reason string from a close payload.
Definition: close.hpp:303
static level const rerror
Definition: levels.hpp:56
static level const warn
Definition: levels.hpp:53
static value const blank
A blank value for internal use.
Definition: close.hpp:52
static level const connect
Information about new connections.
Definition: levels.hpp:102
void remove_header(std::string const &key)
Remove a header.
void set_body(std::string const &value)
Set response body content.