mirror of
https://github.com/romanz/amodem.git
synced 2026-02-06 08:38:06 +08:00
gpg: rename main API
This commit is contained in:
@@ -56,11 +56,6 @@ class AgentSigner(object):
|
||||
"""Connect to the agent and retrieve required public key."""
|
||||
self.sock = agent.connect()
|
||||
self.keygrip = agent.get_keygrip(user_id)
|
||||
self.public_key = decode.load_from_gpg(user_id)
|
||||
|
||||
def pubkey(self):
|
||||
"""Return public key as VerifyingKey object."""
|
||||
return self.public_key['verifying_key']
|
||||
|
||||
def sign(self, digest):
|
||||
"""Sign the digest and return an ECDSA signature."""
|
||||
@@ -77,7 +72,7 @@ def _time_format(t):
|
||||
return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(t))
|
||||
|
||||
|
||||
class Signer(object):
|
||||
class Factory(object):
|
||||
"""Performs GPG signing operations."""
|
||||
|
||||
def __init__(self, user_id, created, curve_name):
|
||||
@@ -102,9 +97,9 @@ class Signer(object):
|
||||
`pubkey` should be loaded via `decode.load_from_gpg(user_id)`
|
||||
from the local GPG keyring.
|
||||
"""
|
||||
s = Signer(user_id=user_id,
|
||||
created=pubkey['created'],
|
||||
curve_name=proto.find_curve_by_algo_id(pubkey['algo']))
|
||||
s = cls(user_id=user_id,
|
||||
created=pubkey['created'],
|
||||
curve_name=proto.find_curve_by_algo_id(pubkey['algo']))
|
||||
assert s.pubkey.key_id() == pubkey['key_id']
|
||||
return s
|
||||
|
||||
@@ -112,8 +107,8 @@ class Signer(object):
|
||||
"""Close connection and turn off the screen of the device."""
|
||||
self.conn.close()
|
||||
|
||||
def export(self):
|
||||
"""Export GPG public key, ready for "gpg2 --import"."""
|
||||
def create_primary(self):
|
||||
"""Export new primary GPG public key, ready for "gpg2 --import"."""
|
||||
pubkey_packet = proto.packet(tag=6, blob=self.pubkey.data())
|
||||
user_id_packet = proto.packet(tag=13, blob=self.user_id)
|
||||
|
||||
@@ -137,7 +132,7 @@ class Signer(object):
|
||||
proto.subpacket(16, self.pubkey.key_id()), # issuer key id
|
||||
proto.CUSTOM_SUBPACKET]
|
||||
|
||||
signature = _make_signature(
|
||||
signature = proto.make_signature(
|
||||
signer_func=self.conn.sign,
|
||||
public_algo=self.pubkey.algo_id,
|
||||
data_to_sign=data_to_sign,
|
||||
@@ -148,8 +143,8 @@ class Signer(object):
|
||||
sign_packet = proto.packet(tag=2, blob=signature)
|
||||
return pubkey_packet + user_id_packet + sign_packet
|
||||
|
||||
def subkey(self):
|
||||
"""Export a subkey to `self.user_id` GPG primary key."""
|
||||
def create_subkey(self):
|
||||
"""Export new subkey to `self.user_id` GPG primary key."""
|
||||
subkey_packet = proto.packet(tag=14, blob=self.pubkey.data())
|
||||
primary = decode.load_from_gpg(self.user_id)
|
||||
log.info('adding subkey to primary GPG key "%s" (%s)',
|
||||
@@ -162,12 +157,13 @@ class Signer(object):
|
||||
unhashed_subpackets = [
|
||||
proto.subpacket(16, self.pubkey.key_id())] # issuer key id
|
||||
log.info('confirm signing subkey with hardware device')
|
||||
embedded_sig = _make_signature(signer_func=self.conn.sign,
|
||||
data_to_sign=data_to_sign,
|
||||
public_algo=self.pubkey.algo_id,
|
||||
sig_type=0x19,
|
||||
hashed_subpackets=hashed_subpackets,
|
||||
unhashed_subpackets=unhashed_subpackets)
|
||||
embedded_sig = proto.make_signature(
|
||||
signer_func=self.conn.sign,
|
||||
data_to_sign=data_to_sign,
|
||||
public_algo=self.pubkey.algo_id,
|
||||
sig_type=0x19,
|
||||
hashed_subpackets=hashed_subpackets,
|
||||
unhashed_subpackets=unhashed_subpackets)
|
||||
|
||||
# Subkey Binding Signature
|
||||
hashed_subpackets = [
|
||||
@@ -179,16 +175,17 @@ class Signer(object):
|
||||
proto.CUSTOM_SUBPACKET]
|
||||
log.info('confirm signing subkey with gpg-agent')
|
||||
gpg_agent = AgentSigner(self.user_id)
|
||||
signature = _make_signature(signer_func=gpg_agent.sign,
|
||||
data_to_sign=data_to_sign,
|
||||
public_algo=primary['algo'],
|
||||
sig_type=0x18,
|
||||
hashed_subpackets=hashed_subpackets,
|
||||
unhashed_subpackets=unhashed_subpackets)
|
||||
signature = proto.make_signature(
|
||||
signer_func=gpg_agent.sign,
|
||||
data_to_sign=data_to_sign,
|
||||
public_algo=primary['algo'],
|
||||
sig_type=0x18,
|
||||
hashed_subpackets=hashed_subpackets,
|
||||
unhashed_subpackets=unhashed_subpackets)
|
||||
sign_packet = proto.packet(tag=2, blob=signature)
|
||||
return subkey_packet + sign_packet
|
||||
|
||||
def sign(self, msg, sign_time=None):
|
||||
def sign_message(self, msg, sign_time=None):
|
||||
"""Sign GPG message at specified time."""
|
||||
if sign_time is None:
|
||||
sign_time = int(time.time())
|
||||
@@ -199,32 +196,10 @@ class Signer(object):
|
||||
unhashed_subpackets = [
|
||||
proto.subpacket(16, self.pubkey.key_id())] # issuer key id
|
||||
|
||||
blob = _make_signature(signer_func=self.conn.sign,
|
||||
data_to_sign=msg,
|
||||
public_algo=self.pubkey.algo_id,
|
||||
hashed_subpackets=hashed_subpackets,
|
||||
unhashed_subpackets=unhashed_subpackets)
|
||||
blob = proto.make_signature(
|
||||
signer_func=self.conn.sign,
|
||||
data_to_sign=msg,
|
||||
public_algo=self.pubkey.algo_id,
|
||||
hashed_subpackets=hashed_subpackets,
|
||||
unhashed_subpackets=unhashed_subpackets)
|
||||
return proto.packet(tag=2, blob=blob)
|
||||
|
||||
|
||||
def _make_signature(signer_func, data_to_sign, public_algo,
|
||||
hashed_subpackets, unhashed_subpackets, sig_type=0):
|
||||
# pylint: disable=too-many-arguments
|
||||
header = struct.pack('>BBBB',
|
||||
4, # version
|
||||
sig_type, # rfc4880 (section-5.2.1)
|
||||
public_algo,
|
||||
8) # hash_alg (SHA256)
|
||||
hashed = proto.subpackets(*hashed_subpackets)
|
||||
unhashed = proto.subpackets(*unhashed_subpackets)
|
||||
tail = b'\x04\xff' + struct.pack('>L', len(header) + len(hashed))
|
||||
data_to_hash = data_to_sign + header + hashed + tail
|
||||
|
||||
log.debug('hashing %d bytes', len(data_to_hash))
|
||||
digest = hashlib.sha256(data_to_hash).digest()
|
||||
log.info('signing digest: %s', util.hexlify(digest))
|
||||
sig = signer_func(digest=digest)
|
||||
|
||||
return bytes(header + hashed + unhashed +
|
||||
digest[:2] + # used for decoder's sanity check
|
||||
sig) # actual ECDSA signature
|
||||
|
||||
@@ -154,3 +154,27 @@ def armor(blob, type_str):
|
||||
checksum = base64.b64encode(util.crc24(blob))
|
||||
tail = '-----END PGP {}-----\n'.format(type_str)
|
||||
return head + _split_lines(body, 64) + '=' + checksum + '\n' + tail
|
||||
|
||||
|
||||
def make_signature(signer_func, data_to_sign, public_algo,
|
||||
hashed_subpackets, unhashed_subpackets, sig_type=0):
|
||||
"""Create new GPG signature."""
|
||||
# pylint: disable=too-many-arguments
|
||||
header = struct.pack('>BBBB',
|
||||
4, # version
|
||||
sig_type, # rfc4880 (section-5.2.1)
|
||||
public_algo,
|
||||
8) # hash_alg (SHA256)
|
||||
hashed = subpackets(*hashed_subpackets)
|
||||
unhashed = subpackets(*unhashed_subpackets)
|
||||
tail = b'\x04\xff' + struct.pack('>L', len(header) + len(hashed))
|
||||
data_to_hash = data_to_sign + header + hashed + tail
|
||||
|
||||
log.debug('hashing %d bytes', len(data_to_hash))
|
||||
digest = hashlib.sha256(data_to_hash).digest()
|
||||
log.info('signing digest: %s', util.hexlify(digest))
|
||||
sig = signer_func(digest=digest)
|
||||
|
||||
return bytes(header + hashed + unhashed +
|
||||
digest[:2] + # used for decoder's sanity check
|
||||
sig) # actual ECDSA signature
|
||||
|
||||
@@ -16,16 +16,16 @@ log = logging.getLogger(__name__)
|
||||
def run_create(args):
|
||||
"""Generate a new pubkey for a new/existing GPG identity."""
|
||||
user_id = os.environ['TREZOR_GPG_USER_ID']
|
||||
s = encode.Signer(user_id=user_id, created=args.time,
|
||||
f = encode.Factory(user_id=user_id, created=args.time,
|
||||
curve_name=args.ecdsa_curve)
|
||||
|
||||
with contextlib.closing(s):
|
||||
with contextlib.closing(f):
|
||||
if args.subkey:
|
||||
subkey = s.subkey()
|
||||
subkey = f.create_subkey()
|
||||
primary = sp.check_output(['gpg2', '--export', user_id])
|
||||
result = primary + subkey
|
||||
else:
|
||||
result = s.export()
|
||||
result = f.create_primary()
|
||||
|
||||
sys.stdout.write(proto.armor(result, 'PUBLIC KEY BLOCK'))
|
||||
|
||||
@@ -33,13 +33,13 @@ def run_create(args):
|
||||
def run_sign(args):
|
||||
"""Generate a GPG signature using hardware-based device."""
|
||||
pubkey = decode.load_from_gpg(user_id=None, use_custom=True)
|
||||
s = encode.Signer.from_public_key(pubkey=pubkey, user_id=pubkey['user_id'])
|
||||
with contextlib.closing(s):
|
||||
f = encode.Factory.from_public_key(pubkey=pubkey, user_id=pubkey['user_id'])
|
||||
with contextlib.closing(f):
|
||||
if args.filename:
|
||||
data = open(args.filename, 'rb').read()
|
||||
else:
|
||||
data = sys.stdin.read()
|
||||
sig = s.sign(data)
|
||||
sig = f.sign_message(data)
|
||||
|
||||
sig = proto.armor(sig, 'SIGNATURE')
|
||||
decode.verify(pubkey=pubkey, signature=sig, original_data=data)
|
||||
|
||||
Reference in New Issue
Block a user