Public Member Functions | Private Types | Private Member Functions | Static Private Member Functions | Private Attributes | List of all members
FIX::ThreadedSocketInitiator Class Reference

Threaded Socket implementation of Initiator. More...

#include <ThreadedSocketInitiator.h>

Inheritance diagram for FIX::ThreadedSocketInitiator:
Inheritance graph
[legend]
Collaboration diagram for FIX::ThreadedSocketInitiator:
Collaboration graph
[legend]

Public Member Functions

 ThreadedSocketInitiator (Application &, MessageStoreFactory &, const SessionSettings &) throw ( ConfigError )
 
 ThreadedSocketInitiator (Application &, MessageStoreFactory &, const SessionSettings &, LogFactory &) throw ( ConfigError )
 
virtual ~ThreadedSocketInitiator ()
 
- Public Member Functions inherited from FIX::Initiator
 Initiator (Application &, MessageStoreFactory &, const SessionSettings &) throw ( ConfigError )
 
 Initiator (Application &, MessageStoreFactory &, const SessionSettings &, LogFactory &) throw ( ConfigError )
 
virtual ~Initiator ()
 
void start () throw ( ConfigError, RuntimeError )
 Start initiator. More...
 
void block () throw ( ConfigError, RuntimeError )
 Block on the initiator. More...
 
bool poll (double timeout=0.0) throw ( ConfigError, RuntimeError )
 Poll the initiator. More...
 
void stop (bool force=false)
 Stop initiator. More...
 
bool isLoggedOn ()
 Check to see if any sessions are currently logged on. More...
 
SessiongetSession (const SessionID &sessionID, Responder &)
 
const std::set< SessionID > & getSessions () const
 
SessiongetSession (const SessionID &sessionID) const
 
const Dictionary *const getSessionSettings (const SessionID &sessionID) const
 
bool has (const SessionID &id)
 
bool isStopped ()
 
ApplicationgetApplication ()
 
MessageStoreFactorygetMessageStoreFactory ()
 
LoggetLog ()
 

Private Types

typedef std::map< int, thread_idSocketToThread
 
typedef std::map< SessionID, int > SessionToHostNum
 
typedef std::pair< ThreadedSocketInitiator *, ThreadedSocketConnection *> ThreadPair
 

Private Member Functions

void onConfigure (const SessionSettings &) throw ( ConfigError )
 Implemented to configure acceptor. More...
 
void onInitialize (const SessionSettings &) throw ( RuntimeError )
 Implemented to initialize initiator. More...
 
void onStart ()
 Implemented to start connecting to targets. More...
 
bool onPoll (double timeout)
 Implemented to connect and poll for events. More...
 
void onStop ()
 Implemented to stop a running initiator. More...
 
void doConnect (const SessionID &s, const Dictionary &d)
 Implemented to connect a session to its target. More...
 
void addThread (int s, thread_id t)
 
void removeThread (int s)
 
void lock ()
 
void getHost (const SessionID &, const Dictionary &, std::string &, short &)
 

Static Private Member Functions

static THREAD_PROC socketThread (void *p)
 

Private Attributes

SessionSettings m_settings
 
SessionToHostNum m_sessionToHostNum
 
time_t m_lastConnect
 
int m_reconnectInterval
 
bool m_noDelay
 
int m_sendBufSize
 
int m_rcvBufSize
 
SocketToThread m_threads
 
Mutex m_mutex
 

Additional Inherited Members

- Protected Member Functions inherited from FIX::Initiator
void setPending (const SessionID &)
 
void setConnected (const SessionID &)
 
void setDisconnected (const SessionID &)
 
bool isPending (const SessionID &)
 
bool isConnected (const SessionID &)
 
bool isDisconnected (const SessionID &)
 
void connect ()
 

Detailed Description

Threaded Socket implementation of Initiator.

Definition at line 39 of file ThreadedSocketInitiator.h.

Member Typedef Documentation

◆ SessionToHostNum

Definition at line 52 of file ThreadedSocketInitiator.h.

◆ SocketToThread

typedef std::map< int, thread_id > FIX::ThreadedSocketInitiator::SocketToThread
private

Definition at line 51 of file ThreadedSocketInitiator.h.

