mirror of
https://github.com/romanz/amodem.git
synced 2026-04-25 00:37:35 +08:00
gpg: refactor decode to functional style
This commit is contained in:
@@ -1,10 +1,7 @@
|
||||
#!/usr/bin/env python
|
||||
"""Check GPG v2 signature for a given public key."""
|
||||
import argparse
|
||||
import base64
|
||||
import io
|
||||
import logging
|
||||
import pprint
|
||||
|
||||
from . import decode
|
||||
from .. import util
|
||||
@@ -21,7 +18,7 @@ def main():
|
||||
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO,
|
||||
format='%(asctime)s %(levelname)-10s %(message)s')
|
||||
stream = open(args.pubkey, 'rb')
|
||||
parser = decode.Parser(util.Reader(stream))
|
||||
parser = decode.parse_packets(util.Reader(stream))
|
||||
pubkey, userid, sig1, subkey, sig2 = parser
|
||||
|
||||
digest = decode.digest_packets([pubkey, userid, sig1])
|
||||
|
||||
@@ -74,25 +74,7 @@ SUPPORTED_CURVES = {
|
||||
}
|
||||
|
||||
|
||||
class Parser(object):
|
||||
"""Parse GPG packets from a given stream."""
|
||||
|
||||
def __init__(self, stream):
|
||||
"""Create an empty parser."""
|
||||
self.stream = stream
|
||||
self.packet_types = {
|
||||
2: self.signature,
|
||||
6: self.pubkey,
|
||||
11: self.literal,
|
||||
13: self.user_id,
|
||||
14: self.subkey,
|
||||
}
|
||||
|
||||
def __iter__(self):
|
||||
"""Support iterative parsing of available GPG packets."""
|
||||
return self
|
||||
|
||||
def literal(self, stream):
|
||||
def _parse_literal(stream):
|
||||
"""See https://tools.ietf.org/html/rfc4880#section-5.9 for details."""
|
||||
p = {'type': 'literal'}
|
||||
p['format'] = stream.readfmt('c')
|
||||
@@ -103,16 +85,17 @@ class Parser(object):
|
||||
p['_to_hash'] = p['content']
|
||||
return p
|
||||
|
||||
def _embedded_signatures(self, subpackets):
|
||||
|
||||
def _parse_embedded_signatures(subpackets):
|
||||
for packet in subpackets:
|
||||
data = bytearray(packet)
|
||||
if data[0] == 32:
|
||||
# https://tools.ietf.org/html/rfc4880#section-5.2.3.26
|
||||
stream = io.BytesIO(data[1:])
|
||||
yield self.signature(util.Reader(stream))
|
||||
yield _parse_signature(util.Reader(stream))
|
||||
|
||||
|
||||
def signature(self, stream):
|
||||
def _parse_signature(stream):
|
||||
"""See https://tools.ietf.org/html/rfc4880#section-5.2 for details."""
|
||||
p = {'type': 'signature'}
|
||||
|
||||
@@ -130,17 +113,17 @@ class Parser(object):
|
||||
p['_to_hash'] = to_hash.getvalue() + tail_to_hash
|
||||
|
||||
p['unhashed_subpackets'] = parse_subpackets(stream)
|
||||
embedded = list(self._embedded_signatures(p['unhashed_subpackets']))
|
||||
embedded = list(_parse_embedded_signatures(p['unhashed_subpackets']))
|
||||
if embedded:
|
||||
p['embedded'] = embedded
|
||||
|
||||
p['hash_prefix'] = stream.readfmt('2s')
|
||||
p['sig'] = (parse_mpi(stream), parse_mpi(stream))
|
||||
assert not stream.read()
|
||||
|
||||
return p
|
||||
|
||||
def pubkey(self, stream):
|
||||
|
||||
def _parse_pubkey(stream):
|
||||
"""See https://tools.ietf.org/html/rfc4880#section-5.5 for details."""
|
||||
p = {'type': 'pubkey'}
|
||||
packet = io.BytesIO()
|
||||
@@ -169,7 +152,8 @@ class Parser(object):
|
||||
log.debug('key ID: %s', util.hexlify(p['key_id']))
|
||||
return p
|
||||
|
||||
def subkey(self, stream):
|
||||
|
||||
def _parse_subkey(stream):
|
||||
"""See https://tools.ietf.org/html/rfc4880#section-5.5 for details."""
|
||||
p = {'type': 'subkey'}
|
||||
packet = io.BytesIO()
|
||||
@@ -200,20 +184,36 @@ class Parser(object):
|
||||
log.debug('key ID: %s', util.hexlify(p['key_id']))
|
||||
return p
|
||||
|
||||
def user_id(self, stream):
|
||||
|
||||
def _parse_user_id(stream):
|
||||
"""See https://tools.ietf.org/html/rfc4880#section-5.11 for details."""
|
||||
value = stream.read()
|
||||
to_hash = b'\xb4' + util.prefix_len('>L', value)
|
||||
return {'type': 'user_id', 'value': value, '_to_hash': to_hash}
|
||||
|
||||
def __next__(self):
|
||||
"""See https://tools.ietf.org/html/rfc4880#section-4.2 for details."""
|
||||
try:
|
||||
value = self.stream.readfmt('B')
|
||||
except EOFError:
|
||||
raise StopIteration
|
||||
|
||||
log.debug('prefix byte: %02x', value)
|
||||
PACKET_TYPES = {
|
||||
2: _parse_signature,
|
||||
6: _parse_pubkey,
|
||||
11: _parse_literal,
|
||||
13: _parse_user_id,
|
||||
14: _parse_subkey,
|
||||
}
|
||||
|
||||
|
||||
def parse_packets(stream):
|
||||
"""
|
||||
Support iterative parsing of available GPG packets.
|
||||
|
||||
See https://tools.ietf.org/html/rfc4880#section-4.2 for details.
|
||||
"""
|
||||
while True:
|
||||
try:
|
||||
value = stream.readfmt('B')
|
||||
except EOFError:
|
||||
return
|
||||
|
||||
log.debug('prefix byte: %s', bin(value))
|
||||
assert util.bit(value, 7) == 1
|
||||
assert util.bit(value, 6) == 0 # new format not supported yet
|
||||
|
||||
@@ -222,22 +222,24 @@ class Parser(object):
|
||||
tag = tag >> 2
|
||||
fmt = {0: '>B', 1: '>H', 2: '>L'}[length_type]
|
||||
log.debug('length_type: %s', fmt)
|
||||
packet_size = self.stream.readfmt(fmt)
|
||||
packet_size = stream.readfmt(fmt)
|
||||
|
||||
log.debug('packet length: %d', packet_size)
|
||||
packet_data = self.stream.read(packet_size)
|
||||
packet_type = self.packet_types.get(tag)
|
||||
packet_data = stream.read(packet_size)
|
||||
packet_type = PACKET_TYPES.get(tag)
|
||||
|
||||
if packet_type:
|
||||
p = packet_type(util.Reader(io.BytesIO(packet_data)))
|
||||
else:
|
||||
raise ValueError('Unknown packet type: {}'.format(tag))
|
||||
|
||||
p['tag'] = tag
|
||||
log.debug('packet "%s": %s', p['type'], p)
|
||||
return p
|
||||
|
||||
next = __next__
|
||||
yield p
|
||||
|
||||
|
||||
def digest_packets(packets):
|
||||
"""Compute digest on specified packets, according to '_to_hash' field."""
|
||||
data_to_hash = io.BytesIO()
|
||||
for p in packets:
|
||||
data_to_hash.write(p['_to_hash'])
|
||||
@@ -246,8 +248,8 @@ def digest_packets(packets):
|
||||
|
||||
def load_public_key(stream):
|
||||
"""Parse and validate GPG public key from an input stream."""
|
||||
parser = Parser(util.Reader(stream))
|
||||
pubkey, userid, signature = list(parser)
|
||||
packets = list(parse_packets(util.Reader(stream)))
|
||||
pubkey, userid, signature = packets
|
||||
digest = digest_packets([pubkey, userid, signature])
|
||||
assert signature['hash_prefix'] == digest[:2]
|
||||
log.debug('loaded public key "%s"', userid['value'])
|
||||
@@ -257,8 +259,8 @@ def load_public_key(stream):
|
||||
|
||||
|
||||
def load_signature(stream, original_data):
|
||||
parser = Parser(util.Reader(stream))
|
||||
signature, = parser
|
||||
"""Load signature from stream, and compute GPG digest for verification."""
|
||||
signature, = list(parse_packets(util.Reader(stream)))
|
||||
digest = digest_packets([{'_to_hash': original_data}, signature])
|
||||
assert signature['hash_prefix'] == digest[:2]
|
||||
return signature, digest
|
||||
|
||||
Reference in New Issue
Block a user