From 0657f423f3081606bf3d509692618ffab4d1b814 Mon Sep 17 00:00:00 2001 From: Juhani Haverinen Date: Tue, 5 Sep 2017 10:55:33 +0300 Subject: [PATCH] Use channels for internal communication --- channel.py | 51 +++++++++++++++++++++++++++++++++++++ ircbot.py | 75 +++++++++++++++++++++++++++--------------------------- 2 files changed, 88 insertions(+), 38 deletions(-) create mode 100644 channel.py diff --git a/channel.py b/channel.py new file mode 100644 index 0000000..4d15dc1 --- /dev/null +++ b/channel.py @@ -0,0 +1,51 @@ +import select +import socket +import threading + +class Channel: + """An asynchronic communication channel that can be used to send python object and can be poll()ed.""" + + def __init__(self): + # We use a socket to enable polling and blocking reads + self.write_socket, self.read_socket = socket.socketpair() + self.poll = select.poll() + self.poll.register(self.read_socket, select.POLLIN) + + # Store messages in a list + self.mesages = [] + self.messages_lock = threading.Lock() + + def send(self, message): + # Add message to the list of messages and write to the write socket to signal there's data to read + with self.messages_lock: + self.write_socket.sendall(b'!') + self.mesages.append(message) + + def recv(self, blocking = True): + # Timeout of -1 will make poll wait until data is available + # Timeout of 0 will make poll exit immediately if there's no data + if blocking: + timeout = -1 + else: + timeout = 0 + + # See if there is data to read / wait until there is + results = self.poll.poll(timeout) + + # None of the sockets were ready. This can only happen if we weren't blocking + # Return None to signal lack of data + if len(results) == 0: + assert not blocking + return None + + # Remove first message from the list (FIFO principle), and read one byte from the socket + # This keeps the number of available messages and the number of bytes readable in the socket in sync + with self.messages_lock: + message = self.mesages.pop(0) + self.read_socket.recv(1) + + return message + + def fileno(self): + # Allows for a Channel object to be passed directly to poll() + return self.read_socket.fileno() diff --git a/ircbot.py b/ircbot.py index abaa600..5d38f9b 100644 --- a/ircbot.py +++ b/ircbot.py @@ -4,14 +4,17 @@ import socket import threading from collections import namedtuple +import channel + Server = namedtuple('Server', ['host', 'port']) # ServerThread(server, control_socket) # Creates a new server main loop thread class ServerThread(threading.Thread): - def __init__(self, server, control_socket): + def __init__(self, server, control_channel, logging_channel): self.server = server - self.control_socket = control_socket + self.control_channel = control_channel + self.logging_channel = logging_channel self.server_socket_write_lock = threading.Lock() @@ -24,23 +27,22 @@ class ServerThread(threading.Thread): with self.server_socket_write_lock: self.server_socket.sendall(line + b'\r\n') - # FIXME: print is not thread safe - print('>' + line.decode(encoding = 'utf-8', errors = 'replace')) + # FIXME: use a real data structure + self.logging_channel.send('>' + line.decode(encoding = 'utf-8', errors = 'replace')) def handle_line(self, line): # TODO: implement line handling - # FIXME: print is not thread safe - print('<' + line.decode(encoding = 'utf-8', errors = 'replace')) + # FIXME: use a real data structure + self.logging_channel.send('<' + line.decode(encoding = 'utf-8', errors = 'replace')) def mainloop(self): - # Register both the server and the control socket to our polling object + # Register both the server socket and the control channel to or polling object poll = select.poll() poll.register(self.server_socket, select.POLLIN) - poll.register(self.control_socket, select.POLLIN) + poll.register(self.control_channel, select.POLLIN) - # Keep buffers for input and output + # Keep buffer for input server_input_buffer = bytearray() - control_input_buffer = bytearray() quitting = False while not quitting: @@ -62,15 +64,21 @@ class ServerThread(threading.Thread): self.handle_line(line) # Control - elif fd == self.control_socket.fileno(): - # Read into buffer and handle full commands - data = self.control_socket.recv(1024) - control_input_buffer.extend(data) + elif fd == self.control_channel.fileno(): + command = self.control_channel.recv() - # TODO: implement command handling - if len(control_input_buffer) > 1: + # FIXME: use a real data structure + if command == 'q': quitting = True + elif len(command) > 0 and command[0] == '/': + irc_command, space, arguments = command[1:].encode('utf-8').partition(b' ') + line = irc_command.upper() + space + arguments + self.send_line_raw(line) + + else: + self.logging_channel.send('?') + else: assert False #unreachable @@ -81,8 +89,8 @@ class ServerThread(threading.Thread): self.server_socket = socket.create_connection(address) except ConnectionRefusedError: # Tell controller we failed - self.control_socket.sendall(b'F') - self.control_socket.close() + self.logging_channel.send('f') + return # Run initialization # TODO: read nick/username/etc. from a config @@ -97,37 +105,28 @@ class ServerThread(threading.Thread): self.server_socket.close() # Tell controller we're quiting - self.control_socket.sendall(b'Q' + b'\n') - self.control_socket.close() + self.logging_channel.send('q') -# spawn_serverthread(server) → control_socket -# Creates a ServerThread for given server and returns the socket for controlling it +# spawn_serverthread(server) → control_channel, logging_channel +# Creates a ServerThread for given server and returns the channels for controlling and monitoring it def spawn_serverthread(server): thread_control_socket, spawner_control_socket = socket.socketpair() - ServerThread(server, thread_control_socket).start() - return spawner_control_socket + control_channel = channel.Channel() + logging_channel = channel.Channel() + ServerThread(server, control_channel, logging_channel).start() + return (control_channel, logging_channel) if __name__ == '__main__': - control_socket = spawn_serverthread(Server('irc.freenode.net', 6667)) + control_channel, logging_channel = spawn_serverthread(Server('irc.freenode.net', 6667)) while True: cmd = input(': ') if cmd == '': - control_messages = bytearray() - while True: - data = control_socket.recv(1024) - - if not data: - break - - control_messages.extend(data) - - print(control_messages.decode(encoding = 'utf-8', errors = 'replace')) + print(logging_channel.recv(blocking = False)) elif cmd == 'q': - control_socket.sendall(b'Q\n') - control_socket.close() + control_channel.send('q') break else: - control_socket.sendall(cmd.encode('utf-8') + b'\n') + control_channel.send(cmd)