21 #include <sys/param.h>
24 #include <sys/types.h>
38 #define PCMK_IPC_VERSION 1
40 struct crm_ipc_response_header {
41 struct qb_ipc_response_header qb;
48 static int hdr_offset = 0;
49 static unsigned int ipc_buffer_max = 0;
50 static unsigned int pick_ipc_buffer(
unsigned int max);
55 if (hdr_offset == 0) {
56 hdr_offset =
sizeof(
struct crm_ipc_response_header);
58 if (ipc_buffer_max == 0) {
59 ipc_buffer_max = pick_ipc_buffer(0);
66 return pick_ipc_buffer(0);
70 generateReference(
const char *custom1,
const char *custom2)
72 static uint ref_counter = 0;
73 const char *local_cust1 = custom1;
74 const char *local_cust2 = custom2;
75 int reference_len = 4;
76 char *since_epoch = NULL;
81 if (local_cust1 == NULL) {
82 local_cust1 =
"_empty_";
84 reference_len += strlen(local_cust1);
86 if (local_cust2 == NULL) {
87 local_cust2 =
"_empty_";
89 reference_len += strlen(local_cust2);
91 since_epoch = calloc(1, reference_len);
93 if (since_epoch != NULL) {
94 sprintf(since_epoch,
"%s-%s-%lu-%u",
95 local_cust1, local_cust2, (
unsigned long)time(NULL), ref_counter++);
103 const char *host_to,
const char *sys_to,
104 const char *sys_from,
const char *uuid_from,
const char *origin)
106 char *true_from = NULL;
107 xmlNode *request = NULL;
108 char *reference = generateReference(task, sys_from);
110 if (uuid_from != NULL) {
112 }
else if (sys_from != NULL) {
113 true_from = strdup(sys_from);
115 crm_err(
"No sys from specified");
130 if (host_to != NULL && strlen(host_to) > 0) {
134 if (msg_data != NULL) {
147 create_reply_adv(xmlNode * original_request, xmlNode * xml_response_data,
const char *origin)
149 xmlNode *reply = NULL;
159 crm_err(
"Cannot create new_message, no message type in original message");
164 crm_err(
"Cannot create new_message, original message was not a request");
170 crm_err(
"Cannot create new_message, malloc failed");
186 if (host_from != NULL && strlen(host_from) > 0) {
190 if (xml_response_data != NULL) {
206 if (client_connections) {
207 return g_hash_table_lookup(client_connections, c);
221 if (client_connections &&
id) {
222 g_hash_table_iter_init(&iter, client_connections);
223 while (g_hash_table_iter_next(&iter, &key, (gpointer *) & client)) {
224 if (strcmp(client->
id,
id) == 0) {
230 crm_trace(
"No client found with id=%s",
id);
239 }
else if (c->
name == NULL && c->
id == NULL) {
241 }
else if (c->
name == NULL) {
251 if (client_connections == NULL) {
253 client_connections = g_hash_table_new(g_direct_hash, g_direct_equal);
260 if (client_connections != NULL) {
261 int active = g_hash_table_size(client_connections);
264 crm_err(
"Exiting with %d active connections", active);
266 g_hash_table_destroy(client_connections); client_connections = NULL;
273 qb_ipcs_connection_t *c = qb_ipcs_connection_first_get(service);
276 qb_ipcs_connection_t *last = c;
278 c = qb_ipcs_connection_next_get(service, last);
282 qb_ipcs_disconnect(last);
283 qb_ipcs_connection_unref(last);
290 static uid_t uid_server = 0;
291 static gid_t gid_cluster = 0;
300 if (gid_cluster == 0) {
301 uid_server = getuid();
303 static bool have_error = FALSE;
304 if(have_error == FALSE) {
311 if(gid_cluster != 0 && gid_client != 0) {
314 if(uid_client == 0 || uid_server == 0) {
315 best_uid = QB_MAX(uid_client, uid_server);
316 crm_trace(
"Allowing user %u to clean up after disconnect", best_uid);
319 crm_trace(
"Giving access to group %u", gid_cluster);
320 qb_ipcs_connection_auth_set(c, best_uid, gid_cluster, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
334 crm_debug(
"Connecting %p for uid=%d gid=%d pid=%u id=%s", c, uid_client, gid_client, client->
pid, client->
id);
340 g_hash_table_insert(client_connections, c, client);
351 if (client_connections) {
353 crm_trace(
"Destroying %p/%p (%d remaining)",
354 c, c->
ipcs, crm_hash_table_size(client_connections) - 1);
355 g_hash_table_remove(client_connections, c->
ipcs);
358 crm_trace(
"Destroying remote connection %p (%d remaining)",
359 c, crm_hash_table_size(client_connections) - 1);
360 g_hash_table_remove(client_connections, c->
id);
373 free(event[0].iov_base);
374 free(event[1].iov_base);
394 struct qb_ipcs_connection_stats stats;
396 stats.client_pid = 0;
397 qb_ipcs_connection_stats_get(c, &stats, 0);
398 return stats.client_pid;
405 char *uncompressed = NULL;
406 char *text = ((
char *)data) +
sizeof(
struct crm_ipc_response_header);
407 struct crm_ipc_response_header *header =
data;
410 *
id = ((
struct qb_ipc_response_header *)data)->id;
413 *flags = header->flags;
424 crm_err(
"Filtering incompatible v%d IPC message, we only support versions <= %d",
429 if (header->size_compressed) {
431 unsigned int size_u = 1 + header->size_uncompressed;
432 uncompressed = calloc(1, size_u);
434 crm_trace(
"Decompressing message data %u bytes into %u bytes",
435 header->size_compressed, size_u);
437 rc = BZ2_bzBuffToBuffDecompress(uncompressed, &size_u, text, header->size_compressed, 1, 0);
447 CRM_ASSERT(text[header->size_uncompressed - 1] == 0);
459 crm_ipcs_flush_events_cb(gpointer
data)
486 struct crm_ipc_response_header *header = NULL;
489 rc = qb_ipcs_event_sendv(c->
ipcs, event, 2);
495 header =
event[0].iov_base;
496 if (header->size_compressed) {
497 crm_trace(
"Event %d to %p[%d] (%d compressed bytes) sent",
498 header->qb.id, c->
ipcs, c->
pid, rc);
500 crm_trace(
"Event %d to %p[%d] (%d bytes) sent: %.120s",
501 header->qb.id, c->
ipcs, c->
pid, rc, event[1].iov_base);
505 free(event[0].iov_base);
506 free(event[1].iov_base);
512 crm_trace(
"Sent %d events (%d remaining) for %p[%d]: %s (%d)",
517 if (queue_len % 100 == 0 && queue_len > 99) {
518 crm_warn(
"Event queue for %p[%d] has grown to %d", c->
ipcs, c->
pid, queue_len);
520 }
else if (queue_len > 500) {
521 crm_err(
"Evicting slow client %p[%d]: event queue reached %d entries",
523 qb_ipcs_disconnect(c->
ipcs);
527 c->
event_timer = g_timeout_add(1000 + 100 * queue_len, crm_ipcs_flush_events_cb, c);
536 static unsigned int biggest = 0;
538 unsigned int total = 0;
539 char *compressed = NULL;
541 struct crm_ipc_response_header *header = calloc(1,
sizeof(
struct crm_ipc_response_header));
547 if (max_send_size == 0) {
548 max_send_size = ipc_buffer_max;
554 iov = calloc(2,
sizeof(
struct iovec));
557 iov[0].iov_len = hdr_offset;
558 iov[0].iov_base = header;
561 header->size_uncompressed = 1 + strlen(buffer);
562 total = iov[0].iov_len + header->size_uncompressed;
564 if (total < max_send_size) {
565 iov[1].iov_base = buffer;
566 iov[1].iov_len = header->size_uncompressed;
569 unsigned int new_size = 0;
572 (buffer, header->size_uncompressed, max_send_size, &compressed, &new_size)) {
575 header->size_compressed = new_size;
577 iov[1].iov_len = header->size_compressed;
578 iov[1].iov_base = compressed;
582 biggest = QB_MAX(header->size_compressed, biggest);
585 ssize_t rc = -EMSGSIZE;
588 biggest = QB_MAX(header->size_uncompressed, biggest);
591 (
"Could not compress the message (%u bytes) into less than the configured ipc limit (%u bytes). "
592 "Set PCMK_ipc_buffer to a higher value (%u bytes suggested)",
593 header->size_uncompressed, max_send_size, 4 * biggest);
604 header->qb.size = iov[0].iov_len + iov[1].iov_len;
605 header->qb.id = (
int32_t)request;
609 return header->qb.size;
617 struct crm_ipc_response_header *header = iov[0].iov_base;
629 header->flags |=
flags;
631 header->qb.id =
id++;
638 struct iovec *iov_copy = calloc(2,
sizeof(
struct iovec));
641 iov_copy[0].iov_len = iov[0].iov_len;
642 iov_copy[0].iov_base = malloc(iov[0].iov_len);
643 memcpy(iov_copy[0].iov_base, iov[0].iov_base, iov[0].iov_len);
645 iov_copy[1].iov_len = iov[1].iov_len;
646 iov_copy[1].iov_base = malloc(iov[1].iov_len);
647 memcpy(iov_copy[1].iov_base, iov[1].iov_base, iov[1].iov_len);
655 rc = qb_ipcs_response_sendv(c->
ipcs, iov, 2);
656 if (rc < header->qb.size) {
657 crm_notice(
"Response %d to %p[%d] (%u bytes) failed: %s (%d)",
661 crm_trace(
"Response %d sent, %d bytes to %p[%d]", header->qb.id, rc, c->
ipcs, c->
pid);
665 free(iov[0].iov_base);
666 free(iov[1].iov_base);
671 if (flags & crm_ipc_server_event) {
677 if (rc == -EPIPE || rc == -ENOTCONN) {
688 struct iovec *iov = NULL;
692 return -EDESTADDRREQ;
702 crm_notice(
"Message to %p[%d] failed: %s (%d)",
727 #define MIN_MSG_SIZE 12336
/* Upper bound on message size: 128 KiB. The replacement list is parenthesized
 * so the macro expands as a single value inside larger expressions
 * (e.g. `x / MAX_MSG_SIZE`, `x % MAX_MSG_SIZE`). */
#define MAX_MSG_SIZE (128 * 1024)
734 unsigned int max_buf_size;
736 unsigned int buf_size;
743 qb_ipcc_connection_t *ipc;
748 pick_ipc_buffer(
unsigned int max)
750 static unsigned int global_max = 0;
752 if(global_max == 0) {
753 const char *env = getenv(
"PCMK_ipc_buffer");
765 return QB_MAX(max, global_max);
775 client->name = strdup(name);
776 client->buf_size = pick_ipc_buffer(max_size);
777 client->buffer = malloc(client->buf_size);
780 client->max_buf_size = client->buf_size;
783 client->pfd.events = POLLIN;
784 client->pfd.revents = 0;
799 client->need_reply = FALSE;
800 client->ipc = qb_ipcc_connect(client->name, client->buf_size);
802 if (client->ipc == NULL) {
808 if (client->pfd.fd < 0) {
809 crm_debug(
"Could not obtain file descriptor for %s connection: %s (%d)", client->name,
pcmk_strerror(errno), errno);
813 qb_ipcc_context_set(client->ipc, client);
815 #ifdef HAVE_IPCS_GET_BUFFER_SIZE
816 client->max_buf_size = qb_ipcc_get_buffer_size(client->ipc);
817 if (client->max_buf_size > client->buf_size) {
818 free(client->buffer);
819 client->buffer = calloc(1, client->max_buf_size);
820 client->buf_size = client->max_buf_size;
831 crm_trace(
"Disconnecting %s IPC connection %p (%p.%p)", client->name, client, client->ipc);
834 qb_ipcc_connection_t *ipc = client->ipc;
837 qb_ipcc_disconnect(ipc);
846 if (client->ipc && qb_ipcc_is_connected(client->ipc)) {
847 crm_notice(
"Destroying an active IPC connection to %s", client->name);
858 crm_trace(
"Destroying IPC connection to %s: %p", client->name, client);
859 free(client->buffer);
870 if (client && client->ipc && (qb_ipcc_fd_get(client->ipc, &fd) == 0)) {
874 crm_perror(LOG_ERR,
"Could not obtain file IPC descriptor for %s",
875 (client? client->name :
"unspecified client"));
884 if (client == NULL) {
888 }
else if (client->ipc == NULL) {
892 }
else if (client->pfd.fd < 0) {
897 rc = qb_ipcc_is_connected(client->ipc);
899 client->pfd.fd = -EINVAL;
913 client->pfd.revents = 0;
914 return poll(&(client->pfd), 1, 0);
920 struct crm_ipc_response_header *header = (
struct crm_ipc_response_header *)(
void*)client->buffer;
922 if (header->size_compressed) {
924 unsigned int size_u = 1 + header->size_uncompressed;
926 unsigned int new_buf_size = QB_MAX((hdr_offset + size_u), client->max_buf_size);
927 char *uncompressed = calloc(1, new_buf_size);
929 crm_trace(
"Decompressing message data %u bytes into %u bytes",
930 header->size_compressed, size_u);
932 rc = BZ2_bzBuffToBuffDecompress(uncompressed + hdr_offset, &size_u,
933 client->buffer + hdr_offset, header->size_compressed, 1, 0);
949 CRM_ASSERT(size_u == header->size_uncompressed);
951 memcpy(uncompressed, client->buffer, hdr_offset);
952 header = (
struct crm_ipc_response_header *)(
void*)uncompressed;
954 free(client->buffer);
955 client->buf_size = new_buf_size;
956 client->buffer = uncompressed;
959 CRM_ASSERT(client->buffer[hdr_offset + header->size_uncompressed - 1] == 0);
966 struct crm_ipc_response_header *header = NULL;
974 client->buffer[0] = 0;
975 client->msg_size = qb_ipcc_event_recv(client->ipc, client->buffer, client->buf_size - 1, 0);
976 if (client->msg_size >= 0) {
977 int rc = crm_ipc_decompress(client);
983 header = (
struct crm_ipc_response_header *)(
void*)client->buffer;
985 crm_err(
"Filtering incompatible v%d IPC message, we only support versions <= %d",
990 crm_trace(
"Received %s event %d, size=%u, rc=%d, text: %.100s",
991 client->name, header->qb.id, header->qb.size, client->msg_size,
992 client->buffer + hdr_offset);
999 crm_err(
"Connection to %s failed", client->name);
1004 return header->size_uncompressed;
1013 return client->buffer +
sizeof(
struct crm_ipc_response_header);
1019 struct crm_ipc_response_header *header = NULL;
1022 if (client->buffer == NULL) {
1026 header = (
struct crm_ipc_response_header *)(
void*)client->buffer;
1027 return header->flags;
1034 return client->name;
1038 internal_ipc_send_recv(
crm_ipc_t * client,
const void *iov)
1043 rc = qb_ipcc_sendv_recv(client->ipc, iov, 2, client->buffer, client->buf_size, -1);
1050 internal_ipc_send_request(
crm_ipc_t * client,
const void *iov,
int ms_timeout)
1053 time_t timeout = time(NULL) + 1 + (ms_timeout / 1000);
1056 rc = qb_ipcc_sendv(client->ipc, iov, 2);
1063 internal_ipc_get_reply(
crm_ipc_t * client,
int request_id,
int ms_timeout)
1065 time_t timeout = time(NULL) + 1 + (ms_timeout / 1000);
1071 crm_trace(
"client %s waiting on reply to msg id %d", client->name, request_id);
1074 rc = qb_ipcc_recv(client->ipc, client->buffer, client->buf_size, 1000);
1076 struct crm_ipc_response_header *hdr = NULL;
1078 int rc = crm_ipc_decompress(client);
1084 hdr = (
struct crm_ipc_response_header *)(
void*)client->buffer;
1085 if (hdr->qb.id == request_id) {
1088 }
else if (hdr->qb.id < request_id) {
1091 crm_err(
"Discarding old reply %d (need %d)", hdr->qb.id, request_id);
1097 crm_err(
"Discarding newer reply %d (need %d)", hdr->qb.id, request_id);
1102 crm_err(
"Server disconnected client %s while waiting for msg id %d", client->name,
1107 }
while (time(NULL) < timeout);
1119 static int factor = 8;
1120 struct crm_ipc_response_header *header;
1124 if (client == NULL) {
1130 crm_notice(
"Connection to %s closed", client->name);
1134 if (ms_timeout == 0) {
1138 if (client->need_reply) {
1139 crm_trace(
"Trying again to obtain pending reply from %s", client->name);
1140 rc = qb_ipcc_recv(client->ipc, client->buffer, client->buf_size, ms_timeout);
1142 crm_warn(
"Sending to %s (%p) is disabled until pending reply is received", client->name,
1147 crm_notice(
"Lost reply from %s (%p) finally arrived, sending re-enabled", client->name,
1149 client->need_reply = FALSE;
1160 header = iov[0].iov_base;
1161 header->flags |=
flags;
1168 if(header->size_compressed) {
1169 if(factor < 10 && (client->max_buf_size / 10) < (rc / factor)) {
1170 crm_notice(
"Compressed message exceeds %d0%% of the configured ipc limit (%u bytes), "
1171 "consider setting PCMK_ipc_buffer to %u or higher",
1172 factor, client->max_buf_size, 2 * client->max_buf_size);
1177 crm_trace(
"Sending from client: %s request id: %d bytes: %u timeout:%d msg...",
1178 client->name, header->qb.id, header->qb.size, ms_timeout);
1182 rc = internal_ipc_send_request(client, iov, ms_timeout);
1185 crm_trace(
"Failed to send from client %s request %d with %u bytes...",
1186 client->name, header->qb.id, header->qb.size);
1190 crm_trace(
"Message sent, not waiting for reply to %d from %s to %u bytes...",
1191 header->qb.id, client->name, header->qb.size);
1196 rc = internal_ipc_get_reply(client, header->qb.id, ms_timeout);
1205 client->need_reply = TRUE;
1209 rc = internal_ipc_send_recv(client, iov);
1213 struct crm_ipc_response_header *hdr = (
struct crm_ipc_response_header *)(
void*)client->buffer;
1215 crm_trace(
"Received response %d, size=%u, rc=%ld, text: %.200s", hdr->qb.id, hdr->qb.size,
1223 crm_trace(
"Response not received: rc=%ld, errno=%d", rc, errno);
1230 }
else if (rc == -ETIMEDOUT) {
1231 crm_warn(
"Request %d to %s (%p) failed: %s (%ld) after %dms",
1232 header->qb.id, client->name, client->ipc,
pcmk_strerror(rc), rc, ms_timeout);
1235 }
else if (rc <= 0) {
1236 crm_warn(
"Request %d to %s (%p) failed: %s (%ld)",
1237 header->qb.id, client->name, client->ipc,
pcmk_strerror(rc), rc);
1241 free(iov[1].iov_base);
1250 const char *client_name,
const char *major_version,
const char *minor_version)
1252 xmlNode *hello_node = NULL;
1253 xmlNode *hello = NULL;
1255 if (uuid == NULL || strlen(uuid) == 0
1256 || client_name == NULL || strlen(client_name) == 0
1257 || major_version == NULL || strlen(major_version) == 0
1258 || minor_version == NULL || strlen(minor_version) == 0) {
1259 crm_err(
"Missing fields, Hello message will not be valid.");
1264 crm_xml_add(hello_node,
"major_version", major_version);
1265 crm_xml_add(hello_node,
"minor_version", minor_version);
1266 crm_xml_add(hello_node,
"client_name", client_name);
const char * crm_ipc_buffer(crm_ipc_t *client)
void crm_write_blackbox(int nsig, struct qb_log_callsite *callsite)
void crm_ipc_close(crm_ipc_t *client)
#define crm_notice(fmt, args...)
char * crm_generate_uuid(void)
uint32_t crm_ipc_buffer_flags(crm_ipc_t *client)
int crm_ipc_send(crm_ipc_t *client, xmlNode *message, enum crm_ipc_flags flags, int32_t ms_timeout, xmlNode **reply)
const char * pcmk_strerror(int rc)
crm_client_t * crm_client_get(qb_ipcs_connection_t *c)
qb_ipcs_connection_t * ipcs
crm_client_t * crm_client_get_by_id(const char *id)
xmlNode * create_reply_adv(xmlNode *original_request, xmlNode *xml_response_data, const char *origin)
bool crm_ipc_connect(crm_ipc_t *client)
Establish an IPC connection to a Pacemaker component.
int crm_parse_int(const char *text, const char *default_text)
struct crm_remote_s * remote
int crm_user_lookup(const char *name, uid_t *uid, gid_t *gid)
#define CRM_LOG_ASSERT(expr)
void crm_client_destroy(crm_client_t *c)
int crm_ipc_get_fd(crm_ipc_t *client)
void crm_client_init(void)
#define clear_bit(word, bit)
void crm_client_disconnect_all(qb_ipcs_service_t *service)
ssize_t crm_ipc_prepare(uint32_t request, xmlNode *message, struct iovec **result, uint32_t max_send_size)
ssize_t crm_ipcs_flush_events(crm_client_t *c)
xmlNode * string2xml(const char *input)
ssize_t crm_ipcs_sendv(crm_client_t *c, struct iovec *iov, enum crm_ipc_flags flags)
#define crm_warn(fmt, args...)
crm_client_t * crm_client_new(qb_ipcs_connection_t *c, uid_t uid_client, gid_t gid_client)
#define crm_debug(fmt, args...)
struct crm_ipc_s crm_ipc_t
const char * crm_ipc_name(crm_ipc_t *client)
GHashTable * client_connections
unsigned int crm_ipc_default_buffer_size(void)
#define crm_trace(fmt, args...)
void crm_ipc_destroy(crm_ipc_t *client)
xmlNode * create_xml_node(xmlNode *parent, const char *name)
const char * crm_element_value(xmlNode *data, const char *name)
gboolean add_message_xml(xmlNode *msg, const char *field, xmlNode *xml)
void free_xml(xmlNode *child)
void crm_ipcs_send_ack(crm_client_t *c, uint32_t request, uint32_t flags, const char *tag, const char *function, int line)
xmlNode * crm_ipcs_recv(crm_client_t *c, void *data, size_t size, uint32_t *id, uint32_t *flags)
const char * crm_xml_add(xmlNode *node, const char *name, const char *value)
const char * crm_xml_add_int(xmlNode *node, const char *name, int value)
ssize_t crm_ipcs_send(crm_client_t *c, uint32_t request, xmlNode *message, enum crm_ipc_flags flags)
#define crm_perror(level, fmt, args...)
Log a system error message.
bool crm_compress_string(const char *data, int length, int max, char **result, unsigned int *result_len)
xmlNode * create_hello_message(const char *uuid, const char *client_name, const char *major_version, const char *minor_version)
#define crm_err(fmt, args...)
const char * bz2_strerror(int rc)
#define crm_log_xml_notice(xml, text)
char * dump_xml_unformatted(xmlNode *msg)
int crm_ipc_ready(crm_ipc_t *client)
#define XML_ATTR_RESPONSE
long crm_ipc_read(crm_ipc_t *client)
Wrappers for and extensions to libqb IPC.
char * generate_hash_key(const char *crm_msg_reference, const char *sys)
int crm_ipcs_client_pid(qb_ipcs_connection_t *c)
#define crm_log_xml_trace(xml, text)
void crm_client_cleanup(void)
const char * crm_client_name(crm_client_t *c)
bool crm_ipc_connected(crm_ipc_t *client)
crm_ipc_t * crm_ipc_new(const char *name, size_t max_size)
xmlNode * create_request_adv(const char *task, xmlNode *msg_data, const char *host_to, const char *sys_to, const char *sys_from, const char *uuid_from, const char *origin)
#define create_request(task, xml_data, host_to, sys_to, sys_from, uuid_from)
char * uid2username(uid_t uid)
enum crm_ais_msg_types type