◆ ThreadPair

Definition at line 53 of file ThreadedSocketInitiator.h.

Constructor & Destructor Documentation

◆ ThreadedSocketInitiator() [1/2]

FIX::ThreadedSocketInitiator::ThreadedSocketInitiator ( Application application,
MessageStoreFactory factory,
const SessionSettings settings 
)
throw (ConfigError
)

Definition at line 32 of file ThreadedSocketInitiator.cpp.

References FIX::socket_init().

36 : Initiator( application, factory, settings ),
37  m_lastConnect( 0 ), m_reconnectInterval( 30 ), m_noDelay( false ),
38  m_sendBufSize( 0 ), m_rcvBufSize( 0 )
39 {
40  socket_init();
41 }
void socket_init()
Definition: Utility.cpp:81
Initiator(Application &, MessageStoreFactory &, const SessionSettings &)
Definition: Initiator.cpp:36

◆ ThreadedSocketInitiator() [2/2]

FIX::ThreadedSocketInitiator::ThreadedSocketInitiator ( Application application,
MessageStoreFactory factory,
const SessionSettings settings,
LogFactory logFactory 
)
throw (ConfigError
)

Definition at line 43 of file ThreadedSocketInitiator.cpp.

References FIX::socket_init().

48 : Initiator( application, factory, settings, logFactory ),
49  m_lastConnect( 0 ), m_reconnectInterval( 30 ), m_noDelay( false ),
50  m_sendBufSize( 0 ), m_rcvBufSize( 0 )
51 {
52  socket_init();
53 }
void socket_init()
Definition: Utility.cpp:81
Initiator(Application &, MessageStoreFactory &, const SessionSettings &)
Definition: Initiator.cpp:36

◆ ~ThreadedSocketInitiator()

FIX::ThreadedSocketInitiator::~ThreadedSocketInitiator ( )
virtual

Definition at line 55 of file ThreadedSocketInitiator.cpp.

References FIX::socket_term().

56 {
57  socket_term();
58 }
void socket_term()
Definition: Utility.cpp:96

Member Function Documentation

◆ addThread()

void FIX::ThreadedSocketInitiator::addThread ( int  s,
thread_id  t 
)
private

Definition at line 181 of file ThreadedSocketInitiator.cpp.

References m_mutex, and m_threads.

Referenced by doConnect().

182 {
183  Locker l(m_mutex);
184 
185  m_threads[ s ] = t;
186 }

◆ doConnect()

void FIX::ThreadedSocketInitiator::doConnect ( const SessionID ,
const Dictionary  
)
privatevirtual

Implemented to connect a session to its target.

Implements FIX::Initiator.

Definition at line 133 of file ThreadedSocketInitiator.cpp.

References addThread(), FIX::IntConvertor::convert(), FIX::ThreadedSocketConnection::disconnect(), getHost(), FIX::Initiator::getLog(), FIX::Session::getLog(), FIX::Session::isSessionTime(), FIX::Session::lookupSession(), m_mutex, m_noDelay, m_rcvBufSize, m_sendBufSize, FIX::Log::onEvent(), FIX::Initiator::setDisconnected(), FIX::Initiator::setPending(), FIX::socket_createConnector(), FIX::socket_setsockopt(), socketThread(), and FIX::thread_spawn().

