oonbotti2/o2_botcmd.py

711 lines
20 KiB
Python

import eliza
import threading
import random
import re
import time
import functools
# Nicks whose messages parse() ignores entirely
blacklist = []
# ELIZA instance that answers messages addressed to the bot by nick
doctor = eliza.eliza()
# channel: [account1, account2, ..., accountN]
trusted = {}
trustedlock = threading.Lock()
# Same layout as trusted; loadgods() also adds every entry to trusted, and
# the #untrust command refuses to remove accounts listed here
gods = {}
godslock = threading.Lock()
# receiver: [(sender1, origin1, message1), (sender2, origin2, message2), ..., (senderN, originN, messageN)]
msgs = {}
msgslock = threading.Lock()
# Pending WHOIS lookups as (ID, nick, account); account is None until the
# lookup is answered and '' once it has failed
accountcheck = []
# Next ID handed out by initaccountcheck()
accountcheckid = 0
accountchecklock = threading.Lock()
# Dice commands: '#' [count] 'd' (sides | '%') [('+' | '-') modifier], e.g. #2d6+1 or #d%
die_expr=re.compile("#[0-9]*d([0-9]+|%)([+-][0-9]+)?$")
class Cron(threading.Thread):
    """Thread that runs queued callbacks after a delay counted in ticks.

    The thread wakes up roughly once per second. On every wake-up each
    queued job's remaining time is decremented by one and every job that
    has reached zero (or less) is called on this thread.
    """

    def __init__(self):
        super().__init__()
        # Pending jobs as (remaining_ticks, callable) pairs
        self.timedjobs = []
        self.timedjobslock = threading.Lock()
        # Control commands waiting to be handled by run()
        self.cronctrl = []
        self.cronctrllock = threading.Lock()

    def queuejob(self, time, fn):
        """Schedule fn() to be called after about `time` ticks."""
        with self.timedjobslock:
            self.timedjobs.append((time, fn))

    def ctrl(self, cmd):
        """Queue a control command; 'QUIT' makes run() stop."""
        with self.cronctrllock:
            self.cronctrl.append(cmd)

    def run(self):
        """Tick once per second until a 'QUIT' command has been received."""
        running = True
        while running:
            time.sleep(1)  # Accuracy doesn't need to be high

            # Consume all pending control commands in one go
            with self.cronctrllock:
                if 'QUIT' in self.cronctrl:
                    running = False
                self.cronctrl = []

            # Age every job by one tick and split off the ones that are due
            with self.timedjobslock:
                aged = [(remaining - 1, fn) for remaining, fn in self.timedjobs]
                due = [fn for remaining, fn in aged if remaining <= 0]
                self.timedjobs = [job for job in aged if job[0] > 0]

            # Jobs run outside the lock so they may queue further jobs
            for fn in due:
                fn()
def loadmessages():
    """Replace the in-memory message store with the contents of msgs.txt.

    Each line of the file is 'receiver<TAB>sender<TAB>origin<TAB>message'.
    Lines with fewer than four fields are skipped. Only the first three tabs
    separate fields, so a message that itself contains tabs survives a
    save/load round trip instead of being dropped.

    Raises whatever open() raises if msgs.txt cannot be opened.
    """
    global msgs

    with msgslock:
        msgs = {}

        # The with-block closes the file even if reading fails part-way
        with open('msgs.txt', 'r') as f:
            for line in f:
                fields = line.rstrip('\n').split('\t', 3)
                if len(fields) != 4:
                    continue
                receiver, sender, origin, msg = fields
                msgs.setdefault(receiver, []).append((sender, origin, msg))
def savemessages():
    """Write every queued message to msgs.txt, one tab-separated line each.

    The line format is 'receiver<TAB>sender<TAB>origin<TAB>message'. The
    file is rewritten from scratch while msgslock is held, so callers must
    not hold msgslock themselves (it is not reentrant).
    """
    with msgslock:
        # The with-block guarantees the file is flushed and closed even if a
        # write raises
        with open('msgs.txt', 'w') as f:
            for receiver, queued in msgs.items():
                for sender, origin, msg in queued:
                    f.write('%s\t%s\t%s\t%s\n' % (receiver, sender, origin, msg))
