Drizzled Public API Documentation

queue_consumer.cc
1 /* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3  *
4  * Copyright (C) 2011 David Shrewsbury
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19  */
20 
21 #include <config.h>
22 #include <plugin/slave/queue_consumer.h>
23 #include <drizzled/errmsg_print.h>
24 #include <drizzled/execute.h>
25 #include <drizzled/message/transaction.pb.h>
27 #include <drizzled/sql/result_set.h>
28 #include <string>
29 #include <vector>
30 #include <boost/thread.hpp>
31 #include <boost/lexical_cast.hpp>
32 #include <google/protobuf/text_format.h>
33 
34 using namespace std;
35 using namespace drizzled;
36 
37 namespace slave
38 {
39 
40 bool QueueConsumer::init()
41 {
42  setApplierState("", true);
43  return true;
44 }
45 
46 
47 void QueueConsumer::shutdown()
48 {
49  setApplierState(getErrorMessage(), false);
50 }
51 
52 
53 bool QueueConsumer::process()
54 {
55  for (size_t index= 0; index < _master_ids.size(); index++)
56  {
57  /* We go ahead and get the string version of the master ID
58  * so we don't have to keep converting it from int to string.
59  */
60  const string master_id= boost::lexical_cast<string>(_master_ids[index]);
61 
62  if (not processSingleMaster(master_id))
63  return false;
64  }
65 
66  return true;
67 }
68 
69 bool QueueConsumer::processSingleMaster(const string &master_id)
70 {
71  TrxIdList completedTransactionIds;
72 
73  getListOfCompletedTransactions(master_id, completedTransactionIds);
74 
75  for (size_t x= 0; x < completedTransactionIds.size(); x++)
76  {
77  string commit_id;
78  string originating_server_uuid;
79  uint64_t originating_commit_id= 0;
80  uint64_t trx_id= completedTransactionIds[x];
81 
82  vector<string> aggregate_sql; /* final SQL to execute */
83  vector<string> segmented_sql; /* carryover from segmented statements */
84 
85  message::Transaction transaction;
86  uint32_t segment_id= 1;
87 
88  while (getMessage(transaction, commit_id, master_id, trx_id, originating_server_uuid,
89  originating_commit_id, segment_id++))
90  {
91  convertToSQL(transaction, aggregate_sql, segmented_sql);
92  transaction.Clear();
93  }
94 
95  /*
96  * The last message in a transaction should always have a commit_id
97  * value larger than 0, though other messages of the same transaction
98  * will have commit_id = 0.
99  */
100  assert((not commit_id.empty()) && (commit_id != "0"));
101  assert(segmented_sql.empty());
102 
103  if (not aggregate_sql.empty())
104  {
105  /*
106  * Execution using drizzled::Execute requires some special escaping.
107  */
108  vector<string>::iterator agg_iter;
109  for (agg_iter= aggregate_sql.begin(); agg_iter != aggregate_sql.end(); ++agg_iter)
110  {
111  string &sql= *agg_iter;
112  string::iterator si= sql.begin();
113  for (; si != sql.end(); ++si)
114  {
115  if (*si == '\"')
116  {
117  si= sql.insert(si, '\\');
118  ++si;
119  }
120  else if (*si == '\\')
121  {
122  si= sql.insert(si, '\\');
123  ++si;
124  si= sql.insert(si, '\\');
125  ++si;
126  si= sql.insert(si, '\\');
127  ++si;
128  }
129  else if (*si == ';')
130  {
131  si= sql.insert(si, '\\');
132  ++si; /* advance back to the semicolon */
133  }
134  }
135  }
136  }
137 
138  if (not executeSQLWithCommitId(aggregate_sql, commit_id,
139  originating_server_uuid,
140  originating_commit_id,
141  master_id))
142  {
143  if (_ignore_errors)
144  {
145  clearErrorState();
146 
147  /* Still need to record that we handled this trx */
148  vector<string> sql;
149  string tmp("UPDATE `sys_replication`.`applier_state`"
150  " SET `last_applied_commit_id` = ");
151  tmp.append(commit_id);
152  tmp.append(" WHERE `master_id` = ");
153  tmp.append(master_id);
154  sql.push_back(tmp);
155  executeSQL(sql);
156  }
157  else
158  {
159  return false;
160  }
161  }
162 
163  if (not deleteFromQueue(master_id, trx_id))
164  {
165  return false;
166  }
167  }
168 
169  return true;
170 }
171 
172 
173 bool QueueConsumer::getMessage(message::Transaction &transaction,
174  string &commit_id,
175  const string &master_id,
176  uint64_t trx_id,
177  string &originating_server_uuid,
178  uint64_t &originating_commit_id,
179  uint32_t segment_id)
180 {
181  string sql("SELECT `msg`, `commit_order`, `originating_server_uuid`, "
182  "`originating_commit_id` FROM `sys_replication`.`queue`"
183  " WHERE `trx_id` = ");
184  sql.append(boost::lexical_cast<string>(trx_id));
185  sql.append(" AND `seg_id` = ", 16);
186  sql.append(boost::lexical_cast<string>(segment_id));
187  sql.append(" AND `master_id` = ", 19),
188  sql.append(master_id);
189 
190  sql::ResultSet result_set(4);
191  Execute execute(*(_session.get()), true);
192 
193  execute.run(sql, result_set);
194 
195  assert(result_set.getMetaData().getColumnCount() == 4);
196 
197  /* Really should only be 1 returned row */
198  uint32_t found_rows= 0;
199  while (result_set.next())
200  {
201  string msg= result_set.getString(0);
202  string com_id= result_set.getString(1);
203  string orig_server_uuid= result_set.getString(2);
204  string orig_commit_id= result_set.getString(3);
205 
206  if ((msg == "") || (found_rows == 1))
207  break;
208 
209  /* No columns should be NULL */
210  assert(result_set.isNull(0) == false);
211  assert(result_set.isNull(1) == false);
212  assert(result_set.isNull(2) == false);
213  assert(result_set.isNull(3) == false);
214 
215 
216  google::protobuf::TextFormat::ParseFromString(msg, &transaction);
217 
218  commit_id= com_id;
219  originating_server_uuid= orig_server_uuid;
220  originating_commit_id= boost::lexical_cast<uint64_t>(orig_commit_id);
221  found_rows++;
222  }
223 
224  if (found_rows == 0)
225  return false;
226 
227  return true;
228 }
229 
230 bool QueueConsumer::getListOfCompletedTransactions(const string &master_id,
231  TrxIdList &list)
232 {
233  Execute execute(*(_session.get()), true);
234 
235  string sql("SELECT `trx_id` FROM `sys_replication`.`queue`"
236  " WHERE `commit_order` IS NOT NULL AND `commit_order` > 0"
237  " AND `master_id` = "
238  + master_id
239  + " ORDER BY `commit_order` ASC");
240 
241  /* ResultSet size must match column count */
242  sql::ResultSet result_set(1);
243 
244  execute.run(sql, result_set);
245 
246  assert(result_set.getMetaData().getColumnCount() == 1);
247 
248  while (result_set.next())
249  {
250  assert(result_set.isNull(0) == false);
251  string value= result_set.getString(0);
252 
253  /* empty string returned when no more results */
254  if (value != "")
255  {
256  list.push_back(boost::lexical_cast<uint64_t>(result_set.getString(0)));
257  }
258  }
259 
260  return true;
261 }
262 
263 
264 bool QueueConsumer::convertToSQL(const message::Transaction &transaction,
265  vector<string> &aggregate_sql,
266  vector<string> &segmented_sql)
267 {
268  if (transaction.has_event())
269  return true;
270 
271  size_t num_statements= transaction.statement_size();
272 
273  /*
274  * Loop through all Statement messages within this Transaction and
275  * convert each to equivalent SQL statements. Complete Statements will
276  * be appended to aggregate_sql, while segmented Statements will remain
277  * in segmented_sql to be appended to until completed, or rolled back.
278  */
279 
280  for (size_t idx= 0; idx < num_statements; idx++)
281  {
282  const message::Statement &statement= transaction.statement(idx);
283 
284  /* We won't bother with executing a rolled back transaction */
285  if (statement.type() == message::Statement::ROLLBACK)
286  {
287  assert(idx == (num_statements - 1)); /* should be the final Statement */
288  aggregate_sql.clear();
289  segmented_sql.clear();
290  break;
291  }
292 
293  switch (statement.type())
294  {
295  /* DDL cannot be in a transaction, so precede with a COMMIT */
296  case message::Statement::TRUNCATE_TABLE:
297  case message::Statement::CREATE_SCHEMA:
298  case message::Statement::ALTER_SCHEMA:
299  case message::Statement::DROP_SCHEMA:
300  case message::Statement::CREATE_TABLE:
301  case message::Statement::ALTER_TABLE:
302  case message::Statement::DROP_TABLE:
303  case message::Statement::RAW_SQL: /* currently ALTER TABLE or RENAME */
304  {
305  segmented_sql.push_back("COMMIT");
306  break;
307  }
308 
309  /* Cancel any ongoing statement */
310  case message::Statement::ROLLBACK_STATEMENT:
311  {
312  segmented_sql.clear();
313  continue;
314  }
315 
316  default:
317  {
318  break;
319  }
320  }
321 
322  if (message::transformStatementToSql(statement, segmented_sql,
323  message::DRIZZLE, true))
324  {
325  return false;
326  }
327 
328  if (isEndStatement(statement))
329  {
330  aggregate_sql.insert(aggregate_sql.end(),
331  segmented_sql.begin(),
332  segmented_sql.end());
333  segmented_sql.clear();
334  }
335  }
336 
337  return true;
338 }
339 
340 
341 bool QueueConsumer::isEndStatement(const message::Statement &statement)
342 {
343  switch (statement.type())
344  {
345  case (message::Statement::INSERT):
346  {
347  const message::InsertData &data= statement.insert_data();
348  if (not data.end_segment())
349  return false;
350  break;
351  }
352  case (message::Statement::UPDATE):
353  {
354  const message::UpdateData &data= statement.update_data();
355  if (not data.end_segment())
356  return false;
357  break;
358  }
359  case (message::Statement::DELETE):
360  {
361  const message::DeleteData &data= statement.delete_data();
362  if (not data.end_segment())
363  return false;
364  break;
365  }
366  default:
367  return true;
368  }
369  return true;
370 }
371 
372 
373 /*
374  * TODO: This currently updates every row in the applier_state table.
375  * This use to be a single row. With multi-master support, we now need
376  * a row for every master so we can track the last applied commit ID
377  * value for each. Eventually, we may want multiple consumer threads,
378  * so then we'd need to update each row independently.
379  */
380 void QueueConsumer::setApplierState(const string &err_msg, bool status)
381 {
382  vector<string> statements;
383  string sql;
384  string msg(err_msg);
385 
386  if (not status)
387  {
388  sql= "UPDATE `sys_replication`.`applier_state` SET `status` = 'STOPPED'";
389  }
390  else
391  {
392  sql= "UPDATE `sys_replication`.`applier_state` SET `status` = 'RUNNING'";
393  }
394 
395  sql.append(", `error_msg` = '", 17);
396 
397  /* Escape embedded quotes and statement terminators */
398  string::iterator it;
399  for (it= msg.begin(); it != msg.end(); ++it)
400  {
401  if (*it == '\'')
402  {
403  it= msg.insert(it, '\'');
404  ++it; /* advance back to the quote */
405  }
406  else if (*it == ';')
407  {
408  it= msg.insert(it, '\\');
409  ++it; /* advance back to the semicolon */
410  }
411  }
412 
413  sql.append(msg);
414  sql.append("'", 1);
415 
416  statements.push_back(sql);
417  clearErrorState();
418  executeSQL(statements);
419 }
420 
421 
422 bool QueueConsumer::executeSQLWithCommitId(vector<string> &sql,
423  const string &commit_id,
424  const string &originating_server_uuid,
425  uint64_t originating_commit_id,
426  const string &master_id)
427 {
428  string tmp("UPDATE `sys_replication`.`applier_state`"
429  " SET `last_applied_commit_id` = ");
430  tmp.append(commit_id);
431  tmp.append(", `originating_server_uuid` = '");
432  tmp.append(originating_server_uuid);
433  tmp.append("' , `originating_commit_id` = ");
434  tmp.append(boost::lexical_cast<string>(originating_commit_id));
435 
436  tmp.append(" WHERE `master_id` = ");
437  tmp.append(master_id);
438 
439  sql.push_back(tmp);
440 
441  _session->setOriginatingServerUUID(originating_server_uuid);
442  _session->setOriginatingCommitID(originating_commit_id);
443 
444  return executeSQL(sql);
445 }
446 
447 
448 bool QueueConsumer::deleteFromQueue(const string &master_id, uint64_t trx_id)
449 {
450  string sql("DELETE FROM `sys_replication`.`queue` WHERE `trx_id` = ");
451  sql.append(boost::lexical_cast<std::string>(trx_id));
452 
453  sql.append(" AND `master_id` = ");
454  sql.append(master_id);
455 
456  vector<string> sql_vect;
457  sql_vect.push_back(sql);
458 
459  return executeSQL(sql_vect);
460 }
461 
462 } /* namespace slave */
TODO: Rename this file - func.h is stupid.
Definition: module.cc:36