Drizzled Public API Documentation

rabbitmq_handler.cc
1 /* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3  *
4  * Copyright (C) 2010 Marcus Eriksson
5  *
6  * Authors:
7  *
8  * Marcus Eriksson <krummas@gmail.com>
9  *
10  * This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with this program; if not, write to the Free Software
22  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23  */
24 
25 
26 #include <config.h>
27 
28 #include <unistd.h>
29 
30 #include <drizzled/gettext.h>
31 
32 #include "rabbitmq_handler.h"
33 
34 using namespace std;
35 
36 namespace drizzle_plugin {
37 namespace rabbitmq {
38 
39 extern bool sysvar_logging_enable;
40 
41 RabbitMQHandler::RabbitMQHandler(const std::string &rabbitMQHost,
42  const in_port_t rabbitMQPort,
43  const std::string &rabbitMQUsername,
44  const std::string &rabbitMQPassword,
45  const std::string &rabbitMQVirtualhost,
46  const std::string &rabbitMQExchange,
47  const std::string &rabbitMQRoutingKey)
49  rabbitmqConnection(amqp_new_connection()),
50  hostname(rabbitMQHost),
51  port(rabbitMQPort),
52  username(rabbitMQUsername),
53  password(rabbitMQPassword),
54  virtualhost(rabbitMQVirtualhost),
55  exchange(rabbitMQExchange),
56  routingKey(rabbitMQRoutingKey),
57  rabbitmq_connection_established(false)
58 {
59  pthread_mutex_init(&publishLock, NULL);
60  connect();
61 }
62 
63 RabbitMQHandler::~RabbitMQHandler()
64 {
65  pthread_mutex_destroy(&publishLock);
66  disconnect();
67 }
68 
69 void RabbitMQHandler::publish(void *message,
70  const int length)
72 {
73  // return if query logging is not enabled
74  //if (sysvar_logging_enable == false)
75  // return;
76 
77  pthread_mutex_lock(&publishLock);
78  amqp_bytes_t b;
79  b.bytes= message;
80  b.len= length;
81 
82  if (amqp_basic_publish(rabbitmqConnection,
83  1,
84  amqp_cstring_bytes(exchange.c_str()),
85  amqp_cstring_bytes(routingKey.c_str()),
86  0,
87  0,
88  NULL,
89  b) < 0)
90  {
91  pthread_mutex_unlock(&publishLock);
92  throw rabbitmq_handler_exception("Could not publish message");
93  }
94  pthread_mutex_unlock(&publishLock);
95 
96 }
97 
98 void RabbitMQHandler::reconnect() throw(rabbitmq_handler_exception)
99 {
100  disconnect();
101  connect();
102 }
103 
104 void RabbitMQHandler::disconnect() throw(rabbitmq_handler_exception)
105 {
106  try
107  {
108  handleAMQPError(amqp_channel_close(rabbitmqConnection,
109  1,
110  AMQP_REPLY_SUCCESS),
111  "close channel");
112  handleAMQPError(amqp_connection_close(rabbitmqConnection,
113  AMQP_REPLY_SUCCESS),
114  "close connection");
115  amqp_destroy_connection(rabbitmqConnection);
116  }
117  catch(exception& e) {} // do not throw in destructorn
118  close(sockfd);
119 }
120 
121 void RabbitMQHandler::connect() throw(rabbitmq_handler_exception) {
122  sockfd = amqp_open_socket(hostname.c_str(), port);
123  if(sockfd < 0)
124  {
125  rabbitmq_connection_established= false;
126  return;
127  }
128  try
129  {
130  amqp_set_sockfd(rabbitmqConnection, sockfd);
131  /* login to rabbitmq, handleAMQPError throws exception if there is a problem */
132  handleAMQPError(amqp_login(rabbitmqConnection,
133  virtualhost.c_str(),
134  0,
135  131072,
136  0,
137  AMQP_SASL_METHOD_PLAIN,
138  username.c_str(),
139  password.c_str()),
140  "rabbitmq login");
141  /* open the channel */
142  amqp_channel_open(rabbitmqConnection, 1);
143  handleAMQPError(amqp_get_rpc_reply(rabbitmqConnection), "RPC Reply");
144  amqp_table_t empty_table = { 0, NULL }; // for users of old librabbitmq users - amqp_empty_table did not exist
145  amqp_exchange_declare(rabbitmqConnection, 1, amqp_cstring_bytes(exchange.c_str()), amqp_cstring_bytes("fanout"), 0, 0, empty_table);
146  handleAMQPError(amqp_get_rpc_reply(rabbitmqConnection), "RPC Reply");
147  rabbitmq_connection_established= true;
148  }
149  catch(exception& e)
150  {
151  rabbitmq_connection_established= false;
152  }
153 }
154 
155 void RabbitMQHandler::handleAMQPError(amqp_rpc_reply_t x, string context) throw(rabbitmq_handler_exception)
156 {
157  string errorMessage("");
158  switch (x.reply_type) {
159  case AMQP_RESPONSE_NORMAL:
160  break;
161  case AMQP_RESPONSE_NONE:
162  errorMessage.assign("No response in ");
163  errorMessage.append(context);
164  throw rabbitmq_handler_exception(errorMessage);
165  case AMQP_RESPONSE_LIBRARY_EXCEPTION:
166  case AMQP_RESPONSE_SERVER_EXCEPTION:
167  switch (x.reply.id) {
168  case AMQP_CONNECTION_CLOSE_METHOD:
169  errorMessage.assign("Connection closed in ");
170  errorMessage.append(context);
171  throw rabbitmq_handler_exception(errorMessage);
172  case AMQP_CHANNEL_CLOSE_METHOD:
173  errorMessage.assign("Channel closed in ");
174  errorMessage.append(context);
175  throw rabbitmq_handler_exception(errorMessage);
176  default:
177  errorMessage.assign("Unknown error in ");
178  errorMessage.append(context);
179  throw rabbitmq_handler_exception(errorMessage);
180  }
181  }
182 }
183 
184 } /* namespace rabbitmq */
185 } /* namespace drizzle_plugin */