mirror of
https://github.com/romanz/amodem.git
synced 2026-04-20 21:26:39 +08:00
gpg: small fixes before merging to master
This commit is contained in:
@@ -89,9 +89,38 @@ def split_bits(value, *bits):
|
|||||||
assert value == 0
|
assert value == 0
|
||||||
return reversed(result)
|
return reversed(result)
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_nist256p1_verifier(mpi):
|
||||||
|
prefix, x, y = split_bits(mpi, 4, 256, 256)
|
||||||
|
assert prefix == 4
|
||||||
|
point = ecdsa.ellipticcurve.Point(curve=ecdsa.NIST256p.curve,
|
||||||
|
x=x, y=y)
|
||||||
|
vk = ecdsa.VerifyingKey.from_public_point(
|
||||||
|
point=point, curve=ecdsa.curves.NIST256p,
|
||||||
|
hashfunc=hashlib.sha256)
|
||||||
|
|
||||||
|
def _nist256p1_verify(signature, digest):
|
||||||
|
vk.verify_digest(signature=signature,
|
||||||
|
digest=digest,
|
||||||
|
sigdecode=lambda rs, order: rs)
|
||||||
|
return _nist256p1_verify
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_ed25519_verifier(mpi):
|
||||||
|
prefix, value = split_bits(mpi, 8, 256)
|
||||||
|
assert prefix == 0x40
|
||||||
|
vk = ed25519.VerifyingKey(num2bytes(value, size=32))
|
||||||
|
|
||||||
|
def _ed25519_verify(signature, digest):
|
||||||
|
sig = b''.join(num2bytes(val, size=32)
|
||||||
|
for val in signature)
|
||||||
|
vk.verify(sig, digest)
|
||||||
|
return _ed25519_verify
|
||||||
|
|
||||||
|
|
||||||
SUPPORTED_CURVES = {
|
SUPPORTED_CURVES = {
|
||||||
b'\x2A\x86\x48\xCE\x3D\x03\x01\x07': 'nist256p1',
|
b'\x2A\x86\x48\xCE\x3D\x03\x01\x07': _parse_nist256p1_verifier,
|
||||||
b'\x2B\x06\x01\x04\x01\xDA\x47\x0F\x01': 'ed25519',
|
b'\x2B\x06\x01\x04\x01\xDA\x47\x0F\x01': _parse_ed25519_verifier,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -177,37 +206,11 @@ class Parser(object):
|
|||||||
oid_size = stream.readfmt('B')
|
oid_size = stream.readfmt('B')
|
||||||
oid = stream.read(oid_size)
|
oid = stream.read(oid_size)
|
||||||
assert oid in SUPPORTED_CURVES
|
assert oid in SUPPORTED_CURVES
|
||||||
curve_name = SUPPORTED_CURVES[oid]
|
parser = SUPPORTED_CURVES[oid]
|
||||||
|
|
||||||
mpi = parse_mpi(stream)
|
mpi = parse_mpi(stream)
|
||||||
log.debug('mpi: %x (%d bits)', mpi, mpi.bit_length())
|
log.debug('mpi: %x (%d bits)', mpi, mpi.bit_length())
|
||||||
if curve_name == 'nist256p1':
|
p['verifier'] = parser(mpi)
|
||||||
prefix, x, y = split_bits(mpi, 4, 256, 256)
|
|
||||||
assert prefix == 4
|
|
||||||
point = ecdsa.ellipticcurve.Point(curve=ecdsa.NIST256p.curve,
|
|
||||||
x=x, y=y)
|
|
||||||
vk = ecdsa.VerifyingKey.from_public_point(
|
|
||||||
point=point, curve=ecdsa.curves.NIST256p,
|
|
||||||
hashfunc=hashlib.sha256)
|
|
||||||
|
|
||||||
def _nist256p1_verify(signature, digest):
|
|
||||||
vk.verify_digest(signature=signature,
|
|
||||||
digest=digest,
|
|
||||||
sigdecode=lambda rs, order: rs)
|
|
||||||
p['verifier'] = _nist256p1_verify
|
|
||||||
elif curve_name == 'ed25519':
|
|
||||||
prefix, value = split_bits(mpi, 8, 256)
|
|
||||||
assert prefix == 0x40
|
|
||||||
vk = ed25519.VerifyingKey(num2bytes(value, size=32))
|
|
||||||
|
|
||||||
def _ed25519_verify(signature, digest):
|
|
||||||
sig = b''.join(num2bytes(val, size=32)
|
|
||||||
for val in signature)
|
|
||||||
vk.verify(sig, digest)
|
|
||||||
p['verifier'] = _ed25519_verify
|
|
||||||
else:
|
|
||||||
raise ValueError('unsupported curve {}'.format(curve_name))
|
|
||||||
|
|
||||||
assert not stream.read()
|
assert not stream.read()
|
||||||
|
|
||||||
# https://tools.ietf.org/html/rfc4880#section-12.2
|
# https://tools.ietf.org/html/rfc4880#section-12.2
|
||||||
|
|||||||
@@ -216,12 +216,11 @@ class Signer(object):
|
|||||||
ecdsa_curve_name=self.curve_name)
|
ecdsa_curve_name=self.curve_name)
|
||||||
assert result.signature[:1] == b'\x00'
|
assert result.signature[:1] == b'\x00'
|
||||||
sig = result.signature[1:]
|
sig = result.signature[1:]
|
||||||
sig = [util.bytes2num(sig[:32]),
|
sig = mpi(util.bytes2num(sig[:32])) + mpi(util.bytes2num(sig[32:]))
|
||||||
util.bytes2num(sig[32:])]
|
|
||||||
|
|
||||||
hash_prefix = digest[:2] # used for decoder's sanity check
|
return (header + hashed + unhashed +
|
||||||
signature = mpi(sig[0]) + mpi(sig[1]) # actual ECDSA signature
|
digest[:2] + # used for decoder's sanity check
|
||||||
return header + hashed + unhashed + hash_prefix + signature
|
sig) # actual ECDSA signature
|
||||||
|
|
||||||
|
|
||||||
def split_lines(body, size):
|
def split_lines(body, size):
|
||||||
@@ -240,9 +239,12 @@ def armor(blob, type_str):
|
|||||||
|
|
||||||
|
|
||||||
def load_from_gpg(user_id):
|
def load_from_gpg(user_id):
|
||||||
log.info('loading public key %r from local GPG keyring', user_id)
|
|
||||||
pubkey_bytes = subprocess.check_output(['gpg2', '--export', user_id])
|
pubkey_bytes = subprocess.check_output(['gpg2', '--export', user_id])
|
||||||
return decode.load_public_key(io.BytesIO(pubkey_bytes))
|
if pubkey_bytes:
|
||||||
|
return decode.load_public_key(io.BytesIO(pubkey_bytes))
|
||||||
|
else:
|
||||||
|
log.error('could not find public key %r in local GPG keyring', user_id)
|
||||||
|
raise KeyError(user_id)
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
|
|||||||
@@ -78,8 +78,8 @@ def frame(*msgs):
|
|||||||
|
|
||||||
|
|
||||||
def crc24(blob):
|
def crc24(blob):
|
||||||
CRC24_INIT = 0xB704CEL
|
CRC24_INIT = 0x0B704CE
|
||||||
CRC24_POLY = 0x1864CFBL
|
CRC24_POLY = 0x1864CFB
|
||||||
|
|
||||||
crc = CRC24_INIT
|
crc = CRC24_INIT
|
||||||
for octet in bytearray(blob):
|
for octet in bytearray(blob):
|
||||||
|
|||||||
Reference in New Issue
Block a user