Drizzled Public API Documentation

rabbitmq_log.cc
1 /* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3  *
4  * Copyright (C) 2010 Marcus Eriksson
5  *
6  * Authors:
7  *
8  * Marcus Eriksson <krummas@gmail.com>
9  *
10  * This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with this program; if not, write to the Free Software
22  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23  */
24 
25 #include <config.h>
26 #include "rabbitmq_log.h"
27 #include <drizzled/message/transaction.pb.h>
28 #include <google/protobuf/io/coded_stream.h>
29 #include <stdio.h>
30 #include <drizzled/module/registry.h>
31 #include <drizzled/plugin.h>
32 #include <stdint.h>
33 #include "rabbitmq_handler.h"
34 #include <boost/program_options.hpp>
36 #include <drizzled/item.h>
37 
38 namespace po= boost::program_options;
39 
40 using namespace std;
41 using namespace drizzled;
42 using namespace google;
43 
44 namespace drizzle_plugin {
45 namespace rabbitmq {
46 
50 static port_constraint sysvar_rabbitmq_port;
51 bool sysvar_logging_enable= true;
52 string sysvar_rabbitmq_host;
53 string sysvar_rabbitmq_username;
54 string sysvar_rabbitmq_password;
55 string sysvar_rabbitmq_virtualhost;
56 string sysvar_rabbitmq_exchange;
57 string sysvar_rabbitmq_routingkey;
58 void updateSysvarLoggingEnable(Session *, sql_var_t);
59 bool updateSysvarRabbitMQHost(Session *, set_var *var);
60 int updateSysvarRabbitMQPort(Session *, set_var *var);
61 bool updateSysvarRabbitMQUserName(Session *, set_var *var);
62 bool updateSysvarRabbitMQPassword(Session *, set_var *var);
63 bool updateSysvarRabbitMQVirtualHost(Session *, set_var *var);
64 bool updateSysvarRabbitMQExchange(Session *, set_var *var);
65 bool updateSysvarRabbitMQRoutingKey(Session *, set_var *var);
66 
67 RabbitMQLog::RabbitMQLog(const string &name,
68  RabbitMQHandler* mqHandler) :
69  plugin::TransactionApplier(name),
70  _rabbitMQHandler(mqHandler)
71 { }
72 
73 RabbitMQLog::~RabbitMQLog()
74 {
75  _rabbitMQHandler->disconnect();
76  delete _rabbitMQHandler;
77 }
78 
79 plugin::ReplicationReturnCode
81 {
82  if(not sysvar_logging_enable)
83  return plugin::SUCCESS;
84 
85  size_t message_byte_length= to_apply.ByteSize();
86  uint8_t* buffer= new uint8_t[message_byte_length];
87  if(buffer == NULL)
88  {
89  errmsg_printf(error::ERROR, _("Failed to allocate enough memory to transaction message\n"));
90  deactivate();
91  return plugin::UNKNOWN_ERROR;
92  }
93 
94  to_apply.SerializeWithCachedSizesToArray(buffer);
95  short tries = 3;
96  bool sent = false;
97  while (!sent && tries > 0) {
98  tries--;
99  try
100  {
101  _rabbitMQHandler->publish(buffer, int(message_byte_length));
102  sent = true;
103  }
104  catch(exception& e)
105  {
106  errmsg_printf(error::ERROR, "%s", e.what());
107  try {
108  _rabbitMQHandler->reconnect();
109  } catch(exception &e) {
110  errmsg_printf(error::ERROR, _("Could not reconnect, trying again.. - waiting 10 seconds for server to come back"));
111  sleep(10);
112  } //
113  }
114  }
115 
116  delete[] buffer;
117  if(sent) return plugin::SUCCESS;
118  errmsg_printf(error::ERROR, _("RabbitMQ server has disappeared, failing transaction."));
119  deactivate();
120  return plugin::UNKNOWN_ERROR;
121 }
122 
123 void RabbitMQLog::setRabbitMQHandler(RabbitMQHandler* new_rabbitMQHandler)
124 {
125  _rabbitMQHandler= new_rabbitMQHandler;
126 }
127 
128 static RabbitMQLog *rabbitmqLogger;
129 static RabbitMQHandler* rabbitmqHandler;
130 
131 
132 void updateSysvarLoggingEnable(Session *, sql_var_t)
133 {
134  if(not sysvar_logging_enable)
135  {
136  sysvar_logging_enable = false;
137  delete rabbitmqHandler;
138  }
139  else
140  {
141  rabbitmqHandler= new RabbitMQHandler(sysvar_rabbitmq_host,
142  sysvar_rabbitmq_port,
143  sysvar_rabbitmq_username,
144  sysvar_rabbitmq_password,
145  sysvar_rabbitmq_virtualhost,
146  sysvar_rabbitmq_exchange,
147  sysvar_rabbitmq_routingkey);
148  if(rabbitmqHandler->rabbitmq_connection_established)
149  {
150  rabbitmqLogger->setRabbitMQHandler(rabbitmqHandler);
151  sysvar_logging_enable= true;
152  }
153  else
154  {
155  errmsg_printf(error::ERROR, _("Could not open socket, is rabbitmq running?"));
156  sysvar_logging_enable= false;
157  }
158  }
159 }
160 
161 bool updateSysvarRabbitMQHost(Session *, set_var *var)
162 {
163  if(sysvar_logging_enable)
164  {
165  errmsg_printf(error::ERROR, _("Value of rabbitmq_host cannot be changed as rabbitmq plugin is enabled. You need to disable the plugin first."));
166  return true; // error
167  }
168  if (not var->value->str_value.empty())
169  {
170  sysvar_rabbitmq_host = var->value->str_value.data();
171  return false;
172  }
173  else
174  {
175  errmsg_printf(error::ERROR, _("rabbitmq_host cannot be NULL"));
176  return true; // error
177  }
178  return true; // error
179 }
180 
181 int updateSysvarRabbitMQPort(Session *, set_var *var)
182 {
183  if(sysvar_logging_enable)
184  {
185  errmsg_printf(error::ERROR, _("Value of rabbitmq_port cannot be changed as rabbitmq plugin is enabled. You need to disable the plugin first."));
186  return 1; // error
187  }
188  if (var->value->val_int())
189  {
190  sysvar_rabbitmq_port = var->value->val_int();
191  return 0;
192  }
193  else
194  {
195  errmsg_printf(error::ERROR, _("rabbitmq_port cannot be NULL"));
196  return 1; // error
197  }
198  return 1; // error
199 }
200 
201 bool updateSysvarRabbitMQUserName(Session *, set_var *var)
202 {
203  if(sysvar_logging_enable)
204  {
205  errmsg_printf(error::ERROR, _("Value of rabbitmq_username cannot be changed as rabbitmq plugin is enabled. You need to disable the plugin first."));
206  return true; // error
207  }
208  if (not var->value->str_value.empty())
209  {
210  sysvar_rabbitmq_username = var->value->str_value.data();
211  return false;
212  }
213  else
214  {
215  errmsg_printf(error::ERROR, _("rabbitmq_username cannot be NULL"));
216  return true; // error
217  }
218  return true; // error
219 }
220 
221 bool updateSysvarRabbitMQPassword(Session *, set_var *var)
222 {
223  if(sysvar_logging_enable)
224  {
225  errmsg_printf(error::ERROR, _("Value of rabbitmq_password cannot be changed as rabbitmq plugin is enabled. You need to disable the plugin first."));
226  return true; // error
227  }
228  if (not var->value->str_value.empty())
229  {
230  sysvar_rabbitmq_password = var->value->str_value.data();
231  return false;
232  }
233  else
234  {
235  errmsg_printf(error::ERROR, _("rabbitmq_password cannot be NULL"));
236  return true; // error
237  }
238  return true; // error
239 }
240 
241 bool updateSysvarRabbitMQVirtualHost(Session *, set_var *var)
242 {
243  if(sysvar_logging_enable)
244  {
245  errmsg_printf(error::ERROR, _("Value of rabbitmq_virtualhost cannot be changed as rabbitmq plugin is enabled. You need to disable the plugin first."));
246  return true; // error
247  }
248  if (not var->value->str_value.empty())
249  {
250  sysvar_rabbitmq_virtualhost = var->value->str_value.data();
251  return false;
252  }
253  else
254  {
255  errmsg_printf(error::ERROR, _("rabbitmq_virtualhost cannot be NULL"));
256  return true; // error
257  }
258  return true; // error
259 }
260 
261 bool updateSysvarRabbitMQExchange(Session *, set_var *var)
262 {
263  if(sysvar_logging_enable)
264  {
265  errmsg_printf(error::ERROR, _("Value of rabbitmq_exchange cannot be changed as rabbitmq plugin is enabled. You need to disable the plugin first."));
266  return true; // error
267  }
268  if (not var->value->str_value.empty())
269  {
270  sysvar_rabbitmq_exchange = var->value->str_value.data();
271  return false;
272  }
273  else
274  {
275  errmsg_printf(error::ERROR, _("rabbitmq_exchange cannot be NULL"));
276  return true; // error
277  }
278  return true; // error
279 }
280 
281 bool updateSysvarRabbitMQRoutingKey(Session *, set_var *var)
282 {
283  if(sysvar_logging_enable)
284  {
285  errmsg_printf(error::ERROR, _("Value of rabbitmq_routingkey cannot be changed as rabbitmq plugin is enabled. You need to disable the plugin first."));
286  return true; // error
287  }
288  if (not var->value->str_value.empty())
289  {
290  sysvar_rabbitmq_routingkey = var->value->str_value.data();
291  return false;
292  }
293  else
294  {
295  errmsg_printf(error::ERROR, _("rabbitmq_routingkey cannot be NULL"));
296  return true; // error
297  }
298  return true; // error
299 }
300 
306 static int init(drizzled::module::Context &context)
307 {
308  const module::option_map &vm= context.getOptions();
309 
310  try
311  {
312  rabbitmqHandler= new RabbitMQHandler(vm["host"].as<string>(),
313  sysvar_rabbitmq_port,
314  vm["username"].as<string>(),
315  vm["password"].as<string>(),
316  vm["virtualhost"].as<string>(),
317  vm["exchange"].as<string>(),
318  vm["routingkey"].as<string>());
319  if(not rabbitmqHandler->rabbitmq_connection_established)
320  {
321  throw rabbitmq_handler_exception(_("Could not open socket, is rabbitmq running?"));
322  }
323  }
324  catch (exception& e)
325  {
326  errmsg_printf(error::ERROR, _("Failed to allocate the RabbitMQHandler. Got error: %s\n"),
327  e.what());
328  return 1;
329  }
330  try
331  {
332  rabbitmqLogger= new RabbitMQLog("rabbitmq_applier", rabbitmqHandler);
333  }
334  catch (exception& e)
335  {
336  errmsg_printf(error::ERROR, _("Failed to allocate the RabbitMQLog instance. Got error: %s\n"),
337  e.what());
338  return 1;
339  }
340 
341  context.add(rabbitmqLogger);
342  ReplicationServices::attachApplier(rabbitmqLogger, vm["use-replicator"].as<string>());
343 
344  context.registerVariable(new sys_var_bool_ptr("logging_enable", &sysvar_logging_enable, &updateSysvarLoggingEnable));
345  context.registerVariable(new sys_var_std_string("host", sysvar_rabbitmq_host, NULL, &updateSysvarRabbitMQHost));
346  context.registerVariable(new sys_var_constrained_value<in_port_t>("port", sysvar_rabbitmq_port, &updateSysvarRabbitMQPort));
347  context.registerVariable(new sys_var_std_string("username", sysvar_rabbitmq_username, NULL, &updateSysvarRabbitMQUserName));
348  context.registerVariable(new sys_var_std_string("password", sysvar_rabbitmq_password, NULL, &updateSysvarRabbitMQPassword));
349  context.registerVariable(new sys_var_std_string("virtualhost", sysvar_rabbitmq_virtualhost, NULL, &updateSysvarRabbitMQVirtualHost));
350  context.registerVariable(new sys_var_std_string("exchange", sysvar_rabbitmq_exchange, NULL, &updateSysvarRabbitMQExchange));
351  context.registerVariable(new sys_var_std_string("routingkey", sysvar_rabbitmq_routingkey, NULL, &updateSysvarRabbitMQRoutingKey));
352 
353  return 0;
354 }
355 
356 
357 static void init_options(drizzled::module::option_context &context)
358 {
359  context("logging-enable",
360  po::value<bool>(&sysvar_logging_enable)->default_value(true)->zero_tokens(),
361  _("Enable logging to rabbitmq server"));
362  context("host",
363  po::value<string>(&sysvar_rabbitmq_host)->default_value("localhost"),
364  _("Host name to connect to"));
365  context("port",
366  po::value<port_constraint>(&sysvar_rabbitmq_port)->default_value(5672),
367  _("Port to connect to"));
368  context("virtualhost",
369  po::value<string>(&sysvar_rabbitmq_virtualhost)->default_value("/"),
370  _("RabbitMQ virtualhost"));
371  context("username",
372  po::value<string>(&sysvar_rabbitmq_username)->default_value("guest"),
373  _("RabbitMQ username"));
374  context("password",
375  po::value<string>(&sysvar_rabbitmq_password)->default_value("guest"),
376  _("RabbitMQ password"));
377  context("use-replicator",
378  po::value<string>()->default_value("default_replicator"),
379  _("Name of the replicator plugin to use (default='default_replicator')"));
380  context("exchange",
381  po::value<string>(&sysvar_rabbitmq_exchange)->default_value("ReplicationExchange"),
382  _("Name of RabbitMQ exchange to publish to"));
383  context("routingkey",
384  po::value<string>(&sysvar_rabbitmq_routingkey)->default_value("ReplicationRoutingKey"),
385  _("Name of RabbitMQ routing key to use"));
386 }
387 
388 } /* namespace rabbitmq */
389 } /* namespace drizzle_plugin */
390 
/* Plugin manifest for the RabbitMQ applier plugin. */
391 DRIZZLE_DECLARE_PLUGIN
392 {
393  DRIZZLE_VERSION_ID,
394  "rabbitmq", /* plugin name */
395  "0.1", /* plugin version */
396  "Marcus Eriksson", /* author */
397  N_("Publishes transactions to RabbitMQ"), /* description */
398  PLUGIN_LICENSE_GPL, /* license */
399  drizzle_plugin::rabbitmq::init, /* initialisation function defined above */
400  NULL, /* NOTE(review): presumably an unused manifest slot (dependencies?) - confirm against the manifest definition */
401  drizzle_plugin::rabbitmq::init_options /* option declaration function defined above */
402 }
403 DRIZZLE_DECLARE_PLUGIN_END;
virtual int64_t val_int()=0
TODO: Rename this file - func.h is stupid.
A proxy wrapper around boost::program_options::variables_map.
wrapper around librabbitmq, hides error handling and reconnections etc TODO: add reconnection handlin...
drizzled::plugin::ReplicationReturnCode apply(drizzled::Session &session, const drizzled::message::Transaction &to_apply)
Serializes the transaction and uses a RabbitMQHandler to publish the message.
Definition: rabbitmq_log.cc:80
Definition: engine.cc:41
static void attachApplier(plugin::TransactionApplier *in_applier, const std::string &requested_replicator)
void publish(void *message, const int length)
Publishes the message to the server.
String str_value
Definition: item.h:107