Index  Source Files  Annotated Class List  Alphabetical Class List  Class Hierarchy  Graphical Class Hierarchy 

MySQLStore.cpp

Go to the documentation of this file.
00001 /****************************************************************************
00002 ** Copyright (c) quickfixengine.org  All rights reserved.
00003 **
00004 ** This file is part of the QuickFIX FIX Engine
00005 **
00006 ** This file may be distributed under the terms of the quickfixengine.org
00007 ** license as defined by quickfixengine.org and appearing in the file
00008 ** LICENSE included in the packaging of this file.
00009 **
00010 ** This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
00011 ** WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
00012 **
00013 ** See http://www.quickfixengine.org/LICENSE for licensing information.
00014 **
00015 ** Contact ask@quickfixengine.org if any conditions of this licensing are
00016 ** not clear to you.
00017 **
00018 ****************************************************************************/
00019 
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026 
00027 #ifdef HAVE_MYSQL
00028 
00029 #include "MySQLStore.h"
00030 #include "SessionID.h"
00031 #include "SessionSettings.h"
00032 #include "FieldConvertors.h"
00033 #include "Parser.h"
00034 #include "Utility.h"
00035 #include "strptime.h"
00036 #include <fstream>
00037 
00038 namespace FIX
00039 {
00040 
00041 const std::string MySQLStoreFactory::DEFAULT_DATABASE = "quickfix";
00042 const std::string MySQLStoreFactory::DEFAULT_USER = "";
00043 const std::string MySQLStoreFactory::DEFAULT_PASSWORD = "";
00044 const std::string MySQLStoreFactory::DEFAULT_HOST = "localhost";
00045 const short MySQLStoreFactory::DEFAULT_PORT = 3306;
00046 
00047 MySQLStore::MySQLStore
00048 ( const SessionID& s, const DatabaseConnectionID& d, MySQLConnectionPool* p )
00049   : m_pConnectionPool( p ), m_sessionID( s )
00050 {
00051   m_pConnection = m_pConnectionPool->create( d );
00052   populateCache();
00053 }
00054 
00055 MySQLStore::MySQLStore
00056 ( const SessionID& s, const std::string& database, const std::string& user,
00057   const std::string& password, const std::string& host, short port )
00058   : m_pConnectionPool( 0 ), m_sessionID( s )
00059 {
00060   m_pConnection = new MySQLConnection( database, user, password, host, port );
00061   populateCache();
00062 }
00063 
00064 MySQLStore::~MySQLStore()
00065 {
00066   if( m_pConnectionPool )
00067     m_pConnectionPool->destroy( m_pConnection );
00068   else
00069     delete m_pConnection;
00070 }
00071 
00072 void MySQLStore::populateCache()
00073 { QF_STACK_PUSH(MySQLStore::populateCache)
00074 
00075   std::stringstream queryString;
00076 
00077   queryString << "SELECT creation_time, incoming_seqnum, outgoing_seqnum FROM sessions WHERE "
00078   << "beginstring=" << "\"" << m_sessionID.getBeginString().getValue() << "\" and "
00079   << "sendercompid=" << "\"" << m_sessionID.getSenderCompID().getValue() << "\" and "
00080   << "targetcompid=" << "\"" << m_sessionID.getTargetCompID().getValue() << "\" and "
00081   << "session_qualifier=" << "\"" << m_sessionID.getSessionQualifier() << "\"";
00082 
00083   MySQLQuery query( queryString.str() );
00084   if( !m_pConnection->execute(query) )
00085     throw ConfigError( "No entries found for session in database" );
00086 
00087   int rows = query.rows();
00088   if( rows > 1 )
00089     throw ConfigError( "Multiple entries found for session in database" );
00090 
00091   if( rows == 1 )
00092   {
00093     struct tm time;
00094     std::string sqlTime = query.getValue( 0, 0 );
00095     strptime( sqlTime.c_str(), "%Y-%m-%d %H:%M:%S", &time );
00096     m_cache.setCreationTime (UtcTimeStamp (&time));
00097     m_cache.setNextTargetMsgSeqNum( atol( query.getValue( 0, 1 ) ) );
00098     m_cache.setNextSenderMsgSeqNum( atol( query.getValue( 0, 2 ) ) );
00099   }
00100   else
00101   {
00102     UtcTimeStamp time = m_cache.getCreationTime();
00103     char sqlTime[ 20 ];
00104     int year, month, day, hour, minute, second, millis;
00105     time.getYMD (year, month, day);
00106     time.getHMS (hour, minute, second, millis);
00107     STRING_SPRINTF( sqlTime, "%d-%02d-%02d %02d:%02d:%02d",
00108              year, month, day, hour, minute, second );
00109     std::stringstream queryString2;
00110     queryString2 << "INSERT INTO sessions (beginstring, sendercompid, targetcompid, session_qualifier,"
00111     << "creation_time, incoming_seqnum, outgoing_seqnum) VALUES("
00112     << "\"" << m_sessionID.getBeginString().getValue() << "\","
00113     << "\"" << m_sessionID.getSenderCompID().getValue() << "\","
00114     << "\"" << m_sessionID.getTargetCompID().getValue() << "\","
00115     << "\"" << m_sessionID.getSessionQualifier() << "\","
00116     << "'" << sqlTime << "',"
00117     << m_cache.getNextTargetMsgSeqNum() << ","
00118     << m_cache.getNextSenderMsgSeqNum() << ")";
00119 
00120     MySQLQuery query2( queryString2.str() );
00121     if( !m_pConnection->execute(query2) )
00122       throw ConfigError( "Unable to create session in database" );
00123   }
00124 
00125   QF_STACK_POP
00126 }
00127 
00128 MessageStore* MySQLStoreFactory::create( const SessionID& s )
00129 { QF_STACK_PUSH(MySQLStoreFactory::create)
00130 
00131   if( m_useSettings )
00132     return create( s, m_settings.get(s) );
00133   else if( m_useDictionary )
00134     return create( s, m_dictionary );
00135   else
00136   {
00137     DatabaseConnectionID id( m_database, m_user, m_password, m_host, m_port );
00138     return new MySQLStore( s, id, m_connectionPoolPtr.get() );
00139   }
00140 
00141   QF_STACK_POP
00142 }
00143 
00144 MessageStore* MySQLStoreFactory::create( const SessionID& s, const Dictionary& settings )
00145 { QF_STACK_PUSH(MySQLStoreFactory::create)
00146 
00147   std::string database = DEFAULT_DATABASE;
00148   std::string user = DEFAULT_USER;
00149   std::string password = DEFAULT_PASSWORD;
00150   std::string host = DEFAULT_HOST;
00151   short port = DEFAULT_PORT;
00152 
00153   try { database = settings.getString( MYSQL_STORE_DATABASE ); }
00154   catch( ConfigError& ) {}
00155 
00156   try { user = settings.getString( MYSQL_STORE_USER ); }
00157   catch( ConfigError& ) {}
00158 
00159   try { password = settings.getString( MYSQL_STORE_PASSWORD ); }
00160   catch( ConfigError& ) {}
00161 
00162   try { host = settings.getString( MYSQL_STORE_HOST ); }
00163   catch( ConfigError& ) {}
00164 
00165   try { port = ( short ) settings.getLong( MYSQL_STORE_PORT ); }
00166   catch( ConfigError& ) {}
00167 
00168   DatabaseConnectionID id( database, user, password, host, port );
00169   return new MySQLStore( s, id, m_connectionPoolPtr.get() );
00170 
00171   QF_STACK_POP
00172 }
00173 
00174 void MySQLStoreFactory::destroy( MessageStore* pStore )
00175 { QF_STACK_PUSH(MySQLStoreFactory::destroy)
00176   delete pStore;
00177   QF_STACK_POP
00178 }
00179 
00180 bool MySQLStore::set( int msgSeqNum, const std::string& msg )
00181 throw ( IOException )
00182 { QF_STACK_PUSH(MySQLStore::set)
00183 
00184   char* msgCopy = new char[ (msg.size() * 2) + 1 ];
00185   mysql_escape_string( msgCopy, msg.c_str(), msg.size() );
00186 
00187   std::stringstream queryString;
00188   queryString << "INSERT INTO messages "
00189   << "(beginstring, sendercompid, targetcompid, session_qualifier, msgseqnum, message) "
00190   << "VALUES ("
00191   << "\"" << m_sessionID.getBeginString().getValue() << "\","
00192   << "\"" << m_sessionID.getSenderCompID().getValue() << "\","
00193   << "\"" << m_sessionID.getTargetCompID().getValue() << "\","
00194   << "\"" << m_sessionID.getSessionQualifier() << "\","
00195   << msgSeqNum << ","
00196   << "\"" << msgCopy << "\")";
00197 
00198   delete [] msgCopy;
00199 
00200   MySQLQuery query( queryString.str() );
00201   if( !m_pConnection->execute(query) )
00202   {
00203     std::stringstream queryString2;
00204     queryString2 << "UPDATE messages SET message=\"" << msg << "\" WHERE "
00205     << "beginstring=" << "\"" << m_sessionID.getBeginString().getValue() << "\" and "
00206     << "sendercompid=" << "\"" << m_sessionID.getSenderCompID().getValue() << "\" and "
00207     << "targetcompid=" << "\"" << m_sessionID.getTargetCompID().getValue() << "\" and "
00208     << "session_qualifier=" << "\"" << m_sessionID.getSessionQualifier() << "\" and "
00209     << "msgseqnum=" << msgSeqNum;
00210     MySQLQuery query2( queryString2.str() );
00211     if( !m_pConnection->execute(query2) )
00212       query2.throwException();
00213   }
00214   return true;
00215 
00216   QF_STACK_POP
00217 }
00218 
00219 void MySQLStore::get( int begin, int end,
00220                       std::vector < std::string > & result ) const
00221 throw ( IOException )
00222 { QF_STACK_PUSH(MySQLStore::get)
00223 
00224   result.clear();
00225   std::stringstream queryString;
00226   queryString << "SELECT message FROM messages WHERE "
00227   << "beginstring=" << "\"" << m_sessionID.getBeginString().getValue() << "\" and "
00228   << "sendercompid=" << "\"" << m_sessionID.getSenderCompID().getValue() << "\" and "
00229   << "targetcompid=" << "\"" << m_sessionID.getTargetCompID().getValue() << "\" and "
00230   << "session_qualifier=" << "\"" << m_sessionID.getSessionQualifier() << "\" and "
00231   << "msgseqnum>=" << begin << " and " << "msgseqnum<=" << end << " "
00232   << "ORDER BY msgseqnum";
00233 
00234   MySQLQuery query( queryString.str() );
00235   if( !m_pConnection->execute(query) )
00236     query.throwException();
00237 
00238   int rows = query.rows();
00239   for( int row = 0; row < rows; row++ )
00240     result.push_back( query.getValue( row, 0 ) );
00241 
00242   QF_STACK_POP
00243 }
00244 
00245 int MySQLStore::getNextSenderMsgSeqNum() const throw ( IOException )
00246 { QF_STACK_PUSH(MySQLStore::getNextSenderMsgSeqNum)
00247   return m_cache.getNextSenderMsgSeqNum();
00248   QF_STACK_POP
00249 }
00250 
00251 int MySQLStore::getNextTargetMsgSeqNum() const throw ( IOException )
00252 { QF_STACK_PUSH(MySQLStore::getNextTargetMsgSeqNum)
00253   return m_cache.getNextTargetMsgSeqNum();
00254   QF_STACK_POP
00255 }
00256 
00257 void MySQLStore::setNextSenderMsgSeqNum( int value ) throw ( IOException )
00258 { QF_STACK_PUSH(MySQLStore::setNextSenderMsgSeqNum)
00259 
00260   std::stringstream queryString;
00261   queryString << "UPDATE sessions SET outgoing_seqnum=" << value << " WHERE "
00262   << "beginstring=" << "\"" << m_sessionID.getBeginString().getValue() << "\" and "
00263   << "sendercompid=" << "\"" << m_sessionID.getSenderCompID().getValue() << "\" and "
00264   << "targetcompid=" << "\"" << m_sessionID.getTargetCompID().getValue() << "\" and "
00265   << "session_qualifier=" << "\"" << m_sessionID.getSessionQualifier() << "\"";
00266   MySQLQuery query( queryString.str() );
00267   if( !m_pConnection->execute(query) )
00268     query.throwException();
00269   m_cache.setNextSenderMsgSeqNum( value );
00270 
00271   QF_STACK_POP
00272 }
00273 
00274 void MySQLStore::setNextTargetMsgSeqNum( int value ) throw ( IOException )
00275 { QF_STACK_PUSH(MySQLStore::setNextTargetMsgSeqNum)
00276 
00277   std::stringstream queryString;
00278   queryString << "UPDATE sessions SET incoming_seqnum=" << value << " WHERE "
00279   << "beginstring=" << "\"" << m_sessionID.getBeginString().getValue() << "\" and "
00280   << "sendercompid=" << "\"" << m_sessionID.getSenderCompID().getValue() << "\" and "
00281   << "targetcompid=" << "\"" << m_sessionID.getTargetCompID().getValue() << "\" and "
00282   << "session_qualifier=" << "\"" << m_sessionID.getSessionQualifier() << "\"";
00283 
00284   MySQLQuery query( queryString.str() );
00285   if( !m_pConnection->execute(query) )
00286     query.throwException();
00287 
00288   m_cache.setNextTargetMsgSeqNum( value );
00289 
00290   QF_STACK_POP
00291 }
00292 
00293 void MySQLStore::incrNextSenderMsgSeqNum() throw ( IOException )
00294 { QF_STACK_PUSH(MySQLStore::incrNextSenderMsgSeqNum)
00295   m_cache.incrNextSenderMsgSeqNum();
00296   setNextSenderMsgSeqNum( m_cache.getNextSenderMsgSeqNum() );
00297   QF_STACK_POP
00298 }
00299 
00300 void MySQLStore::incrNextTargetMsgSeqNum() throw ( IOException )
00301 { QF_STACK_PUSH(MySQLStore::incrNextTargetMsgSeqNum)
00302   m_cache.incrNextTargetMsgSeqNum();
00303   setNextTargetMsgSeqNum( m_cache.getNextTargetMsgSeqNum() );
00304   QF_STACK_POP
00305 }
00306 
00307 UtcTimeStamp MySQLStore::getCreationTime() const throw ( IOException )
00308 { QF_STACK_PUSH(MySQLStore::getCreationTime)
00309   return m_cache.getCreationTime();
00310   QF_STACK_POP
00311 }
00312 
00313 void MySQLStore::reset() throw ( IOException )
00314 { QF_STACK_PUSH(MySQLStore::reset)
00315 
00316   std::stringstream queryString;
00317   queryString << "DELETE FROM messages WHERE "
00318   << "beginstring=" << "\"" << m_sessionID.getBeginString().getValue() << "\" and "
00319   << "sendercompid=" << "\"" << m_sessionID.getSenderCompID().getValue() << "\" and "
00320   << "targetcompid=" << "\"" << m_sessionID.getTargetCompID().getValue() << "\" and "
00321   << "session_qualifier=" << "\"" << m_sessionID.getSessionQualifier() << "\"";
00322 
00323   MySQLQuery query( queryString.str() );
00324   if( !m_pConnection->execute(query) )
00325     query.throwException();
00326 
00327   m_cache.reset();
00328   UtcTimeStamp time = m_cache.getCreationTime();
00329 
00330   int year, month, day, hour, minute, second, millis;
00331   time.getYMD( year, month, day );
00332   time.getHMS( hour, minute, second, millis );
00333 
00334   char sqlTime[ 20 ];
00335   STRING_SPRINTF( sqlTime, "%d-%02d-%02d %02d:%02d:%02d",
00336            year, month, day, hour, minute, second );
00337 
00338   std::stringstream queryString2;
00339   queryString2 << "UPDATE sessions SET creation_time='" << sqlTime << "', "
00340   << "incoming_seqnum=" << m_cache.getNextTargetMsgSeqNum() << ", "
00341   << "outgoing_seqnum=" << m_cache.getNextSenderMsgSeqNum() << " WHERE "
00342   << "beginstring=" << "\"" << m_sessionID.getBeginString().getValue() << "\" and "
00343   << "sendercompid=" << "\"" << m_sessionID.getSenderCompID().getValue() << "\" and "
00344   << "targetcompid=" << "\"" << m_sessionID.getTargetCompID().getValue() << "\" and "
00345   << "session_qualifier=" << "\"" << m_sessionID.getSessionQualifier() << "\"";
00346 
00347   MySQLQuery query2( queryString2.str() );
00348   if( !m_pConnection->execute(query2) )
00349     query2.throwException();
00350 
00351   QF_STACK_POP
00352 }
00353 
00354 void MySQLStore::refresh() throw ( IOException )
00355 { QF_STACK_PUSH(MySQLStore::refresh)
00356 
00357   m_cache.reset();
00358   populateCache(); 
00359 
00360   QF_STACK_POP
00361 }
00362 
00363 }
00364 
00365 #endif

Generated on Mon Apr 5 20:59:50 2010 for QuickFIX by doxygen 1.6.1 written by Dimitri van Heesch, © 1997-2001