ssh: move related code to a separate subdirectory

This commit is contained in:
Roman Zeyde
2017-05-05 11:22:00 +03:00
parent 6c2273387d
commit 257992d04c
8 changed files with 46 additions and 47 deletions

View File

@@ -1,16 +1,23 @@
"""SSH-agent implementation using hardware authentication devices."""
import argparse
import contextlib
import functools
import logging
import os
import re
import subprocess
import sys
import tempfile
import threading
from .. import client, device, formats, protocol, server, util
from .. import device, formats, server, util
from . import client, protocol
log = logging.getLogger(__name__)
UNIX_SOCKET_TIMEOUT = 0.1
def ssh_args(label):
"""Create SSH command for connecting specified server."""
@@ -51,7 +58,7 @@ def create_parser():
default=formats.CURVE_NIST256,
help='specify ECDSA curve name: ' + curve_names)
p.add_argument('--timeout',
default=server.UNIX_SOCKET_TIMEOUT, type=float,
default=UNIX_SOCKET_TIMEOUT, type=float,
help='Timeout for accepting SSH client connections')
p.add_argument('--debug', default=False, action='store_true',
help='Log SSH protocol messages for debugging.')
@@ -110,11 +117,44 @@ def git_host(remote_name, attributes):
return '{user}@{host}'.format(**match.groupdict())
@contextlib.contextmanager
def serve(handler, sock_path=None, timeout=UNIX_SOCKET_TIMEOUT):
"""
Start the ssh-agent server on a UNIX-domain socket.
If no connection is made during the specified timeout,
retry until the context is over.
"""
ssh_version = subprocess.check_output(['ssh', '-V'],
stderr=subprocess.STDOUT)
log.debug('local SSH version: %r', ssh_version)
if sock_path is None:
sock_path = tempfile.mktemp(prefix='trezor-ssh-agent-')
environ = {'SSH_AUTH_SOCK': sock_path, 'SSH_AGENT_PID': str(os.getpid())}
device_mutex = threading.Lock()
with server.unix_domain_socket_server(sock_path) as sock:
sock.settimeout(timeout)
quit_event = threading.Event()
handle_conn = functools.partial(server.handle_connection,
handler=handler,
mutex=device_mutex)
kwargs = dict(sock=sock,
handle_conn=handle_conn,
quit_event=quit_event)
with server.spawn(server.server_thread, kwargs):
try:
yield environ
finally:
log.debug('closing server')
quit_event.set()
def run_server(conn, command, debug, timeout):
"""Common code for run_agent and run_git below."""
try:
handler = protocol.Handler(conn=conn, debug=debug)
with server.serve(handler=handler, timeout=timeout) as env:
with serve(handler=handler, timeout=timeout) as env:
return server.run_process(command=command, environ=env)
except KeyboardInterrupt:
log.info('server stopped')

64
libagent/ssh/client.py Normal file
View File

@@ -0,0 +1,64 @@
"""
Connection to hardware authentication device.
It is used for getting SSH public keys and ECDSA signing of server requests.
"""
import io
import logging
from . import formats, util
log = logging.getLogger(__name__)
class Client(object):
"""Client wrapper for SSH authentication device."""
def __init__(self, device):
"""Connect to hardware device."""
self.device = device
def export_public_keys(self, identities):
"""Export SSH public keys from the device."""
public_keys = []
with self.device:
for i in identities:
pubkey = self.device.pubkey(identity=i)
vk = formats.decompress_pubkey(pubkey=pubkey,
curve_name=i.curve_name)
public_keys.append(formats.export_public_key(vk=vk,
label=str(i)))
return public_keys
def sign_ssh_challenge(self, blob, identity):
"""Sign given blob using a private key on the device."""
msg = _parse_ssh_blob(blob)
log.debug('%s: user %r via %r (%r)',
msg['conn'], msg['user'], msg['auth'], msg['key_type'])
log.debug('nonce: %r', msg['nonce'])
fp = msg['public_key']['fingerprint']
log.debug('fingerprint: %s', fp)
log.debug('hidden challenge size: %d bytes', len(blob))
log.info('please confirm user "%s" login to "%s" using %s...',
msg['user'].decode('ascii'), identity,
self.device)
with self.device:
return self.device.sign(blob=blob, identity=identity)
def _parse_ssh_blob(data):
res = {}
i = io.BytesIO(data)
res['nonce'] = util.read_frame(i)
i.read(1) # SSH2_MSG_USERAUTH_REQUEST == 50 (from ssh2.h, line 108)
res['user'] = util.read_frame(i)
res['conn'] = util.read_frame(i)
res['auth'] = util.read_frame(i)
i.read(1) # have_sig == 1 (from sshconnect2.c, line 1056)
res['key_type'] = util.read_frame(i)
public_key = util.read_frame(i)
res['public_key'] = formats.parse_pubkey(public_key)
assert not i.read()
return res

