diff --git a/trezor_agent/gpg/agent.py b/trezor_agent/gpg/agent.py index 34a5dcf..e058ac7 100644 --- a/trezor_agent/gpg/agent.py +++ b/trezor_agent/gpg/agent.py @@ -76,15 +76,47 @@ def _parse(s): return _parse_term(s) +def _parse_ecdsa_sig(args): + (r, sig_r), (s, sig_s) = args + assert r == 'r' + assert s == 's' + return (util.bytes2num(sig_r), + util.bytes2num(sig_s)) + + +def _parse_rsa_sig(args): + (s, sig_s), = args + assert s == 's' + return (util.bytes2num(sig_s),) + + +def _parse_sig(sig): + label, sig = sig + assert label == 'sig-val' + algo_name = sig[0] + parser = {'rsa': _parse_rsa_sig, 'ecdsa': _parse_ecdsa_sig}[algo_name] + return parser(args=sig[1:]) + + def sign(sock, keygrip, digest): """Sign a digest using specified key using GPG agent.""" hash_algo = 8 # SHA256 assert len(digest) == 32 assert _communicate(sock, 'RESET').startswith('OK') + + ttyname = sp.check_output('tty').strip() + options = ['ttyname={}'.format(ttyname)] # set TTY for passphrase entry + for opt in options: + assert _communicate(sock, 'OPTION {}'.format(opt)) == 'OK' + assert _communicate(sock, 'SIGKEY {}'.format(keygrip)) == 'OK' assert _communicate(sock, 'SETHASH {} {}'.format(hash_algo, _hex(digest))) == 'OK' + + desc = ('Please+enter+the+passphrase+to+unlock+the+OpenPGP%0A' + 'secret+key,+to+sign+a+new+TREZOR-based+subkey') + assert _communicate(sock, 'SETKEYDESC {}'.format(desc)) == 'OK' assert _communicate(sock, 'PKSIGN') == 'OK' line = _recvline(sock).strip() @@ -95,14 +127,7 @@ def sign(sock, keygrip, digest): sig, leftover = _parse(sig) assert not leftover - - data, (algo, (r, sig_r), (s, sig_s)) = sig - assert data == 'sig-val' - assert algo == 'ecdsa' - assert r == 'r' - assert s == 's' - return (util.bytes2num(sig_r), - util.bytes2num(sig_s)) + return _parse_sig(sig) def get_keygrip(user_id): @@ -113,8 +138,9 @@ def get_keygrip(user_id): def _main(): - logging.basicConfig(level=logging.INFO, - format='%(asctime)s %(levelname)-10s %(message)s') + logging.basicConfig(level=logging.DEBUG, + format='%(asctime)s %(levelname)-10s %(message)-100s ' + '%(filename)s:%(lineno)d') p = argparse.ArgumentParser() p.add_argument('user_id') diff --git a/trezor_agent/gpg/decode.py b/trezor_agent/gpg/decode.py index d57bd2c..96eb3da 100644 --- a/trezor_agent/gpg/decode.py +++ b/trezor_agent/gpg/decode.py @@ -68,11 +68,27 @@ def _parse_ed25519_verifier(mpi): return _ed25519_verify, vk +def _create_rsa_verifier(n, e): + def verifier(signature, digest): + s, = signature + size = n.bit_length() + result = pow(s, e, n) % (2 ** 256) + digest = util.bytes2num(digest) + if result == digest: + log.debug('RSA-%d signature is OK', size) + return True + else: + raise ValueError('invalid RSA signature') + + return verifier + SUPPORTED_CURVES = { b'\x2A\x86\x48\xCE\x3D\x03\x01\x07': _parse_nist256p1_verifier, b'\x2B\x06\x01\x04\x01\xDA\x47\x0F\x01': _parse_ed25519_verifier, } +ECDSA_ALGO_IDS = (19, 22) # (nist256, ed25519) + def _parse_literal(stream): """See https://tools.ietf.org/html/rfc4880#section-5.9 for details.""" @@ -115,10 +131,15 @@ def _parse_signature(stream): p['unhashed_subpackets'] = parse_subpackets(stream) embedded = list(_parse_embedded_signatures(p['unhashed_subpackets'])) if embedded: + log.info('embedded sigs: %s', embedded) p['embedded'] = embedded p['hash_prefix'] = stream.readfmt('2s') - p['sig'] = (parse_mpi(stream), parse_mpi(stream)) + if p['pubkey_alg'] in ECDSA_ALGO_IDS: + p['sig'] = (parse_mpi(stream), parse_mpi(stream)) + else: # RSA + p['sig'] = (parse_mpi(stream),) + assert not stream.read() return p @@ -131,16 +152,23 @@ def _parse_pubkey(stream): p['version'] = stream.readfmt('B') p['created'] = stream.readfmt('>L') p['algo'] = stream.readfmt('B') + if p['algo'] in ECDSA_ALGO_IDS: + # https://tools.ietf.org/html/rfc6637#section-11 + oid_size = stream.readfmt('B') + oid = stream.read(oid_size) + assert oid in SUPPORTED_CURVES, util.hexlify(oid) + parser = SUPPORTED_CURVES[oid] - # https://tools.ietf.org/html/rfc6637#section-11 - oid_size = stream.readfmt('B') - oid = stream.read(oid_size) - assert oid in SUPPORTED_CURVES - parser = SUPPORTED_CURVES[oid] + mpi = parse_mpi(stream) + log.debug('mpi: %x (%d bits)', mpi, mpi.bit_length()) + p['verifier'], p['verifying_key'] = parser(mpi) + else: # RSA + n = parse_mpi(stream) + e = parse_mpi(stream) + log.debug('n: %x (%d bits)', n, n.bit_length()) + log.debug('e: %x (%d bits)', e, e.bit_length()) + p['verifier'] = _create_rsa_verifier(n, e) - mpi = parse_mpi(stream) - log.debug('mpi: %x (%d bits)', mpi, mpi.bit_length()) - p['verifier'], p['verifying_key'] = parser(mpi) assert not stream.read() # https://tools.ietf.org/html/rfc4880#section-12.2 @@ -248,13 +276,20 @@ def digest_packets(packets): def load_public_key(stream): """Parse and validate GPG public key from an input stream.""" packets = list(parse_packets(util.Reader(stream))) - pubkey, userid, signature = packets[:3] + subkey = subsig = None + if len(packets) == 5: + pubkey, userid, signature, subkey, subsig = packets + log.debug('subkey: %s', subkey) + log.debug('subsig: %s', subsig) + else: + pubkey, userid, signature = packets + digest = digest_packets([pubkey, userid, signature]) assert signature['hash_prefix'] == digest[:2] log.debug('loaded public key "%s"', userid['value']) verify_digest(pubkey=pubkey, digest=digest, signature=signature['sig'], label='GPG public key') - return pubkey + return subkey or pubkey def load_signature(stream, original_data): diff --git a/trezor_agent/gpg/encode.py b/trezor_agent/gpg/encode.py index 3b09c2e..3be6a43 100644 --- a/trezor_agent/gpg/encode.py +++ b/trezor_agent/gpg/encode.py @@ -13,10 +13,18 @@ log = logging.getLogger(__name__) def packet(tag, blob): """Create small GPG packet.""" - assert len(blob) < 256 - length_type = 0 # : 1 byte for length + assert len(blob) < 2**32 + + if len(blob) < 2**8: + length_type = 0 + elif len(blob) < 2**16: + length_type = 1 + else: + length_type = 2 + + fmt = ['>B', '>H', '>L'][length_type] leading_byte = 0x80 | (tag << 2) | (length_type) - return struct.pack('>B', leading_byte) + util.prefix_len('>B', blob) + return struct.pack('>B', leading_byte) + util.prefix_len(fmt, blob) def subpacket(subpacket_type, fmt, *values): @@ -50,13 +58,13 @@ def mpi(value): """Serialize multipresicion integer using GPG format.""" bits = value.bit_length() data_size = (bits + 7) // 8 - data_bytes = [0] * data_size + data_bytes = bytearray(data_size) for i in range(data_size): data_bytes[i] = value & 0xFF value = value >> 8 data_bytes.reverse() - return struct.pack('>H', bits) + bytearray(data_bytes) + return struct.pack('>H', bits) + bytes(data_bytes) def _serialize_nist256(vk): @@ -92,7 +100,10 @@ def _find_curve_by_algo_id(algo_id): class HardwareSigner(object): + """Sign messages and get public keys from a hardware device.""" + def __init__(self, user_id, curve_name): + """Connect to the device and retrieve required public key.""" self.client_wrapper = factory.load() self.identity = self.client_wrapper.identity_type() self.identity.proto = 'gpg' @@ -100,6 +111,7 @@ class HardwareSigner(object): self.curve_name = curve_name def pubkey(self): + """Return public key as VerifyingKey object.""" addr = client.get_address(self.identity) public_node = self.client_wrapper.connection.get_public_node( n=addr, ecdsa_curve_name=self.curve_name) @@ -108,56 +120,100 @@ class HardwareSigner(object): pubkey=public_node.node.public_key, curve_name=self.curve_name) - def sign(self, digest, visual): + def sign(self, digest): + """Sign the digest and return a serialized signature.""" result = self.client_wrapper.connection.sign_identity( identity=self.identity, challenge_hidden=digest, - challenge_visual=visual, + challenge_visual=util.hexlify(digest), ecdsa_curve_name=self.curve_name) assert result.signature[:1] == b'\x00' sig = result.signature[1:] return mpi(util.bytes2num(sig[:32])) + mpi(util.bytes2num(sig[32:])) def close(self): + """Close the connection to the device.""" self.client_wrapper.connection.clear_session() self.client_wrapper.connection.close() class AgentSigner(object): - def __init__(self, user_id, curve_name): + """Sign messages and get public keys using gpg-agent tool.""" + + def __init__(self, user_id): + """Connect to the agent and retrieve required public key.""" self.sock = agent.connect() - assert curve_name == formats.CURVE_NIST256 - self.curve_name = curve_name self.keygrip = agent.get_keygrip(user_id) self.public_key = decode.load_from_gpg(user_id) def pubkey(self): + """Return public key as VerifyingKey object.""" return self.public_key['verifying_key'] - def sign(self, digest, visual): - r, s = agent.sign(sock=self.sock, keygrip=self.keygrip, digest=digest) - return mpi(r) + mpi(s) + def sign(self, digest): + """Sign the digest and return an ECDSA signature.""" + params = agent.sign(sock=self.sock, + keygrip=self.keygrip, digest=digest) + return b''.join(mpi(p) for p in params) def close(self): + """Close the connection to gpg-agent.""" self.sock.close() +class PublicKey(object): + """GPG representation for public key packets.""" + + def __init__(self, curve_name, created, verifying_key): + """Contruct using a ECDSA VerifyingKey object.""" + self.curve_info = SUPPORTED_CURVES[curve_name] + self.created = int(created) # time since Epoch + self.verifying_key = verifying_key + self.algo_id = self.curve_info['algo_id'] + + def data(self): + """Data for packet creation.""" + header = struct.pack('>BLB', + 4, # version + self.created, # creation + self.algo_id) # public key algorithm ID + oid = util.prefix_len('>B', self.curve_info['oid']) + blob = self.curve_info['serialize'](self.verifying_key) + return header + oid + blob + + def data_to_hash(self): + """Data for digest computation.""" + return b'\x99' + util.prefix_len('>H', self.data()) + + def _fingerprint(self): + return hashlib.sha1(self.data_to_hash()).digest() + + def key_id(self): + """Short (8 byte) GPG key ID.""" + return self._fingerprint()[-8:] + + def __repr__(self): + """Short (8 hexadecimal digits) GPG key ID.""" + return '<{}>'.format(util.hexlify(self.key_id())) + + __str__ = __repr__ + + class Signer(object): """Performs GPG signing operations.""" - SignerType = AgentSigner - def __init__(self, user_id, created, curve_name): """Construct and loads a public key from the device.""" self.user_id = user_id assert curve_name in formats.SUPPORTED_CURVES - self.conn = self.SignerType(user_id, curve_name=curve_name) - self.verifying_key = self.conn.pubkey() + self.conn = HardwareSigner(user_id, curve_name=curve_name) + self.pubkey = PublicKey( + curve_name=curve_name, created=created, + verifying_key=self.conn.pubkey()) - self.created = int(created) log.info('%s GPG public key %s created at %s', curve_name, - self.hex_short_key_id(), util.time_format(self.created)) + self.pubkey, util.time_format(self.pubkey.created)) @classmethod def from_public_key(cls, pubkey, user_id): @@ -170,64 +226,79 @@ class Signer(object): s = Signer(user_id=user_id, created=pubkey['created'], curve_name=_find_curve_by_algo_id(pubkey['algo'])) - assert s.key_id() == pubkey['key_id'] + assert s.pubkey.key_id() == pubkey['key_id'] return s - def _pubkey_data(self): - curve_info = SUPPORTED_CURVES[self.conn.curve_name] - header = struct.pack('>BLB', - 4, # version - self.created, # creation - curve_info['algo_id']) - oid = util.prefix_len('>B', curve_info['oid']) - blob = curve_info['serialize'](self.verifying_key) - return header + oid + blob - - def _pubkey_data_to_hash(self): - return b'\x99' + util.prefix_len('>H', self._pubkey_data()) - - def _fingerprint(self): - return hashlib.sha1(self._pubkey_data_to_hash()).digest() - - def key_id(self): - """Short (8 byte) GPG key ID.""" - return self._fingerprint()[-8:] - - def hex_short_key_id(self): - """Short (8 hexadecimal digits) GPG key ID.""" - return util.hexlify(self.key_id()[-4:]) - def close(self): """Close connection and turn off the screen of the device.""" self.conn.close() def export(self): """Export GPG public key, ready for "gpg2 --import".""" - pubkey_packet = packet(tag=6, blob=self._pubkey_data()) + pubkey_packet = packet(tag=6, blob=self.pubkey.data()) user_id_packet = packet(tag=13, blob=self.user_id) - data_to_sign = (self._pubkey_data_to_hash() + + data_to_sign = (self.pubkey.data_to_hash() + user_id_packet[:1] + util.prefix_len('>L', self.user_id)) log.info('signing public key "%s"', self.user_id) hashed_subpackets = [ - subpacket_time(self.created), # signature creaion time + subpacket_time(self.pubkey.created), # signature creaion time subpacket_byte(0x1B, 1 | 2), # key flags (certify & sign) subpacket_byte(0x15, 8), # preferred hash (SHA256) subpacket_byte(0x16, 0), # preferred compression (none) subpacket_byte(0x17, 0x80)] # key server prefs (no-modify) - signature = self._make_signature(visual=self.hex_short_key_id(), - data_to_sign=data_to_sign, - sig_type=0x13, # user id & public key - hashed_subpackets=hashed_subpackets) + unhashed_subpackets = [ + subpacket(16, self.pubkey.key_id())] # issuer key id + + signature = _make_signature( + signer_func=self.conn.sign, + public_algo=self.pubkey.algo_id, + data_to_sign=data_to_sign, + sig_type=0x13, # user id & public key + hashed_subpackets=hashed_subpackets, + unhashed_subpackets=unhashed_subpackets) sign_packet = packet(tag=2, blob=signature) return pubkey_packet + user_id_packet + sign_packet - def subkey(self, user_id): - primary = decode.load_from_gpg(user_id) - keygrip = agent.get_keygrip(user_id) - log.info('adding as subkey to %s (%s)', user_id, keygrip) + def subkey(self): + """Export a subkey to `self.user_id` GPG primary key.""" + subkey_packet = packet(tag=14, blob=self.pubkey.data()) + primary = decode.load_from_gpg(self.user_id) + keygrip = agent.get_keygrip(self.user_id) + log.info('adding as subkey to %s (%s)', self.user_id, keygrip) + data_to_sign = primary['_to_hash'] + self.pubkey.data_to_hash() + + # Primary Key Binding Signature + hashed_subpackets = [ + subpacket_time(self.pubkey.created)] # signature creaion time + unhashed_subpackets = [ + subpacket(16, self.pubkey.key_id())] # issuer key id + embedded_sig = _make_signature(signer_func=self.conn.sign, + data_to_sign=data_to_sign, + public_algo=self.pubkey.algo_id, + sig_type=0x19, + hashed_subpackets=hashed_subpackets, + unhashed_subpackets=unhashed_subpackets) + log.info('embedded signature: %r', embedded_sig) + + # Subkey Binding Signature + hashed_subpackets = [ + subpacket_time(self.pubkey.created), # signature creaion time + subpacket_byte(0x1B, 2)] # key flags (certify & sign) + unhashed_subpackets = [ + subpacket(16, primary['key_id']), # issuer key id + subpacket(32, embedded_sig)] + gpg_agent = AgentSigner(self.user_id) + signature = _make_signature(signer_func=gpg_agent.sign, + data_to_sign=data_to_sign, + public_algo=primary['algo'], + sig_type=0x18, + hashed_subpackets=hashed_subpackets, + unhashed_subpackets=unhashed_subpackets) + sign_packet = packet(tag=2, blob=signature) + return subkey_packet + sign_packet def sign(self, msg, sign_time=None): """Sign GPG message at specified time.""" @@ -237,34 +308,38 @@ class Signer(object): log.info('signing %d byte message at %s', len(msg), util.time_format(sign_time)) hashed_subpackets = [subpacket_time(sign_time)] - blob = self._make_signature( - visual=self.hex_short_key_id(), - data_to_sign=msg, hashed_subpackets=hashed_subpackets) + unhashed_subpackets = [ + subpacket(16, self.pubkey.key_id())] # issuer key id + + blob = _make_signature(signer_func=self.conn.sign, + data_to_sign=msg, + public_algo=self.pubkey.algo_id, + hashed_subpackets=hashed_subpackets, + unhashed_subpackets=unhashed_subpackets) return packet(tag=2, blob=blob) - def _make_signature(self, visual, data_to_sign, - hashed_subpackets, sig_type=0): - curve_info = SUPPORTED_CURVES[self.conn.curve_name] - header = struct.pack('>BBBB', - 4, # version - sig_type, # rfc4880 (section-5.2.1) - curve_info['algo_id'], - 8) # hash_alg (SHA256) - hashed = subpackets(*hashed_subpackets) - unhashed = subpackets( - subpacket(16, self.key_id()) # issuer key id - ) - tail = b'\x04\xff' + struct.pack('>L', len(header) + len(hashed)) - data_to_hash = data_to_sign + header + hashed + tail - log.debug('hashing %d bytes', len(data_to_hash)) - digest = hashlib.sha256(data_to_hash).digest() +def _make_signature(signer_func, data_to_sign, public_algo, + hashed_subpackets, unhashed_subpackets, sig_type=0): + # pylint: disable=too-many-arguments + header = struct.pack('>BBBB', + 4, # version + sig_type, # rfc4880 (section-5.2.1) + public_algo, + 8) # hash_alg (SHA256) + hashed = subpackets(*hashed_subpackets) + unhashed = subpackets(*unhashed_subpackets) + tail = b'\x04\xff' + struct.pack('>L', len(header) + len(hashed)) + data_to_hash = data_to_sign + header + hashed + tail - sig = self.conn.sign(digest=digest, visual=visual) + log.debug('hashing %d bytes', len(data_to_hash)) + digest = hashlib.sha256(data_to_hash).digest() - return (header + hashed + unhashed + - digest[:2] + # used for decoder's sanity check - sig) # actual ECDSA signature + sig = signer_func(digest=digest) + + return bytes(header + hashed + unhashed + + digest[:2] + # used for decoder's sanity check + sig) # actual ECDSA signature def _split_lines(body, size): diff --git a/trezor_agent/gpg/signer.py b/trezor_agent/gpg/signer.py index 7654251..c9b424a 100755 --- a/trezor_agent/gpg/signer.py +++ b/trezor_agent/gpg/signer.py @@ -31,6 +31,7 @@ def main(): p.add_argument('-t', '--time', type=int, default=int(time.time())) p.add_argument('-a', '--armor', action='store_true', default=False) p.add_argument('-v', '--verbose', action='store_true', default=False) + p.add_argument('-s', '--subkey', action='store_true', default=False) p.add_argument('-e', '--ecdsa-curve', default='nist256p1') p.add_argument('-o', '--output', help='Output file name for the results. ' @@ -44,12 +45,16 @@ def main(): if not args.filename: s = encode.Signer(user_id=user_id, created=args.time, curve_name=args.ecdsa_curve) - pubkey = s.export() + if args.subkey: + pubkey = s.subkey() + else: + pubkey = s.export() + ext = '.pub' if args.armor: pubkey = encode.armor(pubkey, 'PUBLIC KEY BLOCK') ext = '.asc' - filename = args.output or (s.hex_short_key_id() + ext) + filename = args.output or '-' # use stdout if no file specified if filename == 'GPG': log.info('importing public key to local keyring') _call_with_input(['gpg2', '--import'], pubkey) diff --git a/trezor_agent/gpg/test.sh b/trezor_agent/gpg/test.sh new file mode 100644 index 0000000..de1cba4 --- /dev/null +++ b/trezor_agent/gpg/test.sh @@ -0,0 +1,15 @@ +# NEVER RUN ON YOUR OWN REAL GPG KEYS!!!!! THEY WILL BE DELETED!!!!! +set -x -e -u +CURVE=ed25519 +#CURVE=nist256p1 +(cd ~/.gnupg && rm -rf openpgp-revocs.d/ private-keys-v1.d/ pubring.kbx* trustdb.gpg /tmp/log *.gpg; killall gpg-agent || true) +gpg2 --full-gen-key --expert +gpg2 --export > romanz.pub +NOW=`date +%s` +trezor-gpg -t $NOW -v -e $CURVE --subkey "romanz" -o subkey.pub +gpg2 -K +gpg2 -vv --import <(cat romanz.pub subkey.pub) +gpg2 -K + +trezor-gpg -t $NOW -v -e $CURVE "romanz" EXAMPLE +gpg2 --verify EXAMPLE.sig