# Populate msgs from msgs.txt as soon as the module is imported
loadmessages()
def addtrusted(chan, account):
    """Mark account as trusted on chan (in memory only; no-op if already so).

    Both arguments may be str, or anything with a decode() method such as
    bytes.
    """
    chan = chan if type(chan) == str else chan.decode()
    account = account if type(account) == str else account.decode()

    with trustedlock:
        accounts = trusted.setdefault(chan, [])
        if account not in accounts:
            accounts.append(account)
def rmtrusted(chan, account):
    """Remove account from chan's trusted list (in memory only).

    Does nothing when the channel is unknown or the account is not listed.
    Both arguments may be str, or anything with a decode() method such as
    bytes.
    """
    chan = chan if type(chan) == str else chan.decode()
    account = account if type(account) == str else account.decode()

    with trustedlock:
        accounts = trusted.get(chan)
        if accounts is not None and account in accounts:
            accounts.remove(account)
def loadtrusted():
    """Reset the trusted list and reload it from trusted.txt.

    Each non-empty line of the file is 'channel account'. Raises ValueError
    if a non-empty line does not split into exactly two fields, and whatever
    open() raises if the file cannot be opened.
    """
    global trusted

    with trustedlock:
        trusted = {}

    # trustedlock must not be held here: addtrusted() acquires it itself and
    # threading.Lock is not reentrant.
    # The with-block closes the file even if a line is malformed.
    with open('trusted.txt', 'r') as f:
        for line in f:
            line = line.rstrip('\n')
            if line:
                chan, account = line.split()
                addtrusted(chan, account)
def loadgods():
    """Reset the gods list and reload it from gods.txt.

    Each non-empty line of the file is 'channel account'. Every god is also
    added to the trusted list via addtrusted(). Raises ValueError if a
    non-empty line does not split into exactly two fields, and whatever
    open() raises if the file cannot be opened.
    """
    global gods

    # Both with-blocks matter: previously a missing or malformed gods.txt
    # left godslock held forever and the file handle open
    with godslock:
        gods = {}
        with open('gods.txt', 'r') as f:
            for line in f:
                line = line.rstrip('\n')
                if line:
                    chan, account = line.split()
                    gods.setdefault(chan, []).append(account)
                    addtrusted(chan, account)
def savetrusted():
    """Write the trusted list to trusted.txt as 'channel account' lines.

    The file is rewritten from scratch while trustedlock is held, so callers
    must not hold trustedlock themselves (it is not reentrant).
    """
    with trustedlock:
        # The original wrote `f.close` without parentheses, so the file was
        # never explicitly closed; the with-block closes it reliably
        with open('trusted.txt', 'w') as f:
            for chan, accounts in trusted.items():
                for account in accounts:
                    f.write('%s %s\n' % (chan, account))
def init():
    """Start the cron thread, then load the trusted and gods lists from disk.

    The Cron instance is stored in the module-level name `cron`.
    """
    global cron

    cron = Cron()
    cron.start()

    # Order matters little, but trusted is reset first so that the gods added
    # to it by loadgods() are kept
    for load in (loadtrusted, loadgods):
        load()
def chmode(irc, chan, nick, mode, args):
    """Set or unset a channel mode on behalf of nick, if nick is authorized.

    irc  -- connection object; send_raw() is used to issue MODE
    chan -- channel, interpolated into a bytes command
    nick -- requester (str, or decodable to str)
    mode -- str such as '+o' or '-v': a sign followed by the mode letter(s)
    args -- list of targets; [''] means "apply to the requester"

    Targets are sent four at a time, one MODE command per group.
    """
    if type(nick) != str:
        nick = nick.decode()

    sign = mode[0].encode()
    flag = mode[1:].encode()

    if not isauthorized(irc, chan, nick):
        return

    # No explicit targets: the requester changes their own mode
    if args == ['']:
        irc.send_raw(b'MODE %s %s %s' % (chan, sign + flag, nick.encode()))
        return

    for start in range(0, len(args), 4):
        group = [target.encode() for target in args[start:start + 4]]
        irc.send_raw(b'MODE %s %s %s' % (chan, sign + flag * len(group), b' '.join(group)))
def istrusted(chan, account):
    """Return True if account is on chan's trusted list, False otherwise.

    Both arguments may be str, or anything with a decode() method such as
    bytes.
    """
    chan = chan if type(chan) == str else chan.decode()
    account = account if type(account) == str else account.decode()

    with trustedlock:
        return chan in trusted and account in trusted[chan]
