#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include <qb/qbloop.h>
#include <qb/qbipcs.h>

#define LOGSYS_UTILS_ONLY 1
#define min(a,b) ((a) < (b) ? (a) : (b))
#if !(defined(__i386__) || defined(__x86_64__))
/*
 * Alignment is required on architectures other than i386 and x86_64
 */
#define TOTEMPG_NEED_ALIGN 1
#endif
#define TOTEMPG_PACKET_SIZE (totempg_totem_config->net_mtu - \
	sizeof (struct totempg_mcast))
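/*
 * Editor's sketch (not in the original source): every datagram handed to
 * totemmrp_mcast() below is assembled from three iovecs, so its on-wire
 * layout, assuming the struct totempg_mcast fields used throughout this
 * file (header, fragmented, continuation, msg_count), is:
 *
 *	struct totempg_mcast header;              fragmented/continuation/msg_count
 *	unsigned short       msg_lens[msg_count]; length of each packed message
 *	unsigned char        data[];              packed bodies, at most
 *	                                          TOTEMPG_PACKET_SIZE bytes
 */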
static int mcast_packed_msg_count = 0;

static int totempg_reserved = 1;

static unsigned int totempg_size_limit;

static uint32_t totempg_threaded_mode = 0;

static int totempg_log_level_security;
static int totempg_log_level_error;
static int totempg_log_level_warning;
static int totempg_log_level_notice;
static int totempg_log_level_debug;
static int totempg_subsys_id;
static void (*totempg_log_printf) (
	int level,
	int subsys,
	const char *function,
	const char *file,
	int line,
	const char *format, ...) __attribute__((format(printf, 6, 7)));
static unsigned char *fragmentation_data;

static int fragment_size = 0;

static int fragment_continuation = 0;

static int totempg_waiting_transack = 0;
struct totempg_group_instance {
	void (*deliver_fn) (
		unsigned int nodeid,
		const void *msg,
		unsigned int msg_len,
		int endian_conversion_required);

	void (*confchg_fn) (
		enum totem_configuration_type configuration_type,
		const unsigned int *member_list, size_t member_list_entries,
		const unsigned int *left_list, size_t left_list_entries,
		const unsigned int *joined_list, size_t joined_list_entries,
		const struct memb_ring_id *ring_id);

	struct totempg_group *groups;
	int groups_cnt;
	int32_t q_level;
	struct list_head list;
};
static unsigned char next_fragment = 1;

static pthread_mutex_t totempg_mutex = PTHREAD_MUTEX_INITIALIZER;

static pthread_mutex_t callback_token_mutex = PTHREAD_MUTEX_INITIALIZER;

static pthread_mutex_t mcast_msg_mutex = PTHREAD_MUTEX_INITIALIZER;
#define log_printf(level, format, args...)		\
do {							\
	totempg_log_printf(level,			\
		totempg_subsys_id,			\
		__FUNCTION__, __FILE__, __LINE__,	\
		format, ##args);			\
} while (0)
static int msg_count_send_ok (int msg_count);

static int byte_count_send_ok (int byte_count);
static void totempg_waiting_trans_ack_cb (int waiting_trans_ack)
{
	log_printf(LOG_DEBUG, "waiting_trans_ack changed to %u", waiting_trans_ack);
	totempg_waiting_transack = waiting_trans_ack;
}
static struct assembly *assembly_ref (unsigned int nodeid)
{
	struct assembly *assembly;
	struct list_head *list;
	struct list_head *active_assembly_list_inuse;
	struct list_head *active_assembly_list_free;

	if (totempg_waiting_transack) {
		active_assembly_list_inuse = &assembly_list_inuse_trans;
		active_assembly_list_free = &assembly_list_free_trans;
	} else {
		active_assembly_list_inuse = &assembly_list_inuse;
		active_assembly_list_free = &assembly_list_free;
	}
	/*
	 * Search the inuse list for the nodeid and return its assembly buffer
	 */
	for (list = active_assembly_list_inuse->next;
		list != active_assembly_list_inuse;
		list = list->next) {

		assembly = list_entry (list, struct assembly, list);

		if (nodeid == assembly->nodeid) {
			return (assembly);
		}
	}
	/*
	 * Nothing found in the inuse list, so take one from the free list
	 * if available
	 */
	if (list_empty (active_assembly_list_free) == 0) {
		assembly = list_entry (active_assembly_list_free->next,
			struct assembly, list);
		list_del (&assembly->list);
		list_add (&assembly->list, active_assembly_list_inuse);
		assembly->nodeid = nodeid;
		assembly->index = 0;
		assembly->last_frag_num = 0;
		assembly->throw_away_mode = THROW_AWAY_INACTIVE;
		return (assembly);
	}
	/*
	 * Nothing available in either list, so allocate a new assembly
	 */
	assembly = malloc (sizeof (struct assembly));
	assert (assembly);
	assembly->nodeid = nodeid;
	assembly->data[0] = 0;
	assembly->index = 0;
	assembly->last_frag_num = 0;
	assembly->throw_away_mode = THROW_AWAY_INACTIVE;
	list_init (&assembly->list);
	list_add (&assembly->list, active_assembly_list_inuse);

	return (assembly);
}
static void assembly_deref (struct assembly *assembly)
{
	struct list_head *active_assembly_list_free;

	if (totempg_waiting_transack) {
		active_assembly_list_free = &assembly_list_free_trans;
	} else {
		active_assembly_list_free = &assembly_list_free;
	}

	list_del (&assembly->list);
	list_add (&assembly->list, active_assembly_list_free);
}
static void assembly_deref_from_normal_and_trans (int nodeid)
{
	int j;
	struct list_head *list, *list_next;
	struct list_head *active_assembly_list_inuse;
	struct list_head *active_assembly_list_free;
	struct assembly *assembly;

	for (j = 0; j < 2; j++) {
		if (j == 0) {
			active_assembly_list_inuse = &assembly_list_inuse;
			active_assembly_list_free = &assembly_list_free;
		} else {
			active_assembly_list_inuse = &assembly_list_inuse_trans;
			active_assembly_list_free = &assembly_list_free_trans;
		}

		for (list = active_assembly_list_inuse->next;
			list != active_assembly_list_inuse;
			list = list_next) {

			list_next = list->next;
			assembly = list_entry (list, struct assembly, list);

			if (nodeid == assembly->nodeid) {
				list_del (&assembly->list);
				list_add (&assembly->list, active_assembly_list_free);
			}
		}
	}
}
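/*
 * Editor's note (not in the original source): assemblies live on one of two
 * list pairs.  While totempg_waiting_transack is set, reassembly of messages
 * received in the transitional configuration uses the *_trans lists, so
 * steady-state and transitional traffic are never mixed in one buffer.
 */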
static inline void app_confchg_fn (
	enum totem_configuration_type configuration_type,
	const unsigned int *member_list, size_t member_list_entries,
	const unsigned int *left_list, size_t left_list_entries,
	const unsigned int *joined_list, size_t joined_list_entries,
	const struct memb_ring_id *ring_id)
{
	int i;
	struct totempg_group_instance *instance;
	struct list_head *list;

	/*
	 * For every leaving processor, return its assembly to the free list
	 */
	for (i = 0; i < left_list_entries; i++) {
		assembly_deref_from_normal_and_trans (left_list[i]);
	}

	for (list = totempg_groups_list.next;
		list != &totempg_groups_list;
		list = list->next) {

		instance = list_entry (list, struct totempg_group_instance, list);

		if (instance->confchg_fn) {
			instance->confchg_fn (configuration_type,
				member_list, member_list_entries,
				left_list, left_list_entries,
				joined_list, joined_list_entries,
				ring_id);
		}
	}
}
static inline void group_endian_convert (
	void *msg,
	int msg_len)
{
	unsigned short *group_len;
	int i;
	char *aligned_msg;

#ifdef TOTEMPG_NEED_ALIGN
	/*
	 * Align the data structure for CPUs that require it
	 */
	if ((size_t)msg % 4 != 0) {
		aligned_msg = alloca(msg_len);
		memcpy(aligned_msg, msg, msg_len);
	} else {
		aligned_msg = msg;
	}
#else
	aligned_msg = msg;
#endif

	group_len = (unsigned short *)aligned_msg;
	group_len[0] = swab16(group_len[0]);
	for (i = 1; i < group_len[0] + 1; i++) {
		group_len[i] = swab16(group_len[i]);
	}

	if (aligned_msg != msg) {
		memcpy(msg, aligned_msg, msg_len);
	}
}
static inline int group_matches (
	struct iovec *iovec,
	unsigned int iov_len,
	struct totempg_group *groups_b,
	unsigned int group_b_cnt,
	unsigned int *adjust_iovec)
{
	unsigned short *group_len;
	char *group_name;
	int i;
	int j;
#ifdef TOTEMPG_NEED_ALIGN
	struct iovec iovec_aligned = { NULL, 0 };
#endif

	assert (iov_len == 1);

#ifdef TOTEMPG_NEED_ALIGN
	/*
	 * Align the data structure for CPUs that require it
	 */
	if ((size_t)iovec->iov_base % 4 != 0) {
		iovec_aligned.iov_base = alloca(iovec->iov_len);
		memcpy(iovec_aligned.iov_base, iovec->iov_base, iovec->iov_len);
		iovec_aligned.iov_len = iovec->iov_len;
		iovec = &iovec_aligned;
	}
#endif

	group_len = (unsigned short *)iovec->iov_base;
	group_name = ((char *)iovec->iov_base) +
		sizeof (unsigned short) * (group_len[0] + 1);

	/*
	 * Calculate the amount to strip from the iovec before delivery
	 */
	*adjust_iovec = sizeof (unsigned short) * (group_len[0] + 1);
	for (i = 1; i < group_len[0] + 1; i++) {
		*adjust_iovec += group_len[i];
	}

	/*
	 * Determine whether this message should be delivered to this instance
	 */
	for (i = 1; i < group_len[0] + 1; i++) {
		for (j = 0; j < group_b_cnt; j++) {
			if ((group_len[i] == groups_b[j].group_len) &&
				(memcmp (groups_b[j].group, group_name, group_len[i]) == 0)) {
				return (1);
			}
		}
		group_name += group_len[i];
	}
	return (0);
}
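/*
 * Editor's sketch (not in the original source): the group prefix parsed by
 * group_matches() is built by totempg_groups_mcast_joined() and
 * totempg_groups_mcast_groups() below.  A message multicast to two
 * hypothetical groups "a" and "bb" carries, ahead of the app payload:
 *
 *	unsigned short group_len[3] = { 2, 1, 2 };   count, then each name length
 *	char           names[]      = "abb";         names packed back to back
 *
 * adjust_iovec receives the total prefix size so app_deliver_fn() can strip
 * it before handing the payload to the matching instance.
 */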
static inline void app_deliver_fn (
	unsigned int nodeid,
	void *msg,
	unsigned int msg_len,
	int endian_conversion_required)
{
	struct totempg_group_instance *instance;
	struct iovec stripped_iovec;
	unsigned int adjust_iovec;
	struct iovec *iovec;
	struct list_head *list;

	struct iovec aligned_iovec = { NULL, 0 };

	if (endian_conversion_required) {
		group_endian_convert (msg, msg_len);
	}

#ifdef TOTEMPG_NEED_ALIGN
	/*
	 * Align the data structure for CPUs that require it
	 */
	aligned_iovec.iov_base = alloca(msg_len);
	aligned_iovec.iov_len = msg_len;
	memcpy(aligned_iovec.iov_base, msg, msg_len);
#else
	aligned_iovec.iov_base = msg;
	aligned_iovec.iov_len = msg_len;
#endif

	iovec = &aligned_iovec;

	for (list = totempg_groups_list.next;
		list != &totempg_groups_list;
		list = list->next) {

		instance = list_entry (list, struct totempg_group_instance, list);
		if (group_matches (iovec, 1, instance->groups, instance->groups_cnt, &adjust_iovec)) {
			stripped_iovec.iov_len = iovec->iov_len - adjust_iovec;
			stripped_iovec.iov_base = (char *)iovec->iov_base + adjust_iovec;

#ifdef TOTEMPG_NEED_ALIGN
			/*
			 * Realign if needed.  Note the parentheses: the modulo must
			 * apply to the adjusted address, not to adjust_iovec alone.
			 */
			if (((size_t)iovec->iov_base + adjust_iovec) % 4 != 0) {
				stripped_iovec.iov_base =
					alloca (stripped_iovec.iov_len);
				memcpy (stripped_iovec.iov_base,
					(char *)iovec->iov_base + adjust_iovec,
					stripped_iovec.iov_len);
			}
#endif

			instance->deliver_fn (
				nodeid,
				stripped_iovec.iov_base,
				stripped_iovec.iov_len,
				endian_conversion_required);
		}
	}
}
static void totempg_confchg_fn (
	enum totem_configuration_type configuration_type,
	const unsigned int *member_list, size_t member_list_entries,
	const unsigned int *left_list, size_t left_list_entries,
	const unsigned int *joined_list, size_t joined_list_entries,
	const struct memb_ring_id *ring_id)
{
	app_confchg_fn (configuration_type,
		member_list, member_list_entries,
		left_list, left_list_entries,
		joined_list, joined_list_entries,
		ring_id);
}
static void totempg_deliver_fn (
	unsigned int nodeid,
	const void *msg,
	unsigned int msg_len,
	int endian_conversion_required)
{
	struct totempg_mcast *mcast;
	unsigned short *msg_lens;
	int i;
	struct assembly *assembly;
	char header[FRAME_SIZE_MAX];
	int msg_count;
	int continuation;
	int start;
	const char *data;
	int datasize;
	struct iovec iov_delv;

	assembly = assembly_ref (nodeid);
	assert (assembly);
	/*
	 * Copy the header into one contiguous block to simplify delivery
	 */
	mcast = (struct totempg_mcast *)msg;
	if (endian_conversion_required) {
		mcast->msg_count = swab16 (mcast->msg_count);
	}

	msg_count = mcast->msg_count;
	datasize = sizeof (struct totempg_mcast) +
		msg_count * sizeof (unsigned short);

	memcpy (header, msg, datasize);
	data = msg;

	msg_lens = (unsigned short *) (header + sizeof (struct totempg_mcast));
	if (endian_conversion_required) {
		for (i = 0; i < mcast->msg_count; i++) {
			msg_lens[i] = swab16 (msg_lens[i]);
		}
	}
	memcpy (&assembly->data[assembly->index], &data[datasize],
		msg_len - datasize);
	iov_delv.iov_base = (void *)&assembly->data[0];
	iov_delv.iov_len = assembly->index + msg_lens[0];
		assembly->index += msg_lens[0];
		iov_delv.iov_base = (void *)&assembly->data[assembly->index];
		iov_delv.iov_len = msg_lens[1];
	if (continuation == assembly->last_frag_num) {
		assembly->last_frag_num = mcast->fragmented;
		for (i = start; i < msg_count; i++) {
			app_deliver_fn(nodeid, iov_delv.iov_base, iov_delv.iov_len,
				endian_conversion_required);
			assembly->index += msg_lens[i];
			iov_delv.iov_base = (void *)&assembly->data[assembly->index];
			if (i < (msg_count - 1)) {
				iov_delv.iov_len = msg_lens[i + 1];
			}
		}
	} else {
		log_printf (LOG_DEBUG,
			"fragmented continuation %u is not equal to assembly last_frag_num %u",
			continuation, assembly->last_frag_num);
		assembly->throw_away_mode = THROW_AWAY_ACTIVE;
	}
	if (mcast->fragmented == 0) {
		/*
		 * End of messages, dereference the assembly
		 */
		assembly->last_frag_num = 0;
		assembly->index = 0;
		assembly_deref (assembly);
	} else {
		/*
		 * Message is fragmented; keep the trailing partial message at
		 * the front of the assembly buffer for the next continuation
		 */
		memmove (&assembly->data[0],
			&assembly->data[assembly->index],
			msg_lens[msg_count]);
	}
}
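/*
 * Editor's note (not in the original source): msg_lens[msg_count] is the
 * length of the trailing, still-incomplete fragment.  Moving it to the front
 * of assembly->data lets the next packet with a matching continuation number
 * append to it, which is what the last_frag_num check above enforces.
 */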
static int callback_token_received_fn (
	enum totem_callback_token_type type,
	const void *data)
{
	struct totempg_mcast mcast;
	struct iovec iovecs[3];

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&mcast_msg_mutex);
	}
	if (mcast_packed_msg_count == 0) {
		if (totempg_threaded_mode == 1) {
			pthread_mutex_unlock (&mcast_msg_mutex);
		}
		return (0);
	}
	if (totemmrp_avail() == 0) {
		if (totempg_threaded_mode == 1) {
			pthread_mutex_unlock (&mcast_msg_mutex);
		}
		return (0);
	}

	mcast.header.version = 0;
	mcast.header.type = 0;
	mcast.fragmented = 0;

	/*
	 * Was the first message in this buffer a continuation of a
	 * fragmented message?
	 */
	mcast.continuation = fragment_continuation;
	fragment_continuation = 0;

	mcast.msg_count = mcast_packed_msg_count;

	iovecs[0].iov_base = (void *)&mcast;
	iovecs[0].iov_len = sizeof (struct totempg_mcast);
	iovecs[1].iov_base = (void *)mcast_packed_msg_lens;
	iovecs[1].iov_len = mcast_packed_msg_count * sizeof (unsigned short);
	iovecs[2].iov_base = (void *)&fragmentation_data[0];
	iovecs[2].iov_len = fragment_size;
	(void)totemmrp_mcast (iovecs, 3, 0);

	mcast_packed_msg_count = 0;
	fragment_size = 0;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&mcast_msg_mutex);
	}
	return (0);
}
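/*
 * Editor's note (not in the original source): small application messages are
 * packed into fragmentation_data until a full TOTEMPG_PACKET_SIZE datagram
 * is available.  This callback, registered below for
 * TOTEM_CALLBACK_TOKEN_RECEIVED, flushes any partially filled buffer on each
 * token rotation so packed messages are never delayed indefinitely.
 */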
int totempg_initialize (
	qb_loop_t *poll_handle,
	struct totem_config *totem_config)
{
	int res;

	totempg_totem_config = totem_config;

	fragmentation_data = malloc (TOTEMPG_PACKET_SIZE);
	if (fragmentation_data == 0) {
		return (-1);
	}

	res = totemmrp_initialize (
		poll_handle, totem_config, &totempg_stats,
		totempg_deliver_fn, totempg_confchg_fn,
		totempg_waiting_trans_ack_cb);

	totemmrp_callback_token_create (
		&callback_token_received_handle,
		TOTEM_CALLBACK_TOKEN_RECEIVED,
		0,
		callback_token_received_fn,
		0);

	totempg_size_limit = (totemmrp_avail() - 1) *
		(totempg_totem_config->net_mtu -
		sizeof (struct totempg_mcast) - 16);

	list_init (&totempg_groups_list);

	return (res);
}
void totempg_finalize (void)
{
	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	totemmrp_finalize ();

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
}
static int mcast_msg (
	struct iovec *iovec_in,
	unsigned int iov_len,
	int guarantee)
{
	int res = 0;
	struct totempg_mcast mcast;
	struct iovec iovecs[3];
	struct iovec iovec[64];
	int i;
	int dest, src;
	int max_packet_size = 0;
	int copy_len = 0;
	int copy_base = 0;
	int total_size = 0;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&mcast_msg_mutex);
	}

	/*
	 * Remove zero-length iovectors from the list
	 */
	assert (iov_len < 64);
	for (dest = 0, src = 0; src < iov_len; src++) {
		if (iovec_in[src].iov_len) {
			memcpy (&iovec[dest++], &iovec_in[src],
				sizeof (struct iovec));
		}
	}
	iov_len = dest;
	max_packet_size = TOTEMPG_PACKET_SIZE -
		(sizeof (unsigned short) * (mcast_packed_msg_count + 1));

	mcast_packed_msg_lens[mcast_packed_msg_count] = 0;
	/*
	 * Check whether the message would overflow the send queue
	 */
	for (i = 0; i < iov_len; i++) {
		total_size += iovec[i].iov_len;
	}

	if (byte_count_send_ok (total_size + sizeof (unsigned short) *
		(mcast_packed_msg_count)) == 0) {

		if (totempg_threaded_mode == 1) {
			pthread_mutex_unlock (&mcast_msg_mutex);
		}
		return (-1);
	}
	for (i = 0; i < iov_len; ) {
		copy_len = iovec[i].iov_len - copy_base;

		/*
		 * If it all fits with room left over, copy it in.
		 * We need to leave at least sizeof(short) + 1 bytes in the
		 * fragment buffer on exit so that max_packet_size + fragment_size
		 * doesn't exceed the size of the fragment buffer on the next call.
		 */
		if ((copy_len + fragment_size) <
			(max_packet_size - sizeof (unsigned short))) {

			memcpy (&fragmentation_data[fragment_size],
				(char *)iovec[i].iov_base + copy_base, copy_len);
			fragment_size += copy_len;
			mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len;
			next_fragment = 1;
			copy_len = 0;
			copy_base = 0;
			i++;
			continue;
		/*
		 * If it just fits or is too big, then send out what fits.
		 */
		} else {
			unsigned char *data_ptr;

			copy_len = min(copy_len, max_packet_size - fragment_size);
			if (copy_len == max_packet_size) {
				data_ptr = (unsigned char *)iovec[i].iov_base + copy_base;
			} else {
				data_ptr = fragmentation_data;
			}

			memcpy (&fragmentation_data[fragment_size],
				(unsigned char *)iovec[i].iov_base + copy_base, copy_len);
			mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len;
			/*
			 * Will more data be sent after this packet?  If so, tag
			 * the packet as fragmented and record the continuation.
			 */
			if ((i < (iov_len - 1)) ||
					((copy_base + copy_len) < iovec[i].iov_len)) {
				if (!next_fragment) {
					next_fragment++;
				}
				fragment_continuation = next_fragment;
				mcast.fragmented = next_fragment++;
				assert(fragment_continuation != 0);
				assert(mcast.fragmented != 0);
			} else {
				fragment_continuation = 0;
				mcast.fragmented = 0;
			}
			/*
			 * Assemble the packet and send it
			 */
			mcast.msg_count = ++mcast_packed_msg_count;
			iovecs[0].iov_base = (void *)&mcast;
			iovecs[0].iov_len = sizeof (struct totempg_mcast);
			iovecs[1].iov_base = (void *)mcast_packed_msg_lens;
			iovecs[1].iov_len = mcast_packed_msg_count *
				sizeof (unsigned short);
			iovecs[2].iov_base = (void *)data_ptr;
			iovecs[2].iov_len = max_packet_size;
			res = totemmrp_mcast (iovecs, 3, guarantee);
			/*
			 * Reset the packing state for the next packet
			 */
			mcast_packed_msg_lens[0] = 0;
			mcast_packed_msg_count = 0;
			fragment_size = 0;
			/*
			 * If the iovec all fit, go to the next iovec;
			 * otherwise continue with the rest of this one
			 */
			if ((copy_base + copy_len) == iovec[i].iov_len) {
				copy_len = 0;
				copy_base = 0;
				i++;
			} else {
				copy_base += copy_len;
			}
		}
	}
	/*
	 * Bump the count only if a message was actually packed
	 */
	if (mcast_packed_msg_lens[mcast_packed_msg_count]) {
		mcast_packed_msg_count++;
	}

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&mcast_msg_mutex);
	}
	return (res);
}
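/*
 * Editor's sketch (not in the original source; the numbers are illustrative):
 * with net_mtu = 1470, TOTEMPG_PACKET_SIZE is 1470 minus the small struct
 * totempg_mcast header.  A 4000-byte application message therefore leaves
 * mcast_msg() as three datagrams: two full packets tagged with a non-zero
 * mcast.fragmented number, and a trailing partial packet whose continuation
 * number lets totempg_deliver_fn() append it to the right assembly buffer.
 */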
static int msg_count_send_ok (
	int msg_count)
{
	int avail = 0;

	avail = totemmrp_avail ();

	return ((avail - totempg_reserved) > msg_count);
}
static int byte_count_send_ok (
	int byte_count)
{
	unsigned int msg_count = 0;
	int avail = 0;

	avail = totemmrp_avail ();

	msg_count = (byte_count / (totempg_totem_config->net_mtu -
		sizeof (struct totempg_mcast) - 16)) + 1;

	return (avail >= msg_count);
}
static int send_reserve (
	int msg_size)
{
	unsigned int msg_count = 0;

	msg_count = (msg_size / (totempg_totem_config->net_mtu -
		sizeof (struct totempg_mcast) - 16)) + 1;
	totempg_reserved += msg_count;

	return (msg_count);
}
static void send_release (
	int msg_count)
{
	totempg_reserved -= msg_count;
}
#ifndef HAVE_SMALL_MEMORY_FOOTPRINT
#undef MESSAGE_QUEUE_MAX
#define MESSAGE_QUEUE_MAX ((4 * MESSAGE_SIZE_MAX) / totempg_totem_config->net_mtu)
#endif /* HAVE_SMALL_MEMORY_FOOTPRINT */
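/*
 * Editor's note (not in the original source; MESSAGE_SIZE_MAX is assumed to
 * be 1 MiB as defined in totem.h): with net_mtu = 1500 this yields a queue
 * depth of roughly (4 * 1048576) / 1500, i.e. about 2796 messages.
 */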
static uint32_t q_level_percent_used(void)
{
	return (100 - (((totemmrp_avail() - totempg_reserved) * 100) / MESSAGE_QUEUE_MAX));
}
int totempg_callback_token_create (
	void **handle_out,
	enum totem_callback_token_type type,
	int delete,
	int (*callback_fn) (enum totem_callback_token_type type, const void *),
	const void *data)
{
	unsigned int res;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&callback_token_mutex);
	}
	res = totemmrp_callback_token_create (handle_out, type, delete,
		callback_fn, data);
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&callback_token_mutex);
	}
	return (res);
}
void totempg_callback_token_destroy (void *handle_out)
{
	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&callback_token_mutex);
	}
	totemmrp_callback_token_destroy (handle_out);
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&callback_token_mutex);
	}
}
int totempg_groups_initialize (
	void **totempg_groups_instance,

	void (*deliver_fn) (
		unsigned int nodeid,
		const void *msg,
		unsigned int msg_len,
		int endian_conversion_required),

	void (*confchg_fn) (
		enum totem_configuration_type configuration_type,
		const unsigned int *member_list, size_t member_list_entries,
		const unsigned int *left_list, size_t left_list_entries,
		const unsigned int *joined_list, size_t joined_list_entries,
		const struct memb_ring_id *ring_id))
{
	struct totempg_group_instance *instance;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	instance = malloc (sizeof (struct totempg_group_instance));
	if (instance == NULL) {
		goto error_exit;
	}

	instance->deliver_fn = deliver_fn;
	instance->confchg_fn = confchg_fn;
	instance->groups = 0;
	instance->groups_cnt = 0;
	instance->q_level = QB_LOOP_MED;
	list_init (&instance->list);
	list_add (&instance->list, &totempg_groups_list);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	*totempg_groups_instance = instance;
	return (0);

error_exit:
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (-1);
}
int totempg_groups_join (
	void *totempg_groups_instance,
	const struct totempg_group *groups,
	size_t group_cnt)
{
	struct totempg_group_instance *instance =
		(struct totempg_group_instance *)totempg_groups_instance;
	struct totempg_group *new_groups;
	unsigned int res = 0;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	new_groups = realloc (instance->groups,
		sizeof (struct totempg_group) *
		(instance->groups_cnt + group_cnt));
	if (new_groups == 0) {
		res = ENOMEM;
		goto error_exit;
	}
	memcpy (&new_groups[instance->groups_cnt],
		groups, group_cnt * sizeof (struct totempg_group));
	instance->groups = new_groups;
	instance->groups_cnt += group_cnt;

error_exit:
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (res);
}
int totempg_groups_leave (
	void *totempg_groups_instance,
	const struct totempg_group *groups,
	size_t group_cnt)
{
	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (0);
}
#define MAX_IOVECS_FROM_APP 32
#define MAX_GROUPS_PER_MSG 32
int totempg_groups_mcast_joined (
	void *totempg_groups_instance,
	const struct iovec *iovec,
	unsigned int iov_len,
	int guarantee)
{
	struct totempg_group_instance *instance =
		(struct totempg_group_instance *)totempg_groups_instance;
	unsigned short group_len[MAX_GROUPS_PER_MSG + 1];
	struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP];
	int i;
	unsigned int res;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	/*
	 * Build the group_len prefix and the iovec_mcast array
	 */
	group_len[0] = instance->groups_cnt;
	for (i = 0; i < instance->groups_cnt; i++) {
		group_len[i + 1] = instance->groups[i].group_len;
		iovec_mcast[i + 1].iov_len = instance->groups[i].group_len;
		iovec_mcast[i + 1].iov_base = (void *) instance->groups[i].group;
	}
	iovec_mcast[0].iov_len = (instance->groups_cnt + 1) * sizeof (unsigned short);
	iovec_mcast[0].iov_base = group_len;
	for (i = 0; i < iov_len; i++) {
		iovec_mcast[i + instance->groups_cnt + 1].iov_len = iovec[i].iov_len;
		iovec_mcast[i + instance->groups_cnt + 1].iov_base = iovec[i].iov_base;
	}

	res = mcast_msg (iovec_mcast, iov_len + instance->groups_cnt + 1, guarantee);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}

	return (res);
}
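/*
 * Editor's usage sketch (not in the original source; the group name,
 * callbacks, and buffer are hypothetical): a service engine typically
 * drives this API as follows.
 *
 *	void *handle;
 *	struct totempg_group group = { .group = "EXMP", .group_len = 4 };
 *	struct iovec iov = { .iov_base = buf, .iov_len = buf_len };
 *
 *	totempg_groups_initialize (&handle, my_deliver_fn, my_confchg_fn);
 *	totempg_groups_join (handle, &group, 1);
 *	totempg_groups_mcast_joined (handle, &iov, 1, TOTEMPG_AGREED);
 *
 * TOTEMPG_AGREED is the delivery guarantee declared in totempg.h.
 */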
static void check_q_level(
	void *totempg_groups_instance)
{
	struct totempg_group_instance *instance =
		(struct totempg_group_instance *)totempg_groups_instance;
	int32_t old_level = instance->q_level;
	int32_t percent_used = q_level_percent_used();

	/* ... map percent_used onto the enum totem_q_level thresholds ... */

	if (totem_queue_level_changed && old_level != instance->q_level) {
		totem_queue_level_changed(instance->q_level);
	}
}
void totempg_check_q_level(
	void *totempg_groups_instance)
{
	struct totempg_group_instance *instance =
		(struct totempg_group_instance *)totempg_groups_instance;

	check_q_level(instance);
}
int totempg_groups_joined_reserve (
	void *totempg_groups_instance,
	const struct iovec *iovec,
	unsigned int iov_len)
{
	struct totempg_group_instance *instance =
		(struct totempg_group_instance *)totempg_groups_instance;
	unsigned int size = 0;
	unsigned int i;
	unsigned int reserved = 0;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
		pthread_mutex_lock (&mcast_msg_mutex);
	}

	for (i = 0; i < iov_len; i++) {
		size += iovec[i].iov_len;
	}

	if (size >= totempg_size_limit) {
		reserved = -1;
		goto error_exit;
	}

	if (byte_count_send_ok (size)) {
		reserved = send_reserve (size);
	} else {
		reserved = 0;
	}

error_exit:
	check_q_level(instance);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&mcast_msg_mutex);
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (reserved);
}
int totempg_groups_joined_release (int msg_count)
{
	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
		pthread_mutex_lock (&mcast_msg_mutex);
	}
	send_release (msg_count);
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&mcast_msg_mutex);
		pthread_mutex_unlock (&totempg_mutex);
	}
	return 0;
}
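/*
 * Editor's note (not in the original source): callers are expected to pair
 * these two entry points, reserving queue space before a multicast and
 * releasing it once the message has been queued, e.g.
 *
 *	reserved = totempg_groups_joined_reserve (handle, iov, iov_len);
 *	if (reserved > 0) {
 *		totempg_groups_mcast_joined (handle, iov, iov_len, TOTEMPG_AGREED);
 *		totempg_groups_joined_release (reserved);
 *	}
 */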
int totempg_groups_mcast_groups (
	void *totempg_groups_instance,
	int guarantee,
	const struct totempg_group *groups,
	size_t groups_cnt,
	const struct iovec *iovec,
	unsigned int iov_len)
{
	unsigned short group_len[MAX_GROUPS_PER_MSG + 1];
	struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP];
	int i;
	unsigned int res;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	/*
	 * Build the group_len prefix and the iovec_mcast array
	 */
	group_len[0] = groups_cnt;
	for (i = 0; i < groups_cnt; i++) {
		group_len[i + 1] = groups[i].group_len;
		iovec_mcast[i + 1].iov_len = groups[i].group_len;
		iovec_mcast[i + 1].iov_base = (void *) groups[i].group;
	}

	iovec_mcast[0].iov_len = (groups_cnt + 1) * sizeof (unsigned short);
	iovec_mcast[0].iov_base = group_len;

	for (i = 0; i < iov_len; i++) {
		iovec_mcast[i + groups_cnt + 1].iov_len = iovec[i].iov_len;
		iovec_mcast[i + groups_cnt + 1].iov_base = iovec[i].iov_base;
	}

	res = mcast_msg (iovec_mcast, iov_len + groups_cnt + 1, guarantee);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (res);
}
int totempg_groups_send_ok_groups (
	void *totempg_groups_instance,
	const struct totempg_group *groups,
	size_t groups_cnt,
	const struct iovec *iovec,
	unsigned int iov_len)
{
	unsigned int size = 0;
	unsigned int i;
	unsigned int res;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	for (i = 0; i < groups_cnt; i++) {
		size += groups[i].group_len;
	}
	for (i = 0; i < iov_len; i++) {
		size += iovec[i].iov_len;
	}

	res = msg_count_send_ok (size);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (res);
}
int totempg_ifaces_get (
	unsigned int nodeid,
	struct totem_ip_address *interfaces,
	unsigned int interfaces_size,
	char ***status,
	unsigned int *iface_count)
{
	int res;

	res = totemmrp_ifaces_get (nodeid, interfaces, interfaces_size,
		status, iface_count);

	return (res);
}
void *totempg_get_stats (void)
{
	return &totempg_stats;
}
int totempg_crypto_set (
	const char *cipher_type,
	const char *hash_type)
{
	int res;

	res = totemmrp_crypto_set (cipher_type, hash_type);

	return (res);
}
#define ONE_IFACE_LEN 63

const char *totempg_ifaces_print (unsigned int nodeid)
{
	static char iface_string[256 * INTERFACE_MAX];
	char one_iface[ONE_IFACE_LEN + 1];
	struct totem_ip_address interfaces[INTERFACE_MAX];
	unsigned int iface_count;
	unsigned int i;
	int res;

	iface_string[0] = '\0';

	res = totempg_ifaces_get (nodeid, interfaces, INTERFACE_MAX,
		NULL, &iface_count);
	if (res == -1) {
		return ("no interface found for nodeid");
	}

	for (i = 0; i < iface_count; i++) {
		snprintf (one_iface, ONE_IFACE_LEN + 1, "%s ",
			totemip_print (&interfaces[i]));
		strcat (iface_string, one_iface);
	}

	return (iface_string);
}
void totempg_service_ready_register (
	void (*totem_service_ready) (void))
{
	totemmrp_service_ready_register (totem_service_ready);
}
void totempg_queue_level_register_callback (totem_queue_level_changed_fn fn)
{
	totem_queue_level_changed = fn;
}
void totempg_threaded_mode_enable (void)
{
	totempg_threaded_mode = 1;
}