Merge pull request #58 from romanz/keygrip-agent

gpg: replace TREZOR_GPG_USER_ID usage in gpg-agent mode
This commit is contained in:
Roman Zeyde
2016-10-18 18:15:00 +03:00
committed by GitHub
6 changed files with 85 additions and 98 deletions

View File

@@ -8,7 +8,7 @@ First, verify that you have GPG 2.1+ [installed](https://gist.github.com/vt0r/a2
``` ```
$ gpg2 --version | head -n1 $ gpg2 --version | head -n1
gpg (GnuPG) 2.1.11 gpg (GnuPG) 2.1.15
``` ```
Update you TREZOR firmware to the latest version (at least v1.4.0). Update you TREZOR firmware to the latest version (at least v1.4.0).
@@ -20,7 +20,7 @@ $ pip install --user git+https://github.com/romanz/trezor-agent.git
Define your GPG user ID as an environment variable: Define your GPG user ID as an environment variable:
``` ```
$ export TREZOR_GPG_USER_ID="John Doe <john@doe.bit>" $ TREZOR_GPG_USER_ID="John Doe <john@doe.bit>"
``` ```
There are two ways to generate TREZOR-based GPG public keys, as described below. There are two ways to generate TREZOR-based GPG public keys, as described below.
@@ -28,12 +28,12 @@ There are two ways to generate TREZOR-based GPG public keys, as described below.
## 1. generate a new GPG identity: ## 1. generate a new GPG identity:
``` ```
$ trezor-gpg create | gpg2 --import # use the TREZOR to confirm signing the primary key $ trezor-gpg create "${TREZOR_GPG_USER_ID}" | gpg2 --import # use the TREZOR to confirm signing the primary key
gpg: key 5E4D684D: public key "John Doe <john@doe.bit>" imported gpg: key 5E4D684D: public key "John Doe <john@doe.bit>" imported
gpg: Total number processed: 1 gpg: Total number processed: 1
gpg: imported: 1 gpg: imported: 1
$ gpg2 --edit "${TREZOR_GPG_USER_ID}" trust # set this key to ultimate trust (option #5) $ gpg2 --edit "${TREZOR_GPG_USER_ID}" trust # set this key to ultimate trust (option #5)
$ gpg2 -k $ gpg2 -k
/home/roman/.gnupg/pubring.kbx /home/roman/.gnupg/pubring.kbx
@@ -46,14 +46,14 @@ sub nistp256/A31D9E25 2016-06-17 [E]
## 2. generate a new subkey for an existing GPG identity: ## 2. generate a new subkey for an existing GPG identity:
``` ```
$ gpg2 -k # suppose there is already a GPG primary key $ gpg2 -k # suppose there is already a GPG primary key
/home/roman/.gnupg/pubring.kbx /home/roman/.gnupg/pubring.kbx
------------------------------ ------------------------------
pub rsa2048/87BB07B4 2016-06-17 [SC] pub rsa2048/87BB07B4 2016-06-17 [SC]
uid [ultimate] John Doe <john@doe.bit> uid [ultimate] John Doe <john@doe.bit>
sub rsa2048/7176D31F 2016-06-17 [E] sub rsa2048/7176D31F 2016-06-17 [E]
$ trezor-gpg create --subkey | gpg2 --import # use the TREZOR to confirm signing the subkey $ trezor-gpg create --subkey "${TREZOR_GPG_USER_ID}" | gpg2 --import # use the TREZOR to confirm signing the subkey
gpg: key 87BB07B4: "John Doe <john@doe.bit>" 2 new signatures gpg: key 87BB07B4: "John Doe <john@doe.bit>" 2 new signatures
gpg: key 87BB07B4: "John Doe <john@doe.bit>" 2 new subkeys gpg: key 87BB07B4: "John Doe <john@doe.bit>" 2 new subkeys
gpg: Total number processed: 1 gpg: Total number processed: 1
@@ -83,13 +83,13 @@ when you are done with the TREZOR-based GPG operations.
``` ```
$ echo "Hello World!" | gpg2 --sign | gpg2 --verify $ echo "Hello World!" | gpg2 --sign | gpg2 --verify
gpg: Signature made Fri 17 Jun 2016 08:55:13 PM IDT using ECDSA key ID 5E4D684D gpg: Signature made Fri 17 Jun 2016 08:55:13 PM IDT using ECDSA key ID 5E4D684D
gpg: Good signature from "Roman Zeyde <roman.zeyde@gmail.com>" [ultimate] gpg: Good signature from "John Doe <john@doe.bit>" [ultimate]
``` ```
## Encrypt and decrypt GPG messages: ## Encrypt and decrypt GPG messages:
``` ```
$ date | gpg2 --encrypt -r "${TREZOR_GPG_USER_ID}" | gpg2 --decrypt $ date | gpg2 --encrypt -r "${TREZOR_GPG_USER_ID}" | gpg2 --decrypt
gpg: encrypted with 256-bit ECDH key, ID A31D9E25, created 2016-06-17 gpg: encrypted with 256-bit ECDH key, ID A31D9E25, created 2016-06-17
"Roman Zeyde <roman.zeyde@gmail.com>" "John Doe <john@doe.bit>"
Fri Jun 17 20:55:31 IDT 2016 Fri Jun 17 20:55:31 IDT 2016
``` ```

View File

@@ -3,7 +3,6 @@
import argparse import argparse
import contextlib import contextlib
import logging import logging
import os
import sys import sys
import time import time
@@ -17,17 +16,16 @@ log = logging.getLogger(__name__)
def run_create(args): def run_create(args):
"""Generate a new pubkey for a new/existing GPG identity.""" """Generate a new pubkey for a new/existing GPG identity."""
user_id = os.environ['TREZOR_GPG_USER_ID']
log.warning('NOTE: in order to re-generate the exact same GPG key later, ' log.warning('NOTE: in order to re-generate the exact same GPG key later, '
'run this command with "--time=%d" commandline flag (to set ' 'run this command with "--time=%d" commandline flag (to set '
'the timestamp of the GPG key manually).', args.time) 'the timestamp of the GPG key manually).', args.time)
conn = device.HardwareSigner(user_id=user_id, conn = device.HardwareSigner(user_id=args.user_id,
curve_name=args.ecdsa_curve) curve_name=args.ecdsa_curve)
verifying_key = conn.pubkey(ecdh=False) verifying_key = conn.pubkey(ecdh=False)
decryption_key = conn.pubkey(ecdh=True) decryption_key = conn.pubkey(ecdh=True)
if args.subkey: if args.subkey:
primary_bytes = keyring.export_public_key(user_id=user_id) primary_bytes = keyring.export_public_key(user_id=args.user_id)
# subkey for signing # subkey for signing
signing_key = protocol.PublicKey( signing_key = protocol.PublicKey(
curve_name=args.ecdsa_curve, created=args.time, curve_name=args.ecdsa_curve, created=args.time,
@@ -37,10 +35,10 @@ def run_create(args):
curve_name=formats.get_ecdh_curve_name(args.ecdsa_curve), curve_name=formats.get_ecdh_curve_name(args.ecdsa_curve),
created=args.time, verifying_key=decryption_key, ecdh=True) created=args.time, verifying_key=decryption_key, ecdh=True)
result = encode.create_subkey(primary_bytes=primary_bytes, result = encode.create_subkey(primary_bytes=primary_bytes,
pubkey=signing_key, subkey=signing_key,
signer_func=conn.sign) signer_func=conn.sign)
result = encode.create_subkey(primary_bytes=result, result = encode.create_subkey(primary_bytes=result,
pubkey=encryption_key, subkey=encryption_key,
signer_func=conn.sign) signer_func=conn.sign)
else: else:
# primary key for signing # primary key for signing
@@ -52,11 +50,11 @@ def run_create(args):
curve_name=formats.get_ecdh_curve_name(args.ecdsa_curve), curve_name=formats.get_ecdh_curve_name(args.ecdsa_curve),
created=args.time, verifying_key=decryption_key, ecdh=True) created=args.time, verifying_key=decryption_key, ecdh=True)
result = encode.create_primary(user_id=user_id, result = encode.create_primary(user_id=args.user_id,
pubkey=primary, pubkey=primary,
signer_func=conn.sign) signer_func=conn.sign)
result = encode.create_subkey(primary_bytes=result, result = encode.create_subkey(primary_bytes=result,
pubkey=subkey, subkey=subkey,
signer_func=conn.sign) signer_func=conn.sign)
sys.stdout.write(protocol.armor(result, 'PUBLIC KEY BLOCK')) sys.stdout.write(protocol.armor(result, 'PUBLIC KEY BLOCK'))
@@ -80,6 +78,7 @@ def main():
subparsers.dest = 'command' subparsers.dest = 'command'
create_cmd = subparsers.add_parser('create') create_cmd = subparsers.add_parser('create')
create_cmd.add_argument('user_id')
create_cmd.add_argument('-s', '--subkey', action='store_true', default=False) create_cmd.add_argument('-s', '--subkey', action='store_true', default=False)
create_cmd.add_argument('-e', '--ecdsa-curve', default='nist256p1') create_cmd.add_argument('-e', '--ecdsa-curve', default='nist256p1')
create_cmd.add_argument('-t', '--time', type=int, default=int(time.time())) create_cmd.add_argument('-t', '--time', type=int, default=int(time.time()))

