import select import threading import time from collections import namedtuple import channel from constants import cronmessage_types # time field uses the monotonic time returned by time.monotonic() Event = namedtuple('Event', ['time', 'channel', 'message']) class CronThread(threading.Thread): def __init__(self, cron_control_channel): self.cron_control_channel = cron_control_channel # Sorted array of events self.events = [] threading.Thread.__init__(self) def get_timeout_value(self): if len(self.events) == 0: # No events, block until we get a message # Timeout of -1 makes poll block indefinitely return -1 else: # First event in the array is always the earliest seconds_to_wait = self.events[0].time - time.monotonic() # time.monotonic() uses fractional second but poll uses milliseconds, convert ms_to_wait = int(seconds_to_wait * 1000) # In case we somehow took long enough that next one should be run by now, make it run now if ms_to_wait < 0: ms_to_wait = 0 return ms_to_wait def run_events(self): assert len(self.events) > 0 current_time = time.monotonic() # Look for first event that should be run after current time, and split the array there # index should point at the first to be after current time, or at end of array # Either way, we can split the array at that location, first part being what to run and second rest index = 0 while index < len(self.events): if self.events[index].time > current_time: break index += 1 # Split array to_run = self.events[:index] self.events = self.events[index:] # Run events for event in to_run: event.channel.send(event.message) def add_event(self, event): # Look for first event that should be run after event, and split the array there # index should point at the first to be after event, or at end of array # Either way, we can split the array at that location safely index = 0 while index < len(self.events): if self.events[index].time > event.time: break index += 1 self.events = self.events[:index] + [event] + self.events[index:] def delete_event(self, event): # Try to find the element with same channel and message index = 0 while index < len(self.events): if self.events[index].channel == event.channel and self.events[index].message == event.message: break index += 1 if index < len(self.events): # The event at index is the one we need to delete self.events = self.events[:index] + self.events[index + 1:] def reschedule_event(self, event): self.delete_event(event) self.add_event(event) def run(self): # Create poll object and register the control channel # The poll object is used to implement both waiting and control of the cron thread poll = select.poll() poll.register(self.cron_control_channel, select.POLLIN) while True: timeout = self.get_timeout_value() poll_result = poll.poll(timeout) if len(poll_result) == 0: # No fds were ready → we timed out. Time to run some events self.run_events() else: # New message was received, handle it command_type, *arguments = self.cron_control_channel.recv() if command_type == cronmessage_types.quit: break elif command_type == cronmessage_types.schedule: event, = arguments self.add_event(event) elif command_type == cronmessage_types.delete: event, = arguments self.delete_event(event) elif command_type == cronmessage_types.reschedule: event, = arguments self.reschedule_event(event) else: assert False #unreachable def start(): cron_control_channel = channel.Channel() CronThread(cron_control_channel).start() return cron_control_channel def quit(cron_control_channel): """Stop the cron instance""" cron_control_channel.send((cronmessage_types.quit,)) def schedule(cron_control_channel, seconds, channel, message): """Schedules message to be send to channel""" event = Event(time.monotonic() + seconds, channel, message) cron_control_channel.send((cronmessage_types.schedule, event)) def delete(cron_control_channel, channel, message): """Remove an event. If event is not found, this is a no-op. Matches events based on channel and message, and only applies to the earlier one found.""" event = Event(None, channel, message) cron_control_channel.send((cronmessage_types.delete, event)) def reschedule(cron_control_channel, seconds, channel, message): """Reschedules message to be send to channel. If event is not found, a new one is created. Matches events based on channel and message, and only applies to the earlier one found.""" event = Event(time.monotonic() + seconds, channel, message) cron_control_channel.send((cronmessage_types.reschedule, event))