21 #include <sys/socket.h>
22 #include <netinet/in.h>
23 #include <arpa/inet.h>
29 #include <sys/utsname.h>
31 #include <qb/qbipcc.h>
32 #include <qb/qbutil.h>
34 #include <corosync/corodefs.h>
35 #include <corosync/corotypes.h>
36 #include <corosync/hdb.h>
37 #include <corosync/cpg.h>
43 static bool cpg_evicted = FALSE;
46 #define cs_repeat(counter, max, code) do { \
48 if(rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) { \
50 crm_debug("Retrying operation after %ds", counter); \
55 } while(counter < max)
61 if (cluster->cpg_handle) {
63 cpg_leave(cluster->cpg_handle, &cluster->group);
64 cpg_finalize(cluster->cpg_handle);
65 cluster->cpg_handle = 0;
77 cpg_handle_t local_handle = handle;
78 cpg_callbacks_t cb = { };
80 if(local_nodeid != 0) {
87 get_ais_details(&local_nodeid, NULL);
94 cs_repeat(retries, 5, rc = cpg_initialize(&local_handle, &cb));
100 cs_repeat(retries, 5, rc = cpg_local_get(local_handle, &local_nodeid));
104 crm_err(
"Could not get local node id from the CPG API: %s (%d)", ais_error2text(rc), rc);
108 cpg_finalize(local_handle);
110 crm_debug(
"Local nodeid is %u", local_nodeid);
118 static ssize_t crm_cs_flush(gpointer
data);
121 crm_cs_flush_cb(gpointer
data)
128 #define CS_SEND_MAX 200
130 crm_cs_flush(gpointer
data)
135 static unsigned int last_sent = 0;
136 cpg_handle_t *handle = (cpg_handle_t *)data;
144 if ((queue_len % 1000) == 0 && queue_len > 1) {
145 crm_err(
"CPG queue has grown to %d", queue_len);
148 crm_warn(
"CPG queue has grown to %d", queue_len);
161 rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
169 crm_trace(
"CPG message sent, size=%d", iov->iov_len);
178 crm_info(
"Sent %d CPG messages (%d remaining, last=%u): %s (%d)",
179 sent, queue_len, last_sent, ais_error2text(rc), rc);
181 crm_trace(
"Sent %d CPG messages (%d remaining, last=%u): %s (%d)",
182 sent, queue_len, last_sent, ais_error2text(rc), rc);
189 delay_ms = QB_MIN(1000,
CS_SEND_MAX + (10 * queue_len));
200 static unsigned int queued = 0;
203 crm_trace(
"Queueing CPG message %u (%d bytes)", queued, iov->iov_len);
210 pcmk_cpg_dispatch(gpointer user_data)
215 rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE);
217 crm_err(
"Connection to the CPG API failed: %s (%d)", ais_error2text(rc), rc);
218 cluster->cpg_handle = 0;
221 }
else if(cpg_evicted) {
222 crm_err(
"Evicted from CPG membership");
243 crm_err(
"Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, msg->
sender.
id);
246 }
else if (msg->
host.
id != 0 && (local_nodeid != msg->
host.
id)) {
261 crm_err(
"Peer with nodeid=%u is unknown", nodeid);
263 }
else if (peer->
uname == NULL) {
264 crm_err(
"No uname for peer with nodeid=%u", nodeid);
267 crm_notice(
"Fixing uname for peer with nodeid=%u", nodeid);
275 crm_trace(
"Got new%s message (size=%d, %d, %d)",
280 *kind = msg->header.
id;
288 char *uncompressed = NULL;
289 unsigned int new_size = msg->
size + 1;
296 uncompressed = calloc(1, new_size);
297 rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->
data, msg->
compressed_size, 1, 0);
300 crm_err(
"Decompression failed: %d", rc);
322 data = strdup(msg->
data);
333 crm_info(
"Removing peer %s/%u", data,
id);
348 crm_err(
"Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
349 " min=%d, total=%d, size=%d, bz2_size=%d",
361 const struct cpg_name *groupName,
362 const struct cpg_address *member_list,
size_t member_list_entries,
363 const struct cpg_address *left_list,
size_t left_list_entries,
364 const struct cpg_address *joined_list,
size_t joined_list_entries)
367 gboolean found = FALSE;
371 for (i = 0; i < left_list_entries; i++) {
374 crm_info(
"Node %u left group %s (peer=%s, counter=%d.%d)",
375 left_list[i].nodeid, groupName->value,
376 (peer? peer->
uname :
"<none>"), counter, i);
382 for (i = 0; i < joined_list_entries; i++) {
383 crm_info(
"Node %u joined group %s (counter=%d.%d)",
384 joined_list[i].nodeid, groupName->value, counter, i);
387 for (i = 0; i < member_list_entries; i++) {
390 crm_info(
"Node %u still member of group %s (peer=%s, counter=%d.%d)",
391 member_list[i].nodeid, groupName->value,
392 (peer? peer->
uname :
"<none>"), counter, i);
400 time_t now = time(NULL);
403 if(peer->
votes == 0) {
406 }
else if(now > (60 + peer->
votes)) {
412 crm_err(
"Node %s[%u] appears to be online even though we think it is dead", peer->
uname, peer->
id);
419 if (local_nodeid == member_list[i].nodeid) {
425 crm_err(
"We're not part of CPG group '%s' anymore!", groupName->value);
440 cpg_handle_t handle = 0;
447 cpg_callbacks_t cpg_callbacks = {
448 .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
449 .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
455 cluster->group.length = 0;
456 cluster->group.value[0] = 0;
460 cluster->group.value[127] = 0;
463 cs_repeat(retries, 30, rc = cpg_initialize(&handle, &cpg_callbacks));
465 crm_err(
"Could not connect to the Cluster Process Group API: %d\n", rc);
471 crm_err(
"Could not get local node id from the CPG API");
478 cs_repeat(retries, 30, rc = cpg_join(handle, &cluster->group));
484 rc = cpg_fd_get(handle, &fd);
486 crm_err(
"Could not obtain the CPG API connection: %d\n", rc);
491 cluster->cpg_handle = handle;
496 cpg_finalize(handle);
521 static int msg_id = 0;
522 static int local_pid = 0;
523 static int local_name_len = 0;
524 static const char *local_name = NULL;
539 if(local_name == NULL) {
542 if(local_name_len == 0 && local_name) {
543 local_name_len = strlen(local_name);
550 if (local_pid == 0) {
551 local_pid = getpid();
562 msg->header.
id =
class;
563 msg->header.error = CS_OK;
570 target = strdup(node->
uname);
579 target = strdup(
"all");
591 msg->
size = 1 + strlen(data);
595 msg = realloc_safe(msg, msg->header.
size);
599 char *compressed = NULL;
600 unsigned int new_size = 0;
601 char *uncompressed = strdup(data);
606 msg = realloc_safe(msg, msg->header.
size);
607 memcpy(msg->
data, compressed, new_size);
613 msg = realloc_safe(msg, msg->header.
size);
621 iov = calloc(1,
sizeof(
struct iovec));
623 iov->iov_len = msg->header.
size;
626 crm_trace(
"Queueing CPG message %u to %s (%d bytes, %d bytes compressed payload): %.200s",
629 crm_trace(
"Queueing CPG message %u to %s (%d bytes, %d bytes payload): %.200s",
630 msg->
id, target, iov->iov_len, msg->
size, data);
679 int scan_rc = sscanf(text,
"%d", &type);
bool send_plugin_text(int class, struct iovec *iov)
enum crm_ais_msg_types type
#define CRM_CHECK(expr, failure_action)
gboolean send_cpg_iov(struct iovec *iov)
gboolean(* pcmk_cpg_dispatch_fn)(int kind, const char *from, const char *data)
#define crm_notice(fmt, args...)
gboolean safe_str_neq(const char *a, const char *b)
mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
gboolean crm_is_peer_active(const crm_node_t *node)
const char * get_local_node_name(void)
void(* destroy)(gpointer)
crm_node_t * crm_get_peer(unsigned int id, const char *uname)
int(* dispatch)(gpointer userdata)
char * pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content, uint32_t *kind, const char **from)
Wrappers for and extensions to glib mainloop.
void plugin_handle_membership(AIS_Message *msg)
void cluster_disconnect_cpg(crm_cluster_t *cluster)
#define crm_warn(fmt, args...)
#define crm_debug(fmt, args...)
GListPtr cs_message_queue
#define crm_trace(fmt, args...)
crm_node_t * crm_update_peer_proc(const char *source, crm_node_t *peer, uint32_t flag, const char *status)
#define CRM_SYSTEM_PENGINE
gboolean send_cluster_text(int class, const char *data, gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
gboolean check_message_sanity(const AIS_Message *msg, const char *data)
struct crm_ais_msg_s AIS_Message
cpg_handle_t pcmk_cpg_handle
#define ais_data_len(msg)
guint reap_crm_member(uint32_t id, const char *name)
Remove all peer cache entries matching a node ID and/or uname.
#define CRM_SYSTEM_STONITHD
crm_node_t * crm_update_peer_state(const char *source, crm_node_t *node, const char *state, int membership)
Update a node's state and membership information.
#define CRM_SYSTEM_TENGINE
uint32_t get_local_nodeid(cpg_handle_t handle)
bool crm_compress_string(const char *data, int length, int max, char **result, unsigned int *result_len)
#define crm_err(fmt, args...)
#define G_PRIORITY_MEDIUM
enum crm_ais_msg_types text2msg_type(const char *text)
#define CRM_BZ2_THRESHOLD
char * dump_xml_unformatted(xmlNode *msg)
gboolean send_cluster_message_cs(xmlNode *msg, gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
Wrappers for and extensions to libqb IPC.
#define cs_repeat(counter, max, code)
char * crm_itoa(int an_int)
#define safe_str_eq(a, b)
char * crm_strdup_printf(char const *format,...) __attribute__((__format__(__printf__
crm_node_t * crm_find_peer(unsigned int id, const char *uname)
void pcmk_cpg_membership(cpg_handle_t handle, const struct cpg_name *groupName, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
long long crm_int_helper(const char *text, char **end_text)
#define crm_info(fmt, args...)
gboolean cluster_connect_cpg(crm_cluster_t *cluster)
gboolean is_classic_ais_cluster(void)
enum crm_ais_msg_types type
enum cluster_type_e get_cluster_type(void)