22 #include <plugin/slave/replication_slave.h>
23 #include <drizzled/errmsg_print.h>
24 #include <drizzled/program_options/config_file.h>
25 #include <boost/lexical_cast.hpp>
26 #include <boost/program_options.hpp>
28 #include <drizzled/plugin.h>
33 namespace po= boost::program_options;
38 void ReplicationSlave::startup(
Session &session)
41 if (not initWithConfig())
43 errmsg_printf(error::ERROR, _(
"Could not start slave services: %s\n"),
49 boost::unordered_map<uint32_t, Master *>::const_iterator it;
50 for (it= _masters.begin(); it != _masters.end(); ++it)
54 _consumer.addMasterId(it->first);
57 _consumer_thread= boost::thread(&QueueConsumer::run, &_consumer);
61 bool ReplicationSlave::initWithConfig()
64 po::options_description slave_options(
"Options for the slave plugin");
67 slave_options.add_options()
68 (
"seconds-between-reconnects", po::value<uint32_t>()->default_value(30))
69 (
"io-thread-sleep", po::value<uint32_t>()->default_value(5))
70 (
"applier-thread-sleep", po::value<uint32_t>()->default_value(5))
71 (
"ignore-errors", po::value<bool>()->default_value(
false)->zero_tokens());
74 for (
size_t num= 1; num <= 10; num++)
76 string section(
"master");
77 section.append(boost::lexical_cast<string>(num));
78 slave_options.add_options()
79 ((section +
".master-host").c_str(), po::value<string>()->default_value(
""))
80 ((section +
".master-port").c_str(), po::value<uint16_t>()->default_value(3306))
81 ((section +
".master-user").c_str(), po::value<string>()->default_value(
""))
82 ((section +
".master-pass").c_str(), po::value<string>()->default_value(
""))
83 ((section +
".max-reconnects").c_str(), po::value<uint32_t>()->default_value(10))
84 ((section +
".max-commit-id").c_str(), po::value<uint64_t>());
87 ifstream cf_stream(_config_file.c_str());
89 if (not cf_stream.is_open())
91 _error=
"Unable to open file ";
92 _error.append(_config_file);
96 po::store(drizzled::program_options::parse_config_file(cf_stream, slave_options), vm);
106 for (
size_t num= 1; num <= 10; num++)
108 string section(
"master");
109 section.append(boost::lexical_cast<string>(num));
117 if (vm[section +
".master-host"].as<string>() ==
"")
120 _masters[num]=
new (std::nothrow)
Master(num);
122 if (vm.count(section +
".master-host"))
123 master(num).producer().setMasterHost(vm[section +
".master-host"].as<string>());
125 if (vm.count(section +
".master-port"))
126 master(num).producer().setMasterPort(vm[section +
".master-port"].as<uint16_t>());
128 if (vm.count(section +
".master-user"))
129 master(num).producer().setMasterUser(vm[section +
".master-user"].as<string>());
131 if (vm.count(section +
".master-pass"))
132 master(num).producer().setMasterPassword(vm[section +
".master-pass"].as<string>());
134 if (vm.count(section +
".max-commit-id"))
135 master(num).producer().setCachedMaxCommitId(vm[section +
".max-commit-id"].as<uint64_t>());
138 boost::unordered_map<uint32_t, Master *>::const_iterator it;
140 for (it= _masters.begin(); it != _masters.end(); ++it)
142 if (vm.count(
"max-reconnects"))
143 it->second->producer().setMaxReconnectAttempts(vm[
"max-reconnects"].as<uint32_t>());
145 if (vm.count(
"seconds-between-reconnects"))
146 it->second->producer().setSecondsBetweenReconnects(vm[
"seconds-between-reconnects"].as<uint32_t>());
148 if (vm.count(
"io-thread-sleep"))
149 it->second->producer().setSleepInterval(vm[
"io-thread-sleep"].as<uint32_t>());
152 if (vm.count(
"applier-thread-sleep"))
153 _consumer.setSleepInterval(vm[
"applier-thread-sleep"].as<uint32_t>());
154 if (vm.count(
"ignore-errors"))
155 _consumer.setIgnoreErrors(vm[
"ignore-errors"].as<
bool>());
161 _error= rs.getErrorMessage();
165 for (it= _masters.begin(); it != _masters.end(); ++it)
168 rs.createInitialApplierRow(it->first);
169 rs.createInitialIORow(it->first);
171 uint64_t cachedValue= it->second->producer().cachedMaxCommitId();
176 _error= rs.getErrorMessage();
bool setInitialMaxCommitId(uint32_t master_id, uint64_t value)
TODO: Rename this file - func.h is stupid.