SocketAcceptor.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026
00027 #include "SocketAcceptor.h"
00028 #include "Session.h"
00029 #include "Settings.h"
00030 #include "Utility.h"
00031 #include "Exceptions.h"
00032
00033 namespace FIX
00034 {
00035 SocketAcceptor::SocketAcceptor( Application& application,
00036 MessageStoreFactory& factory,
00037 const SessionSettings& settings ) throw( ConfigError )
00038 : Acceptor( application, factory, settings ),
00039 m_pServer( 0 ) {}
00040
00041 SocketAcceptor::SocketAcceptor( Application& application,
00042 MessageStoreFactory& factory,
00043 const SessionSettings& settings,
00044 LogFactory& logFactory ) throw( ConfigError )
00045 : Acceptor( application, factory, settings, logFactory ),
00046 m_pServer( 0 )
00047 {
00048 }
00049
00050 SocketAcceptor::~SocketAcceptor()
00051 {
00052 SocketConnections::iterator iter;
00053 for ( iter = m_connections.begin(); iter != m_connections.end(); ++iter )
00054 delete iter->second;
00055 }
00056
00057 void SocketAcceptor::onConfigure( const SessionSettings& s )
00058 throw ( ConfigError )
00059 { QF_STACK_PUSH(SocketAcceptor::onConfigure)
00060
00061 std::set<SessionID> sessions = s.getSessions();
00062 std::set<SessionID>::iterator i;
00063 for( i = sessions.begin(); i != sessions.end(); ++i )
00064 {
00065 const Dictionary& settings = s.get( *i );
00066 settings.getLong( SOCKET_ACCEPT_PORT );
00067 if( settings.has(SOCKET_REUSE_ADDRESS) )
00068 settings.getBool( SOCKET_REUSE_ADDRESS );
00069 if( settings.has(SOCKET_NODELAY) )
00070 settings.getBool( SOCKET_NODELAY );
00071 }
00072
00073 QF_STACK_POP
00074 }
00075
00076 void SocketAcceptor::onInitialize( const SessionSettings& s )
00077 throw ( RuntimeError )
00078 { QF_STACK_PUSH(SocketAcceptor::onInitialize)
00079
00080 short port = 0;
00081
00082 try
00083 {
00084 m_pServer = new SocketServer( 1 );
00085
00086 std::set<SessionID> sessions = s.getSessions();
00087 std::set<SessionID>::iterator i = sessions.begin();
00088 for( ; i != sessions.end(); ++i )
00089 {
00090 Dictionary settings = s.get( *i );
00091 short port = (short)settings.getLong( SOCKET_ACCEPT_PORT );
00092
00093 const bool reuseAddress = settings.has( SOCKET_REUSE_ADDRESS ) ?
00094 s.get().getBool( SOCKET_REUSE_ADDRESS ) : true;
00095
00096 const bool noDelay = settings.has( SOCKET_NODELAY ) ?
00097 s.get().getBool( SOCKET_NODELAY ) : false;
00098
00099 const int sendBufSize = settings.has( SOCKET_SEND_BUFFER_SIZE ) ?
00100 s.get().getLong( SOCKET_SEND_BUFFER_SIZE ) : 0;
00101
00102 const int rcvBufSize = settings.has( SOCKET_RECEIVE_BUFFER_SIZE ) ?
00103 s.get().getLong( SOCKET_RECEIVE_BUFFER_SIZE ) : 0;
00104
00105 m_portToSessions[port].insert( *i );
00106 m_pServer->add( port, reuseAddress, noDelay, sendBufSize, rcvBufSize );
00107 }
00108 }
00109 catch( SocketException& e )
00110 {
00111 throw RuntimeError( "Unable to create, bind, or listen to port "
00112 + IntConvertor::convert( (unsigned short)port ) + " (" + e.what() + ")" );
00113 }
00114
00115 QF_STACK_POP
00116 }
00117
00118 void SocketAcceptor::onStart()
00119 { QF_STACK_PUSH(SocketAcceptor::onStart)
00120
00121 while ( !isStopped() && m_pServer && m_pServer->block( *this ) ) {}
00122
00123 if( !m_pServer )
00124 return;
00125
00126 time_t start = 0;
00127 time_t now = 0;
00128
00129 ::time( &start );
00130 while ( isLoggedOn() )
00131 {
00132 m_pServer->block( *this );
00133 if( ::time(&now) -5 >= start )
00134 break;
00135 }
00136
00137 m_pServer->close();
00138 delete m_pServer;
00139 m_pServer = 0;
00140
00141 QF_STACK_POP
00142 }
00143
00144 bool SocketAcceptor::onPoll( double timeout )
00145 { QF_STACK_PUSH(SocketAcceptor::onPoll)
00146
00147 if( !m_pServer )
00148 return false;
00149
00150 time_t start = 0;
00151 time_t now = 0;
00152
00153 if( isStopped() )
00154 {
00155 if( start == 0 )
00156 ::time( &start );
00157 if( !isLoggedOn() )
00158 {
00159 start = 0;
00160 return false;
00161 }
00162 if( ::time(&now) - 5 >= start )
00163 {
00164 start = 0;
00165 return false;
00166 }
00167 }
00168
00169 m_pServer->block( *this, true, timeout );
00170 return true;
00171
00172 QF_STACK_POP
00173 }
00174
00175 void SocketAcceptor::onStop()
00176 { QF_STACK_PUSH(SocketAcceptor::onStop)
00177 QF_STACK_POP
00178 }
00179
00180 void SocketAcceptor::onConnect( SocketServer& server, int a, int s )
00181 { QF_STACK_PUSH(SocketAcceptor::onConnect)
00182
00183 if ( !socket_isValid( s ) ) return;
00184 SocketConnections::iterator i = m_connections.find( s );
00185 if ( i != m_connections.end() ) return;
00186 int port = server.socketToPort( a );
00187 Sessions sessions = m_portToSessions[port];
00188 m_connections[ s ] = new SocketConnection( s, sessions, &server.getMonitor() );
00189
00190 std::stringstream stream;
00191 stream << "Accepted connection from " << socket_peername( s ) << " on port " << port;
00192
00193 if( getLog() )
00194 getLog()->onEvent( stream.str() );
00195
00196 QF_STACK_POP
00197 }
00198
00199 void SocketAcceptor::onWrite( SocketServer& server, int s )
00200 { QF_STACK_PUSH(SocketAcceptor::onWrite)
00201
00202 SocketConnections::iterator i = m_connections.find( s );
00203 if ( i == m_connections.end() ) return ;
00204 SocketConnection* pSocketConnection = i->second;
00205 if( pSocketConnection->processQueue() )
00206 pSocketConnection->unsignal();
00207
00208 QF_STACK_POP
00209 }
00210
00211 bool SocketAcceptor::onData( SocketServer& server, int s )
00212 { QF_STACK_PUSH(SocketAcceptor::onData)
00213
00214 SocketConnections::iterator i = m_connections.find( s );
00215 if ( i == m_connections.end() ) return false;
00216 SocketConnection* pSocketConnection = i->second;
00217 return pSocketConnection->read( *this, server );
00218
00219 QF_STACK_POP
00220 }
00221
00222 void SocketAcceptor::onDisconnect( SocketServer&, int s )
00223 { QF_STACK_PUSH(SocketAcceptor::onDisconnect)
00224
00225 SocketConnections::iterator i = m_connections.find( s );
00226 if ( i == m_connections.end() ) return ;
00227 SocketConnection* pSocketConnection = i->second;
00228
00229 Session* pSession = pSocketConnection->getSession();
00230 if ( pSession ) pSession->disconnect();
00231
00232 delete pSocketConnection;
00233 m_connections.erase( s );
00234
00235 QF_STACK_POP
00236 }
00237
00238 void SocketAcceptor::onError( SocketServer& ) {}
00239
00240 void SocketAcceptor::onTimeout( SocketServer& )
00241 { QF_STACK_PUSH(SocketAcceptor::onInitialize)
00242
00243 SocketConnections::iterator i;
00244 for ( i = m_connections.begin(); i != m_connections.end(); ++i )
00245 i->second->onTimeout();
00246
00247 QF_STACK_POP
00248 }
00249 }