From d3a214ec86affa65bdf9835b964f55312bc99f44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juhani=20Krekel=C3=A4?= Date: Sun, 10 Dec 2017 17:43:39 +0200 Subject: [PATCH] Add ping timeouting --- constants.py | 5 +- cron.py | 150 +++++++++++++++++++++++++++++++++++++++++++++++++++ ircbot.py | 41 +++++++++++--- 3 files changed, 187 insertions(+), 9 deletions(-) create mode 100644 cron.py diff --git a/constants.py b/constants.py index 8981855..93719ee 100644 --- a/constants.py +++ b/constants.py @@ -7,4 +7,7 @@ class internal_submessage_types(enum.Enum): quit, error = range(2) class controlmessage_types(enum.Enum): - quit, send_line = range(2) + quit, send_line, ping, ping_timeout = range(4) + +class cronmessage_types(enum.Enum): + quit, schedule, delete, reschedule = range(4) diff --git a/cron.py b/cron.py new file mode 100644 index 0000000..8acebbf --- /dev/null +++ b/cron.py @@ -0,0 +1,150 @@ +import select +import threading +import time +from collections import namedtuple + +import channel +from constants import cronmessage_types + +# time field uses the monotonic time returned by time.monotonic() +Event = namedtuple('Event', ['time', 'channel', 'message']) + +class CronThread(threading.Thread): + def __init__(self, cron_control_channel): + self.cron_control_channel = cron_control_channel + + # Sorted array of events + self.events = [] + + threading.Thread.__init__(self) + + def get_timeout_value(self): + if len(self.events) == 0: + # No events, block until we get a message + # Timeout of -1 makes poll block indefinitely + return -1 + + else: + # First event in the array is always the earliest + seconds_to_wait = self.events[0].time - time.monotonic() + + # time.monotonic() uses fractional second but poll uses milliseconds, convert + ms_to_wait = int(seconds_to_wait * 1000) + + # In case we somehow took long enough that next one should be run by now, make it run now + if ms_to_wait < 0: + ms_to_wait = 0 + + return ms_to_wait + + def run_events(self): + assert len(self.events) > 0 + + current_time = time.monotonic() + + # Look for first event that should be run after current time, and split the array there + # index should point at the first to be after current time, or at end of array + # Either way, we can split the array at that location, first part being what to run and second rest + index = 0 + while index < len(self.events): + if self.events[index].time > current_time: + break + index += 1 + + # Split array + to_run = self.events[:index] + self.events = self.events[index:] + + # Run events + for event in to_run: + event.channel.send(event.message) + + def add_event(self, event): + # Look for first event that should be run after event, and split the array there + # index should point at the first to be after event, or at end of array + # Either way, we can split the array at that location safely + index = 0 + while index < len(self.events): + if self.events[index].time > event.time: + break + index += 1 + + self.events = self.events[:index] + [event] + self.events[index:] + + def delete_event(self, event): + # Try to find the element with same channel and message + index = 0 + while index < len(self.events): + if self.events[index].channel == event.channel and self.events[index].message == event.message: + break + index += 1 + + if index < len(self.events): + # The event at index is the one we need to delete + self.events = self.events[:index] + self.events[index + 1:] + + def reschedule_event(self, event): + self.delete_event(event) + self.add_event(event) + + def run(self): + # Create poll object and register the control channel + # The poll object is used to implement both waiting and control of the cron thread + poll = select.poll() + poll.register(self.cron_control_channel, select.POLLIN) + + while True: + timeout = self.get_timeout_value() + poll_result = poll.poll(timeout) + + if len(poll_result) == 0: + # No fds were ready → we timed out. Time to run some events + self.run_events() + + else: + # New message was received, handle it + command_type, *arguments = self.cron_control_channel.recv() + + if command_type == cronmessage_types.quit: + break + + elif command_type == cronmessage_types.schedule: + event, = arguments + self.add_event(event) + + elif command_type == cronmessage_types.delete: + event, = arguments + self.delete_event(event) + + elif command_type == cronmessage_types.reschedule: + event, = arguments + self.reschedule_event(event) + + else: + assert False #unreachable + +def start(): + cron_control_channel = channel.Channel() + CronThread(cron_control_channel).start() + return cron_control_channel + +def quit(cron_control_channel): + """Stop the cron instance""" + cron_control_channel.send((cronmessage_types.quit,)) + +def schedule(cron_control_channel, seconds, channel, message): + """Schedules message to be send to channel""" + event = Event(time.monotonic() + seconds, channel, message) + cron_control_channel.send((cronmessage_types.schedule, event)) + +def delete(cron_control_channel, channel, message): + """Remove an event. If event is not found, this is a no-op. + Matches events based on channel and message, and only applies to the earlier one found.""" + event = Event(None, channel, message) + cron_control_channel.send((cronmessage_types.delete, event)) + +def reschedule(cron_control_channel, seconds, channel, message): + """Reschedules message to be send to channel. If event is not found, a new one is created. + Matches events based on channel and message, and only applies to the earlier one found.""" + event = Event(time.monotonic() + seconds, channel, message) + cron_control_channel.send((cronmessage_types.reschedule, event)) diff --git a/ircbot.py b/ircbot.py index 5fa527c..91a564f 100644 --- a/ircbot.py +++ b/ircbot.py @@ -7,8 +7,9 @@ from collections import namedtuple import channel from constants import logmessage_types, internal_submessage_types, controlmessage_types -import line_handling import botcmd +import cron +import line_handling Server = namedtuple('Server', ['host', 'port', 'nick', 'realname', 'channels']) @@ -100,9 +101,10 @@ class API: # ServerThread(server, control_socket) # Creates a new server main loop thread class ServerThread(threading.Thread): - def __init__(self, server, control_channel, logging_channel): + def __init__(self, server, control_channel, cron_control_channel, logging_channel): self.server = server self.control_channel = control_channel + self.cron_control_channel = cron_control_channel self.logging_channel = logging_channel self.server_socket_write_lock = threading.Lock() @@ -122,14 +124,18 @@ class ServerThread(threading.Thread): with self.server_socket_write_lock: self.server_socket.sendall(line + b'\r\n') - # Don't log PONGs - if not (len(line) >= 5 and line[:5] == b'PONG '): + # Don't log PINGs or PONGs + if not (len(line) >= 5 and (line[:5] == b'PING ' or line[:5] == b'PONG ')): self.logging_channel.send((logmessage_types.sent, line.decode(encoding = 'utf-8', errors = 'replace'))) def handle_line(self, line): command, _, arguments = line.partition(b' ') - if command.upper() == b'PING': + split = line.split(b' ') + if len(split) >= 1 and split[0].upper() == b'PING': self.send_line_raw(b'PONG ' + arguments) + elif len(split) >= 2 and split[0][0:1] == b':' and split[1].upper() == b'PONG': + # No need to do anything special for PONGs + pass else: self.logging_channel.send((logmessage_types.received, line.decode(encoding = 'utf-8', errors = 'replace'))) line_handling.handle_line(line, irc = self.api) @@ -163,6 +169,10 @@ class ServerThread(threading.Thread): self.handle_line(line) + # Remove possible pending ping timeout timer and reset ping timer to 5 minutes + cron.delete(self.cron_control_channel, self.control_channel, (controlmessage_types.ping_timeout,)) + cron.reschedule(self.cron_control_channel, 5 * 60, self.control_channel, (controlmessage_types.ping,)) + # Control elif fd == self.control_channel.fileno(): command_type, *arguments = self.control_channel.recv() @@ -175,6 +185,16 @@ class ServerThread(threading.Thread): line = irc_command.upper() + space + arguments self.send_line_raw(line) + elif command_type == controlmessage_types.ping: + assert len(arguments) == 0 + self.send_line_raw(b'PING :foo') + # Reset ping timeout timer to 3 minutes + cron.reschedule(self.cron_control_channel, 3 * 60, self.control_channel, (controlmessage_types.ping_timeout,)) + + elif command_type == controlmessage_types.ping_timeout: + self.logging_channel.send((logmessage_types.internal, internal_submessage_types.error, 'Ping timeout')) + quitting = True + else: error_message = 'Unknown control message: %s' % repr((command_type, *arguments)) self.logging_channel.send((logmessage_types.internal, internal_submessage_types.error, error_message)) @@ -215,12 +235,15 @@ class ServerThread(threading.Thread): # Tell controller we're quiting self.logging_channel.send((logmessage_types.internal, internal_submessage_types.quit)) + # Tell cron we're quiting + cron.quit(cron_control_channel) + # spawn_serverthread(server, logging_channel) → control_channel # Creates a ServerThread for given server and returns the channel for controlling it -def spawn_serverthread(server, logging_channel): +def spawn_serverthread(server, cron_control_channel, logging_channel): thread_control_socket, spawner_control_socket = socket.socketpair() control_channel = channel.Channel() - ServerThread(server, control_channel, logging_channel).start() + ServerThread(server, control_channel, cron_control_channel, logging_channel).start() return control_channel # spawn_loggerthread() → logging_channel, dead_notify_channel @@ -236,8 +259,9 @@ if __name__ == '__main__': botcmd.initialize() + cron_control_channel = cron.start() logging_channel, dead_notify_channel = spawn_loggerthread() - control_channel = spawn_serverthread(server, logging_channel) + control_channel = spawn_serverthread(server, cron_control_channel, logging_channel) while True: message = dead_notify_channel.recv(blocking = False) @@ -250,6 +274,7 @@ if __name__ == '__main__': print('Keyboard quit') control_channel.send((controlmessage_types.quit,)) logging_channel.send((logmessage_types.internal, internal_submessage_types.quit)) + cron.quit(cron_control_channel) break elif len(cmd) > 0 and cmd[0] == '/':