#include "dht-chord.h"
/* File-local simulation parameters and shared lookup data. */

/* Number of identifier bits; also used as the number of finger-table entries. */
static int nb_bits = 24;
/* Size of the identifier ring; chord_initialize() sets it to 2^nb_bits. */
static int nb_keys = 0;
/* NOTE(review): no reader of `timeout` is visible in this listing; main()'s
 * usage text advertises a -timeout=t option -- confirm units and consumers. */
static int timeout = 50;
/* Upper bound compared against `now` in the node main loop. */
static int max_simulation_time = 1000;
/* Delays added to a clock value to schedule the next run of each periodic task. */
static int periodic_stabilize_delay = 20;
static int periodic_fix_fingers_delay = 120;
static int periodic_check_predecessor_delay = 120;
static int periodic_lookup_delay = 10;
/* NOTE(review): not referenced in the visible code -- confirm where it is used. */
static const double sleep_delay = 4.9999;
/* powers2[i] is assigned 2^i for i in [0, nb_bits) by chord_initialize(). */
static int *powers2;
/*
 * Global set-up: fills powers2[] with successive powers of two and sets
 * nb_keys to 2^nb_bits.
 *
 * NOTE(review): this listing shows no allocation of powers2 before it is
 * written, `descr` is declared but unused, and the block closes more braces
 * than it opens -- statements appear to be missing; confirm against the
 * complete file.
 */
static void chord_initialize(void)
{
unsigned int pow = 1;
unsigned i;
/* powers2[i] = 2^i; `pow` ends up as 2^nb_bits after the loop. */
for (i = 0; i < nb_bits; i++) {
powers2[i] = pow;
pow = pow << 1;
}
/* The ring holds 2^nb_bits identifiers. */
nb_keys = pow;
char descr[512];
}
}
/*
 * Global tear-down counterpart of chord_initialize().
 *
 * NOTE(review): only an unused counter is visible and the braces do not
 * balance -- the actual clean-up statements appear to be missing from this
 * listing; confirm against the complete file.
 */
static void chord_exit(void)
{
unsigned i;
}
}
/*
 * Maps an identifier onto the ring [0, nb_keys).
 *
 * In C the result of `%` takes the sign of the dividend, so a plain
 * `id % nb_keys` yields a negative value for negative ids (callers in this
 * file pass expressions such as `id - 1`, which is -1 for id 0). The negative
 * remainder is folded back so the result is always a valid ring position.
 *
 * Precondition: nb_keys is non-zero (it is set by chord_initialize()).
 */
static int normalize(int id)
{
  int position = id % nb_keys;
  if (position < 0) {
    position += nb_keys;
  }
  return position;
}
/*
 * Tells whether `id` lies in the closed ring interval [start, end].
 *
 * All three values are normalized first. When the interval wraps past the end
 * of the ring (end before start), the end is shifted up by one ring length;
 * an id located before the start is shifted the same way, so one ordinary
 * comparison is enough.
 *
 * Returns non-zero when `id` is inside the interval, 0 otherwise.
 */
static int is_in_interval(int id, int start, int end)
{
  const int lower = normalize(start);
  int upper = normalize(end);
  int position = normalize(id);

  /* Unroll a wrapping interval onto [lower, lower + nb_keys). */
  if (upper < lower) {
    upper += nb_keys;
  }
  if (position < lower) {
    position += nb_keys;
  }

  return position <= upper;
}
/*
 * Writes the mailbox name of node `node_id` into `mailbox`.
 *
 * The name is the decimal representation of the id; at most
 * MAILBOX_NAME_SIZE - 1 bytes (terminator included) are written.
 */
static void get_mailbox(int node_id, char* mailbox)
{
  const size_t limit = MAILBOX_NAME_SIZE - 1;
  snprintf(mailbox, limit, "%d", node_id);
}
/*
 * Release hook for a task.
 *
 * A NULL task is ignored. For a non-NULL task this version performs no
 * action either: nothing is released here.
 */
