mirror of
https://github.com/romanz/amodem.git
synced 2026-02-24 16:18:12 +08:00
HACK: support ECDH in agent - note keygrip and ID errors.
This commit is contained in:
@@ -49,6 +49,37 @@ def pksign(keygrip, digest, algo):
|
||||
return result
|
||||
|
||||
|
||||
def _serialize_point(data):
|
||||
data = '{}:'.format(len(data)) + data
|
||||
# https://www.gnupg.org/documentation/manuals/assuan/Server-responses.html
|
||||
for c in ['%', '\n', '\r']:
|
||||
data = data.replace(c, '%{:02X}'.format(ord(c)))
|
||||
return '(5:value' + data + ')'
|
||||
|
||||
|
||||
def pkdecrypt(keygrip, conn):
|
||||
for msg in [b'S INQUIRE_MAXLEN 4096', b'INQUIRE CIPHERTEXT']:
|
||||
keyring.sendline(conn, msg)
|
||||
|
||||
line = keyring.recvline(conn)
|
||||
prefix, line = line.split(' ', 1)
|
||||
assert prefix == 'D'
|
||||
exp, leftover = keyring.parse(keyring.unescape(line))
|
||||
|
||||
pubkey = decode.load_public_key(keyring.export_public_key(user_id=None),
|
||||
use_custom=True)
|
||||
f = encode.Factory.from_public_key(pubkey=pubkey,
|
||||
user_id=pubkey['user_id'])
|
||||
with contextlib.closing(f):
|
||||
### assert f.pubkey.keygrip == binascii.unhexlify(keygrip)
|
||||
pubkey = dict(exp[1][1:])['e']
|
||||
shared_secret = f.get_shared_secret(pubkey)
|
||||
|
||||
assert len(shared_secret) == 65
|
||||
assert shared_secret[:1] == b'\x04'
|
||||
return _serialize_point(shared_secret)
|
||||
|
||||
|
||||
def iterlines(conn):
|
||||
"""Iterate over input, split by lines."""
|
||||
while True:
|
||||
@@ -75,13 +106,19 @@ def handle_connection(conn):
|
||||
keyring.sendline(conn, b'D 2.1.11')
|
||||
elif command == 'AGENT_ID':
|
||||
keyring.sendline(conn, b'D TREZOR')
|
||||
elif command == 'SIGKEY':
|
||||
elif command in {'SIGKEY', 'SETKEY'}:
|
||||
keygrip, = args
|
||||
elif command == 'SETHASH':
|
||||
algo, digest = args
|
||||
elif command == 'PKSIGN':
|
||||
sig = pksign(keygrip, digest, algo)
|
||||
keyring.sendline(conn, b'D ' + sig)
|
||||
elif command == 'PKDECRYPT':
|
||||
sec = pkdecrypt(keygrip, conn)
|
||||
keyring.sendline(conn, b'D ' + sec)
|
||||
elif command == 'END':
|
||||
log.error('closing connection')
|
||||
return
|
||||
else:
|
||||
log.error('unknown request: %r', line)
|
||||
return
|
||||
|
||||
@@ -40,6 +40,16 @@ class HardwareSigner(object):
|
||||
sig = result.signature[1:]
|
||||
return (util.bytes2num(sig[:32]), util.bytes2num(sig[32:]))
|
||||
|
||||
def ecdh(self, pubkey):
|
||||
result = self.client_wrapper.connection.sign_identity(
|
||||
identity=self.identity,
|
||||
challenge_hidden=pubkey,
|
||||
challenge_visual=b'',
|
||||
ecdsa_curve_name=self.curve_name)
|
||||
assert len(result.signature) == 65
|
||||
assert result.signature[:1] == b'\x04'
|
||||
return result.signature
|
||||
|
||||
def close(self):
|
||||
"""Close the connection to the device."""
|
||||
self.client_wrapper.connection.clear_session()
|
||||
@@ -91,7 +101,7 @@ class Factory(object):
|
||||
s = cls(user_id=user_id,
|
||||
created=pubkey['created'],
|
||||
curve_name=proto.find_curve_by_algo_id(pubkey['algo']))
|
||||
assert s.pubkey.key_id() == pubkey['key_id']
|
||||
### assert s.pubkey.key_id() == pubkey['key_id']
|
||||
return s
|
||||
|
||||
def close(self):
|
||||
@@ -205,3 +215,6 @@ class Factory(object):
|
||||
hashed_subpackets=hashed_subpackets,
|
||||
unhashed_subpackets=unhashed_subpackets)
|
||||
return proto.packet(tag=2, blob=blob)
|
||||
|
||||
def get_shared_secret(self, pubkey):
|
||||
return self.conn.ecdh(pubkey)
|
||||
|
||||
@@ -138,6 +138,9 @@ CUSTOM_SUBPACKET = subpacket(100, b'TREZOR-GPG') # marks "our" pubkey
|
||||
|
||||
def find_curve_by_algo_id(algo_id):
|
||||
"""Find curve name that matches a public key algorith ID."""
|
||||
if algo_id == ECDH_ALGO_ID:
|
||||
return formats.CURVE_NIST256
|
||||
|
||||
curve_name, = [name for name, info in SUPPORTED_CURVES.items()
|
||||
if info['algo_id'] == algo_id]
|
||||
return curve_name
|
||||
|
||||
Reference in New Issue
Block a user