55 #include <sys/types.h>
57 #include <sys/socket.h>
60 #include <sys/ioctl.h>
61 #include <sys/param.h>
62 #include <netinet/in.h>
63 #include <arpa/inet.h>
76 #include <qb/qblist.h>
77 #include <qb/qbdefs.h>
78 #include <qb/qbutil.h>
79 #include <qb/qbloop.h>
84 #define LOGSYS_UTILS_ONLY 1
95 #define LOCALHOST_IP inet_addr("127.0.0.1")
96 #define QUEUE_RTR_ITEMS_SIZE_MAX 16384
97 #define RETRANS_MESSAGE_QUEUE_SIZE_MAX 16384
98 #define RECEIVED_MESSAGE_QUEUE_SIZE_MAX 500
100 #define RETRANSMIT_ENTRIES_MAX 30
101 #define TOKEN_SIZE_MAX 64000
102 #define LEAVE_DUMMY_NODEID 0
121 #define SEQNO_START_MSG 0x0
122 #define SEQNO_START_TOKEN 0x0
144 #define ENDIAN_LOCAL 0xff22
170 struct qb_list_head
list;
441 const char *
function,
444 const char *format, ...)
__attribute__((format(printf, 6, 7)));;
457 unsigned int msg_len,
458 int endian_conversion_required);
462 const unsigned int *member_list,
size_t member_list_entries,
463 const unsigned int *left_list,
size_t left_list_entries,
464 const unsigned int *joined_list,
size_t joined_list_entries,
539 int endian_conversion_needed);
584 static int message_handler_orf_token (
588 int endian_conversion_needed);
590 static int message_handler_mcast (
594 int endian_conversion_needed);
596 static int message_handler_memb_merge_detect (
600 int endian_conversion_needed);
602 static int message_handler_memb_join (
606 int endian_conversion_needed);
608 static int message_handler_memb_commit_token (
612 int endian_conversion_needed);
614 static int message_handler_token_hold_cancel (
618 int endian_conversion_needed);
622 static void srp_addr_to_nodeid (
624 unsigned int *nodeid_out,
626 unsigned int entries);
628 static int srp_addr_equal (
const struct srp_addr *a,
const struct srp_addr *b);
634 static void messages_deliver_to_app (
struct totemsrp_instance *instance,
int skip,
unsigned int end_point);
636 int fcc_mcasts_allowed);
637 static void messages_free (
struct totemsrp_instance *instance,
unsigned int token_aru);
641 static void target_set_completed (
void *context);
643 static void memb_state_commit_token_target_set (
struct totemsrp_instance *instance);
648 static void orf_token_endian_convert (
const struct orf_token *in,
struct orf_token *out);
650 static void memb_join_endian_convert (
const struct memb_join *in,
struct memb_join *out);
651 static void mcast_endian_convert (
const struct mcast *in,
struct mcast *out);
652 static void memb_merge_detect_endian_convert (
656 static void timer_function_orf_token_timeout (
void *data);
657 static void timer_function_orf_token_warning (
void *data);
658 static void timer_function_pause_timeout (
void *data);
659 static void timer_function_heartbeat_timeout (
void *data);
660 static void timer_function_token_retransmit_timeout (
void *data);
661 static void timer_function_token_hold_retransmit_timeout (
void *data);
662 static void timer_function_merge_detect_timeout (
void *data);
664 static void totemsrp_buffer_release (
struct totemsrp_instance *instance,
void *ptr);
670 unsigned int msg_len,
676 unsigned int iface_no);
681 message_handler_orf_token,
682 message_handler_mcast,
683 message_handler_memb_merge_detect,
684 message_handler_memb_join,
685 message_handler_memb_commit_token,
686 message_handler_token_hold_cancel
690 #define log_printf(level, format, args...) \
692 instance->totemsrp_log_printf ( \
693 level, instance->totemsrp_subsys_id, \
694 __FUNCTION__, __FILE__, __LINE__, \
697 #define LOGSYS_PERROR(err_num, level, fmt, args...) \
699 char _error_str[LOGSYS_MAX_PERROR_MSG_LEN]; \
700 const char *_error_ptr = qb_strerror_r(err_num, _error_str, sizeof(_error_str)); \
701 instance->totemsrp_log_printf ( \
702 level, instance->totemsrp_subsys_id, \
703 __FUNCTION__, __FILE__, __LINE__, \
704 fmt ": %s (%d)\n", ##args, _error_ptr, err_num); \
751 uint64_t timestamp_msec;
754 now_msec = (qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC);
759 "Process pause detected for %d ms, flushing membership messages.", (
unsigned int)(now_msec - timestamp_msec));
774 unsigned long long nano_secs = qb_util_nano_current_get ();
776 time_now = (nano_secs / QB_TIME_NS_IN_MSEC);
805 static void totempg_mtu_changed(
void *context,
int net_mtu)
812 "Net MTU changed to %d, new value is %d",
820 qb_loop_t *poll_handle,
828 unsigned int msg_len,
829 int endian_conversion_required),
833 const unsigned int *member_list,
size_t member_list_entries,
834 const unsigned int *left_list,
size_t left_list_entries,
835 const unsigned int *joined_list,
size_t joined_list_entries,
837 void (*waiting_trans_ack_cb_fn) (
838 int waiting_trans_ack))
844 if (instance == NULL) {
848 totemsrp_instance_initialize (instance);
886 "Token Timeout (%d ms) retransmit timeout (%d ms)",
891 "Token warning every %d ms (%d%% of Token Timeout)",
893 if (token_warning_ms < totem_config->token_retransmit_timeout)
895 "The token warning interval (%d ms) is less than the token retransmit timeout (%d ms) "
896 "which can lead to spurious token warnings. Consider increasing the token_warning parameter.",
900 "Token warnings disabled");
903 "token hold (%d ms) retransmits before loss (%d retrans)",
906 "join (%d ms) send_join (%d ms) consensus (%d ms) merge (%d ms)",
913 "downcheck (%d ms) fail to recv const (%d msgs)",
919 "window size per rotation (%d messages) maximum messages per rotation (%d messages)",
923 "missed count const (%d messages)",
951 timer_function_pause_timeout (instance);
955 "HeartBeat is Disabled. To enable set heartbeat_failures_allowed > 0");
966 "total heartbeat_timeout (%d ms) is not less than token timeout (%d ms)",
970 "heartbeat_timeout = heartbeat_failures_allowed * token_retransmit_timeout + max_network_delay");
972 "heartbeat timeout should be less than the token timeout. Heartbeat is disabled!!");
990 target_set_completed);
1012 token_event_stats_collector,
1018 token_event_stats_collector,
1020 *srp_context = instance;
1032 memb_leave_message_send (instance);
1053 unsigned int *interface_id,
1055 unsigned int interfaces_size,
1057 unsigned int *iface_count)
1073 interface_id[num_ifs] = i;
1075 if (++num_ifs > interfaces_size) {
1084 *iface_count = num_ifs;
1090 const char *cipher_type,
1091 const char *hash_type)
1128 static int srp_addr_equal (
const struct srp_addr *a,
const struct srp_addr *b)
1136 static void srp_addr_to_nodeid (
1138 unsigned int *nodeid_out,
1140 unsigned int entries)
1144 for (i = 0; i < entries; i++) {
1145 nodeid_out[i] = srp_addr_in[i].
nodeid;
1163 static void memb_set_subtract (
1164 struct srp_addr *out_list,
int *out_list_entries,
1165 struct srp_addr *one_list,
int one_list_entries,
1166 struct srp_addr *two_list,
int two_list_entries)
1172 *out_list_entries = 0;
1174 for (i = 0; i < one_list_entries; i++) {
1175 for (j = 0; j < two_list_entries; j++) {
1176 if (srp_addr_equal (&one_list[i], &two_list[j])) {
1182 out_list[*out_list_entries] = one_list[i];
1183 *out_list_entries = *out_list_entries + 1;
1192 static void memb_consensus_set (
1216 static int memb_consensus_isset (
1233 static int memb_consensus_agreed (
1237 int token_memb_entries = 0;
1241 memb_set_subtract (token_memb, &token_memb_entries,
1245 for (i = 0; i < token_memb_entries; i++) {
1246 if (memb_consensus_isset (instance, &token_memb[i]) == 0) {
1261 assert (token_memb_entries >= 1);
1266 static void memb_consensus_notset (
1268 struct srp_addr *no_consensus_list,
1269 int *no_consensus_list_entries,
1271 int comparison_list_entries)
1275 *no_consensus_list_entries = 0;
1278 if (memb_consensus_isset (instance, &instance->
my_proc_list[i]) == 0) {
1279 no_consensus_list[*no_consensus_list_entries] = instance->
my_proc_list[i];
1280 *no_consensus_list_entries = *no_consensus_list_entries + 1;
1288 static int memb_set_equal (
1289 struct srp_addr *set1,
int set1_entries,
1290 struct srp_addr *set2,
int set2_entries)
1297 if (set1_entries != set2_entries) {
1300 for (i = 0; i < set2_entries; i++) {
1301 for (j = 0; j < set1_entries; j++) {
1302 if (srp_addr_equal (&set1[j], &set2[i])) {
1318 static int memb_set_subset (
1319 const struct srp_addr *subset,
int subset_entries,
1320 const struct srp_addr *fullset,
int fullset_entries)
1326 if (subset_entries > fullset_entries) {
1329 for (i = 0; i < subset_entries; i++) {
1330 for (j = 0; j < fullset_entries; j++) {
1331 if (srp_addr_equal (&subset[i], &fullset[j])) {
1345 static void memb_set_merge (
1346 const struct srp_addr *subset,
int subset_entries,
1347 struct srp_addr *fullset,
int *fullset_entries)
1353 for (i = 0; i < subset_entries; i++) {
1354 for (j = 0; j < *fullset_entries; j++) {
1355 if (srp_addr_equal (&fullset[j], &subset[i])) {
1361 fullset[*fullset_entries] = subset[i];
1362 *fullset_entries = *fullset_entries + 1;
1369 static void memb_set_and_with_ring_id (
1385 for (i = 0; i < set2_entries; i++) {
1386 for (j = 0; j < set1_entries; j++) {
1387 if (srp_addr_equal (&set1[j], &set2[i])) {
1388 if (memcmp (&set1_ring_ids[j], old_ring_id,
sizeof (
struct memb_ring_id)) == 0) {
1395 and[*and_entries] = set1[j];
1396 *and_entries = *and_entries + 1;
1403 static void memb_set_log(
1414 memset(list_str, 0,
sizeof(list_str));
1416 for (i = 0; i < list_entries; i++) {
1423 if (strlen(list_str) + strlen(int_buf) >=
sizeof(list_str)) {
1426 strcat(list_str, int_buf);
1429 log_printf(level,
"List '%s' contains %d entries: %s",
string, list_entries, list_str);
1432 static void my_leave_memb_clear(
1439 static unsigned int my_leave_memb_match(
1444 unsigned int ret = 0;
1455 static void my_leave_memb_set(
1481 assert (instance != NULL);
1485 static void totemsrp_buffer_release (
struct totemsrp_instance *instance,
void *ptr)
1487 assert (instance != NULL);
1501 timer_function_token_retransmit_timeout,
1502 &instance->timer_orf_token_retransmit_timeout);
1518 timer_function_merge_detect_timeout,
1519 &instance->timer_merge_detect_timeout);
1547 "Saving state aru %x high seq received %x",
1557 "Restoring instance->my_aru %x my high seq received %x",
1564 "Resetting old ring state");
1577 timer_function_pause_timeout,
1578 &instance->timer_pause_timeout);
1592 timer_function_orf_token_warning,
1593 &instance->timer_orf_token_warning);
1607 timer_function_orf_token_timeout,
1608 &instance->timer_orf_token_timeout);
1614 reset_token_warning(instance);
1625 timer_function_heartbeat_timeout,
1626 &instance->timer_heartbeat_timeout);
1641 cancel_token_warning(instance);
1648 static void cancel_token_retransmit_timeout (
struct totemsrp_instance *instance)
1653 static void start_token_hold_retransmit_timeout (
struct totemsrp_instance *instance)
1661 timer_function_token_hold_retransmit_timeout,
1662 &instance->timer_orf_token_hold_retransmit_timeout);
1668 static void cancel_token_hold_retransmit_timeout (
struct totemsrp_instance *instance)
1674 static void memb_state_consensus_timeout_expired (
1678 int no_consensus_list_entries;
1681 if (memb_consensus_agreed (instance)) {
1682 memb_consensus_reset (instance);
1684 memb_consensus_set (instance, &instance->
my_id);
1686 reset_token_timeout (instance);
1688 memb_consensus_notset (
1691 &no_consensus_list_entries,
1695 memb_set_merge (no_consensus_list, no_consensus_list_entries,
1708 static void timer_function_pause_timeout (
void *data)
1713 reset_pause_timeout (instance);
1718 old_ring_state_restore (instance);
1723 static void timer_function_orf_token_warning (
void *data)
1730 tv_diff = qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC -
1733 "Token has not been received in %d ms ", (
unsigned int) tv_diff);
1734 reset_token_warning(instance);
1736 cancel_token_warning(instance);
1740 static void timer_function_orf_token_timeout (
void *data)
1747 "The token was lost in the OPERATIONAL state.");
1749 "A processor failed, forming new configuration:"
1750 " token timed out (%ums), waiting %ums for consensus.",
1760 "The consensus timeout expired (%ums).",
1762 memb_state_consensus_timeout_expired (instance);
1769 "The token was lost in the COMMIT state.");
1776 "The token was lost in the RECOVERY state.");
1777 memb_recovery_state_token_loss (instance);
1783 static void timer_function_heartbeat_timeout (
void *data)
1787 "HeartBeat Timer expired Invoking token loss mechanism in state %d ", instance->
memb_state);
1788 timer_function_orf_token_timeout(data);
1791 static void memb_timer_function_state_gather (
void *data)
1803 memb_join_message_send (instance);
1814 memb_timer_function_state_gather,
1815 &instance->memb_timer_state_gather_join_timeout);
1824 static void memb_timer_function_gather_consensus_timeout (
void *data)
1827 memb_state_consensus_timeout_expired (instance);
1830 static void deliver_messages_from_recovery_to_regular (
struct totemsrp_instance *instance)
1835 unsigned int range = 0;
1848 for (i = 1; i <= range; i++) {
1854 recovery_message_item = ptr;
1865 regular_message_item.mcast =
1866 (
struct mcast *)(((
char *)recovery_message_item->
mcast) +
sizeof (
struct mcast));
1867 regular_message_item.msg_len =
1868 recovery_message_item->
msg_len -
sizeof (
struct mcast);
1869 mcast = regular_message_item.mcast;
1892 ®ular_message_item,
mcast->
seq);
1910 int joined_list_entries = 0;
1911 unsigned int aru_save;
1918 char left_node_msg[1024];
1919 char joined_node_msg[1024];
1920 char failed_node_msg[1024];
1924 memb_consensus_reset (instance);
1926 old_ring_state_reset (instance);
1928 deliver_messages_from_recovery_to_regular (instance);
1931 "Delivering to app %x to %x",
1934 aru_save = instance->
my_aru;
1947 memb_set_subtract (joined_list, &joined_list_entries,
1965 srp_addr_to_nodeid (instance, trans_memb_list_totemip,
1978 instance->
my_aru = aru_save;
1983 srp_addr_to_nodeid (instance, new_memb_list_totemip,
1985 srp_addr_to_nodeid (instance, joined_list_totemip, joined_list,
1986 joined_list_entries);
1990 joined_list_totemip, joined_list_entries, &instance->
my_ring_id);
2052 regular_message = ptr;
2053 free (regular_message->
mcast);
2059 if (joined_list_entries) {
2061 sptr += snprintf(joined_node_msg,
sizeof(joined_node_msg)-sptr,
" joined:");
2062 for (i=0; i< joined_list_entries; i++) {
2063 sptr += snprintf(joined_node_msg+sptr,
sizeof(joined_node_msg)-sptr,
" " CS_PRI_NODE_ID, joined_list_totemip[i]);
2067 joined_node_msg[0] =
'\0';
2073 sptr += snprintf(left_node_msg,
sizeof(left_node_msg)-sptr,
" left:");
2075 sptr += snprintf(left_node_msg+sptr,
sizeof(left_node_msg)-sptr,
" " CS_PRI_NODE_ID, left_list[i]);
2078 if (my_leave_memb_match(instance, left_list[i]) == 0) {
2080 sptr2 += snprintf(failed_node_msg,
sizeof(failed_node_msg)-sptr2,
" failed:");
2082 sptr2 += snprintf(failed_node_msg+sptr2,
sizeof(left_node_msg)-sptr2,
" " CS_PRI_NODE_ID, left_list[i]);
2086 failed_node_msg[0] =
'\0';
2090 left_node_msg[0] =
'\0';
2091 failed_node_msg[0] =
'\0';
2094 my_leave_memb_clear(instance);
2097 "entering OPERATIONAL state.");
2105 if (strlen(failed_node_msg)) {
2107 "Failed to receive the leave message.%s",
2118 reset_pause_timeout (instance);
2131 static void memb_state_gather_enter (
2142 &instance->
my_id, 1,
2145 memb_join_message_send (instance);
2156 memb_timer_function_state_gather,
2157 &instance->memb_timer_state_gather_join_timeout);
2172 memb_timer_function_gather_consensus_timeout,
2173 &instance->memb_timer_state_gather_consensus_timeout);
2181 cancel_token_retransmit_timeout (instance);
2182 cancel_token_timeout (instance);
2183 cancel_merge_detect_timeout (instance);
2185 memb_consensus_reset (instance);
2187 memb_consensus_set (instance, &instance->
my_id);
2190 "entering GATHER state from %d(%s).",
2191 gather_from, gsfrom_to_msg(gather_from));
2206 static void timer_function_token_retransmit_timeout (
void *data);
2208 static void target_set_completed (
2213 memb_state_commit_token_send (instance);
2217 static void memb_state_commit_enter (
2220 old_ring_state_save (instance);
2222 memb_state_commit_token_update (instance);
2224 memb_state_commit_token_target_set (instance);
2241 "entering COMMIT state.");
2244 reset_token_retransmit_timeout (instance);
2245 reset_token_timeout (instance);
2261 static void memb_state_recovery_enter (
2266 int local_received_flg = 1;
2267 unsigned int low_ring_aru;
2268 unsigned int range = 0;
2269 unsigned int messages_originated = 0;
2278 "entering RECOVERY state.");
2289 memb_state_commit_token_send_recovery (instance, commit_token);
2304 memcpy (&my_new_memb_ring_id_list[i],
2308 memb_set_and_with_ring_id (
2310 my_new_memb_ring_id_list,
2327 memb_list[i].ring_id.rep, (uint64_t)memb_list[i].ring_id.seq);
2330 "aru %x high delivered %x received flag %d",
2332 memb_list[i].high_delivered,
2333 memb_list[i].received_flg);
2344 memb_list[i].received_flg == 0) {
2348 local_received_flg = 0;
2352 if (local_received_flg == 1) {
2365 &memb_list[i].ring_id,
2368 if (sq_lt_compare (memb_list[i].
aru, low_ring_aru)) {
2370 low_ring_aru = memb_list[i].
aru;
2391 "copying all old ring messages from %x-%x.",
2394 for (i = 1; i <= range; i++) {
2401 low_ring_aru + i, &ptr);
2406 messages_originated++;
2429 "Originated %d messages in RECOVERY.", messages_originated);
2434 "Did not need to originate any messages in recovery.");
2444 reset_token_timeout (instance);
2445 reset_token_retransmit_timeout (instance);
2458 token_hold_cancel_send (instance);
2465 struct iovec *iovec,
2466 unsigned int iov_len,
2473 unsigned int addr_idx;
2482 if (cs_queue_is_full (queue_use)) {
2513 addr_idx = sizeof (
struct mcast);
2514 for (i = 0; i < iov_len; i++) {
2515 memcpy (&
addr[addr_idx], iovec[i].iov_base, iovec[i].iov_len);
2516 addr_idx += iovec[i].iov_len;
2545 cs_queue_avail (queue_use, &avail);
2556 static int orf_token_remcast (
2564 struct sq *sort_queue;
2572 res = sq_in_range (sort_queue,
seq);
2581 res = sq_item_get (sort_queue,
seq, &ptr);
2600 static void messages_free (
2602 unsigned int token_aru)
2607 int log_release = 0;
2608 unsigned int release_to;
2609 unsigned int range = 0;
2611 release_to = token_aru;
2612 if (sq_lt_compare (instance->
my_last_aru, release_to)) {
2632 for (i = 1; i <= range; i++) {
2638 regular_message = ptr;
2639 totemsrp_buffer_release (instance, regular_message->
mcast);
2650 "releasing messages up to and including %x", release_to);
2654 static void update_aru (
2659 struct sq *sort_queue;
2661 unsigned int my_aru_saved = 0;
2671 my_aru_saved = instance->
my_aru;
2672 for (i = 1; i <= range; i++) {
2676 res = sq_item_get (sort_queue, my_aru_saved + i, &ptr);
2684 instance->
my_aru += i - 1;
2690 static int orf_token_mcast (
2693 int fcc_mcasts_allowed)
2697 struct sq *sort_queue;
2700 unsigned int fcc_mcast_current;
2705 reset_token_retransmit_timeout (instance);
2716 for (fcc_mcast_current = 0; fcc_mcast_current < fcc_mcasts_allowed; fcc_mcast_current++) {
2717 if (cs_queue_is_empty (mcast_queue)) {
2749 cs_queue_item_remove (mcast_queue);
2757 update_aru (instance);
2762 return (fcc_mcast_current);
2769 static int orf_token_rtr (
2772 unsigned int *fcc_allowed)
2777 struct sq *sort_queue;
2779 unsigned int range = 0;
2780 char retransmit_msg[1024];
2791 strcpy (retransmit_msg,
"Retransmit List: ");
2797 strcat (retransmit_msg,
value);
2799 strcat (retransmit_msg,
"");
2801 "%s", retransmit_msg);
2821 res = orf_token_remcast (instance,
rtr_list[i].
seq);
2848 (i <= range); i++) {
2853 res = sq_in_range (sort_queue, instance->
my_aru + i);
2861 res = sq_item_inuse (sort_queue, instance->
my_aru + i);
2872 res = sq_item_miss_count (sort_queue, instance->
my_aru + i);
2912 static void timer_function_token_retransmit_timeout (
void *data)
2922 token_retransmit (instance);
2923 reset_token_retransmit_timeout (instance);
2928 static void timer_function_token_hold_retransmit_timeout (
void *data)
2939 token_retransmit (instance);
2944 static void timer_function_merge_detect_timeout(
void *data)
2953 memb_merge_detect_transmit (instance);
2966 static int token_send (
2972 unsigned int orf_token_size;
2974 orf_token_size =
sizeof (
struct orf_token) +
2982 if (forward_token == 0) {
3060 res = token_send (instance, &
orf_token, 1);
3065 static void memb_state_commit_token_update (
3070 unsigned int high_aru;
3103 &memb_list[i].ring_id,
3106 if (sq_lt_compare (high_aru, memb_list[i].
aru)) {
3107 high_aru = memb_list[i].
aru;
3114 &memb_list[i].ring_id,
3117 if (sq_lt_compare (memb_list[i].
aru, high_aru)) {
3132 static void memb_state_commit_token_target_set (
3146 static int memb_state_commit_token_send_recovery (
3150 unsigned int commit_token_size;
3172 reset_token_retransmit_timeout (instance);
3176 static int memb_state_commit_token_send (
3179 unsigned int commit_token_size;
3201 reset_token_retransmit_timeout (instance);
3209 int token_memb_entries = 0;
3211 unsigned int lowest_nodeid;
3213 memb_set_subtract (token_memb, &token_memb_entries,
3220 assert(token_memb_entries > 0);
3222 lowest_nodeid = token_memb[0].nodeid;
3223 for (i = 1; i < token_memb_entries; i++) {
3224 if (lowest_nodeid > token_memb[i].
nodeid) {
3225 lowest_nodeid = token_memb[i].nodeid;
3231 static int srp_addr_compare (
const void *a,
const void *b)
3245 static void memb_state_commit_token_create (
3251 int token_memb_entries = 0;
3254 "Creating commit token because I am the rep.");
3256 memb_set_subtract (token_memb, &token_memb_entries,
3275 qsort (token_memb, token_memb_entries,
sizeof (
struct srp_addr),
3284 memcpy (
addr, token_memb,
3285 token_memb_entries *
sizeof (
struct srp_addr));
3286 memset (memb_list, 0,
3292 char memb_join_data[40000];
3295 unsigned int addr_idx;
3306 ((instance->my_proc_list_entries + instance->my_failed_list_entries) *
sizeof(
struct srp_addr));
3308 if (msg_len >
sizeof(memb_join_data)) {
3310 "memb_join_message too long. Ignoring message.");
3326 memcpy (&
addr[addr_idx],
3333 memcpy (&
addr[addr_idx],
3355 char memb_join_data[40000];
3358 unsigned int addr_idx;
3359 int active_memb_entries;
3364 "sending join/leave message");
3371 &instance->
my_id, 1,
3374 memb_set_subtract (active_memb, &active_memb_entries,
3376 &instance->
my_id, 1);
3379 ((active_memb_entries + instance->my_failed_list_entries) *
sizeof(
struct srp_addr));
3381 if (msg_len >
sizeof(memb_join_data)) {
3383 "memb_leave message too long. Ignoring message.");
3406 memcpy (&
addr[addr_idx],
3408 active_memb_entries *
3411 active_memb_entries *
3413 memcpy (&
addr[addr_idx],
3453 static void memb_ring_id_set (
3472 token_hold_cancel_send (instance);
3475 if (callback_handle == 0) {
3478 *handle_out = (
void *)callback_handle;
3479 qb_list_init (&callback_handle->
list);
3481 callback_handle->
data = (
void *)
data;
3483 callback_handle->
delete =
delete;
3502 qb_list_del (&h->
list);
3509 static void token_callbacks_execute (
3513 struct qb_list_head *list, *tmp_iter;
3514 struct qb_list_head *callback_listhead = 0;
3530 qb_list_for_each_safe(
list, tmp_iter, callback_listhead) {
3543 if (res == -1 && del == 1) {
3544 qb_list_add (
list, callback_listhead);
3570 if (queue_use != NULL) {
3571 backlog = cs_queue_used (queue_use);
3578 static int fcc_calculate (
3582 unsigned int transmits_allowed;
3583 unsigned int backlog_calc;
3591 instance->
my_cbl = backlog_get (instance);
3600 if (backlog_calc > 0 && transmits_allowed > backlog_calc) {
3601 transmits_allowed = backlog_calc;
3605 return (transmits_allowed);
3611 static void fcc_rtr_limit (
3614 unsigned int *transmits_allowed)
3618 assert (check >= 0);
3625 *transmits_allowed = 0;
3629 static void fcc_token_update (
3632 unsigned int msgs_transmitted)
3634 token->
fcc += msgs_transmitted - instance->
my_trc;
3636 instance->
my_trc = msgs_transmitted;
3643 static int check_orf_token_sanity(
3647 int endian_conversion_needed)
3651 size_t required_len;
3653 if (msg_len <
sizeof(
struct orf_token)) {
3655 "Received orf_token message is too short... ignoring.");
3660 if (endian_conversion_needed) {
3666 required_len =
sizeof(
struct orf_token) + rtr_entries *
sizeof(
struct rtr_item);
3667 if (msg_len < required_len) {
3669 "Received orf_token message is too short... ignoring.");
3677 static int check_mcast_sanity(
3681 int endian_conversion_needed)
3684 if (msg_len <
sizeof(
struct mcast)) {
3686 "Received mcast message is too short... ignoring.");
3694 static int check_memb_merge_detect_sanity(
3698 int endian_conversion_needed)
3703 "Received memb_merge_detect message is too short... ignoring.");
3711 static int check_memb_join_sanity(
3715 int endian_conversion_needed)
3720 size_t required_len;
3722 if (msg_len <
sizeof(
struct memb_join)) {
3724 "Received memb_join message is too short... ignoring.");
3732 if (endian_conversion_needed) {
3738 if (msg_len < required_len) {
3740 "Received memb_join message is too short... ignoring.");
3748 static int check_memb_commit_token_sanity(
3752 int endian_conversion_needed)
3756 size_t required_len;
3760 "Received memb_commit_token message is too short... ignoring.");
3766 if (endian_conversion_needed) {
3772 if (msg_len < required_len) {
3774 "Received memb_commit_token message is too short... ignoring.");
3782 static int check_token_hold_cancel_sanity(
3786 int endian_conversion_needed)
3791 "Received token_hold_cancel message is too short... ignoring.");
3807 static int message_handler_orf_token (
3811 int endian_conversion_needed)
3813 char token_storage[1500];
3814 char token_convert[1500];
3817 unsigned int transmits_allowed;
3818 unsigned int mcasted_retransmit;
3819 unsigned int mcasted_regular;
3820 unsigned int last_aru;
3823 unsigned long long tv_current;
3824 unsigned long long tv_diff;
3826 tv_current = qb_util_nano_current_get ();
3827 tv_diff = tv_current -
tv_old;
3831 "Time since last token %0.4f ms", ((
float)tv_diff) / 1000000.0);
3834 if (check_orf_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
3841 #ifdef TEST_DROP_ORF_TOKEN_PERCENTAGE
3842 if (random()%100 < TEST_DROP_ORF_TOKEN_PERCENTAGE) {
3847 if (endian_conversion_needed) {
3848 orf_token_endian_convert ((
struct orf_token *)msg,
3850 msg = (
struct orf_token *)token_convert;
3857 token = (
struct orf_token *)token_storage;
3858 memcpy (token, msg,
sizeof (
struct orf_token));
3867 start_merge_detect_timeout (instance);
3870 cancel_merge_detect_timeout (instance);
3871 cancel_token_hold_retransmit_timeout (instance);
3877 #ifdef TEST_RECOVERY_MSG_COUNT
3917 messages_free (instance, token->
aru);
3936 reset_heartbeat_timeout(instance);
3939 cancel_heartbeat_timeout(instance);
3960 transmits_allowed = fcc_calculate (instance, token);
3961 mcasted_retransmit = orf_token_rtr (instance, token, &transmits_allowed);
3969 fcc_rtr_limit (instance, token, &transmits_allowed);
3970 mcasted_regular = orf_token_mcast (instance, token, transmits_allowed);
3977 fcc_token_update (instance, token, mcasted_retransmit +
3980 if (sq_lt_compare (instance->
my_aru, token->
aru) ||
3985 if (token->
aru == token->
seq) {
3991 if (token->
aru == last_aru && token->
aru_addr != 0) {
4006 "FAILED TO RECEIVE");
4010 memb_set_merge (&instance->
my_id, 1,
4037 "token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, aru %x",
4050 "install seq %x aru %x high seq received %x",
4068 "retrans flag count %x token aru %x install seq %x aru %x %x",
4072 memb_state_operational_enter (instance);
4079 token_send (instance, token, forward_token);
4082 tv_current = qb_util_nano_current_get ();
4083 tv_diff = tv_current -
tv_old;
4087 ((
float)tv_diff) / 1000000.0);
4090 messages_deliver_to_app (instance, 0,
4098 reset_token_timeout (instance);
4099 reset_token_retransmit_timeout (instance);
4103 start_token_hold_retransmit_timeout (instance);
4113 reset_heartbeat_timeout(instance);
4116 cancel_heartbeat_timeout(instance);
4122 static void messages_deliver_to_app (
4125 unsigned int end_point)
4130 struct mcast *mcast_in;
4131 struct mcast mcast_header;
4132 unsigned int range = 0;
4133 int endian_conversion_required;
4134 unsigned int my_high_delivered_stored = 0;
4135 struct srp_addr aligned_system_from;
4150 for (i = 1; i <= range; i++) {
4158 my_high_delivered_stored + i);
4164 my_high_delivered_stored + i, &ptr);
4168 if (res != 0 && skip == 0) {
4179 sort_queue_item_p = ptr;
4181 mcast_in = sort_queue_item_p->
mcast;
4182 assert (mcast_in != (
struct mcast *)0xdeadbeef);
4184 endian_conversion_required = 0;
4186 endian_conversion_required = 1;
4187 mcast_endian_convert (mcast_in, &mcast_header);
4189 memcpy (&mcast_header, mcast_in,
sizeof (
struct mcast));
4192 aligned_system_from = mcast_header.system_from;
4198 memb_set_subset (&aligned_system_from,
4212 "Delivering MCAST message with seq %x to pending delivery queue",
4219 mcast_header.header.nodeid,
4220 ((
char *)sort_queue_item_p->
mcast) + sizeof (
struct mcast),
4222 endian_conversion_required);
4229 static int message_handler_mcast (
4233 int endian_conversion_needed)
4236 struct sq *sort_queue;
4237 struct mcast mcast_header;
4238 struct srp_addr aligned_system_from;
4240 if (check_mcast_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4244 if (endian_conversion_needed) {
4245 mcast_endian_convert (msg, &mcast_header);
4247 memcpy (&mcast_header, msg,
sizeof (
struct mcast));
4258 #ifdef TEST_DROP_MCAST_PERCENTAGE
4259 if (random()%100 < TEST_DROP_MCAST_PERCENTAGE) {
4267 if (memcmp (&instance->
my_ring_id, &mcast_header.ring_id,
4270 aligned_system_from = mcast_header.system_from;
4275 &aligned_system_from, 1,
4281 if (!memb_set_subset (
4282 &aligned_system_from,
4287 memb_set_merge (&aligned_system_from, 1,
4309 mcast_header.ring_id.rep,
4310 (uint64_t)mcast_header.ring_id.seq,
4318 sq_in_range (sort_queue, mcast_header.seq) &&
4319 sq_item_inuse (sort_queue, mcast_header.seq) == 0) {
4333 mcast_header.seq)) {
4340 update_aru (instance);
4349 static int message_handler_memb_merge_detect (
4353 int endian_conversion_needed)
4356 struct srp_addr aligned_system_from;
4358 if (check_memb_merge_detect_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4362 if (endian_conversion_needed) {
4385 memb_set_merge (&aligned_system_from, 1,
4391 if (!memb_set_subset (
4392 &aligned_system_from,
4397 memb_set_merge (&aligned_system_from, 1,
4415 static void memb_join_process (
4421 int gather_entered = 0;
4422 int fail_minus_memb_entries = 0;
4424 struct srp_addr aligned_system_from;
4466 if (memb_set_equal (proc_list,
4471 memb_set_equal (failed_list,
4477 memb_consensus_set (instance, &aligned_system_from);
4480 if (memb_consensus_agreed (instance) && instance->
failed_to_recv == 1) {
4486 memb_state_commit_token_create (instance);
4488 memb_state_commit_enter (instance);
4491 if (memb_consensus_agreed (instance) &&
4492 memb_lowest_in_config (instance)) {
4494 memb_state_commit_token_create (instance);
4496 memb_state_commit_enter (instance);
4501 if (memb_set_subset (proc_list,
4506 memb_set_subset (failed_list,
4513 if (memb_set_subset (&aligned_system_from, 1,
4518 memb_set_merge (proc_list,
4522 if (memb_set_subset (
4523 &instance->
my_id, 1,
4527 &aligned_system_from, 1,
4530 if (memb_set_subset (
4531 &aligned_system_from, 1,
4535 if (memb_set_subset (
4536 &aligned_system_from, 1,
4540 memb_set_merge (failed_list,
4544 memb_set_subtract (fail_minus_memb,
4545 &fail_minus_memb_entries,
4551 memb_set_merge (fail_minus_memb,
4552 fail_minus_memb_entries,
4563 if (gather_entered == 0 &&
4570 static void memb_join_endian_convert (
const struct memb_join *in,
struct memb_join *out)
4593 out_proc_list[i] = srp_addr_endian_convert (in_proc_list[i]);
4596 out_failed_list[i] = srp_addr_endian_convert (in_failed_list[i]);
4622 out_addr[i] = srp_addr_endian_convert (in_addr[i]);
4639 static void orf_token_endian_convert (
const struct orf_token *in,
struct orf_token *out)
4664 static void mcast_endian_convert (
const struct mcast *in,
struct mcast *out)
4681 static void memb_merge_detect_endian_convert (
4694 static int ignore_join_under_operational (
4701 struct srp_addr aligned_system_from;
4708 if (memb_set_subset (&instance->
my_id, 1,
4717 if ((memb_set_subset (&aligned_system_from, 1,
4726 static int message_handler_memb_join (
4730 int endian_conversion_needed)
4733 struct memb_join *memb_join_convert = alloca (msg_len);
4734 struct srp_addr aligned_system_from;
4736 if (check_memb_join_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4740 if (endian_conversion_needed) {
4742 memb_join_endian_convert (msg, memb_join_convert);
4755 if (pause_flush (instance)) {
4764 if (!ignore_join_under_operational (instance,
memb_join)) {
4765 memb_join_process (instance,
memb_join);
4770 memb_join_process (instance,
memb_join);
4774 if (memb_set_subset (&aligned_system_from,
4781 memb_join_process (instance,
memb_join);
4787 if (memb_set_subset (&aligned_system_from,
4794 memb_join_process (instance,
memb_join);
4795 memb_recovery_state_token_loss (instance);
4803 static int message_handler_memb_commit_token (
4807 int endian_conversion_needed)
4817 "got commit token");
4819 if (check_memb_commit_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4823 if (endian_conversion_needed) {
4824 memb_commit_token_endian_convert (msg, memb_commit_token_convert);
4826 memcpy (memb_commit_token_convert, msg, msg_len);
4831 #ifdef TEST_DROP_COMMIT_TOKEN_PERCENTAGE
4832 if (random()%100 < TEST_DROP_COMMIT_TOKEN_PERCENTAGE) {
4842 memb_set_subtract (sub, &sub_entries,
4846 if (memb_set_equal (
addr,
4853 memb_state_commit_enter (instance);
4882 "Sending initial ORF token");
4885 orf_token_send_initial (instance);
4886 reset_token_timeout (instance);
4887 reset_token_retransmit_timeout (instance);
4894 static int message_handler_token_hold_cancel (
4898 int endian_conversion_needed)
4902 if (check_token_hold_cancel_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4911 timer_function_token_retransmit_timeout (instance);
4917 static int check_message_header_validity(
4920 unsigned int msg_len,
4925 const char *guessed_str;
4926 const char *msg_byte = msg;
4930 "Message received from %s is too short... Ignoring %u.",
4944 if (message_header->
magic == 0xFFFF) {
4948 guessed_str =
"Corosync 2.2";
4949 }
else if (message_header->
magic == 0xFEFE) {
4953 guessed_str =
"Corosync 2.3+";
4954 }
else if (msg_byte[0] == 0x01) {
4958 guessed_str =
"unencrypted Kronosnet";
4959 }
else if (msg_byte[0] >= 0 && msg_byte[0] <= 5) {
4964 guessed_str =
"unencrypted Corosync 2.0/2.1/1.x/OpenAIS";
4975 guessed_str =
"encrypted Kronosnet/Corosync 2.0/2.1/1.x/OpenAIS or unknown";
4979 "Message received from %s has bad magic number (probably sent by %s).. Ignoring",
4988 "Message received from %s has unsupported version %u... Ignoring",
5002 unsigned int msg_len,
5008 if (check_message_header_validity(context, msg, msg_len,
system_from) == -1) {
5012 switch (message_header->
type) {
5033 "Message received from %s has wrong type... ignoring %d.\n",
5035 (
int)message_header->
type);
5053 unsigned short ip_port,
5054 unsigned int iface_no)
5074 unsigned int iface_no)
5098 "Created or loaded sequence id " CS_PRI_RING_ID " for this ring.",
5134 void (*totem_service_ready) (
void))
5213 timer_function_orf_token_timeout(context);
totem_configuration_type
The totem_configuration_type enum.
@ TOTEM_CONFIGURATION_REGULAR
@ TOTEM_CONFIGURATION_TRANSITIONAL
#define MESSAGE_QUEUE_MAX
unsigned char addr[TOTEMIP_ADDRLEN]
totem_callback_token_type
The totem_callback_token_type enum.
@ TOTEM_CALLBACK_TOKEN_SENT
@ TOTEM_CALLBACK_TOKEN_RECEIVED
#define PROCESSOR_COUNT_MAX
#define CS_PRI_RING_ID_SEQ
icmap_map_t icmap_get_global_map(void)
Return global icmap.
#define LOGSYS_LEVEL_DEBUG
struct memb_ring_id ring_id
struct totem_message_header header
struct srp_addr system_from
unsigned int received_flg
struct memb_ring_id ring_id
unsigned int high_delivered
struct totem_message_header header
unsigned char end_of_commit_token[0]
struct memb_ring_id ring_id
struct srp_addr system_from
struct totem_message_header header
unsigned char end_of_memb_join[0]
unsigned long long ring_seq
unsigned int failed_list_entries
unsigned int proc_list_entries
struct totem_message_header header
struct memb_ring_id ring_id
struct srp_addr system_from
int(* handler_functions[6])(struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed)
struct rtr_item rtr_list[0]
struct totem_message_header header
struct memb_ring_id ring_id
struct memb_ring_id ring_id
int(* callback_fn)(enum totem_callback_token_type type, const void *)
enum totem_callback_token_type callback_type
struct totem_message_header header
struct memb_ring_id ring_id
unsigned int max_messages
unsigned int heartbeat_failures_allowed
unsigned int token_timeout
struct totem_logging_configuration totem_logging_configuration
unsigned int downcheck_timeout
unsigned int miss_count_const
struct totem_interface * interfaces
unsigned int fail_to_recv_const
unsigned int merge_timeout
struct totem_interface * orig_interfaces
void(* totem_memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, unsigned int nodeid)
unsigned int token_retransmits_before_loss_const
unsigned int max_network_delay
unsigned int seqno_unchanged_const
unsigned int consensus_timeout
unsigned int send_join_timeout
void(* totem_memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, unsigned int nodeid)
unsigned int token_retransmit_timeout
unsigned int token_warning
unsigned int join_timeout
unsigned int token_hold_timeout
struct totem_ip_address boundto
struct totem_ip_address member_list[PROCESSOR_COUNT_MAX]
struct totem_ip_address mcast_addr
The totem_ip_address struct.
void(* log_printf)(int level, int subsys, const char *function_name, const char *file_name, int file_line, const char *format,...) __attribute__((format(printf
void(*) in log_level_security)
struct totem_ip_address mcast_address
struct srp_addr my_left_memb_list[PROCESSOR_COUNT_MAX]
int consensus_list_entries
int my_merge_detect_timeout_outstanding
qb_loop_timer_handle timer_heartbeat_timeout
unsigned int my_token_seq
qb_loop_timer_handle memb_timer_state_gather_join_timeout
struct consensus_list_item consensus_list[PROCESSOR_COUNT_MAX]
qb_loop_timer_handle memb_timer_state_gather_consensus_timeout
uint32_t threaded_mode_enabled
struct srp_addr my_memb_list[PROCESSOR_COUNT_MAX]
int my_leave_memb_entries
struct srp_addr my_proc_list[PROCESSOR_COUNT_MAX]
struct srp_addr my_new_memb_list[PROCESSOR_COUNT_MAX]
int my_failed_list_entries
struct srp_addr my_failed_list[PROCESSOR_COUNT_MAX]
unsigned int use_heartbeat
void(* memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, unsigned int nodeid)
void(*) enum memb_stat memb_state)
qb_loop_timer_handle memb_timer_state_commit_timeout
struct cs_queue new_message_queue
int orf_token_retransmit_size
unsigned int my_high_seq_received
void(* totemsrp_deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required)
uint32_t orf_token_discard
struct qb_list_head token_callback_sent_listhead
unsigned int last_released
int totemsrp_log_level_notice
struct cs_queue new_message_queue_trans
int totemsrp_log_level_trace
char orf_token_retransmit[TOKEN_SIZE_MAX]
struct cs_queue retrans_message_queue
struct memb_ring_id my_ring_id
int totemsrp_log_level_error
void(* totemsrp_waiting_trans_ack_cb_fn)(int waiting_trans_ack)
unsigned int old_ring_state_high_seq_received
qb_loop_timer_handle timer_orf_token_hold_retransmit_timeout
qb_loop_timer_handle timer_pause_timeout
unsigned int my_high_ring_delivered
qb_loop_timer_handle timer_orf_token_retransmit_timeout
struct totem_config * totem_config
int my_deliver_memb_entries
void(* totemsrp_service_ready_fn)(void)
int my_trans_memb_entries
uint32_t originated_orf_token
void * token_recv_event_handle
struct sq recovery_sort_queue
qb_loop_timer_handle timer_orf_token_timeout
qb_loop_timer_handle timer_merge_detect_timeout
unsigned int my_leave_memb_list[PROCESSOR_COUNT_MAX]
void * token_sent_event_handle
unsigned int my_high_delivered
int totemsrp_log_level_security
int totemsrp_log_level_warning
struct memb_commit_token * commit_token
char commit_token_storage[40000]
struct memb_ring_id my_old_ring_id
void(* memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, unsigned int nodeid)
qb_loop_t * totemsrp_poll_handle
unsigned int my_install_seq
qb_loop_timer_handle timer_orf_token_warning
struct srp_addr my_trans_memb_list[PROCESSOR_COUNT_MAX]
struct qb_list_head token_callback_received_listhead
uint32_t waiting_trans_ack
void(* totemsrp_log_printf)(int level, int subsys, const char *function, const char *file, int line, const char *format,...) __attribute__((format(printf
void(* totemsrp_confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id)
struct sq regular_sort_queue
unsigned long long token_ring_id_seq
struct srp_addr my_deliver_memb_list[PROCESSOR_COUNT_MAX]
int totemsrp_log_level_debug
struct totem_ip_address my_addrs[INTERFACE_MAX]
uint32_t continuous_gather
uint64_t recovery_entered
uint64_t memb_commit_token_rx
uint64_t memb_commit_token_tx
uint64_t operational_token_lost
uint64_t operational_entered
uint64_t gather_token_lost
uint64_t commit_token_lost
uint64_t token_hold_cancel_tx
totemsrp_token_stats_t token[TOTEM_TOKEN_STATS_MAX]
uint64_t recovery_token_lost
uint64_t memb_merge_detect_rx
uint64_t memb_merge_detect_tx
uint64_t token_hold_cancel_rx
uint64_t consensus_timeouts
#define swab64(x)
The swab64 macro.
#define swab16(x)
The swab16 macro.
#define swab32(x)
The swab32 macro.
cfg_message_crypto_reconfig_phase_t
void totemconfig_commit_new_params(struct totem_config *totem_config, icmap_map_t map)
void totemip_copy(struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
const char * totemip_sa_print(const struct sockaddr *sa)
int totemnet_iface_set(void *net_context, const struct totem_ip_address *interface_addr, unsigned short ip_port, unsigned int iface_no)
int totemnet_member_remove(void *net_context, const struct totem_ip_address *member, int ring_no)
int totemnet_token_send(void *net_context, const void *msg, unsigned int msg_len)
int totemnet_send_flush(void *net_context)
int totemnet_initialize(qb_loop_t *loop_pt, void **net_context, struct totem_config *totem_config, totemsrp_stats_t *stats, void *context, void(*deliver_fn)(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from), void(*iface_change_fn)(void *context, const struct totem_ip_address *iface_address, unsigned int ring_no), void(*mtu_changed)(void *context, int net_mtu), void(*target_set_completed)(void *context))
void totemnet_buffer_release(void *net_context, void *ptr)
int totemnet_mcast_flush_send(void *net_context, const void *msg, unsigned int msg_len)
int totemnet_member_add(void *net_context, const struct totem_ip_address *local, const struct totem_ip_address *member, int ring_no)
int totemnet_finalize(void *net_context)
int totemnet_crypto_set(void *net_context, const char *cipher_type, const char *hash_type)
int totemnet_ifaces_get(void *net_context, char ***status, unsigned int *iface_count)
int totemnet_processor_count_set(void *net_context, int processor_count)
int totemnet_token_target_set(void *net_context, unsigned int nodeid)
int totemnet_recv_flush(void *net_context)
int totemnet_iface_check(void *net_context)
int totemnet_mcast_noflush_send(void *net_context, const void *msg, unsigned int msg_len)
int totemnet_recv_mcast_empty(void *net_context)
void totemnet_stats_clear(void *net_context)
int totemnet_reconfigure(void *net_context, struct totem_config *totem_config)
void * totemnet_buffer_alloc(void *net_context)
int totemnet_crypto_reconfigure_phase(void *net_context, struct totem_config *totem_config, cfg_message_crypto_reconfig_phase_t phase)
Totem Network interface - also does encryption/decryption.
int totemsrp_my_family_get(void *srp_context)
#define SEQNO_START_TOKEN
unsigned long long ring_seq
#define RETRANSMIT_ENTRIES_MAX
unsigned long long int tv_old
#define log_printf(level, format, args...)
void totemsrp_force_gather(void *context)
void totemsrp_service_ready_register(void *context, void(*totem_service_ready)(void))
int totemsrp_initialize(qb_loop_t *poll_handle, void **srp_context, struct totem_config *totem_config, totempg_stats_t *stats, void(*deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void(*confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id), void(*waiting_trans_ack_cb_fn)(int waiting_trans_ack))
Create a protocol instance.
int totemsrp_callback_token_create(void *srp_context, void **handle_out, enum totem_callback_token_type type, int delete, int(*callback_fn)(enum totem_callback_token_type type, const void *), const void *data)
#define RETRANS_MESSAGE_QUEUE_SIZE_MAX
void totemsrp_threaded_mode_enable(void *context)
struct rtr_item rtr_list[0]
@ MESSAGE_TYPE_MEMB_COMMIT_TOKEN
@ MESSAGE_TYPE_TOKEN_HOLD_CANCEL
@ MESSAGE_TYPE_MEMB_MERGE_DETECT
void totemsrp_net_mtu_adjust(struct totem_config *totem_config)
@ MESSAGE_NOT_ENCAPSULATED
unsigned int failed_list_entries
struct message_handlers totemsrp_message_handlers
#define LEAVE_DUMMY_NODEID
#define QUEUE_RTR_ITEMS_SIZE_MAX
void main_iface_change_fn(void *context, const struct totem_ip_address *iface_address, unsigned int iface_no)
@ TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED
@ TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE
@ TOTEMSRP_GSFROM_FAILED_TO_RECEIVE
@ TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT
@ TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE
@ TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE
@ TOTEMSRP_GSFROM_MERGE_DURING_JOIN
@ TOTEMSRP_GSFROM_INTERFACE_CHANGE
@ TOTEMSRP_GSFROM_GATHER_MISSING1
@ TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE
@ TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE
@ TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE
@ TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE
@ TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE
@ TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE
@ TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY
int totemsrp_crypto_set(void *srp_context, const char *cipher_type, const char *hash_type)
int totemsrp_avail(void *srp_context)
Return number of available messages that can be queued.
void totemsrp_stats_clear(void *context, int flags)
int totemsrp_iface_set(void *context, const struct totem_ip_address *interface_addr, unsigned short ip_port, unsigned int iface_no)
void totemsrp_finalize(void *srp_context)
struct memb_ring_id ring_id
void totemsrp_trans_ack(void *context)
int totemsrp_crypto_reconfigure_phase(void *context, struct totem_config *totem_config, cfg_message_crypto_reconfig_phase_t phase)
unsigned int totemsrp_my_nodeid_get(void *srp_context)
void totemsrp_event_signal(void *srp_context, enum totem_event_type type, int value)
void totemsrp_callback_token_destroy(void *srp_context, void **handle_out)
unsigned int received_flg
struct message_item __attribute__
int totemsrp_member_add(void *context, const struct totem_ip_address *member, int iface_no)
int totemsrp_ifaces_get(void *srp_context, unsigned int nodeid, unsigned int *interface_id, struct totem_ip_address *interfaces, unsigned int interfaces_size, char ***status, unsigned int *iface_count)
int totemsrp_reconfigure(void *context, struct totem_config *totem_config)
unsigned int high_delivered
struct srp_addr system_from
unsigned int proc_list_entries
const char * gather_state_from_desc[]
int totemsrp_member_remove(void *context, const struct totem_ip_address *member, int iface_no)
int totemsrp_mcast(void *srp_context, struct iovec *iovec, unsigned int iov_len, int guarantee)
Multicast a message.
void main_deliver_fn(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from)
Totem Single Ring Protocol.
#define TOTEMPG_STATS_CLEAR_TRANSPORT
#define TOTEM_TOKEN_STATS_MAX