From dbed773e548135146e47c8cb90f3a9096e50b17d Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Thu, 3 Nov 2016 23:29:45 +0200 Subject: [PATCH] fix pylint and tests --- trezor_agent/__main__.py | 2 +- trezor_agent/device/__init__.py | 3 +-- trezor_agent/device/keepkey.py | 1 - trezor_agent/device/ledger.py | 29 +++++++++++++---------------- trezor_agent/tests/test_client.py | 20 ++++++++++---------- trezor_agent/tests/test_protocol.py | 21 +++++++++++++-------- 6 files changed, 38 insertions(+), 38 deletions(-) diff --git a/trezor_agent/__main__.py b/trezor_agent/__main__.py index 7fd0f37..3fa81c3 100644 --- a/trezor_agent/__main__.py +++ b/trezor_agent/__main__.py @@ -120,7 +120,7 @@ def handle_connection_error(func): def parse_config(fname): """Parse config file into a list of Identity objects.""" contents = open(fname).read() - for identity_str, curve_name in re.findall('\<(.*?)\|(.*?)\>', contents): + for identity_str, curve_name in re.findall(r'\<(.*?)\|(.*?)\>', contents): yield device.interface.Identity(identity_str=identity_str, curve_name=curve_name) diff --git a/trezor_agent/device/__init__.py b/trezor_agent/device/__init__.py index d37e6db..613c1cf 100644 --- a/trezor_agent/device/__init__.py +++ b/trezor_agent/device/__init__.py @@ -24,5 +24,4 @@ def detect(): return d except interface.NotFoundError as e: log.debug('device not found: %s', e) - raise IOError('No device found: "{}" ({})'.format(identity_str, - curve_name)) + raise IOError('No device found!') diff --git a/trezor_agent/device/keepkey.py b/trezor_agent/device/keepkey.py index 23ccf61..75e3958 100644 --- a/trezor_agent/device/keepkey.py +++ b/trezor_agent/device/keepkey.py @@ -1,7 +1,6 @@ """KeepKey-related code (see https://www.keepkey.com/).""" from . import interface, trezor -from .. import formats class KeepKey(trezor.Trezor): diff --git a/trezor_agent/device/ledger.py b/trezor_agent/device/ledger.py index 1745fba..e54c83d 100644 --- a/trezor_agent/device/ledger.py +++ b/trezor_agent/device/ledger.py @@ -44,11 +44,10 @@ class LedgerNanoS(interface.Device): raise interface.NotFoundError( '{} not connected: "{}"'.format(self, e)) - def pubkey(self, ecdh=False): + def pubkey(self, identity, ecdh=False): """Get PublicKey object for specified BIP32 address and elliptic curve.""" - curve_name = self.get_curve_name(ecdh) - path = _expand_path(interface.get_bip32_address(self.identity_dict, - ecdh=ecdh)) + curve_name = identity.get_curve_name(ecdh) + path = _expand_path(identity.get_bip32_address(ecdh)) if curve_name == 'nist256p1': p2 = '01' else: @@ -60,27 +59,26 @@ class LedgerNanoS(interface.Device): result = bytearray(self.conn.exchange(bytes(apdu)))[1:] return _convert_public_key(curve_name, result) - def sign(self, blob): + def sign(self, identity, blob): """Sign given blob and return the signature (as bytes).""" - path = _expand_path(interface.get_bip32_address(self.identity_dict, - ecdh=False)) - if self.identity_dict['proto'] == 'ssh': + path = _expand_path(identity.get_bip32_address(ecdh=False)) + if identity.identity_dict['proto'] == 'ssh': ins = '04' p1 = '00' else: ins = '08' p1 = '00' - if self.curve_name == 'nist256p1': - p2 = '81' if self.identity_dict['proto'] == 'ssh' else '01' + if identity.curve_name == 'nist256p1': + p2 = '81' if identity.identity_dict['proto'] == 'ssh' else '01' else: - p2 = '82' if self.identity_dict['proto'] == 'ssh' else '02' + p2 = '82' if identity.identity_dict['proto'] == 'ssh' else '02' apdu = '80' + ins + p1 + p2 apdu = binascii.unhexlify(apdu) apdu += bytearray([len(blob) + len(path) + 1]) apdu += bytearray([len(path) // 4]) + path apdu += blob result = bytearray(self.conn.exchange(bytes(apdu))) - if self.curve_name == 'nist256p1': + if identity.curve_name == 'nist256p1': offset = 3 length = result[offset] r = result[offset+1:offset+1+length] @@ -96,11 +94,10 @@ class LedgerNanoS(interface.Device): else: return bytes(result[:64]) - def ecdh(self, pubkey): + def ecdh(self, identity, pubkey): """Get shared session key using Elliptic Curve Diffie-Hellman.""" - path = _expand_path(interface.get_bip32_address(self.identity_dict, - ecdh=True)) - if self.curve_name == 'nist256p1': + path = _expand_path(identity.get_bip32_address(ecdh=True)) + if identity.curve_name == 'nist256p1': p2 = '01' else: p2 = '02' diff --git a/trezor_agent/tests/test_client.py b/trezor_agent/tests/test_client.py index b3f4bad..fb9d2b7 100644 --- a/trezor_agent/tests/test_client.py +++ b/trezor_agent/tests/test_client.py @@ -12,7 +12,7 @@ PUBKEY = (b'\x03\xd8(\xb5\xa6`\xbet0\x95\xac:[;]\xdc,\xbd\xdc?\xd7\xc0\xec' b'\xdd\xbc+\xfar~\x9dAis') PUBKEY_TEXT = ('ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzd' 'HAyNTYAAABBBNgotaZgvnQwlaw6Wztd3Cy93D/XwOzdvCv6cn6dQWlzNMEQeW' - 'VUfhvrGljR2Z/CMRONY6ejB+9PnpUOPuzYqi8= ssh://localhost:22\n') + 'VUfhvrGljR2Z/CMRONY6ejB+9PnpUOPuzYqi8= \n') class MockDevice(device.interface.Device): # pylint: disable=abstract-method @@ -23,11 +23,11 @@ class MockDevice(device.interface.Device): # pylint: disable=abstract-method def close(self): self.conn = None - def pubkey(self, ecdh=False): # pylint: disable=unused-argument + def pubkey(self, identity, ecdh=False): # pylint: disable=unused-argument assert self.conn return PUBKEY - def sign(self, blob): + def sign(self, identity, blob): """Sign given blob and return the signature (as bytes).""" assert self.conn assert blob == BLOB @@ -59,11 +59,11 @@ SIG = (b'R\x19T\xf2\x84$\xef#\x0e\xee\x04X\xc6\xc3\x99T`\xd1\xd8\xf7!' def test_ssh_agent(): - identity_str = 'localhost:22' - c = client.Client(device=MockDevice(identity_str=identity_str, - curve_name=CURVE)) - assert c.get_public_key() == PUBKEY_TEXT - signature = c.sign_ssh_challenge(blob=BLOB) + identity = device.interface.Identity(identity_str='localhost:22', + curve_name=CURVE) + c = client.Client(device=MockDevice()) + assert c.get_public_key(identity) == PUBKEY_TEXT + signature = c.sign_ssh_challenge(blob=BLOB, identity=identity) key = formats.import_public_key(PUBKEY_TEXT) serialized_sig = key['verifier'](sig=signature, msg=BLOB) @@ -77,9 +77,9 @@ def test_ssh_agent(): assert r[1:] + s[1:] == SIG # pylint: disable=unused-argument - def cancel_sign(blob): + def cancel_sign(identity, blob): raise IOError(42, 'ERROR') c.device.sign = cancel_sign with pytest.raises(IOError): - c.sign_ssh_challenge(blob=BLOB) + c.sign_ssh_challenge(blob=BLOB, identity=identity) diff --git a/trezor_agent/tests/test_protocol.py b/trezor_agent/tests/test_protocol.py index 17a1001..2d8b0dc 100644 --- a/trezor_agent/tests/test_protocol.py +++ b/trezor_agent/tests/test_protocol.py @@ -1,6 +1,6 @@ import pytest -from .. import formats, protocol +from .. import device, formats, protocol # pylint: disable=line-too-long @@ -17,6 +17,7 @@ NIST256_SIGN_REPLY = b'\x00\x00\x00j\x0e\x00\x00\x00e\x00\x00\x00\x13ecdsa-sha2- def test_list(): key = formats.import_public_key(NIST256_KEY) + key['identity'] = device.interface.Identity('ssh://localhost', 'nist256p1') h = protocol.Handler(keys=[key], signer=None) reply = h.handle(LIST_MSG) assert reply == LIST_NIST256_REPLY @@ -28,13 +29,15 @@ def test_unsupported(): assert reply == b'\x00\x00\x00\x01\x05' -def ecdsa_signer(blob): +def ecdsa_signer(identity, blob): + assert str(identity) == '' assert blob == NIST256_BLOB return NIST256_SIG def test_ecdsa_sign(): key = formats.import_public_key(NIST256_KEY) + key['identity'] = device.interface.Identity('ssh://localhost', 'nist256p1') h = protocol.Handler(keys=[key], signer=ecdsa_signer) reply = h.handle(NIST256_SIGN_MSG) assert reply == NIST256_SIGN_REPLY @@ -42,30 +45,30 @@ def test_ecdsa_sign(): def test_sign_missing(): h = protocol.Handler(keys=[], signer=ecdsa_signer) - with pytest.raises(KeyError): h.handle(NIST256_SIGN_MSG) def test_sign_wrong(): - def wrong_signature(blob): + def wrong_signature(identity, blob): + assert str(identity) == '' assert blob == NIST256_BLOB return b'\x00' * 64 key = formats.import_public_key(NIST256_KEY) + key['identity'] = device.interface.Identity('ssh://localhost', 'nist256p1') h = protocol.Handler(keys=[key], signer=wrong_signature) - with pytest.raises(ValueError): h.handle(NIST256_SIGN_MSG) def test_sign_cancel(): - def cancel_signature(blob): # pylint: disable=unused-argument + def cancel_signature(identity, blob): # pylint: disable=unused-argument raise IOError() key = formats.import_public_key(NIST256_KEY) + key['identity'] = device.interface.Identity('ssh://localhost', 'nist256p1') h = protocol.Handler(keys=[key], signer=cancel_signature) - assert h.handle(NIST256_SIGN_MSG) == protocol.failure() @@ -77,13 +80,15 @@ ED25519_BLOB = b'''\x00\x00\x00 i3\xae}yk\\\xa1L\xb9\xe1\xbf\xbc\x8e\x87\r\x0e\x ED25519_SIG = b'''\x8eb)\xa6\xe9P\x83VE\xfbq\xc6\xbf\x1dV3\xe3' assert blob == ED25519_BLOB return ED25519_SIG def test_ed25519_sign(): key = formats.import_public_key(ED25519_KEY) + key['identity'] = device.interface.Identity('ssh://localhost', 'ed25519') h = protocol.Handler(keys=[key], signer=ed25519_signer) reply = h.handle(ED25519_SIGN_MSG) assert reply == ED25519_SIGN_REPLY