diff --git a/setup.py b/setup.py index 601e124..5e0da7a 100644 --- a/setup.py +++ b/setup.py @@ -30,5 +30,7 @@ setup( entry_points={'console_scripts': [ 'trezor-agent = trezor_agent.__main__:run_agent', 'trezor-git = trezor_agent.__main__:run_git', + 'trezor-gpg = trezor_agent.gpg.signer:main', + 'trezor-git-gpg-wrapper = trezor_agent.gpg.git_wrapper:main', ]}, ) diff --git a/trezor_agent/__main__.py b/trezor_agent/__main__.py index 54f935c..37d354d 100644 --- a/trezor_agent/__main__.py +++ b/trezor_agent/__main__.py @@ -2,8 +2,8 @@ import argparse import functools import logging -import re import os +import re import subprocess import sys import time diff --git a/trezor_agent/gpg/__init__.py b/trezor_agent/gpg/__init__.py new file mode 100644 index 0000000..f0448b4 --- /dev/null +++ b/trezor_agent/gpg/__init__.py @@ -0,0 +1,9 @@ +""" +TREZOR support for ECDSA GPG signatures. + +See these links for more details: + - https://www.gnupg.org/faq/whats-new-in-2.1.html + - https://tools.ietf.org/html/rfc4880 + - https://tools.ietf.org/html/rfc6637 + - https://tools.ietf.org/html/draft-irtf-cfrg-eddsa-05 +""" diff --git a/trezor_agent/gpg/check.py b/trezor_agent/gpg/check.py new file mode 100755 index 0000000..58ddba9 --- /dev/null +++ b/trezor_agent/gpg/check.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python +"""Check GPG v2 signature for a given public key.""" +import argparse +import base64 +import io +import logging + +from . import decode +from .. import util + +log = logging.getLogger(__name__) + + +def original_data(filename): + """Locate and load original file data, whose signature is provided.""" + parts = filename.rsplit('.', 1) + if len(parts) == 2 and parts[1] in ('sig', 'asc'): + log.debug('loading file %s', parts[0]) + return open(parts[0], 'rb').read() + + +def verify(pubkey, sig_file): + """Verify correctness of public key and signature.""" + stream = open(sig_file, 'rb') + if stream.name.endswith('.asc'): + lines = stream.readlines()[3:-1] + data = base64.b64decode(''.join(lines)) + payload, checksum = data[:-3], data[-3:] + assert util.crc24(payload) == checksum + stream = io.BytesIO(payload) + parser = decode.Parser(util.Reader(stream), original_data(sig_file)) + signature, = list(parser) + decode.verify_digest(pubkey=pubkey, digest=signature['digest'], + signature=signature['sig'], label='GPG signature') + log.info('%s OK', sig_file) + + +def main(): + """Main function.""" + logging.basicConfig(level=logging.DEBUG, + format='%(asctime)s %(levelname)-10s %(message)s') + p = argparse.ArgumentParser() + p.add_argument('pubkey') + p.add_argument('signature') + args = p.parse_args() + verify(pubkey=decode.load_public_key(open(args.pubkey, 'rb')), + sig_file=args.signature) + +if __name__ == '__main__': + main() diff --git a/trezor_agent/gpg/decode.py b/trezor_agent/gpg/decode.py new file mode 100644 index 0000000..035285d --- /dev/null +++ b/trezor_agent/gpg/decode.py @@ -0,0 +1,234 @@ +"""Decoders for GPG v2 data structures.""" +import hashlib +import io +import logging +import struct +import subprocess + +import ecdsa +import ed25519 + +from .. import util + +log = logging.getLogger(__name__) + + +def parse_subpackets(s): + """See https://tools.ietf.org/html/rfc4880#section-5.2.3.1 for details.""" + subpackets = [] + total_size = s.readfmt('>H') + data = s.read(total_size) + s = util.Reader(io.BytesIO(data)) + + while True: + try: + subpacket_len = s.readfmt('B') + except EOFError: + break + + subpackets.append(s.read(subpacket_len)) + + return subpackets + + +def parse_mpi(s): + """See https://tools.ietf.org/html/rfc4880#section-3.2 for details.""" + bits = s.readfmt('>H') + blob = bytearray(s.read(int((bits + 7) // 8))) + return sum(v << (8 * i) for i, v in enumerate(reversed(blob))) + + +def _parse_nist256p1_verifier(mpi): + prefix, x, y = util.split_bits(mpi, 4, 256, 256) + assert prefix == 4 + point = ecdsa.ellipticcurve.Point(curve=ecdsa.NIST256p.curve, + x=x, y=y) + vk = ecdsa.VerifyingKey.from_public_point( + point=point, curve=ecdsa.curves.NIST256p, + hashfunc=hashlib.sha256) + + def _nist256p1_verify(signature, digest): + vk.verify_digest(signature=signature, + digest=digest, + sigdecode=lambda rs, order: rs) + return _nist256p1_verify + + +def _parse_ed25519_verifier(mpi): + prefix, value = util.split_bits(mpi, 8, 256) + assert prefix == 0x40 + vk = ed25519.VerifyingKey(util.num2bytes(value, size=32)) + + def _ed25519_verify(signature, digest): + sig = b''.join(util.num2bytes(val, size=32) + for val in signature) + vk.verify(sig, digest) + return _ed25519_verify + + +SUPPORTED_CURVES = { + b'\x2A\x86\x48\xCE\x3D\x03\x01\x07': _parse_nist256p1_verifier, + b'\x2B\x06\x01\x04\x01\xDA\x47\x0F\x01': _parse_ed25519_verifier, +} + + +class Parser(object): + """Parse GPG packets from a given stream.""" + + def __init__(self, stream, to_hash=None): + """Create an empty parser.""" + self.stream = stream + self.packet_types = { + 2: self.signature, + 6: self.pubkey, + 11: self.literal, + 13: self.user_id, + } + self.to_hash = io.BytesIO() + if to_hash: + self.to_hash.write(to_hash) + + def __iter__(self): + """Support iterative parsing of available GPG packets.""" + return self + + def literal(self, stream): + """See https://tools.ietf.org/html/rfc4880#section-5.9 for details.""" + p = {'type': 'literal'} + p['format'] = stream.readfmt('c') + filename_len = stream.readfmt('B') + p['filename'] = stream.read(filename_len) + p['date'] = stream.readfmt('>L') + with stream.capture(self.to_hash): + p['content'] = stream.read() + return p + + def signature(self, stream): + """See https://tools.ietf.org/html/rfc4880#section-5.2 for details.""" + p = {'type': 'signature'} + + to_hash = io.BytesIO() + with stream.capture(to_hash): + p['version'] = stream.readfmt('B') + p['sig_type'] = stream.readfmt('B') + p['pubkey_alg'] = stream.readfmt('B') + p['hash_alg'] = stream.readfmt('B') + p['hashed_subpackets'] = parse_subpackets(stream) + self.to_hash.write(to_hash.getvalue()) + + # https://tools.ietf.org/html/rfc4880#section-5.2.4 + self.to_hash.write(b'\x04\xff' + struct.pack('>L', to_hash.tell())) + data_to_sign = self.to_hash.getvalue() + log.debug('hashing %d bytes for signature: %r', + len(data_to_sign), data_to_sign) + digest = hashlib.sha256(data_to_sign).digest() + + p['unhashed_subpackets'] = parse_subpackets(stream) + p['hash_prefix'] = stream.readfmt('2s') + if p['hash_prefix'] != digest[:2]: + log.warning('Bad hash prefix: %r (expected %r)', + digest[:2], p['hash_prefix']) + else: + p['digest'] = digest + p['sig'] = (parse_mpi(stream), parse_mpi(stream)) + assert not stream.read() + + return p + + def pubkey(self, stream): + """See https://tools.ietf.org/html/rfc4880#section-5.5 for details.""" + p = {'type': 'pubkey'} + packet = io.BytesIO() + with stream.capture(packet): + p['version'] = stream.readfmt('B') + p['created'] = stream.readfmt('>L') + p['algo'] = stream.readfmt('B') + + # https://tools.ietf.org/html/rfc6637#section-11 + oid_size = stream.readfmt('B') + oid = stream.read(oid_size) + assert oid in SUPPORTED_CURVES + parser = SUPPORTED_CURVES[oid] + + mpi = parse_mpi(stream) + log.debug('mpi: %x (%d bits)', mpi, mpi.bit_length()) + p['verifier'] = parser(mpi) + assert not stream.read() + + # https://tools.ietf.org/html/rfc4880#section-12.2 + packet_data = packet.getvalue() + data_to_hash = (b'\x99' + struct.pack('>H', len(packet_data)) + + packet_data) + p['key_id'] = hashlib.sha1(data_to_hash).digest()[-8:] + log.debug('key ID: %s', util.hexlify(p['key_id'])) + self.to_hash.write(data_to_hash) + + return p + + def user_id(self, stream): + """See https://tools.ietf.org/html/rfc4880#section-5.11 for details.""" + value = stream.read() + self.to_hash.write(b'\xb4' + struct.pack('>L', len(value))) + self.to_hash.write(value) + return {'type': 'user_id', 'value': value} + + def __next__(self): + """See https://tools.ietf.org/html/rfc4880#section-4.2 for details.""" + try: + value = self.stream.readfmt('B') + except EOFError: + raise StopIteration + + log.debug('prefix byte: %02x', value) + assert util.bit(value, 7) == 1 + assert util.bit(value, 6) == 0 # new format not supported yet + + tag = util.low_bits(value, 6) + length_type = util.low_bits(tag, 2) + tag = tag >> 2 + fmt = {0: '>B', 1: '>H', 2: '>L'}[length_type] + log.debug('length_type: %s', fmt) + packet_size = self.stream.readfmt(fmt) + log.debug('packet length: %d', packet_size) + packet_data = self.stream.read(packet_size) + packet_type = self.packet_types.get(tag) + if packet_type: + p = packet_type(util.Reader(io.BytesIO(packet_data))) + else: + raise ValueError('Unknown packet type: {}'.format(packet_type)) + p['tag'] = tag + log.debug('packet "%s": %s', p['type'], p) + return p + + next = __next__ + + +def load_public_key(stream): + """Parse and validate GPG public key from an input stream.""" + parser = Parser(util.Reader(stream)) + pubkey, userid, signature = list(parser) + log.debug('loaded public key "%s"', userid['value']) + verify_digest(pubkey=pubkey, digest=signature['digest'], + signature=signature['sig'], label='GPG public key') + return pubkey + + +def load_from_gpg(user_id): + """Load existing GPG public key for `user_id` from local keyring.""" + pubkey_bytes = subprocess.check_output(['gpg2', '--export', user_id]) + if pubkey_bytes: + return load_public_key(io.BytesIO(pubkey_bytes)) + else: + log.error('could not find public key %r in local GPG keyring', user_id) + raise KeyError(user_id) + + +def verify_digest(pubkey, digest, signature, label): + """Verify a digest signature from a specified public key.""" + verifier = pubkey['verifier'] + try: + verifier(signature, digest) + log.debug('%s is OK', label) + except ecdsa.keys.BadSignatureError: + log.error('Bad %s!', label) + raise diff --git a/trezor_agent/gpg/demo.sh b/trezor_agent/gpg/demo.sh new file mode 100755 index 0000000..655a547 --- /dev/null +++ b/trezor_agent/gpg/demo.sh @@ -0,0 +1,19 @@ +#!/bin/bash +set -x +CREATED=1460731897 # needed for consistent public key creation +NAME="trezor_key" # will be used as GPG user id and public key name + +echo "Hello GPG World!" > EXAMPLE +# Create, sign and export the public key +./signer.py $NAME --time $CREATED + +# Install GPG v2.1 (modern) and import the public key +gpg2 --import $NAME.pub +gpg2 --list-keys $NAME + +# Perform actual GPG signature using TREZOR +./signer.py $NAME EXAMPLE +./check.py $NAME.pub EXAMPLE.sig # pure Python verification + +# gpg2 --edit-key trezor_key trust # optional: mark it as trusted +gpg2 --verify EXAMPLE.sig diff --git a/trezor_agent/gpg/encode.py b/trezor_agent/gpg/encode.py new file mode 100644 index 0000000..db23955 --- /dev/null +++ b/trezor_agent/gpg/encode.py @@ -0,0 +1,243 @@ +"""Create GPG ECDSA signatures and public keys using TREZOR device.""" +import base64 +import hashlib +import logging +import struct +import time + +from .. import client, factory, formats, util + +log = logging.getLogger(__name__) + + +def packet(tag, blob): + """Create small GPG packet.""" + assert len(blob) < 256 + length_type = 0 # : 1 byte for length + leading_byte = 0x80 | (tag << 2) | (length_type) + return struct.pack('>B', leading_byte) + util.prefix_len('>B', blob) + + +def subpacket(subpacket_type, fmt, *values): + """Create GPG subpacket.""" + blob = struct.pack(fmt, *values) if values else fmt + return struct.pack('>B', subpacket_type) + blob + + +def subpacket_long(subpacket_type, value): + """Create GPG subpacket with 32-bit unsigned integer.""" + return subpacket(subpacket_type, '>L', value) + + +def subpacket_time(value): + """Create GPG subpacket with time in seconds (since Epoch).""" + return subpacket_long(2, value) + + +def subpacket_byte(subpacket_type, value): + """Create GPG subpacket with 8-bit unsigned integer.""" + return subpacket(subpacket_type, '>B', value) + + +def subpackets(*items): + """Serialize several GPG subpackets.""" + prefixed = [util.prefix_len('>B', item) for item in items] + return util.prefix_len('>H', b''.join(prefixed)) + + +def mpi(value): + """Serialize multipresicion integer using GPG format.""" + bits = value.bit_length() + data_size = (bits + 7) // 8 + data_bytes = [0] * data_size + for i in range(data_size): + data_bytes[i] = value & 0xFF + value = value >> 8 + + data_bytes.reverse() + return struct.pack('>H', bits) + bytearray(data_bytes) + + +def _dump_nist256(vk): + return mpi((4 << 512) | + (vk.pubkey.point.x() << 256) | + (vk.pubkey.point.y())) + + +def _dump_ed25519(vk): + return mpi((0x40 << 256) | + util.bytes2num(vk.to_bytes())) + + +SUPPORTED_CURVES = { + formats.CURVE_NIST256: { + # https://tools.ietf.org/html/rfc6637#section-11 + 'oid': b'\x2A\x86\x48\xCE\x3D\x03\x01\x07', + 'algo_id': 19, + 'dump': _dump_nist256 + }, + formats.CURVE_ED25519: { + 'oid': b'\x2B\x06\x01\x04\x01\xDA\x47\x0F\x01', + 'algo_id': 22, + 'dump': _dump_ed25519 + } +} + + +def _find_curve_by_algo_id(algo_id): + curve_name, = [name for name, info in SUPPORTED_CURVES.items() + if info['algo_id'] == algo_id] + return curve_name + + +class Signer(object): + """Performs GPG operations with the TREZOR.""" + + def __init__(self, user_id, created, curve_name): + """Construct and loads a public key from the device.""" + self.user_id = user_id + assert curve_name in formats.SUPPORTED_CURVES + self.curve_name = curve_name + self.client_wrapper = factory.load() + + self.identity = self.client_wrapper.identity_type() + self.identity.proto = 'gpg' + self.identity.host = user_id + + addr = client.get_address(self.identity) + public_node = self.client_wrapper.connection.get_public_node( + n=addr, ecdsa_curve_name=self.curve_name) + + self.verifying_key = formats.decompress_pubkey( + pubkey=public_node.node.public_key, + curve_name=self.curve_name) + + self.created = int(created) + log.info('%s GPG public key %s created at %s', self.curve_name, + self.hex_short_key_id(), util.time_format(self.created)) + + @classmethod + def from_public_key(cls, pubkey, user_id): + """ + Create from an existing GPG public key. + + `pubkey` should be loaded via `load_from_gpg(user_id)` + from the local GPG keyring. + """ + s = Signer(user_id=user_id, + created=pubkey['created'], + curve_name=_find_curve_by_algo_id(pubkey['algo'])) + assert s.key_id() == pubkey['key_id'] + return s + + def _pubkey_data(self): + curve_info = SUPPORTED_CURVES[self.curve_name] + header = struct.pack('>BLB', + 4, # version + self.created, # creation + curve_info['algo_id']) + oid = util.prefix_len('>B', curve_info['oid']) + blob = curve_info['dump'](self.verifying_key) + return header + oid + blob + + def _pubkey_data_to_hash(self): + return b'\x99' + util.prefix_len('>H', self._pubkey_data()) + + def _fingerprint(self): + return hashlib.sha1(self._pubkey_data_to_hash()).digest() + + def key_id(self): + """Short (8 byte) GPG key ID.""" + return self._fingerprint()[-8:] + + def hex_short_key_id(self): + """Short (8 hexadecimal digits) GPG key ID.""" + return util.hexlify(self.key_id()[-4:]) + + def close(self): + """Close connection and turn off the screen of the device.""" + self.client_wrapper.connection.clear_session() + self.client_wrapper.connection.close() + + def export(self): + """Export GPG public key, ready for "gpg2 --import".""" + pubkey_packet = packet(tag=6, blob=self._pubkey_data()) + user_id_packet = packet(tag=13, blob=self.user_id) + + data_to_sign = (self._pubkey_data_to_hash() + + user_id_packet[:1] + + util.prefix_len('>L', self.user_id)) + log.info('signing public key "%s"', self.user_id) + hashed_subpackets = [ + subpacket_time(self.created), # signature creaion time + subpacket_byte(0x1B, 1 | 2), # key flags (certify & sign) + subpacket_byte(0x15, 8), # preferred hash (SHA256) + subpacket_byte(0x16, 0), # preferred compression (none) + subpacket_byte(0x17, 0x80)] # key server prefs (no-modify) + signature = self._make_signature(visual=self.hex_short_key_id(), + data_to_sign=data_to_sign, + sig_type=0x13, # user id & public key + hashed_subpackets=hashed_subpackets) + + sign_packet = packet(tag=2, blob=signature) + return pubkey_packet + user_id_packet + sign_packet + + def sign(self, msg, sign_time=None): + """Sign GPG message at specified time.""" + if sign_time is None: + sign_time = int(time.time()) + + log.info('signing %d byte message at %s', + len(msg), util.time_format(sign_time)) + hashed_subpackets = [subpacket_time(sign_time)] + blob = self._make_signature( + visual=self.hex_short_key_id(), + data_to_sign=msg, hashed_subpackets=hashed_subpackets) + return packet(tag=2, blob=blob) + + def _make_signature(self, visual, data_to_sign, + hashed_subpackets, sig_type=0): + curve_info = SUPPORTED_CURVES[self.curve_name] + header = struct.pack('>BBBB', + 4, # version + sig_type, # rfc4880 (section-5.2.1) + curve_info['algo_id'], + 8) # hash_alg (SHA256) + hashed = subpackets(*hashed_subpackets) + unhashed = subpackets( + subpacket(16, self.key_id()) # issuer key id + ) + tail = b'\x04\xff' + struct.pack('>L', len(header) + len(hashed)) + data_to_hash = data_to_sign + header + hashed + tail + + log.debug('hashing %d bytes', len(data_to_hash)) + digest = hashlib.sha256(data_to_hash).digest() + + result = self.client_wrapper.connection.sign_identity( + identity=self.identity, + challenge_hidden=digest, + challenge_visual=visual, + ecdsa_curve_name=self.curve_name) + assert result.signature[:1] == b'\x00' + sig = result.signature[1:] + sig = mpi(util.bytes2num(sig[:32])) + mpi(util.bytes2num(sig[32:])) + + return (header + hashed + unhashed + + digest[:2] + # used for decoder's sanity check + sig) # actual ECDSA signature + + +def _split_lines(body, size): + lines = [] + for i in range(0, len(body), size): + lines.append(body[i:i+size] + '\n') + return ''.join(lines) + + +def armor(blob, type_str): + """See https://tools.ietf.org/html/rfc4880#section-6 for details.""" + head = '-----BEGIN PGP {}-----\nVersion: GnuPG v2\n\n'.format(type_str) + body = base64.b64encode(blob) + checksum = base64.b64encode(util.crc24(blob)) + tail = '-----END PGP {}-----\n'.format(type_str) + return head + _split_lines(body, 64) + '=' + checksum + '\n' + tail diff --git a/trezor_agent/gpg/git_wrapper.py b/trezor_agent/gpg/git_wrapper.py new file mode 100755 index 0000000..33df87e --- /dev/null +++ b/trezor_agent/gpg/git_wrapper.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python +"""A simple wrapper for Git commit/tag GPG signing.""" +import logging +import subprocess as sp +import sys + +from . import decode, encode + +log = logging.getLogger(__name__) + + +def main(): + """Main function.""" + logging.basicConfig(level=logging.INFO, + format='%(asctime)s %(levelname)-10s %(message)s') + + log.debug('sys.argv: %s', sys.argv) + args = sys.argv[1:] + if '--verify' in args: + return sp.call(['gpg2'] + args) + else: + command = args[0] + user_id = ' '.join(args[1:]) + assert command == '-bsau' # --detach-sign --sign --armor --local-user + pubkey = decode.load_from_gpg(user_id) + s = encode.Signer.from_public_key(user_id=user_id, pubkey=pubkey) + + data = sys.stdin.read() + sig = s.sign(data) + sig = encode.armor(sig, 'SIGNATURE') + sys.stdout.write(sig) + s.close() + +if __name__ == '__main__': + main() diff --git a/trezor_agent/gpg/signer.py b/trezor_agent/gpg/signer.py new file mode 100755 index 0000000..0af2342 --- /dev/null +++ b/trezor_agent/gpg/signer.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python +"""Create signatures and export public keys for GPG using TREZOR.""" +import argparse +import logging +import time + +from . import check, decode, encode + +log = logging.getLogger(__name__) + + +def main(): + """Main function.""" + p = argparse.ArgumentParser() + p.add_argument('user_id') + p.add_argument('filename', nargs='?') + p.add_argument('-t', '--time', type=int, default=int(time.time())) + p.add_argument('-a', '--armor', action='store_true', default=False) + p.add_argument('-v', '--verbose', action='store_true', default=False) + p.add_argument('-e', '--ecdsa-curve', default='nist256p1') + + args = p.parse_args() + logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO, + format='%(asctime)s %(levelname)-10s %(message)s') + user_id = args.user_id.encode('ascii') + if not args.filename: + s = encode.Signer(user_id=user_id, created=args.time, + curve_name=args.ecdsa_curve) + pubkey = s.export() + ext = '.pub' + if args.armor: + pubkey = encode.armor(pubkey, 'PUBLIC KEY BLOCK') + ext = '.asc' + filename = s.hex_short_key_id() + ext + open(filename, 'wb').write(pubkey) + log.info('import to local keyring using "gpg2 --import %s"', filename) + else: + pubkey = decode.load_from_gpg(user_id) + s = encode.Signer.from_public_key(pubkey=pubkey, user_id=user_id) + data = open(args.filename, 'rb').read() + sig, ext = s.sign(data), '.sig' + if args.armor: + sig = encode.armor(sig, 'SIGNATURE') + ext = '.asc' + filename = args.filename + ext + open(filename, 'wb').write(sig) + check.verify(pubkey=pubkey, sig_file=filename) + + s.close() + + +if __name__ == '__main__': + main() diff --git a/trezor_agent/util.py b/trezor_agent/util.py index 8e0e29b..89c3170 100644 --- a/trezor_agent/util.py +++ b/trezor_agent/util.py @@ -1,6 +1,9 @@ """Various I/O and serialization utilities.""" +import binascii +import contextlib import io import struct +import time def send(conn, data): @@ -60,7 +63,7 @@ def num2bytes(value, size): res.append(value & 0xFF) value = value >> 8 assert value == 0 - return bytearray(list(reversed(res))) + return bytes(bytearray(list(reversed(res)))) def pack(fmt, *args): @@ -75,3 +78,102 @@ def frame(*msgs): res.write(msg) msg = res.getvalue() return pack('L', len(msg)) + msg + + +def crc24(blob): + """See https://tools.ietf.org/html/rfc4880#section-6.1 for details.""" + CRC24_INIT = 0x0B704CE + CRC24_POLY = 0x1864CFB + + crc = CRC24_INIT + for octet in bytearray(blob): + crc ^= (octet << 16) + for _ in range(8): + crc <<= 1 + if crc & 0x1000000: + crc ^= CRC24_POLY + assert 0 <= crc < 0x1000000 + crc_bytes = struct.pack('>L', crc) + assert crc_bytes[0] == b'\x00' + return crc_bytes[1:] + + +def bit(value, i): + """Extract the i-th bit out of value.""" + return 1 if value & (1 << i) else 0 + + +def low_bits(value, n): + """Extract the lowest n bits out of value.""" + return value & ((1 << n) - 1) + + +def split_bits(value, *bits): + """ + Split integer value into list of ints, according to `bits` list. + + For example, split_bits(0x1234, 4, 8, 4) == [0x1, 0x23, 0x4] + """ + result = [] + for b in reversed(bits): + mask = (1 << b) - 1 + result.append(value & mask) + value = value >> b + assert value == 0 + return reversed(result) + + +def readfmt(stream, fmt): + """Read and unpack an object from stream, using a struct format string.""" + size = struct.calcsize(fmt) + blob = stream.read(size) + return struct.unpack(fmt, blob) + + +def prefix_len(fmt, blob): + """Prefix `blob` with its size, serialized using `fmt` format.""" + return struct.pack(fmt, len(blob)) + blob + + +def time_format(t): + """Utility for consistent time formatting.""" + return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(t)) + + +def hexlify(blob): + """Utility for consistent hexadecimal formatting.""" + return binascii.hexlify(blob).decode('ascii').upper() + + +class Reader(object): + """Read basic type objects out of given stream.""" + + def __init__(self, stream): + """Create a non-capturing reader.""" + self.s = stream + self._captured = None + + def readfmt(self, fmt): + """Read a specified object, using a struct format string.""" + size = struct.calcsize(fmt) + blob = self.read(size) + obj, = struct.unpack(fmt, blob) + return obj + + def read(self, size=None): + """Read `size` bytes from stream.""" + blob = self.s.read(size) + if size is not None and len(blob) < size: + raise EOFError + if self._captured: + self._captured.write(blob) + return blob + + @contextlib.contextmanager + def capture(self, stream): + """Capture all data read during this context.""" + self._captured = stream + try: + yield + finally: + self._captured = None