Public Types | Public Member Functions | Private Member Functions | Private Attributes | List of all members
FIX::ThreadedSocketConnection Class Reference

Encapsulates a socket file descriptor (multi-threaded). More...

#include <ThreadedSocketConnection.h>

Inheritance diagram for FIX::ThreadedSocketConnection:
Inheritance graph
[legend]
Collaboration diagram for FIX::ThreadedSocketConnection:
Collaboration graph
[legend]

Public Types

typedef std::set< SessionID > Sessions
 

Public Member Functions

 ThreadedSocketConnection (int s, Sessions sessions, Log *pLog)
 
 ThreadedSocketConnection (const SessionID &, int s, const std::string &address, short port, Log *pLog)
 
virtual ~ThreadedSocketConnection ()
 
Session * getSession () const
 
int getSocket () const
 
bool connect ()
 
void disconnect ()
 
bool read ()
 

Private Member Functions

bool readMessage (std::string &msg) throw ( SocketRecvFailed )
 
void processStream ()
 
bool send (const std::string &)
 
bool setSession (const std::string &msg)
 
- Private Member Functions inherited from FIX::Responder
virtual ~Responder ()
 

Private Attributes

int m_socket
 
char m_buffer [BUFSIZ]
 
std::string m_address
 
int m_port
 
Log * m_pLog
 
Parser m_parser
 
Sessions m_sessions
 
Session * m_pSession
 
bool m_disconnect
 
fd_set m_fds
 

Detailed Description

Encapsulates a socket file descriptor (multi-threaded).

Definition at line 44 of file ThreadedSocketConnection.h.

Member Typedef Documentation

◆ Sessions

typedef std::set<SessionID> FIX::ThreadedSocketConnection::Sessions

Definition at line 47 of file ThreadedSocketConnection.h.

Constructor & Destructor Documentation

◆ ThreadedSocketConnection() [1/2]

FIX::ThreadedSocketConnection::ThreadedSocketConnection ( int  s,
Sessions  sessions,
Log * pLog 
)

◆ ThreadedSocketConnection() [2/2]

FIX::ThreadedSocketConnection::ThreadedSocketConnection ( const SessionID & sessionID,
int  s,
const std::string &  address,
short  port,
Log * pLog 
)

Definition at line 45 of file ThreadedSocketConnection.cpp.

48  : m_socket( s ), m_address( address ), m_port( port ),
49  m_pLog( pLog ),
50  m_pSession( Session::lookupSession( sessionID ) ),
51  m_disconnect( false )
52 {
53  FD_ZERO( &m_fds );
54  FD_SET( m_socket, &m_fds );
55  if ( m_pSession ) m_pSession->setResponder( this );
56 }
static Session * lookupSession(const SessionID &)
Definition: Session.cpp:1403
void setResponder(Responder *pR)
Definition: Session.h:197

◆ ~ThreadedSocketConnection()

FIX::ThreadedSocketConnection::~ThreadedSocketConnection ( )
virtual

Definition at line 58 of file ThreadedSocketConnection.cpp.

References FIX::Session::getSessionID(), m_pSession, FIX::Session::setResponder(), and FIX::Session::unregisterSession().

59 {
60  if ( m_pSession )
61  {
62  m_pSession->setResponder( 0 );
63  Session::unregisterSession( m_pSession->getSessionID() );
64  }
65 }
static void unregisterSession(const SessionID &)
Definition: Session.cpp:1454
const SessionID & getSessionID() const
Definition: Session.h:75
void setResponder(Responder *pR)
Definition: Session.h:197

Member Function Documentation

◆ connect()

bool FIX::ThreadedSocketConnection::connect ( )

Definition at line 80 of file ThreadedSocketConnection.cpp.

References getSocket(), m_address, m_port, and FIX::socket_connect().

Referenced by getSocket(), and FIX::ThreadedSocketInitiator::socketThread().

81 {
82  return socket_connect(getSocket(), m_address.c_str(), m_port) >= 0;
83 }
int socket_connect(int socket, const char *address, int port)
Definition: Utility.cpp:131

◆ disconnect()

void FIX::ThreadedSocketConnection::disconnect ( )
virtual

◆ getSession()

Session* FIX::ThreadedSocketConnection::getSession ( ) const
inline

