mirror of
https://github.com/romanz/amodem.git
synced 2026-03-27 13:29:03 +08:00
gpg: move HardwareSigner to device module
This commit is contained in:
@@ -7,7 +7,7 @@ import os
|
||||
import sys
|
||||
import time
|
||||
|
||||
from . import agent, encode, keyring, protocol
|
||||
from . import agent, device, encode, keyring, protocol
|
||||
from .. import server
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@@ -19,7 +19,7 @@ def run_create(args):
|
||||
log.warning('NOTE: in order to re-generate the exact same GPG key later, '
|
||||
'run this command with "--time=%d" commandline flag (to set '
|
||||
'the timestamp of the GPG key manually).', args.time)
|
||||
conn = encode.HardwareSigner(user_id=user_id,
|
||||
conn = device.HardwareSigner(user_id=user_id,
|
||||
curve_name=args.ecdsa_curve)
|
||||
verifying_key = conn.pubkey(ecdh=False)
|
||||
decryption_key = conn.pubkey(ecdh=True)
|
||||
|
||||
50
trezor_agent/gpg/device.py
Normal file
50
trezor_agent/gpg/device.py
Normal file
@@ -0,0 +1,50 @@
|
||||
"""Device abstraction layer for GPG operations."""
|
||||
|
||||
from .. import factory, formats, util
|
||||
|
||||
|
||||
class HardwareSigner(object):
|
||||
"""Sign messages and get public keys from a hardware device."""
|
||||
|
||||
def __init__(self, user_id, curve_name):
|
||||
"""Connect to the device and retrieve required public key."""
|
||||
self.client_wrapper = factory.load()
|
||||
self.identity = self.client_wrapper.identity_type()
|
||||
self.identity.proto = 'gpg'
|
||||
self.identity.host = user_id
|
||||
self.curve_name = curve_name
|
||||
|
||||
def pubkey(self, ecdh=False):
|
||||
"""Return public key as VerifyingKey object."""
|
||||
addr = util.get_bip32_address(identity=self.identity, ecdh=ecdh)
|
||||
public_node = self.client_wrapper.connection.get_public_node(
|
||||
n=addr, ecdsa_curve_name=self.curve_name)
|
||||
|
||||
return formats.decompress_pubkey(
|
||||
pubkey=public_node.node.public_key,
|
||||
curve_name=self.curve_name)
|
||||
|
||||
def sign(self, digest):
|
||||
"""Sign the digest and return a serialized signature."""
|
||||
result = self.client_wrapper.connection.sign_identity(
|
||||
identity=self.identity,
|
||||
challenge_hidden=digest,
|
||||
challenge_visual='',
|
||||
ecdsa_curve_name=self.curve_name)
|
||||
assert result.signature[:1] == b'\x00'
|
||||
sig = result.signature[1:]
|
||||
return (util.bytes2num(sig[:32]), util.bytes2num(sig[32:]))
|
||||
|
||||
def ecdh(self, pubkey):
|
||||
"""Derive shared secret using ECDH from remote public key."""
|
||||
result = self.client_wrapper.connection.get_ecdh_session_key(
|
||||
identity=self.identity,
|
||||
peer_public_key=pubkey,
|
||||
ecdsa_curve_name=self.curve_name)
|
||||
assert len(result.session_key) == 65
|
||||
assert result.session_key[:1] == b'\x04'
|
||||
return result.session_key
|
||||
|
||||
def close(self):
|
||||
"""Close the connection to the device."""
|
||||
self.client_wrapper.connection.close()
|
||||
@@ -2,59 +2,11 @@
|
||||
import logging
|
||||
import time
|
||||
|
||||
from . import decode, keyring, protocol
|
||||
from .. import factory, formats, util
|
||||
from . import decode, device, keyring, protocol
|
||||
from .. import formats, util
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class HardwareSigner(object):
|
||||
"""Sign messages and get public keys from a hardware device."""
|
||||
|
||||
def __init__(self, user_id, curve_name):
|
||||
"""Connect to the device and retrieve required public key."""
|
||||
self.client_wrapper = factory.load()
|
||||
self.identity = self.client_wrapper.identity_type()
|
||||
self.identity.proto = 'gpg'
|
||||
self.identity.host = user_id
|
||||
self.curve_name = curve_name
|
||||
|
||||
def pubkey(self, ecdh=False):
|
||||
"""Return public key as VerifyingKey object."""
|
||||
addr = util.get_bip32_address(identity=self.identity, ecdh=ecdh)
|
||||
public_node = self.client_wrapper.connection.get_public_node(
|
||||
n=addr, ecdsa_curve_name=self.curve_name)
|
||||
|
||||
return formats.decompress_pubkey(
|
||||
pubkey=public_node.node.public_key,
|
||||
curve_name=self.curve_name)
|
||||
|
||||
def sign(self, digest):
|
||||
"""Sign the digest and return a serialized signature."""
|
||||
result = self.client_wrapper.connection.sign_identity(
|
||||
identity=self.identity,
|
||||
challenge_hidden=digest,
|
||||
challenge_visual='',
|
||||
ecdsa_curve_name=self.curve_name)
|
||||
assert result.signature[:1] == b'\x00'
|
||||
sig = result.signature[1:]
|
||||
return (util.bytes2num(sig[:32]), util.bytes2num(sig[32:]))
|
||||
|
||||
def ecdh(self, pubkey):
|
||||
"""Derive shared secret using ECDH from remote public key."""
|
||||
result = self.client_wrapper.connection.get_ecdh_session_key(
|
||||
identity=self.identity,
|
||||
peer_public_key=pubkey,
|
||||
ecdsa_curve_name=self.curve_name)
|
||||
assert len(result.session_key) == 65
|
||||
assert result.session_key[:1] == b'\x04'
|
||||
return result.session_key
|
||||
|
||||
def close(self):
|
||||
"""Close the connection to the device."""
|
||||
self.client_wrapper.connection.close()
|
||||
|
||||
|
||||
def _time_format(t):
|
||||
return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(t))
|
||||
|
||||
@@ -163,7 +115,7 @@ def load_from_public_key(pubkey_dict):
|
||||
assert curve_name in formats.SUPPORTED_CURVES
|
||||
ecdh = (pubkey_dict['algo'] == protocol.ECDH_ALGO_ID)
|
||||
|
||||
conn = HardwareSigner(user_id, curve_name=curve_name)
|
||||
conn = device.HardwareSigner(user_id, curve_name=curve_name)
|
||||
pubkey = protocol.PublicKey(
|
||||
curve_name=curve_name, created=created,
|
||||
verifying_key=conn.pubkey(ecdh=ecdh), ecdh=ecdh)
|
||||
|
||||
Reference in New Issue
Block a user