mirror of
https://github.com/romanz/amodem.git
synced 2026-04-21 05:36:42 +08:00
gpg: refactor cli
This commit is contained in:
22
trezor_agent/gpg/README
Normal file
22
trezor_agent/gpg/README
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
# Generate new stand-along GPG identity
|
||||||
|
|
||||||
|
```
|
||||||
|
$ USER_ID="Satoshi Nakamoto <satoshi@nakamoto.bit>"
|
||||||
|
$ trezor-gpg create "${USER_ID}" > identity.pub # create new TREZOR-based GPG identity
|
||||||
|
$ gpg2 --import identity.pub # import into local GPG public keyring
|
||||||
|
$ gpg2 --edit "${USER_ID}" trust # OPTIONAL: mark the key as trusted
|
||||||
|
```
|
||||||
|
|
||||||
|
# Generate new subkey for existing GPG identity
|
||||||
|
```
|
||||||
|
$ USER_ID="Satoshi Nakamoto <satoshi@nakamoto.bit>"
|
||||||
|
$ gpg2 --list-keys "${USER_ID}" # make sure this identity already exists
|
||||||
|
$ trezor-gpg create --subkey "${USER_ID}" > identity.pub # create new TREZOR-based GPG public key
|
||||||
|
$ gpg2 --import identity.pub # append it to existing identity
|
||||||
|
```
|
||||||
|
|
||||||
|
# Generate signatures using the TREZOR device
|
||||||
|
```
|
||||||
|
$ trezor-gpg sign EXAMPLE > EXAMPLE.sig # confirm signature using the device
|
||||||
|
$ gpg2 --verify EXAMPLE.sig # verify using standard GPG binary
|
||||||
|
```
|
||||||
@@ -1,51 +0,0 @@
|
|||||||
#!/usr/bin/env python
|
|
||||||
"""Check GPG v2 signature for a given public key."""
|
|
||||||
import argparse
|
|
||||||
import base64
|
|
||||||
import io
|
|
||||||
import logging
|
|
||||||
|
|
||||||
from . import decode
|
|
||||||
from .. import util
|
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
def original_data(filename):
|
|
||||||
"""Locate and load original file data, whose signature is provided."""
|
|
||||||
parts = filename.rsplit('.', 1)
|
|
||||||
if len(parts) == 2 and parts[1] in ('sig', 'asc'):
|
|
||||||
log.debug('loading file %s', parts[0])
|
|
||||||
return open(parts[0], 'rb').read()
|
|
||||||
|
|
||||||
|
|
||||||
def verify(pubkey, sig_file):
|
|
||||||
"""Verify correctness of public key and signature."""
|
|
||||||
stream = open(sig_file, 'rb')
|
|
||||||
if stream.name.endswith('.asc'):
|
|
||||||
lines = stream.readlines()[3:-1]
|
|
||||||
data = base64.b64decode(''.join(lines))
|
|
||||||
payload, checksum = data[:-3], data[-3:]
|
|
||||||
assert util.crc24(payload) == checksum
|
|
||||||
stream = io.BytesIO(payload)
|
|
||||||
|
|
||||||
signature, digest = decode.load_signature(stream, original_data(sig_file))
|
|
||||||
decode.verify_digest(pubkey=pubkey, digest=digest,
|
|
||||||
signature=signature['sig'], label='GPG signature')
|
|
||||||
log.info('%s OK', sig_file)
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
|
||||||
"""Main function."""
|
|
||||||
p = argparse.ArgumentParser()
|
|
||||||
p.add_argument('pubkey')
|
|
||||||
p.add_argument('signature')
|
|
||||||
p.add_argument('-v', '--verbose', action='store_true', default=False)
|
|
||||||
args = p.parse_args()
|
|
||||||
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO,
|
|
||||||
format='%(asctime)s %(levelname)-10s %(message)s')
|
|
||||||
verify(pubkey=decode.load_public_key(open(args.pubkey, 'rb')),
|
|
||||||
sig_file=args.signature)
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
main()
|
|
||||||
@@ -1,4 +1,5 @@
|
|||||||
"""Decoders for GPG v2 data structures."""
|
"""Decoders for GPG v2 data structures."""
|
||||||
|
import base64
|
||||||
import functools
|
import functools
|
||||||
import hashlib
|
import hashlib
|
||||||
import io
|
import io
|
||||||
@@ -268,6 +269,7 @@ def load_public_key(stream, use_custom=False):
|
|||||||
packet, signature = packets[:2]
|
packet, signature = packets[:2]
|
||||||
packets = packets[2:]
|
packets = packets[2:]
|
||||||
|
|
||||||
|
packet['user_id'] = userid['value']
|
||||||
return packet
|
return packet
|
||||||
|
|
||||||
|
|
||||||
@@ -281,7 +283,8 @@ def load_signature(stream, original_data):
|
|||||||
|
|
||||||
def load_from_gpg(user_id, use_custom=False):
|
def load_from_gpg(user_id, use_custom=False):
|
||||||
"""Load existing GPG public key for `user_id` from local keyring."""
|
"""Load existing GPG public key for `user_id` from local keyring."""
|
||||||
pubkey_bytes = subprocess.check_output(['gpg2', '--export', user_id])
|
args = ['gpg2', '--export'] + ([user_id] if user_id else [])
|
||||||
|
pubkey_bytes = subprocess.check_output(args=args)
|
||||||
if pubkey_bytes:
|
if pubkey_bytes:
|
||||||
return load_public_key(io.BytesIO(pubkey_bytes), use_custom=use_custom)
|
return load_public_key(io.BytesIO(pubkey_bytes), use_custom=use_custom)
|
||||||
else:
|
else:
|
||||||
@@ -298,3 +301,19 @@ def verify_digest(pubkey, digest, signature, label):
|
|||||||
except ecdsa.keys.BadSignatureError:
|
except ecdsa.keys.BadSignatureError:
|
||||||
log.error('Bad %s!', label)
|
log.error('Bad %s!', label)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
def verify(pubkey, signature, original_data):
|
||||||
|
"""Verify correctness of public key and signature."""
|
||||||
|
stream = io.BytesIO(signature)
|
||||||
|
|
||||||
|
# remove GPG armor
|
||||||
|
lines = stream.readlines()[3:-1]
|
||||||
|
data = base64.b64decode(''.join(lines))
|
||||||
|
payload, checksum = data[:-3], data[-3:]
|
||||||
|
assert util.crc24(payload) == checksum
|
||||||
|
stream = io.BytesIO(payload)
|
||||||
|
|
||||||
|
signature, digest = load_signature(stream, original_data)
|
||||||
|
verify_digest(pubkey=pubkey, digest=digest,
|
||||||
|
signature=signature['sig'], label='GPG signature')
|
||||||
|
|||||||
@@ -125,8 +125,9 @@ class Signer(object):
|
|||||||
curve_name=curve_name, created=created,
|
curve_name=curve_name, created=created,
|
||||||
verifying_key=self.conn.pubkey())
|
verifying_key=self.conn.pubkey())
|
||||||
|
|
||||||
log.info('%s GPG public key %s created at %s', curve_name,
|
log.info('%s GPG public key %s created at %s for "%s"',
|
||||||
self.pubkey, util.time_format(self.pubkey.created))
|
curve_name, self.pubkey,
|
||||||
|
util.time_format(self.pubkey.created), user_id)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_public_key(cls, pubkey, user_id):
|
def from_public_key(cls, pubkey, user_id):
|
||||||
|
|||||||
@@ -6,73 +6,65 @@ import subprocess as sp
|
|||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from . import check, decode, encode
|
from . import decode, encode
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def _open_output(filename):
|
def run_create(args):
|
||||||
return sys.stdout if filename == '-' else open(filename, 'wb')
|
"""Generate a new pubkey for a new/existing GPG identity."""
|
||||||
|
s = encode.Signer(user_id=args.user_id, created=args.time,
|
||||||
|
curve_name=args.ecdsa_curve)
|
||||||
|
if args.subkey:
|
||||||
|
subkey = s.subkey()
|
||||||
|
primary = sp.check_output(['gpg2', '--export', args.user_id])
|
||||||
|
result = primary + subkey
|
||||||
|
else:
|
||||||
|
result = s.export()
|
||||||
|
s.close()
|
||||||
|
|
||||||
|
return encode.armor(result, 'PUBLIC KEY BLOCK')
|
||||||
|
|
||||||
|
|
||||||
def _call_with_input(args, blob):
|
def run_sign(args):
|
||||||
p = sp.Popen(args=args, stdin=sp.PIPE)
|
"""Generate a GPG signature using hardware-based device."""
|
||||||
p.stdin.write(blob)
|
pubkey = decode.load_from_gpg(user_id=None, use_custom=True)
|
||||||
p.stdin.close()
|
s = encode.Signer.from_public_key(pubkey=pubkey, user_id=pubkey['user_id'])
|
||||||
exit_code = p.wait()
|
if args.filename:
|
||||||
assert exit_code == 0, exit_code
|
data = open(args.filename, 'rb').read()
|
||||||
|
else:
|
||||||
|
data = sys.stdin.read()
|
||||||
|
sig = s.sign(data)
|
||||||
|
s.close()
|
||||||
|
|
||||||
|
sig = encode.armor(sig, 'SIGNATURE')
|
||||||
|
decode.verify(pubkey=pubkey, signature=sig, original_data=data)
|
||||||
|
return sig
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
"""Main function."""
|
"""Main function."""
|
||||||
p = argparse.ArgumentParser()
|
p = argparse.ArgumentParser()
|
||||||
p.add_argument('user_id')
|
|
||||||
p.add_argument('filename', nargs='?')
|
|
||||||
p.add_argument('-t', '--time', type=int, default=int(time.time()))
|
|
||||||
p.add_argument('-a', '--armor', action='store_true', default=False)
|
|
||||||
p.add_argument('-v', '--verbose', action='store_true', default=False)
|
p.add_argument('-v', '--verbose', action='store_true', default=False)
|
||||||
p.add_argument('-s', '--subkey', action='store_true', default=False)
|
subparsers = p.add_subparsers()
|
||||||
p.add_argument('-e', '--ecdsa-curve', default='nist256p1')
|
|
||||||
p.add_argument('-o', '--output',
|
create = subparsers.add_parser('create')
|
||||||
help='Output file name for the results. '
|
create.add_argument('user_id', help='e.g. '
|
||||||
'Use "-" to write the results to stdout or "GPG" '
|
'"Satoshi Nakamoto <satoshi@nakamoto.bit>"')
|
||||||
'to import a public key into the local keyring.')
|
create.add_argument('-s', '--subkey', action='store_true', default=False)
|
||||||
|
create.add_argument('-e', '--ecdsa-curve', default='nist256p1')
|
||||||
|
create.add_argument('-t', '--time', type=int, default=int(time.time()))
|
||||||
|
create.set_defaults(run=run_create)
|
||||||
|
|
||||||
|
sign = subparsers.add_parser('sign')
|
||||||
|
sign.add_argument('filename', nargs='?')
|
||||||
|
sign.set_defaults(run=run_sign)
|
||||||
|
|
||||||
args = p.parse_args()
|
args = p.parse_args()
|
||||||
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO,
|
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO,
|
||||||
format='%(asctime)s %(levelname)-10s %(message)s')
|
format='%(asctime)s %(levelname)-10s %(message)s')
|
||||||
user_id = args.user_id.encode('ascii')
|
result = args.run(args)
|
||||||
if not args.filename:
|
sys.stdout.write(result)
|
||||||
s = encode.Signer(user_id=user_id, created=args.time,
|
|
||||||
curve_name=args.ecdsa_curve)
|
|
||||||
if args.subkey:
|
|
||||||
pubkey = s.subkey()
|
|
||||||
else:
|
|
||||||
pubkey = s.export()
|
|
||||||
|
|
||||||
ext = '.pub'
|
|
||||||
if args.armor:
|
|
||||||
pubkey = encode.armor(pubkey, 'PUBLIC KEY BLOCK')
|
|
||||||
ext = '.asc'
|
|
||||||
filename = args.output or '-' # use stdout if no file specified
|
|
||||||
if filename == 'GPG':
|
|
||||||
log.info('importing public key to local keyring')
|
|
||||||
_call_with_input(['gpg2', '--import'], pubkey)
|
|
||||||
else:
|
|
||||||
_open_output(filename).write(pubkey)
|
|
||||||
else:
|
|
||||||
pubkey = decode.load_from_gpg(user_id, use_custom=True)
|
|
||||||
s = encode.Signer.from_public_key(pubkey=pubkey, user_id=user_id)
|
|
||||||
data = open(args.filename, 'rb').read()
|
|
||||||
sig, ext = s.sign(data), '.sig'
|
|
||||||
if args.armor:
|
|
||||||
sig = encode.armor(sig, 'SIGNATURE')
|
|
||||||
ext = '.asc'
|
|
||||||
filename = args.output or (args.filename + ext)
|
|
||||||
_open_output(filename).write(sig)
|
|
||||||
check.verify(pubkey=pubkey, sig_file=filename)
|
|
||||||
|
|
||||||
s.close()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
|||||||
Reference in New Issue
Block a user