View File

@@ -2,9 +2,8 @@
import binascii import binascii
import contextlib import contextlib
import logging import logging
import os
from . import decode, encode, keyring from . import decode, device, keyring, protocol
from .. import util from .. import util
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@@ -43,16 +42,37 @@ def _verify_keygrip(expected, actual):
raise KeyError('Keygrip mismatch: {!r} != {!r}', expected, actual) raise KeyError('Keygrip mismatch: {!r} != {!r}', expected, actual)
@contextlib.contextmanager
def open_connection(keygrip_bytes):
"""
Connect to the device for the specified keygrip.
Parse GPG public key to find the first user ID, which is used to
specify the correct signature/decryption key on the device.
"""
pubkey_dict, user_ids = decode.load_by_keygrip(
pubkey_bytes=keyring.export_public_keys(),
keygrip=keygrip_bytes)
# We assume the first user ID is used to generate TREZOR-based GPG keys.
user_id = user_ids[0]['value']
curve_name = protocol.get_curve_name_by_oid(pubkey_dict['curve_oid'])
ecdh = (pubkey_dict['algo'] == protocol.ECDH_ALGO_ID)
conn = device.HardwareSigner(user_id, curve_name=curve_name)
with contextlib.closing(conn):
pubkey = protocol.PublicKey(
curve_name=curve_name, created=pubkey_dict['created'],
verifying_key=conn.pubkey(ecdh=ecdh), ecdh=ecdh)
assert pubkey.key_id() == pubkey_dict['key_id']
assert pubkey.keygrip == keygrip_bytes
yield conn
def pksign(keygrip, digest, algo): def pksign(keygrip, digest, algo):
"""Sign a message digest using a private EC key.""" """Sign a message digest using a private EC key."""
assert algo == b'8', 'Unsupported hash algorithm ID {}'.format(algo) assert algo == b'8', 'Unsupported hash algorithm ID {}'.format(algo)
user_id = os.environ['TREZOR_GPG_USER_ID'] keygrip_bytes = binascii.unhexlify(keygrip)
pubkey_dict = decode.load_public_key( with open_connection(keygrip_bytes) as conn:
pubkey_bytes=keyring.export_public_key(user_id=user_id),
use_custom=True, ecdh=False)
pubkey, conn = encode.load_from_public_key(pubkey_dict=pubkey_dict)
with contextlib.closing(conn):
_verify_keygrip(pubkey.keygrip, binascii.unhexlify(keygrip))
r, s = conn.sign(binascii.unhexlify(digest)) r, s = conn.sign(binascii.unhexlify(digest))
result = sig_encode(r, s) result = sig_encode(r, s)
log.debug('result: %r', result) log.debug('result: %r', result)
@@ -89,13 +109,8 @@ def pkdecrypt(keygrip, conn):
assert keyring.recvline(conn) == b'END' assert keyring.recvline(conn) == b'END'
remote_pubkey = parse_ecdh(line) remote_pubkey = parse_ecdh(line)
user_id = os.environ['TREZOR_GPG_USER_ID'] keygrip_bytes = binascii.unhexlify(keygrip)
local_pubkey = decode.load_public_key( with open_connection(keygrip_bytes) as conn:
pubkey_bytes=keyring.export_public_key(user_id=user_id),
use_custom=True, ecdh=True)
pubkey, conn = encode.load_from_public_key(pubkey_dict=local_pubkey)
with contextlib.closing(conn):
_verify_keygrip(pubkey.keygrip, binascii.unhexlify(keygrip))
return _serialize_point(conn.ecdh(remote_pubkey)) return _serialize_point(conn.ecdh(remote_pubkey))

