mirror of
https://github.com/romanz/amodem.git
synced 2026-04-21 13:46:30 +08:00
ssh: retrieve all keys using a single device session
This commit is contained in:
@@ -152,7 +152,7 @@ class JustInTimeConnection(object):
|
|||||||
def _public_keys(self):
|
def _public_keys(self):
|
||||||
"""Return a list of SSH public keys (in textual format)."""
|
"""Return a list of SSH public keys (in textual format)."""
|
||||||
conn = self.conn_factory()
|
conn = self.conn_factory()
|
||||||
return [conn.get_public_key(i) for i in self.identities]
|
return conn.export_public_keys(self.identities)
|
||||||
|
|
||||||
def parse_public_keys(self):
|
def parse_public_keys(self):
|
||||||
"""Parse SSH public keys into dictionaries."""
|
"""Parse SSH public keys into dictionaries."""
|
||||||
|
|||||||
@@ -18,15 +18,17 @@ class Client(object):
|
|||||||
"""Connect to hardware device."""
|
"""Connect to hardware device."""
|
||||||
self.device = device
|
self.device = device
|
||||||
|
|
||||||
def get_public_key(self, identity):
|
def export_public_keys(self, identities):
|
||||||
"""Get SSH public key from the device."""
|
"""Export SSH public keys from the device."""
|
||||||
|
public_keys = []
|
||||||
with self.device:
|
with self.device:
|
||||||
pubkey = self.device.pubkey(identity)
|
for i in identities:
|
||||||
|
pubkey = self.device.pubkey(identity=i)
|
||||||
vk = formats.decompress_pubkey(pubkey=pubkey,
|
vk = formats.decompress_pubkey(pubkey=pubkey,
|
||||||
curve_name=identity.curve_name)
|
curve_name=i.curve_name)
|
||||||
return formats.export_public_key(vk=vk,
|
public_keys.append(formats.export_public_key(vk=vk,
|
||||||
label=str(identity))
|
label=str(i)))
|
||||||
|
return public_keys
|
||||||
|
|
||||||
def sign_ssh_challenge(self, blob, identity):
|
def sign_ssh_challenge(self, blob, identity):
|
||||||
"""Sign given blob using a private key on the device."""
|
"""Sign given blob using a private key on the device."""
|
||||||
|
|||||||
@@ -49,7 +49,7 @@ def test_ssh_agent():
|
|||||||
identity = device.interface.Identity(identity_str='localhost:22',
|
identity = device.interface.Identity(identity_str='localhost:22',
|
||||||
curve_name=CURVE)
|
curve_name=CURVE)
|
||||||
c = client.Client(device=MockDevice())
|
c = client.Client(device=MockDevice())
|
||||||
assert c.get_public_key(identity) == PUBKEY_TEXT
|
assert c.export_public_keys([identity]) == [PUBKEY_TEXT]
|
||||||
signature = c.sign_ssh_challenge(blob=BLOB, identity=identity)
|
signature = c.sign_ssh_challenge(blob=BLOB, identity=identity)
|
||||||
|
|
||||||
key = formats.import_public_key(PUBKEY_TEXT)
|
key = formats.import_public_key(PUBKEY_TEXT)
|
||||||
|
|||||||
Reference in New Issue
Block a user