mirror of
https://github.com/romanz/amodem.git
synced 2026-04-20 13:16:42 +08:00
gpg: refactor ecdh case
This commit is contained in:
@@ -57,23 +57,36 @@ def _serialize_point(data):
|
|||||||
return '(5:value' + data + ')'
|
return '(5:value' + data + ')'
|
||||||
|
|
||||||
|
|
||||||
|
def parse_ecdh(line):
|
||||||
|
prefix, line = line.split(' ', 1)
|
||||||
|
assert prefix == 'D'
|
||||||
|
exp, leftover = keyring.parse(keyring.unescape(line))
|
||||||
|
log.debug('ECDH s-exp: %r', exp)
|
||||||
|
assert not leftover
|
||||||
|
label, exp = exp
|
||||||
|
assert label == b'enc-val'
|
||||||
|
assert exp[0] == b'ecdh'
|
||||||
|
items = exp[1:]
|
||||||
|
log.debug('ECDH parameters: %r', items)
|
||||||
|
return dict(items)['e']
|
||||||
|
|
||||||
|
|
||||||
def pkdecrypt(keygrip, conn):
|
def pkdecrypt(keygrip, conn):
|
||||||
for msg in [b'S INQUIRE_MAXLEN 4096', b'INQUIRE CIPHERTEXT']:
|
for msg in [b'S INQUIRE_MAXLEN 4096', b'INQUIRE CIPHERTEXT']:
|
||||||
keyring.sendline(conn, msg)
|
keyring.sendline(conn, msg)
|
||||||
|
|
||||||
line = keyring.recvline(conn)
|
line = keyring.recvline(conn)
|
||||||
prefix, line = line.split(' ', 1)
|
assert keyring.recvline(conn) == b'END'
|
||||||
assert prefix == 'D'
|
remote_pubkey = parse_ecdh(line)
|
||||||
exp, leftover = keyring.parse(keyring.unescape(line))
|
|
||||||
|
|
||||||
pubkey = decode.load_public_key(keyring.export_public_key(user_id=None),
|
local_pubkey = decode.load_public_key(
|
||||||
use_custom=True)
|
pubkey_bytes=keyring.export_public_key(user_id=None),
|
||||||
f = encode.Factory.from_public_key(pubkey=pubkey,
|
use_custom=True)
|
||||||
user_id=pubkey['user_id'])
|
f = encode.Factory.from_public_key(
|
||||||
|
pubkey=local_pubkey, user_id=local_pubkey['user_id'])
|
||||||
with contextlib.closing(f):
|
with contextlib.closing(f):
|
||||||
### assert f.pubkey.keygrip == binascii.unhexlify(keygrip)
|
### assert f.pubkey.keygrip == binascii.unhexlify(keygrip)
|
||||||
pubkey = dict(exp[1][1:])['e']
|
shared_secret = f.get_shared_secret(remote_pubkey)
|
||||||
shared_secret = f.get_shared_secret(pubkey)
|
|
||||||
|
|
||||||
assert len(shared_secret) == 65
|
assert len(shared_secret) == 65
|
||||||
assert shared_secret[:1] == b'\x04'
|
assert shared_secret[:1] == b'\x04'
|
||||||
@@ -116,9 +129,6 @@ def handle_connection(conn):
|
|||||||
elif command == 'PKDECRYPT':
|
elif command == 'PKDECRYPT':
|
||||||
sec = pkdecrypt(keygrip, conn)
|
sec = pkdecrypt(keygrip, conn)
|
||||||
keyring.sendline(conn, b'D ' + sec)
|
keyring.sendline(conn, b'D ' + sec)
|
||||||
elif command == 'END':
|
|
||||||
log.error('closing connection')
|
|
||||||
return
|
|
||||||
else:
|
else:
|
||||||
log.error('unknown request: %r', line)
|
log.error('unknown request: %r', line)
|
||||||
return
|
return
|
||||||
|
|||||||
Reference in New Issue
Block a user