View File

@@ -176,14 +176,11 @@ def _parse_pubkey(stream, packet_type='pubkey'):
p['keygrip'] = keygrip p['keygrip'] = keygrip
elif p['algo'] == DSA_ALGO_ID: elif p['algo'] == DSA_ALGO_ID:
log.warning('DSA signatures are not verified') parse_mpis(stream, n=4) # DSA keys are not supported
parse_mpis(stream, n=4)
elif p['algo'] == ELGAMAL_ALGO_ID: elif p['algo'] == ELGAMAL_ALGO_ID:
log.warning('ElGamal signatures are not verified') parse_mpis(stream, n=3) # ElGamal keys are not supported
parse_mpis(stream, n=3)
else: # assume RSA else: # assume RSA
log.warning('RSA signatures are not verified') parse_mpis(stream, n=2) # RSA keys are not supported
parse_mpis(stream, n=2)
assert not stream.read() assert not stream.read()
# https://tools.ietf.org/html/rfc4880#section-12.2 # https://tools.ietf.org/html/rfc4880#section-12.2
@@ -285,31 +282,22 @@ HASH_ALGORITHMS = {
} }
def load_public_key(pubkey_bytes, use_custom=False, ecdh=False): def load_by_keygrip(pubkey_bytes, keygrip):
"""Parse and validate GPG public key from an input stream.""" """Return public key and first user ID for specified keygrip."""
stream = io.BytesIO(pubkey_bytes) stream = io.BytesIO(pubkey_bytes)
packets = list(parse_packets(stream)) packets = list(parse_packets(stream))
pubkey, userid, signature = packets[:3] packets_per_pubkey = []
packets = packets[3:] for p in packets:
log.debug('loaded public key "%s"', userid['value']) if p['type'] == 'pubkey':
# Add a new packet list for each pubkey.
packets_per_pubkey.append([])
packets_per_pubkey[-1].append(p)
packet = pubkey for packets in packets_per_pubkey:
while use_custom: user_ids = [p for p in packets if p['type'] == 'user_id']
log.debug('GPG packet type: %s (algo = %s, custom = %s)', for p in packets:
packet['type'], packet['algo'], signature['_is_custom']) if p.get('keygrip') == keygrip:
if packet['type'] in ('pubkey', 'subkey') and signature['_is_custom']: return p, user_ids
if ecdh == (packet['algo'] == protocol.ECDH_ALGO_ID):
log.debug('found custom %s', packet['type'])
break
while packets[1]['type'] != 'signature':
packets = packets[1:]
packet, signature = packets[:2]
packets = packets[2:]
packet['user_id'] = userid['value']
packet['_is_custom'] = signature['_is_custom']
return packet
def load_signature(stream, original_data): def load_signature(stream, original_data):

