websocketpp  0.4.0
C++/Boost Asio based websocket client/server library
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Pages
connection_impl.hpp
1 /*
2  * Copyright (c) 2014, Peter Thorson. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are met:
6  * * Redistributions of source code must retain the above copyright
7  * notice, this list of conditions and the following disclaimer.
8  * * Redistributions in binary form must reproduce the above copyright
9  * notice, this list of conditions and the following disclaimer in the
10  * documentation and/or other materials provided with the distribution.
11  * * Neither the name of the WebSocket++ Project nor the
12  * names of its contributors may be used to endorse or promote products
13  * derived from this software without specific prior written permission.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED. IN NO EVENT SHALL PETER THORSON BE LIABLE FOR ANY
19  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
20  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
21  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
22  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
24  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25  *
26  */
27 
28 #ifndef WEBSOCKETPP_CONNECTION_IMPL_HPP
29 #define WEBSOCKETPP_CONNECTION_IMPL_HPP
30 
31 #include <websocketpp/common/platforms.hpp>
32 #include <websocketpp/common/system_error.hpp>
33 
34 #include <websocketpp/processors/processor.hpp>
35 
36 #include <websocketpp/processors/hybi00.hpp>
37 #include <websocketpp/processors/hybi07.hpp>
38 #include <websocketpp/processors/hybi08.hpp>
39 #include <websocketpp/processors/hybi13.hpp>
40 
41 namespace websocketpp {
42 
43 namespace istate = session::internal_state;
44 
45 template <typename config>
47  termination_handler new_handler)
48 {
49  m_alog.write(log::alevel::devel,
50  "connection set_termination_handler");
51 
52  //scoped_lock_type lock(m_connection_state_lock);
53 
54  m_termination_handler = new_handler;
55 }
56 
57 template <typename config>
58 const std::string& connection<config>::get_origin() const {
59  //scoped_lock_type lock(m_connection_state_lock);
60  return m_processor->get_origin(m_request);
61 }
62 
63 template <typename config>
65  //scoped_lock_type lock(m_connection_state_lock);
66  return m_send_buffer_size;
67 }
68 
69 template <typename config>
70 session::state::value connection<config>::get_state() const {
71  //scoped_lock_type lock(m_connection_state_lock);
72  return m_state;
73 }
74 
75 template <typename config>
76 lib::error_code connection<config>::send(const std::string& payload,
77  frame::opcode::value op)
78 {
79  message_ptr msg = m_msg_manager->get_message(op,payload.size());
80  msg->append_payload(payload);
81 
82  return send(msg);
83 }
84 
85 template <typename config>
86 lib::error_code connection<config>::send(const void* payload, size_t len,
87  frame::opcode::value op)
88 {
89  message_ptr msg = m_msg_manager->get_message(op,len);
90  msg->append_payload(payload,len);
91 
92  return send(msg);
93 }
94 
95 template <typename config>
96 lib::error_code connection<config>::send(typename config::message_type::ptr msg)
97 {
98  if (m_alog.static_test(log::alevel::devel)) {
99  m_alog.write(log::alevel::devel,"connection send");
100  }
101  // TODO:
102 
103  if (m_state != session::state::open) {
104  return error::make_error_code(error::invalid_state);
105  }
106 
107  message_ptr outgoing_msg;
108  bool needs_writing = false;
109 
110  if (msg->get_prepared()) {
111  outgoing_msg = msg;
112 
113  scoped_lock_type lock(m_write_lock);
114  write_push(outgoing_msg);
115  needs_writing = !m_write_flag && !m_send_queue.empty();
116  } else {
117  outgoing_msg = m_msg_manager->get_message();
118 
119  if (!outgoing_msg) {
120  return error::make_error_code(error::no_outgoing_buffers);
121  }
122 
123  scoped_lock_type lock(m_write_lock);
124  lib::error_code ec = m_processor->prepare_data_frame(msg,outgoing_msg);
125 
126  if (ec) {
127  return ec;
128  }
129 
130  write_push(outgoing_msg);
131  needs_writing = !m_write_flag && !m_send_queue.empty();
132  }
133 
134  if (needs_writing) {
135  transport_con_type::dispatch(lib::bind(
136  &type::write_frame,
137  type::get_shared()
138  ));
139  }
140 
141  return lib::error_code();
142 }
143 
144 template <typename config>
145 void connection<config>::ping(const std::string& payload, lib::error_code& ec) {
146  if (m_alog.static_test(log::alevel::devel)) {
147  m_alog.write(log::alevel::devel,"connection ping");
148  }
149 
150  if (m_state != session::state::open) {
151  ec = error::make_error_code(error::invalid_state);
152  return;
153  }
154 
155  message_ptr msg = m_msg_manager->get_message();
156  if (!msg) {
157  ec = error::make_error_code(error::no_outgoing_buffers);
158  return;
159  }
160 
161  ec = m_processor->prepare_ping(payload,msg);
162  if (ec) {return;}
163 
164  // set ping timer if we are listening for one
165  if (m_pong_timeout_handler) {
166  // Cancel any existing timers
167  if (m_ping_timer) {
168  m_ping_timer->cancel();
169  }
170 
171  if (m_pong_timeout_dur > 0) {
172  m_ping_timer = transport_con_type::set_timer(
173  m_pong_timeout_dur,
174  lib::bind(
175  &type::handle_pong_timeout,
176  type::get_shared(),
177  payload,
178  lib::placeholders::_1
179  )
180  );
181  }
182 
183  if (!m_ping_timer) {
184  // Our transport doesn't support timers
185  m_elog.write(log::elevel::warn,"Warning: a pong_timeout_handler is \
186  set but the transport in use does not support timeouts.");
187  }
188  }
189 
190  bool needs_writing = false;
191  {
192  scoped_lock_type lock(m_write_lock);
193  write_push(msg);
194  needs_writing = !m_write_flag && !m_send_queue.empty();
195  }
196 
197  if (needs_writing) {
198  transport_con_type::dispatch(lib::bind(
199  &type::write_frame,
200  type::get_shared()
201  ));
202  }
203 
204  ec = lib::error_code();
205 }
206 
207 template<typename config>
208 void connection<config>::ping(std::string const & payload) {
209  lib::error_code ec;
210  ping(payload,ec);
211  if (ec) {
212  throw exception(ec);
213  }
214 }
215 
216 template<typename config>
218  lib::error_code const & ec)
219 {
220  if (ec) {
222  // ignore, this is expected
223  return;
224  }
225 
226  m_elog.write(log::elevel::devel,"pong_timeout error: "+ec.message());
227  return;
228  }
229 
230  if (m_pong_timeout_handler) {
231  m_pong_timeout_handler(m_connection_hdl,payload);
232  }
233 }
234 
235 template <typename config>
236 void connection<config>::pong(const std::string& payload, lib::error_code& ec) {
237  if (m_alog.static_test(log::alevel::devel)) {
238  m_alog.write(log::alevel::devel,"connection pong");
239  }
240 
241  if (m_state != session::state::open) {
242  ec = error::make_error_code(error::invalid_state);
243  return;
244  }
245 
246  message_ptr msg = m_msg_manager->get_message();
247  if (!msg) {
248  ec = error::make_error_code(error::no_outgoing_buffers);
249  return;
250  }
251 
252  ec = m_processor->prepare_pong(payload,msg);
253  if (ec) {return;}
254 
255  bool needs_writing = false;
256  {
257  scoped_lock_type lock(m_write_lock);
258  write_push(msg);
259  needs_writing = !m_write_flag && !m_send_queue.empty();
260  }
261 
262  if (needs_writing) {
263  transport_con_type::dispatch(lib::bind(
264  &type::write_frame,
265  type::get_shared()
266  ));
267  }
268 
269  ec = lib::error_code();
270 }
271 
272 template<typename config>
273 void connection<config>::pong(std::string const & payload) {
274  lib::error_code ec;
275  pong(payload,ec);
276  if (ec) {
277  throw exception(ec);
278  }
279 }
280 
281 template <typename config>
283  std::string const & reason, lib::error_code & ec)
284 {
285  if (m_alog.static_test(log::alevel::devel)) {
286  m_alog.write(log::alevel::devel,"connection close");
287  }
288 
289  if (m_state != session::state::open) {
290  ec = error::make_error_code(error::invalid_state);
291  return;
292  }
293 
294  // Truncate reason to maximum size allowable in a close frame.
295  std::string tr(reason,0,std::min<size_t>(reason.size(),
297 
298  ec = this->send_close_frame(code,tr,false,close::status::terminal(code));
299 }
300 
301 template<typename config>
303  std::string const & reason)
304 {
305  lib::error_code ec;
306  close(code,reason,ec);
307  if (ec) {
308  throw exception(ec);
309  }
310 }
311 
313 
316 template <typename config>
317 lib::error_code connection<config>::interrupt() {
318  m_alog.write(log::alevel::devel,"connection connection::interrupt");
319  return transport_con_type::interrupt(
320  lib::bind(
321  &type::handle_interrupt,
322  type::get_shared()
323  )
324  );
325 }
326 
327 
328 template <typename config>
330  if (m_interrupt_handler) {
331  m_interrupt_handler(m_connection_hdl);
332  }
333 }
334 
335 template <typename config>
337  m_alog.write(log::alevel::devel,"connection connection::pause_reading");
338  return transport_con_type::dispatch(
339  lib::bind(
340  &type::handle_pause_reading,
341  type::get_shared()
342  )
343  );
344 }
345 
347 template <typename config>
349  m_alog.write(log::alevel::devel,"connection connection::handle_pause_reading");
350  m_read_flag = false;
351 }
352 
353 template <typename config>
355  m_alog.write(log::alevel::devel,"connection connection::resume_reading");
356  return transport_con_type::dispatch(
357  lib::bind(
358  &type::handle_resume_reading,
359  type::get_shared()
360  )
361  );
362 }
363 
365 template <typename config>
367  m_read_flag = true;
368  read_frame();
369 }
370 
371 
372 
373 
374 
375 
376 
377 
378 
379 
380 
381 template <typename config>
383  //scoped_lock_type lock(m_connection_state_lock);
384  return m_uri->get_secure();
385 }
386 
387 template <typename config>
388 const std::string& connection<config>::get_host() const {
389  //scoped_lock_type lock(m_connection_state_lock);
390  return m_uri->get_host();
391 }
392 
393 template <typename config>
394 const std::string& connection<config>::get_resource() const {
395  //scoped_lock_type lock(m_connection_state_lock);
396  return m_uri->get_resource();
397 }
398 
399 template <typename config>
401  //scoped_lock_type lock(m_connection_state_lock);
402  return m_uri->get_port();
403 }
404 
405 template <typename config>
407  //scoped_lock_type lock(m_connection_state_lock);
408  return m_uri;
409 }
410 
411 template <typename config>
413  //scoped_lock_type lock(m_connection_state_lock);
414  m_uri = uri;
415 }
416 
417 
418 
419 
420 
421 
422 template <typename config>
423 const std::string & connection<config>::get_subprotocol() const {
424  return m_subprotocol;
425 }
426 
427 template <typename config>
428 const std::vector<std::string> &
430  return m_requested_subprotocols;
431 }
432 
433 template <typename config>
435  lib::error_code & ec)
436 {
437  if (m_is_server) {
438  ec = error::make_error_code(error::client_only);
439  return;
440  }
441 
442  // If the value is empty or has a non-RFC2616 token character it is invalid.
443  if (value.empty() || std::find_if(value.begin(),value.end(),
444  http::is_not_token_char) != value.end())
445  {
446  ec = error::make_error_code(error::invalid_subprotocol);
447  return;
448  }
449 
450  m_requested_subprotocols.push_back(value);
451 }
452 
453 template <typename config>
454 void connection<config>::add_subprotocol(std::string const & value) {
455  lib::error_code ec;
456  this->add_subprotocol(value,ec);
457  if (ec) {
458  throw exception(ec);
459  }
460 }
461 
462 
463 template <typename config>
465  lib::error_code & ec)
466 {
467  if (!m_is_server) {
468  ec = error::make_error_code(error::server_only);
469  return;
470  }
471 
472  if (value.empty()) {
473  ec = lib::error_code();
474  return;
475  }
476 
477  std::vector<std::string>::iterator it;
478 
479  it = std::find(m_requested_subprotocols.begin(),
480  m_requested_subprotocols.end(),
481  value);
482 
483  if (it == m_requested_subprotocols.end()) {
484  ec = error::make_error_code(error::unrequested_subprotocol);
485  return;
486  }
487 
488  m_subprotocol = value;
489 }
490 
491 template <typename config>
493  lib::error_code ec;
494  this->select_subprotocol(value,ec);
495  if (ec) {
496  throw exception(ec);
497  }
498 }
499 
500 
501 template <typename config>
502 const std::string &
503 connection<config>::get_request_header(std::string const & key) {
504  return m_request.get_header(key);
505 }
506 
507 template <typename config>
508 const std::string &
509 connection<config>::get_response_header(std::string const & key) {
510  return m_response.get_header(key);
511 }
512 
513 template <typename config>
514 void connection<config>::set_status(http::status_code::value code)
515 {
516  //scoped_lock_type lock(m_connection_state_lock);
517 
518  if (m_internal_state != istate::PROCESS_HTTP_REQUEST) {
519  throw exception("Call to set_status from invalid state",
520  error::make_error_code(error::invalid_state));
521  }
522 
523  m_response.set_status(code);
524 }
525 template <typename config>
526 void connection<config>::set_status(http::status_code::value code,
527  std::string const & msg)
528 {
529  //scoped_lock_type lock(m_connection_state_lock);
530 
531  if (m_internal_state != istate::PROCESS_HTTP_REQUEST) {
532  throw exception("Call to set_status from invalid state",
533  error::make_error_code(error::invalid_state));
534  }
535 
536  m_response.set_status(code,msg);
537 }
538 template <typename config>
539 void connection<config>::set_body(std::string const & value) {
540  //scoped_lock_type lock(m_connection_state_lock);
541 
542  if (m_internal_state != istate::PROCESS_HTTP_REQUEST) {
543  throw exception("Call to set_status from invalid state",
544  error::make_error_code(error::invalid_state));
545  }
546 
547  m_response.set_body(value);
548 }
549 
550 template <typename config>
551 void connection<config>::append_header(std::string const & key,
552  std::string const & val)
553 {
554  //scoped_lock_type lock(m_connection_state_lock);
555 
556  if (m_is_server) {
557  if (m_internal_state == istate::PROCESS_HTTP_REQUEST) {
558  // we are setting response headers for an incoming server connection
559  m_response.append_header(key,val);
560  } else {
561  throw exception("Call to append_header from invalid state",
562  error::make_error_code(error::invalid_state));
563  }
564  } else {
565  if (m_internal_state == istate::USER_INIT) {
566  // we are setting initial headers for an outgoing client connection
567  m_request.append_header(key,val);
568  } else {
569  throw exception("Call to append_header from invalid state",
570  error::make_error_code(error::invalid_state));
571  }
572  }
573 }
574 template <typename config>
575 void connection<config>::replace_header(std::string const & key,
576  std::string const & val)
577 {
578  // scoped_lock_type lock(m_connection_state_lock);
579 
580  if (m_is_server) {
581  if (m_internal_state == istate::PROCESS_HTTP_REQUEST) {
582  // we are setting response headers for an incoming server connection
583  m_response.replace_header(key,val);
584  } else {
585  throw exception("Call to replace_header from invalid state",
586  error::make_error_code(error::invalid_state));
587  }
588  } else {
589  if (m_internal_state == istate::USER_INIT) {
590  // we are setting initial headers for an outgoing client connection
591  m_request.replace_header(key,val);
592  } else {
593  throw exception("Call to replace_header from invalid state",
594  error::make_error_code(error::invalid_state));
595  }
596  }
597 }
598 template <typename config>
599 void connection<config>::remove_header(std::string const & key)
600 {
601  //scoped_lock_type lock(m_connection_state_lock);
602 
603  if (m_is_server) {
604  if (m_internal_state == istate::PROCESS_HTTP_REQUEST) {
605  // we are setting response headers for an incoming server connection
606  m_response.remove_header(key);
607  } else {
608  throw exception("Call to remove_header from invalid state",
609  error::make_error_code(error::invalid_state));
610  }
611  } else {
612  if (m_internal_state == istate::USER_INIT) {
613  // we are setting initial headers for an outgoing client connection
614  m_request.remove_header(key);
615  } else {
616  throw exception("Call to remove_header from invalid state",
617  error::make_error_code(error::invalid_state));
618  }
619  }
620 }
621 
622 
623 
624 
625 
626 
627 /******** logic thread ********/
628 
629 template <typename config>
631  m_alog.write(log::alevel::devel,"connection start");
632 
633  this->atomic_state_change(
634  istate::USER_INIT,
635  istate::TRANSPORT_INIT,
636  "Start must be called from user init state"
637  );
638 
639  // Depending on how the transport implements init this function may return
640  // immediately and call handle_transport_init later or call
641  // handle_transport_init from this function.
642  transport_con_type::init(
643  lib::bind(
644  &type::handle_transport_init,
645  type::get_shared(),
646  lib::placeholders::_1
647  )
648  );
649 }
650 
651 template <typename config>
652 void connection<config>::handle_transport_init(lib::error_code const & ec) {
653  m_alog.write(log::alevel::devel,"connection handle_transport_init");
654 
655  {
656  scoped_lock_type lock(m_connection_state_lock);
657 
658  if (m_internal_state != istate::TRANSPORT_INIT) {
659  throw exception("handle_transport_init must be called from transport init state",
660  error::make_error_code(error::invalid_state));
661  }
662 
663  if (!ec) {
664  // unless there was a transport error, advance internal state.
665  if (m_is_server) {
666  m_internal_state = istate::READ_HTTP_REQUEST;
667  } else {
668  m_internal_state = istate::WRITE_HTTP_REQUEST;
669  }
670  }
671  }
672 
673  if (ec) {
674  std::stringstream s;
675  s << "handle_transport_init received error: "<< ec.message();
676  m_elog.write(log::elevel::fatal,s.str());
677 
678  this->terminate(ec);
679  return;
680  }
681 
682  // At this point the transport is ready to read and write bytes.
683  if (m_is_server) {
684  this->read_handshake(1);
685  } else {
686  // We are a client. Set the processor to the version specified in the
687  // config file and send a handshake request.
688  m_processor = get_processor(config::client_version);
689  this->send_http_request();
690  }
691 }
692 
693 template <typename config>
694 void connection<config>::read_handshake(size_t num_bytes) {
695  m_alog.write(log::alevel::devel,"connection read");
696 
697  if (m_open_handshake_timeout_dur > 0) {
698  m_handshake_timer = transport_con_type::set_timer(
699  m_open_handshake_timeout_dur,
700  lib::bind(
701  &type::handle_open_handshake_timeout,
702  type::get_shared(),
703  lib::placeholders::_1
704  )
705  );
706  }
707 
708  transport_con_type::async_read_at_least(
709  num_bytes,
710  m_buf,
711  config::connection_read_buffer_size,
712  lib::bind(
713  &type::handle_read_handshake,
714  type::get_shared(),
715  lib::placeholders::_1,
716  lib::placeholders::_2
717  )
718  );
719 }
720 
721 // All exit paths for this function need to call send_http_response() or submit
722 // a new read request with this function as the handler.
723 template <typename config>
724 void connection<config>::handle_read_handshake(lib::error_code const & ec,
725  size_t bytes_transferred)
726 {
727  m_alog.write(log::alevel::devel,"connection handle_read_handshake");
728 
729  this->atomic_state_check(
730  istate::READ_HTTP_REQUEST,
731  "handle_read_handshake must be called from READ_HTTP_REQUEST state"
732  );
733 
734  if (ec) {
735  if (ec == transport::error::eof) {
736  // we expect to get eof if the connection is closed already
737  if (m_state == session::state::closed) {
738  m_alog.write(log::alevel::devel,"got eof from closed con");
739  return;
740  }
741  }
742 
743  std::stringstream s;
744  s << "error in handle_read_handshake: "<< ec.message();
745  m_elog.write(log::elevel::fatal,s.str());
746  this->terminate(ec);
747  return;
748  }
749 
750  // Boundaries checking. TODO: How much of this should be done?
751  if (bytes_transferred > config::connection_read_buffer_size) {
752  m_elog.write(log::elevel::fatal,"Fatal boundaries checking error.");
753  this->terminate(make_error_code(error::general));
754  return;
755  }
756 
757  size_t bytes_processed = 0;
758  try {
759  bytes_processed = m_request.consume(m_buf,bytes_transferred);
760  } catch (http::exception &e) {
761  // All HTTP exceptions will result in this request failing and an error
762  // response being returned. No more bytes will be read in this con.
763  m_response.set_status(e.m_error_code,e.m_error_msg);
764  this->send_http_response_error();
765  return;
766  }
767 
768  // More paranoid boundaries checking.
769  // TODO: Is this overkill?
770  if (bytes_processed > config::connection_read_buffer_size) {
771  m_elog.write(log::elevel::fatal,"Fatal boundaries checking error.");
772  this->terminate(make_error_code(error::general));
773  return;
774  }
775 
776  if (m_alog.static_test(log::alevel::devel)) {
777  std::stringstream s;
778  s << "bytes_transferred: " << bytes_transferred
779  << " bytes, bytes processed: " << bytes_processed << " bytes";
780  m_alog.write(log::alevel::devel,s.str());
781  }
782 
783  if (m_request.ready()) {
784  if (!this->initialize_processor()) {
785  this->send_http_response_error();
786  return;
787  }
788 
789  if (m_processor && m_processor->get_version() == 0) {
790  // Version 00 has an extra requirement to read some bytes after the
791  // handshake
792  if (bytes_transferred-bytes_processed >= 8) {
793  m_request.replace_header(
794  "Sec-WebSocket-Key3",
795  std::string(m_buf+bytes_processed,m_buf+bytes_processed+8)
796  );
797  bytes_processed += 8;
798  } else {
799  // TODO: need more bytes
800  m_alog.write(log::alevel::devel,"short key3 read");
801  m_response.set_status(http::status_code::internal_server_error);
802  this->send_http_response_error();
803  return;
804  }
805  }
806 
807  if (m_alog.static_test(log::alevel::devel)) {
808  m_alog.write(log::alevel::devel,m_request.raw());
809  if (m_request.get_header("Sec-WebSocket-Key3") != "") {
810  m_alog.write(log::alevel::devel,
811  utility::to_hex(m_request.get_header("Sec-WebSocket-Key3")));
812  }
813  }
814 
815  // The remaining bytes in m_buf are frame data. Copy them to the
816  // beginning of the buffer and note the length. They will be read after
817  // the handshake completes and before more bytes are read.
818  std::copy(m_buf+bytes_processed,m_buf+bytes_transferred,m_buf);
819  m_buf_cursor = bytes_transferred-bytes_processed;
820 
821  this->atomic_state_change(
822  istate::READ_HTTP_REQUEST,
823  istate::PROCESS_HTTP_REQUEST,
824  "send_http_response must be called from READ_HTTP_REQUEST state"
825  );
826 
827  // We have the complete request. Process it.
828  this->process_handshake_request();
829  this->send_http_response();
830  } else {
831  // read at least 1 more byte
832  transport_con_type::async_read_at_least(
833  1,
834  m_buf,
835  config::connection_read_buffer_size,
836  lib::bind(
837  &type::handle_read_handshake,
838  type::get_shared(),
839  lib::placeholders::_1,
840  lib::placeholders::_2
841  )
842  );
843  }
844 }
845 
846 // send_http_response requires the request to be fully read and the connection
847 // to be in the PROCESS_HTTP_REQUEST state. In some cases we can detect errors
848 // before the request is fully read (specifically at a point where we aren't
849 // sure if the hybi00 key3 bytes need to be read). This method sets the correct
850 // state and calls send_http_response
851 template <typename config>
852 void connection<config>::send_http_response_error() {
853  this->atomic_state_change(
854  istate::READ_HTTP_REQUEST,
855  istate::PROCESS_HTTP_REQUEST,
856  "send_http_response must be called from READ_HTTP_REQUEST state"
857  );
858  this->send_http_response();
859 }
860 
861 // All exit paths for this function need to call send_http_response() or submit
862 // a new read request with this function as the handler.
863 template <typename config>
864 void connection<config>::handle_read_frame(lib::error_code const & ec,
865  size_t bytes_transferred)
866 {
867  //m_alog.write(log::alevel::devel,"connection handle_read_frame");
868 
869  this->atomic_state_check(
870  istate::PROCESS_CONNECTION,
871  "handle_read_frame must be called from PROCESS_CONNECTION state"
872  );
873 
874  if (ec) {
875  log::level echannel = log::elevel::fatal;
876 
877  if (ec == transport::error::eof) {
878  if (m_state == session::state::closed) {
879  // we expect to get eof if the connection is closed already
880  // just ignore it
881  m_alog.write(log::alevel::devel,"got eof from closed con");
882  return;
883  } else if (m_state == session::state::closing && !m_is_server) {
884  // If we are a client we expect to get eof in the closing state,
885  // this is a signal to terminate our end of the connection after
886  // the closing handshake
887  terminate(lib::error_code());
888  return;
889  }
890  }
892  if (m_state == session::state::closed) {
893  // We expect to get a TLS short read if we try to read after the
894  // connection is closed. If this happens ignore and exit the
895  // read frame path.
896  terminate(lib::error_code());
897  return;
898  }
899  echannel = log::elevel::rerror;
900  } else if (ec == transport::error::action_after_shutdown) {
901  echannel = log::elevel::info;
902  }
903 
904  log_err(echannel, "handle_read_frame", ec);
905  this->terminate(ec);
906  return;
907  }
908 
909  // Boundaries checking. TODO: How much of this should be done?
910  /*if (bytes_transferred > config::connection_read_buffer_size) {
911  m_elog.write(log::elevel::fatal,"Fatal boundaries checking error");
912  this->terminate(make_error_code(error::general));
913  return;
914  }*/
915 
916  size_t p = 0;
917 
918  if (m_alog.static_test(log::alevel::devel)) {
919  std::stringstream s;
920  s << "p = " << p << " bytes transferred = " << bytes_transferred;
921  m_alog.write(log::alevel::devel,s.str());
922  }
923 
924  while (p < bytes_transferred) {
925  if (m_alog.static_test(log::alevel::devel)) {
926  std::stringstream s;
927  s << "calling consume with " << bytes_transferred-p << " bytes";
928  m_alog.write(log::alevel::devel,s.str());
929  }
930 
931  lib::error_code consume_ec;
932 
933  p += m_processor->consume(
934  reinterpret_cast<uint8_t*>(m_buf)+p,
935  bytes_transferred-p,
936  consume_ec
937  );
938 
939  if (m_alog.static_test(log::alevel::devel)) {
940  std::stringstream s;
941  s << "bytes left after consume: " << bytes_transferred-p;
942  m_alog.write(log::alevel::devel,s.str());
943  }
944  if (consume_ec) {
945  log_err(log::elevel::rerror, "consume", consume_ec);
946 
947  if (config::drop_on_protocol_error) {
948  this->terminate(consume_ec);
949  return;
950  } else {
951  lib::error_code close_ec;
952  this->close(
953  processor::error::to_ws(consume_ec),
954  consume_ec.message(),
955  close_ec
956  );
957 
958  if (close_ec) {
959  log_err(log::elevel::fatal, "Protocol error close frame ", close_ec);
960  this->terminate(close_ec);
961  return;
962  }
963  }
964  return;
965  }
966 
967  if (m_processor->ready()) {
968  if (m_alog.static_test(log::alevel::devel)) {
969  std::stringstream s;
970  s << "Complete message received. Dispatching";
971  m_alog.write(log::alevel::devel,s.str());
972  }
973 
974  message_ptr msg = m_processor->get_message();
975 
976  if (!msg) {
977  m_alog.write(log::alevel::devel, "null message from m_processor");
978  } else if (!is_control(msg->get_opcode())) {
979  // data message, dispatch to user
980  if (m_state != session::state::open) {
981  m_elog.write(log::elevel::warn, "got non-close frame while closing");
982  } else if (m_message_handler) {
983  m_message_handler(m_connection_hdl, msg);
984  }
985  } else {
986  process_control_frame(msg);
987  }
988  }
989  }
990 
991  read_frame();
992 }
993 
995 template <typename config>
997  if (!m_read_flag) {
998  return;
999  }
1000 
1001  transport_con_type::async_read_at_least(
1002  // std::min wont work with undefined static const values.
1003  // TODO: is there a more elegant way to do this?
1004  // Need to determine if requesting 1 byte or the exact number of bytes
1005  // is better here. 1 byte lets us be a bit more responsive at a
1006  // potential expense of additional runs through handle_read_frame
1007  /*(m_processor->get_bytes_needed() > config::connection_read_buffer_size ?
1008  config::connection_read_buffer_size : m_processor->get_bytes_needed())*/
1009  1,
1010  m_buf,
1011  config::connection_read_buffer_size,
1012  m_handle_read_frame
1013  );
1014 }
1015 
1016 template <typename config>
1018  m_alog.write(log::alevel::devel,"initialize_processor");
1019 
1020  // if it isn't a websocket handshake nothing to do.
1021  if (!processor::is_websocket_handshake(m_request)) {
1022  return true;
1023  }
1024 
1025  int version = processor::get_websocket_version(m_request);
1026 
1027  if (version < 0) {
1028  m_alog.write(log::alevel::devel, "BAD REQUEST: can't determine version");
1029  m_response.set_status(http::status_code::bad_request);
1030  return false;
1031  }
1032 
1033  m_processor = get_processor(version);
1034 
1035  // if the processor is not null we are done
1036  if (m_processor) {
1037  return true;
1038  }
1039 
1040  // We don't have a processor for this version. Return bad request
1041  // with Sec-WebSocket-Version header filled with values we do accept
1042  m_alog.write(log::alevel::devel, "BAD REQUEST: no processor for version");
1043  m_response.set_status(http::status_code::bad_request);
1044 
1045  std::stringstream ss;
1046  std::string sep = "";
1047  std::vector<int>::const_iterator it;
1048  for (it = versions_supported.begin(); it != versions_supported.end(); it++)
1049  {
1050  ss << sep << *it;
1051  sep = ",";
1052  }
1053 
1054  m_response.replace_header("Sec-WebSocket-Version",ss.str());
1055  return false;
1056 }
1057 
1058 template <typename config>
1060  m_alog.write(log::alevel::devel,"process handshake request");
1061 
1062  if (!processor::is_websocket_handshake(m_request)) {
1063  // this is not a websocket handshake. Process as plain HTTP
1064  m_alog.write(log::alevel::devel,"HTTP REQUEST");
1065 
1066  // extract URI from request
1068  m_request,
1069  (transport_con_type::is_secure() ? "https" : "http")
1070  );
1071 
1072  if (!m_uri->get_valid()) {
1073  m_alog.write(log::alevel::devel, "Bad request: failed to parse uri");
1074  m_response.set_status(http::status_code::bad_request);
1075  return false;
1076  }
1077 
1078  if (m_http_handler) {
1079  m_http_handler(m_connection_hdl);
1080  } else {
1081  set_status(http::status_code::upgrade_required);
1082  }
1083 
1084  return true;
1085  }
1086 
1087  lib::error_code ec = m_processor->validate_handshake(m_request);
1088 
1089  // Validate: make sure all required elements are present.
1090  if (ec){
1091  // Not a valid handshake request
1092  m_alog.write(log::alevel::devel, "Bad request " + ec.message());
1093  m_response.set_status(http::status_code::bad_request);
1094  return false;
1095  }
1096 
1097  // Read extension parameters and set up values necessary for the end user
1098  // to complete extension negotiation.
1099  std::pair<lib::error_code,std::string> neg_results;
1100  neg_results = m_processor->negotiate_extensions(m_request);
1101 
1102  if (neg_results.first) {
1103  // There was a fatal error in extension parsing that should result in
1104  // a failed connection attempt.
1105  m_alog.write(log::alevel::devel, "Bad request: " + neg_results.first.message());
1106  m_response.set_status(http::status_code::bad_request);
1107  return false;
1108  } else {
1109  // extension negotiation succeeded, set response header accordingly
1110  // we don't send an empty extensions header because it breaks many
1111  // clients.
1112  if (neg_results.second.size() > 0) {
1113  m_response.replace_header("Sec-WebSocket-Extensions",
1114  neg_results.second);
1115  }
1116  }
1117 
1118  // extract URI from request
1119  m_uri = m_processor->get_uri(m_request);
1120 
1121 
1122  if (!m_uri->get_valid()) {
1123  m_alog.write(log::alevel::devel, "Bad request: failed to parse uri");
1124  m_response.set_status(http::status_code::bad_request);
1125  return false;
1126  }
1127 
1128  // extract subprotocols
1129  lib::error_code subp_ec = m_processor->extract_subprotocols(m_request,
1130  m_requested_subprotocols);
1131 
1132  if (subp_ec) {
1133  // should we do anything?
1134  }
1135 
1136  // Ask application to validate the connection
1137  if (!m_validate_handler || m_validate_handler(m_connection_hdl)) {
1138  m_response.set_status(http::status_code::switching_protocols);
1139 
1140  // Write the appropriate response headers based on request and
1141  // processor version
1142  ec = m_processor->process_handshake(m_request,m_subprotocol,m_response);
1143 
1144  if (ec) {
1145  std::stringstream s;
1146  s << "Processing error: " << ec << "(" << ec.message() << ")";
1147  m_alog.write(log::alevel::devel, s.str());
1148 
1149  m_response.set_status(http::status_code::internal_server_error);
1150  return false;
1151  }
1152  } else {
1153  // User application has rejected the handshake
1154  m_alog.write(log::alevel::devel, "USER REJECT");
1155 
1156  // Use Bad Request if the user handler did not provide a more
1157  // specific http response error code.
1158  // TODO: is there a better default?
1159  if (m_response.get_status_code() == http::status_code::uninitialized) {
1160  m_response.set_status(http::status_code::bad_request);
1161  }
1162 
1163  return false;
1164  }
1165 
1166  return true;
1167 }
1168 
1169 template <typename config>
1171  m_alog.write(log::alevel::devel,"connection send_http_response");
1172 
1173  if (m_response.get_status_code() == http::status_code::uninitialized) {
1174  m_response.set_status(http::status_code::internal_server_error);
1175  }
1176 
1177  m_response.set_version("HTTP/1.1");
1178 
1179  // Set server header based on the user agent settings
1180  if (m_response.get_header("Server") == "") {
1181  if (!m_user_agent.empty()) {
1182  m_response.replace_header("Server",m_user_agent);
1183  } else {
1184  m_response.remove_header("Server");
1185  }
1186  }
1187 
1188  // have the processor generate the raw bytes for the wire (if it exists)
1189  if (m_processor) {
1190  m_handshake_buffer = m_processor->get_raw(m_response);
1191  } else {
1192  // a processor wont exist for raw HTTP responses.
1193  m_handshake_buffer = m_response.raw();
1194  }
1195 
1196  if (m_alog.static_test(log::alevel::devel)) {
1197  m_alog.write(log::alevel::devel,"Raw Handshake response:\n"+m_handshake_buffer);
1198  if (m_response.get_header("Sec-WebSocket-Key3") != "") {
1199  m_alog.write(log::alevel::devel,
1200  utility::to_hex(m_response.get_header("Sec-WebSocket-Key3")));
1201  }
1202  }
1203 
1204  // write raw bytes
1205  transport_con_type::async_write(
1206  m_handshake_buffer.data(),
1207  m_handshake_buffer.size(),
1208  lib::bind(
1209  &type::handle_send_http_response,
1210  type::get_shared(),
1211  lib::placeholders::_1
1212  )
1213  );
1214 }
1215 
1216 template <typename config>
1217 void connection<config>::handle_send_http_response(lib::error_code const & ec) {
1218  m_alog.write(log::alevel::devel,"handle_send_http_response");
1219 
1220  this->atomic_state_check(
1221  istate::PROCESS_HTTP_REQUEST,
1222  "handle_send_http_response must be called from PROCESS_HTTP_REQUEST state"
1223  );
1224 
1225  if (ec) {
1226  log_err(log::elevel::rerror,"handle_send_http_response",ec);
1227  this->terminate(ec);
1228  return;
1229  }
1230 
1231  this->log_open_result();
1232 
1233  if (m_handshake_timer) {
1234  m_handshake_timer->cancel();
1235  m_handshake_timer.reset();
1236  }
1237 
1238  if (m_response.get_status_code() != http::status_code::switching_protocols)
1239  {
1240  if (m_processor) {
1241  // this was a websocket connection that ended in an error
1242  std::stringstream s;
1243  s << "Handshake ended with HTTP error: "
1244  << m_response.get_status_code();
1245  m_elog.write(log::elevel::rerror,s.str());
1246  } else {
1247  // if this was not a websocket connection, we have written
1248  // the expected response and the connection can be closed.
1249  }
1250  this->terminate(make_error_code(error::http_connection_ended));
1251  return;
1252  }
1253 
1254  this->atomic_state_change(
1255  istate::PROCESS_HTTP_REQUEST,
1256  istate::PROCESS_CONNECTION,
1257  session::state::connecting,
1258  session::state::open,
1259  "handle_send_http_response must be called from PROCESS_HTTP_REQUEST state"
1260  );
1261 
1262  if (m_open_handler) {
1263  m_open_handler(m_connection_hdl);
1264  }
1265 
1266  this->handle_read_frame(lib::error_code(), m_buf_cursor);
1267 }
1268 
1269 template <typename config>
1270 void connection<config>::send_http_request() {
1271  m_alog.write(log::alevel::devel,"connection send_http_request");
1272 
1273  // TODO: origin header?
1274 
1275  // Have the protocol processor fill in the appropriate fields based on the
1276  // selected client version
1277  if (m_processor) {
1278  lib::error_code ec;
1279  ec = m_processor->client_handshake_request(m_request,m_uri,
1280  m_requested_subprotocols);
1281 
1282  if (ec) {
1283  log_err(log::elevel::fatal,"Internal library error: Processor",ec);
1284  return;
1285  }
1286  } else {
1287  m_elog.write(log::elevel::fatal,"Internal library error: missing processor");
1288  return;
1289  }
1290 
1291  // Unless the user has overridden the user agent, send generic WS++ UA.
1292  if (m_request.get_header("User-Agent") == "") {
1293  if (!m_user_agent.empty()) {
1294  m_request.replace_header("User-Agent",m_user_agent);
1295  } else {
1296  m_request.remove_header("User-Agent");
1297  }
1298  }
1299 
1300  m_handshake_buffer = m_request.raw();
1301 
1302  if (m_alog.static_test(log::alevel::devel)) {
1303  m_alog.write(log::alevel::devel,"Raw Handshake request:\n"+m_handshake_buffer);
1304  }
1305 
1306  if (m_open_handshake_timeout_dur > 0) {
1307  m_handshake_timer = transport_con_type::set_timer(
1308  m_open_handshake_timeout_dur,
1309  lib::bind(
1310  &type::handle_open_handshake_timeout,
1311  type::get_shared(),
1312  lib::placeholders::_1
1313  )
1314  );
1315  }
1316 
1317  transport_con_type::async_write(
1318  m_handshake_buffer.data(),
1319  m_handshake_buffer.size(),
1320  lib::bind(
1321  &type::handle_send_http_request,
1322  type::get_shared(),
1323  lib::placeholders::_1
1324  )
1325  );
1326 }
1327 
1328 template <typename config>
1329 void connection<config>::handle_send_http_request(lib::error_code const & ec) {
1330  m_alog.write(log::alevel::devel,"handle_send_http_request");
1331 
1332  this->atomic_state_check(
1333  istate::WRITE_HTTP_REQUEST,
1334  "handle_send_http_request must be called from WRITE_HTTP_REQUEST state"
1335  );
1336 
1337  if (ec) {
1338  log_err(log::elevel::rerror,"handle_send_http_request",ec);
1339  this->terminate(ec);
1340  return;
1341  }
1342 
1343  this->atomic_state_change(
1344  istate::WRITE_HTTP_REQUEST,
1345  istate::READ_HTTP_RESPONSE,
1346  "handle_send_http_request must be called from WRITE_HTTP_REQUEST state"
1347  );
1348 
1349  transport_con_type::async_read_at_least(
1350  1,
1351  m_buf,
1352  config::connection_read_buffer_size,
1353  lib::bind(
1354  &type::handle_read_http_response,
1355  type::get_shared(),
1356  lib::placeholders::_1,
1357  lib::placeholders::_2
1358  )
1359  );
1360 }
1361 
1362 template <typename config>
1363 void connection<config>::handle_read_http_response(lib::error_code const & ec,
1364  size_t bytes_transferred)
1365 {
1366  m_alog.write(log::alevel::devel,"handle_read_http_response");
1367 
1368  this->atomic_state_check(
1369  istate::READ_HTTP_RESPONSE,
1370  "handle_read_http_response must be called from READ_HTTP_RESPONSE state"
1371  );
1372 
1373  if (ec) {
1374  log_err(log::elevel::rerror,"handle_send_http_response",ec);
1375  this->terminate(ec);
1376  return;
1377  }
1378  size_t bytes_processed = 0;
1379  // TODO: refactor this to use error codes rather than exceptions
1380  try {
1381  bytes_processed = m_response.consume(m_buf,bytes_transferred);
1382  } catch (http::exception & e) {
1383  m_elog.write(log::elevel::rerror,
1384  std::string("error in handle_read_http_response: ")+e.what());
1385  this->terminate(make_error_code(error::general));
1386  return;
1387  }
1388 
1389  m_alog.write(log::alevel::devel,std::string("Raw response: ")+m_response.raw());
1390 
1391  if (m_response.headers_ready()) {
1392  if (m_handshake_timer) {
1393  m_handshake_timer->cancel();
1394  m_handshake_timer.reset();
1395  }
1396 
1397  lib::error_code validate_ec = m_processor->validate_server_handshake_response(
1398  m_request,
1399  m_response
1400  );
1401  if (validate_ec) {
1402  log_err(log::elevel::rerror,"Server handshake response",validate_ec);
1403  this->terminate(validate_ec);
1404  return;
1405  }
1406 
1407  // response is valid, connection can now be assumed to be open
1408  this->atomic_state_change(
1409  istate::READ_HTTP_RESPONSE,
1410  istate::PROCESS_CONNECTION,
1411  session::state::connecting,
1412  session::state::open,
1413  "handle_read_http_response must be called from READ_HTTP_RESPONSE state"
1414  );
1415 
1416  this->log_open_result();
1417 
1418  if (m_open_handler) {
1419  m_open_handler(m_connection_hdl);
1420  }
1421 
1422  // The remaining bytes in m_buf are frame data. Copy them to the
1423  // beginning of the buffer and note the length. They will be read after
1424  // the handshake completes and before more bytes are read.
1425  std::copy(m_buf+bytes_processed,m_buf+bytes_transferred,m_buf);
1426  m_buf_cursor = bytes_transferred-bytes_processed;
1427 
1428  this->handle_read_frame(lib::error_code(), m_buf_cursor);
1429  } else {
1430  transport_con_type::async_read_at_least(
1431  1,
1432  m_buf,
1433  config::connection_read_buffer_size,
1434  lib::bind(
1435  &type::handle_read_http_response,
1436  type::get_shared(),
1437  lib::placeholders::_1,
1438  lib::placeholders::_2
1439  )
1440  );
1441  }
1442 }
1443 
1444 template <typename config>
1445 void connection<config>::handle_open_handshake_timeout(
1446  lib::error_code const & ec)
1447 {
1449  m_alog.write(log::alevel::devel,"open handshake timer cancelled");
1450  } else if (ec) {
1451  m_alog.write(log::alevel::devel,
1452  "open handle_open_handshake_timeout error: "+ec.message());
1453  // TODO: ignore or fail here?
1454  } else {
1455  m_alog.write(log::alevel::devel,"open handshake timer expired");
1456  terminate(make_error_code(error::open_handshake_timeout));
1457  }
1458 }
1459 
1460 template <typename config>
1461 void connection<config>::handle_close_handshake_timeout(
1462  lib::error_code const & ec)
1463 {
1465  m_alog.write(log::alevel::devel,"asio close handshake timer cancelled");
1466  } else if (ec) {
1467  m_alog.write(log::alevel::devel,
1468  "asio open handle_close_handshake_timeout error: "+ec.message());
1469  // TODO: ignore or fail here?
1470  } else {
1471  m_alog.write(log::alevel::devel, "asio close handshake timer expired");
1472  terminate(make_error_code(error::close_handshake_timeout));
1473  }
1474 }
1475 
1476 template <typename config>
1477 void connection<config>::terminate(lib::error_code const & ec) {
1478  if (m_alog.static_test(log::alevel::devel)) {
1479  m_alog.write(log::alevel::devel,"connection terminate");
1480  }
1481 
1482  // Cancel close handshake timer
1483  if (m_handshake_timer) {
1484  m_handshake_timer->cancel();
1485  m_handshake_timer.reset();
1486  }
1487 
1488  terminate_status tstat = unknown;
1489  if (ec) {
1490  m_ec = ec;
1491  m_local_close_code = close::status::abnormal_close;
1492  m_local_close_reason = ec.message();
1493  }
1494 
1495  if (m_state == session::state::connecting) {
1496  m_state = session::state::closed;
1497  tstat = failed;
1498  } else if (m_state != session::state::closed) {
1499  m_state = session::state::closed;
1500  tstat = closed;
1501  } else {
1502  m_alog.write(log::alevel::devel,
1503  "terminate called on connection that was already terminated");
1504  return;
1505  }
1506 
1507  // TODO: choose between shutdown and close based on error code sent
1508 
1509  transport_con_type::async_shutdown(
1510  lib::bind(
1511  &type::handle_terminate,
1512  type::get_shared(),
1513  tstat,
1514  lib::placeholders::_1
1515  )
1516  );
1517 }
1518 
1519 template <typename config>
1520 void connection<config>::handle_terminate(terminate_status tstat,
1521  lib::error_code const & ec)
1522 {
1523  if (m_alog.static_test(log::alevel::devel)) {
1524  m_alog.write(log::alevel::devel,"connection handle_terminate");
1525  }
1526 
1527  if (ec) {
1528  // there was an error actually shutting down the connection
1529  log_err(log::elevel::devel,"handle_terminate",ec);
1530  }
1531 
1532  // clean shutdown
1533  if (tstat == failed) {
1534  if (m_fail_handler) {
1535  m_fail_handler(m_connection_hdl);
1536  }
1537  log_fail_result();
1538  } else if (tstat == closed) {
1539  if (m_close_handler) {
1540  m_close_handler(m_connection_hdl);
1541  }
1542  log_close_result();
1543  } else {
1544  m_elog.write(log::elevel::rerror,"Unknown terminate_status");
1545  }
1546 
1547  // call the termination handler if it exists
1548  // if it exists it might (but shouldn't) refer to a bad memory location.
1549  // If it does, we don't care and should catch and ignore it.
1550  if (m_termination_handler) {
1551  try {
1552  m_termination_handler(type::get_shared());
1553  } catch (std::exception const & e) {
1554  m_elog.write(log::elevel::warn,
1555  std::string("termination_handler call failed. Reason was: ")+e.what());
1556  }
1557  }
1558 }
1559 
1560 template <typename config>
1562  //m_alog.write(log::alevel::devel,"connection write_frame");
1563 
1564  {
1565  scoped_lock_type lock(m_write_lock);
1566 
1567  // Check the write flag. If true, there is an outstanding transport
1568  // write already. In this case we just return. The write handler will
1569  // start a new write if the write queue isn't empty. If false, we set
1570  // the write flag and proceed to initiate a transport write.
1571  if (m_write_flag) {
1572  return;
1573  }
1574 
1575  // pull off all the messages that are ready to write.
1576  // stop if we get a message marked terminal
1577  message_ptr next_message = write_pop();
1578  while (next_message) {
1579  m_current_msgs.push_back(next_message);
1580  if (!next_message->get_terminal()) {
1581  next_message = write_pop();
1582  } else {
1583  next_message = message_ptr();
1584  }
1585  }
1586 
1587  if (m_current_msgs.empty()) {
1588  // there was nothing to send
1589  return;
1590  } else {
1591  // At this point we own the next messages to be sent and are
1592  // responsible for holding the write flag until they are
1593  // successfully sent or there is some error
1594  m_write_flag = true;
1595  }
1596  }
1597 
1598  typename std::vector<message_ptr>::iterator it;
1599  for (it = m_current_msgs.begin(); it != m_current_msgs.end(); ++it) {
1600  std::string const & header = (*it)->get_header();
1601  std::string const & payload = (*it)->get_payload();
1602 
1603  m_send_buffer.push_back(transport::buffer(header.c_str(),header.size()));
1604  m_send_buffer.push_back(transport::buffer(payload.c_str(),payload.size()));
1605  }
1606 
1607  // Print detailed send stats if those log levels are enabled
1608  if (m_alog.static_test(log::alevel::frame_header)) {
1609  if (m_alog.dynamic_test(log::alevel::frame_header)) {
1610  std::stringstream general,header,payload;
1611 
1612  general << "Dispatching write containing " << m_current_msgs.size()
1613  <<" message(s) containing ";
1614  header << "Header Bytes: \n";
1615  payload << "Payload Bytes: \n";
1616 
1617  size_t hbytes = 0;
1618  size_t pbytes = 0;
1619 
1620  for (size_t i = 0; i < m_current_msgs.size(); i++) {
1621  hbytes += m_current_msgs[i]->get_header().size();
1622  pbytes += m_current_msgs[i]->get_payload().size();
1623 
1624 
1625  header << "[" << i << "] ("
1626  << m_current_msgs[i]->get_header().size() << ") "
1627  << utility::to_hex(m_current_msgs[i]->get_header()) << "\n";
1628 
1629  if (m_alog.static_test(log::alevel::frame_payload)) {
1630  if (m_alog.dynamic_test(log::alevel::frame_payload)) {
1631  payload << "[" << i << "] ("
1632  << m_current_msgs[i]->get_payload().size() << ") "
1633  << utility::to_hex(m_current_msgs[i]->get_payload())
1634  << "\n";
1635  }
1636  }
1637  }
1638 
1639  general << hbytes << " header bytes and " << pbytes << " payload bytes";
1640 
1641  m_alog.write(log::alevel::frame_header,general.str());
1642  m_alog.write(log::alevel::frame_header,header.str());
1643  m_alog.write(log::alevel::frame_payload,payload.str());
1644  }
1645  }
1646 
1647  transport_con_type::async_write(
1648  m_send_buffer,
1649  m_write_frame_handler
1650  );
1651 }
1652 
1653 template <typename config>
1654 void connection<config>::handle_write_frame(lib::error_code const & ec)
1655 {
1656  if (m_alog.static_test(log::alevel::devel)) {
1657  m_alog.write(log::alevel::devel,"connection handle_write_frame");
1658  }
1659 
1660  bool terminal = m_current_msgs.back()->get_terminal();
1661 
1662  m_send_buffer.clear();
1663  m_current_msgs.clear();
1664  // TODO: recycle instead of deleting
1665 
1666  if (ec) {
1667  log_err(log::elevel::fatal,"handle_write_frame",ec);
1668  this->terminate(ec);
1669  return;
1670  }
1671 
1672  if (terminal) {
1673  this->terminate(lib::error_code());
1674  return;
1675  }
1676 
1677  bool needs_writing = false;
1678  {
1679  scoped_lock_type lock(m_write_lock);
1680 
1681  // release write flag
1682  m_write_flag = false;
1683 
1684  needs_writing = !m_send_queue.empty();
1685  }
1686 
1687  if (needs_writing) {
1688  transport_con_type::dispatch(lib::bind(
1689  &type::write_frame,
1690  type::get_shared()
1691  ));
1692  }
1693 }
1694 
1695 template <typename config>
1696 void connection<config>::atomic_state_change(istate_type req, istate_type dest,
1697  std::string msg)
1698 {
1699  scoped_lock_type lock(m_connection_state_lock);
1700 
1701  if (m_internal_state != req) {
1702  throw exception(msg,error::make_error_code(error::invalid_state));
1703  }
1704 
1705  m_internal_state = dest;
1706 }
1707 
1708 template <typename config>
1709 void connection<config>::atomic_state_change(istate_type internal_req,
1710  istate_type internal_dest, session::state::value external_req,
1711  session::state::value external_dest, std::string msg)
1712 {
1713  scoped_lock_type lock(m_connection_state_lock);
1714 
1715  if (m_internal_state != internal_req || m_state != external_req) {
1716  throw exception(msg,error::make_error_code(error::invalid_state));
1717  }
1718 
1719  m_internal_state = internal_dest;
1720  m_state = external_dest;
1721 }
1722 
1723 template <typename config>
1724 void connection<config>::atomic_state_check(istate_type req, std::string msg)
1725 {
1726  scoped_lock_type lock(m_connection_state_lock);
1727 
1728  if (m_internal_state != req) {
1729  throw exception(msg,error::make_error_code(error::invalid_state));
1730  }
1731 }
1732 
1733 template <typename config>
1734 const std::vector<int>& connection<config>::get_supported_versions() const
1735 {
1736  return versions_supported;
1737 }
1738 
1739 template <typename config>
1740 void connection<config>::process_control_frame(typename config::message_type::ptr msg)
1741 {
1742  m_alog.write(log::alevel::devel,"process_control_frame");
1743 
1744  frame::opcode::value op = msg->get_opcode();
1745  lib::error_code ec;
1746 
1747  std::stringstream s;
1748  s << "Control frame received with opcode " << op;
1749  m_alog.write(log::alevel::control,s.str());
1750 
1751  if (m_state == session::state::closed) {
1752  m_elog.write(log::elevel::warn,"got frame in state closed");
1753  return;
1754  }
1755  if (op != frame::opcode::CLOSE && m_state != session::state::open) {
1756  m_elog.write(log::elevel::warn,"got non-close frame in state closing");
1757  return;
1758  }
1759 
1760  if (op == frame::opcode::PING) {
1761  bool should_reply = true;
1762 
1763  if (m_ping_handler) {
1764  should_reply = m_ping_handler(m_connection_hdl, msg->get_payload());
1765  }
1766 
1767  if (should_reply) {
1768  this->pong(msg->get_payload(),ec);
1769  if (ec) {
1770  log_err(log::elevel::devel,"Failed to send response pong",ec);
1771  }
1772  }
1773  } else if (op == frame::opcode::PONG) {
1774  if (m_pong_handler) {
1775  m_pong_handler(m_connection_hdl, msg->get_payload());
1776  }
1777  if (m_ping_timer) {
1778  m_ping_timer->cancel();
1779  }
1780  } else if (op == frame::opcode::CLOSE) {
1781  m_alog.write(log::alevel::devel,"got close frame");
1782  // record close code and reason somewhere
1783 
1784  m_remote_close_code = close::extract_code(msg->get_payload(),ec);
1785  if (ec) {
1786  s.str("");
1787  if (config::drop_on_protocol_error) {
1788  s << "Received invalid close code " << m_remote_close_code
1789  << " dropping connection per config.";
1790  m_elog.write(log::elevel::devel,s.str());
1791  this->terminate(ec);
1792  } else {
1793  s << "Received invalid close code " << m_remote_close_code
1794  << " sending acknowledgement and closing";
1795  m_elog.write(log::elevel::devel,s.str());
1796  ec = send_close_ack(close::status::protocol_error,
1797  "Invalid close code");
1798  if (ec) {
1799  log_err(log::elevel::devel,"send_close_ack",ec);
1800  }
1801  }
1802  return;
1803  }
1804 
1805  m_remote_close_reason = close::extract_reason(msg->get_payload(),ec);
1806  if (ec) {
1807  if (config::drop_on_protocol_error) {
1808  m_elog.write(log::elevel::devel,
1809  "Received invalid close reason. Dropping connection per config");
1810  this->terminate(ec);
1811  } else {
1812  m_elog.write(log::elevel::devel,
1813  "Received invalid close reason. Sending acknowledgement and closing");
1814  ec = send_close_ack(close::status::protocol_error,
1815  "Invalid close reason");
1816  if (ec) {
1817  log_err(log::elevel::devel,"send_close_ack",ec);
1818  }
1819  }
1820  return;
1821  }
1822 
1823  if (m_state == session::state::open) {
1824  s.str("");
1825  s << "Received close frame with code " << m_remote_close_code
1826  << " and reason " << m_remote_close_reason;
1827  m_alog.write(log::alevel::devel,s.str());
1828 
1829  ec = send_close_ack();
1830  if (ec) {
1831  log_err(log::elevel::devel,"send_close_ack",ec);
1832  }
1833  } else if (m_state == session::state::closing && !m_was_clean) {
1834  // ack of our close
1835  m_alog.write(log::alevel::devel, "Got acknowledgement of close");
1836 
1837  m_was_clean = true;
1838 
1839  // If we are a server terminate the connection now. Clients should
1840  // leave the connection open to give the server an opportunity to
1841  // initiate the TCP close. The client's timer will handle closing
1842  // its side of the connection if the server misbehaves.
1843  //
1844  // TODO: different behavior if the underlying transport doesn't
1845  // support timers?
1846  if (m_is_server) {
1847  terminate(lib::error_code());
1848  }
1849  } else {
1850  // spurious, ignore
1851  m_elog.write(log::elevel::devel, "Got close frame in wrong state");
1852  }
1853  } else {
1854  // got an invalid control opcode
1855  m_elog.write(log::elevel::devel, "Got control frame with invalid opcode");
1856  // initiate protocol error shutdown
1857  }
1858 }
1859 
1860 template <typename config>
1861 lib::error_code connection<config>::send_close_ack(close::status::value code,
1862  std::string const & reason)
1863 {
1864  return send_close_frame(code,reason,true,m_is_server);
1865 }
1866 
1867 template <typename config>
1868 lib::error_code connection<config>::send_close_frame(close::status::value code,
1869  std::string const & reason, bool ack, bool terminal)
1870 {
1871  m_alog.write(log::alevel::devel,"send_close_frame");
1872 
1873  // check for special codes
1874 
1875  // If silent close is set, respect it and blank out close information
1876  // Otherwise use whatever has been specified in the parameters. If
1877  // parameters specifies close::status::blank then determine what to do
1878  // based on whether or not this is an ack. If it is not an ack just
1879  // send blank info. If it is an ack then echo the close information from
1880  // the remote endpoint.
1881  if (config::silent_close) {
1882  m_alog.write(log::alevel::devel,"closing silently");
1883  m_local_close_code = close::status::no_status;
1884  m_local_close_reason = "";
1885  } else if (code != close::status::blank) {
1886  m_alog.write(log::alevel::devel,"closing with specified codes");
1887  m_local_close_code = code;
1888  m_local_close_reason = reason;
1889  } else if (!ack) {
1890  m_alog.write(log::alevel::devel,"closing with no status code");
1891  m_local_close_code = close::status::no_status;
1892  m_local_close_reason = "";
1893  } else if (m_remote_close_code == close::status::no_status) {
1894  m_alog.write(log::alevel::devel,
1895  "acknowledging a no-status close with normal code");
1896  m_local_close_code = close::status::normal;
1897  m_local_close_reason = "";
1898  } else {
1899  m_alog.write(log::alevel::devel,"acknowledging with remote codes");
1900  m_local_close_code = m_remote_close_code;
1901  m_local_close_reason = m_remote_close_reason;
1902  }
1903 
1904  std::stringstream s;
1905  s << "Closing with code: " << m_local_close_code << ", and reason: "
1906  << m_local_close_reason;
1907  m_alog.write(log::alevel::devel,s.str());
1908 
1909  message_ptr msg = m_msg_manager->get_message();
1910  if (!msg) {
1911  return error::make_error_code(error::no_outgoing_buffers);
1912  }
1913 
1914  lib::error_code ec = m_processor->prepare_close(m_local_close_code,
1915  m_local_close_reason,msg);
1916  if (ec) {
1917  return ec;
1918  }
1919 
1920  // Messages flagged terminal will result in the TCP connection being dropped
1921  // after the message has been written. This is typically used when servers
1922  // send an ack and when any endpoint encounters a protocol error
1923  if (terminal) {
1924  msg->set_terminal(true);
1925  }
1926 
1927  m_state = session::state::closing;
1928 
1929  if (ack) {
1930  m_was_clean = true;
1931  }
1932 
1933  // Start a timer so we don't wait forever for the acknowledgement close
1934  // frame
1935  if (m_close_handshake_timeout_dur > 0) {
1936  m_handshake_timer = transport_con_type::set_timer(
1937  m_close_handshake_timeout_dur,
1938  lib::bind(
1939  &type::handle_close_handshake_timeout,
1940  type::get_shared(),
1941  lib::placeholders::_1
1942  )
1943  );
1944  }
1945 
1946  bool needs_writing = false;
1947  {
1948  scoped_lock_type lock(m_write_lock);
1949  write_push(msg);
1950  needs_writing = !m_write_flag && !m_send_queue.empty();
1951  }
1952 
1953  if (needs_writing) {
1954  transport_con_type::dispatch(lib::bind(
1955  &type::write_frame,
1956  type::get_shared()
1957  ));
1958  }
1959 
1960  return lib::error_code();
1961 }
1962 
1963 template <typename config>
1964 typename connection<config>::processor_ptr
1965 connection<config>::get_processor(int version) const {
1966  // TODO: allow disabling certain versions
1967 
1968  processor_ptr p;
1969 
1970  switch (version) {
1971  case 0:
1972  p = lib::make_shared<processor::hybi00<config> >(
1973  transport_con_type::is_secure(),
1974  m_is_server,
1975  m_msg_manager
1976  );
1977  break;
1978  case 7:
1979  p = lib::make_shared<processor::hybi07<config> >(
1980  transport_con_type::is_secure(),
1981  m_is_server,
1982  m_msg_manager,
1983  lib::ref(m_rng)
1984  );
1985  break;
1986  case 8:
1987  p = lib::make_shared<processor::hybi08<config> >(
1988  transport_con_type::is_secure(),
1989  m_is_server,
1990  m_msg_manager,
1991  lib::ref(m_rng)
1992  );
1993  break;
1994  case 13:
1995  p = lib::make_shared<processor::hybi13<config> >(
1996  transport_con_type::is_secure(),
1997  m_is_server,
1998  m_msg_manager,
1999  lib::ref(m_rng)
2000  );
2001  break;
2002  default:
2003  return p;
2004  }
2005 
2006  // Settings not configured by the constructor
2007  p->set_max_message_size(m_max_message_size);
2008 
2009  return p;
2010 }
2011 
2012 template <typename config>
2013 void connection<config>::write_push(typename config::message_type::ptr msg)
2014 {
2015  if (!msg) {
2016  return;
2017  }
2018 
2019  m_send_buffer_size += msg->get_payload().size();
2020  m_send_queue.push(msg);
2021 
2022  if (m_alog.static_test(log::alevel::devel)) {
2023  std::stringstream s;
2024  s << "write_push: message count: " << m_send_queue.size()
2025  << " buffer size: " << m_send_buffer_size;
2026  m_alog.write(log::alevel::devel,s.str());
2027  }
2028 }
2029 
2030 template <typename config>
2031 typename config::message_type::ptr connection<config>::write_pop()
2032 {
2033  message_ptr msg;
2034 
2035  if (m_send_queue.empty()) {
2036  return msg;
2037  }
2038 
2039  msg = m_send_queue.front();
2040 
2041  m_send_buffer_size -= msg->get_payload().size();
2042  m_send_queue.pop();
2043 
2044  if (m_alog.static_test(log::alevel::devel)) {
2045  std::stringstream s;
2046  s << "write_pop: message count: " << m_send_queue.size()
2047  << " buffer size: " << m_send_buffer_size;
2048  m_alog.write(log::alevel::devel,s.str());
2049  }
2050  return msg;
2051 }
2052 
2053 template <typename config>
2054 void connection<config>::log_open_result()
2055 {
2056  std::stringstream s;
2057 
2058  int version;
2059  if (!processor::is_websocket_handshake(m_request)) {
2060  version = -1;
2061  } else {
2062  version = processor::get_websocket_version(m_request);
2063  }
2064 
2065  // Connection Type
2066  s << (version == -1 ? "HTTP" : "WebSocket") << " Connection ";
2067 
2068  // Remote endpoint address
2069  s << transport_con_type::get_remote_endpoint() << " ";
2070 
2071  // Version string if WebSocket
2072  if (version != -1) {
2073  s << "v" << version << " ";
2074  }
2075 
2076  // User Agent
2077  std::string ua = m_request.get_header("User-Agent");
2078  if (ua == "") {
2079  s << "\"\" ";
2080  } else {
2081  // check if there are any quotes in the user agent
2082  s << "\"" << utility::string_replace_all(ua,"\"","\\\"") << "\" ";
2083  }
2084 
2085  // URI
2086  s << (m_uri ? m_uri->get_resource() : "NULL") << " ";
2087 
2088  // Status code
2089  s << m_response.get_status_code();
2090 
2091  m_alog.write(log::alevel::connect,s.str());
2092 }
2093 
2094 template <typename config>
2095 void connection<config>::log_close_result()
2096 {
2097  std::stringstream s;
2098 
2099  s << "Disconnect "
2100  << "close local:[" << m_local_close_code
2101  << (m_local_close_reason == "" ? "" : ","+m_local_close_reason)
2102  << "] remote:[" << m_remote_close_code
2103  << (m_remote_close_reason == "" ? "" : ","+m_remote_close_reason) << "]";
2104 
2105  m_alog.write(log::alevel::disconnect,s.str());
2106 }
2107 
2108 template <typename config>
2109 void connection<config>::log_fail_result()
2110 {
2111  // TODO: include more information about the connection?
2112  // should this be filed under connect rather than disconnect?
2113  m_alog.write(log::alevel::disconnect,"Failed: "+m_ec.message());
2114 }
2115 
2116 } // namespace websocketpp
2117 
2118 #endif // WEBSOCKETPP_CONNECTION_IMPL_HPP
bool is_control(value v)
Check if an opcode is for a control frame.
Definition: frame.hpp:138
void add_subprotocol(std::string const &request, lib::error_code &ec)
Adds the given subprotocol string to the request list (exception free)
static level const fatal
Definition: levels.hpp:78
void set_termination_handler(termination_handler new_handler)
void read_frame()
Issue a new transport read unless reading is paused.
std::string const & get_subprotocol() const
Gets the negotated subprotocol.
uint16_t value
The type of a close code value.
Definition: close.hpp:49
std::string const & get_origin() const
Return the same origin policy origin value from the opening request.
static level const control
One line per control frame.
Definition: levels.hpp:125
bool terminal(value code)
Determine if the code represents an unrecoverable error.
Definition: close.hpp:197
uri_ptr get_uri_from_host(request_type &request, std::string scheme)
Extract a URI ptr from the host header of the request.
Definition: processor.hpp:130
lib::error_code pause_reading()
Pause reading of new data.
bool is_websocket_handshake(request_type &r)
Determine whether or not a generic HTTP request is a WebSocket handshake.
Definition: processor.hpp:66
Attempted to use a client specific feature on a server endpoint.
Definition: error.hpp:103
session::state::value get_state() const
Return the connection state.
static std::vector< int > const versions_supported(helper, helper+4)
Container that stores the list of protocol versions supported.
Selected subprotocol was not requested by the client.
Definition: error.hpp:100
int get_websocket_version(request_type &r)
Extract the version from a WebSocket handshake request.
Definition: processor.hpp:105
uri_ptr get_uri() const
Gets the connection URI.
void replace_header(std::string const &key, std::string const &val)
Replace a header.
void ping(std::string const &payload)
Send a ping.
Represents an individual WebSocket connection.
Definition: connection.hpp:221
size_t get_buffered_amount() const
Get the size of the outgoing write buffer (in payload bytes)
std::string const & get_request_header(std::string const &key)
Retrieve a request header.
static level const frame_payload
One line per frame, includes the full message payload (warning: chatty)
Definition: levels.hpp:129
static value const protocol_error
A protocol error occurred.
Definition: close.hpp:83
static value const normal
Definition: close.hpp:76
static level const devel
Low level debugging information (warning: very chatty)
Definition: levels.hpp:63
std::string string_replace_all(std::string subject, const std::string &search, const std::string &replace)
Replace all occurrances of a substring with another.
status::value extract_code(std::string const &payload, lib::error_code &ec)
Extract a close code value from a close payload.
Definition: close.hpp:264
void select_subprotocol(std::string const &value, lib::error_code &ec)
Select a subprotocol to use (exception free)
std::string to_hex(const std::string &input)
Convert std::string to ascii printed string of hex digits.
void pong(std::string const &payload)
Send a pong.
bool is_not_token_char(unsigned char c)
Is the character a non-token.
Definition: constants.hpp:98
void atomic_state_check(istate_type req, std::string msg)
Atomically read and compared the internal state.
void handle_resume_reading()
Resume reading callback.
static level const frame_header
One line per frame, includes the full frame header.
Definition: levels.hpp:127
std::string const & get_resource() const
Returns the resource component of the connection URI.
static level const devel
Development messages (warning: very chatty)
Definition: levels.hpp:141
static level const disconnect
One line for each closed connection. Includes closing codes and reasons.
Definition: levels.hpp:123
const std::vector< int > & get_supported_versions() const
Get array of WebSocket protocol versions that this connection supports.
std::string const & get_host() const
Returns the host component of the connection URI.
lib::error_code resume_reading()
Resume reading of new data.
void close(close::status::value const code, std::string const &reason)
Close the connection.
void atomic_state_change(istate_type req, istate_type dest, std::string msg)
Atomically change the internal connection state.
void set_status(http::status_code::value code)
Set response status code and message.
void handle_pong_timeout(std::string payload, lib::error_code const &ec)
Utility method that gets called back when the ping timer expires.
close::status::value to_ws(lib::error_code ec)
Converts a processor error_code into a websocket close code.
Definition: base.hpp:256
void append_header(std::string const &key, std::string const &val)
Append a header.
lib::error_code send(std::string const &payload, frame::opcode::value op=frame::opcode::text)
Create a message and then add it to the outgoing send queue.
The connection was in the wrong state for this operation.
Definition: error.hpp:72
static level const info
Definition: levels.hpp:69
void write_frame()
Checks if there are frames in the send queue and if there are sends one.
void handle_write_frame(lib::error_code const &ec)
Process the results of a frame write operation and start the next write.
Namespace for the WebSocket++ project.
Definition: base64.hpp:41
uint16_t get_port() const
Returns the port component of the connection URI.
std::string const & get_response_header(std::string const &key)
Retrieve a response header.
A simple utility buffer class.
Definition: connection.hpp:137
The endpoint is out of outgoing message buffers.
Definition: error.hpp:66
lib::shared_ptr< uri > uri_ptr
Pointer to a URI.
Definition: uri.hpp:350
void handle_interrupt()
Transport inturrupt callback.
static value const no_status
A dummy value to indicate that no status code was received.
Definition: close.hpp:97
void handle_pause_reading()
Pause reading callback.
WebSocket close handshake timed out.
Definition: error.hpp:115
Catch-all library error.
Definition: error.hpp:45
std::vector< std::string > const & get_requested_subprotocols() const
Gets all of the subprotocols requested by the client.
bool get_secure() const
Returns the secure flag from the connection URI.
static value const abnormal_close
A dummy value to indicate that the connection was closed abnormally.
Definition: close.hpp:104
static uint8_t const close_reason_size
Maximum size of close frame reason.
Definition: frame.hpp:168
void set_uri(uri_ptr uri)
Sets the connection URI.
WebSocket opening handshake timed out.
Definition: error.hpp:112
Attempted to use a server specific feature on a client endpoint.
Definition: error.hpp:106
lib::error_code interrupt()
Asyncronously invoke handler::on_inturrupt.
std::string extract_reason(std::string const &payload, lib::error_code &ec)
Extract the reason string from a close payload.
Definition: close.hpp:303
static level const rerror
Definition: levels.hpp:75
static level const warn
Definition: levels.hpp:72
static value const blank
A blank value for internal use.
Definition: close.hpp:52
static level const connect
Information about new connections.
Definition: levels.hpp:121
void remove_header(std::string const &key)
Remove a header.
void set_body(std::string const &value)
Set response body content.