static void task_free(void* task)
{
  if (task == NULL) {
    return;
  }
  /* Intentionally empty: no release work is done for a non-NULL task. */
}
/*
 * Logs the finger table of `node` at verbose level: one line per finger with
 * the ring position (node->id + 2^i) mod nb_keys and the id stored for that
 * finger, followed by the node's predecessor id.
 *
 * NOTE(review): the block closes one more brace than it opens -- an enclosing
 * statement appears to be missing from this listing; confirm against the
 * complete file.
 */
static void print_finger_table(node_t node)
{
/* One row per finger: start of the finger's range | node currently stored. */
for (int i = 0; i < nb_bits; i++) {
XBT_VERB(
" %3d | %3d", (node->id + powers2[i]) % nb_keys, node->fingers[i].id);
}
XBT_VERB(
"Predecessor: %d", node->pred_id);
}
}
/*
 * Points finger `finger_index` of `node` at node `id`.
 *
 * Does nothing when the finger already holds `id`; otherwise stores the id,
 * refreshes the finger's mailbox name and logs the change.
 */
static void set_finger(node_t node, int finger_index, int id)
{
  if (node->fingers[finger_index].id == id) {
    return; /* unchanged: keep the entry and stay silent */
  }

  node->fingers[finger_index].id = id;
  get_mailbox(id, node->fingers[finger_index].mailbox);
  XBT_DEBUG("My new finger #%d is %d", finger_index, id);
}
/*
 * Records `predecessor_id` as the predecessor of `node`.
 *
 * Does nothing when the value is unchanged. The predecessor mailbox name is
 * refreshed only for a real node; -1 means "no predecessor" and leaves the
 * mailbox untouched. The change is logged in both cases.
 */
static void set_predecessor(node_t node, int predecessor_id)
{
  if (node->pred_id == predecessor_id) {
    return;
  }

  node->pred_id = predecessor_id;
  if (predecessor_id != -1) {
    get_mailbox(predecessor_id, node->pred_mailbox);
  }
  XBT_DEBUG("My new predecessor is %d", predecessor_id);
}
/*
 * Dispatches one received task according to its type.
 *
 * - Requests (FIND_SUCCESSOR, GET_PREDECESSOR, PREDECESSOR_ALIVE) are turned
 *   into their answer in place: the type and, where relevant, answer_id of
 *   the task data are rewritten. The task is not freed here.
 * - NOTIFY, PREDECESSOR_LEAVING and SUCCESSOR_LEAVING update the local node
 *   and then pass the task to task_free().
 * - Answer types arriving here are unexpected: they are logged and passed to
 *   task_free().
 *
 * node: the local node whose state may be updated.
 * task: the received task.
 *
 * NOTE(review): `task_data` is not declared inside this block and no send of
 * the rewritten/forwarded task is visible (the forwarding branch only fills
 * `mailbox`) -- statements appear to be missing from this listing; confirm
 * against the complete file.
 */
