From 8fe16d24c2ae8ce7e1e6a8f7aa75094ed2a9a0ba Mon Sep 17 00:00:00 2001 From: BTChip Date: Sun, 31 Jul 2016 10:00:46 +0200 Subject: [PATCH 1/2] Ledger integration --- trezor_agent/factory.py | 137 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 136 insertions(+), 1 deletion(-) diff --git a/trezor_agent/factory.py b/trezor_agent/factory.py index 43536b8..422db2c 100644 --- a/trezor_agent/factory.py +++ b/trezor_agent/factory.py @@ -76,10 +76,145 @@ def _load_keepkey(): except ImportError: log.exception('Missing module: install via "pip install keepkey"') +def _load_ledger(): + import struct + class LedgerClientConnection(object): + def __init__(self, dongle): + self.dongle = dongle + + def expand_path(self, path): + result = "" + for pathElement in path: + result = result + struct.pack(">I", pathElement) + return result + + def get_public_node(self, n, ecdsa_curve_name="secp256k1", show_display=False): + from trezorlib.messages_pb2 import PublicKey + donglePath = self.expand_path(n) + if ecdsa_curve_name == "nist256p1": + p2 = "01" + else: + p2 = "02" + apdu = "800200" + p2 + apdu = apdu.decode('hex') + chr(len(donglePath) + 1) + chr(len(donglePath) / 4) + donglePath + result = bytearray(self.dongle.exchange(bytes(apdu)))[1:] + if ecdsa_curve_name == "nist256p1": + if ((result[64] & 1) <> 0): + result = bytearray([0x03]) + result[1:33] + else: + result = bytearray([0x02]) + result[1:33] + else: + #TODO + #result = result[1:] + #keyY = bytearray(result[32:][::-1]) + #keyY = bytearray(result[32:]) + #if ((keyX[31] & 1)<>0): + # keyY[31] |= 0x80 + #result = chr(0) + str(keyY) + result = chr(0) + result[1:33] + publicKey = PublicKey() + publicKey.node.public_key = str(result) + return publicKey + + def sign_identity(self, identity, challenge_hidden, challenge_visual, ecdsa_curve_name="secp256k1"): + from trezor_agent import client + from trezorlib.messages_pb2 import SignedIdentity + import hashlib + n = client.get_address(identity) + donglePath = self.expand_path(n) + if identity.proto == 'ssh': + ins = "04" + p1 = "00" + publicKey = self.get_public_node(n, ecdsa_curve_name) + else: + ins = "08" + p1 = "00" + if ecdsa_curve_name == "nist256p1": + p2 = "01" + else: + p2 = "02" + apdu = "80" + ins + p1 + p2 + apdu = apdu.decode('hex') + chr(len(challenge_hidden) + len(donglePath) + 1) + apdu = apdu + chr(len(donglePath) / 4) + donglePath + apdu = apdu + challenge_hidden + result = bytearray(self.dongle.exchange(bytes(apdu))) + if ecdsa_curve_name == "nist256p1": + offset = 3 + rLength = result[offset] + r = result[offset + 1 : offset + 1 + rLength] + if r[0] == 0: + r = r[1:] + offset = offset + 1 + rLength + 1 + sLength = result[offset] + s = result[offset + 1 : offset + 1 + sLength] + if s[0] == 0: + s = s[1:] + signature = SignedIdentity() + signature.signature = chr(0) + str(r) + str(s) + if identity.proto == 'ssh': + signature.public_key = publicKey.node.public_key + return signature + else: + signature = SignedIdentity() + signature.signature = chr(0) + str(result) + if identity.proto == 'ssh': + signature.public_key = publicKey.node.public_key + return signature + pass + + def get_ecdh_session_key(self, identity, peer_public_key, ecdsa_curve_name="secp256k1"): + from trezor_agent import client + from trezorlib.messages_pb2 import ECDHSessionKey + n = client.get_address(identity, True) + donglePath = self.expand_path(n) + if ecdsa_curve_name == "nist256p1": + p2 = "01" + else: + p2 = "02" + apdu = "800a00" + p2 + apdu = apdu.decode('hex') + chr(len(peer_public_key) + len(donglePath) + 1) + apdu = apdu + chr(len(donglePath) / 4) + donglePath + apdu = apdu + peer_public_key + result = bytearray(self.dongle.exchange(bytes(apdu))) + if ecdsa_curve_name == "nist256p1": + sessionKey = ECDHSessionKey() + sessionKey.session_key = str(result) + return sessionKey + pass + + def clear_session(self): + pass + + def close(self): + self.dongle.close() + pass + + def ping(self, msg, button_protection=False, pin_protection=False, passphrase_protection=False): + return msg + + class CallException(Exception): + def __init__(self, code, message): + super(CallException, self).__init__() + self.args = [code, message] + try: + from ledgerblue.comm import getDongle + from ledgerblue.commException import CommException + except ImportError: + log.exception('Missing module: install via "pip install ledgerblue"') + try: + from trezorlib.types_pb2 import IdentityType + dongle = getDongle(True) + except: + return + yield ClientWrapper(connection=LedgerClientConnection(dongle), + identity_type=IdentityType, + device_name="ledger", + call_exception=CallException) LOADERS = [ _load_trezor, - _load_keepkey + _load_keepkey, + _load_ledger ] From bc1d7a54480202b02e59559207c64462571335f0 Mon Sep 17 00:00:00 2001 From: BTChip Date: Wed, 3 Aug 2016 15:17:04 +0200 Subject: [PATCH 2/2] Fix eddsa, SSH optimization with signature + key, cleanup --- trezor_agent/factory.py | 121 ++++++++++++++++++++++------------------ 1 file changed, 67 insertions(+), 54 deletions(-) diff --git a/trezor_agent/factory.py b/trezor_agent/factory.py index 422db2c..16a4a32 100644 --- a/trezor_agent/factory.py +++ b/trezor_agent/factory.py @@ -1,4 +1,4 @@ -"""Thin wrapper around trezor/keepkey libraries.""" +"""Thin wrapper around trezor/keepkey/ledger libraries.""" import binascii import collections import logging @@ -82,114 +82,127 @@ def _load_ledger(): def __init__(self, dongle): self.dongle = dongle - def expand_path(self, path): + @staticmethod + def expand_path(path): result = "" for pathElement in path: result = result + struct.pack(">I", pathElement) return result - def get_public_node(self, n, ecdsa_curve_name="secp256k1", show_display=False): + @staticmethod + def convert_public_key(ecdsa_curve_name, result): from trezorlib.messages_pb2 import PublicKey - donglePath = self.expand_path(n) + if ecdsa_curve_name == "nist256p1": + if (result[64] & 1) <> 0: + result = bytearray([0x03]) + result[1:33] + else: + result = bytearray([0x02]) + result[1:33] + else: + result = result[1:] + keyX = bytearray(result[0:32]) + keyY = bytearray(result[32:][::-1]) + if (keyX[31] & 1) <> 0: + keyY[31] |= 0x80 + result = chr(0) + str(keyY) + publicKey = PublicKey() + publicKey.node.public_key = str(result) + return publicKey + + + # pylint: disable=unused-argument + def get_public_node(self, n, ecdsa_curve_name="secp256k1", show_display=False): + donglePath = LedgerClientConnection.expand_path(n) if ecdsa_curve_name == "nist256p1": p2 = "01" else: p2 = "02" apdu = "800200" + p2 - apdu = apdu.decode('hex') + chr(len(donglePath) + 1) + chr(len(donglePath) / 4) + donglePath + apdu = apdu.decode('hex') + apdu += chr(len(donglePath) + 1) + chr(len(donglePath) / 4) + apdu += donglePath result = bytearray(self.dongle.exchange(bytes(apdu)))[1:] - if ecdsa_curve_name == "nist256p1": - if ((result[64] & 1) <> 0): - result = bytearray([0x03]) + result[1:33] - else: - result = bytearray([0x02]) + result[1:33] - else: - #TODO - #result = result[1:] - #keyY = bytearray(result[32:][::-1]) - #keyY = bytearray(result[32:]) - #if ((keyX[31] & 1)<>0): - # keyY[31] |= 0x80 - #result = chr(0) + str(keyY) - result = chr(0) + result[1:33] - publicKey = PublicKey() - publicKey.node.public_key = str(result) - return publicKey + return LedgerClientConnection.convert_public_key(ecdsa_curve_name, result) - def sign_identity(self, identity, challenge_hidden, challenge_visual, ecdsa_curve_name="secp256k1"): + # pylint: disable=too-many-locals + def sign_identity(self, identity, challenge_hidden, challenge_visual, + ecdsa_curve_name="secp256k1"): from trezor_agent import client from trezorlib.messages_pb2 import SignedIdentity - import hashlib n = client.get_address(identity) - donglePath = self.expand_path(n) + donglePath = LedgerClientConnection.expand_path(n) if identity.proto == 'ssh': ins = "04" p1 = "00" - publicKey = self.get_public_node(n, ecdsa_curve_name) else: ins = "08" - p1 = "00" + p1 = "00" if ecdsa_curve_name == "nist256p1": - p2 = "01" + p2 = "81" if identity.proto == 'ssh' else "01" else: - p2 = "02" + p2 = "82" if identity.proto == 'ssh' else "02" apdu = "80" + ins + p1 + p2 - apdu = apdu.decode('hex') + chr(len(challenge_hidden) + len(donglePath) + 1) - apdu = apdu + chr(len(donglePath) / 4) + donglePath - apdu = apdu + challenge_hidden + apdu = apdu.decode('hex') + apdu += chr(len(challenge_hidden) + len(donglePath) + 1) + apdu += chr(len(donglePath) / 4) + donglePath + apdu += challenge_hidden result = bytearray(self.dongle.exchange(bytes(apdu))) if ecdsa_curve_name == "nist256p1": offset = 3 - rLength = result[offset] - r = result[offset + 1 : offset + 1 + rLength] + length = result[offset] + r = result[offset + 1 : offset + 1 + length] if r[0] == 0: r = r[1:] - offset = offset + 1 + rLength + 1 - sLength = result[offset] - s = result[offset + 1 : offset + 1 + sLength] + offset = offset + 1 + length + 1 + length = result[offset] + s = result[offset + 1 : offset + 1 + length] if s[0] == 0: s = s[1:] + offset = offset + 1 + length signature = SignedIdentity() signature.signature = chr(0) + str(r) + str(s) if identity.proto == 'ssh': - signature.public_key = publicKey.node.public_key + keyData = result[offset:] + pk = LedgerClientConnection.convert_public_key(ecdsa_curve_name, keyData) + signature.public_key = pk.node.public_key return signature else: signature = SignedIdentity() - signature.signature = chr(0) + str(result) + signature.signature = chr(0) + str(result[0:64]) if identity.proto == 'ssh': - signature.public_key = publicKey.node.public_key - return signature - pass + keyData = result[64:] + pk = LedgerClientConnection.convert_public_key(ecdsa_curve_name, keyData) + signature.public_key = pk.node.public_key + return signature def get_ecdh_session_key(self, identity, peer_public_key, ecdsa_curve_name="secp256k1"): from trezor_agent import client from trezorlib.messages_pb2 import ECDHSessionKey n = client.get_address(identity, True) - donglePath = self.expand_path(n) + donglePath = LedgerClientConnection.expand_path(n) if ecdsa_curve_name == "nist256p1": p2 = "01" else: p2 = "02" apdu = "800a00" + p2 - apdu = apdu.decode('hex') + chr(len(peer_public_key) + len(donglePath) + 1) - apdu = apdu + chr(len(donglePath) / 4) + donglePath - apdu = apdu + peer_public_key + apdu = apdu.decode('hex') + apdu += chr(len(peer_public_key) + len(donglePath) + 1) + apdu += chr(len(donglePath) / 4) + donglePath + apdu += peer_public_key result = bytearray(self.dongle.exchange(bytes(apdu))) - if ecdsa_curve_name == "nist256p1": - sessionKey = ECDHSessionKey() - sessionKey.session_key = str(result) - return sessionKey - pass + sessionKey = ECDHSessionKey() + sessionKey.session_key = str(result) + return sessionKey def clear_session(self): pass def close(self): self.dongle.close() - pass - def ping(self, msg, button_protection=False, pin_protection=False, passphrase_protection=False): + # pylint: disable=unused-argument + # pylint: disable=no-self-use + def ping(self, msg, button_protection=False, pin_protection=False, + passphrase_protection=False): return msg class CallException(Exception): @@ -198,9 +211,9 @@ def _load_ledger(): self.args = [code, message] try: from ledgerblue.comm import getDongle - from ledgerblue.commException import CommException except ImportError: log.exception('Missing module: install via "pip install ledgerblue"') + # pylint: disable=bare-except try: from trezorlib.types_pb2 import IdentityType dongle = getDongle(True)