26 #include "zeromq_log.h"
27 #include <drizzled/message/transaction.pb.h>
28 #include <google/protobuf/io/coded_stream.h>
30 #include <drizzled/module/registry.h>
31 #include <drizzled/plugin.h>
32 #include <drizzled/item.h>
34 #include <boost/program_options.hpp>
38 namespace po= boost::program_options;
41 using namespace drizzled;
42 using namespace google;
44 namespace drizzle_plugin {
// Constructor: forwards `name` to the plugin::TransactionApplier base, stores
// `endpoint` in sysvar_endpoint, then sets up the ZeroMQ publisher socket and
// the mutex used to serialize publishing.
// NOTE(review): this listing is missing lines (braces, anything between the
// numbered statements); confirm against the full source before editing.
50 ZeroMQLog::ZeroMQLog(
const string &name,
const string &endpoint) :
51 plugin::TransactionApplier(name),
52 sysvar_endpoint(endpoint)
// Create a ZeroMQ context and a PUB socket on it. The context is kept only in
// this local in the visible lines - TODO confirm whether it is ever released.
54 void *context= zmq_init(1);
55 _socket= zmq_socket (context, ZMQ_PUB);
// Bind the publisher to the configured endpoint. How `rc` is checked is not
// visible in this listing.
57 int rc= zmq_bind (_socket, endpoint.c_str());
// publishLock is created with default attributes (NULL attr argument).
59 pthread_mutex_init(&publishLock, NULL);
// Destructor: destroys the publishLock mutex.
// NOTE(review): lines between the signature and the mutex teardown are not
// visible in this listing - confirm whether _socket is closed and the ZeroMQ
// context terminated there.
62 ZeroMQLog::~ZeroMQLog()
65 pthread_mutex_destroy(&publishLock);
// Getter body: returns the currently stored endpoint (sysvar_endpoint).
// The enclosing signature is not visible in this listing.
70 return sysvar_endpoint;
// Endpoint change path: a brand-new ZeroMQ context and PUB socket are created
// and bound to new_endpoint before any shared state is touched.
// NOTE(review): the enclosing signature and the handling of tmp_rc are not
// visible in this listing.
75 void *tmp_context= zmq_init(1);
76 void *tmp_socket= zmq_socket(tmp_context, ZMQ_PUB);
79 int tmp_rc= zmq_bind(tmp_socket, new_endpoint.c_str());
// The stored endpoint is updated while holding publishLock, the same mutex
// that is taken around the zmq_send calls when publishing.
// NOTE(review): the lines between the lock and this assignment are missing;
// presumably the old socket is replaced by tmp_socket there - verify.
83 pthread_mutex_lock(&publishLock);
87 sysvar_endpoint= new_endpoint;
90 pthread_mutex_unlock(&publishLock);
// Publishes one transaction as two ZeroMQ frames: the schema name (from
// getSchemaName) followed by the serialized transaction bytes.
// Returns plugin::SUCCESS on the visible normal path and
// plugin::UNKNOWN_ERROR on the allocation-failure path.
// NOTE(review): the signature line and several body lines (declarations of
// schemamsg/msg, the failure condition, any cleanup of `buffer` and `msg`)
// are missing from this listing; confirm against the full source.
94 plugin::ReplicationReturnCode
// Size the scratch buffer from the message's reported byte size.
97 size_t message_byte_length= to_apply.ByteSize();
98 uint8_t* buffer=
new uint8_t[message_byte_length];
// Allocation-failure branch (its condition is not visible here).
// NOTE(review): a plain new[] expression throws std::bad_alloc rather than
// returning null, so a null check would never fire - verify the guard.
101 errmsg_printf(error::ERROR, _(
"Failed to allocate enough memory to transaction message\n"));
103 return plugin::UNKNOWN_ERROR;
// First frame: the schema name bytes, copied without a trailing NUL
// (length() bytes only).
106 string schema= getSchemaName(to_apply);
108 int rc= zmq_msg_init_size(&schemamsg, schema.length());
109 memcpy(zmq_msg_data(&schemamsg), schema.c_str(), schema.length());
// Second frame: serialize into `buffer`, then copy into the ZeroMQ message.
111 to_apply.SerializeWithCachedSizesToArray(buffer);
113 rc= zmq_msg_init_size(&msg, message_byte_length);
115 memcpy(zmq_msg_data(&msg), buffer, message_byte_length);
// Both sends happen under publishLock so the two frames of one transaction
// are sent back-to-back on the shared _socket. ZMQ_SNDMORE on the first send
// marks that another part follows.
// NOTE(review): rc from the first zmq_send is overwritten by the second, and
// no check of either is visible in this listing.
118 pthread_mutex_lock(&publishLock);
119 rc= zmq_send(_socket, &schemamsg, ZMQ_SNDMORE);
120 rc= zmq_send(_socket, &msg, 0);
121 pthread_mutex_unlock(&publishLock);
// Release the schema frame. NOTE(review): closing `msg` and freeing `buffer`
// are not visible here - confirm they happen on the missing lines.
124 zmq_msg_close(&schemamsg);
126 return plugin::SUCCESS;
// Derives the schema name used as the first published frame.
// A transaction with no statements yields an empty string; otherwise the
// name is read from the header/statement field matching the statement type.
// NOTE(review): the signature and the declaration of `statement` are missing
// from this listing (presumably the first statement of txn - verify), as is
// whatever happens for statement types not listed below.
130 if(txn.statement_size() == 0)
return "";
134 switch(statement.type())
// Row-level DML: schema name comes from the header's table metadata.
136 case message::Statement::INSERT:
137 return statement.insert_header().table_metadata().schema_name();
138 case message::Statement::UPDATE:
139 return statement.update_header().table_metadata().schema_name();
140 case message::Statement::DELETE:
141 return statement.delete_header().table_metadata().schema_name();
// CREATE TABLE carries a full table message; its schema() field is used.
142 case message::Statement::CREATE_TABLE:
143 return statement.create_table_statement().table().schema();
144 case message::Statement::TRUNCATE_TABLE:
145 return statement.truncate_table_statement().table_metadata().schema_name();
146 case message::Statement::DROP_TABLE:
147 return statement.drop_table_statement().table_metadata().schema_name();
// Schema-level DDL: the schema's own name is returned.
148 case message::Statement::CREATE_SCHEMA:
149 return statement.create_schema_statement().schema().name();
150 case message::Statement::DROP_SCHEMA:
151 return statement.drop_schema_statement().schema_name();
// File-local pointer to the applier instance; it is assigned where the
// ZeroMQLog named "zeromq_applier" is constructed further down in this file.
157 static ZeroMQLog *zeromqLogger;
// Fragment of the endpoint system-variable update handler (signature and
// control flow are not visible in this listing).
// Builds the candidate endpoint from the variable's string value.
// NOTE(review): constructing std::string from data() alone relies on the
// buffer being NUL-terminated - confirm, or check whether a length is available.
168 std::string new_endpoint(var->value->
str_value.data());
// Error reported when the endpoint is rejected as NULL; the triggering
// condition is on lines missing from this listing.
174 errmsg_printf(error::ERROR, _(
"zeromq_endpoint cannot be NULL"));
// Plugin init fragment: constructs the applier under the name
// "zeromq_applier", bound to the "endpoint" option value read from `vm`,
// and registers it with the module context.
// NOTE(review): surrounding lines (signature, any further registration,
// return value) are not visible in this listing.
184 zeromqLogger=
new ZeroMQLog(
"zeromq_applier", vm[
"endpoint"].as<string>());
185 context.add(zeromqLogger);
// Option declarations (this fragment starts mid-statement; the line naming
// the first option is missing from the listing).
// First option: string value defaulting to "tcp://*:9999", described as the
// end point to bind to.
195 po::value<string>()->default_value(
"tcp://*:9999"),
196 _(
"End point to bind to"));
// "use-replicator": name of the replicator plugin to use, defaulting to
// "default_replicator".
197 context(
"use-replicator",
198 po::value<string>()->default_value(
"default_replicator"),
199 _(
"Name of the replicator plugin to use (default='default_replicator')"));
// Plugin descriptor: gives the plugin's description string and wires in the
// init and init_options entry points from drizzle_plugin::zeromq.
// NOTE(review): the remaining descriptor fields are on lines missing from
// this listing.
206 DRIZZLE_DECLARE_PLUGIN
212 N_(
"Publishes transactions to ZeroMQ"),
214 drizzle_plugin::zeromq::init,
216 drizzle_plugin::zeromq::init_options,
218 DRIZZLE_DECLARE_PLUGIN_END;
bool setEndpoint(std::string new_endpoint)
Setter for endpoint.
A proxy wrapper around boost::program_options::variables_map.
static void attachApplier(plugin::TransactionApplier *in_applier, const std::string &requested_replicator)
std::string & getEndpoint()
Getter for endpoint.
drizzled::plugin::ReplicationReturnCode apply(drizzled::Session &session, const drizzled::message::Transaction &to_apply)
Serializes the transaction and publishes the message to zmq.