From 9a435ae23ea52781d7390dd5e846c16ce60ca975 Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Sun, 24 Apr 2016 13:04:53 +0300 Subject: [PATCH] gpg: minor renames and code refactoring --- trezor_agent/gpg/check.py | 10 +- trezor_agent/gpg/decode.py | 109 +++---------- trezor_agent/gpg/encode.py | 243 +++++++++++++++++++++++++++ trezor_agent/gpg/git_wrapper.py | 8 +- trezor_agent/gpg/signer.py | 280 +------------------------------- trezor_agent/util.py | 84 ++++++++++ 6 files changed, 369 insertions(+), 365 deletions(-) create mode 100644 trezor_agent/gpg/encode.py diff --git a/trezor_agent/gpg/check.py b/trezor_agent/gpg/check.py index f179582..58ddba9 100755 --- a/trezor_agent/gpg/check.py +++ b/trezor_agent/gpg/check.py @@ -20,15 +20,15 @@ def original_data(filename): def verify(pubkey, sig_file): - d = open(sig_file, 'rb') - if d.name.endswith('.asc'): - lines = d.readlines()[3:-1] """Verify correctness of public key and signature.""" + stream = open(sig_file, 'rb') + if stream.name.endswith('.asc'): + lines = stream.readlines()[3:-1] data = base64.b64decode(''.join(lines)) payload, checksum = data[:-3], data[-3:] assert util.crc24(payload) == checksum - d = io.BytesIO(payload) - parser = decode.Parser(decode.Reader(d), original_data(sig_file)) + stream = io.BytesIO(payload) + parser = decode.Parser(util.Reader(stream), original_data(sig_file)) signature, = list(parser) decode.verify_digest(pubkey=pubkey, digest=signature['digest'], signature=signature['sig'], label='GPG signature') diff --git a/trezor_agent/gpg/decode.py b/trezor_agent/gpg/decode.py index 75fd030..035285d 100644 --- a/trezor_agent/gpg/decode.py +++ b/trezor_agent/gpg/decode.py @@ -1,78 +1,24 @@ """Decoders for GPG v2 data structures.""" -import binascii -import contextlib import hashlib import io import logging import struct +import subprocess import ecdsa import ed25519 -from trezor_agent.util import num2bytes +from .. import util log = logging.getLogger(__name__) -def bit(value, i): - """Extract the i-th bit out of value.""" - return 1 if value & (1 << i) else 0 - - -def low_bits(value, n): - """Extract the lowest n bits out of value.""" - return value & ((1 << n) - 1) - - -def readfmt(stream, fmt): - """Read and unpack an object from stream, using a struct format string.""" - size = struct.calcsize(fmt) - blob = stream.read(size) - return struct.unpack(fmt, blob) - - -class Reader(object): - """Read basic type objects out of given stream.""" - - def __init__(self, stream): - """Create a non-capturing reader.""" - self.s = stream - self._captured = None - - def readfmt(self, fmt): - """Read a specified object, using a struct format string.""" - size = struct.calcsize(fmt) - blob = self.read(size) - obj, = struct.unpack(fmt, blob) - return obj - - def read(self, size=None): - """Read `size` bytes from stream.""" - blob = self.s.read(size) - if size is not None and len(blob) < size: - raise EOFError - if self._captured: - self._captured.write(blob) - return blob - - @contextlib.contextmanager - def capture(self, stream): - """Capture all data read during this context.""" - self._captured = stream - try: - yield - finally: - self._captured = None - -length_types = {0: '>B', 1: '>H', 2: '>L'} - - def parse_subpackets(s): """See https://tools.ietf.org/html/rfc4880#section-5.2.3.1 for details.""" subpackets = [] total_size = s.readfmt('>H') data = s.read(total_size) - s = Reader(io.BytesIO(data)) + s = util.Reader(io.BytesIO(data)) while True: try: @@ -92,23 +38,8 @@ def parse_mpi(s): return sum(v << (8 * i) for i, v in enumerate(reversed(blob))) -def split_bits(value, *bits): - """ - Split integer value into list of ints, according to `bits` list. - - For example, split_bits(0x1234, 4, 8, 4) == [0x1, 0x23, 0x4] - """ - result = [] - for b in reversed(bits): - mask = (1 << b) - 1 - result.append(value & mask) - value = value >> b - assert value == 0 - return reversed(result) - - def _parse_nist256p1_verifier(mpi): - prefix, x, y = split_bits(mpi, 4, 256, 256) + prefix, x, y = util.split_bits(mpi, 4, 256, 256) assert prefix == 4 point = ecdsa.ellipticcurve.Point(curve=ecdsa.NIST256p.curve, x=x, y=y) @@ -124,12 +55,12 @@ def _parse_nist256p1_verifier(mpi): def _parse_ed25519_verifier(mpi): - prefix, value = split_bits(mpi, 8, 256) + prefix, value = util.split_bits(mpi, 8, 256) assert prefix == 0x40 - vk = ed25519.VerifyingKey(num2bytes(value, size=32)) + vk = ed25519.VerifyingKey(util.num2bytes(value, size=32)) def _ed25519_verify(signature, digest): - sig = b''.join(num2bytes(val, size=32) + sig = b''.join(util.num2bytes(val, size=32) for val in signature) vk.verify(sig, digest) return _ed25519_verify @@ -229,7 +160,7 @@ class Parser(object): data_to_hash = (b'\x99' + struct.pack('>H', len(packet_data)) + packet_data) p['key_id'] = hashlib.sha1(data_to_hash).digest()[-8:] - log.debug('key ID: %s', binascii.hexlify(p['key_id']).decode('ascii')) + log.debug('key ID: %s', util.hexlify(p['key_id'])) self.to_hash.write(data_to_hash) return p @@ -249,20 +180,20 @@ class Parser(object): raise StopIteration log.debug('prefix byte: %02x', value) - assert bit(value, 7) == 1 - assert bit(value, 6) == 0 # new format not supported yet + assert util.bit(value, 7) == 1 + assert util.bit(value, 6) == 0 # new format not supported yet - tag = low_bits(value, 6) - length_type = low_bits(tag, 2) + tag = util.low_bits(value, 6) + length_type = util.low_bits(tag, 2) tag = tag >> 2 - fmt = length_types[length_type] + fmt = {0: '>B', 1: '>H', 2: '>L'}[length_type] log.debug('length_type: %s', fmt) packet_size = self.stream.readfmt(fmt) log.debug('packet length: %d', packet_size) packet_data = self.stream.read(packet_size) packet_type = self.packet_types.get(tag) if packet_type: - p = packet_type(Reader(io.BytesIO(packet_data))) + p = packet_type(util.Reader(io.BytesIO(packet_data))) else: raise ValueError('Unknown packet type: {}'.format(packet_type)) p['tag'] = tag @@ -274,7 +205,7 @@ class Parser(object): def load_public_key(stream): """Parse and validate GPG public key from an input stream.""" - parser = Parser(Reader(stream)) + parser = Parser(util.Reader(stream)) pubkey, userid, signature = list(parser) log.debug('loaded public key "%s"', userid['value']) verify_digest(pubkey=pubkey, digest=signature['digest'], @@ -282,6 +213,16 @@ def load_public_key(stream): return pubkey +def load_from_gpg(user_id): + """Load existing GPG public key for `user_id` from local keyring.""" + pubkey_bytes = subprocess.check_output(['gpg2', '--export', user_id]) + if pubkey_bytes: + return load_public_key(io.BytesIO(pubkey_bytes)) + else: + log.error('could not find public key %r in local GPG keyring', user_id) + raise KeyError(user_id) + + def verify_digest(pubkey, digest, signature, label): """Verify a digest signature from a specified public key.""" verifier = pubkey['verifier'] diff --git a/trezor_agent/gpg/encode.py b/trezor_agent/gpg/encode.py new file mode 100644 index 0000000..db23955 --- /dev/null +++ b/trezor_agent/gpg/encode.py @@ -0,0 +1,243 @@ +"""Create GPG ECDSA signatures and public keys using TREZOR device.""" +import base64 +import hashlib +import logging +import struct +import time + +from .. import client, factory, formats, util + +log = logging.getLogger(__name__) + + +def packet(tag, blob): + """Create small GPG packet.""" + assert len(blob) < 256 + length_type = 0 # : 1 byte for length + leading_byte = 0x80 | (tag << 2) | (length_type) + return struct.pack('>B', leading_byte) + util.prefix_len('>B', blob) + + +def subpacket(subpacket_type, fmt, *values): + """Create GPG subpacket.""" + blob = struct.pack(fmt, *values) if values else fmt + return struct.pack('>B', subpacket_type) + blob + + +def subpacket_long(subpacket_type, value): + """Create GPG subpacket with 32-bit unsigned integer.""" + return subpacket(subpacket_type, '>L', value) + + +def subpacket_time(value): + """Create GPG subpacket with time in seconds (since Epoch).""" + return subpacket_long(2, value) + + +def subpacket_byte(subpacket_type, value): + """Create GPG subpacket with 8-bit unsigned integer.""" + return subpacket(subpacket_type, '>B', value) + + +def subpackets(*items): + """Serialize several GPG subpackets.""" + prefixed = [util.prefix_len('>B', item) for item in items] + return util.prefix_len('>H', b''.join(prefixed)) + + +def mpi(value): + """Serialize multipresicion integer using GPG format.""" + bits = value.bit_length() + data_size = (bits + 7) // 8 + data_bytes = [0] * data_size + for i in range(data_size): + data_bytes[i] = value & 0xFF + value = value >> 8 + + data_bytes.reverse() + return struct.pack('>H', bits) + bytearray(data_bytes) + + +def _dump_nist256(vk): + return mpi((4 << 512) | + (vk.pubkey.point.x() << 256) | + (vk.pubkey.point.y())) + + +def _dump_ed25519(vk): + return mpi((0x40 << 256) | + util.bytes2num(vk.to_bytes())) + + +SUPPORTED_CURVES = { + formats.CURVE_NIST256: { + # https://tools.ietf.org/html/rfc6637#section-11 + 'oid': b'\x2A\x86\x48\xCE\x3D\x03\x01\x07', + 'algo_id': 19, + 'dump': _dump_nist256 + }, + formats.CURVE_ED25519: { + 'oid': b'\x2B\x06\x01\x04\x01\xDA\x47\x0F\x01', + 'algo_id': 22, + 'dump': _dump_ed25519 + } +} + + +def _find_curve_by_algo_id(algo_id): + curve_name, = [name for name, info in SUPPORTED_CURVES.items() + if info['algo_id'] == algo_id] + return curve_name + + +class Signer(object): + """Performs GPG operations with the TREZOR.""" + + def __init__(self, user_id, created, curve_name): + """Construct and loads a public key from the device.""" + self.user_id = user_id + assert curve_name in formats.SUPPORTED_CURVES + self.curve_name = curve_name + self.client_wrapper = factory.load() + + self.identity = self.client_wrapper.identity_type() + self.identity.proto = 'gpg' + self.identity.host = user_id + + addr = client.get_address(self.identity) + public_node = self.client_wrapper.connection.get_public_node( + n=addr, ecdsa_curve_name=self.curve_name) + + self.verifying_key = formats.decompress_pubkey( + pubkey=public_node.node.public_key, + curve_name=self.curve_name) + + self.created = int(created) + log.info('%s GPG public key %s created at %s', self.curve_name, + self.hex_short_key_id(), util.time_format(self.created)) + + @classmethod + def from_public_key(cls, pubkey, user_id): + """ + Create from an existing GPG public key. + + `pubkey` should be loaded via `load_from_gpg(user_id)` + from the local GPG keyring. + """ + s = Signer(user_id=user_id, + created=pubkey['created'], + curve_name=_find_curve_by_algo_id(pubkey['algo'])) + assert s.key_id() == pubkey['key_id'] + return s + + def _pubkey_data(self): + curve_info = SUPPORTED_CURVES[self.curve_name] + header = struct.pack('>BLB', + 4, # version + self.created, # creation + curve_info['algo_id']) + oid = util.prefix_len('>B', curve_info['oid']) + blob = curve_info['dump'](self.verifying_key) + return header + oid + blob + + def _pubkey_data_to_hash(self): + return b'\x99' + util.prefix_len('>H', self._pubkey_data()) + + def _fingerprint(self): + return hashlib.sha1(self._pubkey_data_to_hash()).digest() + + def key_id(self): + """Short (8 byte) GPG key ID.""" + return self._fingerprint()[-8:] + + def hex_short_key_id(self): + """Short (8 hexadecimal digits) GPG key ID.""" + return util.hexlify(self.key_id()[-4:]) + + def close(self): + """Close connection and turn off the screen of the device.""" + self.client_wrapper.connection.clear_session() + self.client_wrapper.connection.close() + + def export(self): + """Export GPG public key, ready for "gpg2 --import".""" + pubkey_packet = packet(tag=6, blob=self._pubkey_data()) + user_id_packet = packet(tag=13, blob=self.user_id) + + data_to_sign = (self._pubkey_data_to_hash() + + user_id_packet[:1] + + util.prefix_len('>L', self.user_id)) + log.info('signing public key "%s"', self.user_id) + hashed_subpackets = [ + subpacket_time(self.created), # signature creaion time + subpacket_byte(0x1B, 1 | 2), # key flags (certify & sign) + subpacket_byte(0x15, 8), # preferred hash (SHA256) + subpacket_byte(0x16, 0), # preferred compression (none) + subpacket_byte(0x17, 0x80)] # key server prefs (no-modify) + signature = self._make_signature(visual=self.hex_short_key_id(), + data_to_sign=data_to_sign, + sig_type=0x13, # user id & public key + hashed_subpackets=hashed_subpackets) + + sign_packet = packet(tag=2, blob=signature) + return pubkey_packet + user_id_packet + sign_packet + + def sign(self, msg, sign_time=None): + """Sign GPG message at specified time.""" + if sign_time is None: + sign_time = int(time.time()) + + log.info('signing %d byte message at %s', + len(msg), util.time_format(sign_time)) + hashed_subpackets = [subpacket_time(sign_time)] + blob = self._make_signature( + visual=self.hex_short_key_id(), + data_to_sign=msg, hashed_subpackets=hashed_subpackets) + return packet(tag=2, blob=blob) + + def _make_signature(self, visual, data_to_sign, + hashed_subpackets, sig_type=0): + curve_info = SUPPORTED_CURVES[self.curve_name] + header = struct.pack('>BBBB', + 4, # version + sig_type, # rfc4880 (section-5.2.1) + curve_info['algo_id'], + 8) # hash_alg (SHA256) + hashed = subpackets(*hashed_subpackets) + unhashed = subpackets( + subpacket(16, self.key_id()) # issuer key id + ) + tail = b'\x04\xff' + struct.pack('>L', len(header) + len(hashed)) + data_to_hash = data_to_sign + header + hashed + tail + + log.debug('hashing %d bytes', len(data_to_hash)) + digest = hashlib.sha256(data_to_hash).digest() + + result = self.client_wrapper.connection.sign_identity( + identity=self.identity, + challenge_hidden=digest, + challenge_visual=visual, + ecdsa_curve_name=self.curve_name) + assert result.signature[:1] == b'\x00' + sig = result.signature[1:] + sig = mpi(util.bytes2num(sig[:32])) + mpi(util.bytes2num(sig[32:])) + + return (header + hashed + unhashed + + digest[:2] + # used for decoder's sanity check + sig) # actual ECDSA signature + + +def _split_lines(body, size): + lines = [] + for i in range(0, len(body), size): + lines.append(body[i:i+size] + '\n') + return ''.join(lines) + + +def armor(blob, type_str): + """See https://tools.ietf.org/html/rfc4880#section-6 for details.""" + head = '-----BEGIN PGP {}-----\nVersion: GnuPG v2\n\n'.format(type_str) + body = base64.b64encode(blob) + checksum = base64.b64encode(util.crc24(blob)) + tail = '-----END PGP {}-----\n'.format(type_str) + return head + _split_lines(body, 64) + '=' + checksum + '\n' + tail diff --git a/trezor_agent/gpg/git_wrapper.py b/trezor_agent/gpg/git_wrapper.py index 7b6d9a2..33df87e 100755 --- a/trezor_agent/gpg/git_wrapper.py +++ b/trezor_agent/gpg/git_wrapper.py @@ -4,7 +4,7 @@ import logging import subprocess as sp import sys -from . import signer +from . import decode, encode log = logging.getLogger(__name__) @@ -22,12 +22,12 @@ def main(): command = args[0] user_id = ' '.join(args[1:]) assert command == '-bsau' # --detach-sign --sign --armor --local-user - pubkey = signer.load_from_gpg(user_id) - s = signer.Signer.from_public_key(user_id=user_id, pubkey=pubkey) + pubkey = decode.load_from_gpg(user_id) + s = encode.Signer.from_public_key(user_id=user_id, pubkey=pubkey) data = sys.stdin.read() sig = s.sign(data) - sig = signer.armor(sig, 'SIGNATURE') + sig = encode.armor(sig, 'SIGNATURE') sys.stdout.write(sig) s.close() diff --git a/trezor_agent/gpg/signer.py b/trezor_agent/gpg/signer.py index b77a626..0af2342 100755 --- a/trezor_agent/gpg/signer.py +++ b/trezor_agent/gpg/signer.py @@ -1,278 +1,14 @@ #!/usr/bin/env python -"""Create GPG ECDSA signatures and public keys using TREZOR device.""" +"""Create signatures and export public keys for GPG using TREZOR.""" import argparse -import base64 -import binascii -import hashlib -import io import logging -import struct -import subprocess import time -from . import decode, check -from .. import client, factory, formats, util +from . import check, decode, encode log = logging.getLogger(__name__) -def prefix_len(fmt, blob): - """Prefix `blob` with its size, serialized using `fmt` format.""" - return struct.pack(fmt, len(blob)) + blob - - -def packet(tag, blob): - """Create small GPG packet.""" - assert len(blob) < 256 - length_type = 0 # : 1 byte for length - leading_byte = 0x80 | (tag << 2) | (length_type) - return struct.pack('>B', leading_byte) + prefix_len('>B', blob) - - -def subpacket(subpacket_type, fmt, *values): - """Create GPG subpacket.""" - blob = struct.pack(fmt, *values) if values else fmt - return struct.pack('>B', subpacket_type) + blob - - -def subpacket_long(subpacket_type, value): - """Create GPG subpacket with 32-bit unsigned integer.""" - return subpacket(subpacket_type, '>L', value) - - -def subpacket_time(value): - """Create GPG subpacket with time in seconds (since Epoch).""" - return subpacket_long(2, value) - - -def subpacket_byte(subpacket_type, value): - """Create GPG subpacket with 8-bit unsigned integer.""" - return subpacket(subpacket_type, '>B', value) - - -def subpackets(*items): - """Serialize several GPG subpackets.""" - prefixed = [prefix_len('>B', item) for item in items] - return prefix_len('>H', b''.join(prefixed)) - - -def mpi(value): - """Serialize multipresicion integer using GPG format.""" - bits = value.bit_length() - data_size = (bits + 7) // 8 - data_bytes = [0] * data_size - for i in range(data_size): - data_bytes[i] = value & 0xFF - value = value >> 8 - - data_bytes.reverse() - return struct.pack('>H', bits) + bytearray(data_bytes) - - -def time_format(t): - """Utility for consistent time formatting.""" - return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(t)) - - -def hexlify(blob): - """Utility for consistent hexadecimal formatting.""" - return binascii.hexlify(blob).decode('ascii').upper() - - -def _dump_nist256(vk): - return mpi((4 << 512) | - (vk.pubkey.point.x() << 256) | - (vk.pubkey.point.y())) - - -def _dump_ed25519(vk): - return mpi((0x40 << 256) | - util.bytes2num(vk.to_bytes())) - - -SUPPORTED_CURVES = { - formats.CURVE_NIST256: { - # https://tools.ietf.org/html/rfc6637#section-11 - 'oid': b'\x2A\x86\x48\xCE\x3D\x03\x01\x07', - 'algo_id': 19, - 'dump': _dump_nist256 - }, - formats.CURVE_ED25519: { - 'oid': b'\x2B\x06\x01\x04\x01\xDA\x47\x0F\x01', - 'algo_id': 22, - 'dump': _dump_ed25519 - } -} - - -def _find_curve_by_algo_id(algo_id): - curve_name, = [name for name, info in SUPPORTED_CURVES.items() - if info['algo_id'] == algo_id] - return curve_name - - -class Signer(object): - """Performs GPG operations with the TREZOR.""" - - def __init__(self, user_id, created, curve_name): - """Construct and loads a public key from the device.""" - self.user_id = user_id - assert curve_name in formats.SUPPORTED_CURVES - self.curve_name = curve_name - self.client_wrapper = factory.load() - - self.identity = self.client_wrapper.identity_type() - self.identity.proto = 'gpg' - self.identity.host = user_id - - addr = client.get_address(self.identity) - public_node = self.client_wrapper.connection.get_public_node( - n=addr, ecdsa_curve_name=self.curve_name) - - self.verifying_key = formats.decompress_pubkey( - pubkey=public_node.node.public_key, - curve_name=self.curve_name) - - self.created = int(created) - log.info('%s GPG public key %s created at %s', self.curve_name, - self.hex_short_key_id(), time_format(self.created)) - - @classmethod - def from_public_key(cls, pubkey, user_id): - """ - Create from an existing GPG public key. - - `pubkey` should be loaded via `load_from_gpg(user_id)` - from the local GPG keyring. - """ - s = Signer(user_id=user_id, - created=pubkey['created'], - curve_name=_find_curve_by_algo_id(pubkey['algo'])) - assert s.key_id() == pubkey['key_id'] - return s - - def _pubkey_data(self): - curve_info = SUPPORTED_CURVES[self.curve_name] - header = struct.pack('>BLB', - 4, # version - self.created, # creation - curve_info['algo_id']) - oid = prefix_len('>B', curve_info['oid']) - blob = curve_info['dump'](self.verifying_key) - return header + oid + blob - - def _pubkey_data_to_hash(self): - return b'\x99' + prefix_len('>H', self._pubkey_data()) - - def _fingerprint(self): - return hashlib.sha1(self._pubkey_data_to_hash()).digest() - - def key_id(self): - """Short (8 byte) GPG key ID.""" - return self._fingerprint()[-8:] - - def hex_short_key_id(self): - """Short (8 hexadecimal digits) GPG key ID.""" - return hexlify(self.key_id()[-4:]) - - def close(self): - """Close connection and turn off the screen of the device.""" - self.client_wrapper.connection.clear_session() - self.client_wrapper.connection.close() - - def export(self): - """Export GPG public key, ready for "gpg2 --import".""" - pubkey_packet = packet(tag=6, blob=self._pubkey_data()) - user_id_packet = packet(tag=13, blob=self.user_id) - - user_id_to_hash = user_id_packet[:1] + prefix_len('>L', self.user_id) - data_to_sign = self._pubkey_data_to_hash() + user_id_to_hash - log.info('signing public key "%s"', self.user_id) - hashed_subpackets = [ - subpacket_time(self.created), # signature creaion time - subpacket_byte(0x1B, 1 | 2), # key flags (certify & sign) - subpacket_byte(0x15, 8), # preferred hash (SHA256) - subpacket_byte(0x16, 0), # preferred compression (none) - subpacket_byte(0x17, 0x80)] # key server prefs (no-modify) - signature = self._make_signature(visual=self.hex_short_key_id(), - data_to_sign=data_to_sign, - sig_type=0x13, # user id & public key - hashed_subpackets=hashed_subpackets) - - sign_packet = packet(tag=2, blob=signature) - return pubkey_packet + user_id_packet + sign_packet - - def sign(self, msg, sign_time=None): - """Sign GPG message at specified time.""" - if sign_time is None: - sign_time = int(time.time()) - - log.info('signing %d byte message at %s', - len(msg), time_format(sign_time)) - hashed_subpackets = [subpacket_time(sign_time)] - blob = self._make_signature( - visual=self.hex_short_key_id(), - data_to_sign=msg, hashed_subpackets=hashed_subpackets) - return packet(tag=2, blob=blob) - - def _make_signature(self, visual, data_to_sign, - hashed_subpackets, sig_type=0): - curve_info = SUPPORTED_CURVES[self.curve_name] - header = struct.pack('>BBBB', - 4, # version - sig_type, # rfc4880 (section-5.2.1) - curve_info['algo_id'], - 8) # hash_alg (SHA256) - hashed = subpackets(*hashed_subpackets) - unhashed = subpackets( - subpacket(16, self.key_id()) # issuer key id - ) - tail = b'\x04\xff' + struct.pack('>L', len(header) + len(hashed)) - data_to_hash = data_to_sign + header + hashed + tail - - log.debug('hashing %d bytes', len(data_to_hash)) - digest = hashlib.sha256(data_to_hash).digest() - - result = self.client_wrapper.connection.sign_identity( - identity=self.identity, - challenge_hidden=digest, - challenge_visual=visual, - ecdsa_curve_name=self.curve_name) - assert result.signature[:1] == b'\x00' - sig = result.signature[1:] - sig = mpi(util.bytes2num(sig[:32])) + mpi(util.bytes2num(sig[32:])) - - return (header + hashed + unhashed + - digest[:2] + # used for decoder's sanity check - sig) # actual ECDSA signature - - -def _split_lines(body, size): - lines = [] - for i in range(0, len(body), size): - lines.append(body[i:i+size] + '\n') - return ''.join(lines) - - -def armor(blob, type_str): - """See https://tools.ietf.org/html/rfc4880#section-6 for details.""" - head = '-----BEGIN PGP {}-----\nVersion: GnuPG v2\n\n'.format(type_str) - body = base64.b64encode(blob) - checksum = base64.b64encode(util.crc24(blob)) - tail = '-----END PGP {}-----\n'.format(type_str) - return head + _split_lines(body, 64) + '=' + checksum + '\n' + tail - - -def load_from_gpg(user_id): - """Load existing GPG public key for `user_id` from local keyring.""" - pubkey_bytes = subprocess.check_output(['gpg2', '--export', user_id]) - if pubkey_bytes: - return decode.load_public_key(io.BytesIO(pubkey_bytes)) - else: - log.error('could not find public key %r in local GPG keyring', user_id) - raise KeyError(user_id) - - def main(): """Main function.""" p = argparse.ArgumentParser() @@ -288,23 +24,23 @@ def main(): format='%(asctime)s %(levelname)-10s %(message)s') user_id = args.user_id.encode('ascii') if not args.filename: - s = Signer(user_id=user_id, created=args.time, - curve_name=args.ecdsa_curve) + s = encode.Signer(user_id=user_id, created=args.time, + curve_name=args.ecdsa_curve) pubkey = s.export() ext = '.pub' if args.armor: - pubkey = armor(pubkey, 'PUBLIC KEY BLOCK') + pubkey = encode.armor(pubkey, 'PUBLIC KEY BLOCK') ext = '.asc' filename = s.hex_short_key_id() + ext open(filename, 'wb').write(pubkey) log.info('import to local keyring using "gpg2 --import %s"', filename) else: - pubkey = load_from_gpg(user_id) - s = Signer.from_public_key(pubkey=pubkey, user_id=user_id) + pubkey = decode.load_from_gpg(user_id) + s = encode.Signer.from_public_key(pubkey=pubkey, user_id=user_id) data = open(args.filename, 'rb').read() sig, ext = s.sign(data), '.sig' if args.armor: - sig = armor(sig, 'SIGNATURE') + sig = encode.armor(sig, 'SIGNATURE') ext = '.asc' filename = args.filename + ext open(filename, 'wb').write(sig) diff --git a/trezor_agent/util.py b/trezor_agent/util.py index 7e96428..89c3170 100644 --- a/trezor_agent/util.py +++ b/trezor_agent/util.py @@ -1,6 +1,9 @@ """Various I/O and serialization utilities.""" +import binascii +import contextlib import io import struct +import time def send(conn, data): @@ -93,3 +96,84 @@ def crc24(blob): crc_bytes = struct.pack('>L', crc) assert crc_bytes[0] == b'\x00' return crc_bytes[1:] + + +def bit(value, i): + """Extract the i-th bit out of value.""" + return 1 if value & (1 << i) else 0 + + +def low_bits(value, n): + """Extract the lowest n bits out of value.""" + return value & ((1 << n) - 1) + + +def split_bits(value, *bits): + """ + Split integer value into list of ints, according to `bits` list. + + For example, split_bits(0x1234, 4, 8, 4) == [0x1, 0x23, 0x4] + """ + result = [] + for b in reversed(bits): + mask = (1 << b) - 1 + result.append(value & mask) + value = value >> b + assert value == 0 + return reversed(result) + + +def readfmt(stream, fmt): + """Read and unpack an object from stream, using a struct format string.""" + size = struct.calcsize(fmt) + blob = stream.read(size) + return struct.unpack(fmt, blob) + + +def prefix_len(fmt, blob): + """Prefix `blob` with its size, serialized using `fmt` format.""" + return struct.pack(fmt, len(blob)) + blob + + +def time_format(t): + """Utility for consistent time formatting.""" + return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(t)) + + +def hexlify(blob): + """Utility for consistent hexadecimal formatting.""" + return binascii.hexlify(blob).decode('ascii').upper() + + +class Reader(object): + """Read basic type objects out of given stream.""" + + def __init__(self, stream): + """Create a non-capturing reader.""" + self.s = stream + self._captured = None + + def readfmt(self, fmt): + """Read a specified object, using a struct format string.""" + size = struct.calcsize(fmt) + blob = self.read(size) + obj, = struct.unpack(fmt, blob) + return obj + + def read(self, size=None): + """Read `size` bytes from stream.""" + blob = self.s.read(size) + if size is not None and len(blob) < size: + raise EOFError + if self._captured: + self._captured.write(blob) + return blob + + @contextlib.contextmanager + def capture(self, stream): + """Capture all data read during this context.""" + self._captured = stream + try: + yield + finally: + self._captured = None