def initaccountcheck(nick):
    """Register a pending account lookup for nick and return its new ID.

    The entry starts out with account None, meaning "no answer yet".
    """
    global accountcheckid

    with accountchecklock:
        checkid = accountcheckid
        accountcheckid = checkid + 1
        accountcheck.append((checkid, nick, None))

    return checkid
# Warning: this does no locking, should only be used internally
# The index returned cannot be guaranteed valid if the lock is released between the call to getindexbyaccountcheckid and its use!
def getindexbyaccountcheckid(id):
    """Return the position in accountcheck of the entry with this ID, or None."""
    for index, (ckid, _cknick, _ckaccount) in enumerate(accountcheck):
        if ckid == id:
            return index
    return None
def setaccountcheckvalue(id, value):
    """Store value as the account of the lookup with this ID.

    Silently does nothing if no lookup with this ID exists (any more).
    """
    with accountchecklock:
        index = getindexbyaccountcheckid(id)
        if index is None:
            return
        nick = accountcheck[index][1]
        accountcheck[index] = (id, nick, value)
def getaccountcheckvalue(id):
    """Return the account stored for the lookup with this ID.

    Returns None while the lookup is unanswered, and also when no lookup
    with this ID exists (for example because it was removed between the
    caller obtaining the ID and calling this function). The original raised
    UnboundLocalError in the latter case.
    """
    with accountchecklock:
        index = getindexbyaccountcheckid(id)
        if index is None:
            return None
        return accountcheck[index][2]
def removeaccountcheck(id):
    """Forget the lookup with this ID; no-op if it does not exist."""
    with accountchecklock:
        position = getindexbyaccountcheckid(id)
        if position is None:
            return
        del accountcheck[position]
def getaccountcheckidbynick(nick):
    """Return the IDs of all pending lookups registered for nick.

    nick may be str, or anything with a decode() method such as bytes. The
    comparison is exact (case-sensitive).
    """
    if type(nick) != str:
        nick = nick.decode()

    with accountchecklock:
        return [ckid for ckid, cknick, _ckaccount in accountcheck if cknick == nick]
def getaccount(irc, nick):
    """Look up the account nick is logged in as by sending a WHOIS.

    Blocks the calling thread, polling every 0.1 s, until the lookup has been
    given a value. A cron job queued with a delay of 5 marks the lookup as
    failed, which bounds the wait.

    Returns the account name, or None if the lookup failed.
    """
    if type(nick) != str:
        nick = nick.decode()

    checkid = initaccountcheck(nick)
    irc.send_raw(b'WHOIS ' + nick.encode())
    # Timeout: '' marks the lookup as failed
    cron.queuejob(5, (lambda: setaccountcheckvalue(checkid, '')))

    while True:
        account = getaccountcheckvalue(checkid)
        time.sleep(0.1)
        if account is not None:
            break

    removeaccountcheck(checkid)

    # '' signifies failure
    return None if account == '' else account
def isauthorized(irc, chan, nick):
    """Tell whether nick's account is trusted on chan.

    Returns the result of istrusted() when nick's account could be
    determined. Otherwise tells nick to identify with NickServ and returns
    None.
    """
    if type(nick) != str:
        nick = nick.decode()

    account = getaccount(irc, nick)
    if not account:
        irc.bot_response(nick.encode(), 'Identify with NickServ')
        return None

    return istrusted(chan, account)
class ArgsfmtError(Exception):
    """Raised by parsecmd() for a bad format string or a bad argument count.

    The human-readable reason is available as the `msg` attribute.
    """

    def __init__(self, msg):
        super().__init__(msg)
        self.msg = msg

    def __str__(self):
        # Must read self.msg: the bare name `msg` is not defined here and
        # made str() on the exception raise NameError
        return 'Error with argument format: ' + self.msg
