From 5047805385a3d545ada7989a1930e1c256c208c0 Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Fri, 14 Oct 2016 10:33:00 +0300 Subject: [PATCH] gpg: move HardwareSigner to device module --- trezor_agent/gpg/__main__.py | 4 +-- trezor_agent/gpg/device.py | 50 +++++++++++++++++++++++++++++++++ trezor_agent/gpg/encode.py | 54 ++---------------------------------- 3 files changed, 55 insertions(+), 53 deletions(-) create mode 100644 trezor_agent/gpg/device.py diff --git a/trezor_agent/gpg/__main__.py b/trezor_agent/gpg/__main__.py index 6dbee00..6c176a9 100755 --- a/trezor_agent/gpg/__main__.py +++ b/trezor_agent/gpg/__main__.py @@ -7,7 +7,7 @@ import os import sys import time -from . import agent, encode, keyring, protocol +from . import agent, device, encode, keyring, protocol from .. import server log = logging.getLogger(__name__) @@ -19,7 +19,7 @@ def run_create(args): log.warning('NOTE: in order to re-generate the exact same GPG key later, ' 'run this command with "--time=%d" commandline flag (to set ' 'the timestamp of the GPG key manually).', args.time) - conn = encode.HardwareSigner(user_id=user_id, + conn = device.HardwareSigner(user_id=user_id, curve_name=args.ecdsa_curve) verifying_key = conn.pubkey(ecdh=False) decryption_key = conn.pubkey(ecdh=True) diff --git a/trezor_agent/gpg/device.py b/trezor_agent/gpg/device.py new file mode 100644 index 0000000..a81a27b --- /dev/null +++ b/trezor_agent/gpg/device.py @@ -0,0 +1,50 @@ +"""Device abstraction layer for GPG operations.""" + +from .. import factory, formats, util + + +class HardwareSigner(object): + """Sign messages and get public keys from a hardware device.""" + + def __init__(self, user_id, curve_name): + """Connect to the device and retrieve required public key.""" + self.client_wrapper = factory.load() + self.identity = self.client_wrapper.identity_type() + self.identity.proto = 'gpg' + self.identity.host = user_id + self.curve_name = curve_name + + def pubkey(self, ecdh=False): + """Return public key as VerifyingKey object.""" + addr = util.get_bip32_address(identity=self.identity, ecdh=ecdh) + public_node = self.client_wrapper.connection.get_public_node( + n=addr, ecdsa_curve_name=self.curve_name) + + return formats.decompress_pubkey( + pubkey=public_node.node.public_key, + curve_name=self.curve_name) + + def sign(self, digest): + """Sign the digest and return a serialized signature.""" + result = self.client_wrapper.connection.sign_identity( + identity=self.identity, + challenge_hidden=digest, + challenge_visual='', + ecdsa_curve_name=self.curve_name) + assert result.signature[:1] == b'\x00' + sig = result.signature[1:] + return (util.bytes2num(sig[:32]), util.bytes2num(sig[32:])) + + def ecdh(self, pubkey): + """Derive shared secret using ECDH from remote public key.""" + result = self.client_wrapper.connection.get_ecdh_session_key( + identity=self.identity, + peer_public_key=pubkey, + ecdsa_curve_name=self.curve_name) + assert len(result.session_key) == 65 + assert result.session_key[:1] == b'\x04' + return result.session_key + + def close(self): + """Close the connection to the device.""" + self.client_wrapper.connection.close() diff --git a/trezor_agent/gpg/encode.py b/trezor_agent/gpg/encode.py index 0513472..bc15873 100644 --- a/trezor_agent/gpg/encode.py +++ b/trezor_agent/gpg/encode.py @@ -2,59 +2,11 @@ import logging import time -from . import decode, keyring, protocol -from .. import factory, formats, util +from . import decode, device, keyring, protocol +from .. import formats, util log = logging.getLogger(__name__) - -class HardwareSigner(object): - """Sign messages and get public keys from a hardware device.""" - - def __init__(self, user_id, curve_name): - """Connect to the device and retrieve required public key.""" - self.client_wrapper = factory.load() - self.identity = self.client_wrapper.identity_type() - self.identity.proto = 'gpg' - self.identity.host = user_id - self.curve_name = curve_name - - def pubkey(self, ecdh=False): - """Return public key as VerifyingKey object.""" - addr = util.get_bip32_address(identity=self.identity, ecdh=ecdh) - public_node = self.client_wrapper.connection.get_public_node( - n=addr, ecdsa_curve_name=self.curve_name) - - return formats.decompress_pubkey( - pubkey=public_node.node.public_key, - curve_name=self.curve_name) - - def sign(self, digest): - """Sign the digest and return a serialized signature.""" - result = self.client_wrapper.connection.sign_identity( - identity=self.identity, - challenge_hidden=digest, - challenge_visual='', - ecdsa_curve_name=self.curve_name) - assert result.signature[:1] == b'\x00' - sig = result.signature[1:] - return (util.bytes2num(sig[:32]), util.bytes2num(sig[32:])) - - def ecdh(self, pubkey): - """Derive shared secret using ECDH from remote public key.""" - result = self.client_wrapper.connection.get_ecdh_session_key( - identity=self.identity, - peer_public_key=pubkey, - ecdsa_curve_name=self.curve_name) - assert len(result.session_key) == 65 - assert result.session_key[:1] == b'\x04' - return result.session_key - - def close(self): - """Close the connection to the device.""" - self.client_wrapper.connection.close() - - def _time_format(t): return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(t)) @@ -163,7 +115,7 @@ def load_from_public_key(pubkey_dict): assert curve_name in formats.SUPPORTED_CURVES ecdh = (pubkey_dict['algo'] == protocol.ECDH_ALGO_ID) - conn = HardwareSigner(user_id, curve_name=curve_name) + conn = device.HardwareSigner(user_id, curve_name=curve_name) pubkey = protocol.PublicKey( curve_name=curve_name, created=created, verifying_key=conn.pubkey(ecdh=ecdh), ecdh=ecdh)