static void handle_task(node_t node,
                        msg_task_t task)
{
  char mailbox[MAILBOX_NAME_SIZE];
  e_task_type_t type = task_data->type;

  switch (type) {
  case TASK_FIND_SUCCESSOR:
    XBT_DEBUG(
        "Receiving a 'Find Successor' request from %s for id %d",
        task_data->issuer_host_name, task_data->request_id);
    /* The requested id falls between me and my successor: my successor is the answer. */
    if (is_in_interval(task_data->request_id, node->id + 1, node->fingers[0].id)) {
      task_data->type = TASK_FIND_SUCCESSOR_ANSWER;
      task_data->answer_id = node->fingers[0].id;
      XBT_DEBUG(
          "Sending back a 'Find Successor Answer' to %s (mailbox %s): the successor of %d is %d",
          task_data->issuer_host_name, task_data->answer_to, task_data->request_id, task_data->answer_id);
    } else {
      /* Otherwise pick the closest preceding finger as the next hop. */
      int closest = closest_preceding_node(node, task_data->request_id);
      XBT_DEBUG(
          "Forwarding the 'Find Successor' request for id %d to my closest preceding finger %d",
          task_data->request_id, closest);
      get_mailbox(closest, mailbox);
    }
    break;

  case TASK_GET_PREDECESSOR:
    XBT_DEBUG(
        "Receiving a 'Get Predecessor' request from %s", task_data->issuer_host_name);
    task_data->type = TASK_GET_PREDECESSOR_ANSWER;
    task_data->answer_id = node->pred_id;
    XBT_DEBUG(
        "Sending back a 'Get Predecessor Answer' to %s via mailbox '%s': my predecessor is %d",
        task_data->issuer_host_name, task_data->answer_to, task_data->answer_id);
    break;

  case TASK_NOTIFY:
    XBT_DEBUG(
        "Receiving a 'Notify' request from %s", task_data->issuer_host_name);
    notify(node, task_data->request_id);
    task_free(task);
    break;

  case TASK_PREDECESSOR_LEAVING:
    /* request_id carries the id to adopt as my new predecessor. */
    XBT_DEBUG(
        "Receiving a 'Predecessor Leaving' message from %s", task_data->issuer_host_name);
    set_predecessor(node, task_data->request_id);
    task_free(task);
    break;

  case TASK_SUCCESSOR_LEAVING:
    /* request_id carries the id to adopt as my new successor (finger 0). */
    XBT_DEBUG(
        "Receiving a 'Successor Leaving' message from %s", task_data->issuer_host_name);
    set_finger(node, 0, task_data->request_id);
    task_free(task);
    break;

  case TASK_FIND_SUCCESSOR_ANSWER:
  case TASK_GET_PREDECESSOR_ANSWER:
  case TASK_PREDECESSOR_ALIVE_ANSWER:
    XBT_DEBUG(
        "Ignoring unexpected task of type %d (%p)", (
        int)type, task);
    task_free(task);
    break;

  case TASK_PREDECESSOR_ALIVE:
    XBT_DEBUG(
        "Receiving a 'Predecessor Alive' request from %s", task_data->issuer_host_name);
    task_data->type = TASK_PREDECESSOR_ALIVE_ANSWER;
    XBT_DEBUG(
        "Sending back a 'Predecessor Alive Answer' to %s (mailbox %s)",
        task_data->issuer_host_name, task_data->answer_to);
    break;

  default:
    /* A label must be followed by a statement before C23. */
    break;
  }
}
/*
 * Starts a new ring containing only `node`: the node has no predecessor yet.
 * The resulting finger table is logged.
 */
void create(node_t node)
{
  const int no_predecessor = -1;

  set_predecessor(node, no_predecessor);
  print_finger_table(node);
}
/*
 * Makes `node` join the ring that node `known_id` belongs to.
 *
 * The predecessor is reset to -1, then `known_id` is asked for the successor
 * of this node's own id. On success that successor becomes finger 0 and the
 * finger table is logged.
 *
 * Returns 1 when a successor was obtained, 0 otherwise.
 */
int join(node_t node, int known_id)
{
  XBT_INFO("Joining the ring with id %d, knowing node %d", node->id, known_id);
  set_predecessor(node, -1);

  const int successor_id = remote_find_successor(node, known_id, node->id);
  const int joined = (successor_id != -1);
  if (joined) {
    set_finger(node, 0, successor_id);
    print_finger_table(node);
  }
  return joined;
}
/*
 * Makes `node` leave the ring: logs the departure, then delegates to
 * quit_notify().
 */