Definition at line 55 of file ThreadedSocketConnection.h.

References m_pSession.

Referenced by FIX::ThreadedSocketInitiator::socketThread().

◆ getSocket()

int FIX::ThreadedSocketConnection::getSocket ( ) const
inline

◆ processStream()

void FIX::ThreadedSocketConnection::processStream ( )
private

Definition at line 150 of file ThreadedSocketConnection.cpp.

References disconnect(), FIX::Session::isLoggedOn(), m_pSession, FIX::Session::next(), readMessage(), setSession(), and FIX::TYPE::UtcTimeStamp.

Referenced by getSocket(), and read().

151 {
152  std::string msg;
153  while( readMessage(msg) )
154  {
155  if ( !m_pSession )
156  {
157  if ( !setSession( msg ) )
158  { disconnect(); continue; }
159  }
160  try
161  {
162  m_pSession->next( msg, UtcTimeStamp() );
163  }
164  catch( InvalidMessage& )
165  {
166  if( !m_pSession->isLoggedOn() )
167  {
168  disconnect();
169  return;
170  }
171  }
172  }
173 }
void next()
Definition: Session.cpp:125
bool setSession(const std::string &msg)
bool isLoggedOn()
Definition: Session.h:65

◆ read()

bool FIX::ThreadedSocketConnection::read ( )

Definition at line 91 of file ThreadedSocketConnection.cpp.

References FIX::Parser::addToStream(), disconnect(), FIX::Session::disconnect(), FIX::Session::getLog(), m_buffer, m_disconnect, m_fds, m_parser, m_pSession, m_socket, FIX::Session::next(), FIX::Log::onEvent(), and processStream().

Referenced by getSocket(), FIX::ThreadedSocketAcceptor::socketConnectionThread(), and FIX::ThreadedSocketInitiator::socketThread().

92 {
93  struct timeval timeout = { 1, 0 };
94  fd_set readset = m_fds;
95 
96  try
97  {
98  // Wait for input (1 second timeout)
99  int result = select( 1 + m_socket, &readset, 0, 0, &timeout );
100 
101  if( result > 0 ) // Something to read
102  {
103  // We can read without blocking
104  ssize_t size = recv( m_socket, m_buffer, sizeof(m_buffer), 0 );
105  if ( size <= 0 ) { throw SocketRecvFailed( size ); }
106  m_parser.addToStream( m_buffer, size );
107  }
108  else if( result == 0 && m_pSession ) // Timeout
109  {
110  m_pSession->next();
111  }
112  else if( result < 0 ) // Error
113  {
114  throw SocketRecvFailed( result );
115  }
116 
117  processStream();
118  return true;
119  }
120  catch ( SocketRecvFailed& e )
121  {
122  if( m_disconnect )
123  return false;
124 
125  if( m_pSession )
126  {
127  m_pSession->getLog()->onEvent( e.what() );
128  m_pSession->disconnect();
129  }
130  else
131  {
132  disconnect();
133  }
134 
135  return false;
136  }
137 }
void next()
Definition: Session.cpp:125
virtual void onEvent(const std::string &)=0
Log * getLog()
Definition: Session.h:214
void disconnect()
Definition: Session.cpp:542
void addToStream(const char *str, size_t len)
Definition: Parser.h:48

◆ readMessage()

bool FIX::ThreadedSocketConnection::readMessage ( std::string &  msg)
throw ( SocketRecvFailed )
private

Definition at line 139 of file ThreadedSocketConnection.cpp.

References m_parser, and FIX::Parser::readFixMessage().

Referenced by getSocket(), and processStream().

141 {
142  try
143  {
144  return m_parser.readFixMessage( msg );
145  }
146  catch ( MessageParseError& ) {}
147  return true;
148 }
bool readFixMessage(std::string &str)
Definition: Parser.cpp:59

◆ send()

bool FIX::ThreadedSocketConnection::send ( const std::string &  msg)
private virtual

Implements FIX::Responder.

Definition at line 67 of file ThreadedSocketConnection.cpp.

References m_socket, and FIX::socket_send().

Referenced by getSocket().

