diff --git a/ethermess.py b/ethermess.py index 58dac16..cfcec78 100644 --- a/ethermess.py +++ b/ethermess.py @@ -1,10 +1,73 @@ #!/usr/bin/env python3 libexec_dir = __LIBEXECDIR__ +import select +import socket import subprocess import sys +import threading import time +class Channel: + """An asynchronic communication channel that can be used to send python object and can be poll()ed.""" + + def __init__(self): + # We use a socket to enable polling and blocking reads + self.write_socket, self.read_socket = socket.socketpair() + self.poll = select.poll() + self.poll.register(self.read_socket, select.POLLIN) + + # Store messages in a list + self.messages = [] + self.messages_lock = threading.Lock() + + def send(self, message): + # Add message to the list of messages and write to the write socket to signal there's data to read + with self.messages_lock: + self.write_socket.sendall(b'!') + self.messages.append(message) + + def recv(self, blocking = True): + # Timeout of -1 will make poll wait until data is available + # Timeout of 0 will make poll exit immediately if there's no data + if blocking: + timeout = -1 + else: + timeout = 0 + + # See if there is data to read / wait until there is + results = self.poll.poll(timeout) + + # None of the sockets were ready. This can only happen if we weren't blocking + # Return None to signal lack of data + if len(results) == 0: + assert not blocking + return None + + # Remove first message from the list (FIFO principle), and read one byte from the socket + # This keeps the number of available messages and the number of bytes readable in the socket in sync + with self.messages_lock: + message = self.messages.pop(0) + self.read_socket.recv(1) + + return message + + def fileno(self): + # Allows for a Channel object to be passed directly to poll() + return self.read_socket.fileno() + + def close(self): + # Close the file descriptors, so that we aren't leaking them + self.write_socket.close() + self.read_socket.close() + + # Support with-statements + def __enter__(self): + return self + + def __exit__(self, exception_type, exception_value, traceback): + self.close() + def writeall(f, b): written = 0 while written < len(b): @@ -22,40 +85,103 @@ def parse_mac(text): return parsed -nick = input('nick> ').encode('utf-8') +class PollBasedThread(threading.Thread): + def run(self): + poll = select.poll() + for f in self.pollin: + poll.register(f, select.POLLIN) -proc = subprocess.Popen(['sudo', libexec_dir + '/ethermess-backend', *sys.argv[1:]], stdin = subprocess.PIPE, stdout = sys.stdout, stderr = sys.stderr, bufsize = 0) + self.initialize() -writeall(proc.stdin, (bytes([0, len(nick)]) + nick)) + running = True + while running: + for fd, event in poll.poll(): + if self.poll_loop(fd, event) != None: + running = False -print('s - request status, i - request msgid, m - send message, ^D - quit') + self.finalize() -while True: - try: - try: - command = input('') + def initialize(self): + ... - if command == 's': - mac = parse_mac(input('mac> ')) - writeall(proc.stdin, b's' + mac) + def poll_loop(self, fd, event): + ... - elif command == 'i': - mac = parse_mac(input('mac> ')) - writeall(proc.stdin, b'i' + mac) + def finalize(self): + ... - elif command == 'm': - mac = parse_mac(input('mac> ')) - message = input('message> ').encode('utf-8') - writeall(proc.stdin, b'm' + mac + bytes([len(message) >> 8, len(message) & 0xff]) + message) +class Backend(PollBasedThread): + def __init__(self, writes_channel, control_channel): + self.writes_channel = writes_channel + self.control_channel = control_channel + self.pollin = [self.writes_channel, self.control_channel] + super().__init__() + + def initialize(self): + self.proc = subprocess.Popen(['sudo', libexec_dir + '/ethermess-backend', *sys.argv[1:]], stdin = subprocess.PIPE, stdout = sys.stdout, stderr = sys.stderr, bufsize = 0) + + def poll_loop(self, fd, event): + if fd == self.writes_channel.fileno() and event & select.POLLIN: + data = self.writes_channel.recv() + writeall(self.proc.stdin, data) + + elif fd == self.control_channel.fileno() and event & select.POLLIN: + command = self.control_channel.recv() + if command == 'quit': + return 'quit' else: - print('s - request status, i - request msgid, m - send message, ^D - quit') + raise Exception('Unreachable') - except EOFError: - writeall(proc.stdin, b'q') - break + else: + raise Exception('Unreachable') - except Exception as err: - print(err) + def finalize(self): + self.proc.wait() -sys.exit(proc.wait()) +class Input(threading.Thread): + def __init__(self, writes_channel, control_channel): + self.writes_channel = writes_channel + self.control_channel = control_channel + super().__init__() + + def run(self): + print('s - request status, i - request msgid, m - send message, ^D - quit') + + while True: + try: + try: + command = input('') + + if command == 's': + mac = parse_mac(input('mac> ')) + self.writes_channel.send(b's' + mac) + + elif command == 'i': + mac = parse_mac(input('mac> ')) + self.writes_channel.send(b'i' + mac) + + elif command == 'm': + mac = parse_mac(input('mac> ')) + message = input('message> ').encode('utf-8') + self.writes_channel.send(b'm' + mac + bytes([len(message) >> 8, len(message) & 0xff]) + message) + + else: + print('s - request status, i - request msgid, m - send message, ^D - quit') + + except EOFError: + self.writes_channel.send(b'q') + self.control_channel.send('quit') + break + + except Exception as err: + print(err) + +writes_channel = Channel() +control_channel = Channel() + +nick = input('nick> ').encode('utf-8') +writes_channel.send((bytes([0, len(nick)]) + nick)) + +Backend(writes_channel, control_channel).start() +Input(writes_channel, control_channel).start()