Implement a send queue
This commit is contained in:
parent
d406d269da
commit
8e7339e3f1
97
ethermess.py
97
ethermess.py
|
@ -33,6 +33,22 @@ class Peer:
|
||||||
else:
|
else:
|
||||||
return r
|
return r
|
||||||
|
|
||||||
|
send_queue = []
|
||||||
|
ack_failed = {}
|
||||||
|
class Message:
|
||||||
|
def __init__(self, mac, message, queue_id = None, msgid = None):
|
||||||
|
self.mac = mac
|
||||||
|
self.message = message
|
||||||
|
self.queue_id = queue_id
|
||||||
|
self.msgid = msgid
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
r = 'Message(%s, %s, %s, %s)' % (repr(self.mac), repr(self.message), repr(self.queue_id), repr(self.msgid))
|
||||||
|
if __name__ != '__main__':
|
||||||
|
return '%s.%s' % (__name__, r)
|
||||||
|
else:
|
||||||
|
return r
|
||||||
|
|
||||||
def writeall(f, b):
|
def writeall(f, b):
|
||||||
written = 0
|
written = 0
|
||||||
while written < len(b):
|
while written < len(b):
|
||||||
|
@ -80,6 +96,26 @@ def send_message(backend, mac, message):
|
||||||
encoded = message.encode('utf-8')
|
encoded = message.encode('utf-8')
|
||||||
writeall(backend, b'm' + mac + bytes([len(encoded) >> 8, len(encoded) & 0xff]) + encoded)
|
writeall(backend, b'm' + mac + bytes([len(encoded) >> 8, len(encoded) & 0xff]) + encoded)
|
||||||
|
|
||||||
|
def queue_message(backend, mac, message):
|
||||||
|
global send_queue
|
||||||
|
global next_queue_id
|
||||||
|
|
||||||
|
if len(send_queue) == 0:
|
||||||
|
# Nothing being processed atm, send directly
|
||||||
|
send_message(backend, mac, message)
|
||||||
|
send_queue.append(Message(mac, message))
|
||||||
|
|
||||||
|
else:
|
||||||
|
# Enqueue with an ID
|
||||||
|
if send_queue[-1].queue_id is None:
|
||||||
|
next_queue_id = 0
|
||||||
|
else:
|
||||||
|
next_queue_id = send_queue[-1].queue_id + 1
|
||||||
|
|
||||||
|
send_queue.append(Message(mac, message, next_queue_id))
|
||||||
|
|
||||||
|
print('--- Queued (%i)' % next_queue_id)
|
||||||
|
|
||||||
def send_status_request(backend, mac):
|
def send_status_request(backend, mac):
|
||||||
writeall(backend, b'r' + mac)
|
writeall(backend, b'r' + mac)
|
||||||
|
|
||||||
|
@ -98,13 +134,13 @@ def handle_user_command(backend, line):
|
||||||
if default_target_mac is None:
|
if default_target_mac is None:
|
||||||
print('--- Default target not set, set with /target')
|
print('--- Default target not set, set with /target')
|
||||||
else:
|
else:
|
||||||
send_message(backend, default_target_mac, rest)
|
queue_message(backend, default_target_mac, rest)
|
||||||
|
|
||||||
elif command == '/msg':
|
elif command == '/msg':
|
||||||
# Send message to target
|
# Send message to target
|
||||||
mac_str, _, message = rest.partition(' ')
|
mac_str, _, message = rest.partition(' ')
|
||||||
mac = parse_mac(mac_str)
|
mac = parse_mac(mac_str)
|
||||||
send_message(backend, mac, message)
|
queue_message(backend, mac, message)
|
||||||
|
|
||||||
elif command == '/status':
|
elif command == '/status':
|
||||||
# Request status
|
# Request status
|
||||||
|
@ -135,7 +171,7 @@ def handle_user_command(backend, line):
|
||||||
if default_target_mac is None:
|
if default_target_mac is None:
|
||||||
print('--- Default target not set, set with /target')
|
print('--- Default target not set, set with /target')
|
||||||
else:
|
else:
|
||||||
send_message(backend, default_target_mac, line)
|
queue_message(backend, default_target_mac, line)
|
||||||
|
|
||||||
def handle_status(mac, status, nick):
|
def handle_status(mac, status, nick):
|
||||||
global peers
|
global peers
|
||||||
|
@ -163,11 +199,11 @@ def handle_status(mac, status, nick):
|
||||||
|
|
||||||
peers[mac].status = status
|
peers[mac].status = status
|
||||||
|
|
||||||
def handle_message(mac, message):
|
def nick_from_mac(mac):
|
||||||
global peers
|
global peers
|
||||||
|
|
||||||
if mac not in peers:
|
if mac not in peers:
|
||||||
nick = format_mac(mac)
|
return format_mac(mac)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
nick = peers[mac].nick
|
nick = peers[mac].nick
|
||||||
|
@ -184,16 +220,19 @@ def handle_message(mac, message):
|
||||||
|
|
||||||
if unique:
|
if unique:
|
||||||
# Unique nicks: ~nick
|
# Unique nicks: ~nick
|
||||||
nick = '~' + nick
|
return '~' + nick
|
||||||
else:
|
else:
|
||||||
# Non-unique nicks: [MAC]~nick
|
# Non-unique nicks: [MAC]~nick
|
||||||
nick = '[%s]~%s' % (format_mac(mac), nick)
|
return '[%s]~%s' % (format_mac(mac), nick)
|
||||||
|
|
||||||
|
def handle_message(mac, message):
|
||||||
|
nick = nick_from_mac(mac)
|
||||||
for line in message.split('\n'):
|
for line in message.split('\n'):
|
||||||
print('<%s> %s' % (nick, line))
|
print('<%s> %s' % (nick, line))
|
||||||
|
|
||||||
def eventloop(proc):
|
def eventloop(proc):
|
||||||
global peers
|
global peers
|
||||||
|
global send_queue, ack_failed
|
||||||
|
|
||||||
# Create unbuffered version of stdin and close the old one as we
|
# Create unbuffered version of stdin and close the old one as we
|
||||||
# won't need it anymore
|
# won't need it anymore
|
||||||
|
@ -249,23 +288,59 @@ def eventloop(proc):
|
||||||
elif event_type == b'i':
|
elif event_type == b'i':
|
||||||
# Msgid for message
|
# Msgid for message
|
||||||
msgid = readall_u16(proc.stdout)
|
msgid = readall_u16(proc.stdout)
|
||||||
print('(msgid: %i)' % msgid) #debg
|
send_queue[0].msgid = msgid
|
||||||
|
|
||||||
elif event_type == b'I':
|
elif event_type == b'I':
|
||||||
# Failed to get msgid for message
|
# Failed to get msgid for message
|
||||||
print('(msgid fail)') #debg
|
message = send_queue.pop(0)
|
||||||
|
nick = nick_from_mac(message.mac)
|
||||||
|
if message.queue_id is not None:
|
||||||
|
print('--- Failed to send to %s (%i)' % (nick, message.queue_id))
|
||||||
|
else:
|
||||||
|
print('--- Failed to send to %s' % nick)
|
||||||
|
|
||||||
|
# Send next message if there is one queued
|
||||||
|
if len(send_queue) > 0:
|
||||||
|
send_message(proc.stdin, send_queue[0].mac, send_queue[0].message)
|
||||||
|
|
||||||
elif event_type == b'a':
|
elif event_type == b'a':
|
||||||
# ACK received
|
# ACK received
|
||||||
source_mac = readall(proc.stdout, 6)
|
source_mac = readall(proc.stdout, 6)
|
||||||
msgid = readall_u16(proc.stdout)
|
msgid = readall_u16(proc.stdout)
|
||||||
print('(ack: %s %i)' % (format_mac(source_mac), msgid)) #debg
|
|
||||||
|
|
||||||
peers[source_mac].lastseen = time.monotonic()
|
peers[source_mac].lastseen = time.monotonic()
|
||||||
|
|
||||||
|
# Was it for a message currently waiting?
|
||||||
|
if len(send_queue) > 0 and send_queue[0].msgid == msgid:
|
||||||
|
# Yes, drop is from the queue
|
||||||
|
send_queue.pop(0)
|
||||||
|
|
||||||
|
# Send next message if there is one queued
|
||||||
|
if len(send_queue) > 0:
|
||||||
|
send_message(proc.stdin, send_queue[0].mac, send_queue[0].message)
|
||||||
|
|
||||||
|
elif msgid in ack_failed:
|
||||||
|
# No, but it was one we thought to have failed
|
||||||
|
message = ack_failed[msgid]
|
||||||
|
del ack_failed[msgid]
|
||||||
|
nick = nick_from_mac(message.mac)
|
||||||
|
for line in message.message.split('\n'):
|
||||||
|
print('--- %s acknowledged receive: %s' % (nick, line))
|
||||||
|
|
||||||
elif event_type == b'A':
|
elif event_type == b'A':
|
||||||
# ACK not received (and message send failed)
|
# ACK not received (and message send failed)
|
||||||
print('(ack failed)') #debg
|
# Add it to the messages where ack failed
|
||||||
|
message = send_queue.pop(0)
|
||||||
|
ack_failed[message.msgid] = message
|
||||||
|
nick = nick_from_mac(message.mac)
|
||||||
|
if message.queue_id is not None:
|
||||||
|
print('--- Failed to send to %s (%i)' % (nick, message.queue_id))
|
||||||
|
else:
|
||||||
|
print('--- Failed to send to %s' % nick)
|
||||||
|
|
||||||
|
# Send next message if there is one queued
|
||||||
|
if len(send_queue) > 0:
|
||||||
|
send_message(proc.stdin, send_queue[0].mac, send_queue[0].message)
|
||||||
|
|
||||||
elif event_type == b'm':
|
elif event_type == b'm':
|
||||||
# Message received
|
# Message received
|
||||||
|
|
Loading…
Reference in New Issue