160
libagent/ssh/protocol.py Normal file
View File

@@ -0,0 +1,160 @@
"""
SSH-agent protocol implementation library.
See https://github.com/openssh/openssh-portable/blob/master/PROTOCOL.agent and
http://ptspts.blogspot.co.il/2010/06/how-to-use-ssh-agent-programmatically.html
for more details.
The server's source code can be found here:
https://github.com/openssh/openssh-portable/blob/master/authfd.c
"""
import io
import logging
from . import formats, util
log = logging.getLogger(__name__)
# Taken from https://github.com/openssh/openssh-portable/blob/master/authfd.h
COMMANDS = dict(
SSH_AGENTC_REQUEST_RSA_IDENTITIES=1,
SSH_AGENT_RSA_IDENTITIES_ANSWER=2,
SSH_AGENTC_RSA_CHALLENGE=3,
SSH_AGENT_RSA_RESPONSE=4,
SSH_AGENT_FAILURE=5,
SSH_AGENT_SUCCESS=6,
SSH_AGENTC_ADD_RSA_IDENTITY=7,
SSH_AGENTC_REMOVE_RSA_IDENTITY=8,
SSH_AGENTC_REMOVE_ALL_RSA_IDENTITIES=9,
SSH2_AGENTC_REQUEST_IDENTITIES=11,
SSH2_AGENT_IDENTITIES_ANSWER=12,
SSH2_AGENTC_SIGN_REQUEST=13,
SSH2_AGENT_SIGN_RESPONSE=14,
SSH2_AGENTC_ADD_IDENTITY=17,
SSH2_AGENTC_REMOVE_IDENTITY=18,
SSH2_AGENTC_REMOVE_ALL_IDENTITIES=19,
SSH_AGENTC_ADD_SMARTCARD_KEY=20,
SSH_AGENTC_REMOVE_SMARTCARD_KEY=21,
SSH_AGENTC_LOCK=22,
SSH_AGENTC_UNLOCK=23,
SSH_AGENTC_ADD_RSA_ID_CONSTRAINED=24,
SSH2_AGENTC_ADD_ID_CONSTRAINED=25,
SSH_AGENTC_ADD_SMARTCARD_KEY_CONSTRAINED=26,
)
def msg_code(name):
"""Convert string name into a integer message code."""
return COMMANDS[name]
def msg_name(code):
"""Convert integer message code into a string name."""
ids = {v: k for k, v in COMMANDS.items()}
return ids[code]
def failure():
"""Return error code to SSH binary."""
error_msg = util.pack('B', msg_code('SSH_AGENT_FAILURE'))
return util.frame(error_msg)
def _legacy_pubs(buf):
"""SSH v1 public keys are not supported."""
leftover = buf.read()
if leftover:
log.warning('skipping leftover: %r', leftover)
code = util.pack('B', msg_code('SSH_AGENT_RSA_IDENTITIES_ANSWER'))
num = util.pack('L', 0) # no SSH v1 keys
return util.frame(code, num)
class Handler(object):
"""ssh-agent protocol handler."""
def __init__(self, conn, debug=False):
"""
Create a protocol handler with specified public keys.
Use specified signer function to sign SSH authentication requests.
"""
self.conn = conn
self.debug = debug
self.methods = {
msg_code('SSH_AGENTC_REQUEST_RSA_IDENTITIES'): _legacy_pubs,
msg_code('SSH2_AGENTC_REQUEST_IDENTITIES'): self.list_pubs,
msg_code('SSH2_AGENTC_SIGN_REQUEST'): self.sign_message,
}
def handle(self, msg):
"""Handle SSH message from the SSH client and return the response."""
debug_msg = ': {!r}'.format(msg) if self.debug else ''
log.debug('request: %d bytes%s', len(msg), debug_msg)
buf = io.BytesIO(msg)
code, = util.recv(buf, '>B')
if code not in self.methods:
log.warning('Unsupported command: %s (%d)', msg_name(code), code)
return failure()
method = self.methods[code]
log.debug('calling %s()', method.__name__)
reply = method(buf=buf)
debug_reply = ': {!r}'.format(reply) if self.debug else ''
log.debug('reply: %d bytes%s', len(reply), debug_reply)
return reply
def list_pubs(self, buf):
"""SSH v2 public keys are serialized and returned."""
assert not buf.read()
keys = self.conn.parse_public_keys()
code = util.pack('B', msg_code('SSH2_AGENT_IDENTITIES_ANSWER'))
num = util.pack('L', len(keys))
log.debug('available keys: %s', [k['name'] for k in keys])
for i, k in enumerate(keys):
log.debug('%2d) %s', i+1, k['fingerprint'])
pubs = [util.frame(k['blob']) + util.frame(k['name']) for k in keys]
return util.frame(code, num, *pubs)
def sign_message(self, buf):
"""
SSH v2 public key authentication is performed.
If the required key is not supported, raise KeyError
If the signature is invalid, raise ValueError
"""
key = formats.parse_pubkey(util.read_frame(buf))
log.debug('looking for %s', key['fingerprint'])
blob = util.read_frame(buf)
assert util.read_frame(buf) == b''
assert not buf.read()
for k in self.conn.parse_public_keys():
if (k['fingerprint']) == (key['fingerprint']):
log.debug('using key %r (%s)', k['name'], k['fingerprint'])
key = k
break
else:
raise KeyError('key not found')
label = key['name'].decode('ascii') # label should be a string
log.debug('signing %d-byte blob with "%s" key', len(blob), label)
try:
signature = self.conn.sign(blob=blob, identity=key['identity'])
except IOError:
return failure()
log.debug('signature: %r', signature)
try:
sig_bytes = key['verifier'](sig=signature, msg=blob)
log.info('signature status: OK')
except formats.ecdsa.BadSignatureError:
log.exception('signature status: ERROR')
raise ValueError('invalid ECDSA signature')
log.debug('signature size: %d bytes', len(sig_bytes))
data = util.frame(util.frame(key['type']), util.frame(sig_bytes))
code = util.pack('B', msg_code('SSH2_AGENT_SIGN_RESPONSE'))
return util.frame(code, data)