# Argument kinds produced by parseargsfmt()
ARG_STD = 0 # Normal argument: foo
ARG_OPT = 1 # Optional (0-1) argument: [bar]
ARG_UNL = 2 # Unlimited (0-) number of arguments: {baz}
def parseargsfmt(args):
    """Parse the argument format string used by matchcmd and parsecmd.

    The string is split on single spaces and every piece is classified:
    '[bar]' is optional, '{baz}' is unlimited, anything else is a normal
    argument.

    e.g. parseargsfmt("foo [bar] {baz}") -> [ARG_STD, ARG_OPT, ARG_UNL]
    """
    def classify(token):
        bracketed = len(token) >= 2
        if bracketed and token[0] == '[' and token[-1] == ']':
            return ARG_OPT
        if bracketed and token[0] == '{' and token[-1] == '}':
            return ARG_UNL
        return ARG_STD

    return [classify(token) for token in args.split(' ')]
def getargnums(argtypes):
    """Return (fewest, most) argument counts allowed by a list of ARG_* kinds.

    `most` is None when the number of arguments is unlimited.
    """
    fewest = 0
    most = 0

    for kind in argtypes:
        if kind == ARG_STD:
            fewest += 1

        if kind == ARG_UNL:
            most = None
        elif kind in (ARG_STD, ARG_OPT) and most is not None:
            # Once the maximum is unlimited it stays unlimited
            most += 1

    return fewest, most
def matchcmd(line, cmd, args=None):
    """Tell whether the split command line `line` invokes `cmd`.

    matchcmd(line, cmd) only checks that the first word is cmd;
    matchcmd(line, cmd, args) additionally checks that the number of
    remaining words is allowed by the format string args.
    """
    if len(line) == 0 or line[0] != cmd:
        return False
    if not args:
        return True

    fewest, most = getargnums(parseargsfmt(args))
    given = len(line) - 1

    if most:
        return fewest <= given <= most
    # No upper bound
    return given >= fewest
def parsecmd(line, args):
    """Extract the arguments of a split command line per format string args.

    line[0] is the command itself and is discarded. Returns a list with one
    value per argument in the format string, or the bare value when there is
    exactly one. An optional or unlimited argument that got no value is ''.
    An unlimited argument receives all remaining words joined by spaces.

    Raises ArgsfmtError if the format has a non-final unlimited argument or
    more than one optional/unlimited argument, if line is empty, or if the
    number of words given does not fit the format.
    """
    kinds = parseargsfmt(args)

    # Disallow non-final unlimited arguments
    if ARG_UNL in kinds[:-1]:
        raise ArgsfmtError('Non-final unlimited argument')

    # Disallow more than one optional or unlimited argument per argument string
    flexible = [kind for kind in kinds if kind == ARG_OPT or kind == ARG_UNL]
    if len(flexible) > 1:
        raise ArgsfmtError('Ambiguous argument format')

    if len(line) == 0:
        raise ArgsfmtError('No command given')

    # Remove the command
    words = line[1:]
    fewest, most = getargnums(kinds)

    if len(words) == fewest:
        # Only standard arguments given
        consuming = (ARG_STD,)
        greedy = False
    elif most and len(words) == most:
        # Optional argument given
        consuming = (ARG_STD, ARG_OPT)
        greedy = False
    elif not most and len(words) > fewest:
        # Unlimited argument given
        consuming = (ARG_STD, ARG_OPT)
        greedy = True
    else:
        raise ArgsfmtError('Number of given arguments not possible for given format string')

    out = []
    for kind in kinds:
        if kind in consuming:
            out.append(words[0])
            words = words[1:]
        elif greedy and kind == ARG_UNL:
            # Swallow everything that is left
            out.append(' '.join(words))
            words = []
        else:
            out.append('')

    return out[0] if len(out) == 1 else out
def _rolldice(expr):
    """Return the reply text for a dice command already matched by die_expr.

    expr looks like '#<count>d<sides>', optionally followed by '+<n>' or
    '-<n>'. <count> may be omitted (one roll) and <sides> may be '%' for a
    two-digit percentile roll (the modifier is ignored for '%').
    """
    count, _, spec = expr[1:].partition('d')
    times = int(count) if count else 1

    if '+' in spec:
        spec, _, amount = spec.partition('+')
        plus = int(amount)
    elif '-' in spec:
        spec, _, amount = spec.partition('-')
        plus = -int(amount)
    else:
        plus = 0

    if spec == '%':
        if times != 1:
            return 'Not supported'
        return '%s%s' % (random.randint(0, 9), random.randint(0, 9))

    sides = int(spec)
    if sides < 1:
        return 'This die is not available in your space-time region.'
    if times < 1:
        return 'What exactly do you want me to do?'
    if times > 128:
        return 'Sorry, I don\'t have that many. Can I borrow yours?'

    rolls = [random.randint(1, sides) for _ in range(times)]
    result = sum(rolls)

    if times > 1:
        text = '%s (%s)' % (str(result), ', '.join([str(roll) for roll in rolls]))
    else:
        text = str(result)

    if plus > 0:
        text = '%i (%s + %i)' % (result + plus, text, plus)
    elif plus < 0:
        text = '%i (%s - %i)' % (result + plus, text, -plus)

    return text


