26 #include "rabbitmq_log.h"
27 #include <drizzled/message/transaction.pb.h>
28 #include <google/protobuf/io/coded_stream.h>
30 #include <drizzled/module/registry.h>
31 #include <drizzled/plugin.h>
33 #include "rabbitmq_handler.h"
34 #include <boost/program_options.hpp>
37 namespace po= boost::program_options;
40 using namespace drizzled;
41 using namespace google;
43 namespace drizzle_plugin
// Constructor (fragment): initializes plugin::TransactionApplier with `name`
// and stores `mqHandler` in _rabbitMQHandler. The destructor later deletes
// that pointer, so the handler passed in here ends up owned by this object.
// NOTE(review): the line declaring the `mqHandler` parameter and the
// constructor body are not visible in this listing.
52 RabbitMQLog::RabbitMQLog(
const string &name,
54 plugin::TransactionApplier(name),
55 _rabbitMQHandler(mqHandler)
// Destructor (fragment): disconnects the handler first, then deletes it.
// NOTE(review): if disconnect() can throw, the delete below is skipped and an
// exception escapes a destructor -- confirm against RabbitMQHandler.
58 RabbitMQLog::~RabbitMQLog()
60 _rabbitMQHandler->disconnect();
61 delete _rabbitMQHandler;
// RabbitMQLog::apply (fragment; the parameter list and several statements are
// missing from this listing). Serializes `to_apply` into a heap buffer and
// publishes it through _rabbitMQHandler, looping while `!sent && tries > 0`.
// Returns plugin::SUCCESS once `sent` is set, plugin::UNKNOWN_ERROR otherwise.
64 plugin::ReplicationReturnCode
// Size the buffer from the message's serialized byte length.
67 size_t message_byte_length= to_apply.ByteSize();
// NOTE(review): a plain new[] reports failure by throwing std::bad_alloc, not
// by returning null, so the "Failed to allocate" path below is presumably
// unreachable if its (not visible) guard is a null check -- confirm.
// NOTE(review): no delete[] of `buffer` is visible in this listing; verify it
// is released on every return path.
68 uint8_t* buffer=
new uint8_t[message_byte_length];
71 errmsg_printf(error::ERROR, _(
"Failed to allocate enough memory to transaction message\n"));
73 return plugin::UNKNOWN_ERROR;
// Write the message bytes into `buffer`; presumably relies on the sizes
// cached by the ByteSize() call above (protobuf convention) -- confirm.
76 to_apply.SerializeWithCachedSizesToArray(buffer);
// Retry loop: runs until a publish succeeds or the attempt budget is used up.
// The declarations and updates of `sent` and `tries` are not visible here.
79 while (!sent && tries > 0) {
// The length is narrowed from size_t to int for publish().
83 _rabbitMQHandler->
publish(buffer,
int(message_byte_length));
// Log the exception text, then try to re-establish the connection.
// NOTE(review): the catch clause that binds this `e` is not visible here.
88 errmsg_printf(error::ERROR,
"%s", e.what());
90 _rabbitMQHandler->reconnect();
91 }
// Handler for a failure of the preceding block (presumably the reconnect()
// attempt; the matching `try` is not visible). The message announces a
// 10 second wait, but the wait itself is not visible in this listing.
catch(exception &e) {
92 errmsg_printf(error::ERROR, _(
"Could not reconnect, trying again.. - waiting 10 seconds for server to come back"));
// Loop finished: report success if the message went out...
99 if(sent)
return plugin::SUCCESS;
// ...otherwise log the failure and return an error code.
100 errmsg_printf(error::ERROR, _(
"RabbitMQ server has disappeared, failing transaction."));
102 return plugin::UNKNOWN_ERROR;
// Plugin initialization (fragment). The values of the "username", "password",
// "virtualhost", "exchange" and "routingkey" options are read from `vm` as
// strings and passed, in that order, as trailing arguments to a call whose
// opening line is not visible in this listing (presumably the RabbitMQHandler
// construction, judging by the first error message below -- confirm).
122 vm[
"username"].as<string>(),
123 vm[
"password"].as<string>(),
124 vm[
"virtualhost"].as<string>(),
125 vm[
"exchange"].as<string>(),
126 vm[
"routingkey"].as<string>());
// Error report for a failed RabbitMQHandler allocation; the %s argument and
// the surrounding handler are not visible here.
130 errmsg_printf(error::ERROR, _(
"Failed to allocate the RabbitMQHandler. Got error: %s\n"),
// Error report for a failed RabbitMQLog allocation; the %s argument and the
// surrounding handler are not visible here.
140 errmsg_printf(error::ERROR, _(
"Failed to allocate the RabbitMQLog instance. Got error: %s\n"),
// Option registration (fragment). Each visible entry supplies a string default
// through po::value<string>()->default_value(...) and a help text wrapped in
// _(...). The option-name lines of several entries are missing from this
// listing, so those entries are identified below by their help text only.
// Entry with help text "Host name to connect to"; default "localhost".
163 po::value<string>()->default_value(
"localhost"),
164 _(
"Host name to connect to"));
// Entry with help text "Port to connect to"; its value spec is not visible.
167 _(
"Port to connect to"));
// "virtualhost": default "/".
168 context(
"virtualhost",
169 po::value<string>()->default_value(
"/"),
170 _(
"RabbitMQ virtualhost"));
// Entry with help text "RabbitMQ username"; default "guest".
172 po::value<string>()->default_value(
"guest"),
173 _(
"RabbitMQ username"));
// Entry with help text "RabbitMQ password"; default "guest".
175 po::value<string>()->default_value(
"guest"),
176 _(
"RabbitMQ password"));
// "use-replicator": default "default_replicator" (the help text repeats it).
177 context(
"use-replicator",
178 po::value<string>()->default_value(
"default_replicator"),
179 _(
"Name of the replicator plugin to use (default='default_replicator')"));
// Entry with help text "Name of RabbitMQ exchange to publish to";
// default "ReplicationExchange".
181 po::value<string>()->default_value(
"ReplicationExchange"),
182 _(
"Name of RabbitMQ exchange to publish to"));
// "routingkey": default "ReplicationRoutingKey".
183 context(
"routingkey",
184 po::value<string>()->default_value(
"ReplicationRoutingKey"),
185 _(
"Name of RabbitMQ routing key to use"));
// Plugin declaration (fragment). The visible fields are the description
// string and references to drizzle_plugin::init and
// drizzle_plugin::init_options; the remaining fields of the declaration are
// not visible in this listing.
190 DRIZZLE_DECLARE_PLUGIN
196 N_(
"Publishes transactions to RabbitMQ"),
198 drizzle_plugin::init,
200 drizzle_plugin::init_options
202 DRIZZLE_DECLARE_PLUGIN_END;
static RabbitMQHandler * rabbitmqHandler
the rabbitmq handler
A proxy wrapper around boost::program_options::variables_map.
static RabbitMQLog * rabbitmqLogger
the actual plugin
static port_constraint sysvar_rabbitmq_port
A TransactionApplier that sends the transactions to rabbitmq (or any AMQP 0-8 compliant message queue...
void publish(void *message, const int length)
Publishes the message to the server.
static void attachApplier(plugin::TransactionApplier *in_applier, const std::string &requested_replicator)
Wrapper around librabbitmq; hides error handling, reconnections, etc. TODO: add reconnection handlin...
drizzled::plugin::ReplicationReturnCode apply(drizzled::Session &session, const drizzled::message::Transaction &to_apply)
Serializes the transaction and uses a RabbitMQHandler to publish the message.