View File

@@ -0,0 +1 @@
"""Unit-tests for this package."""

View File

@@ -0,0 +1,72 @@
import io
import mock
import pytest
from .. import client, device, formats, util
ADDR = [2147483661, 2810943954, 3938368396, 3454558782, 3848009040]
CURVE = 'nist256p1'
PUBKEY = (b'\x03\xd8(\xb5\xa6`\xbet0\x95\xac:[;]\xdc,\xbd\xdc?\xd7\xc0\xec'
b'\xdd\xbc+\xfar~\x9dAis')
PUBKEY_TEXT = ('ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzd'
'HAyNTYAAABBBNgotaZgvnQwlaw6Wztd3Cy93D/XwOzdvCv6cn6dQWlzNMEQeW'
'VUfhvrGljR2Z/CMRONY6ejB+9PnpUOPuzYqi8= <localhost:22|nist256p1>\n')
class MockDevice(device.interface.Device): # pylint: disable=abstract-method
def connect(self): # pylint: disable=no-self-use
return mock.Mock()
def pubkey(self, identity, ecdh=False): # pylint: disable=unused-argument
assert self.conn
return PUBKEY
def sign(self, identity, blob):
"""Sign given blob and return the signature (as bytes)."""
assert self.conn
assert blob == BLOB
return SIG
BLOB = (b'\x00\x00\x00 \xce\xe0\xc9\xd5\xceu/\xe8\xc5\xf2\xbfR+x\xa1\xcf\xb0'
b'\x8e;R\xd3)m\x96\x1b\xb4\xd8s\xf1\x99\x16\xaa2\x00\x00\x00\x05roman'
b'\x00\x00\x00\x0essh-connection\x00\x00\x00\tpublickey'
b'\x01\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00h\x00\x00\x00'
b'\x13ecdsa-sha2-nistp256\x00\x00\x00\x08nistp256\x00\x00\x00A'
b'\x04\xd8(\xb5\xa6`\xbet0\x95\xac:[;]\xdc,\xbd\xdc?\xd7\xc0\xec'
b'\xdd\xbc+\xfar~\x9dAis4\xc1\x10yeT~\x1b\xeb\x1aX\xd1\xd9\x9f\xc21'
b'\x13\x8dc\xa7\xa3\x07\xefO\x9e\x95\x0e>\xec\xd8\xaa/')
SIG = (b'R\x19T\xf2\x84$\xef#\x0e\xee\x04X\xc6\xc3\x99T`\xd1\xd8\xf7!'
b'\x862@cx\xb8\xb9i@1\x1b3#\x938\x86]\x97*Y\xb2\x02Xa\xdf@\xecK'
b'\xdc\xf0H\xab\xa8\xac\xa7? \x8f=C\x88N\xe2')
def test_ssh_agent():
identity = device.interface.Identity(identity_str='localhost:22',
curve_name=CURVE)
c = client.Client(device=MockDevice())
assert c.export_public_keys([identity]) == [PUBKEY_TEXT]
signature = c.sign_ssh_challenge(blob=BLOB, identity=identity)
key = formats.import_public_key(PUBKEY_TEXT)
serialized_sig = key['verifier'](sig=signature, msg=BLOB)
stream = io.BytesIO(serialized_sig)
r = util.read_frame(stream)
s = util.read_frame(stream)
assert not stream.read()
assert r[:1] == b'\x00'
assert s[:1] == b'\x00'
assert r[1:] + s[1:] == SIG
# pylint: disable=unused-argument
def cancel_sign(identity, blob):
raise IOError(42, 'ERROR')
c.device.sign = cancel_sign
with pytest.raises(IOError):
c.sign_ssh_challenge(blob=BLOB, identity=identity)