void leave(node_t node)
{
  XBT_DEBUG("Well Guys! I Think it's time for me to quit ;)");
  quit_notify(node);
}
/*
 * Prepares the two departure messages of `node`:
 *  - PREDECESSOR_LEAVING for the successor, carrying this node's predecessor
 *    id (the receiver adopts request_id as its new predecessor);
 *  - SUCCESSOR_LEAVING for the predecessor, carrying this node's successor id
 *    (the receiver adopts request_id as its new finger 0).
 *
 * Fix: the SUCCESSOR_LEAVING request_id used to be overwritten with pred_id
 * right after being set to the successor id, which would make the predecessor
 * install itself as its own successor. The overwriting assignment is removed.
 *
 * NOTE(review): task_sent / task_sent_s are not declared in this listing, no
 * send is visible, and the braces do not balance -- statements appear to be
 * missing; the remaining lines are kept exactly as found. Confirm against the
 * complete file.
 */
void quit_notify(node_t node)
{
char mailbox[MAILBOX_NAME_SIZE];
/* Message for my successor: its new predecessor is my current predecessor. */
task_data_t req_data =
xbt_new0(s_task_data_t,1);
req_data->type = TASK_PREDECESSOR_LEAVING;
req_data->request_id = node->pred_id;
get_mailbox(node->id, req_data->answer_to);
XBT_DEBUG(
"Sending a 'PREDECESSOR_LEAVING' to my successor %d",node->fingers[0].id);
XBT_DEBUG(
"Timeout expired when sending a 'PREDECESSOR_LEAVING' to my successor %d", node->fingers[0].id);
task_free(task_sent);
}
get_mailbox(node->pred_id, mailbox);
/* Message for my predecessor: its new successor is my current successor. */
task_data_t req_data_s =
xbt_new0(s_task_data_t,1);
req_data_s->type = TASK_SUCCESSOR_LEAVING;
req_data_s->request_id = node->fingers[0].id;
get_mailbox(node->id, req_data_s->answer_to);
XBT_DEBUG(
"Sending a 'SUCCESSOR_LEAVING' to my predecessor %d",node->pred_id);
XBT_DEBUG(
"Timeout expired when sending a 'SUCCESSOR_LEAVING' to my predecessor %d", node->pred_id);
task_free(task_sent_s);
}
}
/*
 * Finds the node responsible for `id`.
 *
 * When `id` lies between this node (exclusive) and its successor (inclusive),
 * the successor is the answer. Otherwise the question is forwarded to the
 * closest preceding finger.
 *
 * Returns the successor id, or whatever remote_find_successor() returns for
 * the forwarded request.
 */
int find_successor(node_t node, int id)
{
  const int owned_by_successor = is_in_interval(id, node->id + 1, node->fingers[0].id);

  return owned_by_successor
             ? node->fingers[0].id
             : remote_find_successor(node, closest_preceding_node(node, id), id);
}
/*
 * Asks node `ask_to` for the successor of `id`.
 *
 * Returns the answer_id of the matching answer, or -1 when no answer was
 * obtained (`successor` starts at -1 and is only overwritten from an answer).
 *
 * NOTE(review): task_sent, task_received, ans_data and res are used but not
 * declared in this listing, and the braces do not balance -- the send/receive
 * statements and their conditions appear to be missing; confirm against the
 * complete file.
 */
