mirror of
https://github.com/romanz/amodem.git
synced 2026-02-07 01:18:02 +08:00
device: allow loading identities from a file (instead of argument)
This commit is contained in:
@@ -16,11 +16,11 @@ DEVICE_TYPES = [
|
||||
]
|
||||
|
||||
|
||||
def detect(identity_str, curve_name):
|
||||
def detect():
|
||||
"""Detect the first available device and return it to the user."""
|
||||
for device_type in DEVICE_TYPES:
|
||||
try:
|
||||
with device_type(identity_str, curve_name) as d:
|
||||
with device_type() as d:
|
||||
return d
|
||||
except interface.NotFoundError as e:
|
||||
log.debug('device not found: %s', e)
|
||||
|
||||
@@ -45,20 +45,6 @@ def identity_to_string(identity_dict):
|
||||
return ''.join(result)
|
||||
|
||||
|
||||
def get_bip32_address(identity_dict, ecdh=False):
|
||||
"""Compute BIP32 derivation address according to SLIP-0013/0017."""
|
||||
index = struct.pack('<L', identity_dict.get('index', 0))
|
||||
addr = index + identity_to_string(identity_dict).encode('ascii')
|
||||
log.debug('bip32 address string: %r', addr)
|
||||
digest = hashlib.sha256(addr).digest()
|
||||
s = io.BytesIO(bytearray(digest))
|
||||
|
||||
hardened = 0x80000000
|
||||
addr_0 = [13, 17][bool(ecdh)]
|
||||
address_n = [addr_0] + list(util.recv(s, '<LLLL'))
|
||||
return [(hardened | value) for value in address_n]
|
||||
|
||||
|
||||
class Error(Exception):
|
||||
"""Device-related error."""
|
||||
|
||||
@@ -71,18 +57,49 @@ class DeviceError(Error):
|
||||
""""Error during device operation."""
|
||||
|
||||
|
||||
class Device(object):
|
||||
"""Abstract cryptographic hardware device interface."""
|
||||
class Identity(object):
|
||||
"""Represent SLIP-0013 identity, together with a elliptic curve choice."""
|
||||
|
||||
def __init__(self, identity_str, curve_name):
|
||||
"""Configure for specific identity and elliptic curve usage."""
|
||||
self.identity_dict = string_to_identity(identity_str)
|
||||
self.curve_name = curve_name
|
||||
self.conn = None
|
||||
|
||||
def identity_str(self):
|
||||
def items(self):
|
||||
"""Return a copy of identity_dict items."""
|
||||
return self.identity_dict.items()
|
||||
|
||||
def __str__(self):
|
||||
"""Return identity serialized to string."""
|
||||
return identity_to_string(self.identity_dict)
|
||||
return '<{}|{}>'.format(identity_to_string(self.identity_dict), self.curve_name)
|
||||
|
||||
def get_bip32_address(self, ecdh=False):
|
||||
"""Compute BIP32 derivation address according to SLIP-0013/0017."""
|
||||
index = struct.pack('<L', self.identity_dict.get('index', 0))
|
||||
addr = index + identity_to_string(self.identity_dict).encode('ascii')
|
||||
log.debug('bip32 address string: %r', addr)
|
||||
digest = hashlib.sha256(addr).digest()
|
||||
s = io.BytesIO(bytearray(digest))
|
||||
|
||||
hardened = 0x80000000
|
||||
addr_0 = [13, 17][bool(ecdh)]
|
||||
address_n = [addr_0] + list(util.recv(s, '<LLLL'))
|
||||
return [(hardened | value) for value in address_n]
|
||||
|
||||
def get_curve_name(self, ecdh=False):
|
||||
"""Return correct curve name for device operations."""
|
||||
if ecdh:
|
||||
return formats.get_ecdh_curve_name(self.curve_name)
|
||||
else:
|
||||
return self.curve_name
|
||||
|
||||
|
||||
class Device(object):
|
||||
"""Abstract cryptographic hardware device interface."""
|
||||
|
||||
def __init__(self):
|
||||
"""C-tor."""
|
||||
self.conn = None
|
||||
|
||||
def connect(self):
|
||||
"""Connect to device, otherwise raise NotFoundError."""
|
||||
@@ -101,25 +118,18 @@ class Device(object):
|
||||
log.exception('close failed: %s', e)
|
||||
self.conn = None
|
||||
|
||||
def pubkey(self, ecdh=False):
|
||||
def pubkey(self, identity, ecdh=False):
|
||||
"""Get public key (as bytes)."""
|
||||
raise NotImplementedError()
|
||||
|
||||
def sign(self, blob):
|
||||
def sign(self, identity, blob):
|
||||
"""Sign given blob and return the signature (as bytes)."""
|
||||
raise NotImplementedError()
|
||||
|
||||
def ecdh(self, pubkey):
|
||||
def ecdh(self, identity, pubkey):
|
||||
"""Get shared session key using Elliptic Curve Diffie-Hellman."""
|
||||
raise NotImplementedError()
|
||||
|
||||
def __str__(self):
|
||||
"""Human-readable representation."""
|
||||
return '{}'.format(self.__class__.__name__)
|
||||
|
||||
def get_curve_name(self, ecdh=False):
|
||||
"""Return correct curve name for device operations."""
|
||||
if ecdh:
|
||||
return formats.get_ecdh_curve_name(self.curve_name)
|
||||
else:
|
||||
return self.curve_name
|
||||
|
||||
@@ -11,15 +11,7 @@ class KeepKey(trezor.Trezor):
|
||||
|
||||
required_version = '>=1.0.4'
|
||||
|
||||
def connect(self):
|
||||
"""No support for other than NIST256P elliptic curves."""
|
||||
if self.curve_name not in {formats.CURVE_NIST256}:
|
||||
fmt = 'KeepKey does not support {} curve'
|
||||
raise interface.NotFoundError(fmt.format(self.curve_name))
|
||||
|
||||
return trezor.Trezor.connect(self)
|
||||
|
||||
def ecdh(self, pubkey):
|
||||
def ecdh(self, identity, pubkey):
|
||||
"""No support for ECDH in KeepKey firmware."""
|
||||
msg = 'KeepKey does not support ECDH'
|
||||
raise interface.NotFoundError(msg)
|
||||
|
||||
@@ -48,33 +48,31 @@ class Trezor(interface.Device):
|
||||
"""Close connection."""
|
||||
self.conn.close()
|
||||
|
||||
def pubkey(self, ecdh=False):
|
||||
def pubkey(self, identity, ecdh=False):
|
||||
"""Return public key."""
|
||||
curve_name = self.get_curve_name(ecdh=ecdh)
|
||||
curve_name = identity.get_curve_name(ecdh=ecdh)
|
||||
log.debug('"%s" getting public key (%s) from %s',
|
||||
interface.identity_to_string(self.identity_dict),
|
||||
curve_name, self)
|
||||
addr = interface.get_bip32_address(self.identity_dict, ecdh=ecdh)
|
||||
identity, curve_name, self)
|
||||
addr = identity.get_bip32_address(ecdh=ecdh)
|
||||
result = self.conn.get_public_node(n=addr,
|
||||
ecdsa_curve_name=curve_name)
|
||||
log.debug('result: %s', result)
|
||||
return result.node.public_key
|
||||
|
||||
def _identity_proto(self):
|
||||
def _identity_proto(self, identity):
|
||||
result = self.defs.IdentityType()
|
||||
for name, value in self.identity_dict.items():
|
||||
for name, value in identity.items():
|
||||
setattr(result, name, value)
|
||||
return result
|
||||
|
||||
def sign(self, blob):
|
||||
def sign(self, identity, blob):
|
||||
"""Sign given blob and return the signature (as bytes)."""
|
||||
curve_name = self.get_curve_name(ecdh=False)
|
||||
curve_name = identity.get_curve_name(ecdh=False)
|
||||
log.debug('"%s" signing %r (%s) on %s',
|
||||
interface.identity_to_string(self.identity_dict), blob,
|
||||
curve_name, self)
|
||||
identity, blob, curve_name, self)
|
||||
try:
|
||||
result = self.conn.sign_identity(
|
||||
identity=self._identity_proto(),
|
||||
identity=self._identity_proto(identity),
|
||||
challenge_hidden=blob,
|
||||
challenge_visual='',
|
||||
ecdsa_curve_name=curve_name)
|
||||
@@ -87,15 +85,14 @@ class Trezor(interface.Device):
|
||||
log.debug(msg, exc_info=True)
|
||||
raise interface.DeviceError(msg)
|
||||
|
||||
def ecdh(self, pubkey):
|
||||
def ecdh(self, identity, pubkey):
|
||||
"""Get shared session key using Elliptic Curve Diffie-Hellman."""
|
||||
curve_name = self.get_curve_name(ecdh=True)
|
||||
curve_name = identity.get_curve_name(ecdh=True)
|
||||
log.debug('"%s" shared session key (%s) for %r from %s',
|
||||
interface.identity_to_string(self.identity_dict),
|
||||
curve_name, pubkey, self)
|
||||
identity, curve_name, pubkey, self)
|
||||
try:
|
||||
result = self.conn.get_ecdh_session_key(
|
||||
identity=self._identity_proto(),
|
||||
identity=self._identity_proto(identity),
|
||||
peer_public_key=pubkey,
|
||||
ecdsa_curve_name=curve_name)
|
||||
log.debug('result: %s', result)
|
||||
|
||||
Reference in New Issue
Block a user