View File

@@ -0,0 +1,109 @@
import mock
import pytest
from .. import device, formats, protocol
# pylint: disable=line-too-long
NIST256_KEY = 'ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBEUksojS/qRlTKBKLQO7CBX7a7oqFkysuFn1nJ6gzlR3wNuQXEgd7qb2bjmiiBHsjNxyWvH5SxVi3+fghrqODWo= ssh://localhost' # nopep8
NIST256_BLOB = b'\x00\x00\x00 !S^\xe7\xf8\x1cKN\xde\xcbo\x0c\x83\x9e\xc48\r\xac\xeb,]"\xc1\x9bA\x0eit\xc1\x81\xd4E2\x00\x00\x00\x05roman\x00\x00\x00\x0essh-connection\x00\x00\x00\tpublickey\x01\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00h\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00\x08nistp256\x00\x00\x00A\x04E$\xb2\x88\xd2\xfe\xa4eL\xa0J-\x03\xbb\x08\x15\xfbk\xba*\x16L\xac\xb8Y\xf5\x9c\x9e\xa0\xceTw\xc0\xdb\x90\\H\x1d\xee\xa6\xf6n9\xa2\x88\x11\xec\x8c\xdcrZ\xf1\xf9K\x15b\xdf\xe7\xe0\x86\xba\x8e\rj' # nopep8
NIST256_SIG = b'\x88G!\x0c\n\x16:\xbeF\xbe\xb9\xd2\xa9&e\x89\xad\xc4}\x10\xf8\xbc\xdc\xef\x0e\x8d_\x8a6.\xb6\x1fq\xf0\x16>,\x9a\xde\xe7(\xd6\xd7\x93\x1f\xed\xf9\x94ddw\xfe\xbdq\x13\xbb\xfc\xa9K\xea\x9dC\xa1\xe9' # nopep8
LIST_MSG = b'\x0b'
LIST_NIST256_REPLY = b'\x00\x00\x00\x84\x0c\x00\x00\x00\x01\x00\x00\x00h\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00\x08nistp256\x00\x00\x00A\x04E$\xb2\x88\xd2\xfe\xa4eL\xa0J-\x03\xbb\x08\x15\xfbk\xba*\x16L\xac\xb8Y\xf5\x9c\x9e\xa0\xceTw\xc0\xdb\x90\\H\x1d\xee\xa6\xf6n9\xa2\x88\x11\xec\x8c\xdcrZ\xf1\xf9K\x15b\xdf\xe7\xe0\x86\xba\x8e\rj\x00\x00\x00\x0fssh://localhost' # nopep8
NIST256_SIGN_MSG = b'\r\x00\x00\x00h\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00\x08nistp256\x00\x00\x00A\x04E$\xb2\x88\xd2\xfe\xa4eL\xa0J-\x03\xbb\x08\x15\xfbk\xba*\x16L\xac\xb8Y\xf5\x9c\x9e\xa0\xceTw\xc0\xdb\x90\\H\x1d\xee\xa6\xf6n9\xa2\x88\x11\xec\x8c\xdcrZ\xf1\xf9K\x15b\xdf\xe7\xe0\x86\xba\x8e\rj\x00\x00\x00\xd1\x00\x00\x00 !S^\xe7\xf8\x1cKN\xde\xcbo\x0c\x83\x9e\xc48\r\xac\xeb,]"\xc1\x9bA\x0eit\xc1\x81\xd4E2\x00\x00\x00\x05roman\x00\x00\x00\x0essh-connection\x00\x00\x00\tpublickey\x01\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00h\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00\x08nistp256\x00\x00\x00A\x04E$\xb2\x88\xd2\xfe\xa4eL\xa0J-\x03\xbb\x08\x15\xfbk\xba*\x16L\xac\xb8Y\xf5\x9c\x9e\xa0\xceTw\xc0\xdb\x90\\H\x1d\xee\xa6\xf6n9\xa2\x88\x11\xec\x8c\xdcrZ\xf1\xf9K\x15b\xdf\xe7\xe0\x86\xba\x8e\rj\x00\x00\x00\x00' # nopep8
NIST256_SIGN_REPLY = b'\x00\x00\x00j\x0e\x00\x00\x00e\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00J\x00\x00\x00!\x00\x88G!\x0c\n\x16:\xbeF\xbe\xb9\xd2\xa9&e\x89\xad\xc4}\x10\xf8\xbc\xdc\xef\x0e\x8d_\x8a6.\xb6\x1f\x00\x00\x00!\x00q\xf0\x16>,\x9a\xde\xe7(\xd6\xd7\x93\x1f\xed\xf9\x94ddw\xfe\xbdq\x13\xbb\xfc\xa9K\xea\x9dC\xa1\xe9' # nopep8
def fake_connection(keys, signer):
c = mock.Mock()
c.parse_public_keys.return_value = keys
c.sign = signer
return c
def test_list():
key = formats.import_public_key(NIST256_KEY)
key['identity'] = device.interface.Identity('ssh://localhost', 'nist256p1')
h = protocol.Handler(fake_connection(keys=[key], signer=None))
reply = h.handle(LIST_MSG)
assert reply == LIST_NIST256_REPLY
def test_list_legacy_pubs_with_suffix():
h = protocol.Handler(fake_connection(keys=[], signer=None))
suffix = b'\x00\x00\x00\x06foobar'
reply = h.handle(b'\x01' + suffix)
assert reply == b'\x00\x00\x00\x05\x02\x00\x00\x00\x00' # no legacy keys
def test_unsupported():
h = protocol.Handler(fake_connection(keys=[], signer=None))
reply = h.handle(b'\x09')
assert reply == b'\x00\x00\x00\x01\x05'
def ecdsa_signer(identity, blob):
assert str(identity) == '<ssh://localhost|nist256p1>'
assert blob == NIST256_BLOB
return NIST256_SIG
def test_ecdsa_sign():
key = formats.import_public_key(NIST256_KEY)
key['identity'] = device.interface.Identity('ssh://localhost', 'nist256p1')
h = protocol.Handler(fake_connection(keys=[key], signer=ecdsa_signer))
reply = h.handle(NIST256_SIGN_MSG)
assert reply == NIST256_SIGN_REPLY
def test_sign_missing():
h = protocol.Handler(fake_connection(keys=[], signer=ecdsa_signer))
with pytest.raises(KeyError):
h.handle(NIST256_SIGN_MSG)
def test_sign_wrong():
def wrong_signature(identity, blob):
assert str(identity) == '<ssh://localhost|nist256p1>'
assert blob == NIST256_BLOB
return b'\x00' * 64
key = formats.import_public_key(NIST256_KEY)
key['identity'] = device.interface.Identity('ssh://localhost', 'nist256p1')
h = protocol.Handler(fake_connection(keys=[key], signer=wrong_signature))
with pytest.raises(ValueError):
h.handle(NIST256_SIGN_MSG)
def test_sign_cancel():
def cancel_signature(identity, blob): # pylint: disable=unused-argument
raise IOError()
key = formats.import_public_key(NIST256_KEY)
key['identity'] = device.interface.Identity('ssh://localhost', 'nist256p1')
h = protocol.Handler(fake_connection(keys=[key], signer=cancel_signature))
assert h.handle(NIST256_SIGN_MSG) == protocol.failure()
ED25519_KEY = 'ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIFBdF2tjfSO8nLIi736is+f0erq28RTc7CkM11NZtTKR ssh://localhost' # nopep8
ED25519_SIGN_MSG = b'''\r\x00\x00\x003\x00\x00\x00\x0bssh-ed25519\x00\x00\x00 P]\x17kc}#\xbc\x9c\xb2"\xef~\xa2\xb3\xe7\xf4z\xba\xb6\xf1\x14\xdc\xec)\x0c\xd7SY\xb52\x91\x00\x00\x00\x94\x00\x00\x00 i3\xae}yk\\\xa1L\xb9\xe1\xbf\xbc\x8e\x87\r\x0e\xc0\x9f\x97\x0fTC!\x80\x07\x91\xdb^8\xc1\xd62\x00\x00\x00\x05roman\x00\x00\x00\x0essh-connection\x00\x00\x00\tpublickey\x01\x00\x00\x00\x0bssh-ed25519\x00\x00\x003\x00\x00\x00\x0bssh-ed25519\x00\x00\x00 P]\x17kc}#\xbc\x9c\xb2"\xef~\xa2\xb3\xe7\xf4z\xba\xb6\xf1\x14\xdc\xec)\x0c\xd7SY\xb52\x91\x00\x00\x00\x00''' # nopep8
ED25519_SIGN_REPLY = b'''\x00\x00\x00X\x0e\x00\x00\x00S\x00\x00\x00\x0bssh-ed25519\x00\x00\x00@\x8eb)\xa6\xe9P\x83VE\xfbq\xc6\xbf\x1dV3\xe3<O\x11\xc0\xfa\xe4\xed\xb8\x81.\x81\xc8\xa6\xba\x10RA'a\xbc\xa9\xd3\xdb\x98\x07\xf0\x1a\x9c4\x84<\xaf\x99\xb7\xe5G\xeb\xf7$\xc1\r\x86f\x16\x8e\x08\x05''' # nopep8
ED25519_BLOB = b'''\x00\x00\x00 i3\xae}yk\\\xa1L\xb9\xe1\xbf\xbc\x8e\x87\r\x0e\xc0\x9f\x97\x0fTC!\x80\x07\x91\xdb^8\xc1\xd62\x00\x00\x00\x05roman\x00\x00\x00\x0essh-connection\x00\x00\x00\tpublickey\x01\x00\x00\x00\x0bssh-ed25519\x00\x00\x003\x00\x00\x00\x0bssh-ed25519\x00\x00\x00 P]\x17kc}#\xbc\x9c\xb2"\xef~\xa2\xb3\xe7\xf4z\xba\xb6\xf1\x14\xdc\xec)\x0c\xd7SY\xb52\x91''' # nopep8
ED25519_SIG = b'''\x8eb)\xa6\xe9P\x83VE\xfbq\xc6\xbf\x1dV3\xe3<O\x11\xc0\xfa\xe4\xed\xb8\x81.\x81\xc8\xa6\xba\x10RA'a\xbc\xa9\xd3\xdb\x98\x07\xf0\x1a\x9c4\x84<\xaf\x99\xb7\xe5G\xeb\xf7$\xc1\r\x86f\x16\x8e\x08\x05''' # nopep8
def ed25519_signer(identity, blob):
assert str(identity) == '<ssh://localhost|ed25519>'
assert blob == ED25519_BLOB
return ED25519_SIG
def test_ed25519_sign():
key = formats.import_public_key(ED25519_KEY)
key['identity'] = device.interface.Identity('ssh://localhost', 'ed25519')
h = protocol.Handler(fake_connection(keys=[key], signer=ed25519_signer))
reply = h.handle(ED25519_SIGN_MSG)
assert reply == ED25519_SIGN_REPLY