diff --git a/trezor_agent/__main__.py b/trezor_agent/__main__.py index a56264b..6fae2c3 100644 --- a/trezor_agent/__main__.py +++ b/trezor_agent/__main__.py @@ -7,14 +7,14 @@ import re import subprocess import sys -from . import client, formats, protocol, server +from . import client, formats, protocol, server, util log = logging.getLogger(__name__) def ssh_args(label): """Create SSH command for connecting specified server.""" - identity = client.string_to_identity(label, identity_type=dict) + identity = util.string_to_identity(label, identity_type=dict) args = [] if 'port' in identity: diff --git a/trezor_agent/client.py b/trezor_agent/client.py index eea90a7..e8d2e35 100644 --- a/trezor_agent/client.py +++ b/trezor_agent/client.py @@ -6,8 +6,6 @@ It is used for getting SSH public keys and ECDSA signing of server requests. import binascii import io import logging -import re -import struct from . import factory, formats, util @@ -40,7 +38,7 @@ class Client(object): def get_identity(self, label, index=0): """Parse label string into Identity protobuf.""" - identity = string_to_identity(label, self.identity_type) + identity = util.string_to_identity(label, self.identity_type) identity.proto = 'ssh' identity.index = index return identity @@ -48,10 +46,10 @@ class Client(object): def get_public_key(self, label): """Get SSH public key corresponding to specified by label.""" identity = self.get_identity(label=label) - label = identity_to_string(identity) # canonize key label + label = util.identity_to_string(identity) # canonize key label log.info('getting "%s" public key (%s) from %s...', label, self.curve, self.device_name) - addr = get_address(identity) + addr = util.get_bip32_address(identity) node = self.client.get_public_node(n=addr, ecdsa_curve_name=self.curve) @@ -93,55 +91,6 @@ class Client(object): return result.signature[1:] -_identity_regexp = re.compile(''.join([ - '^' - r'(?:(?P.*)://)?', - r'(?:(?P.*)@)?', - r'(?P.*?)', - r'(?::(?P\w*))?', - r'(?P/.*)?', - '$' -])) - - -def string_to_identity(s, identity_type): - """Parse string into Identity protobuf.""" - m = _identity_regexp.match(s) - result = m.groupdict() - log.debug('parsed identity: %s', result) - kwargs = {k: v for k, v in result.items() if v} - return identity_type(**kwargs) - - -def identity_to_string(identity): - """Dump Identity protobuf into its string representation.""" - result = [] - if identity.proto: - result.append(identity.proto + '://') - if identity.user: - result.append(identity.user + '@') - result.append(identity.host) - if identity.port: - result.append(':' + identity.port) - if identity.path: - result.append(identity.path) - return ''.join(result) - - -def get_address(identity, ecdh=False): - """Compute BIP32 derivation address according to SLIP-0013/0017.""" - index = struct.pack('.*)://)?', + r'(?:(?P.*)@)?', + r'(?P.*?)', + r'(?::(?P\w*))?', + r'(?P/.*)?', + '$' +])) + + +def string_to_identity(s, identity_type): + """Parse string into Identity protobuf.""" + m = _identity_regexp.match(s) + result = m.groupdict() + log.debug('parsed identity: %s', result) + kwargs = {k: v for k, v in result.items() if v} + return identity_type(**kwargs) + + +def identity_to_string(identity): + """Dump Identity protobuf into its string representation.""" + result = [] + if identity.proto: + result.append(identity.proto + '://') + if identity.user: + result.append(identity.user + '@') + result.append(identity.host) + if identity.port: + result.append(':' + identity.port) + if identity.path: + result.append(identity.path) + return ''.join(result) + + +def get_bip32_address(identity, ecdh=False): + """Compute BIP32 derivation address according to SLIP-0013/0017.""" + index = struct.pack('