28 #ifndef WEBSOCKETPP_TRANSPORT_ASIO_CON_HPP 29 #define WEBSOCKETPP_TRANSPORT_ASIO_CON_HPP 31 #include <websocketpp/transport/asio/base.hpp> 33 #include <websocketpp/transport/base/connection.hpp> 35 #include <websocketpp/logger/levels.hpp> 36 #include <websocketpp/http/constants.hpp> 38 #include <websocketpp/base64/base64.hpp> 39 #include <websocketpp/error.hpp> 40 #include <websocketpp/uri.hpp> 42 #include <websocketpp/common/asio.hpp> 43 #include <websocketpp/common/chrono.hpp> 44 #include <websocketpp/common/cpp11.hpp> 45 #include <websocketpp/common/memory.hpp> 46 #include <websocketpp/common/functional.hpp> 47 #include <websocketpp/common/connection_hdl.hpp> 54 namespace websocketpp {
58 typedef lib::function<
void(connection_hdl)> tcp_init_handler;
66 template <
typename config>
67 class connection :
public config::socket_type::socket_con_type {
70 typedef connection<config> type;
72 typedef lib::shared_ptr<type> ptr;
75 typedef typename config::socket_type::socket_con_type socket_con_type;
77 typedef typename socket_con_type::ptr socket_con_ptr;
79 typedef typename config::alog_type alog_type;
81 typedef typename config::elog_type elog_type;
83 typedef typename config::request_type request_type;
84 typedef typename request_type::ptr request_ptr;
85 typedef typename config::response_type response_type;
86 typedef typename response_type::ptr response_ptr;
89 typedef lib::asio::io_service * io_service_ptr;
91 typedef lib::shared_ptr<lib::asio::io_service::strand> strand_ptr;
93 typedef lib::shared_ptr<lib::asio::steady_timer> timer_ptr;
98 friend class endpoint<config>;
101 explicit connection(
bool is_server,
const lib::shared_ptr<alog_type> & alog,
const lib::shared_ptr<elog_type> & elog)
102 : m_is_server(is_server)
106 m_alog->write(log::alevel::devel,
"asio con transport constructor");
111 return lib::static_pointer_cast<type>(socket_con_type::get_shared());
114 bool is_secure()
const {
115 return socket_con_type::is_secure();
130 void set_uri(uri_ptr u) {
131 socket_con_type::set_uri(u);
144 void set_tcp_pre_init_handler(tcp_init_handler h) {
145 m_tcp_pre_init_handler = h;
158 void set_tcp_init_handler(tcp_init_handler h) {
159 set_tcp_pre_init_handler(h);
173 void set_tcp_post_init_handler(tcp_init_handler h) {
174 m_tcp_post_init_handler = h;
189 void set_proxy(std::string
const & uri, lib::error_code & ec) {
193 m_proxy_data = lib::make_shared<proxy_data>();
194 ec = lib::error_code();
198 void set_proxy(std::string
const & uri) {
201 if (ec) {
throw exception(ec); }
217 void set_proxy_basic_auth(std::string
const & username, std::string
const &
218 password, lib::error_code & ec)
221 ec = make_error_code(websocketpp::error::invalid_state);
226 std::string val =
"Basic "+base64_encode(username +
":" + password);
227 m_proxy_data->req.replace_header(
"Proxy-Authorization",val);
228 ec = lib::error_code();
232 void set_proxy_basic_auth(std::string
const & username, std::string
const &
236 set_proxy_basic_auth(username,password,ec);
237 if (ec) {
throw exception(ec); }
250 void set_proxy_timeout(
long duration, lib::error_code & ec) {
252 ec = make_error_code(websocketpp::error::invalid_state);
256 m_proxy_data->timeout_proxy = duration;
257 ec = lib::error_code();
261 void set_proxy_timeout(
long duration) {
263 set_proxy_timeout(duration,ec);
264 if (ec) {
throw exception(ec); }
267 std::string
const & get_proxy()
const {
281 std::string get_remote_endpoint()
const {
284 std::string ret = socket_con_type::get_remote_endpoint(ec);
287 m_elog->write(log::elevel::info,ret);
295 connection_hdl get_handle()
const {
296 return m_connection_hdl;
313 timer_ptr set_timer(
long duration, timer_handler callback) {
314 timer_ptr new_timer = lib::make_shared<lib::asio::steady_timer>(
315 lib::ref(*m_io_service),
316 lib::asio::milliseconds(duration)
319 if (config::enable_multithreading) {
320 new_timer->async_wait(m_strand->wrap(lib::bind(
321 &type::handle_timer, get_shared(),
324 lib::placeholders::_1
327 new_timer->async_wait(lib::bind(
328 &type::handle_timer, get_shared(),
331 lib::placeholders::_1
349 void handle_timer(timer_ptr, timer_handler callback,
350 lib::asio::error_code
const & ec)
353 if (ec == lib::asio::error::operation_aborted) {
354 callback(make_error_code(transport::error::operation_aborted));
356 log_err(log::elevel::info,
"asio handle_timer",ec);
357 callback(make_error_code(error::pass_through));
360 callback(lib::error_code());
365 strand_ptr get_strand() {
388 lib::asio::error_code get_transport_ec()
const {
410 void init(init_handler callback) {
411 if (m_alog->static_test(log::alevel::devel)) {
412 m_alog->write(log::alevel::devel,
"asio connection init");
418 socket_con_type::pre_init(
420 &type::handle_pre_init,
423 lib::placeholders::_1
436 lib::error_code proxy_init(std::string
const & authority) {
438 return websocketpp::error::make_error_code(
439 websocketpp::error::invalid_state);
441 m_proxy_data->req.set_version(
"HTTP/1.1");
442 m_proxy_data->req.set_method(
"CONNECT");
444 m_proxy_data->req.set_uri(authority);
445 m_proxy_data->req.replace_header(
"Host",authority);
447 return lib::error_code();
460 lib::error_code init_asio (io_service_ptr io_service) {
461 m_io_service = io_service;
463 if (config::enable_multithreading) {
464 m_strand = lib::make_shared<lib::asio::io_service::strand>(
465 lib::ref(*io_service));
468 lib::error_code ec = socket_con_type::init_asio(io_service, m_strand,
474 void handle_pre_init(init_handler callback, lib::error_code
const & ec) {
475 if (m_alog->static_test(log::alevel::devel)) {
476 m_alog->write(log::alevel::devel,
"asio connection handle pre_init");
479 if (m_tcp_pre_init_handler) {
480 m_tcp_pre_init_handler(m_connection_hdl);
489 if (!m_proxy.empty()) {
490 proxy_write(callback);
496 void post_init(init_handler callback) {
497 if (m_alog->static_test(log::alevel::devel)) {
498 m_alog->write(log::alevel::devel,
"asio connection post_init");
501 timer_ptr post_timer;
503 if (config::timeout_socket_post_init > 0) {
504 post_timer = set_timer(
505 config::timeout_socket_post_init,
507 &type::handle_post_init_timeout,
511 lib::placeholders::_1
516 socket_con_type::post_init(
518 &type::handle_post_init,
522 lib::placeholders::_1
536 void handle_post_init_timeout(timer_ptr, init_handler callback,
537 lib::error_code
const & ec)
539 lib::error_code ret_ec;
542 if (ec == transport::error::operation_aborted) {
543 m_alog->write(log::alevel::devel,
544 "asio post init timer cancelled");
548 log_err(log::elevel::devel,
"asio handle_post_init_timeout",ec);
551 if (socket_con_type::get_ec()) {
552 ret_ec = socket_con_type::get_ec();
554 ret_ec = make_error_code(transport::error::timeout);
558 m_alog->write(log::alevel::devel,
"Asio transport post-init timed out");
559 cancel_socket_checked();
572 void handle_post_init(timer_ptr post_timer, init_handler callback,
573 lib::error_code
const & ec)
575 if (ec == transport::error::operation_aborted ||
576 (post_timer && lib::asio::is_neg(post_timer->expires_from_now())))
578 m_alog->write(log::alevel::devel,
"post_init cancelled");
583 post_timer->cancel();
586 if (m_alog->static_test(log::alevel::devel)) {
587 m_alog->write(log::alevel::devel,
"asio connection handle_post_init");
590 if (m_tcp_post_init_handler) {
591 m_tcp_post_init_handler(m_connection_hdl);
597 void proxy_write(init_handler callback) {
598 if (m_alog->static_test(log::alevel::devel)) {
599 m_alog->write(log::alevel::devel,
"asio connection proxy_write");
603 m_elog->write(log::elevel::library,
604 "assertion failed: !m_proxy_data in asio::connection::proxy_write");
605 callback(make_error_code(error::general));
609 m_proxy_data->write_buf = m_proxy_data->req.raw();
611 m_bufs.push_back(lib::asio::buffer(m_proxy_data->write_buf.data(),
612 m_proxy_data->write_buf.size()));
614 m_alog->write(log::alevel::devel,m_proxy_data->write_buf);
617 m_proxy_data->timer =
this->set_timer(
618 m_proxy_data->timeout_proxy,
620 &type::handle_proxy_timeout,
623 lib::placeholders::_1
628 if (config::enable_multithreading) {
629 lib::asio::async_write(
630 socket_con_type::get_next_layer(),
632 m_strand->wrap(lib::bind(
633 &type::handle_proxy_write, get_shared(),
635 lib::placeholders::_1
639 lib::asio::async_write(
640 socket_con_type::get_next_layer(),
643 &type::handle_proxy_write, get_shared(),
645 lib::placeholders::_1
651 void handle_proxy_timeout(init_handler callback, lib::error_code
const & ec)
653 if (ec == transport::error::operation_aborted) {
654 m_alog->write(log::alevel::devel,
655 "asio handle_proxy_write timer cancelled");
658 log_err(log::elevel::devel,
"asio handle_proxy_write",ec);
661 m_alog->write(log::alevel::devel,
662 "asio handle_proxy_write timer expired");
663 cancel_socket_checked();
664 callback(make_error_code(transport::error::timeout));
668 void handle_proxy_write(init_handler callback,
669 lib::asio::error_code
const & ec)
671 if (m_alog->static_test(log::alevel::devel)) {
672 m_alog->write(log::alevel::devel,
673 "asio connection handle_proxy_write");
681 if (ec == lib::asio::error::operation_aborted ||
682 lib::asio::is_neg(m_proxy_data->timer->expires_from_now()))
684 m_elog->write(log::elevel::devel,
"write operation aborted");
689 log_err(log::elevel::info,
"asio handle_proxy_write",ec);
690 m_proxy_data->timer->cancel();
691 callback(make_error_code(error::pass_through));
695 proxy_read(callback);
698 void proxy_read(init_handler callback) {
699 if (m_alog->static_test(log::alevel::devel)) {
700 m_alog->write(log::alevel::devel,
"asio connection proxy_read");
704 m_elog->write(log::elevel::library,
705 "assertion failed: !m_proxy_data in asio::connection::proxy_read");
706 m_proxy_data->timer->cancel();
707 callback(make_error_code(error::general));
711 if (config::enable_multithreading) {
712 lib::asio::async_read_until(
713 socket_con_type::get_next_layer(),
714 m_proxy_data->read_buf,
716 m_strand->wrap(lib::bind(
717 &type::handle_proxy_read, get_shared(),
719 lib::placeholders::_1, lib::placeholders::_2
723 lib::asio::async_read_until(
724 socket_con_type::get_next_layer(),
725 m_proxy_data->read_buf,
728 &type::handle_proxy_read, get_shared(),
730 lib::placeholders::_1, lib::placeholders::_2
742 void handle_proxy_read(init_handler callback,
743 lib::asio::error_code
const & ec, size_t)
745 if (m_alog->static_test(log::alevel::devel)) {
746 m_alog->write(log::alevel::devel,
747 "asio connection handle_proxy_read");
753 if (ec == lib::asio::error::operation_aborted ||
754 lib::asio::is_neg(m_proxy_data->timer->expires_from_now()))
756 m_elog->write(log::elevel::devel,
"read operation aborted");
761 m_proxy_data->timer->cancel();
764 m_elog->write(log::elevel::info,
765 "asio handle_proxy_read error: "+ec.message());
766 callback(make_error_code(error::pass_through));
769 m_elog->write(log::elevel::library,
770 "assertion failed: !m_proxy_data in asio::connection::handle_proxy_read");
771 callback(make_error_code(error::general));
775 std::istream input(&m_proxy_data->read_buf);
777 m_proxy_data->res.consume(input);
779 if (!m_proxy_data->res.headers_ready()) {
782 callback(make_error_code(error::general));
786 m_alog->write(log::alevel::devel,m_proxy_data->res.raw());
788 if (m_proxy_data->res.get_status_code() != http::status_code::ok) {
793 s <<
"Proxy connection error: " 794 << m_proxy_data->res.get_status_code()
796 << m_proxy_data->res.get_status_msg()
798 m_elog->write(log::elevel::info,s.str());
799 callback(make_error_code(error::proxy_failed));
812 m_proxy_data.reset();
820 void async_read_at_least(size_t num_bytes,
char *buf, size_t len,
821 read_handler handler)
823 if (m_alog->static_test(log::alevel::devel)) {
825 s <<
"asio async_read_at_least: " << num_bytes;
826 m_alog->write(log::alevel::devel,s.str());
839 if (config::enable_multithreading) {
840 lib::asio::async_read(
841 socket_con_type::get_socket(),
842 lib::asio::buffer(buf,len),
843 lib::asio::transfer_at_least(num_bytes),
844 m_strand->wrap(make_custom_alloc_handler(
845 m_read_handler_allocator,
847 &type::handle_async_read, get_shared(),
849 lib::placeholders::_1, lib::placeholders::_2
854 lib::asio::async_read(
855 socket_con_type::get_socket(),
856 lib::asio::buffer(buf,len),
857 lib::asio::transfer_at_least(num_bytes),
858 make_custom_alloc_handler(
859 m_read_handler_allocator,
861 &type::handle_async_read, get_shared(),
863 lib::placeholders::_1, lib::placeholders::_2
871 void handle_async_read(read_handler handler, lib::asio::error_code
const & ec,
872 size_t bytes_transferred)
874 m_alog->write(log::alevel::devel,
"asio con handle_async_read");
878 if (ec == lib::asio::error::eof) {
879 tec = make_error_code(transport::error::eof);
883 tec = socket_con_type::translate_ec(ec);
886 if (tec == transport::error::tls_error ||
887 tec == transport::error::pass_through)
892 log_err(log::elevel::info,
"asio async_read_at_least",ec);
896 handler(tec,bytes_transferred);
900 m_alog->write(log::alevel::devel,
901 "handle_async_read called with null read handler");
906 void async_write(
const char* buf, size_t len, write_handler handler) {
907 m_bufs.push_back(lib::asio::buffer(buf,len));
909 if (config::enable_multithreading) {
910 lib::asio::async_write(
911 socket_con_type::get_socket(),
913 m_strand->wrap(make_custom_alloc_handler(
914 m_write_handler_allocator,
916 &type::handle_async_write, get_shared(),
918 lib::placeholders::_1, lib::placeholders::_2
923 lib::asio::async_write(
924 socket_con_type::get_socket(),
926 make_custom_alloc_handler(
927 m_write_handler_allocator,
929 &type::handle_async_write, get_shared(),
931 lib::placeholders::_1, lib::placeholders::_2
939 void async_write(std::vector<buffer>
const & bufs, write_handler handler) {
940 std::vector<buffer>::const_iterator it;
942 for (it = bufs.begin(); it != bufs.end(); ++it) {
943 m_bufs.push_back(lib::asio::buffer((*it).buf,(*it).len));
946 if (config::enable_multithreading) {
947 lib::asio::async_write(
948 socket_con_type::get_socket(),
950 m_strand->wrap(make_custom_alloc_handler(
951 m_write_handler_allocator,
953 &type::handle_async_write, get_shared(),
955 lib::placeholders::_1, lib::placeholders::_2
960 lib::asio::async_write(
961 socket_con_type::get_socket(),
963 make_custom_alloc_handler(
964 m_write_handler_allocator,
966 &type::handle_async_write, get_shared(),
968 lib::placeholders::_1, lib::placeholders::_2
980 void handle_async_write(write_handler handler, lib::asio::error_code
const & ec, size_t) {
984 log_err(log::elevel::info,
"asio async_write",ec);
985 tec = make_error_code(transport::error::pass_through);
992 m_alog->write(log::alevel::devel,
993 "handle_async_write called with null write handler");
1004 void set_handle(connection_hdl hdl) {
1005 m_connection_hdl = hdl;
1006 socket_con_type::set_handle(hdl);
1013 lib::error_code interrupt(interrupt_handler handler) {
1014 if (config::enable_multithreading) {
1015 m_io_service->post(m_strand->wrap(handler));
1017 m_io_service->post(handler);
1019 return lib::error_code();
1022 lib::error_code dispatch(dispatch_handler handler) {
1023 if (config::enable_multithreading) {
1024 m_io_service->post(m_strand->wrap(handler));
1026 m_io_service->post(handler);
1028 return lib::error_code();
1036 void async_shutdown(shutdown_handler callback) {
1037 if (m_alog->static_test(log::alevel::devel)) {
1038 m_alog->write(log::alevel::devel,
"asio connection async_shutdown");
1041 timer_ptr shutdown_timer;
1042 shutdown_timer = set_timer(
1043 config::timeout_socket_shutdown,
1045 &type::handle_async_shutdown_timeout,
1049 lib::placeholders::_1
1053 socket_con_type::async_shutdown(
1055 &type::handle_async_shutdown,
1059 lib::placeholders::_1
1070 void handle_async_shutdown_timeout(timer_ptr, init_handler callback,
1071 lib::error_code
const & ec)
1073 lib::error_code ret_ec;
1076 if (ec == transport::error::operation_aborted) {
1077 m_alog->write(log::alevel::devel,
1078 "asio socket shutdown timer cancelled");
1082 log_err(log::elevel::devel,
"asio handle_async_shutdown_timeout",ec);
1085 ret_ec = make_error_code(transport::error::timeout);
1088 m_alog->write(log::alevel::devel,
1089 "Asio transport socket shutdown timed out");
1090 cancel_socket_checked();
1094 void handle_async_shutdown(timer_ptr shutdown_timer, shutdown_handler
1095 callback, lib::asio::error_code
const & ec)
1097 if (ec == lib::asio::error::operation_aborted ||
1098 lib::asio::is_neg(shutdown_timer->expires_from_now()))
1100 m_alog->write(log::alevel::devel,
"async_shutdown cancelled");
1104 shutdown_timer->cancel();
1106 lib::error_code tec;
1108 if (ec == lib::asio::error::not_connected) {
1116 tec = socket_con_type::translate_ec(ec);
1122 log_err(log::elevel::info,
"asio async_shutdown",ec);
1125 if (m_alog->static_test(log::alevel::devel)) {
1126 m_alog->write(log::alevel::devel,
1127 "asio con handle_async_shutdown");
1134 void cancel_socket_checked() {
1135 lib::asio::error_code cec = socket_con_type::cancel_socket();
1137 if (cec == lib::asio::error::operation_not_supported) {
1139 m_alog->write(log::alevel::devel,
"socket cancel not supported");
1141 log_err(log::elevel::warn,
"socket cancel failed", cec);
1148 template <
typename error_type>
1149 void log_err(log::level l,
const char * msg,
const error_type & ec) {
1150 std::stringstream s;
1151 s << msg <<
" error: " << ec <<
" (" << ec.message() <<
")";
1152 m_elog->write(l,s.str());
1156 const bool m_is_server;
1157 lib::shared_ptr<alog_type> m_alog;
1158 lib::shared_ptr<elog_type> m_elog;
1161 proxy_data() : timeout_proxy(config::timeout_proxy) {}
1165 std::string write_buf;
1166 lib::asio::streambuf read_buf;
1171 std::string m_proxy;
1172 lib::shared_ptr<proxy_data> m_proxy_data;
1175 io_service_ptr m_io_service;
1176 strand_ptr m_strand;
1177 connection_hdl m_connection_hdl;
1179 std::vector<lib::asio::const_buffer> m_bufs;
1182 lib::asio::error_code m_tec;
1185 tcp_init_handler m_tcp_pre_init_handler;
1186 tcp_init_handler m_tcp_post_init_handler;
1188 handler_allocator m_read_handler_allocator;
1189 handler_allocator m_write_handler_allocator;