Rewrite ethermess.py to use multiple threads

This commit is contained in:
Juhani Krekelä 2019-07-13 01:02:13 +03:00
parent ab454ca2ee
commit 38f847aac1
1 changed files with 151 additions and 25 deletions

View File

@ -1,10 +1,73 @@
#!/usr/bin/env python3
libexec_dir = __LIBEXECDIR__
import select
import socket
import subprocess
import sys
import threading
import time
class Channel:
"""An asynchronic communication channel that can be used to send python object and can be poll()ed."""
def __init__(self):
# We use a socket to enable polling and blocking reads
self.write_socket, self.read_socket = socket.socketpair()
self.poll = select.poll()
self.poll.register(self.read_socket, select.POLLIN)
# Store messages in a list
self.messages = []
self.messages_lock = threading.Lock()
def send(self, message):
# Add message to the list of messages and write to the write socket to signal there's data to read
with self.messages_lock:
self.write_socket.sendall(b'!')
self.messages.append(message)
def recv(self, blocking = True):
# Timeout of -1 will make poll wait until data is available
# Timeout of 0 will make poll exit immediately if there's no data
if blocking:
timeout = -1
else:
timeout = 0
# See if there is data to read / wait until there is
results = self.poll.poll(timeout)
# None of the sockets were ready. This can only happen if we weren't blocking
# Return None to signal lack of data
if len(results) == 0:
assert not blocking
return None
# Remove first message from the list (FIFO principle), and read one byte from the socket
# This keeps the number of available messages and the number of bytes readable in the socket in sync
with self.messages_lock:
message = self.messages.pop(0)
self.read_socket.recv(1)
return message
def fileno(self):
# Allows for a Channel object to be passed directly to poll()
return self.read_socket.fileno()
def close(self):
# Close the file descriptors, so that we aren't leaking them
self.write_socket.close()
self.read_socket.close()
# Support with-statements
def __enter__(self):
return self
def __exit__(self, exception_type, exception_value, traceback):
self.close()
def writeall(f, b):
written = 0
while written < len(b):
@ -22,40 +85,103 @@ def parse_mac(text):
return parsed
nick = input('nick> ').encode('utf-8')
class PollBasedThread(threading.Thread):
def run(self):
poll = select.poll()
for f in self.pollin:
poll.register(f, select.POLLIN)
proc = subprocess.Popen(['sudo', libexec_dir + '/ethermess-backend', *sys.argv[1:]], stdin = subprocess.PIPE, stdout = sys.stdout, stderr = sys.stderr, bufsize = 0)
self.initialize()
writeall(proc.stdin, (bytes([0, len(nick)]) + nick))
running = True
while running:
for fd, event in poll.poll():
if self.poll_loop(fd, event) != None:
running = False
print('s - request status, i - request msgid, m - send message, ^D - quit')
self.finalize()
while True:
try:
try:
command = input('')
def initialize(self):
...
if command == 's':
mac = parse_mac(input('mac> '))
writeall(proc.stdin, b's' + mac)
def poll_loop(self, fd, event):
...
elif command == 'i':
mac = parse_mac(input('mac> '))
writeall(proc.stdin, b'i' + mac)
def finalize(self):
...
elif command == 'm':
mac = parse_mac(input('mac> '))
message = input('message> ').encode('utf-8')
writeall(proc.stdin, b'm' + mac + bytes([len(message) >> 8, len(message) & 0xff]) + message)
class Backend(PollBasedThread):
def __init__(self, writes_channel, control_channel):
self.writes_channel = writes_channel
self.control_channel = control_channel
self.pollin = [self.writes_channel, self.control_channel]
super().__init__()
def initialize(self):
self.proc = subprocess.Popen(['sudo', libexec_dir + '/ethermess-backend', *sys.argv[1:]], stdin = subprocess.PIPE, stdout = sys.stdout, stderr = sys.stderr, bufsize = 0)
def poll_loop(self, fd, event):
if fd == self.writes_channel.fileno() and event & select.POLLIN:
data = self.writes_channel.recv()
writeall(self.proc.stdin, data)
elif fd == self.control_channel.fileno() and event & select.POLLIN:
command = self.control_channel.recv()
if command == 'quit':
return 'quit'
else:
print('s - request status, i - request msgid, m - send message, ^D - quit')
raise Exception('Unreachable')
except EOFError:
writeall(proc.stdin, b'q')
break
else:
raise Exception('Unreachable')
except Exception as err:
print(err)
def finalize(self):
self.proc.wait()
sys.exit(proc.wait())
class Input(threading.Thread):
def __init__(self, writes_channel, control_channel):
self.writes_channel = writes_channel
self.control_channel = control_channel
super().__init__()
def run(self):
print('s - request status, i - request msgid, m - send message, ^D - quit')
while True:
try:
try:
command = input('')
if command == 's':
mac = parse_mac(input('mac> '))
self.writes_channel.send(b's' + mac)
elif command == 'i':
mac = parse_mac(input('mac> '))
self.writes_channel.send(b'i' + mac)
elif command == 'm':
mac = parse_mac(input('mac> '))
message = input('message> ').encode('utf-8')
self.writes_channel.send(b'm' + mac + bytes([len(message) >> 8, len(message) & 0xff]) + message)
else:
print('s - request status, i - request msgid, m - send message, ^D - quit')
except EOFError:
self.writes_channel.send(b'q')
self.control_channel.send('quit')
break
except Exception as err:
print(err)
writes_channel = Channel()
control_channel = Channel()
nick = input('nick> ').encode('utf-8')
writes_channel.send((bytes([0, len(nick)]) + nick))
Backend(writes_channel, control_channel).start()
Input(writes_channel, control_channel).start()