25 #include <sys/utsname.h>
26 #include <sys/socket.h>
30 # include <corosync/confdb.h>
31 # include <corosync/corodefs.h>
32 # include <corosync/cpg.h>
33 # include <corosync/cfg.h>
37 # include <corosync/cmap.h>
42 cman_handle_t pcmk_cman_handle = NULL;
55 static bool valid_cman_name(
const char *name,
uint32_t nodeid)
64 crm_notice(
"Ignoring inferred name from cman: %s", fakename);
83 if(
id) *
id = local_id;
84 if(uname) *uname = strdup(local_uname);
92 iov.iov_base = &header;
93 iov.iov_len = header.size;
97 rc = coroipcc_msg_send_reply_receive(
ais_ipc_handle, &iov, 1, &answer,
sizeof(answer));
100 crm_err(
"Odd message: id=%d, size=%d, error=%d",
101 answer.header.id, answer.header.size, answer.header.error));
103 crm_err(
"Bad response id: %d", answer.header.id));
106 if ((rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) && retries < 20) {
108 crm_info(
"Peer overloaded: Re-sending message (Attempt %d of 20)", retries);
114 crm_err(
"Sending nodeid request: FAILED (rc=%d): %s", rc, ais_error2text(rc));
117 }
else if (answer.header.error != CS_OK) {
118 crm_err(
"Bad response from peer: (rc=%d): %s", rc, ais_error2text(rc));
122 crm_info(
"Server details: id=%u uname=%s cname=%s", answer.id, answer.uname, answer.cname);
124 local_id = answer.id;
125 local_uname = strdup(answer.uname);
127 if(
id) *
id = local_id;
128 if(uname) *uname = strdup(local_uname);
138 char *buf = malloc(buf_len);
148 if (rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) {
150 crm_info(
"Peer overloaded or membership in flux:"
151 " Re-sending message (Attempt %d of 20)", retries);
156 rc = coroipcc_msg_send_reply_receive(
ais_ipc_handle, iov, 1, buf, buf_len);
158 }
while ((rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) && retries < 20);
162 crm_err(
"Odd message: id=%d, size=%d, class=%d, error=%d",
163 header->id, header->size,
class, header->error));
167 crm_err(
"Bad response id (%d) for request (%d)", header->id,
168 ais_msg->header.
id));
169 CRM_CHECK(header->error == CS_OK, rc = header->error);
172 crm_perror(LOG_ERR,
"Sending plugin message %d FAILED: %s (%d)",
173 ais_msg->
id, ais_error2text(rc), rc);
176 free(iov[0].iov_base);
180 return (rc == CS_OK);
201 if (pcmk_cman_handle) {
203 if (cman_stop_notification(pcmk_cman_handle) >= 0) {
205 cman_finish(pcmk_cman_handle);
221 xmlNode *member = NULL;
222 const char *value = NULL;
227 crm_err(
"Invalid membership update: %s", msg->
data);
242 crm_notice(
"Membership %s: quorum %s", value, quorate ?
"acquired" :
"lost");
246 crm_info(
"Membership %s: quorum %s", value, quorate ?
"retained" :
"still lost");
249 for (member = __xml_first_child(xml); member != NULL; member = __xml_next(member)) {
267 crm_update_peer(__FUNCTION__,
id, born, seen, votes, procs, uname, uname, addr, state);
274 plugin_default_deliver_message(cpg_handle_t handle,
275 const struct cpg_name *groupName,
279 const char *from = NULL;
295 if (rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) {
299 crm_perror(LOG_ERR,
"Receiving message body failed: (%d) %s", rc, ais_error2text(rc));
302 if (buffer == NULL) {
310 if (cluster && cluster->cpg.cpg_deliver_fn) {
311 cluster->cpg.cpg_deliver_fn(0, NULL, 0, 0, buffer, 0);
314 plugin_default_deliver_message(0, NULL, 0, 0, buffer, 0);
325 plugin_destroy(gpointer user_data)
327 crm_err(
"AIS connection terminated");
335 pcmk_cman_dispatch(gpointer user_data)
337 int rc = cman_dispatch(pcmk_cman_handle, CMAN_DISPATCH_ALL);
340 crm_err(
"Connection to cman failed: %d", rc);
341 pcmk_cman_handle = 0;
347 # define MAX_NODES 256
350 cman_event_callback(cman_handle_t handle,
void *privdata,
int reason,
int arg)
352 int rc = 0, lpc = 0, node_count = 0;
354 cman_cluster_t cluster;
355 static cman_node_t cman_nodes[MAX_NODES];
357 gboolean(*dispatch) (
unsigned long long, gboolean) = privdata;
360 case CMAN_REASON_STATECHANGE:
362 memset(&cluster, 0,
sizeof(cluster));
363 rc = cman_get_cluster(pcmk_cman_handle, &cluster);
365 crm_err(
"Couldn't query cman cluster details: %d %d", rc, errno);
376 arg ?
"retained" :
"still lost");
379 memset(cman_nodes, 0, MAX_NODES *
sizeof(cman_node_t));
380 rc = cman_get_nodes(pcmk_cman_handle, MAX_NODES, &node_count, cman_nodes);
382 crm_err(
"Couldn't query cman node list: %d %d", rc, errno);
386 for (lpc = 0; lpc < node_count; lpc++) {
388 const char *name = NULL;
390 if (cman_nodes[lpc].cn_nodeid == 0) {
396 if(valid_cman_name(cman_nodes[lpc].cn_name, cman_nodes[lpc].cn_nodeid)) {
397 name = cman_nodes[lpc].cn_name;
401 if(cman_nodes[lpc].cn_member) {
404 }
else if(peer->
state) {
408 crm_info(
"State of node %s[%u] is still unknown", peer->
uname, peer->
id);
417 case CMAN_REASON_TRY_SHUTDOWN:
419 crm_notice(
"CMAN wants to shut down: %s", arg ?
"forced" :
"optional");
420 cman_replyto_shutdown(pcmk_cman_handle, 0);
423 case CMAN_REASON_CONFIG_UPDATE:
434 int rc = -1, fd = -1;
435 cman_cluster_t cluster;
442 crm_info(
"Configuring Pacemaker to obtain quorum from cman");
444 memset(&cluster, 0,
sizeof(cluster));
446 pcmk_cman_handle = cman_init(
dispatch);
447 if (pcmk_cman_handle == NULL || cman_is_active(pcmk_cman_handle) == FALSE) {
448 crm_err(
"Couldn't connect to cman");
452 rc = cman_start_notification(pcmk_cman_handle, cman_event_callback);
454 crm_err(
"Couldn't register for cman notifications: %d %d", rc, errno);
459 cman_event_callback(pcmk_cman_handle,
dispatch, CMAN_REASON_STATECHANGE,
460 cman_is_quorate(pcmk_cman_handle));
462 fd = cman_get_fd(pcmk_cman_handle);
468 cman_finish(pcmk_cman_handle);
472 crm_err(
"cman qorum is not supported in this build");
478 # ifdef SUPPORT_COROSYNC
484 crm_err(
"The Corosync quorum API is not supported in this build");
495 const char *name = NULL;
504 crm_info(
"Creating connection to our Corosync plugin");
511 crm_info(
"Connection to our Corosync plugin (%d) failed: %s (%d)",
516 crm_err(
"No context created, but connection reported 'ok'");
521 ais_error2text(rc), rc);
528 if (ais_fd_callbacks.
destroy == NULL) {
529 ais_fd_callbacks.
destroy = plugin_destroy;
533 crm_info(
"AIS connection established");
543 plugin_get_details(NULL, &(cluster->
uname));
545 crm_crit(
"Node name mismatch! Corosync supplied %s but our lookup returned %s",
546 cluster->
uname, name);
548 (
"Node name mismatches usually occur when assigned automatically by DHCP servers");
560 pcmk_mcp_dispatch(
const char *buffer, ssize_t length, gpointer userdata)
565 xmlNode *node = NULL;
567 for (node = __xml_first_child(msg); node != NULL; node = __xml_next(node)) {
589 pcmk_mcp_destroy(gpointer user_data)
591 void (*callback) (gpointer
data) = user_data;
605 .destroy = pcmk_mcp_destroy
608 while (retries < 5) {
618 cluster->
destroy, &mcp_callbacks);
625 case CS_ERR_TRY_AGAIN:
626 case CS_ERR_QUEUE_FULL:
634 crm_err(
"Retry count exceeded: %d", retries);
653 cman = cman_init(NULL);
654 if (cman != NULL && cman_is_active(cman)) {
656 memset(&us, 0,
sizeof(cman_node_t));
657 cman_get_node(cman, nodeid, &us);
658 if(valid_cman_name(us.cn_name, nodeid)) {
659 name = strdup(us.cn_name);
660 crm_info(
"Using CMAN node name %s for %u", name, nodeid);
667 crm_debug(
"Unable to get node name for nodeid %u", nodeid);
685 if (init_cs_connection_classic(cluster) == FALSE) {
695 crm_info(
"Could not find an active corosync based cluster");
707 if(cluster->
nodeid == 0) {
708 crm_err(
"Could not establish local nodeid");
713 if(cluster->
uname == NULL) {
714 crm_err(
"Could not establish local node name");
728 gboolean sane = TRUE;
732 if (sane && msg->header.
size == 0) {
737 if (sane && msg->header.error != CS_OK) {
738 crm_warn(
"Message header contains an error: %d", msg->header.error);
749 crm_warn(
"Message with no payload");
754 int str_size = strlen(data) + 1;
759 crm_warn(
"Message payload is corrupted: expected %d bytes, got %d",
762 for (lpc = (str_size - 10); lpc < msg->
size; lpc++) {
766 crm_debug(
"bad_data[%d]: %d / '%c'", lpc, data[lpc], data[lpc]);
772 crm_err(
"Invalid message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)",
773 msg->
id, ais_dest(&(msg->
host)), msg_type2text(dest),
779 (
"Verified message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)",
780 msg->
id, ais_dest(&(msg->
host)), msg_type2text(dest), ais_dest(&(msg->
sender)),
791 hdb_handle_t object_handle,
const char *key,
char **value,
const char *fallback)
794 char *env_key = NULL;
795 const char *env_value = NULL;
803 if (object_handle > 0) {
804 if (CS_OK == confdb_key_get(config, object_handle, key, strlen(key), &buffer, &len)) {
805 *value = strdup(buffer);
810 crm_info(
"Found '%s' for option: %s", *value, key);
815 env_value = getenv(env_key);
819 crm_info(
"Found '%s' in ENV for option: %s", *value, key);
820 *value = strdup(env_value);
825 crm_info(
"Defaulting to '%s' for option: %s", fallback, key);
826 *value = strdup(fallback);
829 crm_info(
"No default for option: %s", key);
835 static confdb_handle_t
838 cs_error_t rc = CS_OK;
839 confdb_handle_t local_handle = OBJECT_PARENT_HANDLE;
841 rc = confdb_object_find_start(config, local_handle);
845 crm_err(
"Couldn't create search context: %d", rc);
851 config_find_next(confdb_handle_t config,
const char *name, confdb_handle_t top_handle)
853 cs_error_t rc = CS_OK;
854 hdb_handle_t local_handle = 0;
856 if (top_handle == 0) {
857 crm_err(
"Couldn't search for %s: no valid context", name);
861 crm_trace(
"Searching for %s in " HDB_X_FORMAT, name, top_handle);
862 rc = confdb_object_find(config, top_handle, name, strlen(name), &local_handle);
864 crm_info(
"No additional configuration supplied for: %s", name);
867 crm_info(
"Processing additional %s options...", name);
875 confdb_handle_t config;
880 confdb_handle_t top_handle = 0;
881 hdb_handle_t local_handle = 0;
882 static confdb_callbacks_t callbacks = { };
884 rc = confdb_initialize(&config, &callbacks);
886 crm_debug(
"Could not initialize Cluster Configuration Database API instance error %d", rc);
892 while (local_handle) {
898 crm_trace(
"Found Pacemaker plugin version: %s", value);
917 confdb_finalize(config);
920 (
"Corosync is running, but Pacemaker could not find the CMAN or Pacemaker plugin loaded");
948 crm_debug(
"%s: unknown process list, assuming active for now", node->
uname);
bool send_plugin_text(int class, struct iovec *iov)
enum crm_ais_msg_types type
#define CRM_CHECK(expr, failure_action)
gboolean init_cman_connection(gboolean(*dispatch)(unsigned long long, gboolean), void(*destroy)(gpointer))
void terminate_cs_connection(crm_cluster_t *cluster)
#define crm_notice(fmt, args...)
#define crm_crit(fmt, args...)
gboolean safe_str_neq(const char *a, const char *b)
mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
hdb_handle_t config_find_init(struct corosync_api_v1 *config, char *name)
char * get_corosync_uuid(crm_node_t *peer)
const char * get_local_node_name(void)
void(* destroy)(gpointer)
struct mainloop_io_s mainloop_io_t
int plugin_dispatch(gpointer user_data)
crm_node_t * crm_get_peer(unsigned int id, const char *uname)
int(* dispatch)(gpointer userdata)
char * get_node_name(uint32_t nodeid)
char * strerror(int errnum)
enum cluster_type_e find_corosync_variant(void)
char * pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content, uint32_t *kind, const char **from)
#define AIS_IPC_MESSAGE_SIZE
gboolean cluster_connect_quorum(gboolean(*dispatch)(unsigned long long, gboolean), void(*destroy)(gpointer))
Wrappers for and extensions to glib mainloop.
xmlNode * string2xml(const char *input)
crm_node_t * crm_update_peer(const char *source, unsigned int id, uint64_t born, uint64_t seen, int32_t votes, uint32_t children, const char *uuid, const char *uname, const char *addr, const char *state)
gboolean init_cs_connection(crm_cluster_t *cluster)
hdb_handle_t ais_ipc_handle
void plugin_handle_membership(AIS_Message *msg)
void cluster_disconnect_cpg(crm_cluster_t *cluster)
#define crm_warn(fmt, args...)
#define crm_debug(fmt, args...)
void(* destroy)(gpointer userdata)
int get_config_opt(struct corosync_api_v1 *config, hdb_handle_t object_service_handle, char *key, char **value, const char *fallback)
#define crm_trace(fmt, args...)
crm_node_t * crm_update_peer_proc(const char *source, crm_node_t *peer, uint32_t flag, const char *status)
#define CRM_MESSAGE_IPC_ACK
gboolean crm_is_corosync_peer_active(const crm_node_t *node)
gboolean send_cluster_text(int class, const char *data, gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
gboolean check_message_sanity(const AIS_Message *msg, const char *data)
xmlNode * create_xml_node(xmlNode *parent, const char *name)
struct crm_ais_msg_s AIS_Message
int crm_element_value_int(xmlNode *data, const char *name, int *dest)
const char * crm_element_value(xmlNode *data, const char *name)
#define ais_data_len(msg)
struct qb_ipc_response_header cs_ipc_header_response_t
unsigned long long crm_peer_seq
gboolean is_cman_cluster(void)
void free_xml(xmlNode *child)
gboolean crm_str_eq(const char *a, const char *b, gboolean use_case)
const char * name_for_cluster_type(enum cluster_type_e type)
int set_cluster_type(enum cluster_type_e type)
#define DAEMON_RESPAWN_STOP
gboolean init_cs_connection_once(crm_cluster_t *cluster)
hdb_handle_t config_find_next(struct corosync_api_v1 *config, char *name, hdb_handle_t top_handle)
crm_node_t * crm_update_peer_state(const char *source, crm_node_t *node, const char *state, int membership)
Update a node's state and membership information.
#define crm_log_xml_err(xml, text)
#define crm_perror(level, fmt, args...)
Log a system error message.
uint32_t get_local_nodeid(cpg_handle_t handle)
crm_ipc_t * mainloop_get_ipc_client(mainloop_io_t *client)
#define crm_err(fmt, args...)
#define G_PRIORITY_MEDIUM
int crm_ipc_send(crm_ipc_t *client, xmlNode *message, enum crm_ipc_flags flags, int32_t ms_timeout, xmlNode **reply)
Wrappers for and extensions to libqb IPC.
gboolean ais_membership_force
gboolean crm_is_true(const char *s)
mainloop_io_t * mainloop_add_ipc_client(const char *name, int priority, size_t max_size, void *userdata, struct ipc_client_callbacks *callbacks)
char * crm_concat(const char *prefix, const char *suffix, char join)
char * crm_itoa(int an_int)
#define safe_str_eq(a, b)
char * crm_strdup_printf(char const *format,...) __attribute__((__format__(__printf__
long long crm_int_helper(const char *text, char **end_text)
#define crm_info(fmt, args...)
int(* dispatch)(const char *buffer, ssize_t length, gpointer userdata)
gboolean cluster_connect_cpg(crm_cluster_t *cluster)
gboolean is_classic_ais_cluster(void)
enum crm_ais_msg_types type
enum cluster_type_e get_cluster_type(void)