ThreadedSocketInitiator.cpp
Go to the documentation of this file.
1 /****************************************************************************
2 ** Copyright (c) 2001-2014
3 **
4 ** This file is part of the QuickFIX FIX Engine
5 **
6 ** This file may be distributed under the terms of the quickfixengine.org
7 ** license as defined by quickfixengine.org and appearing in the file
8 ** LICENSE included in the packaging of this file.
9 **
10 ** This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
11 ** WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
12 **
13 ** See http://www.quickfixengine.org/LICENSE for licensing information.
14 **
15 ** Contact ask@quickfixengine.org if any conditions of this licensing are
16 ** not clear to you.
17 **
18 ****************************************************************************/
19 
20 #ifdef _MSC_VER
21 #include "stdafx.h"
22 #else
23 #include "config.h"
24 #endif
25 
27 #include "Session.h"
28 #include "Settings.h"
29 
30 namespace FIX
31 {
33  Application& application,
34  MessageStoreFactory& factory,
35  const SessionSettings& settings ) throw( ConfigError )
36 : Initiator( application, factory, settings ),
37  m_lastConnect( 0 ), m_reconnectInterval( 30 ), m_noDelay( false ),
38  m_sendBufSize( 0 ), m_rcvBufSize( 0 )
39 {
40  socket_init();
41 }
42 
44  Application& application,
45  MessageStoreFactory& factory,
46  const SessionSettings& settings,
47  LogFactory& logFactory ) throw( ConfigError )
48 : Initiator( application, factory, settings, logFactory ),
49  m_lastConnect( 0 ), m_reconnectInterval( 30 ), m_noDelay( false ),
50  m_sendBufSize( 0 ), m_rcvBufSize( 0 )
51 {
52  socket_init();
53 }
54 
56 {
57  socket_term();
58 }
59 
61 throw ( ConfigError )
62 {
63  const Dictionary& dict = s.get();
64 
65  if( dict.has( RECONNECT_INTERVAL ) )
67  if( dict.has( SOCKET_NODELAY ) )
69  if( dict.has( SOCKET_SEND_BUFFER_SIZE ) )
71  if( dict.has( SOCKET_RECEIVE_BUFFER_SIZE ) )
73 }
74 
76 throw ( RuntimeError )
77 {
78 }
79 
81 {
82  while ( !isStopped() )
83  {
84  time_t now;
85  ::time( &now );
86 
87  if ( (now - m_lastConnect) >= m_reconnectInterval )
88  {
89  Locker l( m_mutex );
90  connect();
91  m_lastConnect = now;
92  }
93 
94  process_sleep( 1 );
95  }
96 }
97 
98 bool ThreadedSocketInitiator::onPoll( double timeout )
99 {
100  return false;
101 }
102 
104 {
105  SocketToThread threads;
106  SocketToThread::iterator i;
107 
108  {
109  Locker l(m_mutex);
110 
111  time_t start = 0;
112  time_t now = 0;
113 
114  ::time( &start );
115  while ( isLoggedOn() )
116  {
117  if( ::time(&now) -5 >= start )
118  break;
119  }
120 
121  threads = m_threads;
122  m_threads.clear();
123  }
124 
125  for ( i = threads.begin(); i != threads.end(); ++i )
126  socket_close( i->first );
127 
128  for ( i = threads.begin(); i != threads.end(); ++i )
129  thread_join( i->second );
130  threads.clear();
131 }
132 
134 {
135  try
136  {
137  Session* session = Session::lookupSession( s );
138  if( !session->isSessionTime(UtcTimeStamp()) ) return;
139 
140  Log* log = session->getLog();
141 
142  std::string address;
143  short port = 0;
144  getHost( s, d, address, port );
145 
146  int socket = socket_createConnector();
147  if( m_noDelay )
148  socket_setsockopt( socket, TCP_NODELAY );
149  if( m_sendBufSize )
150  socket_setsockopt( socket, SO_SNDBUF, m_sendBufSize );
151  if( m_rcvBufSize )
152  socket_setsockopt( socket, SO_RCVBUF, m_rcvBufSize );
153 
154  setPending( s );
155  log->onEvent( "Connecting to " + address + " on port " + IntConvertor::convert((unsigned short)port) );
156 
157  ThreadedSocketConnection* pConnection =
158  new ThreadedSocketConnection( s, socket, address, port, getLog() );
159 
160  ThreadPair* pair = new ThreadPair( this, pConnection );
161 
162  {
163  Locker l( m_mutex );
164  thread_id thread;
165  if ( thread_spawn( &socketThread, pair, thread ) )
166  {
167  addThread( socket, thread );
168  }
169  else
170  {
171  delete pair;
172  pConnection->disconnect();
173  delete pConnection;
174  setDisconnected( s );
175  }
176  }
177  }
178  catch ( std::exception& ) {}
179 }
180 
182 {
183  Locker l(m_mutex);
184 
185  m_threads[ s ] = t;
186 }
187 
189 {
190  Locker l(m_mutex);
191  SocketToThread::iterator i = m_threads.find( s );
192 
193  if ( i != m_threads.end() )
194  {
195  thread_detach( i->second );
196  m_threads.erase( i );
197  }
198 }
199 
201 {
202  ThreadPair * pair = reinterpret_cast < ThreadPair* > ( p );
203 
204  ThreadedSocketInitiator* pInitiator = pair->first;
205  ThreadedSocketConnection* pConnection = pair->second;
206  FIX::SessionID sessionID = pConnection->getSession()->getSessionID();
207  FIX::Session* pSession = FIX::Session::lookupSession( sessionID );
208  int socket = pConnection->getSocket();
209  delete pair;
210 
211  pInitiator->lock();
212 
213  if( !pConnection->connect() )
214  {
215  pInitiator->getLog()->onEvent( "Connection failed" );
216  pConnection->disconnect();
217  delete pConnection;
218  pInitiator->removeThread( socket );
219  pInitiator->setDisconnected( sessionID );
220  return 0;
221  }
222 
223  pInitiator->setConnected( sessionID );
224  pInitiator->getLog()->onEvent( "Connection succeeded" );
225 
226  pSession->next();
227 
228  while ( pConnection->read() ) {}
229 
230  delete pConnection;
231  if( !pInitiator->isStopped() )
232  pInitiator->removeThread( socket );
233 
234  pInitiator->setDisconnected( sessionID );
235  return 0;
236 }
237 
239  std::string& address, short& port )
240 {
241  int num = 0;
242  SessionToHostNum::iterator i = m_sessionToHostNum.find( s );
243  if ( i != m_sessionToHostNum.end() ) num = i->second;
244 
245  std::stringstream hostStream;
246  hostStream << SOCKET_CONNECT_HOST << num;
247  std::string hostString = hostStream.str();
248 
249  std::stringstream portStream;
250  portStream << SOCKET_CONNECT_PORT << num;
251  std::string portString = portStream.str();
252 
253  if( d.has(hostString) && d.has(portString) )
254  {
255  address = d.getString( hostString );
256  port = ( short ) d.getInt( portString );
257  }
258  else
259  {
260  num = 0;
261  address = d.getString( SOCKET_CONNECT_HOST );
262  port = ( short ) d.getInt( SOCKET_CONNECT_PORT );
263  }
264 
265  m_sessionToHostNum[ s ] = ++num;
266 }
267 
268 }
void getHost(const SessionID &, const Dictionary &, std::string &, short &)
void thread_join(thread_id thread)
Definition: Utility.cpp:415
static Session * lookupSession(const SessionID &)
Definition: Session.cpp:1403
void socket_init()
Definition: Utility.cpp:81
Maintains the state and implements the logic of a FIX session.
Definition: Session.h:45
Encapsulates a socket file descriptor (multi-threaded).
std::pair< ThreadedSocketInitiator *, ThreadedSocketConnection *> ThreadPair
void socket_term()
Definition: Utility.cpp:96
const char RECONNECT_INTERVAL[]
void next()
Definition: Session.cpp:125
void thread_detach(thread_id thread)
Definition: Utility.cpp:425
int socket_createConnector()
Definition: Utility.cpp:126
const char SOCKET_NODELAY[]
void process_sleep(double s)
Definition: Utility.cpp:444
std::map< int, thread_id > SocketToThread
const char SOCKET_CONNECT_HOST[]
const char SOCKET_SEND_BUFFER_SIZE[]
bool thread_spawn(THREAD_START_ROUTINE func, void *var, thread_id &thread)
Definition: Utility.cpp:394
bool has(const std::string &) const
Check if the dictionary contains a value for key.
Definition: Dictionary.cpp:150
void onStop()
Implemented to stop a running initiator.
const char SOCKET_CONNECT_PORT[]
static std::string convert(signed_int value)
bool onPoll(double timeout)
Implemented to connect and poll for events.
ThreadedSocketInitiator(Application &, MessageStoreFactory &, const SessionSettings &)
static THREAD_PROC socketThread(void *p)
const char SOCKET_RECEIVE_BUFFER_SIZE[]
int socket_setsockopt(int s, int opt)
Definition: Utility.cpp:186
This interface must be implemented to log messages and events.
Definition: Log.h:81
Definition: Acceptor.cpp:34
virtual void onEvent(const std::string &)=0
Log * getLog()
Definition: Session.h:214
This interface must be implemented to define what your FIX application does.
Definition: Application.h:43
Application encountered serious error during runtime
Definition: Exceptions.h:94
Application is not configured correctly
Definition: Exceptions.h:87
Locks/Unlocks a mutex using RAII.
Definition: Mutex.h:95
void onConfigure(const SessionSettings &)
Implemented to configure initiator.
Container for setting dictionaries mapped to sessions.
void start()
Start initiator.
Definition: Initiator.cpp:190
void onInitialize(const SessionSettings &)
Implemented to initialize initiator.
This interface must be implemented to create a Log.
Definition: Log.h:42
const SessionID & getSessionID() const
Definition: Session.h:75
Log * getLog()
Definition: Initiator.h:90
This interface must be implemented to create a MessageStore.
Definition: MessageStore.h:41
void onStart()
Implemented to start connecting to targets.
bool isLoggedOn()
Check to see if any sessions are currently logged on.
Definition: Initiator.cpp:269
pthread_t thread_id
Definition: Utility.h:164
Date and Time represented in UTC.
Definition: FieldTypes.h:399
For storage and retrieval of key/value pairs.
Definition: Dictionary.h:36
std::string getString(const std::string &, bool capitalize=false) const
Get a value as a string.
Definition: Dictionary.cpp:32
void doConnect(const SessionID &s, const Dictionary &d)
Implemented to connect a session to its target.
Unique session id consists of BeginString, SenderCompID and TargetCompID.
Definition: SessionID.h:30
void socket_close(int s)
Definition: Utility.cpp:158
bool isSessionTime(const UtcTimeStamp &time)
Definition: Session.h:108
bool getBool(const std::string &) const
Get a value as a bool.
Definition: Dictionary.cpp:71
#define THREAD_PROC
Definition: Utility.h:158
int getInt(const std::string &) const
Get a value as an int.
Definition: Dictionary.cpp:45
Threaded Socket implementation of Initiator.
bool isStopped()
Definition: Initiator.h:83
Base for classes which act as an initiator for establishing connections.
Definition: Initiator.h:51
void setPending(const SessionID &)
Definition: Initiator.cpp:145
void setDisconnected(const SessionID &)
Definition: Initiator.cpp:163

Generated on Thu Sep 5 2019 11:07:58 for QuickFIX by doxygen 1.8.13 written by Dimitri van Heesch, © 1997-2001