41 #include <sys/types.h>
45 #include <sys/socket.h>
48 #include <sys/resource.h>
49 #include <netinet/in.h>
50 #include <arpa/inet.h>
62 #include <qb/qbipc_common.h>
71 #define YKD_PROCESSOR_COUNT_MAX 32
115 static void *ykd_group_handle;
119 static int state_received_confchg_entries;
123 static int state_received_process_entries;
129 static int ykd_view_list_entries;
131 static int session_id_max;
137 static int ambiguous_sessions_max_entries;
139 static int ykd_primary_designated = 0;
149 static void (*ykd_primary_callback_fn) (
150 const unsigned int *view_list,
151 size_t view_list_entries,
152 int primary_designated,
155 static void ykd_state_init (
void)
164 static int ykd_state_send_msg (
const void *context)
166 struct iovec iovec[2];
172 iovec[0].iov_base = (
char *)&
header;
173 iovec[0].iov_len =
sizeof (
struct ykd_header);
175 iovec[1].iov_len =
sizeof (
struct ykd_state);
183 static void ykd_state_send (
void)
191 static int ykd_attempt_send_msg (
const void *context)
199 iovec.iov_base = (
char *)&
header;
208 static void ykd_attempt_send (
void)
212 ykd_attempt_send_msg,
216 static void compute (
void)
223 ambiguous_sessions_max_entries = 0;
225 for (i = 0; i < state_received_process_entries; i++) {
245 memcpy (&ambiguous_sessions_max[ambiguous_sessions_max_entries],
248 ambiguous_sessions_max_entries += 1;
254 static int subquorum (
255 unsigned int *member_list,
256 int member_list_entries,
259 int intersections = 0;
263 for (i = 0; i < member_list_entries; i++) {
287 static int decide (
void)
294 if (subquorum (ykd_view_list, ykd_view_list_entries, last_primary_max) == 0) {
298 for (i = 0; i < ambiguous_sessions_max_entries; i++) {
299 if (subquorum (ykd_view_list, ykd_view_list_entries, &ambiguous_sessions_max[i]) == 0) {
320 static void ykd_state_endian_convert (
struct ykd_state *state)
330 ykd_session_endian_convert (&state->
last_formed[i]);
338 static void ykd_deliver_fn (
341 unsigned int msg_len,
342 int endian_conversion_required)
344 int all_received = 1;
345 int state_position = 0;
348 char *msg_state = (
char *)msg +
sizeof (
struct ykd_header);
356 "This processor is within the primary component.");
357 primary_designated = 1;
359 ykd_primary_callback_fn (
361 ykd_view_list_entries,
367 if (endian_conversion_required &&
369 ykd_state_endian_convert ((
struct ykd_state *)msg_state);
375 for (state_position = 0; state_position < state_received_confchg_entries; state_position++) {
376 if (
nodeid == state_received_process[state_position].
nodeid) {
381 state_received_process[state_position].
received = 1;
389 for (i = 0; i < state_received_confchg_entries; i++) {
390 if (state_received_process[i].received == 0) {
404 assert (msg_len >
sizeof (
struct ykd_header));
408 memcpy (&state_received_process[state_position].
ykd_state,
415 for (i = 0; i < state_received_confchg_entries; i++) {
416 state_received_process[i].
received = 0;
427 ykd_view_list, sizeof (
unsigned int) * ykd_view_list_entries);
438 "This processor is within the primary component.");
439 ykd_primary_designated = 1;
441 ykd_primary_callback_fn (
443 ykd_view_list_entries,
444 ykd_primary_designated,
457 static void ykd_confchg_fn (
459 const unsigned int *member_list,
size_t member_list_entries,
460 const unsigned int *left_list,
size_t left_list_entries,
461 const unsigned int *joined_list,
size_t joined_list_entries,
478 memcpy (ykd_view_list, member_list,
479 member_list_entries *
sizeof (
unsigned int));
480 ykd_view_list_entries = member_list_entries;
484 ykd_primary_designated = 0;
486 ykd_primary_callback_fn (
488 ykd_view_list_entries,
489 ykd_primary_designated,
492 memset (&state_received_confchg, 0,
sizeof (state_received_confchg));
493 for (i = 0; i < member_list_entries; i++) {
494 state_received_confchg[i].
nodeid = member_list[i];
495 state_received_confchg[i].
received = 0;
497 memcpy (state_received_process, state_received_confchg,
498 sizeof (state_received_confchg));
500 state_received_confchg_entries = member_list_entries;
501 state_received_process_entries = member_list_entries;
515 const char *error = NULL;
517 ykd_primary_callback_fn = set_primary;
520 if (set_primary == 0) {
521 error = (
char *)
"set primary not set";
536 return ((
char *)error);