68 {
69  int totalSent = 0;
70  while(totalSent < (int)msg.length())
71  {
72  ssize_t sent = socket_send( m_socket, msg.c_str() + totalSent, msg.length() );
73  if(sent < 0) return false;
74  totalSent += sent;
75  }
76 
77  return true;
78 }
ssize_t socket_send(int s, const char *msg, size_t length)
Definition: Utility.cpp:153

◆ setSession()

bool FIX::ThreadedSocketConnection::setSession ( const std::string &  msg)
private

Definition at line 175 of file ThreadedSocketConnection.cpp.

References FIX::Session::getSessionID(), FIX::Session::isSessionRegistered(), FIX::Session::lookupSession(), m_pLog, m_pSession, m_sessions, FIX::Log::onEvent(), FIX::Log::onIncoming(), FIX::process_sleep(), FIX::Session::registerSession(), and FIX::Session::setResponder().

Referenced by getSocket(), and processStream().

176 {
177  m_pSession = Session::lookupSession( msg, true );
178  if ( !m_pSession )
179  {
180  if( m_pLog )
181  {
182  m_pLog->onEvent( "Session not found for incoming message: " + msg );
183  m_pLog->onIncoming( msg );
184  }
185  return false;
186  }
187 
188  SessionID sessionID = m_pSession->getSessionID();
189  m_pSession = 0;
190 
191  // see if the session frees up within 5 seconds
192  for( int i = 1; i <= 5; i++ )
193  {
194  if( !Session::isSessionRegistered( sessionID ) )
195  m_pSession = Session::registerSession( sessionID );
196  if( m_pSession ) break;
197  process_sleep( 1 );
198  }
199 
200  if ( !m_pSession )
201  return false;
202  if ( m_sessions.find(m_pSession->getSessionID()) == m_sessions.end() )
203  return false;
204 
205  m_pSession->setResponder( this );
206  return true;
207 }
static Session * lookupSession(const SessionID &)
Definition: Session.cpp:1403
void process_sleep(double s)
Definition: Utility.cpp:444
static bool isSessionRegistered(const SessionID &)
Definition: Session.cpp:1438
static Session * registerSession(const SessionID &)
Definition: Session.cpp:1444
virtual void onEvent(const std::string &)=0
const SessionID & getSessionID() const
Definition: Session.h:75
virtual void onIncoming(const std::string &)=0
void setResponder(Responder *pR)
Definition: Session.h:197

Member Data Documentation

◆ m_address

std::string FIX::ThreadedSocketConnection::m_address
private

Definition at line 70 of file ThreadedSocketConnection.h.

Referenced by connect().

◆ m_buffer

char FIX::ThreadedSocketConnection::m_buffer[BUFSIZ]
private

Definition at line 68 of file ThreadedSocketConnection.h.

Referenced by read().

◆ m_disconnect

bool FIX::ThreadedSocketConnection::m_disconnect
private

Definition at line 77 of file ThreadedSocketConnection.h.

Referenced by disconnect(), and read().

◆ m_fds

fd_set FIX::ThreadedSocketConnection::m_fds
private

Definition at line 78 of file ThreadedSocketConnection.h.

Referenced by read().

◆ m_parser

Parser FIX::ThreadedSocketConnection::m_parser
private

Definition at line 74 of file ThreadedSocketConnection.h.

Referenced by read(), and readMessage().

◆ m_pLog

Log* FIX::ThreadedSocketConnection::m_pLog
private

Definition at line 73 of file ThreadedSocketConnection.h.

Referenced by setSession().

◆ m_port

int FIX::ThreadedSocketConnection::m_port
private

Definition at line 71 of file ThreadedSocketConnection.h.

Referenced by connect().

◆ m_pSession

Session* FIX::ThreadedSocketConnection::m_pSession
private

◆ m_sessions

Sessions FIX::ThreadedSocketConnection::m_sessions
private

Definition at line 75 of file ThreadedSocketConnection.h.

Referenced by setSession().

◆ m_socket

int FIX::ThreadedSocketConnection::m_socket
private

Definition at line 67 of file ThreadedSocketConnection.h.

Referenced by disconnect(), getSocket(), read(), and send().


The documentation for this class was generated from the following files:

- ThreadedSocketConnection.h
- ThreadedSocketConnection.cpp

Generated on Thu Sep 5 2019 11:07:58 for QuickFIX by doxygen 1.8.13 written by Dimitri van Heesch, © 1997-2001