pacemaker  2.0.1-9e909a5bdd
Scalable High-Availability cluster resource manager
cpg.c
Go to the documentation of this file.
1 /*
2  * Copyright 2004-2019 the Pacemaker project contributors
3  *
4  * The version control history for this file may have further details.
5  *
6  * This source code is licensed under the GNU Lesser General Public License
7  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8  */
9 
10 #include <crm_internal.h>
11 #include <bzlib.h>
12 #include <sys/socket.h>
13 #include <netinet/in.h>
14 #include <arpa/inet.h>
15 #include <netdb.h>
16 
17 #include <crm/common/ipc.h>
18 #include <crm/cluster/internal.h>
19 #include <crm/common/mainloop.h>
20 #include <sys/utsname.h>
21 
22 #include <qb/qbipcc.h>
23 #include <qb/qbutil.h>
24 
25 #include <corosync/corodefs.h>
26 #include <corosync/corotypes.h>
27 #include <corosync/hdb.h>
28 #include <corosync/cpg.h>
29 
30 #include <crm/msg_xml.h>
31 
32 #include <crm/common/ipc_internal.h> /* PCMK__SPECIAL_PID* */
33 
34 cpg_handle_t pcmk_cpg_handle = 0; /* TODO: Remove, use cluster.cpg_handle */
35 
36 static bool cpg_evicted = FALSE;
37 gboolean(*pcmk_cpg_dispatch_fn) (int kind, const char *from, const char *data) = NULL;
38 
39 #define cs_repeat(counter, max, code) do { \
40  code; \
41  if(rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) { \
42  counter++; \
43  crm_debug("Retrying operation after %ds", counter); \
44  sleep(counter); \
45  } else { \
46  break; \
47  } \
48  } while(counter < max)
49 
50 void
52 {
53  pcmk_cpg_handle = 0;
54  if (cluster->cpg_handle) {
55  crm_trace("Disconnecting CPG");
56  cpg_leave(cluster->cpg_handle, &cluster->group);
57  cpg_finalize(cluster->cpg_handle);
58  cluster->cpg_handle = 0;
59 
60  } else {
61  crm_info("No CPG connection");
62  }
63 }
64 
65 uint32_t get_local_nodeid(cpg_handle_t handle)
66 {
67  cs_error_t rc = CS_OK;
68  int retries = 0;
69  static uint32_t local_nodeid = 0;
70  cpg_handle_t local_handle = handle;
71  cpg_callbacks_t cb = { };
72  int fd = -1;
73  uid_t found_uid = 0;
74  gid_t found_gid = 0;
75  pid_t found_pid = 0;
76  int rv;
77 
78  if(local_nodeid != 0) {
79  return local_nodeid;
80  }
81 
82  if(handle == 0) {
83  crm_trace("Creating connection");
84  cs_repeat(retries, 5, rc = cpg_initialize(&local_handle, &cb));
85  if (rc != CS_OK) {
86  crm_err("Could not connect to the CPG API: %s (%d)",
87  cs_strerror(rc), rc);
88  return 0;
89  }
90 
91  rc = cpg_fd_get(local_handle, &fd);
92  if (rc != CS_OK) {
93  crm_err("Could not obtain the CPG API connection: %s (%d)",
94  cs_strerror(rc), rc);
95  goto bail;
96  }
97 
98  /* CPG provider run as root (in given user namespace, anyway)? */
99  if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
100  &found_uid, &found_gid))) {
101  crm_err("CPG provider is not authentic:"
102  " process %lld (uid: %lld, gid: %lld)",
103  (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
104  (long long) found_uid, (long long) found_gid);
105  goto bail;
106  } else if (rv < 0) {
107  crm_err("Could not verify authenticity of CPG provider: %s (%d)",
108  strerror(-rv), -rv);
109  goto bail;
110  }
111  }
112 
113  if (rc == CS_OK) {
114  retries = 0;
115  crm_trace("Performing lookup");
116  cs_repeat(retries, 5, rc = cpg_local_get(local_handle, &local_nodeid));
117  }
118 
119  if (rc != CS_OK) {
120  crm_err("Could not get local node id from the CPG API: %s (%d)", ais_error2text(rc), rc);
121  }
122 
123 bail:
124  if(handle == 0) {
125  crm_trace("Closing connection");
126  cpg_finalize(local_handle);
127  }
128  crm_debug("Local nodeid is %u", local_nodeid);
129  return local_nodeid;
130 }
131 
132 
135 
136 static ssize_t crm_cs_flush(gpointer data);
137 
138 static gboolean
139 crm_cs_flush_cb(gpointer data)
140 {
141  cs_message_timer = 0;
142  crm_cs_flush(data);
143  return FALSE;
144 }
145 
146 #define CS_SEND_MAX 200
147 static ssize_t
148 crm_cs_flush(gpointer data)
149 {
150  int sent = 0;
151  ssize_t rc = 0;
152  int queue_len = 0;
153  static unsigned int last_sent = 0;
154  cpg_handle_t *handle = (cpg_handle_t *)data;
155 
156  if (*handle == 0) {
157  crm_trace("Connection is dead");
158  return pcmk_ok;
159  }
160 
161  queue_len = g_list_length(cs_message_queue);
162  if ((queue_len % 1000) == 0 && queue_len > 1) {
163  crm_err("CPG queue has grown to %d", queue_len);
164 
165  } else if (queue_len == CS_SEND_MAX) {
166  crm_warn("CPG queue has grown to %d", queue_len);
167  }
168 
169  if (cs_message_timer) {
170  /* There is already a timer, wait until it goes off */
171  crm_trace("Timer active %d", cs_message_timer);
172  return pcmk_ok;
173  }
174 
175  while (cs_message_queue && sent < CS_SEND_MAX) {
176  struct iovec *iov = cs_message_queue->data;
177 
178  errno = 0;
179  rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
180 
181  if (rc != CS_OK) {
182  break;
183  }
184 
185  sent++;
186  last_sent++;
187  crm_trace("CPG message sent, size=%llu",
188  (unsigned long long) iov->iov_len);
189 
190  cs_message_queue = g_list_remove(cs_message_queue, iov);
191  free(iov->iov_base);
192  free(iov);
193  }
194 
195  queue_len -= sent;
196  if (sent > 1 || cs_message_queue) {
197  crm_info("Sent %d CPG messages (%d remaining, last=%u): %s (%lld)",
198  sent, queue_len, last_sent, ais_error2text(rc),
199  (long long) rc);
200  } else {
201  crm_trace("Sent %d CPG messages (%d remaining, last=%u): %s (%lld)",
202  sent, queue_len, last_sent, ais_error2text(rc),
203  (long long) rc);
204  }
205 
206  if (cs_message_queue) {
207  uint32_t delay_ms = 100;
208  if(rc != CS_OK) {
209  /* Proportionally more if sending failed but cap at 1s */
210  delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
211  }
212  cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data);
213  }
214 
215  return rc;
216 }
217 
218 gboolean
219 send_cpg_iov(struct iovec * iov)
220 {
221  static unsigned int queued = 0;
222 
223  queued++;
224  crm_trace("Queueing CPG message %u (%llu bytes)",
225  queued, (unsigned long long) iov->iov_len);
226  cs_message_queue = g_list_append(cs_message_queue, iov);
227  crm_cs_flush(&pcmk_cpg_handle);
228  return TRUE;
229 }
230 
231 static int
232 pcmk_cpg_dispatch(gpointer user_data)
233 {
234  int rc = 0;
235  crm_cluster_t *cluster = (crm_cluster_t*) user_data;
236 
237  rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE);
238  if (rc != CS_OK) {
239  crm_err("Connection to the CPG API failed: %s (%d)", ais_error2text(rc), rc);
240  cluster->cpg_handle = 0;
241  return -1;
242 
243  } else if(cpg_evicted) {
244  crm_err("Evicted from CPG membership");
245  return -1;
246  }
247  return 0;
248 }
249 
250 char *
251 pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content,
252  uint32_t *kind, const char **from)
253 {
254  char *data = NULL;
255  AIS_Message *msg = (AIS_Message *) content;
256 
257  if(handle) {
258  // Do filtering and field massaging
259  uint32_t local_nodeid = get_local_nodeid(handle);
260  const char *local_name = get_local_node_name();
261 
262  if (msg->sender.id > 0 && msg->sender.id != nodeid) {
263  crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, msg->sender.id);
264  return NULL;
265 
266  } else if (msg->host.id != 0 && (local_nodeid != msg->host.id)) {
267  /* Not for us */
268  crm_trace("Not for us: %u != %u", msg->host.id, local_nodeid);
269  return NULL;
270  } else if (msg->host.size != 0 && safe_str_neq(msg->host.uname, local_name)) {
271  /* Not for us */
272  crm_trace("Not for us: %s != %s", msg->host.uname, local_name);
273  return NULL;
274  }
275 
276  msg->sender.id = nodeid;
277  if (msg->sender.size == 0) {
278  crm_node_t *peer = crm_get_peer(nodeid, NULL);
279 
280  if (peer == NULL) {
281  crm_err("Peer with nodeid=%u is unknown", nodeid);
282 
283  } else if (peer->uname == NULL) {
284  crm_err("No uname for peer with nodeid=%u", nodeid);
285 
286  } else {
287  crm_notice("Fixing uname for peer with nodeid=%u", nodeid);
288  msg->sender.size = strlen(peer->uname);
289  memset(msg->sender.uname, 0, MAX_NAME);
290  memcpy(msg->sender.uname, peer->uname, msg->sender.size);
291  }
292  }
293  }
294 
295  crm_trace("Got new%s message (size=%d, %d, %d)",
296  msg->is_compressed ? " compressed" : "",
297  ais_data_len(msg), msg->size, msg->compressed_size);
298 
299  if (kind != NULL) {
300  *kind = msg->header.id;
301  }
302  if (from != NULL) {
303  *from = msg->sender.uname;
304  }
305 
306  if (msg->is_compressed && msg->size > 0) {
307  int rc = BZ_OK;
308  char *uncompressed = NULL;
309  unsigned int new_size = msg->size + 1;
310 
311  if (check_message_sanity(msg, NULL) == FALSE) {
312  goto badmsg;
313  }
314 
315  crm_trace("Decompressing message data");
316  uncompressed = calloc(1, new_size);
317  rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0);
318 
319  if (rc != BZ_OK) {
320  crm_err("Decompression failed: %s " CRM_XS " bzerror=%d",
321  bz2_strerror(rc), rc);
322  free(uncompressed);
323  goto badmsg;
324  }
325 
326  CRM_ASSERT(rc == BZ_OK);
327  CRM_ASSERT(new_size == msg->size);
328 
329  data = uncompressed;
330 
331  } else if (check_message_sanity(msg, data) == FALSE) {
332  goto badmsg;
333 
334  } else if (safe_str_eq("identify", data)) {
335  char *pid_s = crm_getpid_s();
336 
337  send_cluster_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais);
338  free(pid_s);
339  return NULL;
340 
341  } else {
342  data = strdup(msg->data);
343  }
344 
345  // Is this necessary?
346  crm_get_peer(msg->sender.id, msg->sender.uname);
347 
348  crm_trace("Payload: %.200s", data);
349  return data;
350 
351  badmsg:
352  crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
353  " min=%d, total=%d, size=%d, bz2_size=%d",
354  msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
355  ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
356  msg->sender.pid, (int)sizeof(AIS_Message),
357  msg->header.size, msg->size, msg->compressed_size);
358 
359  free(data);
360  return NULL;
361 }
362 
363 #define PEER_NAME(peer) ((peer)? ((peer)->uname? (peer)->uname : "<unknown>") : "<none>")
364 
365 static int cmp_member_list_nodeid(const void *first,
366  const void *second)
367 {
368  const struct cpg_address *const a = *((const struct cpg_address **) first),
369  *const b = *((const struct cpg_address **) second);
370  if (a->nodeid < b->nodeid) {
371  return -1;
372  } else if (a->nodeid > b->nodeid) {
373  return 1;
374  }
375  /* don't bother with "reason" nor "pid" */
376  return 0;
377 }
378 
379 void
380 pcmk_cpg_membership(cpg_handle_t handle,
381  const struct cpg_name *groupName,
382  const struct cpg_address *member_list, size_t member_list_entries,
383  const struct cpg_address *left_list, size_t left_list_entries,
384  const struct cpg_address *joined_list, size_t joined_list_entries)
385 {
386  int i;
387  gboolean found = FALSE;
388  static int counter = 0;
389  uint32_t local_nodeid = get_local_nodeid(handle);
390  const struct cpg_address *key, **rival, **sorted;
391 
392  sorted = malloc(member_list_entries * sizeof(const struct cpg_address *));
393  CRM_ASSERT(sorted != NULL);
394 
395  for (size_t iter = 0; iter < member_list_entries; iter++) {
396  sorted[iter] = member_list + iter;
397  }
398  /* so that the cross-matching multiply-subscribed nodes is then cheap */
399  qsort(sorted, member_list_entries, sizeof(const struct cpg_address *),
400  cmp_member_list_nodeid);
401 
402  for (i = 0; i < left_list_entries; i++) {
403  crm_node_t *peer = crm_find_peer(left_list[i].nodeid, NULL);
404 
405  crm_info("Group event %s.%d: node %u (%s) left: %llu",
406  groupName->value, counter, left_list[i].nodeid,
407  PEER_NAME(peer), (unsigned long long) left_list[i].pid);
408 
409  /* in CPG world, NODE:PROCESS-IN-MEMBERSHIP-OF-G is an 1:N relation
410  and not playing by this rule may go wild in case of multiple
411  residual instances of the same pacemaker daemon at the same node
412  -- we must ensure that the possible local rival(s) won't make us
413  cry out and bail (e.g. when they quit themselves), since all the
414  surrounding logic denies this simple fact that the full membership
415  is discriminated also per the PID of the process beside mere node
416  ID (and implicitly, group ID); practically, this will be sound in
417  terms of not preventing progress, since all the CPG joiners are
418  also API end-point carriers, and that's what matters locally
419  (who's the winner);
420  remotely, we will just compare leave_list and member_list and if
421  the left process has it's node retained in member_list (under some
422  other PID, anyway) we will just ignore it as well
423  XXX: long-term fix is to establish in-out PID-aware tracking? */
424  if (peer) {
425  key = &left_list[i];
426  rival = bsearch(&key, sorted, member_list_entries,
427  sizeof(const struct cpg_address *),
428  cmp_member_list_nodeid);
429  if (rival == NULL) {
430  crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg,
431  OFFLINESTATUS);
432  } else if (left_list[i].nodeid == local_nodeid) {
433  crm_info("Ignoring the above event %s.%d, comes from a local"
434  " rival process (presumably not us): %llu",
435  groupName->value, counter,
436  (unsigned long long) left_list[i].pid);
437  } else {
438  crm_info("Ignoring the above event %s.%d, comes from"
439  " a rival-rich node: %llu (e.g. %llu process"
440  " carries on)",
441  groupName->value, counter,
442  (unsigned long long) left_list[i].pid,
443  (unsigned long long) (*rival)->pid);
444  }
445  }
446  }
447  free(sorted);
448  sorted = NULL;
449 
450  for (i = 0; i < joined_list_entries; i++) {
451  crm_info("Group event %s.%d: node %u joined: %llu"
452  " (unchecked for rivals)",
453  groupName->value, counter, joined_list[i].nodeid,
454  (unsigned long long) joined_list[i].pid);
455  }
456 
457  for (i = 0; i < member_list_entries; i++) {
458  crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL);
459 
460  crm_info("Group event %s.%d: node %u (%s) is member: %llu"
461  " (at least once)",
462  groupName->value, counter, member_list[i].nodeid,
463  PEER_NAME(peer), member_list[i].pid);
464 
465  if (member_list[i].nodeid == local_nodeid
466  && member_list[i].pid != getpid()) {
467  /* see the note above */
468  crm_info("Ignoring the above event %s.%d, comes from a local rival"
469  " process: %llu", groupName->value, counter,
470  (unsigned long long) member_list[i].pid);
471  continue;
472  }
473 
474  /* If the caller left auto-reaping enabled, this will also update the
475  * state to member.
476  */
477  peer = crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS);
478 
479  if (peer && peer->state && strcmp(peer->state, CRM_NODE_MEMBER)) {
480  /* The node is a CPG member, but we currently think it's not a
481  * cluster member. This is possible only if auto-reaping was
482  * disabled. The node may be joining, and we happened to get the CPG
483  * notification before the quorum notification; or the node may have
484  * just died, and we are processing its final messages; or a bug
485  * has affected the peer cache.
486  */
487  time_t now = time(NULL);
488 
489  if (peer->when_lost == 0) {
490  // Track when we first got into this contradictory state
491  peer->when_lost = now;
492 
493  } else if (now > (peer->when_lost + 60)) {
494  // If it persists for more than a minute, update the state
495  crm_warn("Node %u member of group %s but believed offline"
496  " (unchecked for rivals)",
497  member_list[i].nodeid, groupName->value);
498  crm_update_peer_state(__FUNCTION__, peer, CRM_NODE_MEMBER, 0);
499  }
500  }
501 
502  if (local_nodeid == member_list[i].nodeid) {
503  found = TRUE;
504  }
505  }
506 
507  if (!found) {
508  crm_err("We're not part of CPG group '%s' anymore!", groupName->value);
509  cpg_evicted = TRUE;
510  }
511 
512  counter++;
513 }
514 
515 gboolean
517 {
518  cs_error_t rc;
519  int fd = -1;
520  int retries = 0;
521  uint32_t id = 0;
522  crm_node_t *peer = NULL;
523  cpg_handle_t handle = 0;
524  const char *message_name = pcmk_message_name(crm_system_name);
525  uid_t found_uid = 0;
526  gid_t found_gid = 0;
527  pid_t found_pid = 0;
528  int rv;
529 
530  struct mainloop_fd_callbacks cpg_fd_callbacks = {
531  .dispatch = pcmk_cpg_dispatch,
532  .destroy = cluster->destroy,
533  };
534 
535  cpg_callbacks_t cpg_callbacks = {
536  .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
537  .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
538  /* .cpg_deliver_fn = pcmk_cpg_deliver, */
539  /* .cpg_confchg_fn = pcmk_cpg_membership, */
540  };
541 
542  cpg_evicted = FALSE;
543  cluster->group.length = 0;
544  cluster->group.value[0] = 0;
545 
546  /* group.value is char[128] */
547  strncpy(cluster->group.value, message_name, 127);
548  cluster->group.value[127] = 0;
549  cluster->group.length = 1 + QB_MIN(127, strlen(cluster->group.value));
550 
551  cs_repeat(retries, 30, rc = cpg_initialize(&handle, &cpg_callbacks));
552  if (rc != CS_OK) {
553  crm_err("Could not connect to the CPG API: %s (%d)",
554  cs_strerror(rc), rc);
555  goto bail;
556  }
557 
558  rc = cpg_fd_get(handle, &fd);
559  if (rc != CS_OK) {
560  crm_err("Could not obtain the CPG API connection: %s (%d)",
561  cs_strerror(rc), rc);
562  goto bail;
563  }
564 
565  /* CPG provider run as root (in given user namespace, anyway)? */
566  if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
567  &found_uid, &found_gid))) {
568  crm_err("CPG provider is not authentic:"
569  " process %lld (uid: %lld, gid: %lld)",
570  (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
571  (long long) found_uid, (long long) found_gid);
572  rc = CS_ERR_ACCESS;
573  goto bail;
574  } else if (rv < 0) {
575  crm_err("Could not verify authenticity of CPG provider: %s (%d)",
576  strerror(-rv), -rv);
577  rc = CS_ERR_ACCESS;
578  goto bail;
579  }
580 
581  id = get_local_nodeid(handle);
582  if (id == 0) {
583  crm_err("Could not get local node id from the CPG API");
584  goto bail;
585 
586  }
587  cluster->nodeid = id;
588 
589  retries = 0;
590  cs_repeat(retries, 30, rc = cpg_join(handle, &cluster->group));
591  if (rc != CS_OK) {
592  crm_err("Could not join the CPG group '%s': %d", message_name, rc);
593  goto bail;
594  }
595 
596  pcmk_cpg_handle = handle;
597  cluster->cpg_handle = handle;
598  mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);
599 
600  bail:
601  if (rc != CS_OK) {
602  cpg_finalize(handle);
603  return FALSE;
604  }
605 
606  peer = crm_get_peer(id, NULL);
607  crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS);
608  return TRUE;
609 }
610 
611 gboolean
612 send_cluster_message_cs(xmlNode * msg, gboolean local, crm_node_t * node, enum crm_ais_msg_types dest)
613 {
614  gboolean rc = TRUE;
615  char *data = NULL;
616 
617  data = dump_xml_unformatted(msg);
618  rc = send_cluster_text(crm_class_cluster, data, local, node, dest);
619  free(data);
620  return rc;
621 }
622 
623 gboolean
624 send_cluster_text(enum crm_ais_msg_class msg_class, const char *data,
625  gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
626 {
627  static int msg_id = 0;
628  static int local_pid = 0;
629  static int local_name_len = 0;
630  static const char *local_name = NULL;
631 
632  char *target = NULL;
633  struct iovec *iov;
634  AIS_Message *msg = NULL;
636 
637  switch (msg_class) {
638  case crm_class_cluster:
639  break;
640  default:
641  crm_err("Invalid message class: %d", msg_class);
642  return FALSE;
643  }
644 
645  CRM_CHECK(dest != crm_msg_ais, return FALSE);
646 
647  if(local_name == NULL) {
648  local_name = get_local_node_name();
649  }
650  if(local_name_len == 0 && local_name) {
651  local_name_len = strlen(local_name);
652  }
653 
654  if (data == NULL) {
655  data = "";
656  }
657 
658  if (local_pid == 0) {
659  local_pid = getpid();
660  }
661 
662  if (sender == crm_msg_none) {
663  sender = local_pid;
664  }
665 
666  msg = calloc(1, sizeof(AIS_Message));
667 
668  msg_id++;
669  msg->id = msg_id;
670  msg->header.id = msg_class;
671  msg->header.error = CS_OK;
672 
673  msg->host.type = dest;
674  msg->host.local = local;
675 
676  if (node) {
677  if (node->uname) {
678  target = strdup(node->uname);
679  msg->host.size = strlen(node->uname);
680  memset(msg->host.uname, 0, MAX_NAME);
681  memcpy(msg->host.uname, node->uname, msg->host.size);
682  } else {
683  target = crm_strdup_printf("%u", node->id);
684  }
685  msg->host.id = node->id;
686  } else {
687  target = strdup("all");
688  }
689 
690  msg->sender.id = 0;
691  msg->sender.type = sender;
692  msg->sender.pid = local_pid;
693  msg->sender.size = local_name_len;
694  memset(msg->sender.uname, 0, MAX_NAME);
695  if(local_name && msg->sender.size) {
696  memcpy(msg->sender.uname, local_name, msg->sender.size);
697  }
698 
699  msg->size = 1 + strlen(data);
700  msg->header.size = sizeof(AIS_Message) + msg->size;
701 
702  if (msg->size < CRM_BZ2_THRESHOLD) {
703  msg = realloc_safe(msg, msg->header.size);
704  memcpy(msg->data, data, msg->size);
705 
706  } else {
707  char *compressed = NULL;
708  unsigned int new_size = 0;
709  char *uncompressed = strdup(data);
710 
711  if (crm_compress_string(uncompressed, msg->size, 0, &compressed, &new_size)) {
712 
713  msg->header.size = sizeof(AIS_Message) + new_size;
714  msg = realloc_safe(msg, msg->header.size);
715  memcpy(msg->data, compressed, new_size);
716 
717  msg->is_compressed = TRUE;
718  msg->compressed_size = new_size;
719 
720  } else {
721  msg = realloc_safe(msg, msg->header.size);
722  memcpy(msg->data, data, msg->size);
723  }
724 
725  free(uncompressed);
726  free(compressed);
727  }
728 
729  iov = calloc(1, sizeof(struct iovec));
730  iov->iov_base = msg;
731  iov->iov_len = msg->header.size;
732 
733  if (msg->compressed_size) {
734  crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes compressed payload): %.200s",
735  msg->id, target, (unsigned long long) iov->iov_len,
736  msg->compressed_size, data);
737  } else {
738  crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes payload): %.200s",
739  msg->id, target, (unsigned long long) iov->iov_len,
740  msg->size, data);
741  }
742  free(target);
743 
744  send_cpg_iov(iov);
745 
746  return TRUE;
747 }
748 
750 text2msg_type(const char *text)
751 {
752  int type = crm_msg_none;
753 
754  CRM_CHECK(text != NULL, return type);
755  text = pcmk_message_name(text);
756  if (safe_str_eq(text, "ais")) {
757  type = crm_msg_ais;
758  } else if (safe_str_eq(text, CRM_SYSTEM_CIB)) {
759  type = crm_msg_cib;
760  } else if (safe_str_eq(text, CRM_SYSTEM_CRMD)
761  || safe_str_eq(text, CRM_SYSTEM_DC)) {
762  type = crm_msg_crmd;
763  } else if (safe_str_eq(text, CRM_SYSTEM_TENGINE)) {
764  type = crm_msg_te;
765  } else if (safe_str_eq(text, CRM_SYSTEM_PENGINE)) {
766  type = crm_msg_pe;
767  } else if (safe_str_eq(text, CRM_SYSTEM_LRMD)) {
768  type = crm_msg_lrmd;
769  } else if (safe_str_eq(text, CRM_SYSTEM_STONITHD)) {
770  type = crm_msg_stonithd;
771  } else if (safe_str_eq(text, "stonith-ng")) {
772  type = crm_msg_stonith_ng;
773  } else if (safe_str_eq(text, "attrd")) {
774  type = crm_msg_attrd;
775 
776  } else {
777  /* This will normally be a transient client rather than
778  * a cluster daemon. Set the type to the pid of the client
779  */
780  int scan_rc = sscanf(text, "%d", &type);
781 
782  if (scan_rc != 1 || type <= crm_msg_stonith_ng) {
783  /* Ensure it's sane */
784  type = crm_msg_none;
785  }
786  }
787  return type;
788 }
enum crm_ais_msg_types type
Definition: internal.h:20
#define CRM_CHECK(expr, failure_action)
Definition: logging.h:165
char data[0]
Definition: internal.h:37
gboolean send_cpg_iov(struct iovec *iov)
Definition: cpg.c:219
#define crm_notice(fmt, args...)
Definition: logging.h:251
gboolean is_compressed
Definition: internal.h:29
const char * bz2_strerror(int rc)
Definition: results.c:425
uint32_t size
Definition: internal.h:34
gboolean safe_str_neq(const char *a, const char *b)
Definition: strings.c:141
crm_ais_msg_types
Definition: cluster.h:94
mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
Definition: mainloop.c:804
uint32_t nodeid
Definition: cluster.h:74
uint32_t id
Definition: cluster.h:60
const char * get_local_node_name(void)
Definition: cluster.c:118
void(* destroy)(gpointer)
Definition: cluster.h:76
uint32_t id
Definition: internal.h:17
#define PCMK__SPECIAL_PID_AS_0(p)
Definition: ipc_internal.h:34
crm_node_t * crm_get_peer(unsigned int id, const char *uname)
Definition: membership.c:653
char * crm_system_name
Definition: utils.c:59
#define CS_SEND_MAX
Definition: cpg.c:146
uint32_t pid
Definition: internal.h:81
char * strerror(int errnum)
AIS_Host sender
Definition: internal.h:85
gboolean send_cluster_text(enum crm_ais_msg_class msg_class, const char *data, gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
Definition: cpg.c:624
char * pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content, uint32_t *kind, const char **from)
Definition: cpg.c:251
Wrappers for and extensions to glib mainloop.
#define CRM_SYSTEM_DC
Definition: crm.h:74
uint32_t id
Definition: internal.h:80
void cluster_disconnect_cpg(crm_cluster_t *cluster)
Definition: cpg.c:51
int(* dispatch)(gpointer userdata)
Definition: mainloop.h:84
int cs_message_timer
Definition: cpg.c:134
#define crm_warn(fmt, args...)
Definition: logging.h:250
#define crm_debug(fmt, args...)
Definition: logging.h:254
const char * pcmk_message_name(const char *name)
Get name to be used as identifier for cluster messages.
Definition: utils.c:1097
time_t when_lost
Definition: cluster.h:61
GListPtr cs_message_queue
Definition: cpg.c:133
#define crm_trace(fmt, args...)
Definition: logging.h:255
gboolean local
Definition: internal.h:19
crm_node_t * crm_update_peer_proc(const char *source, crm_node_t *peer, uint32_t flag, const char *status)
Definition: membership.c:785
#define CRM_SYSTEM_PENGINE
Definition: crm.h:80
AIS_Host sender
Definition: internal.h:32
uint32_t id
Definition: internal.h:28
gboolean check_message_sanity(const AIS_Message *msg, const char *data)
Definition: corosync.c:421
struct crm_ais_msg_s AIS_Message
Definition: internal.h:14
cpg_handle_t pcmk_cpg_handle
Definition: cpg.c:34
#define ais_data_len(msg)
Definition: internal.h:123
uint32_t size
Definition: internal.h:21
#define CRM_NODE_MEMBER
Definition: cluster.h:29
#define PEER_NAME(peer)
Definition: cpg.c:363
uint32_t compressed_size
Definition: internal.h:35
#define MAX_NAME
Definition: crm.h:36
#define CRM_SYSTEM_CRMD
Definition: crm.h:78
crm_ais_msg_class
Definition: cluster.h:90
#define CRM_XS
Definition: logging.h:43
bool crm_compress_string(const char *data, int length, int max, char **result, unsigned int *result_len)
Definition: strings.c:411
#define CRM_SYSTEM_STONITHD
Definition: crm.h:82
crm_node_t * crm_update_peer_state(const char *source, crm_node_t *node, const char *state, int membership)
Update a node's state and membership information.
Definition: membership.c:965
#define CRM_SYSTEM_CIB
Definition: crm.h:77
#define CRM_SYSTEM_TENGINE
Definition: crm.h:81
uint32_t get_local_nodeid(cpg_handle_t handle)
Definition: cpg.c:65
gboolean(* pcmk_cpg_dispatch_fn)(int kind, const char *from, const char *data)
Definition: cpg.c:37
#define crm_err(fmt, args...)
Definition: logging.h:249
#define G_PRIORITY_MEDIUM
Definition: mainloop.h:121
#define CRM_ASSERT(expr)
Definition: results.h:20
char uname[MAX_NAME]
Definition: internal.h:22
#define OFFLINESTATUS
Definition: util.h:35
enum crm_ais_msg_types text2msg_type(const char *text)
Definition: cpg.c:750
#define CRM_BZ2_THRESHOLD
Definition: xml.h:45
char * dump_xml_unformatted(xmlNode *msg)
Definition: xml.c:3164
#define CRM_SYSTEM_LRMD
Definition: crm.h:79
gboolean send_cluster_message_cs(xmlNode *msg, gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
Definition: cpg.c:612
#define uint32_t
Definition: stdint.in.h:158
char data[0]
Definition: internal.h:90
char * state
Definition: cluster.h:54
#define pcmk_ok
Definition: results.h:35
Wrappers for and extensions to libqb IPC.
uint32_t pid
Definition: internal.h:18
char * uname
Definition: cluster.h:52
#define cs_repeat(counter, max, code)
Definition: cpg.c:39
AIS_Host host
Definition: internal.h:31
#define safe_str_eq(a, b)
Definition: util.h:54
#define ONLINESTATUS
Definition: util.h:34
crm_node_t * crm_find_peer(unsigned int id, const char *uname)
Definition: membership.c:522
void pcmk_cpg_membership(cpg_handle_t handle, const struct cpg_name *groupName, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
Definition: cpg.c:380
char * crm_strdup_printf(char const *format,...) __attribute__((__format__(__printf__
GList * GListPtr
Definition: crm.h:190
#define crm_info(fmt, args...)
Definition: logging.h:252
gboolean cluster_connect_cpg(crm_cluster_t *cluster)
Definition: cpg.c:516
enum crm_ais_msg_types type
Definition: internal.h:83
int crm_ipc_is_authentic_process(int sock, uid_t refuid, gid_t refgid, pid_t *gotpid, uid_t *gotuid, gid_t *gotgid)
Check the authenticity of the IPC socket peer process.
Definition: ipc.c:1452
gboolean local
Definition: internal.h:82