View File

@@ -1,8 +1,9 @@
"""Create GPG ECDSA signatures and public keys using TREZOR device.""" """Create GPG ECDSA signatures and public keys using TREZOR device."""
import io
import logging import logging
import time import time
from . import decode, device, keyring, protocol from . import decode, keyring, protocol
from .. import util from .. import util
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@@ -21,7 +22,6 @@ def create_primary(user_id, pubkey, signer_func):
data_to_sign = (pubkey.data_to_hash() + data_to_sign = (pubkey.data_to_hash() +
user_id_packet[:1] + user_id_packet[:1] +
util.prefix_len('>L', user_id.encode('ascii'))) util.prefix_len('>L', user_id.encode('ascii')))
log.info('creating primary GPG key "%s"', user_id)
hashed_subpackets = [ hashed_subpackets = [
protocol.subpacket_time(pubkey.created), # signature time protocol.subpacket_time(pubkey.created), # signature time
# https://tools.ietf.org/html/rfc4880#section-5.2.3.7 # https://tools.ietf.org/html/rfc4880#section-5.2.3.7
@@ -40,7 +40,6 @@ def create_primary(user_id, pubkey, signer_func):
protocol.subpacket(16, pubkey.key_id()), # issuer key id protocol.subpacket(16, pubkey.key_id()), # issuer key id
protocol.CUSTOM_SUBPACKET] protocol.CUSTOM_SUBPACKET]
log.info('confirm signing with primary key')
signature = protocol.make_signature( signature = protocol.make_signature(
signer_func=signer_func, signer_func=signer_func,
public_algo=pubkey.algo_id, public_algo=pubkey.algo_id,
@@ -53,26 +52,26 @@ def create_primary(user_id, pubkey, signer_func):
return pubkey_packet + user_id_packet + sign_packet return pubkey_packet + user_id_packet + sign_packet
def create_subkey(primary_bytes, pubkey, signer_func): def create_subkey(primary_bytes, subkey, signer_func, user_id=None):
"""Export new subkey to GPG primary key.""" """Export new subkey to GPG primary key."""
subkey_packet = protocol.packet(tag=14, blob=pubkey.data()) subkey_packet = protocol.packet(tag=14, blob=subkey.data())
primary = decode.load_public_key(primary_bytes) packets = list(decode.parse_packets(io.BytesIO(primary_bytes)))
log.info('adding subkey to primary GPG key "%s"', primary['user_id']) primary, user_id, signature = packets[:3]
data_to_sign = primary['_to_hash'] + pubkey.data_to_hash()
if pubkey.ecdh: data_to_sign = primary['_to_hash'] + subkey.data_to_hash()
if subkey.ecdh:
embedded_sig = None embedded_sig = None
else: else:
# Primary Key Binding Signature # Primary Key Binding Signature
hashed_subpackets = [ hashed_subpackets = [
protocol.subpacket_time(pubkey.created)] # signature time protocol.subpacket_time(subkey.created)] # signature time
unhashed_subpackets = [ unhashed_subpackets = [
protocol.subpacket(16, pubkey.key_id())] # issuer key id protocol.subpacket(16, subkey.key_id())] # issuer key id
log.info('confirm signing with new subkey')
embedded_sig = protocol.make_signature( embedded_sig = protocol.make_signature(
signer_func=signer_func, signer_func=signer_func,
data_to_sign=data_to_sign, data_to_sign=data_to_sign,
public_algo=pubkey.algo_id, public_algo=subkey.algo_id,
sig_type=0x19, sig_type=0x19,
hashed_subpackets=hashed_subpackets, hashed_subpackets=hashed_subpackets,
unhashed_subpackets=unhashed_subpackets) unhashed_subpackets=unhashed_subpackets)
@@ -81,10 +80,10 @@ def create_subkey(primary_bytes, pubkey, signer_func):
# Key flags: https://tools.ietf.org/html/rfc4880#section-5.2.3.21 # Key flags: https://tools.ietf.org/html/rfc4880#section-5.2.3.21
# (certify & sign) (encrypt) # (certify & sign) (encrypt)
flags = (2) if (not pubkey.ecdh) else (4 | 8) flags = (2) if (not subkey.ecdh) else (4 | 8)
hashed_subpackets = [ hashed_subpackets = [
protocol.subpacket_time(pubkey.created), # signature time protocol.subpacket_time(subkey.created), # signature time
protocol.subpacket_byte(0x1B, flags)] protocol.subpacket_byte(0x1B, flags)]
unhashed_subpackets = [] unhashed_subpackets = []
@@ -93,9 +92,8 @@ def create_subkey(primary_bytes, pubkey, signer_func):
unhashed_subpackets.append(protocol.subpacket(32, embedded_sig)) unhashed_subpackets.append(protocol.subpacket(32, embedded_sig))
unhashed_subpackets.append(protocol.CUSTOM_SUBPACKET) unhashed_subpackets.append(protocol.CUSTOM_SUBPACKET)
log.info('confirm signing with primary key') if not signature['_is_custom']:
if not primary['_is_custom']: signer_func = keyring.create_agent_signer(user_id['value'])
signer_func = keyring.create_agent_signer(primary['user_id'])
signature = protocol.make_signature( signature = protocol.make_signature(
signer_func=signer_func, signer_func=signer_func,
@@ -106,22 +104,3 @@ def create_subkey(primary_bytes, pubkey, signer_func):
unhashed_subpackets=unhashed_subpackets) unhashed_subpackets=unhashed_subpackets)
sign_packet = protocol.packet(tag=2, blob=signature) sign_packet = protocol.packet(tag=2, blob=signature)
return primary_bytes + subkey_packet + sign_packet return primary_bytes + subkey_packet + sign_packet
def load_from_public_key(pubkey_dict):
"""Load correct public key from the device."""
log.debug('pubkey_dict: %s', pubkey_dict)
user_id = pubkey_dict['user_id']
created = pubkey_dict['created']
curve_name = protocol.get_curve_name_by_oid(pubkey_dict['curve_oid'])
ecdh = (pubkey_dict['algo'] == protocol.ECDH_ALGO_ID)
conn = device.HardwareSigner(user_id, curve_name=curve_name)
pubkey = protocol.PublicKey(
curve_name=curve_name, created=created,
verifying_key=conn.pubkey(ecdh=ecdh), ecdh=ecdh)
assert pubkey.key_id() == pubkey_dict['key_id']
log.info('%s created at %s for "%s"',
pubkey, _time_format(pubkey.created), user_id)
return pubkey, conn

View File

@@ -184,7 +184,7 @@ def gpg_command(args, env=None):
def get_keygrip(user_id, sp=subprocess): def get_keygrip(user_id, sp=subprocess):
"""Get a keygrip of the primary GPG key of the specified user.""" """Get a keygrip of the primary GPG key of the specified user."""
args = gpg_command(['--list-keys', '--with-keygrip', user_id]) args = gpg_command(['--list-keys', '--with-keygrip', user_id])
output = sp.check_output(args) output = sp.check_output(args).decode('ascii')
return re.findall(r'Keygrip = (\w+)', output)[0] return re.findall(r'Keygrip = (\w+)', output)[0]
@@ -206,6 +206,12 @@ def export_public_key(user_id, sp=subprocess):
return result return result
def export_public_keys(sp=subprocess):
"""Export all GPG public keys."""
args = gpg_command(['--export'])
return sp.check_output(args=args)
def create_agent_signer(user_id): def create_agent_signer(user_id):
"""Sign digest with existing GPG keys using gpg-agent tool.""" """Sign digest with existing GPG keys using gpg-agent tool."""
sock = connect_to_agent() sock = connect_to_agent()