def _handleprivmsg(nick, chan, arguments, irc):
    """Run the bot command, if any, contained in a single PRIVMSG.

    nick and chan are bytes; arguments[1] is the message text as bytes.
    Replies go to the channel (or query) the message arrived on, even when
    the #chan prefix redirects the command to another channel.
    """
    reply = chan

    cmdline = [word for word in arguments[1].decode('utf-8').split(' ') if word != '']
    if not cmdline:
        # Nothing but spaces: there is no command, and cmdline[0] below
        # would raise IndexError
        return

    # #chan: channel override prefix
    # Don't allow this in private messages for more transparent bot usage
    if matchcmd(cmdline, '#chan') and chan != nick:
        if matchcmd(cmdline, '#chan', 'channel {command}'):
            newchan, newcmdline = parsecmd(cmdline, 'channel {command}')
            if isauthorized(irc, newchan, nick):
                chan = newchan.encode()
                cmdline = newcmdline.split(' ')
        else:
            irc.bot_response(chan, usage('#chan'))

    # Commands that map directly onto a MODE change
    usermodes = {'#op': '+o', '#deop': '-o', '#voice': '+v', '#devoice': '-v'}
    maskmodes = {'#quiet': '+q', '#dequiet': '-q'}

    if matchcmd(cmdline, '#echo'):
        text = parsecmd(cmdline, '{text}')
        irc.bot_response(reply, text)

    elif cmdline[0] in usermodes:
        args = parsecmd(cmdline, '{args}')
        chmode(irc, chan, nick, usermodes[cmdline[0]], args.split(' '))

    elif cmdline[0] in maskmodes:
        if matchcmd(cmdline, cmdline[0], 'nick'):
            arg = parsecmd(cmdline, 'nick')
            chmode(irc, chan, nick, maskmodes[cmdline[0]], [arg + '!*@*'])
        else:
            irc.bot_response(reply, usage(cmdline[0]))

    elif matchcmd(cmdline, '#kick'):
        if matchcmd(cmdline, '#kick', 'nick {reason}'):
            kicknick, kickreason = parsecmd(cmdline, 'nick {reason}')
            # Compare case-insensitively on both sides; the original only
            # lowercased the requested nick
            if kicknick.lower() == irc.get_nick().decode('utf-8').lower():
                # Trying to kick the bot gets the requester kicked instead
                irc.send_raw(b'KICK %s %s :Fuck you' % (chan, nick))
            elif isauthorized(irc, chan, nick):
                irc.send_raw(b'KICK %s %s :%s' % (chan, kicknick.encode(), kickreason.encode()))
        else:
            irc.bot_response(reply, usage('#kick'))

    elif matchcmd(cmdline, '#src'):
        irc.bot_response(reply, 'https://ahti.space/git/nortti/oonbotti2')

    # chan is bytes, so it has to be compared against a bytes literal
    elif matchcmd(cmdline, '#prefix') and chan == b'#osdev-offtopic':
        irc.bot_response(reply, 'gopher://ayu.smar.fi:7070/0/hash-prefix')

    elif matchcmd(cmdline, '#msg'):
        if matchcmd(cmdline, '#msg', 'nick {message}'):
            msgnick, message = parsecmd(cmdline, 'nick {message}')
            if chan == nick: # In a query
                origin = "[query]"
            else: # In a channel
                origin = chan.decode()
            with msgslock:
                msgs.setdefault(msgnick, []).append((nick.decode(), origin, message))
            # savemessages() takes msgslock itself, so call it after release
            savemessages()
        else:
            irc.bot_response(reply, usage('#msg'))

    elif matchcmd(cmdline, '#trusted?'):
        if matchcmd(cmdline, '#trusted?', '[nick]'):
            trustnick = parsecmd(cmdline, '[nick]')
            if trustnick == '':
                trustnick = nick.decode()
            account = getaccount(irc, trustnick)
            if account:
                if istrusted(chan, account):
                    irc.bot_response(reply, '%s is trusted' % trustnick)
                else:
                    irc.bot_response(reply, '%s is not trusted' % trustnick)
            else:
                irc.bot_response(reply, 'Failed to get account for %s' % trustnick)
        else:
            irc.bot_response(reply, usage('#trusted?'))

    elif matchcmd(cmdline, '#trust'):
        if matchcmd(cmdline, '#trust', 'nick'):
            trustnick = parsecmd(cmdline, 'nick')
            if isauthorized(irc, chan, nick):
                account = getaccount(irc, trustnick)
                if account:
                    addtrusted(chan, account)
                    savetrusted()
                else:
                    irc.bot_response(reply, 'Failed to get account for %s' % trustnick)
        else:
            irc.bot_response(reply, usage('#trust'))

    elif matchcmd(cmdline, '#untrust'):
        if matchcmd(cmdline, '#untrust', 'nick'):
            untrustnick = parsecmd(cmdline, 'nick')
            if isauthorized(irc, chan, nick):
                account = getaccount(irc, untrustnick)
                # If the account can't be found (e.g. it has been deleted), use the parameter as-is
                if not account and istrusted(chan, untrustnick):
                    account = untrustnick
                if account:
                    with godslock:
                        # Gods cannot be untrusted
                        if account not in gods.get(chan.decode(), []):
                            # The trusted list holds accounts, so remove the
                            # resolved account rather than the nick typed
                            rmtrusted(chan.decode(), account)
                    savetrusted()
                else:
                    irc.bot_response(reply, 'Failed to get account for %s' % untrustnick)
        else:
            irc.bot_response(reply, usage('#untrust'))

    elif matchcmd(cmdline, '#ls-trusted'):
        # Copy under the lock, send afterwards
        with trustedlock:
            accounts = list(trusted.get(chan.decode(), []))

        lines = []
        line = ''
        for account in accounts:
            if line == '':
                line = account
            elif len(line + ', ' + account) <= 255: # Playing it safe not to get truncated
                line += ', ' + account
            else:
                lines.append(line)
                line = account
        if line != '':
            lines.append(line)

        for line in lines:
            irc.bot_response(nick, '%s: %s' % (chan.decode(), line))

    elif matchcmd(cmdline, '#invite'):
        irc.bot_response(chan, '%s: #invite has been removed. Use manual invite' % nick.decode())

    elif matchcmd(cmdline, '#help'):
        if matchcmd(cmdline, '#help', '[command]'):
            helptext = help(parsecmd(cmdline, '[command]'))
            if helptext:
                irc.bot_response(reply, helptext)

    elif matchcmd(cmdline, '#esoteric') and chan == b'#esoteric':
        irc.bot_response(reply, 'Nothing here')

    elif cmdline[0] in [irc.get_nick().decode(), irc.get_nick().decode()+',', irc.get_nick().decode()+':']:
        question = parsecmd(cmdline, '{question}')
        if not question.startswith(':D'): # Mandated by #osdev-offtopic law
            irc.bot_response(reply, '%s: %s' % (nick.decode(), doctor.respond(question)))

    elif die_expr.match(cmdline[0]):
        irc.bot_response(reply, _rolldice(cmdline[0]))


