Validate input encoding

This commit is contained in:
Juhani Krekelä 2019-07-15 22:57:54 +03:00
parent cbb4252441
commit 79dec74dcf
1 changed files with 85 additions and 18 deletions

View File

@ -123,6 +123,54 @@ def format_status(status):
else:
raise ValueError('Unknown status %i' % status)
class NonCharacterError(Exception): pass
class ControlCharacterError(Exception): pass
def validate_encoding(text, newline_allowed):
# We assume text is valid unicode, so we skip stuff like surrogate pairs
for char in text:
codepoint = ord(char)
# Reject non-characters
if codepoint & 0xffff in [0xfffe, 0xffff]:
# Plane end non-character
raise NonCharacterError(char)
elif 0xfdd0 <= codepoint <= 0xfdef:
# BMP non-character block
raise NonCharacterError(char)
# Reject control characters
if codepoint <= 0x1f:
# C0 control characters
if not newline_allowed or codepoint != 0x0a:
raise ControlCharacterError(char)
elif 0x80 <= codepoint <= 0x9f:
# C1 control characters
raise ControlCharacterError(char)
elif codepoint in [0x2028, 0x2029]:
# U+2028 LINE SEPARATOR and U+2029 PARAGRAPH SEPARATOR
raise ControlCharacterError(char)
class NickLengthError(Exception): pass
def validate_nick(nick):
validate_encoding(nick, newline_allowed = False)
# Nick length is stored as one byte
if len(nick.encode('utf-8')) > 255:
raise NickLengthError
class MessageLengthError(Exception): pass
def validate_message(message):
validate_encoding(message, newline_allowed = True)
# Maximum frame payload is 1500 bytes
# -2 for EtherMess packet header
# -2 for msgid
# -2 for message length
# = 1494
if len(message.encode('utf-8')) > 1494:
raise MessageLengthError
def send_message(backend, mac, message):
encoded = message.encode('utf-8')
writeall(backend, b'm' + mac + bytes([len(encoded) >> 8, len(encoded) & 0xff]) + encoded)
@ -131,6 +179,8 @@ def queue_message(backend, mac, message):
global send_queue
global next_queue_id
validate_message(message)
if len(send_queue) == 0:
# Nothing being processed atm, send directly
send_message(backend, mac, message)
@ -158,13 +208,12 @@ def handle_user_command(backend, line):
global own_nick, own_status
global default_target_mac
if len(line) > 0 and line[0] == '/':
command, _, rest = line.partition(' ')
try:
if len(line) > 0 and line[0] == '/':
command, _, rest = line.partition(' ')
try:
if command == '/':
# Send quoted message
# TODO: Validate message
if default_target_mac is None:
print('--- Default target not set, set with /target')
else:
@ -201,7 +250,8 @@ def handle_user_command(backend, line):
elif command == '/nick':
# Change nick
# TODO: Validate nick
validate_nick(rest)
own_nick = rest
set_status_nick(backend, own_status, own_nick)
@ -217,18 +267,30 @@ def handle_user_command(backend, line):
# Display usage
print('--- / <message>; /msg <target> <message>; /status [<target>]; /available; /unavailable; /nick <nick>; /target <target>; /quit')
except NoMatchesError as err:
print('--- name %s matches no peers' % err.args[0])
except TooManyMatchesError as err:
print('--- name %s matches several peers' % err.args[0])
else:
# Send message
if default_target_mac is None:
print('--- Default target not set, set with /target')
else:
queue_message(backend, default_target_mac, line)
# Send message
if default_target_mac is None:
print('--- Default target not set, set with /target')
else:
queue_message(backend, default_target_mac, line)
except NoMatchesError as err:
print('--- Name %s matches no peers' % err.args[0])
except TooManyMatchesError as err:
print('--- Name %s matches several peers' % err.args[0])
except NonCharacterError as err:
print('--- Error: contains non-character U+%04X' % ord(err.args[0]))
except ControlCharacterError as err:
print('--- Error: contains control character U+%04X' % ord(err.args[0]))
except NickLengthError:
print('--- Error: nick too long (max. 255B)')
except MessageLengthError:
print('--- Error: message too long (max. 1494B)')
def handle_status(mac, status, nick):
global peers
@ -413,7 +475,7 @@ def eventloop(proc):
raise ValueError('Unknown event type from backend: %s' % repr(event_type))
elif fd == proc.stdout.fileno() and event & select.POLLHUP:
print('Backend exited')
print('--- Backend exited')
running = False
elif fd == unbuf_stdin.fileno() and event & select.POLLIN:
@ -426,7 +488,12 @@ def eventloop(proc):
break
line, _, input_buffer = input_buffer.partition(b'\n')
if handle_user_command(proc.stdin, line.decode('utf-8')) == 'quit':
try:
line = line.decode('utf-8')
except UnicodeDecodeError:
print('--- Error: malformed utf-8')
if handle_user_command(proc.stdin, line) == 'quit':
writeall(proc.stdin, b'q')
running = False