134 {
135  try
136  {
137  Session* session = Session::lookupSession( s );
138  if( !session->isSessionTime(UtcTimeStamp()) ) return;
139 
140  Log* log = session->getLog();
141 
142  std::string address;
143  short port = 0;
144  getHost( s, d, address, port );
145 
146  int socket = socket_createConnector();
147  if( m_noDelay )
148  socket_setsockopt( socket, TCP_NODELAY );
149  if( m_sendBufSize )
150  socket_setsockopt( socket, SO_SNDBUF, m_sendBufSize );
151  if( m_rcvBufSize )
152  socket_setsockopt( socket, SO_RCVBUF, m_rcvBufSize );
153 
154  setPending( s );
155  log->onEvent( "Connecting to " + address + " on port " + IntConvertor::convert((unsigned short)port) );
156 
157  ThreadedSocketConnection* pConnection =
158  new ThreadedSocketConnection( s, socket, address, port, getLog() );
159 
160  ThreadPair* pair = new ThreadPair( this, pConnection );
161 
162  {
163  Locker l( m_mutex );
164  thread_id thread;
165  if ( thread_spawn( &socketThread, pair, thread ) )
166  {
167  addThread( socket, thread );
168  }
169  else
170  {
171  delete pair;
172  pConnection->disconnect();
173  delete pConnection;
174  setDisconnected( s );
175  }
176  }
177  }
178  catch ( std::exception& ) {}
179 }
void getHost(const SessionID &, const Dictionary &, std::string &, short &)
static Session * lookupSession(const SessionID &)
Definition: Session.cpp:1403
std::pair< ThreadedSocketInitiator *, ThreadedSocketConnection *> ThreadPair
int socket_createConnector()
Definition: Utility.cpp:126
bool thread_spawn(THREAD_START_ROUTINE func, void *var, thread_id &thread)
Definition: Utility.cpp:394
static std::string convert(signed_int value)
static THREAD_PROC socketThread(void *p)
int socket_setsockopt(int s, int opt)
Definition: Utility.cpp:186
Log * getLog()
Definition: Initiator.h:90
pthread_t thread_id
Definition: Utility.h:164
void setPending(const SessionID &)
Definition: Initiator.cpp:145
void setDisconnected(const SessionID &)
Definition: Initiator.cpp:163

◆ getHost()

void FIX::ThreadedSocketInitiator::getHost ( const SessionID s,
const Dictionary d,
std::string &  address,
short &  port 
)
private

Definition at line 238 of file ThreadedSocketInitiator.cpp.

References FIX::Dictionary::getInt(), FIX::Dictionary::getString(), FIX::Dictionary::has(), m_sessionToHostNum, FIX::SOCKET_CONNECT_HOST, and FIX::SOCKET_CONNECT_PORT.

Referenced by doConnect(), and lock().

240 {
241  int num = 0;
242  SessionToHostNum::iterator i = m_sessionToHostNum.find( s );
243  if ( i != m_sessionToHostNum.end() ) num = i->second;
244 
245  std::stringstream hostStream;
246  hostStream << SOCKET_CONNECT_HOST << num;
247  std::string hostString = hostStream.str();
248 
249  std::stringstream portStream;
250  portStream << SOCKET_CONNECT_PORT << num;
251  std::string portString = portStream.str();
252 
253  if( d.has(hostString) && d.has(portString) )
254  {
255  address = d.getString( hostString );
256  port = ( short ) d.getInt( portString );
257  }
258  else
259  {
260  num = 0;
261  address = d.getString( SOCKET_CONNECT_HOST );
262  port = ( short ) d.getInt( SOCKET_CONNECT_PORT );
263  }
264 
265  m_sessionToHostNum[ s ] = ++num;
266 }
const char SOCKET_CONNECT_HOST[]
const char SOCKET_CONNECT_PORT[]

◆ lock()

void FIX::ThreadedSocketInitiator::lock ( )
inlineprivate

Definition at line 66 of file ThreadedSocketInitiator.h.

References getHost(), m_mutex, socketThread(), and THREAD_PROC.

◆ onConfigure()

void FIX::ThreadedSocketInitiator::onConfigure ( const SessionSettings )
throw (ConfigError
)
privatevirtual

Implemented to configure acceptor.

Reimplemented from FIX::Initiator.

Definition at line 60 of file ThreadedSocketInitiator.cpp.

References FIX::Dictionary::getBool(), FIX::Dictionary::getInt(), FIX::Dictionary::has(), m_noDelay, m_rcvBufSize, m_reconnectInterval, m_sendBufSize, FIX::RECONNECT_INTERVAL, FIX::SOCKET_NODELAY, FIX::SOCKET_RECEIVE_BUFFER_SIZE, and FIX::SOCKET_SEND_BUFFER_SIZE.

