From 8fe16d24c2ae8ce7e1e6a8f7aa75094ed2a9a0ba Mon Sep 17 00:00:00 2001 From: BTChip Date: Sun, 31 Jul 2016 10:00:46 +0200 Subject: [PATCH 1/9] Ledger integration --- trezor_agent/factory.py | 137 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 136 insertions(+), 1 deletion(-) diff --git a/trezor_agent/factory.py b/trezor_agent/factory.py index 43536b8..422db2c 100644 --- a/trezor_agent/factory.py +++ b/trezor_agent/factory.py @@ -76,10 +76,145 @@ def _load_keepkey(): except ImportError: log.exception('Missing module: install via "pip install keepkey"') +def _load_ledger(): + import struct + class LedgerClientConnection(object): + def __init__(self, dongle): + self.dongle = dongle + + def expand_path(self, path): + result = "" + for pathElement in path: + result = result + struct.pack(">I", pathElement) + return result + + def get_public_node(self, n, ecdsa_curve_name="secp256k1", show_display=False): + from trezorlib.messages_pb2 import PublicKey + donglePath = self.expand_path(n) + if ecdsa_curve_name == "nist256p1": + p2 = "01" + else: + p2 = "02" + apdu = "800200" + p2 + apdu = apdu.decode('hex') + chr(len(donglePath) + 1) + chr(len(donglePath) / 4) + donglePath + result = bytearray(self.dongle.exchange(bytes(apdu)))[1:] + if ecdsa_curve_name == "nist256p1": + if ((result[64] & 1) <> 0): + result = bytearray([0x03]) + result[1:33] + else: + result = bytearray([0x02]) + result[1:33] + else: + #TODO + #result = result[1:] + #keyY = bytearray(result[32:][::-1]) + #keyY = bytearray(result[32:]) + #if ((keyX[31] & 1)<>0): + # keyY[31] |= 0x80 + #result = chr(0) + str(keyY) + result = chr(0) + result[1:33] + publicKey = PublicKey() + publicKey.node.public_key = str(result) + return publicKey + + def sign_identity(self, identity, challenge_hidden, challenge_visual, ecdsa_curve_name="secp256k1"): + from trezor_agent import client + from trezorlib.messages_pb2 import SignedIdentity + import hashlib + n = client.get_address(identity) + donglePath = self.expand_path(n) + if identity.proto == 'ssh': + ins = "04" + p1 = "00" + publicKey = self.get_public_node(n, ecdsa_curve_name) + else: + ins = "08" + p1 = "00" + if ecdsa_curve_name == "nist256p1": + p2 = "01" + else: + p2 = "02" + apdu = "80" + ins + p1 + p2 + apdu = apdu.decode('hex') + chr(len(challenge_hidden) + len(donglePath) + 1) + apdu = apdu + chr(len(donglePath) / 4) + donglePath + apdu = apdu + challenge_hidden + result = bytearray(self.dongle.exchange(bytes(apdu))) + if ecdsa_curve_name == "nist256p1": + offset = 3 + rLength = result[offset] + r = result[offset + 1 : offset + 1 + rLength] + if r[0] == 0: + r = r[1:] + offset = offset + 1 + rLength + 1 + sLength = result[offset] + s = result[offset + 1 : offset + 1 + sLength] + if s[0] == 0: + s = s[1:] + signature = SignedIdentity() + signature.signature = chr(0) + str(r) + str(s) + if identity.proto == 'ssh': + signature.public_key = publicKey.node.public_key + return signature + else: + signature = SignedIdentity() + signature.signature = chr(0) + str(result) + if identity.proto == 'ssh': + signature.public_key = publicKey.node.public_key + return signature + pass + + def get_ecdh_session_key(self, identity, peer_public_key, ecdsa_curve_name="secp256k1"): + from trezor_agent import client + from trezorlib.messages_pb2 import ECDHSessionKey + n = client.get_address(identity, True) + donglePath = self.expand_path(n) + if ecdsa_curve_name == "nist256p1": + p2 = "01" + else: + p2 = "02" + apdu = "800a00" + p2 + apdu = apdu.decode('hex') + chr(len(peer_public_key) + len(donglePath) + 1) + apdu = apdu + chr(len(donglePath) / 4) + donglePath + apdu = apdu + peer_public_key + result = bytearray(self.dongle.exchange(bytes(apdu))) + if ecdsa_curve_name == "nist256p1": + sessionKey = ECDHSessionKey() + sessionKey.session_key = str(result) + return sessionKey + pass + + def clear_session(self): + pass + + def close(self): + self.dongle.close() + pass + + def ping(self, msg, button_protection=False, pin_protection=False, passphrase_protection=False): + return msg + + class CallException(Exception): + def __init__(self, code, message): + super(CallException, self).__init__() + self.args = [code, message] + try: + from ledgerblue.comm import getDongle + from ledgerblue.commException import CommException + except ImportError: + log.exception('Missing module: install via "pip install ledgerblue"') + try: + from trezorlib.types_pb2 import IdentityType + dongle = getDongle(True) + except: + return + yield ClientWrapper(connection=LedgerClientConnection(dongle), + identity_type=IdentityType, + device_name="ledger", + call_exception=CallException) LOADERS = [ _load_trezor, - _load_keepkey + _load_keepkey, + _load_ledger ] From bc1d7a54480202b02e59559207c64462571335f0 Mon Sep 17 00:00:00 2001 From: BTChip Date: Wed, 3 Aug 2016 15:17:04 +0200 Subject: [PATCH 2/9] Fix eddsa, SSH optimization with signature + key, cleanup --- trezor_agent/factory.py | 121 ++++++++++++++++++++++------------------ 1 file changed, 67 insertions(+), 54 deletions(-) diff --git a/trezor_agent/factory.py b/trezor_agent/factory.py index 422db2c..16a4a32 100644 --- a/trezor_agent/factory.py +++ b/trezor_agent/factory.py @@ -1,4 +1,4 @@ -"""Thin wrapper around trezor/keepkey libraries.""" +"""Thin wrapper around trezor/keepkey/ledger libraries.""" import binascii import collections import logging @@ -82,114 +82,127 @@ def _load_ledger(): def __init__(self, dongle): self.dongle = dongle - def expand_path(self, path): + @staticmethod + def expand_path(path): result = "" for pathElement in path: result = result + struct.pack(">I", pathElement) return result - def get_public_node(self, n, ecdsa_curve_name="secp256k1", show_display=False): + @staticmethod + def convert_public_key(ecdsa_curve_name, result): from trezorlib.messages_pb2 import PublicKey - donglePath = self.expand_path(n) + if ecdsa_curve_name == "nist256p1": + if (result[64] & 1) <> 0: + result = bytearray([0x03]) + result[1:33] + else: + result = bytearray([0x02]) + result[1:33] + else: + result = result[1:] + keyX = bytearray(result[0:32]) + keyY = bytearray(result[32:][::-1]) + if (keyX[31] & 1) <> 0: + keyY[31] |= 0x80 + result = chr(0) + str(keyY) + publicKey = PublicKey() + publicKey.node.public_key = str(result) + return publicKey + + + # pylint: disable=unused-argument + def get_public_node(self, n, ecdsa_curve_name="secp256k1", show_display=False): + donglePath = LedgerClientConnection.expand_path(n) if ecdsa_curve_name == "nist256p1": p2 = "01" else: p2 = "02" apdu = "800200" + p2 - apdu = apdu.decode('hex') + chr(len(donglePath) + 1) + chr(len(donglePath) / 4) + donglePath + apdu = apdu.decode('hex') + apdu += chr(len(donglePath) + 1) + chr(len(donglePath) / 4) + apdu += donglePath result = bytearray(self.dongle.exchange(bytes(apdu)))[1:] - if ecdsa_curve_name == "nist256p1": - if ((result[64] & 1) <> 0): - result = bytearray([0x03]) + result[1:33] - else: - result = bytearray([0x02]) + result[1:33] - else: - #TODO - #result = result[1:] - #keyY = bytearray(result[32:][::-1]) - #keyY = bytearray(result[32:]) - #if ((keyX[31] & 1)<>0): - # keyY[31] |= 0x80 - #result = chr(0) + str(keyY) - result = chr(0) + result[1:33] - publicKey = PublicKey() - publicKey.node.public_key = str(result) - return publicKey + return LedgerClientConnection.convert_public_key(ecdsa_curve_name, result) - def sign_identity(self, identity, challenge_hidden, challenge_visual, ecdsa_curve_name="secp256k1"): + # pylint: disable=too-many-locals + def sign_identity(self, identity, challenge_hidden, challenge_visual, + ecdsa_curve_name="secp256k1"): from trezor_agent import client from trezorlib.messages_pb2 import SignedIdentity - import hashlib n = client.get_address(identity) - donglePath = self.expand_path(n) + donglePath = LedgerClientConnection.expand_path(n) if identity.proto == 'ssh': ins = "04" p1 = "00" - publicKey = self.get_public_node(n, ecdsa_curve_name) else: ins = "08" - p1 = "00" + p1 = "00" if ecdsa_curve_name == "nist256p1": - p2 = "01" + p2 = "81" if identity.proto == 'ssh' else "01" else: - p2 = "02" + p2 = "82" if identity.proto == 'ssh' else "02" apdu = "80" + ins + p1 + p2 - apdu = apdu.decode('hex') + chr(len(challenge_hidden) + len(donglePath) + 1) - apdu = apdu + chr(len(donglePath) / 4) + donglePath - apdu = apdu + challenge_hidden + apdu = apdu.decode('hex') + apdu += chr(len(challenge_hidden) + len(donglePath) + 1) + apdu += chr(len(donglePath) / 4) + donglePath + apdu += challenge_hidden result = bytearray(self.dongle.exchange(bytes(apdu))) if ecdsa_curve_name == "nist256p1": offset = 3 - rLength = result[offset] - r = result[offset + 1 : offset + 1 + rLength] + length = result[offset] + r = result[offset + 1 : offset + 1 + length] if r[0] == 0: r = r[1:] - offset = offset + 1 + rLength + 1 - sLength = result[offset] - s = result[offset + 1 : offset + 1 + sLength] + offset = offset + 1 + length + 1 + length = result[offset] + s = result[offset + 1 : offset + 1 + length] if s[0] == 0: s = s[1:] + offset = offset + 1 + length signature = SignedIdentity() signature.signature = chr(0) + str(r) + str(s) if identity.proto == 'ssh': - signature.public_key = publicKey.node.public_key + keyData = result[offset:] + pk = LedgerClientConnection.convert_public_key(ecdsa_curve_name, keyData) + signature.public_key = pk.node.public_key return signature else: signature = SignedIdentity() - signature.signature = chr(0) + str(result) + signature.signature = chr(0) + str(result[0:64]) if identity.proto == 'ssh': - signature.public_key = publicKey.node.public_key - return signature - pass + keyData = result[64:] + pk = LedgerClientConnection.convert_public_key(ecdsa_curve_name, keyData) + signature.public_key = pk.node.public_key + return signature def get_ecdh_session_key(self, identity, peer_public_key, ecdsa_curve_name="secp256k1"): from trezor_agent import client from trezorlib.messages_pb2 import ECDHSessionKey n = client.get_address(identity, True) - donglePath = self.expand_path(n) + donglePath = LedgerClientConnection.expand_path(n) if ecdsa_curve_name == "nist256p1": p2 = "01" else: p2 = "02" apdu = "800a00" + p2 - apdu = apdu.decode('hex') + chr(len(peer_public_key) + len(donglePath) + 1) - apdu = apdu + chr(len(donglePath) / 4) + donglePath - apdu = apdu + peer_public_key + apdu = apdu.decode('hex') + apdu += chr(len(peer_public_key) + len(donglePath) + 1) + apdu += chr(len(donglePath) / 4) + donglePath + apdu += peer_public_key result = bytearray(self.dongle.exchange(bytes(apdu))) - if ecdsa_curve_name == "nist256p1": - sessionKey = ECDHSessionKey() - sessionKey.session_key = str(result) - return sessionKey - pass + sessionKey = ECDHSessionKey() + sessionKey.session_key = str(result) + return sessionKey def clear_session(self): pass def close(self): self.dongle.close() - pass - def ping(self, msg, button_protection=False, pin_protection=False, passphrase_protection=False): + # pylint: disable=unused-argument + # pylint: disable=no-self-use + def ping(self, msg, button_protection=False, pin_protection=False, + passphrase_protection=False): return msg class CallException(Exception): @@ -198,9 +211,9 @@ def _load_ledger(): self.args = [code, message] try: from ledgerblue.comm import getDongle - from ledgerblue.commException import CommException except ImportError: log.exception('Missing module: install via "pip install ledgerblue"') + # pylint: disable=bare-except try: from trezorlib.types_pb2 import IdentityType dongle = getDongle(True) From adb09cd8ca79d1906843c915bb48cfde978c266c Mon Sep 17 00:00:00 2001 From: BTChip Date: Sun, 31 Jul 2016 10:00:46 +0200 Subject: [PATCH 3/9] Ledger integration --- trezor_agent/factory.py | 137 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 136 insertions(+), 1 deletion(-) diff --git a/trezor_agent/factory.py b/trezor_agent/factory.py index 43536b8..422db2c 100644 --- a/trezor_agent/factory.py +++ b/trezor_agent/factory.py @@ -76,10 +76,145 @@ def _load_keepkey(): except ImportError: log.exception('Missing module: install via "pip install keepkey"') +def _load_ledger(): + import struct + class LedgerClientConnection(object): + def __init__(self, dongle): + self.dongle = dongle + + def expand_path(self, path): + result = "" + for pathElement in path: + result = result + struct.pack(">I", pathElement) + return result + + def get_public_node(self, n, ecdsa_curve_name="secp256k1", show_display=False): + from trezorlib.messages_pb2 import PublicKey + donglePath = self.expand_path(n) + if ecdsa_curve_name == "nist256p1": + p2 = "01" + else: + p2 = "02" + apdu = "800200" + p2 + apdu = apdu.decode('hex') + chr(len(donglePath) + 1) + chr(len(donglePath) / 4) + donglePath + result = bytearray(self.dongle.exchange(bytes(apdu)))[1:] + if ecdsa_curve_name == "nist256p1": + if ((result[64] & 1) <> 0): + result = bytearray([0x03]) + result[1:33] + else: + result = bytearray([0x02]) + result[1:33] + else: + #TODO + #result = result[1:] + #keyY = bytearray(result[32:][::-1]) + #keyY = bytearray(result[32:]) + #if ((keyX[31] & 1)<>0): + # keyY[31] |= 0x80 + #result = chr(0) + str(keyY) + result = chr(0) + result[1:33] + publicKey = PublicKey() + publicKey.node.public_key = str(result) + return publicKey + + def sign_identity(self, identity, challenge_hidden, challenge_visual, ecdsa_curve_name="secp256k1"): + from trezor_agent import client + from trezorlib.messages_pb2 import SignedIdentity + import hashlib + n = client.get_address(identity) + donglePath = self.expand_path(n) + if identity.proto == 'ssh': + ins = "04" + p1 = "00" + publicKey = self.get_public_node(n, ecdsa_curve_name) + else: + ins = "08" + p1 = "00" + if ecdsa_curve_name == "nist256p1": + p2 = "01" + else: + p2 = "02" + apdu = "80" + ins + p1 + p2 + apdu = apdu.decode('hex') + chr(len(challenge_hidden) + len(donglePath) + 1) + apdu = apdu + chr(len(donglePath) / 4) + donglePath + apdu = apdu + challenge_hidden + result = bytearray(self.dongle.exchange(bytes(apdu))) + if ecdsa_curve_name == "nist256p1": + offset = 3 + rLength = result[offset] + r = result[offset + 1 : offset + 1 + rLength] + if r[0] == 0: + r = r[1:] + offset = offset + 1 + rLength + 1 + sLength = result[offset] + s = result[offset + 1 : offset + 1 + sLength] + if s[0] == 0: + s = s[1:] + signature = SignedIdentity() + signature.signature = chr(0) + str(r) + str(s) + if identity.proto == 'ssh': + signature.public_key = publicKey.node.public_key + return signature + else: + signature = SignedIdentity() + signature.signature = chr(0) + str(result) + if identity.proto == 'ssh': + signature.public_key = publicKey.node.public_key + return signature + pass + + def get_ecdh_session_key(self, identity, peer_public_key, ecdsa_curve_name="secp256k1"): + from trezor_agent import client + from trezorlib.messages_pb2 import ECDHSessionKey + n = client.get_address(identity, True) + donglePath = self.expand_path(n) + if ecdsa_curve_name == "nist256p1": + p2 = "01" + else: + p2 = "02" + apdu = "800a00" + p2 + apdu = apdu.decode('hex') + chr(len(peer_public_key) + len(donglePath) + 1) + apdu = apdu + chr(len(donglePath) / 4) + donglePath + apdu = apdu + peer_public_key + result = bytearray(self.dongle.exchange(bytes(apdu))) + if ecdsa_curve_name == "nist256p1": + sessionKey = ECDHSessionKey() + sessionKey.session_key = str(result) + return sessionKey + pass + + def clear_session(self): + pass + + def close(self): + self.dongle.close() + pass + + def ping(self, msg, button_protection=False, pin_protection=False, passphrase_protection=False): + return msg + + class CallException(Exception): + def __init__(self, code, message): + super(CallException, self).__init__() + self.args = [code, message] + try: + from ledgerblue.comm import getDongle + from ledgerblue.commException import CommException + except ImportError: + log.exception('Missing module: install via "pip install ledgerblue"') + try: + from trezorlib.types_pb2 import IdentityType + dongle = getDongle(True) + except: + return + yield ClientWrapper(connection=LedgerClientConnection(dongle), + identity_type=IdentityType, + device_name="ledger", + call_exception=CallException) LOADERS = [ _load_trezor, - _load_keepkey + _load_keepkey, + _load_ledger ] From 33747592ca3a5bf396d5edd389d0fd903b833cff Mon Sep 17 00:00:00 2001 From: BTChip Date: Wed, 3 Aug 2016 15:17:04 +0200 Subject: [PATCH 4/9] Fix eddsa, SSH optimization with signature + key, cleanup --- trezor_agent/factory.py | 121 ++++++++++++++++++++++------------------ 1 file changed, 67 insertions(+), 54 deletions(-) diff --git a/trezor_agent/factory.py b/trezor_agent/factory.py index 422db2c..16a4a32 100644 --- a/trezor_agent/factory.py +++ b/trezor_agent/factory.py @@ -1,4 +1,4 @@ -"""Thin wrapper around trezor/keepkey libraries.""" +"""Thin wrapper around trezor/keepkey/ledger libraries.""" import binascii import collections import logging @@ -82,114 +82,127 @@ def _load_ledger(): def __init__(self, dongle): self.dongle = dongle - def expand_path(self, path): + @staticmethod + def expand_path(path): result = "" for pathElement in path: result = result + struct.pack(">I", pathElement) return result - def get_public_node(self, n, ecdsa_curve_name="secp256k1", show_display=False): + @staticmethod + def convert_public_key(ecdsa_curve_name, result): from trezorlib.messages_pb2 import PublicKey - donglePath = self.expand_path(n) + if ecdsa_curve_name == "nist256p1": + if (result[64] & 1) <> 0: + result = bytearray([0x03]) + result[1:33] + else: + result = bytearray([0x02]) + result[1:33] + else: + result = result[1:] + keyX = bytearray(result[0:32]) + keyY = bytearray(result[32:][::-1]) + if (keyX[31] & 1) <> 0: + keyY[31] |= 0x80 + result = chr(0) + str(keyY) + publicKey = PublicKey() + publicKey.node.public_key = str(result) + return publicKey + + + # pylint: disable=unused-argument + def get_public_node(self, n, ecdsa_curve_name="secp256k1", show_display=False): + donglePath = LedgerClientConnection.expand_path(n) if ecdsa_curve_name == "nist256p1": p2 = "01" else: p2 = "02" apdu = "800200" + p2 - apdu = apdu.decode('hex') + chr(len(donglePath) + 1) + chr(len(donglePath) / 4) + donglePath + apdu = apdu.decode('hex') + apdu += chr(len(donglePath) + 1) + chr(len(donglePath) / 4) + apdu += donglePath result = bytearray(self.dongle.exchange(bytes(apdu)))[1:] - if ecdsa_curve_name == "nist256p1": - if ((result[64] & 1) <> 0): - result = bytearray([0x03]) + result[1:33] - else: - result = bytearray([0x02]) + result[1:33] - else: - #TODO - #result = result[1:] - #keyY = bytearray(result[32:][::-1]) - #keyY = bytearray(result[32:]) - #if ((keyX[31] & 1)<>0): - # keyY[31] |= 0x80 - #result = chr(0) + str(keyY) - result = chr(0) + result[1:33] - publicKey = PublicKey() - publicKey.node.public_key = str(result) - return publicKey + return LedgerClientConnection.convert_public_key(ecdsa_curve_name, result) - def sign_identity(self, identity, challenge_hidden, challenge_visual, ecdsa_curve_name="secp256k1"): + # pylint: disable=too-many-locals + def sign_identity(self, identity, challenge_hidden, challenge_visual, + ecdsa_curve_name="secp256k1"): from trezor_agent import client from trezorlib.messages_pb2 import SignedIdentity - import hashlib n = client.get_address(identity) - donglePath = self.expand_path(n) + donglePath = LedgerClientConnection.expand_path(n) if identity.proto == 'ssh': ins = "04" p1 = "00" - publicKey = self.get_public_node(n, ecdsa_curve_name) else: ins = "08" - p1 = "00" + p1 = "00" if ecdsa_curve_name == "nist256p1": - p2 = "01" + p2 = "81" if identity.proto == 'ssh' else "01" else: - p2 = "02" + p2 = "82" if identity.proto == 'ssh' else "02" apdu = "80" + ins + p1 + p2 - apdu = apdu.decode('hex') + chr(len(challenge_hidden) + len(donglePath) + 1) - apdu = apdu + chr(len(donglePath) / 4) + donglePath - apdu = apdu + challenge_hidden + apdu = apdu.decode('hex') + apdu += chr(len(challenge_hidden) + len(donglePath) + 1) + apdu += chr(len(donglePath) / 4) + donglePath + apdu += challenge_hidden result = bytearray(self.dongle.exchange(bytes(apdu))) if ecdsa_curve_name == "nist256p1": offset = 3 - rLength = result[offset] - r = result[offset + 1 : offset + 1 + rLength] + length = result[offset] + r = result[offset + 1 : offset + 1 + length] if r[0] == 0: r = r[1:] - offset = offset + 1 + rLength + 1 - sLength = result[offset] - s = result[offset + 1 : offset + 1 + sLength] + offset = offset + 1 + length + 1 + length = result[offset] + s = result[offset + 1 : offset + 1 + length] if s[0] == 0: s = s[1:] + offset = offset + 1 + length signature = SignedIdentity() signature.signature = chr(0) + str(r) + str(s) if identity.proto == 'ssh': - signature.public_key = publicKey.node.public_key + keyData = result[offset:] + pk = LedgerClientConnection.convert_public_key(ecdsa_curve_name, keyData) + signature.public_key = pk.node.public_key return signature else: signature = SignedIdentity() - signature.signature = chr(0) + str(result) + signature.signature = chr(0) + str(result[0:64]) if identity.proto == 'ssh': - signature.public_key = publicKey.node.public_key - return signature - pass + keyData = result[64:] + pk = LedgerClientConnection.convert_public_key(ecdsa_curve_name, keyData) + signature.public_key = pk.node.public_key + return signature def get_ecdh_session_key(self, identity, peer_public_key, ecdsa_curve_name="secp256k1"): from trezor_agent import client from trezorlib.messages_pb2 import ECDHSessionKey n = client.get_address(identity, True) - donglePath = self.expand_path(n) + donglePath = LedgerClientConnection.expand_path(n) if ecdsa_curve_name == "nist256p1": p2 = "01" else: p2 = "02" apdu = "800a00" + p2 - apdu = apdu.decode('hex') + chr(len(peer_public_key) + len(donglePath) + 1) - apdu = apdu + chr(len(donglePath) / 4) + donglePath - apdu = apdu + peer_public_key + apdu = apdu.decode('hex') + apdu += chr(len(peer_public_key) + len(donglePath) + 1) + apdu += chr(len(donglePath) / 4) + donglePath + apdu += peer_public_key result = bytearray(self.dongle.exchange(bytes(apdu))) - if ecdsa_curve_name == "nist256p1": - sessionKey = ECDHSessionKey() - sessionKey.session_key = str(result) - return sessionKey - pass + sessionKey = ECDHSessionKey() + sessionKey.session_key = str(result) + return sessionKey def clear_session(self): pass def close(self): self.dongle.close() - pass - def ping(self, msg, button_protection=False, pin_protection=False, passphrase_protection=False): + # pylint: disable=unused-argument + # pylint: disable=no-self-use + def ping(self, msg, button_protection=False, pin_protection=False, + passphrase_protection=False): return msg class CallException(Exception): @@ -198,9 +211,9 @@ def _load_ledger(): self.args = [code, message] try: from ledgerblue.comm import getDongle - from ledgerblue.commException import CommException except ImportError: log.exception('Missing module: install via "pip install ledgerblue"') + # pylint: disable=bare-except try: from trezorlib.types_pb2 import IdentityType dongle = getDongle(True) From c4bbac0e77d73230e78776f2d839b93a1fb77eba Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Thu, 11 Aug 2016 22:30:59 +0300 Subject: [PATCH 5/9] util: move BIP32 address related functions --- trezor_agent/__main__.py | 4 +-- trezor_agent/client.py | 57 ++----------------------------- trezor_agent/gpg/encode.py | 4 +-- trezor_agent/tests/test_client.py | 6 ++-- trezor_agent/util.py | 54 +++++++++++++++++++++++++++++ 5 files changed, 64 insertions(+), 61 deletions(-) diff --git a/trezor_agent/__main__.py b/trezor_agent/__main__.py index a56264b..6fae2c3 100644 --- a/trezor_agent/__main__.py +++ b/trezor_agent/__main__.py @@ -7,14 +7,14 @@ import re import subprocess import sys -from . import client, formats, protocol, server +from . import client, formats, protocol, server, util log = logging.getLogger(__name__) def ssh_args(label): """Create SSH command for connecting specified server.""" - identity = client.string_to_identity(label, identity_type=dict) + identity = util.string_to_identity(label, identity_type=dict) args = [] if 'port' in identity: diff --git a/trezor_agent/client.py b/trezor_agent/client.py index eea90a7..e8d2e35 100644 --- a/trezor_agent/client.py +++ b/trezor_agent/client.py @@ -6,8 +6,6 @@ It is used for getting SSH public keys and ECDSA signing of server requests. import binascii import io import logging -import re -import struct from . import factory, formats, util @@ -40,7 +38,7 @@ class Client(object): def get_identity(self, label, index=0): """Parse label string into Identity protobuf.""" - identity = string_to_identity(label, self.identity_type) + identity = util.string_to_identity(label, self.identity_type) identity.proto = 'ssh' identity.index = index return identity @@ -48,10 +46,10 @@ class Client(object): def get_public_key(self, label): """Get SSH public key corresponding to specified by label.""" identity = self.get_identity(label=label) - label = identity_to_string(identity) # canonize key label + label = util.identity_to_string(identity) # canonize key label log.info('getting "%s" public key (%s) from %s...', label, self.curve, self.device_name) - addr = get_address(identity) + addr = util.get_bip32_address(identity) node = self.client.get_public_node(n=addr, ecdsa_curve_name=self.curve) @@ -93,55 +91,6 @@ class Client(object): return result.signature[1:] -_identity_regexp = re.compile(''.join([ - '^' - r'(?:(?P.*)://)?', - r'(?:(?P.*)@)?', - r'(?P.*?)', - r'(?::(?P\w*))?', - r'(?P/.*)?', - '$' -])) - - -def string_to_identity(s, identity_type): - """Parse string into Identity protobuf.""" - m = _identity_regexp.match(s) - result = m.groupdict() - log.debug('parsed identity: %s', result) - kwargs = {k: v for k, v in result.items() if v} - return identity_type(**kwargs) - - -def identity_to_string(identity): - """Dump Identity protobuf into its string representation.""" - result = [] - if identity.proto: - result.append(identity.proto + '://') - if identity.user: - result.append(identity.user + '@') - result.append(identity.host) - if identity.port: - result.append(':' + identity.port) - if identity.path: - result.append(identity.path) - return ''.join(result) - - -def get_address(identity, ecdh=False): - """Compute BIP32 derivation address according to SLIP-0013/0017.""" - index = struct.pack('.*)://)?', + r'(?:(?P.*)@)?', + r'(?P.*?)', + r'(?::(?P\w*))?', + r'(?P/.*)?', + '$' +])) + + +def string_to_identity(s, identity_type): + """Parse string into Identity protobuf.""" + m = _identity_regexp.match(s) + result = m.groupdict() + log.debug('parsed identity: %s', result) + kwargs = {k: v for k, v in result.items() if v} + return identity_type(**kwargs) + + +def identity_to_string(identity): + """Dump Identity protobuf into its string representation.""" + result = [] + if identity.proto: + result.append(identity.proto + '://') + if identity.user: + result.append(identity.user + '@') + result.append(identity.host) + if identity.port: + result.append(':' + identity.port) + if identity.path: + result.append(identity.path) + return ''.join(result) + + +def get_bip32_address(identity, ecdh=False): + """Compute BIP32 derivation address according to SLIP-0013/0017.""" + index = struct.pack(' Date: Thu, 11 Aug 2016 22:31:24 +0300 Subject: [PATCH 6/9] factory: fix pep8 and pylint warnings --- trezor_agent/factory.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/trezor_agent/factory.py b/trezor_agent/factory.py index 16a4a32..336003e 100644 --- a/trezor_agent/factory.py +++ b/trezor_agent/factory.py @@ -5,6 +5,8 @@ import logging import semver +from . import util + log = logging.getLogger(__name__) ClientWrapper = collections.namedtuple( @@ -76,8 +78,10 @@ def _load_keepkey(): except ImportError: log.exception('Missing module: install via "pip install keepkey"') + def _load_ledger(): import struct + class LedgerClientConnection(object): def __init__(self, dongle): self.dongle = dongle @@ -93,7 +97,7 @@ def _load_ledger(): def convert_public_key(ecdsa_curve_name, result): from trezorlib.messages_pb2 import PublicKey if ecdsa_curve_name == "nist256p1": - if (result[64] & 1) <> 0: + if (result[64] & 1) != 0: result = bytearray([0x03]) + result[1:33] else: result = bytearray([0x02]) + result[1:33] @@ -101,14 +105,13 @@ def _load_ledger(): result = result[1:] keyX = bytearray(result[0:32]) keyY = bytearray(result[32:][::-1]) - if (keyX[31] & 1) <> 0: + if (keyX[31] & 1) != 0: keyY[31] |= 0x80 result = chr(0) + str(keyY) publicKey = PublicKey() publicKey.node.public_key = str(result) return publicKey - # pylint: disable=unused-argument def get_public_node(self, n, ecdsa_curve_name="secp256k1", show_display=False): donglePath = LedgerClientConnection.expand_path(n) @@ -126,9 +129,8 @@ def _load_ledger(): # pylint: disable=too-many-locals def sign_identity(self, identity, challenge_hidden, challenge_visual, ecdsa_curve_name="secp256k1"): - from trezor_agent import client from trezorlib.messages_pb2 import SignedIdentity - n = client.get_address(identity) + n = util.get_bip32_address(identity) donglePath = LedgerClientConnection.expand_path(n) if identity.proto == 'ssh': ins = "04" @@ -149,12 +151,12 @@ def _load_ledger(): if ecdsa_curve_name == "nist256p1": offset = 3 length = result[offset] - r = result[offset + 1 : offset + 1 + length] + r = result[offset+1:offset+1+length] if r[0] == 0: r = r[1:] offset = offset + 1 + length + 1 length = result[offset] - s = result[offset + 1 : offset + 1 + length] + s = result[offset+1:offset+1+length] if s[0] == 0: s = s[1:] offset = offset + 1 + length @@ -175,9 +177,8 @@ def _load_ledger(): return signature def get_ecdh_session_key(self, identity, peer_public_key, ecdsa_curve_name="secp256k1"): - from trezor_agent import client from trezorlib.messages_pb2 import ECDHSessionKey - n = client.get_address(identity, True) + n = util.get_bip32_address(identity, True) donglePath = LedgerClientConnection.expand_path(n) if ecdsa_curve_name == "nist256p1": p2 = "01" From 4897b70888ac2c03790a813d7ead88e401e4f70b Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Thu, 11 Aug 2016 22:38:12 +0300 Subject: [PATCH 7/9] factory: fix pylint import-error warnings --- trezor_agent/factory.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/trezor_agent/factory.py b/trezor_agent/factory.py index 336003e..e876a18 100644 --- a/trezor_agent/factory.py +++ b/trezor_agent/factory.py @@ -95,7 +95,7 @@ def _load_ledger(): @staticmethod def convert_public_key(ecdsa_curve_name, result): - from trezorlib.messages_pb2 import PublicKey + from trezorlib.messages_pb2 import PublicKey # pylint: disable=import-error if ecdsa_curve_name == "nist256p1": if (result[64] & 1) != 0: result = bytearray([0x03]) + result[1:33] @@ -129,7 +129,7 @@ def _load_ledger(): # pylint: disable=too-many-locals def sign_identity(self, identity, challenge_hidden, challenge_visual, ecdsa_curve_name="secp256k1"): - from trezorlib.messages_pb2 import SignedIdentity + from trezorlib.messages_pb2 import SignedIdentity # pylint: disable=import-error n = util.get_bip32_address(identity) donglePath = LedgerClientConnection.expand_path(n) if identity.proto == 'ssh': @@ -177,7 +177,7 @@ def _load_ledger(): return signature def get_ecdh_session_key(self, identity, peer_public_key, ecdsa_curve_name="secp256k1"): - from trezorlib.messages_pb2 import ECDHSessionKey + from trezorlib.messages_pb2 import ECDHSessionKey # pylint: disable=import-error n = util.get_bip32_address(identity, True) donglePath = LedgerClientConnection.expand_path(n) if ecdsa_curve_name == "nist256p1": @@ -216,7 +216,7 @@ def _load_ledger(): log.exception('Missing module: install via "pip install ledgerblue"') # pylint: disable=bare-except try: - from trezorlib.types_pb2 import IdentityType + from trezorlib.types_pb2 import IdentityType # pylint: disable=import-error dongle = getDongle(True) except: return From 030ae4c3f625ff2822686f16f6100ded555eb2cf Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Sat, 13 Aug 2016 10:06:52 +0300 Subject: [PATCH 8/9] gpg: include unsupport hash algorithm ID in exception message --- trezor_agent/gpg/agent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/trezor_agent/gpg/agent.py b/trezor_agent/gpg/agent.py index 63cac3b..cd6f8be 100644 --- a/trezor_agent/gpg/agent.py +++ b/trezor_agent/gpg/agent.py @@ -39,7 +39,7 @@ def sig_encode(r, s): def pksign(keygrip, digest, algo): """Sign a message digest using a private EC key.""" - assert algo == '8' + assert algo == '8', 'Unsupported hash algorithm ID {}'.format(algo) user_id = os.environ['TREZOR_GPG_USER_ID'] pubkey_dict = decode.load_public_key( pubkey_bytes=keyring.export_public_key(user_id=user_id), From 459b882b89d86f0ccc0d643b4e27e853de218a83 Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Wed, 14 Sep 2016 23:07:27 +0300 Subject: [PATCH 9/9] ledger: don't use debug=True --- trezor_agent/factory.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/trezor_agent/factory.py b/trezor_agent/factory.py index b19f508..c7d770e 100644 --- a/trezor_agent/factory.py +++ b/trezor_agent/factory.py @@ -217,7 +217,7 @@ def _load_ledger(): # pylint: disable=bare-except try: from trezorlib.types_pb2 import IdentityType # pylint: disable=import-error - dongle = getDongle(True) + dongle = getDongle() except: return yield ClientWrapper(connection=LedgerClientConnection(dongle),