diff --git a/trezor_agent/formats.py b/trezor_agent/formats.py index de8ec82..af23da0 100644 --- a/trezor_agent/formats.py +++ b/trezor_agent/formats.py @@ -11,11 +11,15 @@ from . import util log = logging.getLogger(__name__) -# Supported ECDSA curves +# Supported ECDSA curves (for SSH and GPG) CURVE_NIST256 = 'nist256p1' CURVE_ED25519 = 'ed25519' SUPPORTED_CURVES = {CURVE_NIST256, CURVE_ED25519} +# Supported ECDH curves (for GPG) +ECDH_NIST256 = 'nist256p1' +ECDH_CURVE25519 = 'curve25519' + # SSH key types SSH_NIST256_DER_OCTET = b'\x04' SSH_NIST256_KEY_PREFIX = b'ecdsa-sha2-' @@ -134,7 +138,8 @@ def decompress_pubkey(pubkey, curve_name): if len(pubkey) == 33: decompress = { CURVE_NIST256: _decompress_nist256, - CURVE_ED25519: _decompress_ed25519 + CURVE_ED25519: _decompress_ed25519, + ECDH_CURVE25519: _decompress_ed25519, }[curve_name] vk = decompress(pubkey) @@ -192,3 +197,12 @@ def import_public_key(line): assert result['type'] == file_type.encode('ascii') log.debug('loaded %s public key: %s', file_type, result['fingerprint']) return result + + +def get_ecdh_curve_name(signature_curve_name): + """Return appropriate curve for ECDH for specified signing curve.""" + return { + CURVE_NIST256: ECDH_NIST256, + CURVE_ED25519: ECDH_CURVE25519, + ECDH_CURVE25519: ECDH_CURVE25519, + }[signature_curve_name] diff --git a/trezor_agent/gpg/__main__.py b/trezor_agent/gpg/__main__.py index 6dbee00..648f7be 100755 --- a/trezor_agent/gpg/__main__.py +++ b/trezor_agent/gpg/__main__.py @@ -7,8 +7,8 @@ import os import sys import time -from . import agent, encode, keyring, protocol -from .. import server +from . import agent, device, encode, keyring, protocol +from .. import formats, server log = logging.getLogger(__name__) @@ -19,7 +19,7 @@ def run_create(args): log.warning('NOTE: in order to re-generate the exact same GPG key later, ' 'run this command with "--time=%d" commandline flag (to set ' 'the timestamp of the GPG key manually).', args.time) - conn = encode.HardwareSigner(user_id=user_id, + conn = device.HardwareSigner(user_id=user_id, curve_name=args.ecdsa_curve) verifying_key = conn.pubkey(ecdh=False) decryption_key = conn.pubkey(ecdh=True) @@ -32,8 +32,8 @@ def run_create(args): verifying_key=verifying_key, ecdh=False) # subkey for encryption encryption_key = protocol.PublicKey( - curve_name=args.ecdsa_curve, created=args.time, - verifying_key=decryption_key, ecdh=True) + curve_name=formats.get_ecdh_curve_name(args.ecdsa_curve), + created=args.time, verifying_key=decryption_key, ecdh=True) result = encode.create_subkey(primary_bytes=primary_bytes, pubkey=signing_key, signer_func=conn.sign) @@ -47,8 +47,8 @@ def run_create(args): verifying_key=verifying_key, ecdh=False) # subkey for encryption subkey = protocol.PublicKey( - curve_name=args.ecdsa_curve, created=args.time, - verifying_key=decryption_key, ecdh=True) + curve_name=formats.get_ecdh_curve_name(args.ecdsa_curve), + created=args.time, verifying_key=decryption_key, ecdh=True) result = encode.create_primary(user_id=user_id, pubkey=primary, diff --git a/trezor_agent/gpg/agent.py b/trezor_agent/gpg/agent.py index 89e11de..8eae332 100644 --- a/trezor_agent/gpg/agent.py +++ b/trezor_agent/gpg/agent.py @@ -92,11 +92,7 @@ def pkdecrypt(keygrip, conn): pubkey, conn = encode.load_from_public_key(pubkey_dict=local_pubkey) with contextlib.closing(conn): assert pubkey.keygrip == binascii.unhexlify(keygrip) - shared_secret = conn.ecdh(remote_pubkey) - - assert len(shared_secret) == 65 - assert shared_secret[:1] == b'\x04' - return _serialize_point(shared_secret) + return _serialize_point(conn.ecdh(remote_pubkey)) def handle_connection(conn): diff --git a/trezor_agent/gpg/decode.py b/trezor_agent/gpg/decode.py index ded5c98..3cc7034 100644 --- a/trezor_agent/gpg/decode.py +++ b/trezor_agent/gpg/decode.py @@ -83,9 +83,15 @@ def _parse_ed25519_verifier(mpi): return _ed25519_verify, vk +def _parse_curve25519_verifier(_): + log.warning('Curve25519 ECDH is not verified') + return None, None + + SUPPORTED_CURVES = { b'\x2A\x86\x48\xCE\x3D\x03\x01\x07': _parse_nist256p1_verifier, b'\x2B\x06\x01\x04\x01\xDA\x47\x0F\x01': _parse_ed25519_verifier, + b'\x2B\x06\x01\x04\x01\x97\x55\x01\x05\x01': _parse_curve25519_verifier, } RSA_ALGO_IDS = {1, 2, 3} @@ -168,6 +174,7 @@ def _parse_pubkey(stream, packet_type='pubkey'): oid_size = stream.readfmt('B') oid = stream.read(oid_size) assert oid in SUPPORTED_CURVES, util.hexlify(oid) + p['curve_oid'] = oid parser = SUPPORTED_CURVES[oid] mpi = parse_mpi(stream) diff --git a/trezor_agent/gpg/device.py b/trezor_agent/gpg/device.py new file mode 100644 index 0000000..01d3076 --- /dev/null +++ b/trezor_agent/gpg/device.py @@ -0,0 +1,54 @@ +"""Device abstraction layer for GPG operations.""" + +from .. import factory, formats, util + + +class HardwareSigner(object): + """Sign messages and get public keys from a hardware device.""" + + def __init__(self, user_id, curve_name): + """Connect to the device and retrieve required public key.""" + self.client_wrapper = factory.load() + self.identity = self.client_wrapper.identity_type() + self.identity.proto = 'gpg' + self.identity.host = user_id + self.curve_name = curve_name + + def pubkey(self, ecdh=False): + """Return public key as VerifyingKey object.""" + addr = util.get_bip32_address(identity=self.identity, ecdh=ecdh) + if ecdh: + curve_name = formats.get_ecdh_curve_name(self.curve_name) + else: + curve_name = self.curve_name + public_node = self.client_wrapper.connection.get_public_node( + n=addr, ecdsa_curve_name=curve_name) + + return formats.decompress_pubkey( + pubkey=public_node.node.public_key, + curve_name=curve_name) + + def sign(self, digest): + """Sign the digest and return a serialized signature.""" + result = self.client_wrapper.connection.sign_identity( + identity=self.identity, + challenge_hidden=digest, + challenge_visual='', + ecdsa_curve_name=self.curve_name) + assert result.signature[:1] == b'\x00' + sig = result.signature[1:] + return (util.bytes2num(sig[:32]), util.bytes2num(sig[32:])) + + def ecdh(self, pubkey): + """Derive shared secret using ECDH from remote public key.""" + result = self.client_wrapper.connection.get_ecdh_session_key( + identity=self.identity, + peer_public_key=pubkey, + ecdsa_curve_name=formats.get_ecdh_curve_name(self.curve_name)) + assert len(result.session_key) in {65, 33} # NIST256 or Curve25519 + assert result.session_key[:1] == b'\x04' + return result.session_key + + def close(self): + """Close the connection to the device.""" + self.client_wrapper.connection.close() diff --git a/trezor_agent/gpg/encode.py b/trezor_agent/gpg/encode.py index 43e10eb..46a6aea 100644 --- a/trezor_agent/gpg/encode.py +++ b/trezor_agent/gpg/encode.py @@ -2,77 +2,12 @@ import logging import time -from . import decode, keyring, protocol -from .. import factory, formats, util +from . import decode, device, keyring, protocol +from .. import util log = logging.getLogger(__name__) -class HardwareSigner(object): - """Sign messages and get public keys from a hardware device.""" - - def __init__(self, user_id, curve_name): - """Connect to the device and retrieve required public key.""" - self.client_wrapper = factory.load() - self.identity = self.client_wrapper.identity_type() - self.identity.proto = 'gpg' - self.identity.host = user_id - self.curve_name = curve_name - - def pubkey(self, ecdh=False): - """Return public key as VerifyingKey object.""" - addr = util.get_bip32_address(identity=self.identity, ecdh=ecdh) - public_node = self.client_wrapper.connection.get_public_node( - n=addr, ecdsa_curve_name=self.curve_name) - - return formats.decompress_pubkey( - pubkey=public_node.node.public_key, - curve_name=self.curve_name) - - def sign(self, digest): - """Sign the digest and return a serialized signature.""" - result = self.client_wrapper.connection.sign_identity( - identity=self.identity, - challenge_hidden=digest, - challenge_visual='', - ecdsa_curve_name=self.curve_name) - assert result.signature[:1] == b'\x00' - sig = result.signature[1:] - return (util.bytes2num(sig[:32]), util.bytes2num(sig[32:])) - - def ecdh(self, pubkey): - """Derive shared secret using ECDH from remote public key.""" - result = self.client_wrapper.connection.get_ecdh_session_key( - identity=self.identity, - peer_public_key=pubkey, - ecdsa_curve_name=self.curve_name) - assert len(result.session_key) == 65 - assert result.session_key[:1] == b'\x04' - return result.session_key - - def close(self): - """Close the connection to the device.""" - self.client_wrapper.connection.close() - - -class AgentSigner(object): - """Sign messages and get public keys using gpg-agent tool.""" - - def __init__(self, user_id): - """Connect to the agent and retrieve required public key.""" - self.sock = keyring.connect_to_agent() - self.keygrip = keyring.get_keygrip(user_id) - - def sign(self, digest): - """Sign the digest and return an ECDSA/RSA/DSA signature.""" - return keyring.sign_digest(sock=self.sock, - keygrip=self.keygrip, digest=digest) - - def close(self): - """Close the connection to gpg-agent.""" - self.sock.close() - - def _time_format(t): return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(t)) @@ -160,7 +95,7 @@ def create_subkey(primary_bytes, pubkey, signer_func): log.info('confirm signing with primary key') if not primary['_is_custom']: - signer_func = AgentSigner(primary['user_id']).sign + signer_func = keyring.create_agent_signer(primary['user_id']) signature = protocol.make_signature( signer_func=signer_func, @@ -175,13 +110,13 @@ def create_subkey(primary_bytes, pubkey, signer_func): def load_from_public_key(pubkey_dict): """Load correct public key from the device.""" + log.debug('pubkey_dict: %s', pubkey_dict) user_id = pubkey_dict['user_id'] created = pubkey_dict['created'] - curve_name = protocol.find_curve_by_algo_id(pubkey_dict['algo']) - assert curve_name in formats.SUPPORTED_CURVES + curve_name = protocol.get_curve_name_by_oid(pubkey_dict['curve_oid']) ecdh = (pubkey_dict['algo'] == protocol.ECDH_ALGO_ID) - conn = HardwareSigner(user_id, curve_name=curve_name) + conn = device.HardwareSigner(user_id, curve_name=curve_name) pubkey = protocol.PublicKey( curve_name=curve_name, created=created, verifying_key=conn.pubkey(ecdh=ecdh), ecdh=ecdh) diff --git a/trezor_agent/gpg/keyring.py b/trezor_agent/gpg/keyring.py index caf9cdf..e515a33 100644 --- a/trezor_agent/gpg/keyring.py +++ b/trezor_agent/gpg/keyring.py @@ -204,3 +204,15 @@ def export_public_key(user_id, sp=subprocess): log.error('could not find public key %r in local GPG keyring', user_id) raise KeyError(user_id) return result + + +def create_agent_signer(user_id): + """Sign digest with existing GPG keys using gpg-agent tool.""" + sock = connect_to_agent() + keygrip = get_keygrip(user_id) + + def sign(digest): + """Sign the digest and return an ECDSA/RSA/DSA signature.""" + return sign_digest(sock=sock, keygrip=keygrip, digest=digest) + + return sign diff --git a/trezor_agent/gpg/protocol.py b/trezor_agent/gpg/protocol.py index b4d4e01..74afae4 100644 --- a/trezor_agent/gpg/protocol.py +++ b/trezor_agent/gpg/protocol.py @@ -128,6 +128,18 @@ def _keygrip_ed25519(vk): ]) +def _keygrip_curve25519(vk): + # pylint: disable=line-too-long + return _compute_keygrip([ + ['p', util.num2bytes(0x7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFED, size=32)], # nopep8 + ['a', b'\x01\xDB\x41'], + ['b', b'\x01'], + ['g', util.num2bytes(0x04000000000000000000000000000000000000000000000000000000000000000920ae19a1b8a086b4e01edd2c7748d14c923d4d7e6d7c61b229e9c5a27eced3d9, size=65)], # nopep8 + ['n', util.num2bytes(0x1000000000000000000000000000000014DEF9DEA2F79CD65812631A5CF5D3ED, size=32)], # nopep8 + ['q', vk.to_bytes()], + ]) + + SUPPORTED_CURVES = { formats.CURVE_NIST256: { # https://tools.ietf.org/html/rfc6637#section-11 @@ -141,7 +153,13 @@ SUPPORTED_CURVES = { 'algo_id': 22, 'serialize': _serialize_ed25519, 'keygrip': _keygrip_ed25519, - } + }, + formats.ECDH_CURVE25519: { + 'oid': b'\x2B\x06\x01\x04\x01\x97\x55\x01\x05\x01', + 'algo_id': 18, + 'serialize': _serialize_ed25519, + 'keygrip': _keygrip_curve25519, + }, } ECDH_ALGO_ID = 18 @@ -149,14 +167,12 @@ ECDH_ALGO_ID = 18 CUSTOM_SUBPACKET = subpacket(100, b'TREZOR-GPG') # marks "our" pubkey -def find_curve_by_algo_id(algo_id): - """Find curve name that matches a public key algorith ID.""" - if algo_id == ECDH_ALGO_ID: - return formats.CURVE_NIST256 - - curve_name, = [name for name, info in SUPPORTED_CURVES.items() - if info['algo_id'] == algo_id] - return curve_name +def get_curve_name_by_oid(oid): + """Return curve name matching specified OID, or raise KeyError.""" + for curve_name, info in SUPPORTED_CURVES.items(): + if info['oid'] == oid: + return curve_name + raise KeyError('Unknown OID: {!r}'.format(oid)) class PublicKey(object): @@ -167,7 +183,7 @@ class PublicKey(object): self.curve_info = SUPPORTED_CURVES[curve_name] self.created = int(created) # time since Epoch self.verifying_key = verifying_key - self.ecdh = ecdh + self.ecdh = bool(ecdh) if ecdh: self.algo_id = ECDH_ALGO_ID self.ecdh_packet = b'\x03\x01\x08\x07' diff --git a/trezor_agent/gpg/tests/test_protocol.py b/trezor_agent/gpg/tests/test_protocol.py index b649dd9..2c537b5 100644 --- a/trezor_agent/gpg/tests/test_protocol.py +++ b/trezor_agent/gpg/tests/test_protocol.py @@ -30,11 +30,6 @@ def test_mpi(): assert protocol.mpi(0x123) == b'\x00\x09\x01\x23' -def test_find(): - assert protocol.find_curve_by_algo_id(19) == formats.CURVE_NIST256 - assert protocol.find_curve_by_algo_id(22) == formats.CURVE_ED25519 - - def test_armor(): data = bytearray(range(256)) assert protocol.armor(data, 'TEST') == '''-----BEGIN PGP TEST-----