Move to a text format

This commit is contained in:
Juhani Krekelä 2018-08-29 12:32:01 +03:00
parent e4042c0664
commit 9e0696bbdc
3 changed files with 127 additions and 73 deletions

View File

@ -2,7 +2,7 @@ from collections import namedtuple
import hashing import hashing
# Entry(bytes[32], bytes[32], bytes[32], bytes[0…2¹⁶-1]) # Entry(bytes[32], bytes[32], bytes[32], str)
Entry = namedtuple('Entry', ['salt', 'hashed_host', 'fingerprint', 'comment']) Entry = namedtuple('Entry', ['salt', 'hashed_host', 'fingerprint', 'comment'])
class UnacceptableComment(Exception): pass class UnacceptableComment(Exception): pass
@ -47,10 +47,4 @@ def create_entry(domain, port, fingerprint, comment):
if '\n' in comment: if '\n' in comment:
raise UnacceptableComment('Comment contains newlines') raise UnacceptableComment('Comment contains newlines')
comment_encoded = comment.encode('utf-8') return Entry(salt, hashed_host, fingerprint, comment)
# Comment may be at max 2¹⁶-1 bytes long
if len(comment_encoded) >= 1<<16:
raise UnacceptableComment('Comment length of %i bytes is too long' % len(comment_encoded))
return Entry(salt, hashed_host, fingerprint, comment_encoded)

View File

@ -1,67 +1,113 @@
import base64
import entry import entry
class FileFormatError(Exception): pass class FileFormatError(Exception): pass
class VersionMismatch(Exception): pass class VersionMismatch(Exception): pass
def check_header(f): def parse_header(header):
"""check_header(file(rb)) """parse_header(bytes) → str
Throw an error if the header isn't good""" Throw an error if the header isn't good and return the file comment
# Magic is b'WOT' (if any) if it is"""
magic = f.read(3) assert type(header) == bytes
if magic != b'WOT':
magic = header[0:6]
if magic != b'SSHWOT':
raise FileFormatError('Invalid magic') raise FileFormatError('Invalid magic')
# Version 0 is the current one # Version 0 is the current one
version = f.read(1) version = header[6:7]
if version == b'': if version == b'':
raise FileFormatError('Unexpected end of file') raise FileFormatError('No newline after header')
if version != b'\0': if version != b'0':
raise VersionMismatch('Version %i not supported' % version[0]) raise VersionMismatch('Version %i not supported' % version[0])
def read_entry(f): # See if we have a comment
"""read_entry(file(rb)) → Entry / None if header[7:8] == b' ':
Returns None if the end of file has been reached""" # It says we have
# u8[32]: salt if header[8:9] == b'\n':
salt = f.read(32) # No, we don't, but we do have a space telling we
if len(salt) == 0: # have. The header is malformed
# End of file has been reached, return None to mark that raise FileFormatError('Missing comment or spurious space in the header')
return None else:
elif len(salt) != 32: # Yes, we do
raise FileFormatError('Unexpected end of file') # Check it ends with a newline
if header[-1] != 0x0a:
raise FileFormatError('Missing newline at the end of the header')
# u8[32]: hashed_host try:
hashed_host = f.read(32) file_comment = header[8:-1].decode('utf-8')
if len(hashed_host) != 32: except UnicodeDecodeError:
raise FileFormatError('Unexpected end of file') raise FileFormatError('Comment is not valid utf-8')
# u8[32]: fingerprint return file_comment
fingerprint = f.read(32)
if len(fingerprint) != 32:
raise FileFormatError('Unexpected end of file')
# u16le: comment_length elif header[7:8] == b'\n':
length_bytes = f.read(2) # No, we have newline
if len(length_bytes) != 2: return ''
raise FileFormatError('Unexpected end of file')
comment_length = length_bytes[0] | length_bytes[1] << 8
# u8[comment_length]: comment else:
comment = f.read(comment_length) # No, we have something else
if len(comment) != comment_length: raise FileFormatError("Expected a space or a newline but got '%s' instead" % header[7:].decode('utf-8'))
raise FileFormatError('Unexpected end of file')
def parse_entry(line):
"""parse_entry(bytes) → Entry"""
assert type(line) == bytes
def extract_b64_field(rest):
"""extract_b64_field(bytes) → (bytes: decoded_field, bytes:rest)"""
field_b64 = rest[0:44]
if len(field_b64) != 44:
raise FileFormatError('Unexpected end of line')
try:
field = base64.b64decode(field_b64, validate = True)
except (ValueError, base64.binascii.Error) as err:
raise FileFormatError('Malformed base64 string: %s' % field_b64.decode('utf-8')) from err
return field, rest[44:]
salt, rest = extract_b64_field(line)
hashed_host, rest = extract_b64_field(rest)
fingerprint, rest = extract_b64_field(rest)
# What do we have after that?
if rest[0:1] == b' ':
# A comment?
if rest[1:2] == b'\n':
# No, but it says we have. It's malformed
raise FileFormatError('Missing comment or spurious space in the entry')
else:
# Yes. Make sure it ends in a newline
if rest[-1] != 0x0a:
raise FileFormatError('No newline after entry')
try:
comment = rest[1:-1].decode('utf-8')
except UnicodeDecodeError:
raise FileFormatError('Comment is not valid utf-8')
elif rest[0:1] == b'\n':
# A newline
comment = ''
else:
# Something else
raise FileFormatError('Expected a space or a newline but got "%s" instead' % rest.decode('utf-8'))
return entry.Entry(salt, hashed_host, fingerprint, comment) return entry.Entry(salt, hashed_host, fingerprint, comment)
def read(f): def read(f):
"""read_file(file(rb)) → [Entry]""" """read(file(rb)) → ([Entry]: entries, str: file_comment)"""
check_header(f) lines = [line for line in f]
if len(lines) == 0:
raise FileFormatError('Missing header')
file_comment = parse_header(lines[0])
entries = [] entries = []
while True: for line in lines[1:]:
# Read until we reach the end of file entries.append(parse_entry(line))
entry = read_entry(f)
if entry is None: break
entries.append(entry)
return entries return entries, file_comment

