mirror of
https://github.com/romanz/amodem.git
synced 2026-03-30 23:57:54 +08:00
gpg: refactor signing providers from actual Signer class
This commit is contained in:
@@ -5,6 +5,7 @@ import logging
|
||||
import struct
|
||||
import time
|
||||
|
||||
from . import agent, decode
|
||||
from .. import client, factory, formats, util
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@@ -90,30 +91,72 @@ def _find_curve_by_algo_id(algo_id):
|
||||
return curve_name
|
||||
|
||||
|
||||
class HardwareSigner(object):
|
||||
def __init__(self, user_id, curve_name):
|
||||
self.client_wrapper = factory.load()
|
||||
self.identity = self.client_wrapper.identity_type()
|
||||
self.identity.proto = 'gpg'
|
||||
self.identity.host = user_id
|
||||
self.curve_name = curve_name
|
||||
|
||||
def pubkey(self):
|
||||
addr = client.get_address(self.identity)
|
||||
public_node = self.client_wrapper.connection.get_public_node(
|
||||
n=addr, ecdsa_curve_name=self.curve_name)
|
||||
|
||||
return formats.decompress_pubkey(
|
||||
pubkey=public_node.node.public_key,
|
||||
curve_name=self.curve_name)
|
||||
|
||||
def sign(self, digest, visual):
|
||||
result = self.client_wrapper.connection.sign_identity(
|
||||
identity=self.identity,
|
||||
challenge_hidden=digest,
|
||||
challenge_visual=visual,
|
||||
ecdsa_curve_name=self.curve_name)
|
||||
assert result.signature[:1] == b'\x00'
|
||||
sig = result.signature[1:]
|
||||
return mpi(util.bytes2num(sig[:32])) + mpi(util.bytes2num(sig[32:]))
|
||||
|
||||
def close(self):
|
||||
self.client_wrapper.connection.clear_session()
|
||||
self.client_wrapper.connection.close()
|
||||
|
||||
|
||||
class AgentSigner(object):
|
||||
def __init__(self, user_id, curve_name):
|
||||
self.sock = agent.connect()
|
||||
assert curve_name == formats.CURVE_NIST256
|
||||
self.curve_name = curve_name
|
||||
self.keygrip = agent.get_keygrip(user_id)
|
||||
self.public_key = decode.load_from_gpg(user_id)
|
||||
|
||||
def pubkey(self):
|
||||
return self.public_key['verifying_key']
|
||||
|
||||
def sign(self, digest, visual):
|
||||
r, s = agent.sign(sock=self.sock, keygrip=self.keygrip, digest=digest)
|
||||
return mpi(r) + mpi(s)
|
||||
|
||||
def close(self):
|
||||
self.sock.close()
|
||||
|
||||
|
||||
class Signer(object):
|
||||
"""Performs GPG operations with the TREZOR."""
|
||||
"""Performs GPG signing operations."""
|
||||
|
||||
SignerType = AgentSigner
|
||||
|
||||
def __init__(self, user_id, created, curve_name):
|
||||
"""Construct and loads a public key from the device."""
|
||||
self.user_id = user_id
|
||||
assert curve_name in formats.SUPPORTED_CURVES
|
||||
self.curve_name = curve_name
|
||||
self.client_wrapper = factory.load()
|
||||
|
||||
self.identity = self.client_wrapper.identity_type()
|
||||
self.identity.proto = 'gpg'
|
||||
self.identity.host = user_id
|
||||
|
||||
addr = client.get_address(self.identity)
|
||||
public_node = self.client_wrapper.connection.get_public_node(
|
||||
n=addr, ecdsa_curve_name=self.curve_name)
|
||||
|
||||
self.verifying_key = formats.decompress_pubkey(
|
||||
pubkey=public_node.node.public_key,
|
||||
curve_name=self.curve_name)
|
||||
self.conn = self.SignerType(user_id, curve_name=curve_name)
|
||||
self.verifying_key = self.conn.pubkey()
|
||||
|
||||
self.created = int(created)
|
||||
log.info('%s GPG public key %s created at %s', self.curve_name,
|
||||
log.info('%s GPG public key %s created at %s', curve_name,
|
||||
self.hex_short_key_id(), util.time_format(self.created))
|
||||
|
||||
@classmethod
|
||||
@@ -131,7 +174,7 @@ class Signer(object):
|
||||
return s
|
||||
|
||||
def _pubkey_data(self):
|
||||
curve_info = SUPPORTED_CURVES[self.curve_name]
|
||||
curve_info = SUPPORTED_CURVES[self.conn.curve_name]
|
||||
header = struct.pack('>BLB',
|
||||
4, # version
|
||||
self.created, # creation
|
||||
@@ -156,8 +199,7 @@ class Signer(object):
|
||||
|
||||
def close(self):
|
||||
"""Close connection and turn off the screen of the device."""
|
||||
self.client_wrapper.connection.clear_session()
|
||||
self.client_wrapper.connection.close()
|
||||
self.conn.close()
|
||||
|
||||
def export(self):
|
||||
"""Export GPG public key, ready for "gpg2 --import"."""
|
||||
@@ -182,6 +224,11 @@ class Signer(object):
|
||||
sign_packet = packet(tag=2, blob=signature)
|
||||
return pubkey_packet + user_id_packet + sign_packet
|
||||
|
||||
def subkey(self, user_id):
|
||||
primary = decode.load_from_gpg(user_id)
|
||||
keygrip = agent.get_keygrip(user_id)
|
||||
log.info('adding as subkey to %s (%s)', user_id, keygrip)
|
||||
|
||||
def sign(self, msg, sign_time=None):
|
||||
"""Sign GPG message at specified time."""
|
||||
if sign_time is None:
|
||||
@@ -197,7 +244,7 @@ class Signer(object):
|
||||
|
||||
def _make_signature(self, visual, data_to_sign,
|
||||
hashed_subpackets, sig_type=0):
|
||||
curve_info = SUPPORTED_CURVES[self.curve_name]
|
||||
curve_info = SUPPORTED_CURVES[self.conn.curve_name]
|
||||
header = struct.pack('>BBBB',
|
||||
4, # version
|
||||
sig_type, # rfc4880 (section-5.2.1)
|
||||
@@ -213,14 +260,7 @@ class Signer(object):
|
||||
log.debug('hashing %d bytes', len(data_to_hash))
|
||||
digest = hashlib.sha256(data_to_hash).digest()
|
||||
|
||||
result = self.client_wrapper.connection.sign_identity(
|
||||
identity=self.identity,
|
||||
challenge_hidden=digest,
|
||||
challenge_visual=visual,
|
||||
ecdsa_curve_name=self.curve_name)
|
||||
assert result.signature[:1] == b'\x00'
|
||||
sig = result.signature[1:]
|
||||
sig = mpi(util.bytes2num(sig[:32])) + mpi(util.bytes2num(sig[32:]))
|
||||
sig = self.conn.sign(digest=digest, visual=visual)
|
||||
|
||||
return (header + hashed + unhashed +
|
||||
digest[:2] + # used for decoder's sanity check
|
||||
|
||||
Reference in New Issue
Block a user