19 #ifndef STREAMSOCKET_IMPL_H_
20 #define STREAMSOCKET_IMPL_H_
26 #include "StreamSocket.h"
28 #include <Atlas/Codec.h>
// Number of seconds the connect timer in AsioStreamSocket::connect() is armed for
// before its handler reports CONNECTING_TIMEOUT.
30 static const int CONNECT_TIMEOUT_SECONDS = 5;
// Constructor: forwards io_service, client_name, bridge and the (moved-from)
// callbacks object to the StreamSocket base class.
// NOTE(review): the rest of the member-initializer list and the constructor
// body are not visible in this excerpt.
35 template<
typename ProtocolT>
36 AsioStreamSocket<ProtocolT>::AsioStreamSocket(
37 boost::asio::io_service& io_service,
const std::string& client_name,
38 Atlas::Bridge& bridge, StreamSocket::Callbacks callbacks) :
39 StreamSocket(io_service, client_name, bridge, std::move(callbacks)),
// Destructor: when the socket is still open it is shut down in both
// directions. Exceptions are caught and reported through warning() so that
// nothing propagates out of the destructor.
// NOTE(review): the try blocks and the close call itself are not visible in
// this excerpt; only their catch handlers are.
44 template<
typename ProtocolT>
45 AsioStreamSocket<ProtocolT>::~AsioStreamSocket()
47 if (m_socket.is_open()) {
50 m_socket.shutdown(ProtocolT::socket::shutdown_both);
51 }
catch (
const std::exception& e) {
// Shutdown failure is logged together with the exception text.
52 warning() <<
"Error when shutting down socket: " << e.what();
57 }
catch (
const std::exception&) {
// Close failure is logged without the exception details.
58 warning() <<
"Error when closing socket.";
// Accessor returning a reference to the protocol-specific Asio socket type.
// NOTE(review): the body is not visible in this excerpt; presumably it
// returns m_socket -- confirm.
63 template<
typename ProtocolT>
64 typename ProtocolT::socket& AsioStreamSocket<ProtocolT>::getAsioSocket()
// Constructor: forwards all arguments to AsioStreamSocket<ProtocolT> and
// constructs the resolver member on the same io_service.
// NOTE(review): the constructor body is not visible in this excerpt.
69 template<
typename ProtocolT>
70 ResolvableAsioStreamSocket<ProtocolT>::ResolvableAsioStreamSocket(
71 boost::asio::io_service& io_service,
const std::string& client_name,
72 Atlas::Bridge& bridge, StreamSocket::Callbacks callbacks) :
73 AsioStreamSocket<ProtocolT>(io_service, client_name, bridge, std::move(callbacks)),
74 m_resolver(io_service)
// Starts an asynchronous resolve of `query` and, when the resolve succeeds
// with at least one result, connects to the first resolved endpoint.
// NOTE(review): the handling of a failed resolve is not visible in this
// excerpt.
80 template<
typename ProtocolT>
81 void ResolvableAsioStreamSocket<ProtocolT>::connectWithQuery(
82 const typename ProtocolT::resolver::query& query)
// Copy of the shared_ptr captured by the handler below.
84 auto self(this->shared_from_this());
85 m_resolver.async_resolve(query,
// NOTE(review): the default by-reference capture is only safe as long as the
// handler touches nothing but members reached through `this`; the visible
// handler code uses no locals of this function.
86 [&,
self](
const boost::system::error_code& ec,
typename ProtocolT::resolver::iterator iterator) {
// Nothing below runs unless a stateChanged callback is set.
87 if (this->_callbacks.stateChanged) {
// A default-constructed iterator is compared against to detect an empty result.
88 if (!ec && iterator !=
typename ProtocolT::resolver::iterator()) {
89 this->connect(*iterator);
// Starts an asynchronous connect to `endpoint`, guarded by a timer armed for
// CONNECT_TIMEOUT_SECONDS. The timer handler reports CONNECTING_TIMEOUT; the
// connect handler either cancels the timer, marks the socket connected and
// starts negotiation, or reports CONNECTING_FAILED.
// NOTE(review): the conditions on `ec` selecting between these outcomes are
// not visible in this excerpt.
97 template<
typename ProtocolT>
98 void AsioStreamSocket<ProtocolT>::connect(
99 const typename ProtocolT::endpoint& endpoint)
101 _connectTimer.expires_from_now(
102 std::chrono::seconds(CONNECT_TIMEOUT_SECONDS));
// Copy of the shared_ptr captured by both handlers below.
103 auto self(this->shared_from_this());
104 _connectTimer.async_wait([&,
self](boost::system::error_code ec) {
// The timeout is reported only when a stateChanged callback is set.
106 if (_callbacks.stateChanged) {
107 _callbacks.stateChanged(CONNECTING_TIMEOUT);
112 m_socket.async_connect(endpoint,
113 [
this,
self](boost::system::error_code ec) {
114 if (_callbacks.stateChanged) {
// Presumably the success path (confirm): stop the timeout timer and move on
// to protocol negotiation.
116 this->_connectTimer.cancel();
117 m_is_connected =
true;
118 this->startNegotiation();
// Presumably the failure path (confirm).
120 _callbacks.stateChanged(CONNECTING_FAILED);
// Issues one asynchronous read into mReadBuffer while negotiation is in
// progress. The handler commits the received bytes, runs negotiate(), reports
// NEGOTIATE_FAILED on failure, and otherwise continues with negotiate_write()
// and another negotiate_read(). Read errors other than operation_aborted are
// reported as CONNECTION_FAILED.
// NOTE(review): several branch conditions and closing braces are not visible
// in this excerpt, so the exact nesting of the statements below is unconfirmed.
126 template<
typename ProtocolT>
127 void AsioStreamSocket<ProtocolT>::negotiate_read()
// Copy of the shared_ptr captured by the read handler.
129 auto self(this->shared_from_this());
130 m_socket.async_read_some(mReadBuffer.prepare(read_buffer_size),
131 [
this,
self](boost::system::error_code ec, std::size_t length)
133 if (_callbacks.stateChanged) {
// Make the `length` bytes just received part of the buffer's readable data.
136 mReadBuffer.commit(length);
138 auto negotiateResult = this->negotiate();
139 if (negotiateResult == Atlas::Negotiate::FAILED) {
141 _callbacks.stateChanged(NEGOTIATE_FAILED);
// NOTE(review): the body of this branch is not visible in this excerpt.
147 if (_sc ==
nullptr) {
151 this->negotiate_write();
152 this->negotiate_read();
// operation_aborted is not reported as a connection failure.
155 if (ec != boost::asio::error::operation_aborted) {
156 _callbacks.stateChanged(CONNECTION_FAILED);
158 warning() <<
"Error when reading from socket while negotiating: (" << ec <<
") " << ec.message();
// Issues one asynchronous read into mReadBuffer. The handler commits the
// received bytes and invokes the dispatch callback; read errors other than
// operation_aborted are reported as CONNECTION_FAILED.
// NOTE(review): the conditions on `ec` and any re-arming of the read are not
// visible in this excerpt.
165 template<
typename ProtocolT>
166 void AsioStreamSocket<ProtocolT>::do_read()
// Copy of the shared_ptr captured by the read handler.
168 auto self(this->shared_from_this());
169 m_socket.async_read_some(mReadBuffer.prepare(read_buffer_size),
170 [
this,
self](boost::system::error_code ec, std::size_t length)
172 if (_callbacks.stateChanged) {
// Make the `length` bytes just received part of the buffer's readable data.
175 mReadBuffer.commit(length);
177 _callbacks.dispatch();
// operation_aborted is not reported as a connection failure.
180 if (ec != boost::asio::error::operation_aborted) {
181 _callbacks.stateChanged(CONNECTION_FAILED);
183 warning() <<
"Error when reading from socket: (" << ec <<
") " << ec.message();
// NOTE(review): the signature line of this member function is not visible in
// this excerpt; from the body it appears to be the routine that sends pending
// output -- confirm its name against the class declaration.
// When mWriteBuffer holds data, it is swapped with mSendBuffer, mOutStream is
// pointed at the new write buffer, and the send buffer is written
// asynchronously. The handler consumes the written bytes; write errors other
// than operation_aborted are reported as CONNECTION_FAILED when a
// stateChanged callback is set.
190 template<
typename ProtocolT>
193 if (mWriteBuffer->size() != 0) {
// Copy of the shared_ptr captured by the write handler.
204 auto self(this->shared_from_this());
// After the swap, mSendBuffer holds the pending data and mWriteBuffer is the
// buffer that mOutStream will fill from now on.
206 std::swap(mWriteBuffer, mSendBuffer);
207 mOutStream.rdbuf(mWriteBuffer.get());
210 async_write(m_socket, mSendBuffer->data(),
211 [
this,
self](boost::system::error_code ec, std::size_t length)
// Drop the bytes that were written from the send buffer.
213 mSendBuffer->consume(length);
// operation_aborted is not reported as a connection failure.
221 if (ec != boost::asio::error::operation_aborted) {
222 if (_callbacks.stateChanged) {
223 _callbacks.stateChanged(CONNECTION_FAILED);
226 warning() <<
"Error when writing to socket: (" << ec <<
") " << ec.message();
// While negotiating, asynchronously writes whatever mWriteBuffer currently
// holds (no buffer swap is done here). The handler consumes the written bytes
// and a warning is logged for write errors.
// NOTE(review): the condition selecting between the consume and warning paths
// is not visible in this excerpt.
234 template<
typename ProtocolT>
235 void AsioStreamSocket<ProtocolT>::negotiate_write()
// Nothing is written when the write buffer is empty.
238 if (mWriteBuffer->size() != 0) {
// Copy of the shared_ptr captured by the write handler.
239 auto self(this->shared_from_this());
240 boost::asio::async_write(m_socket, mWriteBuffer->data(),
241 [
this,
self](boost::system::error_code ec, std::size_t length)
245 this->mWriteBuffer->consume(length);
247 warning() <<
"Error when writing to socket while negotiating: (" << ec <<
") " << ec.message();