int remote_find_successor(node_t node, int ask_to, int id)
{
int successor = -1;
int stop = 0;
char mailbox[MAILBOX_NAME_SIZE];
get_mailbox(ask_to, mailbox);
/* Request payload: type, looked-up id, and this node's mailbox name as reply address. */
task_data_t req_data =
xbt_new0(s_task_data_t, 1);
req_data->type = TASK_FIND_SUCCESSOR;
req_data->request_id = id;
get_mailbox(node->id, req_data->answer_to);
XBT_DEBUG(
"Sending a 'Find Successor' request (task %p) to %d for id %d", task_sent, ask_to,
id);
XBT_DEBUG(
"Failed to send the 'Find Successor' request (task %p) to %d for id %d", task_sent, ask_to,
id);
task_free(task_sent);
} else {
XBT_DEBUG(
"Sent a 'Find Successor' request (task %p) to %d for key %d, waiting for the answer",
task_sent, ask_to, id);
/* Keep receiving until the matching answer arrives or a receive fails. */
do {
if (node->comm_receive == NULL) {
}
XBT_DEBUG(
"Failed to receive the answer to my 'Find Successor' request (task %p): %d",
task_sent, (int)res);
stop = 1;
node->comm_receive = NULL;
}
else {
XBT_DEBUG(
"Received a task (%p)", task_received);
/* Anything that is not the answer to this very request is handled as a regular task. */
if (task_received != task_sent ||
ans_data->type != TASK_FIND_SUCCESSOR_ANSWER) {
node->comm_receive = NULL;
handle_task(node, task_received);
}
else {
XBT_DEBUG(
"Received the answer to my 'Find Successor' request for id %d (task %p): the successor of key %d is %d",
ans_data->request_id, task_received, id, ans_data->answer_id);
successor = ans_data->answer_id;
stop = 1;
node->comm_receive = NULL;
task_free(task_received);
}
}
} while (!stop);
}
return successor;
}
/*
 * Asks node `ask_to` for its predecessor.
 *
 * Returns the answer_id of the matching answer, or -1 when no answer was
 * obtained (`predecessor_id` starts at -1 and is only overwritten from an
 * answer).
 *
 * NOTE(review): task_sent, task_received, ans_data and res are used but not
 * declared in this listing, and the braces do not balance -- the send/receive
 * statements and their conditions appear to be missing; confirm against the
 * complete file.
 */
int remote_get_predecessor(node_t node, int ask_to)
{
int predecessor_id = -1;
int stop = 0;
char mailbox[MAILBOX_NAME_SIZE];
get_mailbox(ask_to, mailbox);
/* Request payload: type and this node's mailbox name as reply address. */
task_data_t req_data =
xbt_new0(s_task_data_t, 1);
req_data->type = TASK_GET_PREDECESSOR;
get_mailbox(node->id, req_data->answer_to);
XBT_DEBUG(
"Sending a 'Get Predecessor' request to %d", ask_to);
XBT_DEBUG(
"Failed to send the 'Get Predecessor' request (task %p) to %d",
task_sent, ask_to);
task_free(task_sent);
}
else {
XBT_DEBUG(
"Sent 'Get Predecessor' request (task %p) to %d, waiting for the answer on my mailbox '%s'",
task_sent, ask_to, req_data->answer_to);
/* Keep receiving until the matching answer arrives or a receive fails. */
do {
if (node->comm_receive == NULL) {
}
XBT_DEBUG(
"Failed to receive the answer to my 'Get Predecessor' request (task %p): %d",
task_sent, (int)res);
stop = 1;
node->comm_receive = NULL;
}
else {
/* Anything that is not the answer to this very request is handled as a regular task. */
if (task_received != task_sent ||
ans_data->type != TASK_GET_PREDECESSOR_ANSWER) {
node->comm_receive = NULL;
handle_task(node, task_received);
}
else {
XBT_DEBUG(
"Received the answer to my 'Get Predecessor' request (task %p): the predecessor of node %d is %d",
task_received, ask_to, ans_data->answer_id);
predecessor_id = ans_data->answer_id;
stop = 1;
node->comm_receive = NULL;
task_free(task_received);
}
}
} while (!stop);
}
return predecessor_id;
}
/*
 * Returns the finger of `node` that most closely precedes `id`.
 *
 * Fingers are scanned from the last entry down to the first; the first one
 * whose id lies in the ring interval [node->id + 1, id - 1] is returned.
 * When no finger qualifies, the node's own id is returned.
 */
