diff --git a/trezor_agent/gpg/encode.py b/trezor_agent/gpg/encode.py index 1621223..b3d1d79 100644 --- a/trezor_agent/gpg/encode.py +++ b/trezor_agent/gpg/encode.py @@ -56,11 +56,6 @@ class AgentSigner(object): """Connect to the agent and retrieve required public key.""" self.sock = agent.connect() self.keygrip = agent.get_keygrip(user_id) - self.public_key = decode.load_from_gpg(user_id) - - def pubkey(self): - """Return public key as VerifyingKey object.""" - return self.public_key['verifying_key'] def sign(self, digest): """Sign the digest and return an ECDSA signature.""" @@ -77,7 +72,7 @@ def _time_format(t): return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(t)) -class Signer(object): +class Factory(object): """Performs GPG signing operations.""" def __init__(self, user_id, created, curve_name): @@ -102,9 +97,9 @@ class Signer(object): `pubkey` should be loaded via `decode.load_from_gpg(user_id)` from the local GPG keyring. """ - s = Signer(user_id=user_id, - created=pubkey['created'], - curve_name=proto.find_curve_by_algo_id(pubkey['algo'])) + s = cls(user_id=user_id, + created=pubkey['created'], + curve_name=proto.find_curve_by_algo_id(pubkey['algo'])) assert s.pubkey.key_id() == pubkey['key_id'] return s @@ -112,8 +107,8 @@ class Signer(object): """Close connection and turn off the screen of the device.""" self.conn.close() - def export(self): - """Export GPG public key, ready for "gpg2 --import".""" + def create_primary(self): + """Export new primary GPG public key, ready for "gpg2 --import".""" pubkey_packet = proto.packet(tag=6, blob=self.pubkey.data()) user_id_packet = proto.packet(tag=13, blob=self.user_id) @@ -137,7 +132,7 @@ class Signer(object): proto.subpacket(16, self.pubkey.key_id()), # issuer key id proto.CUSTOM_SUBPACKET] - signature = _make_signature( + signature = proto.make_signature( signer_func=self.conn.sign, public_algo=self.pubkey.algo_id, data_to_sign=data_to_sign, @@ -148,8 +143,8 @@ class Signer(object): sign_packet = proto.packet(tag=2, blob=signature) return pubkey_packet + user_id_packet + sign_packet - def subkey(self): - """Export a subkey to `self.user_id` GPG primary key.""" + def create_subkey(self): + """Export new subkey to `self.user_id` GPG primary key.""" subkey_packet = proto.packet(tag=14, blob=self.pubkey.data()) primary = decode.load_from_gpg(self.user_id) log.info('adding subkey to primary GPG key "%s" (%s)', @@ -162,12 +157,13 @@ class Signer(object): unhashed_subpackets = [ proto.subpacket(16, self.pubkey.key_id())] # issuer key id log.info('confirm signing subkey with hardware device') - embedded_sig = _make_signature(signer_func=self.conn.sign, - data_to_sign=data_to_sign, - public_algo=self.pubkey.algo_id, - sig_type=0x19, - hashed_subpackets=hashed_subpackets, - unhashed_subpackets=unhashed_subpackets) + embedded_sig = proto.make_signature( + signer_func=self.conn.sign, + data_to_sign=data_to_sign, + public_algo=self.pubkey.algo_id, + sig_type=0x19, + hashed_subpackets=hashed_subpackets, + unhashed_subpackets=unhashed_subpackets) # Subkey Binding Signature hashed_subpackets = [ @@ -179,16 +175,17 @@ class Signer(object): proto.CUSTOM_SUBPACKET] log.info('confirm signing subkey with gpg-agent') gpg_agent = AgentSigner(self.user_id) - signature = _make_signature(signer_func=gpg_agent.sign, - data_to_sign=data_to_sign, - public_algo=primary['algo'], - sig_type=0x18, - hashed_subpackets=hashed_subpackets, - unhashed_subpackets=unhashed_subpackets) + signature = proto.make_signature( + signer_func=gpg_agent.sign, + data_to_sign=data_to_sign, + public_algo=primary['algo'], + sig_type=0x18, + hashed_subpackets=hashed_subpackets, + unhashed_subpackets=unhashed_subpackets) sign_packet = proto.packet(tag=2, blob=signature) return subkey_packet + sign_packet - def sign(self, msg, sign_time=None): + def sign_message(self, msg, sign_time=None): """Sign GPG message at specified time.""" if sign_time is None: sign_time = int(time.time()) @@ -199,32 +196,10 @@ class Signer(object): unhashed_subpackets = [ proto.subpacket(16, self.pubkey.key_id())] # issuer key id - blob = _make_signature(signer_func=self.conn.sign, - data_to_sign=msg, - public_algo=self.pubkey.algo_id, - hashed_subpackets=hashed_subpackets, - unhashed_subpackets=unhashed_subpackets) + blob = proto.make_signature( + signer_func=self.conn.sign, + data_to_sign=msg, + public_algo=self.pubkey.algo_id, + hashed_subpackets=hashed_subpackets, + unhashed_subpackets=unhashed_subpackets) return proto.packet(tag=2, blob=blob) - - -def _make_signature(signer_func, data_to_sign, public_algo, - hashed_subpackets, unhashed_subpackets, sig_type=0): - # pylint: disable=too-many-arguments - header = struct.pack('>BBBB', - 4, # version - sig_type, # rfc4880 (section-5.2.1) - public_algo, - 8) # hash_alg (SHA256) - hashed = proto.subpackets(*hashed_subpackets) - unhashed = proto.subpackets(*unhashed_subpackets) - tail = b'\x04\xff' + struct.pack('>L', len(header) + len(hashed)) - data_to_hash = data_to_sign + header + hashed + tail - - log.debug('hashing %d bytes', len(data_to_hash)) - digest = hashlib.sha256(data_to_hash).digest() - log.info('signing digest: %s', util.hexlify(digest)) - sig = signer_func(digest=digest) - - return bytes(header + hashed + unhashed + - digest[:2] + # used for decoder's sanity check - sig) # actual ECDSA signature diff --git a/trezor_agent/gpg/proto.py b/trezor_agent/gpg/proto.py index c9a7b21..f703a50 100644 --- a/trezor_agent/gpg/proto.py +++ b/trezor_agent/gpg/proto.py @@ -154,3 +154,27 @@ def armor(blob, type_str): checksum = base64.b64encode(util.crc24(blob)) tail = '-----END PGP {}-----\n'.format(type_str) return head + _split_lines(body, 64) + '=' + checksum + '\n' + tail + + +def make_signature(signer_func, data_to_sign, public_algo, + hashed_subpackets, unhashed_subpackets, sig_type=0): + """Create new GPG signature.""" + # pylint: disable=too-many-arguments + header = struct.pack('>BBBB', + 4, # version + sig_type, # rfc4880 (section-5.2.1) + public_algo, + 8) # hash_alg (SHA256) + hashed = subpackets(*hashed_subpackets) + unhashed = subpackets(*unhashed_subpackets) + tail = b'\x04\xff' + struct.pack('>L', len(header) + len(hashed)) + data_to_hash = data_to_sign + header + hashed + tail + + log.debug('hashing %d bytes', len(data_to_hash)) + digest = hashlib.sha256(data_to_hash).digest() + log.info('signing digest: %s', util.hexlify(digest)) + sig = signer_func(digest=digest) + + return bytes(header + hashed + unhashed + + digest[:2] + # used for decoder's sanity check + sig) # actual ECDSA signature diff --git a/trezor_agent/gpg/signer.py b/trezor_agent/gpg/signer.py index 1c1f608..397bff0 100755 --- a/trezor_agent/gpg/signer.py +++ b/trezor_agent/gpg/signer.py @@ -16,16 +16,16 @@ log = logging.getLogger(__name__) def run_create(args): """Generate a new pubkey for a new/existing GPG identity.""" user_id = os.environ['TREZOR_GPG_USER_ID'] - s = encode.Signer(user_id=user_id, created=args.time, + f = encode.Factory(user_id=user_id, created=args.time, curve_name=args.ecdsa_curve) - with contextlib.closing(s): + with contextlib.closing(f): if args.subkey: - subkey = s.subkey() + subkey = f.create_subkey() primary = sp.check_output(['gpg2', '--export', user_id]) result = primary + subkey else: - result = s.export() + result = f.create_primary() sys.stdout.write(proto.armor(result, 'PUBLIC KEY BLOCK')) @@ -33,13 +33,13 @@ def run_create(args): def run_sign(args): """Generate a GPG signature using hardware-based device.""" pubkey = decode.load_from_gpg(user_id=None, use_custom=True) - s = encode.Signer.from_public_key(pubkey=pubkey, user_id=pubkey['user_id']) - with contextlib.closing(s): + f = encode.Factory.from_public_key(pubkey=pubkey, user_id=pubkey['user_id']) + with contextlib.closing(f): if args.filename: data = open(args.filename, 'rb').read() else: data = sys.stdin.read() - sig = s.sign(data) + sig = f.sign_message(data) sig = proto.armor(sig, 'SIGNATURE') decode.verify(pubkey=pubkey, signature=sig, original_data=data)