diff --git a/trezor_agent/gpg/keyring.py b/trezor_agent/gpg/keyring.py index 90e9848..f5053dc 100644 --- a/trezor_agent/gpg/keyring.py +++ b/trezor_agent/gpg/keyring.py @@ -34,6 +34,8 @@ def _recvline(sock): while True: c = sock.recv(1) + if not c: + raise EOFError if c == b'\n': break reply.write(c) @@ -105,7 +107,7 @@ def parse_sig(sig): return parser(args=sig[1:]) -def sign_digest(sock, keygrip, digest, sp=subprocess): +def sign_digest(sock, keygrip, digest, sp=subprocess, environ=None): """Sign a digest using specified key using GPG agent.""" hash_algo = 8 # SHA256 assert len(digest) == 32 @@ -115,7 +117,7 @@ def sign_digest(sock, keygrip, digest, sp=subprocess): ttyname = sp.check_output(['tty']).strip() options = ['ttyname={}'.format(ttyname)] # set TTY for passphrase entry - display = os.environ.get('DISPLAY') + display = (environ or os.environ).get('DISPLAY') if display is not None: options.append('display={}'.format(display)) @@ -127,9 +129,8 @@ def sign_digest(sock, keygrip, digest, sp=subprocess): assert _communicate(sock, 'SETHASH {} {}'.format(hash_algo, hex_digest)) == b'OK' - desc = ('Please+enter+the+passphrase+to+unlock+the+OpenPGP%0A' - 'secret+key,+to+sign+a+new+TREZOR-based+subkey') - assert _communicate(sock, 'SETKEYDESC {}'.format(desc)) == b'OK' + assert _communicate(sock, 'SETKEYDESC ' + 'Sign+a+new+TREZOR-based+subkey') == b'OK' assert _communicate(sock, 'PKSIGN') == b'OK' line = _recvline(sock).strip() line = unescape(line) diff --git a/trezor_agent/gpg/tests/test_keyring.py b/trezor_agent/gpg/tests/test_keyring.py index b312a5b..53a7c86 100644 --- a/trezor_agent/gpg/tests/test_keyring.py +++ b/trezor_agent/gpg/tests/test_keyring.py @@ -1,3 +1,6 @@ +import io +import mock + from .. import keyring @@ -35,3 +38,39 @@ def test_parse_rsa(): assert sig == [b'sig-val', [b'rsa', [b's', b'\x01\x02\x03\x04']]] assert rest == b'' assert keyring.parse_sig(sig) == (0x1020304,) + + +class FakeSocket(object): + def __init__(self): + self.rx = io.BytesIO() + self.tx = io.BytesIO() + + def recv(self, n): + return self.rx.read(n) + + def sendall(self, data): + self.tx.write(data) + + +def test_sign_digest(): + sock = FakeSocket() + sock.rx.write(b'OK Pleased to meet you, process XYZ\n') + sock.rx.write(b'OK\n' * 6) + sock.rx.write(b'D (7:sig-val(3:rsa(1:s16:0123456789ABCDEF)))\n') + sock.rx.seek(0) + keygrip = '1234' + digest = b'A' * 32 + sp = mock.Mock(spec=['check_output']) + sp.check_output.return_value = '/dev/pts/0' + sig = keyring.sign_digest(sock=sock, keygrip=keygrip, + digest=digest, sp=sp, + environ={'DISPLAY': ':0'}) + assert sig == (0x30313233343536373839414243444546,) + assert sock.tx.getvalue() == b'''RESET +OPTION ttyname=/dev/pts/0 +OPTION display=:0 +SIGKEY 1234 +SETHASH 8 4141414141414141414141414141414141414141414141414141414141414141 +SETKEYDESC Sign+a+new+TREZOR-based+subkey +PKSIGN +'''