int closest_preceding_node(node_t node, int id)
{
  int slot = nb_bits;

  while (slot-- > 0) {
    const int finger_id = node->fingers[slot].id;
    if (is_in_interval(finger_id, node->id + 1, id - 1)) {
      return finger_id;
    }
  }
  return node->id;
}
/*
 * One round of successor maintenance for `node`.
 *
 * The candidate is the predecessor reported by the current successor, or the
 * node's own predecessor when the node is its own successor. A valid
 * candidate lying strictly between this node and the successor becomes the
 * new finger 0. Finally the successor is notified about this node, unless
 * the node is its own successor.
 *
 * Note: the notification targets the successor id read on entry, not a
 * successor adopted during this call.
 */
void stabilize(node_t node)
{
  const int successor_id = node->fingers[0].id;
  const int has_remote_successor = (successor_id != node->id);

  const int candidate_id = has_remote_successor
                               ? remote_get_predecessor(node, successor_id)
                               : node->pred_id;

  if (candidate_id != -1 &&
      is_in_interval(candidate_id, node->id + 1, successor_id - 1)) {
    set_finger(node, 0, candidate_id);
  }

  if (has_remote_successor) {
    remote_notify(successor_id, node->id);
  }
}
/*
 * Handles a claim by `predecessor_candidate_id` to be the predecessor of
 * `node`.
 *
 * The candidate is accepted when the node has no predecessor (-1) or when the
 * candidate lies in the ring interval [pred_id + 1, id - 1]; the finger table
 * is then logged. Otherwise the current predecessor is kept and that is
 * logged.
 */
void notify(node_t node, int predecessor_candidate_id) {
  const int keep_current =
      node->pred_id != -1 &&
      !is_in_interval(predecessor_candidate_id, node->pred_id + 1, node->id - 1);

  if (keep_current) {
    XBT_DEBUG("I don't have to change my predecessor to %d", predecessor_candidate_id);
    return;
  }

  set_predecessor(node, predecessor_candidate_id);
  print_finger_table(node);
}
/*
 * Prepares a NOTIFY request telling node `notify_id` that
 * `predecessor_candidate_id` may be its predecessor.
 *
 * NOTE(review): `task` is used in the log call but not declared in this
 * listing, and nothing visible sends req_data or uses `mailbox` after it is
 * filled -- the task creation and send appear to be missing; confirm against
 * the complete file (including who owns/frees req_data).
 */
void remote_notify(int notify_id, int predecessor_candidate_id) {
/* Request payload: type and the id of the would-be predecessor. */
task_data_t req_data =
xbt_new0(s_task_data_t, 1);
req_data->type = TASK_NOTIFY;
req_data->request_id = predecessor_candidate_id;
XBT_DEBUG(
"Sending a 'Notify' request (task %p) to %d", task, notify_id);
/* Destination mailbox name of the node to notify. */
char mailbox[MAILBOX_NAME_SIZE];
get_mailbox(notify_id, mailbox);
}
/*
 * Refreshes one finger of `node` per call.
 *
 * The finger at next_finger_to_fix is recomputed as the successor of
 * node->id + 2^index. When the lookup fails (-1) nothing changes, so the same
 * finger is retried on the next call. Otherwise the finger is updated (and
 * the table logged) if its value changed, and the index advances cyclically.
 */
void fix_fingers(node_t node) {
  const int slot = node->next_finger_to_fix;
  const int successor_id = find_successor(node, node->id + powers2[slot]);

  if (successor_id == -1) {
    return;
  }

  if (successor_id != node->fingers[slot].id) {
    set_finger(node, slot, successor_id);
    print_finger_table(node);
  }
  node->next_finger_to_fix = (slot + 1) % nb_bits;
}
/*
 * Checks whether the predecessor of `node` still answers.
 *
 * Returns immediately when the node has no predecessor (-1). On a failed
 * receive the predecessor is forgotten (pred_id set to -1).
 *
 * NOTE(review): task_sent, task_received and res are used but not declared in
 * this listing, and the braces do not balance -- the send/receive statements
 * and their conditions appear to be missing; confirm against the complete
 * file.
 */