View File

@ -1,39 +1,53 @@
def write_header(f): import base64
"""write_header(file(wb))
def write_header(f, file_comment):
"""write_header(file(wb), str)
Writes the header to the given file.""" Writes the header to the given file."""
# b'WOT' magic assert type(file_comment) == str
f.write(b'WOT') # b'SSHWOT' magic
f.write(b'SSHWOT')
# Version number # Version number
f.write(bytes([0])) f.write(b'0')
# b' ' + file_comment, if there is one
if len(file_comment) > 0:
f.write(b' ')
assert b'\n' not in file_comment
f.write(file_comment)
# End of header marked with b'\n'
f.write(b'\n')
def write_entry(f, salt, hashed_host, fingerprint, comment): def write_entry(f, salt, hashed_host, fingerprint, comment):
"""write_entry(file(wb), bytes[32], bytes[32], bytes[32], bytes[0…2¹⁶-1]) """write_entry(file(wb), bytes[32], bytes[32], bytes[32], str)
Writes an entry to the given file.""" Writes an entry to the given file."""
assert type(salt) == bytes and len(salt) == 32 assert type(salt) == bytes and len(salt) == 32
assert type(hashed_host) == bytes and len(hashed_host) == 32 assert type(hashed_host) == bytes and len(hashed_host) == 32
assert type(fingerprint) == bytes and len(fingerprint) == 32 assert type(fingerprint) == bytes and len(fingerprint) == 32
assert type(comment) == bytes and 0 <= len(comment) <= (1<<16) - 1 assert type(comment) == str
# u8[32]: salt # base64 encoded (44 bytes): salt
f.write(salt) f.write(base64.b64encode(salt))
# u8[32]: hashed_host # base64 encoded (44 bytes): hashed_host
f.write(hashed_host) f.write(base64.b64encode(hashed_host))
# u8[32]: fingerprint # base64 encoded (44 bytes): fingerprint
f.write(fingerprint) f.write(base64.b64encode(fingerprint))
# u16le: len(comment) # b' ' + comment, if there is one
comment_len = len(comment) if len(comment) > 0:
f.write(bytes([comment_len & 0xff, comment_len >> 8])) f.write(b' ')
assert '\n' not in comment
f.write(comment.encode('utf-8'))
# u8[]: comment # End of entry marked with b'\n'
f.write(comment) f.write(b'\n')
def write(f, entries): def write(f, entries, file_comment = ''):
"""write(file(wb), [Entry]) """write(file(wb), [Entry], str)
Creates a file containing all of the entries""" Creates a file containing all of the entries"""
write_header(f) assert type(file_comment) == str
write_header(f, file_comment)
for entry in entries: for entry in entries:
write_entry(f, entry.salt, entry.hashed_host, entry.fingerprint, entry.comment) write_entry(f, entry.salt, entry.hashed_host, entry.fingerprint, entry.comment)