22 #include <plugin/slave/queue_consumer.h>
23 #include <drizzled/errmsg_print.h>
24 #include <drizzled/execute.h>
25 #include <drizzled/message/transaction.pb.h>
27 #include <drizzled/sql/result_set.h>
30 #include <boost/thread.hpp>
31 #include <boost/lexical_cast.hpp>
32 #include <google/protobuf/text_format.h>
// Start-up hook for the queue consumer.
// Records an empty error message and a status flag of true through
// setApplierState().
// NOTE(review): the braces and the return statement of this function are
// not visible in this excerpt, so the returned value cannot be documented.
40 bool QueueConsumer::init()
42 setApplierState(
"",
true);
// Shutdown hook for the queue consumer.
// Stores the current error message (as returned by getErrorMessage()) and a
// status flag of false through setApplierState().
47 void QueueConsumer::shutdown()
49 setApplierState(getErrorMessage(),
false);
// Runs one processing pass over every configured master.
// Each entry of _master_ids is converted to its string form and handed to
// processSingleMaster().
// NOTE(review): what happens when processSingleMaster() returns false, and
// the final return value, are not visible in this excerpt.
53 bool QueueConsumer::process()
// Index-based walk over the member list of master ids.
55 for (
size_t index= 0; index < _master_ids.size(); index++)
// The id is needed as text because it is later concatenated into SQL.
60 const string master_id= boost::lexical_cast<
string>(_master_ids[index]);
62 if (not processSingleMaster(master_id))
// Applies the completed transactions queued for a single master.
//
// For every transaction id reported by getListOfCompletedTransactions() the
// function reads the transaction segment by segment with getMessage(),
// converts each segment to SQL with convertToSQL(), escapes characters in
// the collected statements, executes them with executeSQLWithCommitId() and
// finally removes the transaction from the queue with deleteFromQueue().
//
// master_id: textual id of the master whose queue rows are processed.
//
// NOTE(review): several lines of this function (braces, the declarations of
// transaction and commit_id, parts of the escaping conditions and all return
// statements) are not visible in this excerpt.
69 bool QueueConsumer::processSingleMaster(
const string &master_id)
71 TrxIdList completedTransactionIds;
// The bool result of this call is not inspected here.
73 getListOfCompletedTransactions(master_id, completedTransactionIds);
// Handle the transactions in the order in which the list was filled.
75 for (
size_t x= 0; x < completedTransactionIds.size(); x++)
78 string originating_server_uuid;
79 uint64_t originating_commit_id= 0;
80 uint64_t trx_id= completedTransactionIds[x];
// aggregate_sql collects finished statements, segmented_sql holds the SQL of
// a statement that is still being assembled (see convertToSQL).
82 vector<string> aggregate_sql;
83 vector<string> segmented_sql;
// Segment numbering starts at 1; the id is post-incremented on every call
// so each iteration asks for the next segment.
86 uint32_t segment_id= 1;
88 while (getMessage(transaction, commit_id, master_id, trx_id, originating_server_uuid,
89 originating_commit_id, segment_id++))
91 convertToSQL(transaction, aggregate_sql, segmented_sql);
// Sanity checks after all segments were read: a usable commit id must have
// been found and no partially assembled statement may be left over.
100 assert((not commit_id.empty()) && (commit_id !=
"0"));
101 assert(segmented_sql.empty());
103 if (not aggregate_sql.empty())
// Escape pass over every collected statement: a backslash character is
// inserted in front of selected characters. The result of insert() is
// assigned back to the iterator because inserting into a string can
// invalidate the old iterator.
// NOTE(review): most of the conditions that choose which characters are
// escaped, and any extra iterator advance after an insert, are elided in
// this excerpt - confirm against the full file.
108 vector<string>::iterator agg_iter;
109 for (agg_iter= aggregate_sql.begin(); agg_iter != aggregate_sql.end(); ++agg_iter)
111 string &sql= *agg_iter;
112 string::iterator si= sql.begin();
113 for (; si != sql.end(); ++si)
117 si= sql.insert(si,
'\\');
// Branch for a backslash that is already present in the statement text.
120 else if (*si ==
'\\')
122 si= sql.insert(si,
'\\');
124 si= sql.insert(si,
'\\');
126 si= sql.insert(si,
'\\');
131 si= sql.insert(si,
'\\');
// Execute the collected statements; the remaining arguments of this call
// and the failure handling are not visible here.
138 if (not executeSQLWithCommitId(aggregate_sql, commit_id,
139 originating_server_uuid,
140 originating_commit_id,
// Builds an UPDATE that only moves last_applied_commit_id forward for this
// master. commit_id and master_id are concatenated unquoted.
// NOTE(review): presumably this is the path taken when there was no SQL to
// apply; how tmp is executed is not visible - confirm.
149 string tmp(
"UPDATE `sys_replication`.`applier_state`"
150 " SET `last_applied_commit_id` = ");
151 tmp.append(commit_id);
152 tmp.append(
" WHERE `master_id` = ");
153 tmp.append(master_id);
// Remove the transaction rows from the queue table.
163 if (not deleteFromQueue(master_id, trx_id))
// Fragment of a function whose header line is not visible in this excerpt.
// NOTE(review): the parameter names match the getMessage(...) call made from
// processSingleMaster, so this is presumably QueueConsumer::getMessage.
//
// Visible behaviour: selects one queue row identified by trx_id, segment_id
// and master_id, parses the stored text message into transaction and copies
// the originating server uuid and originating commit id into the output
// parameters.
//
// master_id:               textual master id, concatenated unquoted into SQL.
// originating_server_uuid: output, set from the third result column.
// originating_commit_id:   output, set from the fourth result column.
175 const string &master_id,
177 string &originating_server_uuid,
178 uint64_t &originating_commit_id,
// Assemble the SELECT statement piece by piece.
181 string sql(
"SELECT `msg`, `commit_order`, `originating_server_uuid`, "
182 "`originating_commit_id` FROM `sys_replication`.`queue`"
183 " WHERE `trx_id` = ");
184 sql.append(boost::lexical_cast<string>(trx_id));
// The numeric second argument of append() is the number of characters taken
// from the literal; it equals the length of the literal here.
185 sql.append(
" AND `seg_id` = ", 16);
186 sql.append(boost::lexical_cast<string>(segment_id));
// NOTE(review): the next statement ends with a comma instead of a semicolon,
// so it is joined with the following append() by the comma operator. Both
// appends still run in order, but this looks like a typo for a semicolon.
187 sql.append(
" AND `master_id` = ", 19),
188 sql.append(master_id);
// Run the query on the consumer session. The declaration of result_set is
// not visible in this excerpt.
191 Execute execute(*(_session.get()),
true);
193 execute.run(sql, result_set);
// The SELECT above names exactly four columns.
195 assert(result_set.getMetaData().getColumnCount() == 4);
198 uint32_t found_rows= 0;
199 while (result_set.next())
// Column order follows the SELECT list: msg, commit_order,
// originating_server_uuid, originating_commit_id.
201 string msg= result_set.getString(0);
202 string com_id= result_set.getString(1);
203 string orig_server_uuid= result_set.getString(2);
204 string orig_commit_id= result_set.getString(3);
// Guard for an empty message or for a second row of the same segment.
// NOTE(review): the body of this branch is elided; presumably it reports
// failure - confirm.
206 if ((msg ==
"") || (found_rows == 1))
// None of the four columns may be NULL.
210 assert(result_set.isNull(0) ==
false);
211 assert(result_set.isNull(1) ==
false);
212 assert(result_set.isNull(2) ==
false);
213 assert(result_set.isNull(3) ==
false);
// The message is stored in protobuf text format.
// NOTE(review): the bool result of ParseFromString is not checked here.
216 google::protobuf::TextFormat::ParseFromString(msg, &transaction);
// Hand the originating identifiers back to the caller. lexical_cast throws
// boost::bad_lexical_cast if the text is not a valid unsigned integer.
219 originating_server_uuid= orig_server_uuid;
220 originating_commit_id= boost::lexical_cast<uint64_t>(orig_commit_id);
// Collects the ids of the queued transactions of one master that already
// have a commit order.
//
// The query selects trx_id from the queue table where commit_order is not
// NULL and greater than zero, restricted to master_id and sorted by
// commit_order in ascending order. Every returned id is converted to
// uint64_t and appended to list.
//
// master_id: textual master id used in the WHERE clause.
// NOTE(review): the second parameter (presumably the output container named
// list), the line that concatenates master_id, the declaration of
// result_set and the return statement are not visible in this excerpt.
230 bool QueueConsumer::getListOfCompletedTransactions(
const string &master_id,
233 Execute execute(*(_session.get()),
true);
235 string sql(
"SELECT `trx_id` FROM `sys_replication`.`queue`"
236 " WHERE `commit_order` IS NOT NULL AND `commit_order` > 0"
237 " AND `master_id` = "
239 +
" ORDER BY `commit_order` ASC");
244 execute.run(sql, result_set);
// The SELECT above names exactly one column.
246 assert(result_set.getMetaData().getColumnCount() == 1);
248 while (result_set.next())
250 assert(result_set.isNull(0) ==
false);
// NOTE(review): value is read here but the conversion below fetches the
// column again; any use of value in the elided lines cannot be seen.
251 string value= result_set.getString(0);
// lexical_cast throws boost::bad_lexical_cast on non-numeric text.
256 list.push_back(boost::lexical_cast<uint64_t>(result_set.getString(0)));
// Fragment of a function whose header line is not visible in this excerpt.
// NOTE(review): the parameter names match the convertToSQL(...) call made
// from processSingleMaster, so this is presumably QueueConsumer::convertToSQL.
//
// Visible behaviour: walks the statements of a transaction message, turns
// them into SQL text collected in segmented_sql and, once a statement is
// complete, moves that text to the end of aggregate_sql.
//
// aggregate_sql: in/out, SQL of all completed statements so far.
// segmented_sql: in/out, SQL of the statement currently being assembled.
265 vector<string> &aggregate_sql,
266 vector<string> &segmented_sql)
// NOTE(review): the body of this branch is elided in this excerpt.
268 if (transaction.has_event())
271 size_t num_statements= transaction.statement_size();
// The declaration of statement inside the loop is not visible here.
280 for (
size_t idx= 0; idx < num_statements; idx++)
// A transaction-level ROLLBACK must be the last statement; everything
// collected so far is discarded.
285 if (statement.type() == message::Statement::ROLLBACK)
287 assert(idx == (num_statements - 1));
288 aggregate_sql.clear();
289 segmented_sql.clear();
293 switch (statement.type())
// For the statement types listed below a COMMIT is queued in segmented_sql
// ahead of the SQL of the statement itself.
296 case message::Statement::TRUNCATE_TABLE:
297 case message::Statement::CREATE_SCHEMA:
298 case message::Statement::ALTER_SCHEMA:
299 case message::Statement::DROP_SCHEMA:
300 case message::Statement::CREATE_TABLE:
301 case message::Statement::ALTER_TABLE:
302 case message::Statement::DROP_TABLE:
303 case message::Statement::RAW_SQL:
305 segmented_sql.push_back(
"COMMIT");
// A statement-level rollback only drops the partially assembled statement.
310 case message::Statement::ROLLBACK_STATEMENT:
312 segmented_sql.clear();
// Append the SQL form of the statement to segmented_sql.
// NOTE(review): the meaning of the returned value and the body of this
// branch are not visible in this excerpt.
322 if (message::transformStatementToSql(statement, segmented_sql,
323 message::DRIZZLE,
true))
// Once the statement is complete, move its SQL to the aggregate list and
// start with an empty segment buffer again.
328 if (isEndStatement(statement))
330 aggregate_sql.insert(aggregate_sql.end(),
331 segmented_sql.begin(),
332 segmented_sql.end());
333 segmented_sql.clear();
// Fragment of a function whose header line is not visible in this excerpt.
// NOTE(review): presumably the body of isEndStatement(), which convertToSQL
// calls to decide whether a statement is complete - confirm.
//
// Visible behaviour: for INSERT, UPDATE and DELETE statements the
// end_segment() flag of the statement data is tested. The declarations of
// data and the values returned by each branch are elided.
343 switch (statement.type())
345 case (message::Statement::INSERT):
348 if (not data.end_segment())
352 case (message::Statement::UPDATE):
355 if (not data.end_segment())
359 case (message::Statement::DELETE):
362 if (not data.end_segment())
// Writes the applier status and an error message to the applier_state table.
//
// An UPDATE statement is built that sets the status column to either STOPPED
// or RUNNING and the error_msg column to an escaped copy of the message; the
// statement is then run through executeSQL().
//
// err_msg: error text to store.
// status:  selects between the STOPPED and RUNNING statement.
// NOTE(review): the condition on status, the declarations of sql, msg and
// it, the escape conditions and the tail of the statement are not visible in
// this excerpt. The calls in init() (true) and shutdown() (false) suggest
// that true means RUNNING - confirm.
380 void QueueConsumer::setApplierState(
const string &err_msg,
bool status)
382 vector<string> statements;
388 sql=
"UPDATE `sys_replication`.`applier_state` SET `status` = 'STOPPED'";
392 sql=
"UPDATE `sys_replication`.`applier_state` SET `status` = 'RUNNING'";
// The numeric argument is the number of characters appended from the
// literal; it equals the length of the literal.
395 sql.append(
", `error_msg` = '", 17);
// Escape pass over msg: a single quote or a backslash is inserted in front
// of selected characters. The iterator is refreshed from insert() because
// inserting into a string can invalidate it.
399 for (it= msg.begin(); it != msg.end(); ++it)
403 it= msg.insert(it,
'\'');
408 it= msg.insert(it,
'\\');
416 statements.push_back(sql);
// The bool result of executeSQL() is not inspected here.
418 executeSQL(statements);
// Executes a batch of SQL statements together with the bookkeeping for the
// applied commit id.
//
// sql:                     statements to execute.
// commit_id:               value written to last_applied_commit_id.
// originating_server_uuid: value written to originating_server_uuid and
//                          set on the session.
// originating_commit_id:   value written to originating_commit_id and set
//                          on the session.
// master_id:               selects the applier_state row to update.
// Returns the result of executeSQL(sql).
422 bool QueueConsumer::executeSQLWithCommitId(vector<string> &sql,
423 const string &commit_id,
424 const string &originating_server_uuid,
425 uint64_t originating_commit_id,
426 const string &master_id)
// Build the applier_state UPDATE. commit_id and master_id are concatenated
// unquoted; the uuid is wrapped in single quotes without further escaping.
// NOTE(review): what is done with tmp afterwards (presumably it is appended
// to sql) is not visible in this excerpt - confirm.
428 string tmp(
"UPDATE `sys_replication`.`applier_state`"
429 " SET `last_applied_commit_id` = ");
430 tmp.append(commit_id);
431 tmp.append(
", `originating_server_uuid` = '");
432 tmp.append(originating_server_uuid);
433 tmp.append(
"' , `originating_commit_id` = ");
434 tmp.append(boost::lexical_cast<string>(originating_commit_id));
436 tmp.append(
" WHERE `master_id` = ");
437 tmp.append(master_id);
// Tag the session with the origin of the transaction before executing.
441 _session->setOriginatingServerUUID(originating_server_uuid);
442 _session->setOriginatingCommitID(originating_commit_id);
444 return executeSQL(sql);
// Deletes all queue rows of one transaction of one master.
//
// master_id: textual master id, concatenated unquoted into the statement.
// trx_id:    id of the transaction whose rows are removed.
// Returns the result of executeSQL() for the single DELETE statement.
448 bool QueueConsumer::deleteFromQueue(
const string &master_id, uint64_t trx_id)
450 string sql(
"DELETE FROM `sys_replication`.`queue` WHERE `trx_id` = ");
451 sql.append(boost::lexical_cast<std::string>(trx_id));
453 sql.append(
" AND `master_id` = ");
454 sql.append(master_id);
// executeSQL() takes a list of statements, so wrap the single statement.
456 vector<string> sql_vect;
457 sql_vect.push_back(sql);
459 return executeSQL(sql_vect);
TODO: Rename this file; the name func.h is not descriptive.