void check_predecessor(node_t node)
{
XBT_DEBUG(
"Checking whether my predecessor is alive");
if(node->pred_id == -1)
return;
int stop = 0;
char mailbox[MAILBOX_NAME_SIZE];
get_mailbox(node->pred_id, mailbox);
/* Request payload: type, predecessor id, and this node's mailbox name as reply address. */
task_data_t req_data =
xbt_new0(s_task_data_t,1);
req_data->type = TASK_PREDECESSOR_ALIVE;
req_data->request_id = node->pred_id;
get_mailbox(node->id, req_data->answer_to);
XBT_DEBUG(
"Sending a 'Predecessor Alive' request to my predecessor %d", node->pred_id);
XBT_DEBUG(
"Failed to send the 'Predecessor Alive' request (task %p) to %d", task_sent, node->pred_id);
task_free(task_sent);
} else {
XBT_DEBUG(
"Sent 'Predecessor Alive' request (task %p) to %d, waiting for the answer on my mailbox '%s'",
task_sent, node->pred_id, req_data->answer_to);
/* Keep receiving until the matching answer arrives or a receive fails. */
do {
if (node->comm_receive == NULL) {
}
XBT_DEBUG(
"Failed to receive the answer to my 'Predecessor Alive' request (task %p): %d",
task_sent, (int)res);
stop = 1;
node->comm_receive = NULL;
/* No answer: drop the predecessor. */
node->pred_id = -1;
} else {
/* A different task than the one sent is handled as a regular task. */
if (task_received != task_sent) {
node->comm_receive = NULL;
handle_task(node, task_received);
}else{
XBT_DEBUG(
"Received the answer to my 'Predecessor Alive' request (task %p) : my predecessor %d is alive",
task_received, node->pred_id);
stop = 1;
node->comm_receive = NULL;
task_free(task_received);
}
}
} while (!stop);
}
}
/*
 * Performs a lookup for the id stored in one of the node's fingers and logs
 * the successor that find_successor() reports for it.
 *
 * NOTE(review): `random_index` is not declared in this listing -- the
 * statement choosing it appears to be missing; confirm against the complete
 * file.
 */
void random_lookup(node_t node)
{
int random_id = node->fingers[random_index].id;
XBT_DEBUG(
"Making a lookup request for id %d", random_id);
int res = find_successor(node, random_id);
XBT_DEBUG(
"The successor of node %d is %d", random_id, res);
}
/*
 * Body of the per-node process: sets up the node, creates or joins a ring,
 * runs the periodic maintenance loop, then leaves.
 *
 * NOTE(review): the function header is not visible in this listing, several
 * names are used without a visible declaration or assignment (argc,
 * init_time, now, deadline, known_id, sub_protocol, task_received), and the
 * braces do not balance -- statements appear to be missing; confirm against
 * the complete file.
 */
{
/* NOTE(review): the condition guarding these shortened delays is not visible. */
periodic_stabilize_delay = 8;
periodic_fix_fingers_delay = 8;
periodic_check_predecessor_delay = 8;
}
int i;
int join_success = 0;
double deadline;
/* First due date of each periodic task, relative to init_time. */
double next_stabilize_date = init_time + periodic_stabilize_delay;
double next_fix_fingers_date = init_time + periodic_fix_fingers_delay;
double next_check_predecessor_date = init_time + periodic_check_predecessor_delay;
double next_lookup_date = init_time + periodic_lookup_delay;
xbt_assert(argc == 3 || argc == 5,
"Wrong number of arguments for this node");
/* Node state starts zeroed; the finger array has nb_bits entries. */
s_node_t node = {0};
get_mailbox(node.id, node.mailbox);
node.next_finger_to_fix = 0;
node.fingers =
xbt_new0(s_finger_t, nb_bits);
node.last_change_date = init_time;
/* Every finger initially points at the node itself; the id is first set to -1
 * so that set_finger() sees a change and fills in the mailbox. */