62 {
63  const Dictionary& dict = s.get();
64 
65  if( dict.has( RECONNECT_INTERVAL ) )
66  m_reconnectInterval = dict.getInt( RECONNECT_INTERVAL );
67  if( dict.has( SOCKET_NODELAY ) )
68  m_noDelay = dict.getBool( SOCKET_NODELAY );
69  if( dict.has( SOCKET_SEND_BUFFER_SIZE ) )
70  m_sendBufSize = dict.getInt( SOCKET_SEND_BUFFER_SIZE );
71  if( dict.has( SOCKET_RECEIVE_BUFFER_SIZE ) )
73 }
const char RECONNECT_INTERVAL[]
const char SOCKET_NODELAY[]
const char SOCKET_SEND_BUFFER_SIZE[]
const char SOCKET_RECEIVE_BUFFER_SIZE[]

◆ onInitialize()

void FIX::ThreadedSocketInitiator::onInitialize ( const SessionSettings )
throw (RuntimeError
)
privatevirtual

Implemented to initialize initiator.

Reimplemented from FIX::Initiator.

Definition at line 75 of file ThreadedSocketInitiator.cpp.

77 {
78 }

◆ onPoll()

bool FIX::ThreadedSocketInitiator::onPoll ( double  timeout)
privatevirtual

Implemented to connect and poll for events.

Implements FIX::Initiator.

Definition at line 98 of file ThreadedSocketInitiator.cpp.

99 {
100  return false;
101 }

◆ onStart()

void FIX::ThreadedSocketInitiator::onStart ( )
privatevirtual

Implemented to start connecting to targets.

Implements FIX::Initiator.

Definition at line 80 of file ThreadedSocketInitiator.cpp.

References FIX::Initiator::connect(), FIX::Initiator::isStopped(), m_lastConnect, m_mutex, m_reconnectInterval, and FIX::process_sleep().

81 {
82  while ( !isStopped() )
83  {
84  time_t now;
85  ::time( &now );
86 
87  if ( (now - m_lastConnect) >= m_reconnectInterval )
88  {
89  Locker l( m_mutex );
90  connect();
91  m_lastConnect = now;
92  }
93 
94  process_sleep( 1 );
95  }
96 }
void process_sleep(double s)
Definition: Utility.cpp:444
bool isStopped()
Definition: Initiator.h:83

◆ onStop()

void FIX::ThreadedSocketInitiator::onStop ( )
privatevirtual

Implemented to stop a running initiator.

Implements FIX::Initiator.

Definition at line 103 of file ThreadedSocketInitiator.cpp.

References FIX::Initiator::isLoggedOn(), m_mutex, m_threads, FIX::socket_close(), FIX::Initiator::start(), and FIX::thread_join().

104 {
105  SocketToThread threads;
106  SocketToThread::iterator i;
107 
108  {
109  Locker l(m_mutex);
110 
111  time_t start = 0;
112  time_t now = 0;
113 
114  ::time( &start );
115  while ( isLoggedOn() )
116  {
117  if( ::time(&now) -5 >= start )
118  break;
119  }
120 
121  threads = m_threads;
122  m_threads.clear();
123  }
124 
125  for ( i = threads.begin(); i != threads.end(); ++i )
126  socket_close( i->first );
127 
128  for ( i = threads.begin(); i != threads.end(); ++i )
129  thread_join( i->second );
130  threads.clear();
131 }
void thread_join(thread_id thread)
Definition: Utility.cpp:415
std::map< int, thread_id > SocketToThread
void start()
Start initiator.
Definition: Initiator.cpp:190
bool isLoggedOn()
Check to see if any sessions are currently logged on.
Definition: Initiator.cpp:269
void socket_close(int s)
Definition: Utility.cpp:158

◆ removeThread()

void FIX::ThreadedSocketInitiator::removeThread ( int  s)
private

Definition at line 188 of file ThreadedSocketInitiator.cpp.

References m_mutex, m_threads, and FIX::thread_detach().

189 {
190  Locker l(m_mutex);
191  SocketToThread::iterator i = m_threads.find( s );
192 
193  if ( i != m_threads.end() )
194  {
195  thread_detach( i->second );
196  m_threads.erase( i );
197  }
198 }
void thread_detach(thread_id thread)
Definition: Utility.cpp:425

◆ socketThread()

THREAD_PROC FIX::ThreadedSocketInitiator::socketThread ( void *  p)
staticprivate

Definition at line 200 of file ThreadedSocketInitiator.cpp.

