From 8e7339e3f123a2e55f417f2c28516fe016e5f112 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juhani=20Krekel=C3=A4?= Date: Mon, 15 Jul 2019 20:02:04 +0300 Subject: [PATCH] Implement a send queue --- ethermess.py | 97 ++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 86 insertions(+), 11 deletions(-) diff --git a/ethermess.py b/ethermess.py index 808b9c5..e7a53a6 100644 --- a/ethermess.py +++ b/ethermess.py @@ -33,6 +33,22 @@ class Peer: else: return r +send_queue = [] +ack_failed = {} +class Message: + def __init__(self, mac, message, queue_id = None, msgid = None): + self.mac = mac + self.message = message + self.queue_id = queue_id + self.msgid = msgid + + def __repr__(self): + r = 'Message(%s, %s, %s, %s)' % (repr(self.mac), repr(self.message), repr(self.queue_id), repr(self.msgid)) + if __name__ != '__main__': + return '%s.%s' % (__name__, r) + else: + return r + def writeall(f, b): written = 0 while written < len(b): @@ -80,6 +96,26 @@ def send_message(backend, mac, message): encoded = message.encode('utf-8') writeall(backend, b'm' + mac + bytes([len(encoded) >> 8, len(encoded) & 0xff]) + encoded) +def queue_message(backend, mac, message): + global send_queue + global next_queue_id + + if len(send_queue) == 0: + # Nothing being processed atm, send directly + send_message(backend, mac, message) + send_queue.append(Message(mac, message)) + + else: + # Enqueue with an ID + if send_queue[-1].queue_id is None: + next_queue_id = 0 + else: + next_queue_id = send_queue[-1].queue_id + 1 + + send_queue.append(Message(mac, message, next_queue_id)) + + print('--- Queued (%i)' % next_queue_id) + def send_status_request(backend, mac): writeall(backend, b'r' + mac) @@ -98,13 +134,13 @@ def handle_user_command(backend, line): if default_target_mac is None: print('--- Default target not set, set with /target') else: - send_message(backend, default_target_mac, rest) + queue_message(backend, default_target_mac, rest) elif command == '/msg': # Send message to target mac_str, _, message = rest.partition(' ') mac = parse_mac(mac_str) - send_message(backend, mac, message) + queue_message(backend, mac, message) elif command == '/status': # Request status @@ -135,7 +171,7 @@ def handle_user_command(backend, line): if default_target_mac is None: print('--- Default target not set, set with /target') else: - send_message(backend, default_target_mac, line) + queue_message(backend, default_target_mac, line) def handle_status(mac, status, nick): global peers @@ -163,11 +199,11 @@ def handle_status(mac, status, nick): peers[mac].status = status -def handle_message(mac, message): +def nick_from_mac(mac): global peers if mac not in peers: - nick = format_mac(mac) + return format_mac(mac) else: nick = peers[mac].nick @@ -184,16 +220,19 @@ def handle_message(mac, message): if unique: # Unique nicks: ~nick - nick = '~' + nick + return '~' + nick else: # Non-unique nicks: [MAC]~nick - nick = '[%s]~%s' % (format_mac(mac), nick) + return '[%s]~%s' % (format_mac(mac), nick) +def handle_message(mac, message): + nick = nick_from_mac(mac) for line in message.split('\n'): print('<%s> %s' % (nick, line)) def eventloop(proc): global peers + global send_queue, ack_failed # Create unbuffered version of stdin and close the old one as we # won't need it anymore @@ -249,23 +288,59 @@ def eventloop(proc): elif event_type == b'i': # Msgid for message msgid = readall_u16(proc.stdout) - print('(msgid: %i)' % msgid) #debg + send_queue[0].msgid = msgid elif event_type == b'I': # Failed to get msgid for message - print('(msgid fail)') #debg + message = send_queue.pop(0) + nick = nick_from_mac(message.mac) + if message.queue_id is not None: + print('--- Failed to send to %s (%i)' % (nick, message.queue_id)) + else: + print('--- Failed to send to %s' % nick) + + # Send next message if there is one queued + if len(send_queue) > 0: + send_message(proc.stdin, send_queue[0].mac, send_queue[0].message) elif event_type == b'a': # ACK received source_mac = readall(proc.stdout, 6) msgid = readall_u16(proc.stdout) - print('(ack: %s %i)' % (format_mac(source_mac), msgid)) #debg peers[source_mac].lastseen = time.monotonic() + # Was it for a message currently waiting? + if len(send_queue) > 0 and send_queue[0].msgid == msgid: + # Yes, drop is from the queue + send_queue.pop(0) + + # Send next message if there is one queued + if len(send_queue) > 0: + send_message(proc.stdin, send_queue[0].mac, send_queue[0].message) + + elif msgid in ack_failed: + # No, but it was one we thought to have failed + message = ack_failed[msgid] + del ack_failed[msgid] + nick = nick_from_mac(message.mac) + for line in message.message.split('\n'): + print('--- %s acknowledged receive: %s' % (nick, line)) + elif event_type == b'A': # ACK not received (and message send failed) - print('(ack failed)') #debg + # Add it to the messages where ack failed + message = send_queue.pop(0) + ack_failed[message.msgid] = message + nick = nick_from_mac(message.mac) + if message.queue_id is not None: + print('--- Failed to send to %s (%i)' % (nick, message.queue_id)) + else: + print('--- Failed to send to %s' % nick) + + # Send next message if there is one queued + if len(send_queue) > 0: + send_message(proc.stdin, send_queue[0].mac, send_queue[0].message) elif event_type == b'm': # Message received