for (i = 0; i < nb_bits; i++) {
node.fingers[i].id = -1;
set_finger(&node, i, node.id);
}
/* Three arguments: start a new ring. Otherwise: join through known_id. */
if (argc == 3) {
create(&node);
join_success = 1;
} else {
join_success = join(&node, known_id);
}
if (join_success) {
int listen = 0;
int no_op = 0;
/* Run until this node's deadline or the global simulation limit is reached. */
while (now < init_time + deadline && now < max_simulation_time) {
if (node.comm_receive == NULL) {
task_received = NULL;
}
/* sub_protocol selects one maintenance action: 1 stabilize, 2 fix_fingers,
 * 3 check_predecessor, any other positive value a random lookup. */
if(listen == 0 && (sub_protocol > 0)){
if(sub_protocol == 1)
stabilize(&node);
else if(sub_protocol == 2)
fix_fingers(&node);
else if(sub_protocol == 3)
check_predecessor(&node);
else
random_lookup(&node);
listen = 1;
} else {
no_op = 1;
}
}else{
/* At most one periodic task per iteration, tested in this order; the first
 * three reschedule themselves from the current clock. */
if (now >= next_stabilize_date) {
stabilize(&node);
next_stabilize_date =
MSG_get_clock() + periodic_stabilize_delay;
}else if (now >= next_fix_fingers_date) {
fix_fingers(&node);
next_fix_fingers_date =
MSG_get_clock() + periodic_fix_fingers_delay;
}else if (now >= next_check_predecessor_date) {
check_predecessor(&node);
next_check_predecessor_date =
MSG_get_clock() + periodic_check_predecessor_delay;
}else if (now >= next_lookup_date) {
/* NOTE(review): next_lookup_date is not updated in the visible code. */
random_lookup(&node);
}else {
}
}
} else {
/* A task arrived: clear the pending receive and process it. */
node.comm_receive = NULL;
handle_task(&node, task_received);
else
XBT_DEBUG(
"Failed to receive a task. Nevermind.");
}
}
/* Drop a receive still pending when the loop ends, then leave the ring. */
if (node.comm_receive) {
task_free(task_received);
node.comm_receive = NULL;
}
leave(&node);
}
return 0;
}
/*
 * Program entry point.
 *
 * Expects: [-nb_bits=n] [-timeout=t] platform_file deployment_file.
 * Leading arguments starting with '-' are checked against the two known
 * options; anything else starting with '-' aborts through xbt_die(). The
 * global tables are then initialized and released.
 *
 * NOTE(review): the bodies that would apply a recognized option value are
 * empty in this listing, and nothing between chord_initialize() and
 * chord_exit() is visible -- confirm against the complete file.
 */
int main(int argc, char *argv[])
{
  xbt_assert(argc > 2,
             "Usage: %s [-nb_bits=n] [-timeout=t] platform_file deployment_file\n" "\tExample: %s ../msg_platform.xml chord.xml\n", argv[0], argv[0]);

  char **options = &argv[1];
  /* argv[argc] is a null pointer: stop there rather than passing it to
   * strncmp() when every argument starts with '-'. */
  while (options[0] != NULL && !strncmp(options[0], "-", 1)) {
    /* size_t matches strlen()'s type and avoids a signed/unsigned comparison. */
    size_t length = strlen("-nb_bits=");
    if (!strncmp(options[0], "-nb_bits=", length) && strlen(options[0]) > length) {
      /* Recognized "-nb_bits=<value>"; no action is taken here. */
    } else {
      length = strlen("-timeout=");
      if (!strncmp(options[0], "-timeout=", length) && strlen(options[0]) > length) {
        /* Recognized "-timeout=<value>"; no action is taken here. */
      } else {
        xbt_die("Invalid chord option '%s'", options[0]);
      }
    }
    options++;
  }

  chord_initialize();
  chord_exit();
}