def parse(nick, chan, command, arguments, irc):
    """Handle one incoming IRC message.

    nick      -- sender of the message (bytes)
    chan      -- where replies should go (bytes); equals nick in a query
    command   -- IRC command or numeric, as bytes (b'PRIVMSG', b'330', ...)
    arguments -- list of the message's parameters, each as bytes
    irc       -- connection object; send_raw(), bot_response(), get_nick()
                 and get_channel() are used

    Messages from blacklisted nicks and messages whose text starts with a
    zero-width space are ignored. After the command has been handled, any
    messages stored for the sender with #msg are delivered when the sender
    speaks or joins.
    """
    zwsp = '\u200b'.encode('utf-8')

    if nick in blacklist:
        return
    if len(arguments) >= 2 and arguments[1].startswith(zwsp): # If line begins with ZWSP
        return

    if command == b'PRIVMSG' and arguments[1][:1] != b' ':
        _handleprivmsg(nick, chan, arguments, irc)

    elif command == b'330': # WHOIS: is logged in as
        whoisnick = arguments[1].decode('utf-8')
        account = arguments[2].decode('utf-8')
        for id in getaccountcheckidbynick(whoisnick):
            setaccountcheckvalue(id, account)

    # command is bytes: comparing against the str '318' never matched
    elif command == b'318': # WHOIS: End of /WHOIS list.
        whoisnick = arguments[1].decode('utf-8')
        for id in getaccountcheckidbynick(whoisnick):
            if getaccountcheckvalue(id) is None:
                setaccountcheckvalue(id, '') # Mark as failed, '' is used because None is already reserved

    # NOTE(review): if get_channel() returns bytes, .split(' ') raises
    # TypeError; left unchanged because its return type is not visible here
    elif command == b'INVITE' and arguments[0] == irc.get_nick() and arguments[1] in irc.get_channel().split(' '):
        if isauthorized(irc, arguments[1], nick):
            irc.send_raw(b'JOIN ' + arguments[1])

    elif command == b'482':
        irc.bot_response(arguments[1], 'Not op')

    # Deliver stored messages; `command` is still the IRC command here
    # because #help no longer rebinds it
    msgs_changed = False
    with msgslock:
        if command in (b'PRIVMSG', b'JOIN') and nick.decode('utf-8') in msgs:
            for sender, origin, msg in msgs.pop(nick.decode()):
                irc.bot_response(nick, '%s <%s> %s' % (origin, sender, msg))
            msgs_changed = True
    # savemessages() takes msgslock itself, so call it after release
    if msgs_changed:
        savemessages()
