diff --git a/trezor_agent/gpg/check.py b/trezor_agent/gpg/check.py index 7a3b2d1..f31f6dd 100755 --- a/trezor_agent/gpg/check.py +++ b/trezor_agent/gpg/check.py @@ -28,9 +28,9 @@ def verify(pubkey, sig_file): payload, checksum = data[:-3], data[-3:] assert util.crc24(payload) == checksum stream = io.BytesIO(payload) - parser = decode.Parser(util.Reader(stream), original_data(sig_file)) - signature, = list(parser) - decode.verify_digest(pubkey=pubkey, digest=signature['digest'], + + signature, digest = decode.load_signature(stream, original_data(sig_file)) + decode.verify_digest(pubkey=pubkey, digest=digest, signature=signature['sig'], label='GPG signature') log.info('%s OK', sig_file) diff --git a/trezor_agent/gpg/decode.py b/trezor_agent/gpg/decode.py index 2d38c22..f1245cf 100644 --- a/trezor_agent/gpg/decode.py +++ b/trezor_agent/gpg/decode.py @@ -75,7 +75,7 @@ SUPPORTED_CURVES = { class Parser(object): """Parse GPG packets from a given stream.""" - def __init__(self, stream, to_hash=None): + def __init__(self, stream): """Create an empty parser.""" self.stream = stream self.packet_types = { @@ -84,9 +84,6 @@ class Parser(object): 11: self.literal, 13: self.user_id, } - self.to_hash = io.BytesIO() - if to_hash: - self.to_hash.write(to_hash) def __iter__(self): """Support iterative parsing of available GPG packets.""" @@ -99,8 +96,8 @@ class Parser(object): filename_len = stream.readfmt('B') p['filename'] = stream.read(filename_len) p['date'] = stream.readfmt('>L') - with stream.capture(self.to_hash): - p['content'] = stream.read() + p['content'] = stream.read() + p['_to_hash'] = p['content'] return p def signature(self, stream): @@ -114,22 +111,14 @@ class Parser(object): p['pubkey_alg'] = stream.readfmt('B') p['hash_alg'] = stream.readfmt('B') p['hashed_subpackets'] = parse_subpackets(stream) - self.to_hash.write(to_hash.getvalue()) # https://tools.ietf.org/html/rfc4880#section-5.2.4 - self.to_hash.write(b'\x04\xff' + struct.pack('>L', to_hash.tell())) - data_to_sign = self.to_hash.getvalue() - log.debug('hashing %d bytes for signature: %r', - len(data_to_sign), data_to_sign) - digest = hashlib.sha256(data_to_sign).digest() + tail_to_hash = b'\x04\xff' + struct.pack('>L', to_hash.tell()) + + p['_to_hash'] = to_hash.getvalue() + tail_to_hash p['unhashed_subpackets'] = parse_subpackets(stream) p['hash_prefix'] = stream.readfmt('2s') - if p['hash_prefix'] != digest[:2]: - log.warning('Bad hash prefix: %r (expected %r)', - digest[:2], p['hash_prefix']) - else: - p['digest'] = digest p['sig'] = (parse_mpi(stream), parse_mpi(stream)) assert not stream.read() @@ -160,17 +149,16 @@ class Parser(object): data_to_hash = (b'\x99' + struct.pack('>H', len(packet_data)) + packet_data) p['key_id'] = hashlib.sha1(data_to_hash).digest()[-8:] + p['_to_hash'] = data_to_hash log.debug('key ID: %s', util.hexlify(p['key_id'])) - self.to_hash.write(data_to_hash) return p def user_id(self, stream): """See https://tools.ietf.org/html/rfc4880#section-5.11 for details.""" value = stream.read() - self.to_hash.write(b'\xb4' + struct.pack('>L', len(value))) - self.to_hash.write(value) - return {'type': 'user_id', 'value': value} + to_hash = b'\xb4' + util.prefix_len('>L', value) + return {'type': 'user_id', 'value': value, '_to_hash': to_hash} def __next__(self): """See https://tools.ietf.org/html/rfc4880#section-4.2 for details.""" @@ -203,15 +191,31 @@ class Parser(object): next = __next__ +def digest_packets(packets): + data_to_hash = io.BytesIO() + for p in packets: + data_to_hash.write(p['_to_hash']) + return hashlib.sha256(data_to_hash.getvalue()).digest() + + def load_public_key(stream): """Parse and validate GPG public key from an input stream.""" parser = Parser(util.Reader(stream)) pubkey, userid, signature = list(parser) + digest = digest_packets([pubkey, userid, signature]) + assert signature['hash_prefix'] == digest[:2] log.debug('loaded public key "%s"', userid['value']) - verify_digest(pubkey=pubkey, digest=signature['digest'], + verify_digest(pubkey=pubkey, digest=digest, signature=signature['sig'], label='GPG public key') return pubkey +def load_signature(stream, original_data): + parser = Parser(util.Reader(stream)) + signature, = parser + digest = digest_packets([{'_to_hash': original_data}, signature]) + assert signature['hash_prefix'] == digest[:2] + return signature, digest + def load_from_gpg(user_id): """Load existing GPG public key for `user_id` from local keyring."""