References FIX::ThreadedSocketConnection::connect(), FIX::ThreadedSocketConnection::disconnect(), FIX::ThreadedSocketConnection::getSession(), FIX::Session::getSessionID(), FIX::ThreadedSocketConnection::getSocket(), FIX::Session::lookupSession(), FIX::Session::next(), and FIX::ThreadedSocketConnection::read().

Referenced by doConnect(), and lock().

201 {
202  ThreadPair * pair = reinterpret_cast < ThreadPair* > ( p );
203 
204  ThreadedSocketInitiator* pInitiator = pair->first;
205  ThreadedSocketConnection* pConnection = pair->second;
206  FIX::SessionID sessionID = pConnection->getSession()->getSessionID();
207  FIX::Session* pSession = FIX::Session::lookupSession( sessionID );
208  int socket = pConnection->getSocket();
209  delete pair;
210 
211  pInitiator->lock();
212 
213  if( !pConnection->connect() )
214  {
215  pInitiator->getLog()->onEvent( "Connection failed" );
216  pConnection->disconnect();
217  delete pConnection;
218  pInitiator->removeThread( socket );
219  pInitiator->setDisconnected( sessionID );
220  return 0;
221  }
222 
223  pInitiator->setConnected( sessionID );
224  pInitiator->getLog()->onEvent( "Connection succeeded" );
225 
226  pSession->next();
227 
228  while ( pConnection->read() ) {}
229 
230  delete pConnection;
231  if( !pInitiator->isStopped() )
232  pInitiator->removeThread( socket );
233 
234  pInitiator->setDisconnected( sessionID );
235  return 0;
236 }
static Session * lookupSession(const SessionID &)
Definition: Session.cpp:1403
Maintains the state and implements the logic of a FIX session.
Definition: Session.h:45
std::pair< ThreadedSocketInitiator *, ThreadedSocketConnection *> ThreadPair
void next()
Definition: Session.cpp:125
ThreadedSocketInitiator(Application &, MessageStoreFactory &, const SessionSettings &)
Unique session id consists of BeginString, SenderCompID and TargetCompID.
Definition: SessionID.h:30

Member Data Documentation

◆ m_lastConnect

time_t FIX::ThreadedSocketInitiator::m_lastConnect
private

Definition at line 73 of file ThreadedSocketInitiator.h.

Referenced by onStart().

◆ m_mutex

Mutex FIX::ThreadedSocketInitiator::m_mutex
private

Definition at line 79 of file ThreadedSocketInitiator.h.

Referenced by addThread(), doConnect(), lock(), onStart(), onStop(), and removeThread().

◆ m_noDelay

bool FIX::ThreadedSocketInitiator::m_noDelay
private

Definition at line 75 of file ThreadedSocketInitiator.h.

Referenced by doConnect(), and onConfigure().

◆ m_rcvBufSize

int FIX::ThreadedSocketInitiator::m_rcvBufSize
private

Definition at line 77 of file ThreadedSocketInitiator.h.

Referenced by doConnect(), and onConfigure().

◆ m_reconnectInterval

int FIX::ThreadedSocketInitiator::m_reconnectInterval
private

Definition at line 74 of file ThreadedSocketInitiator.h.

Referenced by onConfigure(), and onStart().

◆ m_sendBufSize

int FIX::ThreadedSocketInitiator::m_sendBufSize
private

Definition at line 76 of file ThreadedSocketInitiator.h.

Referenced by doConnect(), and onConfigure().

◆ m_sessionToHostNum

SessionToHostNum FIX::ThreadedSocketInitiator::m_sessionToHostNum
private

Definition at line 72 of file ThreadedSocketInitiator.h.

Referenced by getHost().

◆ m_settings

SessionSettings FIX::ThreadedSocketInitiator::m_settings
private

Definition at line 71 of file ThreadedSocketInitiator.h.

◆ m_threads

SocketToThread FIX::ThreadedSocketInitiator::m_threads
private

Definition at line 78 of file ThreadedSocketInitiator.h.

Referenced by addThread(), onStop(), and removeThread().


The documentation for this class was generated from the following files:

Generated on Thu Sep 5 2019 11:07:58 for QuickFIX by doxygen 1.8.13 written by Dimitri van Heesch, © 1997-2001