ThreadedSocketConnection.cpp
Go to the documentation of this file.
1 /****************************************************************************
2 ** Copyright (c) 2001-2014
3 **
4 ** This file is part of the QuickFIX FIX Engine
5 **
6 ** This file may be distributed under the terms of the quickfixengine.org
7 ** license as defined by quickfixengine.org and appearing in the file
8 ** LICENSE included in the packaging of this file.
9 **
10 ** This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
11 ** WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
12 **
13 ** See http://www.quickfixengine.org/LICENSE for licensing information.
14 **
15 ** Contact ask@quickfixengine.org if any conditions of this licensing are
16 ** not clear to you.
17 **
18 ****************************************************************************/
19 
20 #ifdef _MSC_VER
21 #include "stdafx.h"
22 #else
23 #include "config.h"
24 #endif
25 
27 #include "ThreadedSocketAcceptor.h"
29 #include "Session.h"
30 #include "Utility.h"
31 
32 namespace FIX
33 {
// Constructor taking an already-open socket, a collection of sessions and a log.
// NOTE(review): the line carrying the qualified name is absent from this
// listing; the index at the end of the page gives the signature as
// ThreadedSocketConnection(int s, Sessions sessions, Log *pLog).
// The connection starts with no session bound (m_pSession is 0) and with the
// disconnect flag cleared.
35 ( int s, Sessions sessions, Log* pLog )
36 : m_socket( s ), m_pLog( pLog ),
37  m_sessions( sessions ), m_pSession( 0 ),
38  m_disconnect( false )
39 {
// m_fds ends up holding exactly one descriptor: this connection's socket.
40  FD_ZERO( &m_fds );
41  FD_SET( m_socket, &m_fds );
42 }
43 
// Constructor taking a session ID, a socket, a peer address/port and a log.
// NOTE(review): the line carrying the qualified name is absent from this
// listing — presumably a second ThreadedSocketConnection constructor; confirm
// against the original file.
// The session is resolved immediately through Session::lookupSession; the
// result may be null, which is why the setResponder call below is guarded.
45 ( const SessionID& sessionID, int s,
46  const std::string& address, short port,
47  Log* pLog )
48  : m_socket( s ), m_address( address ), m_port( port ),
49  m_pLog( pLog ),
50  m_pSession( Session::lookupSession( sessionID ) ),
51  m_disconnect( false )
52 {
// m_fds ends up holding exactly one descriptor: this connection's socket.
53  FD_ZERO( &m_fds );
54  FD_SET( m_socket, &m_fds );
// Register this object as the session's responder only when a session was found.
55  if ( m_pSession ) m_pSession->setResponder( this );
56 }
57 
// NOTE(review): this definition is incomplete in this listing — the signature
// line and the statements inside the `if` (listing lines 62-63) are absent.
// Presumably this is the destructor and the missing statements release the
// bound session; confirm against the original file before relying on this.
59 {
// Whatever the missing statements do, they run only when a session is bound.
60  if ( m_pSession )
61  {
64  }
65 }
66 
67 bool ThreadedSocketConnection::send( const std::string& msg )
68 {
69  int totalSent = 0;
70  while(totalSent < (int)msg.length())
71  {
72  ssize_t sent = socket_send( m_socket, msg.c_str() + totalSent, msg.length() );
73  if(sent < 0) return false;
74  totalSent += sent;
75  }
76 
77  return true;
78 }
79 
// NOTE(review): the signature line is absent from this listing — presumably
// bool ThreadedSocketConnection::connect(); confirm against the original file.
// Calls socket_connect on getSocket() with the stored address and port and
// returns true when that call returns a non-negative value.
81 {
82  return socket_connect(getSocket(), m_address.c_str(), m_port) >= 0;
83 }
84 
// NOTE(review): the signature line and listing line 88 are absent from this
// listing — presumably void ThreadedSocketConnection::disconnect(), with the
// missing statement closing the socket; confirm against the original file.
86 {
// Set the flag first: the SocketRecvFailed handler later in this file checks
// m_disconnect and returns false without logging when it is set.
87  m_disconnect = true;
89 }
90 
// NOTE(review): the signature line is absent from this listing — presumably
// bool ThreadedSocketConnection::read(); confirm against the original file.
//
// One polling step for this connection: waits on the socket with select(),
// feeds any received bytes to m_parser, lets the session run its timeout
// handling when nothing arrived, then calls processStream().
// Returns true when the step completed, false when SocketRecvFailed was thrown.
92 {
93  struct timeval timeout = { 1, 0 };
// select() modifies the set it is given, so work on a copy of m_fds.
94  fd_set readset = m_fds;
95 
96  try
97  {
98  // Wait for input (1 second timeout)
99  int result = select( 1 + m_socket, &readset, 0, 0, &timeout );
100 
101  if( result > 0 ) // Something to read
102  {
103  // We can read without blocking
104  ssize_t size = recv( m_socket, m_buffer, sizeof(m_buffer), 0 );
// A zero or negative recv result is turned into SocketRecvFailed and handled below.
105  if ( size <= 0 ) { throw SocketRecvFailed( size ); }
106  m_parser.addToStream( m_buffer, size );
107  }
// On timeout the session is only driven when one is already bound.
108  else if( result == 0 && m_pSession ) // Timeout
109  {
110  m_pSession->next();
111  }
112  else if( result < 0 ) // Error
113  {
114  throw SocketRecvFailed( result );
115  }
116 
// Reached on every non-throwing path, including a timeout with no bound session.
117  processStream();
118  return true;
119  }
120  catch ( SocketRecvFailed& e )
121  {
// A failure after disconnect was requested is not logged; just report false.
122  if( m_disconnect )
123  return false;
124 
125  if( m_pSession )
126  {
127  m_pSession->getLog()->onEvent( e.what() );
// NOTE(review): listing line 128 is absent here; presumably it disconnects the
// session — confirm against the original file.
129  }
130  else
131  {
132  disconnect();
133  }
134 
135  return false;
136  }
137 }
138 
// NOTE(review): the signature line is absent from this listing — presumably
// bool ThreadedSocketConnection::readMessage( std::string& msg ), matching the
// readMessage(msg) call later in this file; confirm against the original file.
// NOTE(review): dynamic exception specifications such as the one below were
// deprecated in C++11 and removed in C++17.
140 throw( SocketRecvFailed )
141 {
142  try
143  {
// Forwards the parser's result unchanged.
144  return m_parser.readFixMessage( msg );
145  }
// A MessageParseError is swallowed; control falls through to `return true`, so
// the caller proceeds with msg in whatever state readFixMessage left it.
146  catch ( MessageParseError& ) {}
147  return true;
148 }
149 
// NOTE(review): the signature line is absent from this listing — presumably
// void ThreadedSocketConnection::processStream(), which is called after each
// poll earlier in this file; confirm against the original file.
//
// Repeats for as long as readMessage(msg) returns true, handing each message
// to the bound session.
151 {
152  std::string msg;
153  while( readMessage(msg) )
154  {
// No session bound yet: try to bind one from this message.
155  if ( !m_pSession )
156  {
// When binding fails the connection is disconnected and this message is
// skipped; the loop still goes on to call readMessage again.
157  if ( !setSession( msg ) )
158  { disconnect(); continue; }
159  }
160  try
161  {
162  m_pSession->next( msg, UtcTimeStamp() );
163  }
// InvalidMessage is ignored while the session is logged on; otherwise the
// connection is disconnected and processing stops.
164  catch( InvalidMessage& )
165  {
166  if( !m_pSession->isLoggedOn() )
167  {
168  disconnect();
169  return;
170  }
171  }
172  }
173 }
174 
175 bool ThreadedSocketConnection::setSession( const std::string& msg )
176 {
177  m_pSession = Session::lookupSession( msg, true );
178  if ( !m_pSession )
179  {
180  if( m_pLog )
181  {
182  m_pLog->onEvent( "Session not found for incoming message: " + msg );
183  m_pLog->onIncoming( msg );
184  }
185  return false;
186  }
187 
188  SessionID sessionID = m_pSession->getSessionID();
189  m_pSession = 0;
190 
191  // see if the session frees up within 5 seconds
192  for( int i = 1; i <= 5; i++ )
193  {
194  if( !Session::isSessionRegistered( sessionID ) )
195  m_pSession = Session::registerSession( sessionID );
196  if( m_pSession ) break;
197  process_sleep( 1 );
198  }
199 
200  if ( !m_pSession )
201  return false;
202  if ( m_sessions.find(m_pSession->getSessionID()) == m_sessions.end() )
203  return false;
204 
205  m_pSession->setResponder( this );
206  return true;
207 }
208 
209 } // namespace FIX
static Session * lookupSession(const SessionID &)
Definition: Session.cpp:1403
int socket_connect(int socket, const char *address, int port)
Definition: Utility.cpp:131
static void unregisterSession(const SessionID &)
Definition: Session.cpp:1454
Unable to parse message.
Definition: Exceptions.h:73
void next()
Definition: Session.cpp:125
void process_sleep(double s)
Definition: Utility.cpp:444
ThreadedSocketConnection(int s, Sessions sessions, Log *pLog)
static bool isSessionRegistered(const SessionID &)
Definition: Session.cpp:1438
This interface must be implemented to log messages and events.
Definition: Log.h:81
Definition: Acceptor.cpp:34
static Session * registerSession(const SessionID &)
Definition: Session.cpp:1444
virtual void onEvent(const std::string &)=0
Log * getLog()
Definition: Session.h:214
Socket recv operation failed.
Definition: Exceptions.h:280
void disconnect()
Definition: Session.cpp:542
void addToStream(const char *str, size_t len)
Definition: Parser.h:48
const SessionID & getSessionID() const
Definition: Session.h:75
bool setSession(const std::string &msg)
virtual void onIncoming(const std::string &)=0
bool readFixMessage(std::string &str)
Definition: Parser.cpp:59
ssize_t socket_send(int s, const char *msg, size_t length)
Definition: Utility.cpp:153
void setResponder(Responder *pR)
Definition: Session.h:197
Unique session id consists of BeginString, SenderCompID and TargetCompID.
Definition: SessionID.h:30
void socket_close(int s)
Definition: Utility.cpp:158
Not a recognizable message.
Definition: Exceptions.h:80
bool isLoggedOn()
Definition: Session.h:65

Generated on Thu Sep 5 2019 11:07:58 for QuickFIX by doxygen 1.8.13 written by Dimitri van Heesch, © 1997-2001