def usage(cmd, message = True):
    """Return the argument synopsis of a bot command.

    With message true the result is a full 'Usage: <cmd> <synopsis>' line,
    otherwise just the synopsis (which may be ''). Returns None for an
    unknown command.
    """
    synopses = {
        '#echo': 'text',
        '#op': '[nick]',
        '#deop': '[nick]',
        '#voice': '[nick]',
        '#devoice': '[nick]',
        '#quiet': 'nick',
        '#dequiet': 'nick',
        '#kick': 'nick [reason]',
        '#src': '',
        '#msg': 'nick message',
        '#trusted?': '[nick]',
        '#trust': 'nick',
        '#untrust': 'nick',
        '#ls-trusted': '',
        '#chan': 'channel command',
        '#help': '[command]',
    }

    synopsis = synopses.get(cmd)
    if synopsis is None:
        return None
    if not message:
        return synopsis
    return 'Usage: %s %s' % (cmd, synopsis)
def help(cmd):
    """Return the help text for a bot command.

    '' yields the list of all commands, a known command yields
    '<cmd> <synopsis> <description>', and anything unknown yields None.
    """
    if cmd == '':
        return '#echo #op #deop #voice #devoice #quiet #dequiet #kick #src #msg #trusted? #trust #untrust #ls-trusted #chan #help'
    if cmd == 'me':
        return 'I shall.'

    descriptions = {
        '#echo': '#echo text back',
        '#op': 'give nick or yourself op rights in case you are trusted by oonbotti2 and identified with NickServ',
        '#deop': 'remove your/nick\'s op rights',
        '#voice': 'give nick or yourself voice in case you are trusted by oonbotti2 and identified with NickServ',
        '#devoice': 'remove your or nick\'s voice in case you are trusted by oonbotti2 and identified with NickServ',
        '#quiet': 'give +q to nick!*@*',
        '#dequiet': 'remove +q from nick!*@*',
        '#kick': 'kicks nick with specified reason',
        '#src': 'paste a link to oonbotti2\'s git repo',
        '#msg': 'send a message to nick',
        '#trusted?': 'tell you if nick or yourself is trusted by oonbotti2',
        '#trust': 'add nick to trusted list',
        '#untrust': 'remove nick from trusted list',
        '#ls-trusted': 'list nicks that are trusted. use only in a query',
        '#chan': 'Runs the command as if it was sent on the specified channel. Requires user to be trusted',
        '#help': 'give short info of command or list commands',
    }

    if cmd not in descriptions:
        return None

    description = descriptions[cmd]
    if not description:
        # A command without a description only gets its synopsis
        return '%s %s' % (cmd, usage(cmd, False))
    return '%s %s %s' % (cmd, usage(cmd, False), description)