12 #if defined(US_AUTH_PEERCRED_UCRED) || defined(US_AUTH_PEERCRED_SOCKPEERCRED)
13 # ifdef US_AUTH_PEERCRED_UCRED
18 # include <sys/socket.h>
19 #elif defined(US_AUTH_GETPEERUCRED)
23 #include <sys/param.h>
26 #include <sys/types.h>
42 #define PCMK_IPC_VERSION 1
45 #define PCMK_IPC_DEFAULT_QUEUE_MAX 500
47 struct crm_ipc_response_header {
48 struct qb_ipc_response_header qb;
49 uint32_t size_uncompressed;
50 uint32_t size_compressed;
55 static int hdr_offset = 0;
56 static unsigned int ipc_buffer_max = 0;
57 static unsigned int pick_ipc_buffer(
unsigned int max);
62 if (hdr_offset == 0) {
63 hdr_offset =
sizeof(
struct crm_ipc_response_header);
65 if (ipc_buffer_max == 0) {
66 ipc_buffer_max = pick_ipc_buffer(0);
73 return pick_ipc_buffer(0);
77 generateReference(
const char *custom1,
const char *custom2)
79 static uint ref_counter = 0;
82 (custom1? custom1 :
"_empty_"),
83 (custom2? custom2 :
"_empty_"),
84 (
long long) time(NULL), ref_counter++);
89 const char *host_to,
const char *sys_to,
90 const char *sys_from,
const char *uuid_from,
const char *origin)
92 char *true_from = NULL;
93 xmlNode *request = NULL;
94 char *reference = generateReference(task, sys_from);
96 if (uuid_from != NULL) {
98 }
else if (sys_from != NULL) {
99 true_from = strdup(sys_from);
101 crm_err(
"No sys from specified");
116 if (host_to != NULL && strlen(host_to) > 0) {
120 if (msg_data != NULL) {
133 create_reply_adv(xmlNode * original_request, xmlNode * xml_response_data,
const char *origin)
135 xmlNode *reply = NULL;
145 crm_err(
"Cannot create new_message, no message type in original message");
150 crm_err(
"Cannot create new_message, original message was not a request");
156 crm_err(
"Cannot create new_message, malloc failed");
172 if (host_from != NULL && strlen(host_from) > 0) {
176 if (xml_response_data != NULL) {
209 while (g_hash_table_iter_next(&iter, &key, (gpointer *) & client)) {
210 if (strcmp(client->id,
id) == 0) {
216 crm_trace(
"No client found with id=%s",
id);
225 }
else if (c->
name == NULL && c->
id == NULL) {
227 }
else if (c->
name == NULL) {
242 #ifdef HAVE_GNUTLS_GNUTLS_H
267 crm_err(
"Exiting with %d active connections", active);
276 qb_ipcs_connection_t *c = NULL;
278 if (service == NULL) {
282 c = qb_ipcs_connection_first_get(service);
285 qb_ipcs_connection_t *last = c;
287 c = qb_ipcs_connection_next_get(service, last);
291 qb_ipcs_disconnect(last);
292 qb_ipcs_connection_unref(last);
307 client_from_connection(qb_ipcs_connection_t *c,
void *key, uid_t uid_client)
311 if (client == NULL) {
319 if (client->
user == NULL) {
320 client->
user = strdup(
"#unprivileged");
322 crm_err(
"Unable to enforce ACLs for user ID %d, assuming unprivileged",
335 if (client->
id == NULL) {
336 crm_err(
"Could not generate UUID for client");
358 crm_client_t *client = client_from_connection(NULL, key, 0);
367 static gid_t uid_cluster = 0;
368 static gid_t gid_cluster = 0;
374 if (uid_cluster == 0) {
376 static bool need_log = TRUE;
379 crm_warn(
"Could not find user and group IDs for user %s",
386 if (uid_client != 0) {
387 crm_trace(
"Giving access to group %u", gid_cluster);
389 qb_ipcs_connection_auth_set(c, -1, gid_cluster, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
395 client = client_from_connection(c, NULL, uid_client);
396 if (client == NULL) {
400 if ((uid_client == 0) || (uid_client == uid_cluster)) {
405 crm_debug(
"Connecting %p for uid=%d gid=%d pid=%u id=%s", c, uid_client, gid_client, client->
pid, client->
id);
410 static struct iovec *
411 pcmk__new_ipc_event()
413 struct iovec *iov = calloc(2,
sizeof(
struct iovec));
428 free(event[0].iov_base);
429 free(event[1].iov_base);
435 free_event(gpointer
data)
458 crm_trace(
"Destroying %p/%p (%d remaining)",
463 crm_trace(
"Destroying remote connection %p (%d remaining)",
505 if ((errno == 0) && (qmax_int > 0)) {
516 struct qb_ipcs_connection_stats stats;
518 stats.client_pid = 0;
519 qb_ipcs_connection_stats_get(c, &stats, 0);
520 return stats.client_pid;
527 char *uncompressed = NULL;
528 char *text = ((
char *)
data) +
sizeof(
struct crm_ipc_response_header);
529 struct crm_ipc_response_header *header =
data;
532 *
id = ((
struct qb_ipc_response_header *)
data)->id;
535 *
flags = header->flags;
547 crm_err(
"Filtering incompatible v%d IPC message, we only support versions <= %d",
552 if (header->size_compressed) {
554 unsigned int size_u = 1 + header->size_uncompressed;
555 uncompressed = calloc(1, size_u);
557 crm_trace(
"Decompressing message data %u bytes into %u bytes",
558 header->size_compressed, size_u);
560 rc = BZ2_bzBuffToBuffDecompress(uncompressed, &size_u, text, header->size_compressed, 1, 0);
571 CRM_ASSERT(text[header->size_uncompressed - 1] == 0);
583 crm_ipcs_flush_events_cb(gpointer
data)
600 delay_next_flush(
crm_client_t *c,
unsigned int queue_len)
603 guint delay = (queue_len < 5)? (1000 + 100 * queue_len) : 1500;
605 c->
event_timer = g_timeout_add(delay, crm_ipcs_flush_events_cb, c);
612 unsigned int sent = 0;
613 unsigned int queue_len = 0;
628 struct crm_ipc_response_header *header = NULL;
629 struct iovec *
event = NULL;
639 rc = qb_ipcs_event_sendv(c->
ipcs, event, 2);
646 header =
event[0].iov_base;
647 if (header->size_compressed) {
648 crm_trace(
"Event %d to %p[%d] (%lld compressed bytes) sent",
649 header->qb.id, c->
ipcs, c->
pid, (
long long) rc);
651 crm_trace(
"Event %d to %p[%d] (%lld bytes) sent: %.120s",
652 header->qb.id, c->
ipcs, c->
pid, (
long long) rc,
653 (
char *) (event[1].iov_base));
659 if (sent > 0 || queue_len) {
660 crm_trace(
"Sent %d events (%d remaining) for %p[%d]: %s (%lld)",
661 sent, queue_len, c->
ipcs, c->
pid,
672 if ((c->
queue_backlog <= 1) || (queue_len < c->queue_backlog)) {
674 crm_warn(
"Client with process ID %u has a backlog of %u messages "
677 crm_err(
"Evicting client with process ID %u due to backlog of %u messages "
680 qb_ipcs_disconnect(c->
ipcs);
686 delay_next_flush(c, queue_len);
697 crm_ipc_prepare(uint32_t request, xmlNode * message,
struct iovec ** result, uint32_t max_send_size)
699 static unsigned int biggest = 0;
701 unsigned int total = 0;
702 char *compressed = NULL;
704 struct crm_ipc_response_header *header = calloc(1,
sizeof(
struct crm_ipc_response_header));
710 if (max_send_size == 0) {
711 max_send_size = ipc_buffer_max;
717 iov = pcmk__new_ipc_event();
718 iov[0].iov_len = hdr_offset;
719 iov[0].iov_base = header;
722 header->size_uncompressed = 1 + strlen(buffer);
723 total = iov[0].iov_len + header->size_uncompressed;
725 if (total < max_send_size) {
726 iov[1].iov_base = buffer;
727 iov[1].iov_len = header->size_uncompressed;
730 unsigned int new_size = 0;
733 (buffer, header->size_uncompressed, max_send_size, &compressed, &new_size)) {
736 header->size_compressed = new_size;
738 iov[1].iov_len = header->size_compressed;
739 iov[1].iov_base = compressed;
743 biggest = QB_MAX(header->size_compressed, biggest);
746 ssize_t rc = -EMSGSIZE;
749 biggest = QB_MAX(header->size_uncompressed, biggest);
752 (
"Could not compress the message (%u bytes) into less than the configured ipc limit (%u bytes). "
753 "Set PCMK_ipc_buffer to a higher value (%u bytes suggested)",
754 header->size_uncompressed, max_send_size, 4 * biggest);
763 header->qb.size = iov[0].iov_len + iov[1].iov_len;
764 header->qb.id = (int32_t)request;
768 return header->qb.size;
775 static uint32_t
id = 1;
776 struct crm_ipc_response_header *header = iov[0].iov_base;
788 header->flags |=
flags;
790 header->qb.id =
id++;
797 struct iovec *iov_copy = pcmk__new_ipc_event();
800 iov_copy[0].iov_len = iov[0].iov_len;
801 iov_copy[0].iov_base = malloc(iov[0].iov_len);
802 memcpy(iov_copy[0].iov_base, iov[0].iov_base, iov[0].iov_len);
804 iov_copy[1].iov_len = iov[1].iov_len;
805 iov_copy[1].iov_base = malloc(iov[1].iov_len);
806 memcpy(iov_copy[1].iov_base, iov[1].iov_base, iov[1].iov_len);
808 add_event(c, iov_copy);
814 rc = qb_ipcs_response_sendv(c->
ipcs, iov, 2);
815 if (rc < header->qb.size) {
816 crm_notice(
"Response %d to pid %d failed: %s "
817 CRM_XS " bytes=%u rc=%lld ipcs=%p",
819 header->qb.size, (
long long) rc, c->
ipcs);
822 crm_trace(
"Response %d sent, %lld bytes to %p[%d]",
823 header->qb.id, (
long long) rc, c->
ipcs, c->
pid);
837 if (rc == -EPIPE || rc == -ENOTCONN) {
848 struct iovec *iov = NULL;
852 return -EDESTADDRREQ;
885 #define MIN_MSG_SIZE 12336
886 #define MAX_MSG_SIZE 128*1024
892 unsigned int max_buf_size;
894 unsigned int buf_size;
900 qb_ipcc_connection_t *ipc;
905 pick_ipc_buffer(
unsigned int max)
907 static unsigned int global_max = 0;
909 if (global_max == 0) {
910 const char *env = getenv(
"PCMK_ipc_buffer");
922 return QB_MAX(max, global_max);
932 client->name = strdup(name);
933 client->buf_size = pick_ipc_buffer(max_size);
934 client->buffer = malloc(client->buf_size);
937 client->max_buf_size = client->buf_size;
940 client->pfd.events = POLLIN;
941 client->pfd.revents = 0;
958 static uid_t cl_uid = 0;
959 static gid_t cl_gid = 0;
960 pid_t found_pid = 0; uid_t found_uid = 0; gid_t found_gid = 0;
963 client->need_reply = FALSE;
964 client->ipc = qb_ipcc_connect(client->name, client->buf_size);
966 if (client->ipc == NULL) {
972 if (client->pfd.fd < 0) {
980 if (!cl_uid && !cl_gid
990 &found_pid, &found_uid,
992 crm_err(
"Daemon (IPC %s) is not authentic:"
993 " process %lld (uid: %lld, gid: %lld)",
995 (
long long) found_uid, (
long long) found_gid);
997 errno = ECONNABORTED;
1000 }
else if (rv < 0) {
1002 crm_perror(LOG_ERR,
"Could not verify authenticity of daemon (IPC %s)",
1009 qb_ipcc_context_set(client->ipc, client);
1011 #ifdef HAVE_IPCS_GET_BUFFER_SIZE
1012 client->max_buf_size = qb_ipcc_get_buffer_size(client->ipc);
1013 if (client->max_buf_size > client->buf_size) {
1014 free(client->buffer);
1015 client->buffer = calloc(1, client->max_buf_size);
1016 client->buf_size = client->max_buf_size;
1027 crm_trace(
"Disconnecting %s IPC connection %p (%p)", client->name, client, client->ipc);
1030 qb_ipcc_connection_t *ipc = client->ipc;
1033 qb_ipcc_disconnect(ipc);
1042 if (client->ipc && qb_ipcc_is_connected(client->ipc)) {
1043 crm_notice(
"Destroying an active IPC connection to %s", client->name);
1054 crm_trace(
"Destroying IPC connection to %s: %p", client->name, client);
1055 free(client->buffer);
1066 if (client && client->ipc && (qb_ipcc_fd_get(client->ipc, &fd) == 0)) {
1070 crm_perror(LOG_ERR,
"Could not obtain file IPC descriptor for %s",
1071 (client? client->name :
"unspecified client"));
1080 if (client == NULL) {
1084 }
else if (client->ipc == NULL) {
1088 }
else if (client->pfd.fd < 0) {
1093 rc = qb_ipcc_is_connected(client->ipc);
1095 client->pfd.fd = -EINVAL;
1118 client->pfd.revents = 0;
1119 rc = poll(&(client->pfd), 1, 0);
1120 return (rc < 0)? -errno : rc;
1126 struct crm_ipc_response_header *header = (
struct crm_ipc_response_header *)(
void*)client->buffer;
1128 if (header->size_compressed) {
1130 unsigned int size_u = 1 + header->size_uncompressed;
1132 unsigned int new_buf_size = QB_MAX((hdr_offset + size_u), client->max_buf_size);
1133 char *uncompressed = calloc(1, new_buf_size);
1135 crm_trace(
"Decompressing message data %u bytes into %u bytes",
1136 header->size_compressed, size_u);
1138 rc = BZ2_bzBuffToBuffDecompress(uncompressed + hdr_offset, &size_u,
1139 client->buffer + hdr_offset, header->size_compressed, 1, 0);
1156 CRM_ASSERT(size_u == header->size_uncompressed);
1158 memcpy(uncompressed, client->buffer, hdr_offset);
1159 header = (
struct crm_ipc_response_header *)(
void*)uncompressed;
1161 free(client->buffer);
1162 client->buf_size = new_buf_size;
1163 client->buffer = uncompressed;
1166 CRM_ASSERT(client->buffer[hdr_offset + header->size_uncompressed - 1] == 0);
1173 struct crm_ipc_response_header *header = NULL;
1181 client->buffer[0] = 0;
1182 client->msg_size = qb_ipcc_event_recv(client->ipc, client->buffer,
1183 client->buf_size, 0);
1184 if (client->msg_size >= 0) {
1185 int rc = crm_ipc_decompress(client);
1191 header = (
struct crm_ipc_response_header *)(
void*)client->buffer;
1193 crm_err(
"Filtering incompatible v%d IPC message, we only support versions <= %d",
1198 crm_trace(
"Received %s event %d, size=%u, rc=%d, text: %.100s",
1199 client->name, header->qb.id, header->qb.size, client->msg_size,
1200 client->buffer + hdr_offset);
1207 crm_err(
"Connection to %s failed", client->name);
1212 return header->size_uncompressed;
1221 return client->buffer +
sizeof(
struct crm_ipc_response_header);
1227 struct crm_ipc_response_header *header = NULL;
1230 if (client->buffer == NULL) {
1234 header = (
struct crm_ipc_response_header *)(
void*)client->buffer;
1235 return header->flags;
1242 return client->name;
1246 internal_ipc_send_recv(
crm_ipc_t * client,
const void *iov)
1251 rc = qb_ipcc_sendv_recv(client->ipc, iov, 2, client->buffer, client->buf_size, -1);
1258 internal_ipc_send_request(
crm_ipc_t * client,
const void *iov,
int ms_timeout)
1261 time_t timeout = time(NULL) + 1 + (ms_timeout / 1000);
1264 rc = qb_ipcc_sendv(client->ipc, iov, 2);
1271 internal_ipc_get_reply(
crm_ipc_t * client,
int request_id,
int ms_timeout)
1273 time_t timeout = time(NULL) + 1 + (ms_timeout / 1000);
1279 crm_trace(
"client %s waiting on reply to msg id %d", client->name, request_id);
1282 rc = qb_ipcc_recv(client->ipc, client->buffer, client->buf_size, 1000);
1284 struct crm_ipc_response_header *hdr = NULL;
1286 int rc = crm_ipc_decompress(client);
1292 hdr = (
struct crm_ipc_response_header *)(
void*)client->buffer;
1293 if (hdr->qb.id == request_id) {
1296 }
else if (hdr->qb.id < request_id) {
1299 crm_err(
"Discarding old reply %d (need %d)", hdr->qb.id, request_id);
1305 crm_err(
"Discarding newer reply %d (need %d)", hdr->qb.id, request_id);
1310 crm_err(
"Server disconnected client %s while waiting for msg id %d", client->name,
1315 }
while (time(NULL) < timeout);
1326 static uint32_t
id = 0;
1327 static int factor = 8;
1328 struct crm_ipc_response_header *header;
1332 if (client == NULL) {
1338 crm_notice(
"Connection to %s closed", client->name);
1342 if (ms_timeout == 0) {
1346 if (client->need_reply) {
1347 crm_trace(
"Trying again to obtain pending reply from %s", client->name);
1348 rc = qb_ipcc_recv(client->ipc, client->buffer, client->buf_size, ms_timeout);
1350 crm_warn(
"Sending to %s (%p) is disabled until pending reply is received", client->name,
1355 crm_notice(
"Lost reply from %s (%p) finally arrived, sending re-enabled", client->name,
1357 client->need_reply = FALSE;
1368 header = iov[0].iov_base;
1369 header->flags |=
flags;
1376 if(header->size_compressed) {
1377 if(factor < 10 && (client->max_buf_size / 10) < (rc / factor)) {
1378 crm_notice(
"Compressed message exceeds %d0%% of the configured ipc limit (%u bytes), "
1379 "consider setting PCMK_ipc_buffer to %u or higher",
1380 factor, client->max_buf_size, 2 * client->max_buf_size);
1385 crm_trace(
"Sending from client: %s request id: %d bytes: %u timeout:%d msg...",
1386 client->name, header->qb.id, header->qb.size, ms_timeout);
1390 rc = internal_ipc_send_request(client, iov, ms_timeout);
1393 crm_trace(
"Failed to send from client %s request %d with %u bytes...",
1394 client->name, header->qb.id, header->qb.size);
1398 crm_trace(
"Message sent, not waiting for reply to %d from %s to %u bytes...",
1399 header->qb.id, client->name, header->qb.size);
1404 rc = internal_ipc_get_reply(client, header->qb.id, ms_timeout);
1413 client->need_reply = TRUE;
1417 rc = internal_ipc_send_recv(client, iov);
1421 struct crm_ipc_response_header *hdr = (
struct crm_ipc_response_header *)(
void*)client->buffer;
1423 crm_trace(
"Received response %d, size=%u, rc=%ld, text: %.200s", hdr->qb.id, hdr->qb.size,
1431 crm_trace(
"Response not received: rc=%ld, errno=%d", rc, errno);
1438 }
else if (rc == -ETIMEDOUT) {
1439 crm_warn(
"Request %d to %s (%p) failed: %s (%ld) after %dms",
1440 header->qb.id, client->name, client->ipc,
pcmk_strerror(rc), rc, ms_timeout);
1443 }
else if (rc <= 0) {
1444 crm_warn(
"Request %d to %s (%p) failed: %s (%ld)",
1445 header->qb.id, client->name, client->ipc,
pcmk_strerror(rc), rc);
1454 pid_t *gotpid, uid_t *gotuid, gid_t *gotgid) {
1456 pid_t found_pid = 0; uid_t found_uid = 0; gid_t found_gid = 0;
1457 #if defined(US_AUTH_PEERCRED_UCRED)
1459 socklen_t ucred_len =
sizeof(ucred);
1461 if (!getsockopt(sock, SOL_SOCKET, SO_PEERCRED,
1463 && ucred_len ==
sizeof(ucred)) {
1464 found_pid = ucred.pid; found_uid = ucred.uid; found_gid = ucred.gid;
1466 #elif defined(US_AUTH_PEERCRED_SOCKPEERCRED)
1467 struct sockpeercred sockpeercred;
1468 socklen_t sockpeercred_len =
sizeof(sockpeercred);
1470 if (!getsockopt(sock, SOL_SOCKET, SO_PEERCRED,
1471 &sockpeercred, &sockpeercred_len)
1472 && sockpeercred_len ==
sizeof(sockpeercred_len)) {
1473 found_pid = sockpeercred.pid;
1474 found_uid = sockpeercred.uid; found_gid = sockpeercred.gid;
1476 #elif defined(US_AUTH_GETPEEREID)
1477 if (!getpeereid(sock, &found_uid, &found_gid)) {
1480 #elif defined(US_AUTH_GETPEERUCRED)
1482 if (!getpeerucred(sock, &ucred)) {
1484 found_pid = ucred_getpid(ucred);
1485 found_uid = ucred_geteuid(ucred); found_gid = ucred_getegid(ucred);
1493 # error "No way to authenticate a Unix socket peer"
1497 if (gotpid != NULL) {
1498 *gotpid = found_pid;
1500 if (gotuid != NULL) {
1501 *gotuid = found_uid;
1503 if (gotgid != NULL) {
1504 *gotgid = found_gid;
1506 ret = (found_uid == 0 || found_uid == refuid || found_gid == refgid);
1516 gid_t refgid, pid_t *gotpid) {
1517 static char last_asked_name[PATH_MAX / 2] =
"";
1519 pid_t found_pid = 0; uid_t found_uid = 0; gid_t found_gid = 0;
1520 qb_ipcc_connection_t *c;
1522 if ((c = qb_ipcc_connect(name, 0)) == NULL) {
1525 }
else if ((ret = qb_ipcc_fd_get(c, &fd))) {
1526 crm_err(
"Could not get fd from %s IPC: %s (%d)", name,
1531 &found_pid, &found_uid,
1534 crm_err(
"Could not get peer credentials from %s IPC", name);
1536 crm_err(
"Could not get peer credentials from %s IPC: %s (%d)",
1542 if (gotpid != NULL) {
1543 *gotpid = found_pid;
1547 crm_err(
"Daemon (IPC %s) effectively blocked with unauthorized"
1548 " process %lld (uid: %lld, gid: %lld)",
1550 (
long long) found_uid, (
long long) found_gid);
1552 }
else if ((found_uid != refuid || found_gid != refgid)
1553 && strncmp(last_asked_name, name,
sizeof(last_asked_name))) {
1554 if (!found_uid && refuid) {
1555 crm_warn(
"Daemon (IPC %s) runs as root, whereas the expected"
1556 " credentials are %lld:%lld, hazard of violating"
1557 " the least privilege principle",
1558 name, (
long long) refuid, (
long long) refgid);
1560 crm_notice(
"Daemon (IPC %s) runs as %lld:%lld, whereas the"
1561 " expected credentials are %lld:%lld, which may"
1562 " mean a different set of privileges than expected",
1563 name, (
long long) found_uid, (
long long) found_gid,
1564 (
long long) refuid, (
long long) refgid);
1566 memccpy(last_asked_name, name,
'\0',
sizeof(last_asked_name));
1571 qb_ipcc_disconnect(c);
1582 const char *client_name,
const char *major_version,
const char *minor_version)
1584 xmlNode *hello_node = NULL;
1585 xmlNode *hello = NULL;
1587 if (uuid == NULL || strlen(uuid) == 0
1588 || client_name == NULL || strlen(client_name) == 0
1589 || major_version == NULL || strlen(major_version) == 0
1590 || minor_version == NULL || strlen(minor_version) == 0) {
1591 crm_err(
"Missing fields, Hello message will not be valid.");
1596 crm_xml_add(hello_node,
"major_version", major_version);
1597 crm_xml_add(hello_node,
"minor_version", minor_version);
1598 crm_xml_add(hello_node,
"client_name", client_name);