Fully implement sending

This commit is contained in:
Juhani Krekelä 2019-07-10 16:22:02 +03:00
parent dab1565f31
commit e7024d853b
1 changed files with 100 additions and 15 deletions

View File

@ -24,6 +24,9 @@
#define EM_PROTOCOL_VERSION 0
#define EM_MESSAGE_MAX_LENGTH (1500 - 2 - 2 - 2)
#define EM_STATUS_BROADCAST_TIME (5 * 60 * 1000 + 1000 * random_byte() / 64)
#define EM_RETRANSMIT_TIME (1000 + random_byte() * 2)
#define EM_MAX_RETRANSMIT 5
#define EMT_SPEAK_VERSION 0
#define EMT_STATUS_REQUEST 1
@ -53,12 +56,14 @@ unsigned char own_nick[256];
unsigned char own_nick_length = 0;
struct timespec next_status_broadcast;
enum message_send_states {IDLE, QUEUED, SENDING};
enum message_send_states {IDLE, QUEUED, WAITING_MSGID, SENDING, WAITING_ACK};
enum message_send_states own_message_send_state = IDLE;
unsigned char own_message_destination_mac[6];
unsigned char own_message[EM_MESSAGE_MAX_LENGTH];
size_t own_message_length = 0;
uint16_t own_message_msgid = 0;
unsigned char retransmission_count = 0;
struct timespec next_retransmission;
struct msgid_cache_entry {
unsigned char other_mac[6];
@ -126,7 +131,7 @@ struct timespec ms_in_future(intmax_t ms) {
#endif
ts.tv_nsec += (ms % 1000) * 1000 * 1000;
if (ts.tv_nsec >= 1000 * 1000 * 1000) {
ts.tv_sec += -1;
ts.tv_sec += 1;
ts.tv_nsec -= 1000 * 1000 * 1000;
}
@ -447,8 +452,18 @@ void handle_msgid(const unsigned char source_mac[6], const unsigned char *data,
cache_index = msgid_cache_add(source_mac);
}
msgid_cache[cache_index].next_send = msgid;
msgid_cache[cache_index].know_send = true;
if (msgid_cache[cache_index].know_send) {
// There is sth in the cache, test whether we should update it
uint16_t diff = msgid - msgid_cache[cache_index].next_send;
if (diff < 0x8000) {
// See the description in handle_message for what is going on
msgid_cache[cache_index].next_send = msgid;
}
} else {
// Nothing in the cache
msgid_cache[cache_index].next_send = msgid;
msgid_cache[cache_index].know_send = true;
}
char mac[18];
format_mac(source_mac, mac);
@ -505,7 +520,8 @@ void handle_message(const unsigned char source_mac[6], const unsigned char *data
msgid_cache[cache_index].know_receive = true;
}
if (msgid_cache[cache_index].next_receive - msgid < 0x8000) {
uint16_t diff = msgid - msgid_cache[cache_index].next_receive;
if (diff < 0x8000) {
// The msgid counter can wrap around, so a simple larger than
// comparison will not work. Instead, we look at the distance
// between the message's msgid and the one we're expecting to
@ -571,7 +587,7 @@ void handle_ack(const unsigned char source_mac[6], const unsigned char *data, si
err(1, "printf");
}
if (own_message_send_state == SENDING && msgid == own_message_msgid) {
if (own_message_send_state == WAITING_ACK && msgid == own_message_msgid) {
own_message_send_state = IDLE;
}
}
@ -669,6 +685,8 @@ void eventloop(void) {
pollfds[1].events = POLLIN;
while (running) {
int retransmit_wait_ms = INT_MAX;
// (Attempt) to process a message send
if (own_message_send_state == QUEUED) {
// We need to have the correct msgid to be able to send
@ -677,8 +695,11 @@ void eventloop(void) {
if (cache_index == -1 || !msgid_cache[cache_index].know_send) {
// We don't know what the msgid should be
// -> ask the other side
// TODO: Implement a timer
send_msgid_request(own_message_destination_mac);
// Wait around 1 to 1.5s before asking again
next_retransmission = ms_in_future(EM_RETRANSMIT_TIME);
retransmission_count = 0;
own_message_send_state = WAITING_MSGID;
} else {
// It is in the cache
own_message_msgid = msgid_cache[cache_index].next_send++;
@ -686,18 +707,82 @@ void eventloop(void) {
}
}
if (own_message_send_state == SENDING) {
send_message();
if (own_message_send_state == WAITING_MSGID) {
ssize_t cache_index = msgid_cache_lookup(own_message_destination_mac);
if (cache_index == -1 || !msgid_cache[cache_index].know_send) {
// Still no msgid
retransmit_wait_ms = wait_ms_until(next_retransmission);
if (retransmit_wait_ms == 0 && retransmission_count < EM_MAX_RETRANSMIT) {
// Time to resend
send_msgid_request(own_message_destination_mac);
// Wait around 1 to 1.5s before asking again
next_retransmission = ms_in_future(EM_RETRANSMIT_TIME);
retransmission_count++;
} else if (retransmit_wait_ms == 0 && retransmission_count >= EM_MAX_RETRANSMIT) {
// Time to give up
own_message_send_state = IDLE;
if (printf("Could not get msgid for message\n") == -1) {
err(1, "printf");
}
if (fflush(stdout) == EOF) {
err(1, "fflush");
}
}
} else {
// Found msgid
own_message_msgid = msgid_cache[cache_index].next_send++;
own_message_send_state = SENDING;
}
}
// Figure out how many ms to wait
int wait_ms = wait_ms_until(next_status_broadcast);
if (wait_ms == 0) {
if (own_message_send_state == SENDING) {
send_message();
// Wait around 1 to 1.5 before sending again
next_retransmission = ms_in_future(EM_RETRANSMIT_TIME);
retransmission_count = 0;
own_message_send_state = WAITING_ACK;
}
if (own_message_send_state == WAITING_ACK) {
retransmit_wait_ms = wait_ms_until(next_retransmission);
if (retransmit_wait_ms == 0 && retransmission_count < EM_MAX_RETRANSMIT) {
// Time to resend
send_message();
// Wait around 1 to 1.5 before sending again
next_retransmission = ms_in_future(EM_RETRANSMIT_TIME);
retransmission_count++;
} else if (retransmit_wait_ms == 0 && retransmission_count >= EM_MAX_RETRANSMIT) {
// Time to give up
own_message_send_state = IDLE;
if (printf("Could not send message %" PRIu16 "\n", own_message_msgid) == -1) {
err(1, "printf");
}
if (fflush(stdout) == EOF) {
err(1, "fflush");
}
}
}
// Process status broadcasting
int status_broadcast_wait_ms = wait_ms_until(next_status_broadcast);
if (status_broadcast_wait_ms == 0) {
// The time has come to send the status broadcast
send_status(broadcast_mac);
// Do next one in about 5 minutes
next_status_broadcast = ms_in_future(5 * 60 * 1000 + 1000 * random_byte() / 64);
wait_ms = wait_ms_until(next_status_broadcast);
next_status_broadcast = ms_in_future(EM_STATUS_BROADCAST_TIME);
}
// Figure out how many ms to wait
int wait_ms;
if (retransmit_wait_ms < status_broadcast_wait_ms) {
wait_ms = retransmit_wait_ms;
} else {
wait_ms = status_broadcast_wait_ms;
}
int ready = poll(pollfds, sizeof(pollfds) / sizeof(*pollfds), wait_ms);
@ -802,7 +887,7 @@ int main(int argc, char **argv) {
send_status(broadcast_mac);
// Schedule next broadcast of our status about 5 min in the future
next_status_broadcast = ms_in_future(5 * 60 * 1000 + 1000 * random_byte() / 64);
next_status_broadcast = ms_in_future(EM_STATUS_BROADCAST_TIME);
// Request status from everyone, so that we can get an idea of who is on the network
send_status_request(broadcast_mac);