Index  Source Files  Annotated Class List  Alphabetical Class List  Class Hierarchy  Graphical Class Hierarchy 

SocketConnection.cpp

Go to the documentation of this file.
00001 /****************************************************************************
00002 ** Copyright (c) quickfixengine.org  All rights reserved.
00003 **
00004 ** This file is part of the QuickFIX FIX Engine
00005 **
00006 ** This file may be distributed under the terms of the quickfixengine.org
00007 ** license as defined by quickfixengine.org and appearing in the file
00008 ** LICENSE included in the packaging of this file.
00009 **
00010 ** This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
00011 ** WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
00012 **
00013 ** See http://www.quickfixengine.org/LICENSE for licensing information.
00014 **
00015 ** Contact ask@quickfixengine.org if any conditions of this licensing are
00016 ** not clear to you.
00017 **
00018 ****************************************************************************/
00019 
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026 
00027 #include "SocketConnection.h"
00028 #include "SocketAcceptor.h"
00029 #include "SocketConnector.h"
00030 #include "SocketInitiator.h"
00031 #include "Session.h"
00032 #include "Utility.h"
00033 
00034 namespace FIX
00035 {
00036 SocketConnection::SocketConnection( int s, Sessions sessions,
00037                                     SocketMonitor* pMonitor )
00038 : m_socket( s ), m_sendLength( 0 ),
00039   m_sessions(sessions), m_pSession( 0 ), m_pMonitor( pMonitor )
00040 {
00041   FD_ZERO( &m_fds );
00042   FD_SET( m_socket, &m_fds );
00043 }
00044 
00045 SocketConnection::SocketConnection( SocketInitiator& i,
00046                                     const SessionID& sessionID, int s,
00047                                     SocketMonitor* pMonitor )
00048 : m_socket( s ), m_sendLength( 0 ),
00049   m_pSession( i.getSession( sessionID, *this ) ),
00050   m_pMonitor( pMonitor ) 
00051 {
00052   FD_ZERO( &m_fds );
00053   FD_SET( m_socket, &m_fds );
00054   m_sessions.insert( sessionID );
00055 }
00056 
00057 SocketConnection::~SocketConnection()
00058 {
00059   if ( m_pSession )
00060     Session::unregisterSession( m_pSession->getSessionID() );
00061 }
00062 
00063 bool SocketConnection::send( const std::string& msg )
00064 { QF_STACK_PUSH(SocketConnection::send)
00065 
00066   Locker l( m_mutex );
00067 
00068   m_sendQueue.push_back( msg );
00069   processQueue();
00070   signal();
00071   return true;
00072 
00073   QF_STACK_POP
00074 }
00075 
00076 bool SocketConnection::processQueue()
00077 { QF_STACK_PUSH(SocketConnection::processQueue)
00078 
00079   Locker l( m_mutex );
00080 
00081   if( !m_sendQueue.size() ) return true;
00082 
00083   struct timeval timeout = { 0, 0 };
00084   fd_set writeset = m_fds;
00085   if( select( 1 + m_socket, 0, &writeset, 0, &timeout ) <= 0 )
00086     return false;
00087     
00088   const std::string& msg = m_sendQueue.front();
00089 
00090   int result = socket_send
00091     ( m_socket, msg.c_str() + m_sendLength, msg.length() - m_sendLength );
00092 
00093   if( result > 0 )
00094     m_sendLength += result;
00095 
00096   if( m_sendLength == msg.length() )
00097   {
00098     m_sendLength = 0;
00099     m_sendQueue.pop_front();
00100   }
00101 
00102   return !m_sendQueue.size();
00103 
00104   QF_STACK_POP
00105 }
00106 
00107 void SocketConnection::disconnect()
00108 { QF_STACK_PUSH(SocketConnection::disconnect)
00109 
00110   if ( m_pMonitor )
00111     m_pMonitor->drop( m_socket );
00112 
00113   QF_STACK_POP
00114 }
00115 
00116 bool SocketConnection::read( SocketConnector& s )
00117 { QF_STACK_PUSH(SocketConnection::read)
00118 
00119   if ( !m_pSession ) return false;
00120 
00121   try
00122   {
00123     readFromSocket();
00124     readMessages( s.getMonitor() );
00125   }
00126   catch( SocketRecvFailed& e )
00127   {
00128     m_pSession->getLog()->onEvent( e.what() );
00129     return false;
00130   }
00131   return true;
00132 
00133   QF_STACK_POP
00134 }
00135 
00136 bool SocketConnection::read( SocketAcceptor& a, SocketServer& s )
00137 { QF_STACK_PUSH(SocketConnection::read)
00138 
00139   std::string msg;
00140   try
00141   {
00142     readFromSocket();
00143 
00144     if ( !m_pSession )
00145     {
00146       if ( !readMessage( msg ) ) return false;
00147       m_pSession = Session::lookupSession( msg, true );
00148       if( !isValidSession() )
00149       {
00150         m_pSession = 0;
00151         if( a.getLog() )
00152         {
00153           a.getLog()->onEvent( "Session not found for incoming message: " + msg );
00154           a.getLog()->onIncoming( msg );
00155         }
00156       }
00157       if( m_pSession )
00158         m_pSession = a.getSession( msg, *this );
00159       if( m_pSession )
00160         m_pSession->next( msg, UtcTimeStamp() );
00161       if( !m_pSession )
00162       {
00163         s.getMonitor().drop( m_socket );
00164         return false;
00165       }
00166 
00167       Session::registerSession( m_pSession->getSessionID() );
00168     }
00169 
00170     readMessages( s.getMonitor() );
00171     return true;
00172   }
00173   catch ( SocketRecvFailed& e )
00174   {
00175     if( m_pSession )
00176       m_pSession->getLog()->onEvent( e.what() );
00177     s.getMonitor().drop( m_socket );
00178   }
00179   catch ( InvalidMessage& )
00180   {
00181     s.getMonitor().drop( m_socket );
00182   }
00183   return false;
00184 
00185   QF_STACK_POP
00186 }
00187 
00188 bool SocketConnection::isValidSession()
00189 { QF_STACK_PUSH(SocketConnection::isValidSession)
00190 
00191   if( m_pSession == 0 )
00192     return false;
00193   SessionID sessionID = m_pSession->getSessionID();
00194   if( Session::isSessionRegistered(sessionID) )
00195     return false;
00196   return !( m_sessions.find(sessionID) == m_sessions.end() );
00197 
00198   QF_STACK_POP
00199 }
00200 
00201 void SocketConnection::readFromSocket()
00202 throw( SocketRecvFailed )
00203 { QF_STACK_PUSH(SocketConnection::readFromSocket)
00204 
00205   int size = recv( m_socket, m_buffer, sizeof(m_buffer), 0 );
00206   if( size <= 0 ) throw SocketRecvFailed( size );
00207   m_parser.addToStream( m_buffer, size );
00208 
00209   QF_STACK_POP
00210 }
00211 
00212 bool SocketConnection::readMessage( std::string& msg )
00213 { QF_STACK_PUSH(SocketConnection::readMessage)
00214 
00215   try
00216   {
00217     return m_parser.readFixMessage( msg );
00218   }
00219   catch ( MessageParseError& ) {}
00220   return true;
00221 
00222   QF_STACK_POP
00223 }
00224 
00225 void SocketConnection::readMessages( SocketMonitor& s )
00226 {
00227   if( !m_pSession ) return;
00228 
00229   std::string msg;
00230   while( readMessage( msg ) )
00231   {
00232     try
00233     {
00234       m_pSession->next( msg, UtcTimeStamp() );
00235     }
00236     catch ( InvalidMessage& )
00237     {
00238       if( !m_pSession->isLoggedOn() )
00239         s.drop( m_socket );
00240     }
00241   }
00242 }
00243 
00244 void SocketConnection::onTimeout()
00245 { QF_STACK_PUSH(SocketConnection::onTimeout)
00246   if ( m_pSession ) m_pSession->next();
00247   QF_STACK_POP
00248 }
00249 } // namespace FIX

Generated on Mon Apr 5 20:59:51 2010 for QuickFIX by doxygen 1.6.1 written by Dimitri van Heesch, © 1997-2001