Drizzled Public API Documentation

replication_services.cc
1 /* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3  *
4  * Copyright (C) 2008-2009 Sun Microsystems, Inc.
5  * Copyright (C) 2009-2010 Jay Pipes <jaypipes@gmail.com>
6  *
7  * Authors:
8  *
9  * Jay Pipes <jaypipes@gmail.com>
10  *
11  * This program is free software; you can redistribute it and/or modify
12  * it under the terms of the GNU General Public License as published by
13  * the Free Software Foundation; version 2 of the License.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with this program; if not, write to the Free Software
22  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23  */
24 
37 #include <config.h>
38 #include <drizzled/replication_services.h>
39 #include <drizzled/plugin/transaction_replicator.h>
40 #include <drizzled/plugin/transaction_applier.h>
41 #include <drizzled/message/transaction.pb.h>
42 #include <drizzled/gettext.h>
43 #include <drizzled/session.h>
44 #include <drizzled/error.h>
45 #include <drizzled/errmsg_print.h>
46 
47 #include <string>
48 #include <vector>
49 #include <algorithm>
50 
51 using namespace std;
52 
53 namespace drizzled {
54 
/* Container of the replicator plugins handed to attachReplicator(). */
55 typedef std::vector<plugin::TransactionReplicator*> Replicators;
/* Container of (requested replicator name, applier) pairs, as stored by attachApplier(). */
56 typedef std::vector<std::pair<std::string, plugin::TransactionApplier*> > Appliers;
57 
/* Set to true by evaluateRegisteredPlugins() once every applier has been paired; read by isActive(). */
62 static bool is_active= false;
/* Replicators currently registered via attachReplicator() and not yet detached. */
69 static Replicators replicators;
/* Appliers registered via attachApplier(), each with the replicator name it asked for. */
71 static Appliers appliers;
/* (replicator, applier) pairs built by evaluateRegisteredPlugins() and walked by pushTransactionMessage(). */
73 static ReplicationServices::ReplicationStreams replication_streams;
74 
79 static void normalizeReplicatorName(string &name)
80 {
81  boost::to_lower(name);
82  if (name.find("replicator") == string::npos)
83  name.append("replicator");
84  {
85  size_t found_underscore= name.find('_');
86  while (found_underscore != string::npos)
87  {
88  name.erase(found_underscore, 1);
89  found_underscore= name.find('_');
90  }
91  }
92 }
93 
94 bool ReplicationServices::evaluateRegisteredPlugins()
95 {
96  /*
97  * We loop through appliers that have registered with us
98  * and attempts to pair the applier with its requested
99  * replicator. If an applier has requested a replicator
100  * that has either not been built or has not registered
101  * with the replication services, we print an error and
102  * return false
103  */
104  if (appliers.empty())
105  return true;
106 
107  if (replicators.empty() && not appliers.empty())
108  {
109  errmsg_printf(error::ERROR,
110  N_("You registered a TransactionApplier plugin but no "
111  "TransactionReplicator plugins were registered.\n"));
112  return false;
113  }
114 
115  BOOST_FOREACH(Appliers::reference appl_iter, appliers)
116  {
117  plugin::TransactionApplier *applier= appl_iter.second;
118  string requested_replicator_name= appl_iter.first;
119  normalizeReplicatorName(requested_replicator_name);
120 
121  bool found= false;
122  Replicators::iterator repl_iter;
123  for (repl_iter= replicators.begin(); repl_iter != replicators.end(); ++repl_iter)
124  {
125  string replicator_name= (*repl_iter)->getName();
126  normalizeReplicatorName(replicator_name);
127 
128  if (requested_replicator_name.compare(replicator_name) == 0)
129  {
130  found= true;
131  break;
132  }
133  }
134  if (not found)
135  {
136  errmsg_printf(error::ERROR,
137  N_("You registered a TransactionApplier plugin but no "
138  "TransactionReplicator plugins were registered that match the "
139  "requested replicator name of '%s'.\n"
140  "We have deactivated the TransactionApplier '%s'.\n"),
141  requested_replicator_name.c_str(),
142  applier->getName().c_str());
143  applier->deactivate();
144  return false;
145  }
146  else
147  {
148  replication_streams.push_back(make_pair(*repl_iter, applier));
149  }
150  }
151  is_active= true;
152  return true;
153 }
154 
155 void ReplicationServices::attachReplicator(plugin::TransactionReplicator *in_replicator)
156 {
157  replicators.push_back(in_replicator);
158 }
159 
160 void ReplicationServices::detachReplicator(plugin::TransactionReplicator *in_replicator)
161 {
162  replicators.erase(std::find(replicators.begin(), replicators.end(), in_replicator));
163 }
164 
165 void ReplicationServices::attachApplier(plugin::TransactionApplier *in_applier, const string &requested_replicator_name)
166 {
167  appliers.push_back(make_pair(requested_replicator_name, in_applier));
168 }
169 
/*
 * Currently a no-op: the supplied applier is ignored, so it stays in the
 * appliers list and in any replication stream already built for it.
 *
 * NOTE(review): detachReplicator() does remove its argument from the
 * replicators list; confirm whether the asymmetry here is intended.
 */
170 void ReplicationServices::detachApplier(plugin::TransactionApplier *)
171 {
172 }
173 
/*
 * Returns the file-level is_active flag. The flag starts out false and is
 * set to true by evaluateRegisteredPlugins() after every registered
 * applier has been paired with a replicator.
 */
174 bool ReplicationServices::isActive()
175 {
176  return is_active;
177 }
178 
179 plugin::ReplicationReturnCode ReplicationServices::pushTransactionMessage(Session &in_session,
180  message::Transaction &to_push)
181 {
182  plugin::ReplicationReturnCode result= plugin::SUCCESS;
183 
184  BOOST_FOREACH(ReplicationStreams::reference iter, replication_streams)
185  {
186  plugin::TransactionReplicator *cur_repl= iter.first;
187  plugin::TransactionApplier *cur_appl= iter.second;
188 
189  result= cur_repl->replicate(cur_appl, in_session, to_push);
190 
191  if (result == plugin::SUCCESS)
192  {
193  /*
194  * We update the timestamp for the last applied Transaction so that
195  * publisher plugins can ask the replication services when the
196  * last known applied Transaction was using the getLastAppliedTimestamp()
197  * method.
198  */
199  last_applied_timestamp.fetch_and_store(to_push.transaction_context().end_timestamp());
200  }
201  else
202  return result;
203  }
204  return result;
205 }
206 
/*
 * Returns a mutable reference to the file-level list of (replicator,
 * applier) pairs that evaluateRegisteredPlugins() populates.
 */
207 ReplicationServices::ReplicationStreams &ReplicationServices::getReplicationStreams()
208 {
209  return replication_streams;
210 }
211 
212 } /* namespace drizzled */
virtual ReplicationReturnCode replicate(TransactionApplier *in_applier, Session &session, message::Transaction &to_replicate)=0
static bool is_active
static void normalizeReplicatorName(string &name)
static Appliers appliers
static Replicators replicators
static ReplicationServices::ReplicationStreams replication_streams
static atomic< uint64_t > last_applied_timestamp