Compare commits

...

81 Commits

Author SHA1 Message Date
Roman Zeyde
1cfdddc33a Bump version: 0.14.5 → 0.14.6 2022-10-21 12:02:20 +03:00
Roman Zeyde
523dcb139a Merge pull request #399 from JamieDriver/update_jade_api
Update Jade api version
2022-09-21 21:15:07 +03:00
Jamie C. Driver
751ef7321b Update Jade api version 2022-09-15 11:07:17 +01:00
Roman Zeyde
38a3131a07 Merge pull request #396 from JamieDriver/jade_agent_install
Update setup.py for jade-agent
2022-09-01 20:33:19 +03:00
Jamie C. Driver
7c01789529 Update setup.py for jade-agent 2022-08-17 15:55:08 +01:00
Roman Zeyde
1763b0ea0c Drop Python 3.6 in CI 2022-07-10 20:06:06 +03:00
Roman Zeyde
8c5a9bfe02 Fixup pylint issues 2022-07-10 19:53:35 +03:00
Roman Zeyde
807c25a9fc Bump 'trezor_agent' version: 0.11.0 → 0.12.0 2022-07-09 19:20:46 +03:00
Roman Zeyde
b177da9ee8 Bump version: 0.14.4 → 0.14.5 2022-07-09 19:09:04 +03:00
Roman Zeyde
4242599114 Merge pull request #395 from afreakk/master
fix: provide buf argument to unsupported_extension function
2022-05-23 08:58:04 +03:00
afreakk
54e670c7ee fix: provide buf argument to unsupported_extension function 2022-05-22 14:09:40 +02:00
Roman Zeyde
5832d4a67b Merge pull request #394 from romanz/latest-ssh
Don't fail if not all request is parsed
2022-05-21 13:17:26 +03:00
Roman Zeyde
8d4536b37a Fixup linting issues 2022-05-21 12:09:00 +03:00
Roman Zeyde
b1b3e4b7ea Don't fail if not all request is parsed
https://www.openssh.com/agent-restrict.html

266678e19e
2022-05-21 12:07:07 +03:00
Roman Zeyde
80bfda7899 Merge pull request #386 from seandlg/patch-1
Fix a typo
2022-03-27 22:53:53 +03:00
Roman Zeyde
1166917461 Merge pull request #384 from yanchenko-igor/master
Handle usupported extensions, fixes #383 fixes #379
2022-03-27 22:53:25 +03:00
Sean Eulenberg
d7f6ceb429 Fix a typo
Fix a typo
2022-03-24 09:28:47 +01:00
Igor Yanchenko
a8f2d74d02 Handle usupported extesions, fixes #383 2022-03-14 17:18:39 +02:00
Roman Zeyde
df84c4c15f Small style fixes following #382
Tested with `tox`.
2022-03-11 09:24:16 +02:00
Roman Zeyde
0662ced2f4 Merge pull request #382 from JamieDriver/blockstream_jade_support
Add support for the Blockstream Jade hww
2022-03-10 23:37:28 +02:00
Jamie C. Driver
471d0e03e7 Add support for the Blockstream Jade hww
Supports ssh and gpg, incl. ecdh/decryption.
Initially only supports curve 'nist256p1'.
2022-03-10 17:24:11 +00:00
Roman Zeyde
e4d16a361a Merge branch 'master' of https://github.com/yanchenko-igor/trezor-agent 2022-01-21 21:23:09 +02:00
Igor Yanchenko
c6f30083ff Update setup.py
Added age support
2022-01-21 20:55:26 +02:00
Senjuu
23f8ef09a5 Add Support for NIST256 ssh-certificates
Adopt suggested naming scheme

Adding new unit tests
2021-12-21 19:38:44 +02:00
Roman Zeyde
f0769655ad Add age plugin support
See https://github.com/str4d/rage/tree/main/age-plugin.

Example usage:

	RAGE_DIR=$PWD/../Rust/rage
	(cd $RAGE_DIR; cargo build --all)
	export PATH=$PATH:$RAGE_DIR/target/debug

	age-plugin-trezor -i "John Doe" | tee trezor.id
	R=$(grep recipient trezor.id | cut -f 3 -d ' ')

	date | tee msg.txt
	rage -er $R < msg.txt > enc.txt
	rage -di trezor.id < enc.txt
2021-12-14 20:43:04 +02:00
Roman Zeyde
2a6a47f400 Support SSH signatures
https://www.agwa.name/blog/post/ssh_signatures

See here for more details:
https://github.com/openssh/openssh-portable/blob/master/PROTOCOL.sshsig
https://github.com/openssh/openssh-portable/blob/master/sshsig.c
2a9c9f7272
2021-12-13 10:58:15 +02:00
Roman Zeyde
47c827519e Bump dependency to latest trezorlib 2021-12-09 20:23:00 +02:00
Roman Zeyde
a9117c965c Bump version: 0.14.3 → 0.14.4 2021-11-06 14:41:42 +02:00
Roman Zeyde
8107e6378c Don't use sys.argv for device name parsing 2021-11-05 10:03:22 +02:00
Roman Zeyde
85d2da5460 Bump version: 0.14.2 → 0.14.3 2021-11-02 09:28:03 +02:00
Roman Zeyde
5e5a96b96f Merge branch 'fixes' 2021-11-02 09:23:55 +02:00
Roman Zeyde
69c5c57489 Support "fast-path" key listing
https://dev.gnupg.org/rG40da61b89b62dcb77847dc79eb159e885f52f817#change-o4DEJvEV1Dx2

Also, refactor decoding and add a few tests.
2021-11-02 09:22:19 +02:00
Roman Zeyde
b9db213912 Use Popen.communicate to get stdout from subprocess 2021-11-01 14:07:02 +02:00
Roman Zeyde
6c2b880b7d Support daemonization of GPG agent 2021-11-01 14:07:02 +02:00
Roman Zeyde
37510a2d75 Fix FakeDevice close() and pubkey() 2021-10-25 21:15:20 +03:00
Roman Zeyde
b6de68e95c Run CI also on Python 3.10 2021-10-25 08:37:51 +03:00
Roman Zeyde
ee4b1fcdb6 Multiple style fixes 2021-10-22 19:47:15 +03:00
Roman Zeyde
6d55512619 Merge pull request #361 from melpomene/patch-1
Udev rule configuration link was dead
2021-09-20 10:20:59 +03:00
Christopher Käck
b902f43ba1 Udev rule configuration link was dead
and redirecting to the root page for documentation.
2021-09-19 22:19:32 -07:00
Roman Zeyde
338a075ed5 Allow looking TREZOR by path prefix 2021-06-21 21:32:43 +03:00
Roman Zeyde
bcea720e95 Test on Python 3.{6,7,8,9} 2021-05-22 22:13:58 +03:00
Roman Zeyde
1c6d2cb65a Update README badge 2021-05-22 22:08:23 +03:00
Roman Zeyde
53fe6cd5ad Merge branch 'github-ci' 2021-05-22 22:04:00 +03:00
Roman Zeyde
a0e7aae1d2 Enable isort check in tox 2021-05-22 21:54:20 +03:00
Roman Zeyde
7f4269ab88 Add GitHub CI
Fixup a few pylint comments
2021-05-22 21:51:45 +03:00
Roman Zeyde
36e7afde17 Remove Travis CI 2021-05-22 14:46:21 +03:00
Roman Zeyde
020572ef5f Support Signify-based signatures
http://www.openbsd.org/papers/bsdcan-signify.html
2020-12-29 09:14:06 +02:00
Roman Zeyde
dbae284487 Short-circuit calling tty if stdin is redirected 2020-12-25 16:30:22 +02:00
Roman Zeyde
f5b99c0794 Bump version: 0.14.1 → 0.14.2 2020-12-16 20:29:17 +02:00
Roman Zeyde
f66da28cc3 Unbump setup.py 2020-12-16 20:28:52 +02:00
Roman Zeyde
c3853e97c7 Merge branch 'feature/fix-continous-integration' of https://github.com/galuszkak/trezor-agent into fix-ci 2020-10-22 18:41:56 +03:00
Kamil Gałuszka
32eff19bb6 fix: linter fixes and added python 3.9 to tests 2020-10-15 01:05:40 +02:00
onlykey
fd182e744f Add OnlyKey support 2020-09-24 22:29:21 +03:00
Roman Zeyde
a12202d809 Move decompression into device.pubkey() 2020-09-24 14:41:50 +03:00
Roman Zeyde
d0e7fa7cca Require older version of isort for pylint 2020-08-05 08:52:37 +03:00
onlykey
e1bbdb4bcc Replace 'ed25519' by 'pynacl' 2020-08-05 08:51:40 +03:00
Roman Zeyde
4d9d6c0741 Fix a typo in systemd unit example 2020-07-04 11:12:19 +03:00
Roman Zeyde
4c3c5a7c53 Merge pull request #330 from Karunamon/patch-1
Quote PATH when writing agent invocation script
2020-05-17 15:28:16 +03:00
Michael Parks
362ddcc707 Quote PATH when writing agent invocation script
If the PATH contains spaces, the agent invocation script will fail parsing. This quotes the variable so that spaces don't break the script.
2020-05-17 00:41:56 -06:00
Roman Zeyde
88ff57187f Bump version: 0.14.0 → 0.14.1 2020-05-02 17:43:44 +03:00
Roman Zeyde
52d840cbbb Initialize passphrase cache at UI c-tor 2020-04-29 22:01:06 +03:00
Roman Zeyde
8c22e5030b Bump 'trezor_agent' version: 0.10.0 → 0.11.0 2020-04-17 14:42:15 +03:00
Roman Zeyde
18c80b4cca Bump version: 0.13.1 → 0.14.0 2020-04-17 14:31:41 +03:00
Roman Zeyde
7eab4933ed Add more Python version to Travis 2020-04-17 14:30:36 +03:00
Roman Zeyde
d103ebee6f Fix pylint warning 2020-04-17 14:28:50 +03:00
matejcik
d8bcca3ccb support trezorlib 0.12 2020-04-09 14:41:56 +02:00
Roman Zeyde
67ef11419a Merge pull request #320 from eli-b/patch-5
docs: Install libagent from source too
2020-04-06 23:30:46 +03:00
Eli Boyarski
d4d168c746 docs: Install libagent from source too
Installing the trezor/ledger agent from source installs the libagent module from PyPI unless libagent is already installed from source beforehand.
2020-04-06 20:44:25 +03:00
Roman Zeyde
61cfcef35c Merge branch 'NTICompass/keepkey-webusb' 2020-03-16 23:21:01 +02:00
Eric Siegel
0f627e8322 Clean up code... 2020-03-16 15:26:15 -04:00
Eric Siegel (Rocket Hazmat)
7bdfa7609d Upgrade KeepKey for new libagent code
Add get_public_node for KeepKey
2020-03-13 13:50:09 -04:00
Eric Siegel (Rocket Hazmat)
53b08f4968 Fix detecting KeepKey USB device
The new KeepKey firmware uses WebUSB instead of HID
2020-03-13 13:05:08 -04:00
Roman Zeyde
15b0218bf2 Default GPG key creation time to 0 (i.e. Jan 1 1970) 2019-10-29 09:14:26 +02:00
Roman Zeyde
f52e959639 Merge branch 'patch-2' of https://github.com/zack-shoylev/trezor-agent 2019-10-29 09:12:18 +02:00
Roman Zeyde
d98f49445e Merge branch 'patch-1' of https://github.com/korzq/trezor-agent 2019-10-26 13:52:02 +03:00
Roman Zeyde
ab6892f42f Fix pylint warnings 2019-10-26 13:47:29 +03:00
Eric Zhu
f03312d61f Update README-SSH.md 2019-10-02 17:41:51 -04:00
Roman Zeyde
b75cf74976 Merge pull request #301 from hkjn/20190925-describe-versioning
Add components section
2019-09-25 18:44:47 +03:00
Henrik Jonsson
363b4d633f Add components section 2019-09-25 12:19:33 +02:00
Zack Shoylev
b7d0ef0f94 Update README-SSH.md
Fix typo
2019-08-27 15:33:02 -05:00
Zack Shoylev
8c3744c30c Update README-SSH.md
Small systemd doc improvements.
2019-06-25 13:24:32 -05:00
52 changed files with 1473 additions and 206 deletions

View File

@@ -1,7 +1,6 @@
[bumpversion]
commit = True
tag = True
current_version = 0.13.1
current_version = 0.14.6
[bumpversion:file:setup.py]

24
.github/workflows/ci.yml vendored Normal file
View File

@@ -0,0 +1,24 @@
name: Build
on: [push]
jobs:
build:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ['3.7', '3.8', '3.9', '3.10']
steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip tox
- name: Build and test
run: |
tox

View File

@@ -1,5 +1,4 @@
[MESSAGES CONTROL]
disable=invalid-name, missing-docstring, locally-disabled, unbalanced-tuple-unpacking,no-else-return,fixme,duplicate-code,cyclic-import
disable=invalid-name, missing-docstring, locally-disabled, unbalanced-tuple-unpacking,no-else-return,fixme,duplicate-code,cyclic-import,import-outside-toplevel,consider-using-with,consider-using-f-string,unspecified-encoding
[SIMILARITIES]
min-similarity-lines=5

View File

@@ -1,27 +0,0 @@
sudo: false
language: python
python:
- "3.5"
- "3.6"
cache:
directories:
- $HOME/.cache/pip
before_install:
- pip install -U pip wheel
- pip install -U setuptools
- pip install -U pylint coverage pycodestyle pydocstyle
install:
- pip install -U -e .
script:
- pycodestyle libagent
- pylint --reports=no --rcfile .pylintrc libagent
- pydocstyle libagent
- coverage run --source libagent/ -m py.test -v
after_success:
- coverage report

View File

@@ -1,6 +1,6 @@
# Hardware-based SSH/GPG agent
[![Build Status](https://travis-ci.org/romanz/trezor-agent.svg?branch=master)](https://travis-ci.org/romanz/trezor-agent)
[![Build](https://github.com/romanz/trezor-agent/actions/workflows/ci.yml/badge.svg)](https://github.com/romanz/trezor-agent/actions)
[![Chat](https://badges.gitter.im/romanz/trezor-agent.svg)](https://gitter.im/romanz/trezor-agent)
This project allows you to use various hardware security devices to operate GPG and SSH. Instead of keeping your key on your computer and decrypting it with a passphrase when you want to use it, the key is generated and stored on the device and never reaches your computer. Read more about the design [here](doc/DESIGN.md).
@@ -14,7 +14,23 @@ See the following blog posts about this tool:
- [TREZOR Firmware 1.4.0GPG decryption support](https://www.reddit.com/r/TREZOR/comments/50h8r9/new_trezor_firmware_fidou2f_and_initial_ethereum/d7420q7/)
- [A Step by Step Guide to Securing your SSH Keys with the Ledger Nano S](https://thoughts.t37.net/a-step-by-step-guide-to-securing-your-ssh-keys-with-the-ledger-nano-s-92e58c64a005)
Currently [TREZOR One](https://trezor.io/), [TREZOR Model T](https://trezor.io/), [Keepkey](https://www.keepkey.com/), and [Ledger Nano S](https://www.ledgerwallet.com/products/ledger-nano-s) are supported.
Currently [TREZOR One](https://trezor.io/), [TREZOR Model T](https://trezor.io/), [Keepkey](https://www.keepkey.com/), [Ledger Nano S](https://www.ledgerwallet.com/products/ledger-nano-s), and [OnlyKey](https://onlykey.io) are supported.
## Components
This repository contains source code for one library as well as
agents to interact with several different hardware devices:
* [`libagent`](https://pypi.org/project/libagent/): shared library
* [`trezor-agent`](https://pypi.org/project/trezor-agent/): Using Trezor as hardware-based SSH/PGP agent
* [`ledger_agent`](https://pypi.org/project/ledger_agent/): Using Ledger as hardware-based SSH/PGP agent
* [`jade_agent`](https://pypi.org/project/jade_agent/): Using Blockstream Jade as hardware-based SSH/PGP agent
* [`keepkey_agent`](https://pypi.org/project/keepkey_agent/): Using KeepKey as hardware-based SSH/PGP agent
* [`onlykey-agent`](https://pypi.org/project/onlykey-agent/): Using OnlyKey as hardware-based SSH/PGP agent
The [/releases](/releases) page on Github contains the `libagent`
releases.
## Documentation
@@ -24,4 +40,4 @@ Currently [TREZOR One](https://trezor.io/), [TREZOR Model T](https://trezor.io/)
Note: If you're using Windows, see [trezor-ssh-agent](https://github.com/martin-lizner/trezor-ssh-agent) by Martin Lízner.
* **GPG** instructions and common use cases are [here](doc/README-GPG.md)
* Instructions to configure a Trezor-style **PIN entry** program are [here](doc/README-PINENTRY.md)
* Instructions to configure a Trezor-style **PIN entry** program are [here](doc/README-PINENTRY.md)

View File

@@ -0,0 +1,7 @@
import libagent.gpg
import libagent.ssh
from libagent.device.jade import BlockstreamJade as DeviceType
ssh_agent = lambda: libagent.ssh.main(DeviceType)
gpg_tool = lambda: libagent.gpg.main(DeviceType)
gpg_agent = lambda: libagent.gpg.run_agent(DeviceType)

41
agents/jade/setup.py Normal file
View File

@@ -0,0 +1,41 @@
#!/usr/bin/env python
from setuptools import setup
setup(
name='jade_agent',
version='0.1.0',
description='Using Blockstream Jade as hardware SSH agent',
author='Jamie C. Driver',
author_email='jamie@blockstream.com',
url='http://github.com/romanz/trezor-agent',
scripts=['jade_agent.py'],
install_requires=[
'libagent>=0.14.5',
# Jade py api from github source, v0.1.37
'jadepy[requests] @ git+https://github.com/Blockstream/Jade.git@0.1.37'
],
platforms=['POSIX'],
classifiers=[
'Environment :: Console',
'Development Status :: 4 - Beta',
'Intended Audience :: Developers',
'Intended Audience :: Information Technology',
'Intended Audience :: System Administrators',
'License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)',
'Operating System :: POSIX',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Topic :: Software Development :: Libraries :: Python Modules',
'Topic :: System :: Networking',
'Topic :: Communications',
'Topic :: Security',
'Topic :: Utilities',
],
entry_points={'console_scripts': [
'jade-agent = jade_agent:ssh_agent',
'jade-gpg = jade_agent:gpg_tool',
'jade-gpg-agent = jade_agent:gpg_agent',
]},
)

View File

@@ -0,0 +1,7 @@
import libagent.gpg
import libagent.ssh
from libagent.device.onlykey import OnlyKey as DeviceType
ssh_agent = lambda: libagent.ssh.main(DeviceType)
gpg_tool = lambda: libagent.gpg.main(DeviceType)
gpg_agent = lambda: libagent.gpg.run_agent(DeviceType)

40
agents/onlykey/setup.py Normal file
View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python
from setuptools import setup
setup(
name='onlykey-agent',
version='1.2.0',
description='Using onlykey as hardware SSH/GPG agent',
author='CryptoTrust',
author_email='t@crp.to',
url='http://github.com/trustcrypto/onlykey-agent',
scripts=['onlykey_agent.py'],
install_requires=[
'libagent>=0.14.2',
'onlykey>=1.2.0'
],
platforms=['POSIX'],
classifiers=[
'Environment :: Console',
'Development Status :: 4 - Beta',
'Intended Audience :: Developers',
'Intended Audience :: Information Technology',
'Intended Audience :: System Administrators',
'License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)',
'Operating System :: POSIX',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Topic :: Software Development :: Libraries :: Python Modules',
'Topic :: System :: Networking',
'Topic :: Communications',
'Topic :: Security',
'Topic :: Utilities',
],
entry_points={'console_scripts': [
'onlykey-agent = onlykey_agent:ssh_agent',
'onlykey-gpg = onlykey_agent:gpg_tool',
'onlykey-gpg-agent = onlykey_agent:gpg_agent',
]},
)

View File

@@ -3,15 +3,15 @@ from setuptools import setup
setup(
name='trezor_agent',
version='0.10.0',
version='0.12.0',
description='Using Trezor as hardware SSH/GPG agent',
author='Roman Zeyde',
author_email='roman.zeyde@gmail.com',
url='http://github.com/romanz/trezor-agent',
scripts=['trezor_agent.py'],
install_requires=[
'libagent>=0.13.0',
'trezor[hidapi]>=0.11.0'
'libagent>=0.14.0',
'trezor[hidapi]>=0.13'
],
platforms=['POSIX'],
classifiers=[
@@ -22,10 +22,10 @@ setup(
'Intended Audience :: System Administrators',
'License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)',
'Operating System :: POSIX',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Topic :: Software Development :: Libraries :: Python Modules',
'Topic :: System :: Networking',
'Topic :: Communications',
@@ -36,5 +36,7 @@ setup(
'trezor-agent = trezor_agent:ssh_agent',
'trezor-gpg = trezor_agent:gpg_tool',
'trezor-gpg-agent = trezor_agent:gpg_agent',
'trezor-signify = trezor_agent:signify_tool',
'age-plugin-trezor = trezor_agent:age_tool', # see https://github.com/str4d/rage/blob/main/age-plugin/README.md
]},
)

View File

@@ -1,7 +1,8 @@
import libagent.gpg
import libagent.ssh
from libagent import age, signify, gpg, ssh
from libagent.device.trezor import Trezor as DeviceType
ssh_agent = lambda: libagent.ssh.main(DeviceType)
gpg_tool = lambda: libagent.gpg.main(DeviceType)
gpg_agent = lambda: libagent.gpg.run_agent(DeviceType)
age_tool = lambda: age.main(DeviceType)
ssh_agent = lambda: ssh.main(DeviceType)
gpg_tool = lambda: gpg.main(DeviceType)
gpg_agent = lambda: gpg.run_agent(DeviceType)
signify_tool = lambda: signify.main(DeviceType)

View File

@@ -6,7 +6,7 @@ SSH and GPG do this by means of a simple interprocess communication protocol (us
These two agents make the connection between the front end (e.g. a `gpg --sign` command, or an `ssh user@fqdn`). And then they wait for a request from the 'front end', and then do the actual asking for a password and subsequent using the private key to sign or decrypt something.
The various hardware wallets (Trezor, KeepKey and Ledger) each have the ability (as of Firmware 1.3.4) to use the NIST P-256 elliptic curve to sign, encrypt or decrypt. This curve can be used with S/MIME, GPG and SSH.
The various hardware wallets (Trezor, KeepKey, Ledger and Jade) each have the ability (as of Firmware 1.3.4) to use the NIST P-256 elliptic curve to sign, encrypt or decrypt. This curve can be used with S/MIME, GPG and SSH.
So when you `ssh` to a machine - rather than consult the normal ssh-agent (which in turn will use your private SSH key in files such as `~/.ssh/id_rsa`) -- the trezor-agent will aks your hardware wallet to use its private key to sign the challenge.
@@ -30,7 +30,7 @@ So taking a commmand such as:
The `trezor-agent` will take the `user`@`fqdn.com`; canonicalise it (e.g. to add the ssh default port number if none was specified) and then apply some simple hashing (See [SLIP-0013 : Authentication using deterministic hierarchy][2]). The resulting 128bit hash is then used to construct a lead 'HD node' that contains an extened public private *child* key.
This way they keypair is specific to the server/hostname/port and protocol combination used. And it is this private key that is used to sign the nonce passed by the SSH server (as opposed to the master key).
This way the keypair is specific to the server/hostname/port and protocol combination used. And it is this private key that is used to sign the nonce passed by the SSH server (as opposed to the master key).
The `trezor-agent` then instructs SSH to connect to the server. It will then engage in the normal challenge response process, ask the hardware wallet to blindly sign any nonce flashed by the server with the derived child private key and return this to the server. It then hands over to normal SSH for the rest of the logged in session.

View File

@@ -61,7 +61,7 @@ gpg (GnuPG) 2.1.15
* [TREZOR firmware releases](https://wallet.trezor.io/data/firmware/releases.json): `1.4.2+`
2. Make sure that your `udev` rules are configured [correctly](https://doc.satoshilabs.com/trezor-user/settingupchromeonlinux.html#manual-configuration-of-udev-rules).
2. Make sure that your `udev` rules are configured [correctly](https://wiki.trezor.io/Udev_rules).
3. Then, install the latest [trezor_agent](https://pypi.python.org/pypi/trezor_agent) package:
@@ -74,6 +74,7 @@ gpg (GnuPG) 2.1.15
```
$ git clone https://github.com/romanz/trezor-agent
$ pip3 install --user -e trezor-agent
$ pip3 install --user -e trezor-agent/agents/trezor
```
@@ -126,10 +127,56 @@ Then, install the latest [keepkey_agent](https://pypi.python.org/pypi/keepkey_ag
```
$ git clone https://github.com/romanz/trezor-agent
$ pip3 install --user -e trezor-agent
$ pip3 install --user -e trezor-agent/agents/ledger
```
# 5. Installation Troubleshooting
# 5. Install the OnlyKey agent
1. Make sure you are running the latest firmware version on your OnlyKey:
* [OnlyKey Firmware Upgrade Guide](https://docs.crp.to/upgradeguide.html)
2. Make sure that your `udev` rules are configured [correctly](https://docs.crp.to/linux.html#udev-rule).
3. Then, install the latest [onlykey-agent](https://pypi.python.org/pypi/onlykey-agent) package:
```
$ pip3 install onlykey-agent
```
Or, directly from the latest source code:
```
$ git clone https://github.com/romanz/trezor-agent
$ pip3 install --user -e trezor-agent
$ pip3 install --user -e trezor-agent/agents/onlykey
```
# 6. Install the Blockstream Jade agent
1. Make sure you are running the latest firmware version on your Blockstream Jade:
* [Jade firmware releases](https://github.com/Blockstream/Jade/blob/master/CHANGELOG.md): `0.1.33+`
2. Make sure that your `udev` rules are configured [correctly](https://github.com/bitcoin-core/HWI/blob/master/hwilib/udev/55-usb-jade.rules).
3. If necessary, ensure the user is added to the [`dialout` group](https://help.blockstream.com/hc/en-us/articles/900005443223-My-Blockstream-Jade-is-not-recognized-by-my-computer)
4. Then, install the latest [jade-agent](https://pypi.python.org/pypi/jade-agent) package:
```
$ pip3 install jade-agent
```
Or, directly from the latest source code:
```
$ git clone https://github.com/romanz/trezor-agent
$ pip3 install --user -e trezor-agent
$ pip3 install --user -e trezor-agent/agents/jade
```
# 7. Installation Troubleshooting
If there is an import problem with the installed `protobuf` package,
see [this issue](https://github.com/romanz/trezor-agent/issues/28) for fixing it.

View File

@@ -5,7 +5,7 @@ and please let me [know](https://github.com/romanz/trezor-agent/issues/new) if s
work well for you. If possible:
* record the session (e.g. using [asciinema](https://asciinema.org))
* attach the GPG agent log from `~/.gnupg/{trezor,ledger}/gpg-agent.log` (can be [encrypted](https://keybase.io/romanz))
* attach the GPG agent log from `~/.gnupg/{trezor,ledger,jade}/gpg-agent.log` (can be [encrypted](https://keybase.io/romanz))
Thanks!
@@ -18,14 +18,14 @@ Thanks!
Run
```
$ (trezor|keepkey|ledger)-gpg init "Roman Zeyde <roman.zeyde@gmail.com>"
$ (trezor|keepkey|ledger|jade|onlykey)-gpg init "Roman Zeyde <roman.zeyde@gmail.com>"
```
Follow the instructions provided to complete the setup. Keep note of the timestamp value which you'll need if you want to regenerate the key later.
If you'd like a Trezor-style PIN entry program, follow [these instructions](README-PINENTRY.md).
2. Add `export GNUPGHOME=~/.gnupg/(trezor|keepkey|ledger)` to your `.bashrc` or other environment file.
2. Add `export GNUPGHOME=~/.gnupg/(trezor|keepkey|ledger|jade|onlykey)` to your `.bashrc` or other environment file.
This `GNUPGHOME` contains your hardware keyring and agent settings. This agent software assumes all keys are backed by hardware devices so you can't use standard GPG keys in `GNUPGHOME` (if you do mix keys you'll receive an error when you attempt to use them).
@@ -203,7 +203,7 @@ Follow [these instructions](enigmail.md) to set up Enigmail in Thunderbird.
##### 1. Create these files in `~/.config/systemd/user`
Replace `trezor` with `keepkey` or `ledger` as required.
Replace `trezor` with `keepkey` or `ledger` or `jade` or `onlykey` as required.
###### `trezor-gpg-agent.service`
@@ -213,7 +213,7 @@ Description=trezor-gpg-agent
Requires=trezor-gpg-agent.socket
[Service]
Type=Simple
Type=simple
Environment="GNUPGHOME=%h/.gnupg/trezor"
Environment="PATH=/bin:/usr/bin:/usr/local/bin:%h/.local/bin"
ExecStart=/usr/bin/trezor-gpg-agent -vv

View File

@@ -4,13 +4,13 @@
SSH requires no configuration, but you may put common command line options in `~/.ssh/agent.conf` to avoid repeating them in every invocation.
See `(trezor|keepkey|ledger)-agent -h` for details on supported options and the configuration file format.
See `(trezor|keepkey|ledger|jade|onlykey)-agent -h` for details on supported options and the configuration file format.
If you'd like a Trezor-style PIN entry program, follow [these instructions](README-PINENTRY.md).
## 2. Usage
Use the `(trezor|keepkey|ledger)-agent` program to work with SSH. It has three main modes of operation:
Use the `(trezor|keepkey|ledger|jade|onlykey)-agent` program to work with SSH. It has three main modes of operation:
##### 1. Export public keys
@@ -18,7 +18,7 @@ To get your public key so you can add it to `authorized_hosts` or allow
ssh access to a service that supports it, run:
```
(trezor|keepkey|ledger)-agent identity@myhost
(trezor|keepkey|ledger|jade|onlykey)-agent identity@myhost
```
The identity (ex: `identity@myhost`) is used to derive the public key and is added as a comment to the exported key string.
@@ -28,26 +28,31 @@ The identity (ex: `identity@myhost`) is used to derive the public key and is add
Run
```
$ (trezor|keepkey|ledger)-agent identity@myhost -- COMMAND --WITH --ARGUMENTS
$ (trezor|keepkey|ledger|jade|onlykey)-agent identity@myhost -- COMMAND --WITH --ARGUMENTS
```
to start the agent in the background and execute the command with environment variables set up to use the SSH agent. The specified identity is used for all SSH connections. The agent will exit after the command completes.
Note the `--` separator, which is used to separate `trezor-agent`'s arguments from the SSH command arguments.
Example:
```
(trezor|keepkey|ledger|jade|onlykey)-agent -e ed25519 bob@example.com -- rsync up/ bob@example.com:/home/bob
```
As a shortcut you can run
```
$ (trezor|keepkey|ledger)-agent identity@myhost -s
$ (trezor|keepkey|ledger|jade|onlykey)-agent identity@myhost -s
```
to start a shell with the proper environment.
##### 3. Connect to a server directly via `(trezor|keepkey|ledger)-agent`
##### 3. Connect to a server directly via `(trezor|keepkey|ledger|jade|onlykey)-agent`
If you just want to connect to a server this is the simplest way to do it:
```
$ (trezor|keepkey|ledger)-agent user@remotehost -c
$ (trezor|keepkey|ledger|jade|onlykey)-agent user@remotehost -c
```
The identity `user@remotehost` is used as both the destination user and host as well as for key derivation, so you must generate a separate key for each host you connect to.
@@ -113,7 +118,7 @@ The same works for Mercurial (e.g. on [BitBucket](https://confluence.atlassian.c
##### 1. Create these files in `~/.config/systemd/user`
Replace `trezor` with `keepkey` or `ledger` as required.
Replace `trezor` with `keepkey` or `ledger` or `jade` or `onlykey` as required.
###### `trezor-ssh-agent.service`
@@ -124,6 +129,7 @@ Requires=trezor-ssh-agent.socket
[Service]
Type=simple
Restart=always
Environment="DISPLAY=:0"
Environment="PATH=/bin:/usr/bin:/usr/local/bin:%h/.local/bin"
ExecStart=/usr/bin/trezor-agent --foreground --sock-path %t/trezor-agent/S.ssh IDENTITY
@@ -133,6 +139,13 @@ If you've installed `trezor-agent` locally you may have to change the path in `E
Replace `IDENTITY` with the identity you used when exporting the public key.
`IDENTITY` can be a path (starting with `/`) to a file containing a list of public keys
generated by Trezor. I.e. `/home/myUser/.ssh/trezor.conf` with one public key per line.
This is a more convenient way to have a systemd setup that has to handle multiple
keys/hosts.
When updating the file, make sure to restart trezor-agent.
If you have multiple Trezors connected, you can select which one to use via a `TREZOR_PATH`
environment variable. Use `trezorctl list` to find the correct path. Then add it
to the agent with the following line:
@@ -168,9 +181,13 @@ systemctl --user enable trezor-ssh-agent.socket
##### 3. Add this line to your `.bashrc` or equivalent file:
```bash
export SSH_AUTH_SOCK=$(systemctl show --user --property=Listen trezor-ssh-agent.socket | grep -o "/run.*")
export SSH_AUTH_SOCK=$(systemctl show --user --property=Listen trezor-ssh-agent.socket | grep -o "/run.*" | cut -d " " -f 1)
```
Make sure the SSH_AUTH_SOCK variable matches the location of the socket that trezor-agent
is listening on: `ps -x | grep trezor-agent`. In this setup trezor-agent should start
automatically when the socket is opened.
##### 4. SSH will now automatically use your device key in all terminals.
## 4. Troubleshooting

175
libagent/age/__init__.py Normal file
View File

@@ -0,0 +1,175 @@
"""
TREZOR support for AGE format.
See these links for more details:
- https://age-encryption.org/v1
- https://github.com/FiloSottile/age
- https://github.com/str4d/rage/
"""
import argparse
import base64
import contextlib
import datetime
import logging
import os
import sys
import traceback
import bech32
import pkg_resources
import semver
from cryptography.exceptions import InvalidTag
from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305
from .. import device, server, util
from . import client
log = logging.getLogger(__name__)
def bech32_decode(prefix, encoded):
"""Decode Bech32-encoded data."""
hrp, data = bech32.bech32_decode(encoded)
assert prefix == hrp
return bytes(bech32.convertbits(data, 5, 8, pad=False))
def bech32_encode(prefix, data):
"""Encode data using Bech32."""
return bech32.bech32_encode(prefix, bech32.convertbits(bytes(data), 8, 5))
def run_pubkey(device_type, args):
"""Initialize hardware-based GnuPG identity."""
log.warning('This AGE tool is still in EXPERIMENTAL mode, '
'so please note that the API and features may '
'change without backwards compatibility!')
c = client.Client(device=device_type())
pubkey = c.pubkey(identity=client.create_identity(args.identity), ecdh=True)
recipient = bech32_encode(prefix="age", data=pubkey)
print(f"# recipient: {recipient}")
print(f"# SLIP-0017: {args.identity}")
data = args.identity.encode()
encoded = bech32_encode(prefix="age-plugin-trezor-", data=data).upper()
decoded = bech32_decode(prefix="age-plugin-trezor-", encoded=encoded)
assert decoded.startswith(data)
print(encoded)
def base64_decode(encoded: str) -> bytes:
"""Decode Base64-encoded data (after padding correctly with '=')."""
k = len(encoded) % 4
pad = (4 - k) if k else 0
return base64.b64decode(encoded + ("=" * pad))
def base64_encode(data: bytes) -> str:
"""Encode data using Base64 (and remove '=')."""
return base64.b64encode(data).replace(b"=", b"").decode()
def decrypt(key, encrypted):
"""Decrypt age-encrypted data."""
cipher = ChaCha20Poly1305(key)
try:
return cipher.decrypt(
nonce=(b"\x00" * 12),
data=encrypted,
associated_data=None)
except InvalidTag:
return None
def run_decrypt(device_type, args):
"""Unlock hardware device (for future interaction)."""
# pylint: disable=too-many-locals
c = client.Client(device=device_type())
lines = (line.strip() for line in sys.stdin) # strip whitespace
lines = (line for line in lines if line) # skip empty lines
identities = []
stanza_map = {}
for line in lines:
log.debug("got %r", line)
if line == "-> done":
break
if line.startswith("-> add-identity "):
encoded = line.split(" ")[-1].lower()
data = bech32_decode("age-plugin-trezor-", encoded)
identity = client.create_identity(data.decode())
identities.append(identity)
elif line.startswith("-> recipient-stanza "):
file_index, tag, *args = line.split(" ")[2:]
body = next(lines)
if tag != "X25519":
continue
peer_pubkey = base64_decode(args[0])
encrypted = base64_decode(body)
stanza_map.setdefault(file_index, []).append((peer_pubkey, encrypted))
for file_index, stanzas in stanza_map.items():
_handle_single_file(file_index, stanzas, identities, c)
sys.stdout.write('-> done\n\n')
sys.stdout.flush()
sys.stdout.close()
def _handle_single_file(file_index, stanzas, identities, c):
d = c.device.__class__.__name__
msg = base64_encode(f'Please confirm decryption on {d} device...'.encode())
for peer_pubkey, encrypted in stanzas:
for identity in identities:
sys.stdout.write(f'-> msg\n{msg}\n')
sys.stdout.flush()
key = c.ecdh(identity=identity, peer_pubkey=peer_pubkey)
result = decrypt(key=key, encrypted=encrypted)
if not result:
continue
sys.stdout.write(f'-> file-key {file_index}\n{base64_encode(result)}\n')
sys.stdout.flush()
return
def main(device_type):
"""Parse command-line arguments."""
p = argparse.ArgumentParser()
agent_package = device_type.package_name()
resources_map = {r.key: r for r in pkg_resources.require(agent_package)}
resources = [resources_map[agent_package], resources_map['libagent']]
versions = '\n'.join('{}={}'.format(r.key, r.version) for r in resources)
p.add_argument('--version', help='print the version info',
action='version', version=versions)
p.add_argument('-i', '--identity')
p.add_argument('-v', '--verbose', default=0, action='count')
p.add_argument('--age-plugin')
args = p.parse_args()
log_path = os.environ.get("TREZOR_AGE_PLUGIN_LOG")
util.setup_logging(verbosity=2, filename=log_path)
log.debug("starting age plugin: %s", args)
device_type.ui = device.ui.UI(device_type=device_type, config=vars(args))
try:
if args.identity:
run_pubkey(device_type=device_type, args=args)
elif args.age_plugin:
run_decrypt(device_type=device_type, args=args)
except Exception as e: # pylint: disable=broad-except
log.exception("age plugin failed: %s", e)
log.debug("closing age plugin")

48
libagent/age/client.py Normal file
View File

@@ -0,0 +1,48 @@
"""Device abstraction layer for AGE operations."""
import logging
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from ..device import interface
log = logging.getLogger(__name__)
def create_identity(user_id):
"""Create AGE identity for hardware device."""
result = interface.Identity(identity_str='age://', curve_name="ed25519")
result.identity_dict['host'] = user_id
return result
class Client:
"""Sign messages and get public keys from a hardware device."""
def __init__(self, device):
"""C-tor."""
self.device = device
def pubkey(self, identity, ecdh=False):
"""Return public key as VerifyingKey object."""
with self.device:
pubkey = bytes(self.device.pubkey(ecdh=ecdh, identity=identity))
assert len(pubkey) == 32
return pubkey
def ecdh(self, identity, peer_pubkey):
"""Derive shared secret using ECDH from peer public key."""
log.info('please confirm AGE decryption on %s for "%s"...',
self.device, identity.to_string())
with self.device:
assert len(peer_pubkey) == 32
result, self_pubkey = self.device.ecdh_with_pubkey(
pubkey=(b"\x40" + peer_pubkey), identity=identity)
assert result[:1] == b"\x04"
hkdf = HKDF(
algorithm=hashes.SHA256(),
length=32,
salt=((peer_pubkey + self_pubkey)),
info=b"age-encryption.org/v1/X25519")
return hkdf.derive(result[1:])

View File

@@ -5,8 +5,8 @@ import logging
import ecdsa
from . import interface
from .. import formats
from . import interface
log = logging.getLogger(__name__)
@@ -39,13 +39,17 @@ class FakeDevice(interface.Device):
self.vk = self.sk.get_verifying_key()
return self
def close(self):
"""Close the device."""
def pubkey(self, identity, ecdh=False):
"""Return public key."""
_verify_support(identity)
data = self.vk.to_string()
x, y = data[:32], data[32:]
prefix = bytearray([2 + (bytearray(y)[0] & 1)])
return bytes(prefix) + x
pubkey = bytes(prefix) + x
return formats.decompress_pubkey(pubkey=pubkey, curve_name=identity.curve_name)
def sign(self, identity, blob):
"""Sign given blob and return the signature (as bytes)."""

View File

@@ -79,7 +79,7 @@ class Identity:
def to_string(self):
"""Return identity serialized to string."""
return u'<{}|{}>'.format(identity_to_string(self.identity_dict), self.curve_name)
return '<{}|{}>'.format(identity_to_string(self.identity_dict), self.curve_name)
def get_bip32_address(self, ecdh=False):
"""Compute BIP32 derivation address according to SLIP-0013/0017."""

143
libagent/device/jade.py Normal file
View File

@@ -0,0 +1,143 @@
"""Jade-related code (see https://blockstream.com/jade/)."""
import logging
import ecdsa
import semver
from .. import formats, util
from . import interface
log = logging.getLogger(__name__)
def _verify_support(identity, ecdh):
"""Make sure the device supports given configuration."""
if identity.get_curve_name(ecdh=ecdh) != formats.CURVE_NIST256:
raise NotImplementedError(
'Unsupported elliptic curve: {}'.format(identity.curve_name))
class BlockstreamJade(interface.Device):
"""Connection to Blockstream Jade device."""
MIN_SUPPORTED_FW_VERSION = semver.VersionInfo(0, 1, 33)
DEVICE_IDS = [(0x10c4, 0xea60), (0x1a86, 0x55d4)]
connection = None
@classmethod
def package_name(cls):
"""Python package name (at PyPI)."""
return 'jade-agent'
def connect(self):
"""Connect to the first matching device."""
# pylint: disable=import-error
from jadepy import JadeAPI
from serial.tools import list_ports
# Return the existing connection if we have one
if BlockstreamJade.connection is not None:
return BlockstreamJade.connection
# Jade is a serial (over usb) device, it shows as a serial/com port device.
# Scan com ports looking for the relevant vid and pid, and connect to the
# first matching device. Then call 'auth_user' - this usually requires network
# access in order to unlock the device with a PIN and the remote blind pinserver.
for devinfo in list_ports.comports():
device_product_key = (devinfo.vid, devinfo.pid)
if device_product_key in self.DEVICE_IDS:
try:
jade = JadeAPI.create_serial(devinfo.device)
# Monkey-patch a no-op 'close()' method to suppress logged errors
jade.close = lambda: log.debug("Close called")
# Connect and fetch version info
jade.connect()
verinfo = jade.get_version_info()
# Check minimum supported firmware version (ignore candidate/build parts)
fwversion = semver.VersionInfo.parse(verinfo['JADE_VERSION'])
if self.MIN_SUPPORTED_FW_VERSION > fwversion.finalize_version():
msg = ('Outdated {} firmware for device. Please update using'
' a Blockstream Green companion app')
raise ValueError(msg.format(fwversion))
# Authenticate the user (unlock with pin)
# NOTE: usually requires network access unless already unlocked
# (or temporary 'Emergency Restore' wallet is already in use).
network = 'testnet' if verinfo.get('JADE_NETWORKS') == 'TEST' else 'mainnet'
while not jade.auth_user(network):
log.warning("PIN incorrect, please try again")
# Cache the connection to jade
BlockstreamJade.connection = jade
return jade
except Exception as e:
raise interface.NotFoundError(
'{} not connected: "{}"'.format(self, e))
return None
@staticmethod
def _get_identity_string(identity):
return interface.identity_to_string(identity.identity_dict)
@staticmethod
def _load_uncompressed_pubkey(pubkey, curve_name):
assert curve_name == formats.CURVE_NIST256
assert len(pubkey) == 65 and pubkey[0] == 0x04
curve = ecdsa.NIST256p
point = ecdsa.ellipticcurve.Point(curve.curve,
util.bytes2num(pubkey[1:33]),
util.bytes2num(pubkey[33:65]))
return ecdsa.VerifyingKey.from_public_point(point, curve=curve,
hashfunc=formats.hashfunc)
def pubkey(self, identity, ecdh=False):
"""Get PublicKey object for specified BIP32 address and elliptic curve."""
_verify_support(identity, ecdh)
identity_string = self._get_identity_string(identity)
curve_name = identity.get_curve_name(ecdh=ecdh)
key_type = 'slip-0017' if ecdh else 'slip-0013'
log.debug('"%s" getting %s public key (%s) from %s',
identity_string, key_type, curve_name, self)
result = self.conn.get_identity_pubkey(identity_string, curve_name, key_type)
log.debug('result: %s', result)
assert len(result) == 33 or len(result) == 65
convert_pubkey = (formats.decompress_pubkey
if len(result) == 33 else
self._load_uncompressed_pubkey)
return convert_pubkey(pubkey=result, curve_name=curve_name)
def sign(self, identity, blob):
"""Sign given blob and return the signature (as bytes)."""
_verify_support(identity, ecdh=False)
identity_string = self._get_identity_string(identity)
curve_name = identity.get_curve_name(ecdh=False)
log.debug('"%s" signing %r (%s) on %s',
identity_string, blob, curve_name, self)
result = self.conn.sign_identity(identity_string, curve_name, blob)
log.debug('result: %s', result)
signature = result['signature']
assert len(signature) == 64 or (len(signature) == 65 and signature[0] == 0x00)
if len(signature) == 65:
signature = signature[1:]
return signature
def ecdh(self, identity, pubkey):
"""Get shared session key using Elliptic Curve Diffie-Hellman."""
_verify_support(identity, ecdh=True)
identity_string = self._get_identity_string(identity)
curve_name = identity.get_curve_name(ecdh=True)
log.debug('"%s" shared session key (%s) for %r from %s',
identity_string, curve_name, pubkey, self)
result = self.conn.get_identity_shared_key(identity_string, curve_name, pubkey)
log.debug('result: %s', result)
return result

View File

@@ -1,7 +1,7 @@
"""KeepKey-related code (see https://www.keepkey.com/)."""
from . import trezor
from .. import formats
from . import trezor
def _verify_support(identity, ecdh):

View File

@@ -2,13 +2,23 @@
# pylint: disable=unused-import,import-error
from keepkeylib.client import CallException, PinException
from keepkeylib.client import CallException
from keepkeylib.client import KeepKeyClient as Client
from keepkeylib.client import PinException
from keepkeylib.messages_pb2 import PassphraseAck, PinMatrixAck
from keepkeylib.transport_hid import HidTransport
from keepkeylib.transport_webusb import WebUsbTransport
from keepkeylib.types_pb2 import IdentityType
get_public_node = Client.get_public_node
sign_identity = Client.sign_identity
Client.state = None
def find_device():
"""Returns first USB HID transport."""
return next(HidTransport(p) for p in HidTransport.enumerate())
"""Returns first WebUSB or HID transport."""
for d in WebUsbTransport.enumerate():
return WebUsbTransport(d)
for d in HidTransport.enumerate():
return HidTransport(d)

View File

@@ -6,6 +6,7 @@ import struct
from ledgerblue import comm # pylint: disable=import-error
from .. import formats
from . import interface
log = logging.getLogger(__name__)
@@ -64,7 +65,9 @@ class LedgerNanoS(interface.Device):
log.debug('apdu: %r', apdu)
result = bytearray(self.conn.exchange(bytes(apdu)))
log.debug('result: %r', result)
return _convert_public_key(curve_name, result[1:])
return formats.decompress_pubkey(
pubkey=_convert_public_key(curve_name, result[1:]),
curve_name=identity.curve_name)
def sign(self, identity, blob):
"""Sign given blob and return the signature (as bytes)."""

377
libagent/device/onlykey.py Normal file
View File

@@ -0,0 +1,377 @@
# pylint: disable=too-many-locals,too-many-statements,too-many-branches
# pylint: disable=attribute-defined-outside-init
"""OnlyKey-related code (see https://www.onlykey.io/)."""
import codecs
import hashlib
import logging
import time
import ecdsa
import nacl.signing
import unidecode
from . import interface
# import pgpy
# from pgpy import PGPKey
log = logging.getLogger(__name__)
class OnlyKey(interface.Device):
"""Connection to OnlyKey device."""
@classmethod
def package_name(cls):
"""Python package name (at PyPI)."""
return 'onlykey-agent'
@property
def _defs(self):
from . import onlykey_defs
return onlykey_defs
def connect(self):
"""Enumerate and connect to the first USB HID interface."""
try:
self.device_name = 'OnlyKey'
self.ok = self._defs.OnlyKey()
self.ok.set_time(time.time())
self.okversion = self.ok.read_string(timeout_ms=500)
self.okversion = self.okversion[8:]
self.skeyslot = 132
self.dkeyslot = 132
except Exception as e:
raise interface.NotFoundError('{} not connected: "{}"') from e
def set_skey(self, skey):
"""Set signing key to use."""
self.skeyslot = skey
log.debug('Setting skey slot = %s', skey)
def set_dkey(self, dkey):
"""Set decryption key to use."""
self.dkeyslot = dkey
log.debug('Setting dkey slot = %s', dkey)
def import_pub(self, pubkey):
"""Import PGP public key."""
self.import_pubkey = pubkey
log.debug('Public key to import = %s', pubkey)
# self.import_pubkey_obj, _ = pgpy.PGPKey.from_blob(pubkey)
# self.import_pubkey_bytes = bytes(self.import_pubkey_obj)
def get_sk_dk(self):
"""Get default signing key and decryption key slots."""
self.set_skey(132)
self.set_dkey(132)
def sig_hash(self, sighash):
"""Set signature hashing algorithm to use."""
if sighash in (b'rsa-sha2-512', b'rsa-sha2-256'):
self.sighash = sighash
log.info('Setting RSA signature Hash Type =%s', sighash)
def close(self):
"""Close connection."""
log.info('disconnected from %s', self.device_name)
self.ok.close()
def pubkey(self, identity, ecdh=False):
"""Return public key."""
curve_name = identity.get_curve_name(ecdh=ecdh)
if identity.identity_dict['proto'] != 'ssh' and hasattr('self', 'skeyslot') is False:
self.get_sk_dk()
if identity.identity_dict['proto'] != 'ssh' and self.dkeyslot < 132 and ecdh is True:
this_slot_id = self.dkeyslot
log.info('Key Slot =%s', this_slot_id)
elif self.skeyslot < 132 and ecdh is False:
this_slot_id = self.skeyslot
log.info('Key Slot =%s', this_slot_id)
else:
this_slot_id = 132
log.info('Requesting public key from key slot =%s', this_slot_id)
log.debug('"%s" getting public key (%s) from %s',
identity.to_string(), curve_name, self)
# Calculate hash for key derivation input data
if identity.identity_dict['proto'] == 'ssh':
if identity.identity_dict.get('user'):
id_parts = unidecode.unidecode(identity.identity_dict['user'] + '@' +
identity.identity_dict['host']).encode('ascii')
else:
id_parts = unidecode.unidecode(identity.identity_dict['host']).encode('ascii')
else:
id_parts = identity.to_bytes()
log.info('Identity to hash =%s', id_parts)
h1 = hashlib.sha256()
h1.update(id_parts)
data = h1.hexdigest()
log.info('Identity hash =%s', data)
if this_slot_id > 100:
if curve_name == 'curve25519':
data = '04' + data
elif curve_name == 'secp256k1':
# Not currently supported by agent, for future use
data = '03' + data
elif curve_name == 'nist256p1':
data = '02' + data
elif curve_name == 'ed25519':
data = '01' + data
else:
data = '00' + data
self.ok.send_message(msg=self._defs.Message.OKGETPUBKEY, slot_id=this_slot_id, payload=data)
log.info('curve name= %s', repr(curve_name))
t_end = time.time() + 1.5
if curve_name != 'rsa':
while time.time() < t_end:
try:
ok_pubkey = self.ok.read_bytes(timeout_ms=100)
if len(ok_pubkey) == 64 and len(set(ok_pubkey[0:63])) != 1:
break
except Exception as e:
raise interface.DeviceError(e)
log.info('received= %s', repr(ok_pubkey))
if len(set(ok_pubkey[34:63])) == 1:
if curve_name in ('nist256p1', 'secp256k1'):
raise interface.DeviceError("Public key curve does not match requested type")
ok_pubkey = bytearray(ok_pubkey[0:32])
log.info('Received Public Key generated by OnlyKey= %s', repr(ok_pubkey.hex()))
vk = nacl.signing.VerifyKey(bytes(ok_pubkey),
encoder=nacl.encoding.RawEncoder)
log.info('vk= %s', repr(vk))
# time.sleep(3)
return vk
elif len(ok_pubkey) == 64:
ok_pubkey = bytearray(ok_pubkey[0:64])
if curve_name in ('ed25519', 'curve25519'):
raise interface.DeviceError("Public key curve does not match requested type")
log.info('Received Public Key generated by OnlyKey= %s', repr(ok_pubkey))
if identity.curve_name == 'nist256p1':
vk = ecdsa.VerifyingKey.from_string(ok_pubkey, curve=ecdsa.NIST256p)
else:
vk = ecdsa.VerifyingKey.from_string(ok_pubkey, curve=ecdsa.SECP256k1)
return vk
else:
ok_pubkey = []
while time.time() < t_end:
try:
ok_pub_part = self.ok.read_bytes(timeout_ms=100)
if len(ok_pub_part) == 64 and len(set(ok_pub_part[0:63])) != 1:
log.info('received part= %s', repr(ok_pub_part))
ok_pubkey += ok_pub_part
# Todo know RSA type to know how many packets
except Exception as e:
raise interface.DeviceError(e)
log.info('received= %s', repr(ok_pubkey))
if len(ok_pubkey) == 256:
# https://security.stackexchange.com/questions/42268/how-do-i-get-the-rsa-bit-length-with-the-pubkey-and-openssl
ok_pubkey = b'\x00\x00\x00\x07' + b'\x73\x73\x68\x2d\x72\x73\x61' + \
b'\x00\x00\x00\x03' + b'\x01\x00\x01' + \
b'\x00\x00\x01\x01' + b'\x00' + bytes(ok_pubkey)
# ok_pubkey = b'\x00\x00\x00\x07' + b'\x72\x73\x61\x2d\x73\x68\x61\x32\x2d\x32\x35\x
# 36' + b'\x00\x00\x00\x03' + b'\x01\x00\x01' + b'\x00\x00\x01\x01' + b'\x00' + byte
# s(ok_pubkey)
elif len(ok_pubkey) == 512:
ok_pubkey = b'\x00\x00\x00\x07' + b'\x73\x73\x68\x2d\x72\x73\x61' + \
b'\x00\x00\x00\x03' + b'\x01\x00\x01' + \
b'\x00\x00\x02\x01' + b'\x00' + bytes(ok_pubkey)
else:
raise interface.DeviceError("Error response length is not a valid public key")
log.info('pubkey len = %s', len(ok_pubkey))
return ok_pubkey
def sign(self, identity, blob):
"""Sign given blob and return the signature (as bytes)."""
curve_name = identity.get_curve_name(ecdh=False)
log.debug('"%s" signing %r (%s) on %s',
identity.to_string(), blob, curve_name, self)
if identity.identity_dict['proto'] != 'ssh' and hasattr('self', 'skeyslot') is False:
self.get_sk_dk()
# Calculate hash for SSH signing
if curve_name == 'rsa':
if self.sighash == b'rsa-sha2-512':
log.info('rsa-sha2-512')
h1 = hashlib.sha512()
h1.update(blob)
data = h1.hexdigest()
data = codecs.decode(data, 'hex_codec')
elif self.sighash == b'rsa-sha2-256':
log.info('rsa-sha2-256')
h1 = hashlib.sha256()
h1.update(blob)
data = h1.hexdigest()
data = codecs.decode(data, 'hex_codec')
else:
# Calculate hash for key derivation input data
h1 = hashlib.sha256()
if identity.identity_dict['proto'] == 'ssh':
if identity.identity_dict.get('user'):
id_parts = unidecode.unidecode(identity.identity_dict['user'] + '@' +
identity.identity_dict['host']).encode('ascii')
else:
id_parts = unidecode.unidecode(identity.identity_dict['host']).encode('ascii')
else:
id_parts = identity.to_bytes()
h1.update(id_parts)
data = h1.hexdigest()
data = codecs.decode(data, 'hex_codec')
log.info('Identity to hash =%s', id_parts)
log.info('Identity hash =%s', data)
# Determine type of key to derive on OnlyKey for signature
# Slot 132 used for derived key, slots 101-116 used for stored ecc keys
# slots 1-4 used for stored RSA keys
if self.skeyslot == 132:
if curve_name == 'ed25519':
this_slot_id = 201
log.info('Key type ed25519')
elif curve_name == 'nist256p1':
this_slot_id = 202
log.info('Key type nistp256')
else:
this_slot_id = 203
log.info('Key type secp256k1')
# Send data and identity hash
raw_message = blob + data
else:
this_slot_id = self.skeyslot
# Send just data to sign
raw_message = blob
h2 = hashlib.sha256()
h2.update(raw_message)
d = h2.digest()
assert len(d) == 32
b1, b2, b3 = get_button(self, d[0]), get_button(self, d[15]), get_button(self, d[31])
log.info('Key Slot =%s', this_slot_id)
print('Enter the 3 digit challenge code on OnlyKey to authorize '+identity.to_string())
print('{} {} {}'.format(b1, b2, b3))
t_end = time.time() + 22
if curve_name != 'rsa':
self.ok.send_large_message2(msg=self._defs.Message.OKSIGN, payload=raw_message,
slot_id=this_slot_id)
while time.time() < t_end:
try:
result = self.ok.read_bytes(timeout_ms=100)
if len(result) == 64 and len(set(result[0:63])) != 1:
break
except Exception as e:
raise interface.DeviceError(e)
if len(result) >= 60:
log.info('received= %s', repr(result))
while len(result) < 64:
result.append(0)
log.info('disconnected from %s', self.device_name)
self.ok.close()
return bytes(result)
else:
self.ok.send_large_message2(msg=self._defs.Message.OKSIGN, payload=data,
slot_id=this_slot_id)
result = []
while time.time() < t_end:
try:
sig_part = self.ok.read_bytes(timeout_ms=100)
if len(sig_part) == 64 and len(set(sig_part[0:63])) != 1:
log.info('received part= %s', repr(sig_part))
result += sig_part
t_end = time.time() + 1
# Todo know RSA type to know how many packets
except Exception as e:
raise interface.DeviceError(e)
log.info('received= %s', repr(result))
return bytes(result)
raise Exception('failed to sign challenge')
def ecdh(self, identity, pubkey):
"""Get shared session key using Elliptic Curve Diffie-Hellman."""
curve_name = identity.get_curve_name(ecdh=True)
log.debug('"%s" shared session key (%s) for %r from %s',
identity.to_string(), curve_name, pubkey, self)
# Calculate hash for key derivation input data
h1 = hashlib.sha256()
if identity.identity_dict['proto'] == 'ssh':
if identity.identity_dict.get('user'):
id_parts = unidecode.unidecode(identity.identity_dict['user'] + '@' +
identity.identity_dict['host']).encode('ascii')
else:
id_parts = unidecode.unidecode(identity.identity_dict['host']).encode('ascii')
else:
id_parts = identity.to_bytes()
h1.update(id_parts)
log.info('Identity to hash =%s', id_parts)
data = h1.hexdigest()
log.info('Identity hash =%s', data)
data = codecs.decode(data, 'hex_codec')
# Determine type of key to derive on OnlyKey for ecdh
# Slot 132 used for derived key, slots 101-116 used for stored ecc keys,
# slots 1-4 used for stored RSA keys
if self.dkeyslot == 132:
if curve_name == 'curve25519':
this_slot_id = 204
log.info('Key type curve25519')
elif curve_name == 'nist256p1':
this_slot_id = 202
log.info('Key type nistp256')
else:
this_slot_id = 203
log.info('Key type secp256k1')
raw_message = pubkey + data
else:
this_slot_id = self.dkeyslot
raw_message = pubkey
log.info('Key Slot =%s', this_slot_id)
log.info('data hash =%s', data)
h2 = hashlib.sha256()
h2.update(raw_message)
d = h2.digest()
assert len(d) == 32
b1, b2, b3 = get_button(self, d[0]), get_button(self, d[15]), get_button(self, d[31])
self.ok.send_large_message2(msg=self._defs.Message.OKDECRYPT, payload=raw_message,
slot_id=this_slot_id)
print('Enter the 3 digit challenge code on OnlyKey to authorize ' + identity.to_string())
print('{} {} {}'.format(b1, b2, b3))
t_end = time.time() + 22
if curve_name != 'rsa':
while time.time() < t_end:
try:
result = self.ok.read_bytes(timeout_ms=100)
if len(result) == 64 and len(set(result[0:63])) != 1:
break
except Exception as e:
raise interface.DeviceError(e)
if len(set(result[34:63])) == 1:
result = b'\x04' + bytes(result[0:32])
else:
result = []
while time.time() < t_end:
try:
dec_part = self.ok.read_bytes(timeout_ms=100)
if len(dec_part) == 64 and len(set(dec_part[0:63])) != 1:
log.info('received part= %s', repr(dec_part))
result += dec_part
t_end = time.time() + 1
# Todo know RSA type to know how many packets
except Exception as e:
raise interface.DeviceError(e)
log.info('received= %s', repr(result))
log.info('disconnected from %s', self.device_name)
self.ok.close()
return bytes(result)
def get_button(self, byte):
"""Return button number."""
if str(self.okversion) == 'v0.2-beta.8c':
return byte % 5 + 1
else:
return byte % 6 + 1

View File

@@ -0,0 +1,5 @@
"""OnlyKey-related definitions."""
# pylint: disable=unused-import,import-error,no-name-in-module
from onlykey import Message, OnlyKey

View File

@@ -5,6 +5,7 @@ import logging
import semver
from .. import formats
from . import interface
log = logging.getLogger(__name__)
@@ -26,7 +27,7 @@ class Trezor(interface.Device):
required_version = '>=1.4.0'
ui = None # can be overridden by device's users
cached_state = None
cached_session_id = None
def _verify_version(self, connection):
f = connection.features
@@ -54,11 +55,14 @@ class Trezor(interface.Device):
for _ in range(5): # Retry a few times in case of PIN failures
connection = self._defs.Client(transport=transport,
ui=self.ui,
state=self.__class__.cached_state)
session_id=self.__class__.cached_session_id)
self._verify_version(connection)
try:
connection.ping(msg='', pin_protection=True) # unlock PIN
# unlock PIN and passphrase
self._defs.get_address(connection,
"Testnet",
self._defs.PASSPHRASE_TEST_PATH)
return connection
except (self._defs.PinException, ValueError) as e:
log.error('Invalid PIN: %s, retrying...', e)
@@ -67,10 +71,11 @@ class Trezor(interface.Device):
log.exception('ping failed: %s', e)
connection.close() # so the next HID open() will succeed
raise
return None
def close(self):
"""Close connection."""
self.__class__.cached_state = self.conn.state
self.__class__.cached_session_id = self.conn.session_id
super().close()
def pubkey(self, identity, ecdh=False):
@@ -84,7 +89,8 @@ class Trezor(interface.Device):
n=addr,
ecdsa_curve_name=curve_name)
log.debug('result: %s', result)
return bytes(result.node.public_key)
pubkey = bytes(result.node.public_key)
return formats.decompress_pubkey(pubkey=pubkey, curve_name=identity.curve_name)
def _identity_proto(self, identity):
result = self._defs.IdentityType()
@@ -93,6 +99,11 @@ class Trezor(interface.Device):
return result
def sign(self, identity, blob):
"""Sign given blob and return the signature (as bytes)."""
sig, _ = self.sign_with_pubkey(identity, blob)
return sig
def sign_with_pubkey(self, identity, blob):
"""Sign given blob and return the signature (as bytes)."""
curve_name = identity.get_curve_name(ecdh=False)
log.debug('"%s" signing %r (%s) on %s',
@@ -107,7 +118,7 @@ class Trezor(interface.Device):
log.debug('result: %s', result)
assert len(result.signature) == 65
assert result.signature[:1] == b'\x00'
return bytes(result.signature[1:])
return bytes(result.signature[1:]), bytes(result.public_key)
except self._defs.TrezorFailure as e:
msg = '{} error: {}'.format(self, e)
log.debug(msg, exc_info=True)
@@ -115,6 +126,11 @@ class Trezor(interface.Device):
def ecdh(self, identity, pubkey):
"""Get shared session key using Elliptic Curve Diffie-Hellman."""
session_key, _ = self.ecdh_with_pubkey(identity, pubkey)
return session_key
def ecdh_with_pubkey(self, identity, pubkey):
"""Get shared session key using Elliptic Curve Diffie-Hellman & self public key."""
curve_name = identity.get_curve_name(ecdh=True)
log.debug('"%s" shared session key (%s) for %r from %s',
identity.to_string(), curve_name, pubkey, self)
@@ -127,7 +143,11 @@ class Trezor(interface.Device):
log.debug('result: %s', result)
assert len(result.session_key) in {65, 33} # NIST256 or Curve25519
assert result.session_key[:1] == b'\x04'
return bytes(result.session_key)
self_pubkey = result.public_key
if self_pubkey:
self_pubkey = bytes(self_pubkey[1:])
return bytes(result.session_key), self_pubkey
except self._defs.TrezorFailure as e:
msg = '{} error: {}'.format(self, e)
log.debug(msg, exc_info=True)

View File

@@ -1,20 +1,19 @@
"""TREZOR-related definitions."""
# pylint: disable=unused-import,import-error,no-name-in-module,no-member
import os
import logging
import os
import mnemonic
import semver
import trezorlib
from trezorlib.btc import get_address, get_public_node
from trezorlib.client import PASSPHRASE_TEST_PATH
from trezorlib.client import TrezorClient as Client
from trezorlib.exceptions import TrezorFailure, PinException
from trezorlib.transport import get_transport
from trezorlib.exceptions import PinException, TrezorFailure
from trezorlib.messages import IdentityType
from trezorlib.btc import get_public_node
from trezorlib.misc import sign_identity, get_ecdh_session_key
from trezorlib.misc import get_ecdh_session_key, sign_identity
from trezorlib.transport import get_transport
log = logging.getLogger(__name__)
@@ -25,6 +24,7 @@ def find_device():
If unset, picks first connected device.
"""
try:
return get_transport(os.environ.get("TREZOR_PATH"))
return get_transport(os.environ.get("TREZOR_PATH"), prefix_search=True)
except Exception as e: # pylint: disable=broad-except
log.debug("Failed to find a Trezor device: %s", e)
return None

View File

@@ -3,9 +3,16 @@
import logging
import os
import subprocess
import sys
from .. import util
try:
from trezorlib.client import PASSPHRASE_ON_DEVICE
except ImportError:
PASSPHRASE_ON_DEVICE = object()
log = logging.getLogger(__name__)
@@ -23,7 +30,8 @@ class UI:
default_pinentry)
self.options_getter = create_default_options_getter()
self.device_name = device_type.__name__
self.cached_passphrase_ack = None
self.cached_passphrase_ack = util.ExpiringCache(
seconds=float(config.get('cache_expiry_seconds', 'inf')))
def get_pin(self, _code=None):
"""Ask the user for (scrambled) PIN."""
@@ -40,18 +48,24 @@ class UI:
binary=self.pin_entry_binary,
options=self.options_getter())
def get_passphrase(self, prompt='Passphrase:'):
def get_passphrase(self, prompt='Passphrase:', available_on_device=False):
"""Ask the user for passphrase."""
passphrase = None
if self.cached_passphrase_ack:
passphrase = self.cached_passphrase_ack.get()
if passphrase is None:
passphrase = interact(
title='{} passphrase'.format(self.device_name),
prompt=prompt,
description=None,
binary=self.passphrase_entry_binary,
options=self.options_getter())
env_passphrase = os.environ.get("TREZOR_PASSPHRASE")
if env_passphrase is not None:
passphrase = env_passphrase
elif available_on_device:
passphrase = PASSPHRASE_ON_DEVICE
else:
passphrase = interact(
title='{} passphrase'.format(self.device_name),
prompt=prompt,
description=None,
binary=self.passphrase_entry_binary,
options=self.options_getter())
if self.cached_passphrase_ack:
self.cached_passphrase_ack.set(passphrase)
return passphrase
@@ -64,11 +78,12 @@ class UI:
def create_default_options_getter():
"""Return current TTY and DISPLAY settings for GnuPG pinentry."""
options = []
try:
ttyname = subprocess.check_output(args=['tty']).strip()
options.append(b'ttyname=' + ttyname)
except subprocess.CalledProcessError as e:
log.warning('no TTY found: %s', e)
if sys.stdin.isatty(): # short-circuit calling `tty`
try:
ttyname = subprocess.check_output(args=['tty']).strip()
options.append(b'ttyname=' + ttyname)
except subprocess.CalledProcessError as e:
log.warning('no TTY found: %s', e)
display = os.environ.get('DISPLAY')
if display is not None:

View File

@@ -5,7 +5,7 @@ import io
import logging
import ecdsa
import ed25519
import nacl.signing
from . import util
@@ -25,8 +25,10 @@ SSH_NIST256_DER_OCTET = b'\x04'
SSH_NIST256_KEY_PREFIX = b'ecdsa-sha2-'
SSH_NIST256_CURVE_NAME = b'nistp256'
SSH_NIST256_KEY_TYPE = SSH_NIST256_KEY_PREFIX + SSH_NIST256_CURVE_NAME
SSH_NIST256_CERT_POSTFIX = b'-cert-v01@openssh.com'
SSH_NIST256_CERT_TYPE = SSH_NIST256_KEY_TYPE + SSH_NIST256_CERT_POSTFIX
SSH_ED25519_KEY_TYPE = b'ssh-ed25519'
SUPPORTED_KEY_TYPES = {SSH_NIST256_KEY_TYPE, SSH_ED25519_KEY_TYPE}
SUPPORTED_KEY_TYPES = {SSH_NIST256_KEY_TYPE, SSH_NIST256_CERT_TYPE, SSH_ED25519_KEY_TYPE}
hashfunc = hashlib.sha256
@@ -49,6 +51,7 @@ def parse_pubkey(blob):
The verifier returns the signatures in the required SSH format.
Currently, NIST256P1 and ED25519 elliptic curves are supported.
"""
# pylint: disable=too-many-locals
fp = fingerprint(blob)
s = io.BytesIO(blob)
key_type = util.read_frame(s)
@@ -57,10 +60,27 @@ def parse_pubkey(blob):
result = {'blob': blob, 'type': key_type, 'fingerprint': fp}
if key_type == SSH_NIST256_KEY_TYPE:
if key_type in (SSH_NIST256_KEY_TYPE, SSH_NIST256_CERT_TYPE):
if key_type == SSH_NIST256_CERT_TYPE:
_nonce = util.read_frame(s)
curve_name = util.read_frame(s)
log.debug('curve name: %s', curve_name)
point = util.read_frame(s)
if key_type == SSH_NIST256_CERT_TYPE:
_serial_number = util.recv(s, '>Q')
_type = util.recv(s, '>L')
_key_id = util.read_frame(s)
_valid_principals = util.read_frame(s)
_valid_after = util.recv(s, '>Q')
_valid_before = util.recv(s, '>Q')
_critical_options = util.read_frame(s)
_extensions = util.read_frame(s)
_reserved = util.read_frame(s)
_signature_key = util.read_frame(s)
_signature = util.read_frame(s)
assert s.read() == b''
_type, point = point[:1], point[1:]
assert _type == SSH_NIST256_DER_OCTET
@@ -88,8 +108,10 @@ def parse_pubkey(blob):
def ed25519_verify(sig, msg):
assert len(sig) == 64
vk = ed25519.VerifyingKey(pubkey)
vk.verify(sig, msg)
vk = nacl.signing.VerifyKey(bytes(pubkey),
encoder=nacl.encoding.RawEncoder)
vk.verify(msg, sig)
log.debug('verify signature')
return sig
result.update(curve=CURVE_ED25519, verifier=ed25519_verify)
@@ -99,9 +121,9 @@ def parse_pubkey(blob):
def _decompress_ed25519(pubkey):
"""Load public key from the serialized blob (stripping the prefix byte)."""
if pubkey[:1] == b'\x00':
if pubkey[:1] in {b'\x00', b'\x01'}:
# set by Trezor fsm_msgSignIdentity() and fsm_msgGetPublicKey()
return ed25519.VerifyingKey(pubkey[1:])
return nacl.signing.VerifyKey(pubkey[1:], encoder=nacl.encoding.RawEncoder)
else:
return None
@@ -161,8 +183,8 @@ def serialize_verifying_key(vk):
Currently, NIST256P1 and ED25519 elliptic curves are supported.
Raise TypeError on unsupported key format.
"""
if isinstance(vk, ed25519.keys.VerifyingKey):
pubkey = vk.to_bytes()
if isinstance(vk, nacl.signing.VerifyKey):
pubkey = vk.encode(encoder=nacl.encoding.RawEncoder)
key_type = SSH_ED25519_KEY_TYPE
blob = util.frame(SSH_ED25519_KEY_TYPE) + util.frame(pubkey)
return key_type, blob
@@ -188,7 +210,7 @@ def export_public_key(vk, label):
key_type, blob = serialize_verifying_key(vk)
log.debug('fingerprint: %s', fingerprint(blob))
b64 = base64.b64encode(blob).decode('ascii')
return u'{} {} {}\n'.format(key_type.decode('ascii'), b64, label)
return '{} {} {}\n'.format(key_type.decode('ascii'), b64, label)
def import_public_key(line):

View File

@@ -18,12 +18,12 @@ import subprocess
import sys
import time
import daemon
import pkg_resources
import semver
from . import agent, client, encode, keyring, protocol
from .. import device, formats, server, util
from . import agent, client, encode, keyring, protocol
log = logging.getLogger(__name__)
@@ -122,7 +122,7 @@ def run_init(device_type, args):
verify_gpg_version()
# Prepare new GPG home directory for hardware-based identity
device_name = os.path.basename(sys.argv[0]).rsplit('-', 1)[0]
device_name = device_type.package_name().rsplit('-', 1)[0]
log.info('device name: %s', device_name)
homedir = args.homedir
if not homedir:
@@ -143,7 +143,7 @@ def run_init(device_type, args):
# Prepare GPG agent invocation script (to pass the PATH from environment).
with open(os.path.join(homedir, 'run-agent.sh'), 'w') as f:
f.write(r"""#!/bin/sh
export PATH={0}
export PATH="{0}"
{1} \
-vv \
--pin-entry-binary={pin_entry_binary} \
@@ -226,6 +226,8 @@ def run_agent(device_type):
p.add_argument('-v', '--verbose', default=0, action='count')
p.add_argument('--server', default=False, action='store_true',
help='Use stdin/stdout for communication with GPG.')
p.add_argument('--daemon', default=False, action='store_true',
help='Daemonize the agent.')
p.add_argument('--pin-entry-binary', type=str, default='pinentry',
help='Path to PIN entry UI helper.')
@@ -236,6 +238,15 @@ def run_agent(device_type):
args, _ = p.parse_known_args()
if args.daemon:
with daemon.DaemonContext():
run_agent_internal(args, device_type)
else:
run_agent_internal(args, device_type)
def run_agent_internal(args, device_type):
"""Actually run the server."""
assert args.homedir
log_file = os.path.join(args.homedir, 'gpg-agent.log')
@@ -249,8 +260,6 @@ def run_agent(device_type):
pubkey_bytes = keyring.export_public_keys(env=env)
device_type.ui = device.ui.UI(device_type=device_type,
config=vars(args))
device_type.ui.cached_passphrase_ack = util.ExpiringCache(
seconds=float(args.cache_expiry_seconds))
handler = agent.Handler(device=device_type(),
pubkey_bytes=pubkey_bytes)
@@ -296,7 +305,7 @@ def main(device_type):
help='initialize hardware-based GnuPG identity')
p.add_argument('user_id')
p.add_argument('-e', '--ecdsa-curve', default='nist256p1')
p.add_argument('-t', '--time', type=int, default=int(time.time()))
p.add_argument('-t', '--time', type=int, default=0)
p.add_argument('-v', '--verbose', default=0, action='count')
p.add_argument('-s', '--subkey', default=False, action='store_true')
@@ -318,7 +327,5 @@ def main(device_type):
args = parser.parse_args()
device_type.ui = device.ui.UI(device_type=device_type, config=vars(args))
device_type.ui.cached_passphrase_ack = util.ExpiringCache(
seconds=float(args.cache_expiry_seconds))
return args.func(device_type=device_type, args=args)

View File

@@ -2,8 +2,8 @@
import binascii
import logging
from . import client, decode, keyring, protocol
from .. import util
from . import client, decode, keyring, protocol
log = logging.getLogger(__name__)
@@ -99,7 +99,7 @@ class Handler:
b'SETHASH': lambda _, args: self.set_hash(*args),
b'PKSIGN': lambda conn, _: self.pksign(conn),
b'PKDECRYPT': lambda conn, _: self.pkdecrypt(conn),
b'HAVEKEY': lambda _, args: self.have_key(*args),
b'HAVEKEY': lambda conn, args: self.have_key(conn, *args),
b'KEYINFO': _key_info,
b'SCD': self.handle_scd,
b'GET_PASSPHRASE': self.handle_get_passphrase,
@@ -198,8 +198,16 @@ class Handler:
ec_point = self.client.ecdh(identity=identity, pubkey=remote_pubkey)
keyring.sendline(conn, b'D ' + _serialize_point(ec_point))
def have_key(self, *keygrips):
def have_key(self, conn, *keygrips):
"""Check if any keygrip corresponds to a TREZOR-based key."""
if len(keygrips) == 1 and keygrips[0].startswith(b"--list="):
# Support "fast-path" key listing:
# https://dev.gnupg.org/rG40da61b89b62dcb77847dc79eb159e885f52f817
keygrips = list(decode.iter_keygrips(pubkey_bytes=self.pubkey_bytes))
log.debug('keygrips: %r', keygrips)
keyring.sendline(conn, b'D ' + util.assuan_serialize(b''.join(keygrips)))
return
for keygrip in keygrips:
try:
self.get_identity(keygrip=keygrip)

View File

@@ -25,9 +25,7 @@ class Client:
def pubkey(self, identity, ecdh=False):
"""Return public key as VerifyingKey object."""
with self.device:
pubkey = self.device.pubkey(ecdh=ecdh, identity=identity)
return formats.decompress_pubkey(
pubkey=pubkey, curve_name=identity.curve_name)
return self.device.pubkey(ecdh=ecdh, identity=identity)
def sign(self, identity, digest):
"""Sign the digest and return a serialized signature."""

View File

@@ -7,10 +7,10 @@ import logging
import struct
import ecdsa
import ed25519
import nacl.signing
from . import protocol
from .. import util
from . import protocol
log = logging.getLogger(__name__)
@@ -67,7 +67,8 @@ def _parse_ed25519_pubkey(mpi):
prefix, value = util.split_bits(mpi, 8, 256)
if prefix != 0x40:
raise ValueError('Invalid MPI prefix: {}'.format(prefix))
return ed25519.VerifyingKey(util.num2bytes(value, size=32))
vk = nacl.signing.VerifyKey(util.num2bytes(value, size=32), encoder=nacl.encoding.RawEncoder)
return vk
SUPPORTED_CURVES = {
@@ -281,18 +282,20 @@ HASH_ALGORITHMS = {
}
def load_by_keygrip(pubkey_bytes, keygrip):
"""Return public key and first user ID for specified keygrip."""
def _parse_pubkey_packets(pubkey_bytes):
stream = io.BytesIO(pubkey_bytes)
packets = list(parse_packets(stream))
packets_per_pubkey = []
for p in packets:
for p in parse_packets(stream):
if p['type'] == 'pubkey':
# Add a new packet list for each pubkey.
packets_per_pubkey.append([])
packets_per_pubkey[-1].append(p)
return packets_per_pubkey
for packets in packets_per_pubkey:
def load_by_keygrip(pubkey_bytes, keygrip):
"""Return public key and first user ID for specified keygrip."""
for packets in _parse_pubkey_packets(pubkey_bytes):
user_ids = [p for p in packets if p['type'] == 'user_id']
for p in packets:
if p.get('keygrip') == keygrip:
@@ -300,6 +303,15 @@ def load_by_keygrip(pubkey_bytes, keygrip):
raise KeyError('{} keygrip not found'.format(util.hexlify(keygrip)))
def iter_keygrips(pubkey_bytes):
"""Iterate over all keygrips in this pubkey."""
for packets in _parse_pubkey_packets(pubkey_bytes):
for p in packets:
keygrip = p.get('keygrip')
if keygrip:
yield keygrip
def load_signature(stream, original_data):
"""Load signature from stream, and compute GPG digest for verification."""
signature, = list(parse_packets((stream)))

View File

@@ -2,8 +2,8 @@
import io
import logging
from . import decode, keyring, protocol
from .. import util
from . import decode, keyring, protocol
log = logging.getLogger(__name__)

View File

@@ -17,8 +17,11 @@ log = logging.getLogger(__name__)
def check_output(args, env=None, sp=subprocess):
"""Call an external binary and return its stdout."""
log.debug('calling %s with env %s', args, env)
output = sp.check_output(args=args, env=env)
p = sp.Popen(args=args, env=env, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE)
(output, error) = p.communicate()
log.debug('output: %r', output)
if error:
log.debug('error: %r', error)
return output
@@ -173,9 +176,7 @@ def sign_digest(sock, keygrip, digest, sp=subprocess, environ=None):
assert communicate(sock, 'PKSIGN') == b'OK'
while True:
line = recvline(sock).strip()
if line.startswith(b'S PROGRESS'):
continue
else:
if not line.startswith(b'S PROGRESS'):
break
line = unescape(line)
log.debug('unescaped: %r', line)
@@ -224,7 +225,7 @@ def gpg_version(sp=subprocess):
"""Get a keygrip of the primary GPG key of the specified user."""
args = gpg_command(['--version'])
output = check_output(args=args, sp=sp)
line = output.split(b'\n')[0] # b'gpg (GnuPG) 2.1.11'
line = output.split(b'\n', maxsplit=1)[0] # b'gpg (GnuPG) 2.1.11'
line = line.split(b' ')[-1] # b'2.1.11'
line = line.split(b'-')[0] # remove trailing version parts
return line.split(b'v')[-1] # remove 'v' prefix

View File

@@ -5,6 +5,8 @@ import hashlib
import logging
import struct
import nacl.signing
from .. import formats, util
log = logging.getLogger(__name__)
@@ -92,7 +94,7 @@ def _serialize_nist256(vk):
def _serialize_ed25519(vk):
return mpi((0x40 << 256) |
util.bytes2num(vk.to_bytes()))
util.bytes2num(vk.encode(encoder=nacl.encoding.RawEncoder)))
def _compute_keygrip(params):
@@ -131,7 +133,7 @@ def keygrip_ed25519(vk):
['b', util.num2bytes(0x2DFC9311D490018C7338BF8688861767FF8FF5B2BEBE27548A14B235ECA6874A, size=32)], # nopep8
['g', util.num2bytes(0x04216936D3CD6E53FEC0A4E231FDD6DC5C692CC7609525A7B2C9562D608F25D51A6666666666666666666666666666666666666666666666666666666666666658, size=65)], # nopep8
['n', util.num2bytes(0x1000000000000000000000000000000014DEF9DEA2F79CD65812631A5CF5D3ED, size=32)], # nopep8
['q', vk.to_bytes()],
['q', vk.encode(encoder=nacl.encoding.RawEncoder)],
])
@@ -144,7 +146,7 @@ def keygrip_curve25519(vk):
['b', b'\x01'],
['g', util.num2bytes(0x04000000000000000000000000000000000000000000000000000000000000000920ae19a1b8a086b4e01edd2c7748d14c923d4d7e6d7c61b229e9c5a27eced3d9, size=65)], # nopep8
['n', util.num2bytes(0x1000000000000000000000000000000014DEF9DEA2F79CD65812631A5CF5D3ED, size=32)], # nopep8
['q', vk.to_bytes()],
['q', vk.encode(encoder=nacl.encoding.RawEncoder)],
])

Binary file not shown.

View File

@@ -1,11 +1,10 @@
import glob
import io
import os
import pathlib
import pytest
from .. import decode, protocol
from ... import util
from .. import decode, protocol
def test_subpackets():
@@ -30,8 +29,8 @@ def test_mpi():
assert decode.parse_mpis(util.Reader(s), n=2) == [0x123, 5]
cwd = os.path.join(os.path.dirname(__file__))
input_files = glob.glob(os.path.join(cwd, '*.gpg'))
cwd = pathlib.Path(__file__).parent
input_files = cwd.glob('*.gpg')
@pytest.fixture(params=input_files)
@@ -60,3 +59,20 @@ def test_has_custom_subpacket():
def test_load_by_keygrip_missing():
with pytest.raises(KeyError):
decode.load_by_keygrip(pubkey_bytes=b'', keygrip=b'')
def test_keygrips():
pubkey_bytes = (cwd / "romanz-pubkey.gpg").open("rb").read()
keygrips = list(decode.iter_keygrips(pubkey_bytes))
assert [k.hex() for k in keygrips] == [
'7b2497258d76bc6539ed88d018cd1c739e2dbb6c',
'30ae97f3d8e0e34c5ed80e1715fd442ca24c0a8e',
]
for keygrip in keygrips:
pubkey_dict, user_ids = decode.load_by_keygrip(pubkey_bytes, keygrip)
assert pubkey_dict['keygrip'] == keygrip
assert [u['value'] for u in user_ids] == [
b'Roman Zeyde <roman.zeyde@gmail.com>',
b'Roman Zeyde <me@romanzey.de>',
]

View File

@@ -53,6 +53,14 @@ class FakeSocket:
self.tx.write(data)
def mock_subprocess(output, error=b''):
sp = mock.Mock(spec=['Popen', 'PIPE'])
p = mock.Mock(spec=['communicate'])
sp.Popen.return_value = p
p.communicate.return_value = (output, error)
return sp
def test_sign_digest():
sock = FakeSocket()
sock.rx.write(b'OK Pleased to meet you, process XYZ\n')
@@ -61,10 +69,8 @@ def test_sign_digest():
sock.rx.seek(0)
keygrip = '1234'
digest = b'A' * 32
sp = mock.Mock(spec=['check_output'])
sp.check_output.return_value = '/dev/pts/0'
sig = keyring.sign_digest(sock=sock, keygrip=keygrip,
digest=digest, sp=sp,
digest=digest, sp=mock_subprocess('/dev/pts/0'),
environ={'DISPLAY': ':0'})
assert sig == (0x30313233343536373839414243444546,)
assert sock.tx.getvalue() == b'''RESET
@@ -85,8 +91,7 @@ def test_iterlines():
def test_get_agent_sock_path():
sp = mock.Mock(spec=['check_output'])
sp.check_output.return_value = b'''sysconfdir:/usr/local/etc/gnupg
sp = mock_subprocess(b'''sysconfdir:/usr/local/etc/gnupg
bindir:/usr/local/bin
libexecdir:/usr/local/libexec
libdir:/usr/local/lib/gnupg
@@ -96,6 +101,6 @@ dirmngr-socket:/run/user/1000/gnupg/S.dirmngr
agent-ssh-socket:/run/user/1000/gnupg/S.gpg-agent.ssh
agent-socket:/run/user/1000/gnupg/S.gpg-agent
homedir:/home/roman/.gnupg
'''
''')
expected = b'/run/user/1000/gnupg/S.gpg-agent'
assert keyring.get_agent_sock_path(sp=sp) == expected

View File

@@ -1,9 +1,9 @@
import ecdsa
import ed25519
import nacl.signing
import pytest
from .. import protocol
from ... import formats
from .. import protocol
def test_packet():
@@ -83,8 +83,8 @@ def test_nist256p1_ecdh():
def test_ed25519():
sk = ed25519.SigningKey(b'\x00' * 32)
vk = sk.get_verifying_key()
sk = nacl.signing.SigningKey(b'\x00'*32, encoder=nacl.encoding.RawEncoder)
vk = sk.verify_key
pk = protocol.PublicKey(curve_name=formats.CURVE_ED25519,
created=42, verifying_key=vk)
assert repr(pk) == 'GPG public key ed25519/36B40FE6'
@@ -92,8 +92,8 @@ def test_ed25519():
def test_curve25519():
sk = ed25519.SigningKey(b'\x00' * 32)
vk = sk.get_verifying_key()
sk = nacl.signing.SigningKey(b'\x00'*32, encoder=nacl.encoding.RawEncoder)
vk = sk.verify_key
pk = protocol.PublicKey(curve_name=formats.ECDH_CURVE25519,
created=42, verifying_key=vk)
assert repr(pk) == 'GPG public key curve25519/69460384'

View File

@@ -159,7 +159,7 @@ def run_process(command, environ):
try:
p = subprocess.Popen(args=command, env=env)
except OSError as e:
raise OSError('cannot run %r: %s' % (command, e))
raise OSError('cannot run %r: %s' % (command, e)) from e
log.debug('subprocess %d is running', p.pid)
ret = p.wait()
log.debug('subprocess %d exited: %d', p.pid, ret)

View File

@@ -0,0 +1,109 @@
"""TREZOR support for Ed25519 signify signatures."""
import argparse
import binascii
import contextlib
import functools
import hashlib
import logging
import os
import re
import struct
import subprocess
import sys
import time
import pkg_resources
import semver
from .. import formats, server, util
from ..device import interface, ui
log = logging.getLogger(__name__)
def _create_identity(user_id):
result = interface.Identity(identity_str='signify://', curve_name='ed25519')
result.identity_dict['host'] = user_id
return result
class Client:
"""Sign messages and get public keys from a hardware device."""
def __init__(self, device):
"""C-tor."""
self.device = device
def pubkey(self, identity):
"""Return public key as VerifyingKey object."""
with self.device:
return bytes(self.device.pubkey(ecdh=False, identity=identity))
def sign_with_pubkey(self, identity, data):
"""Sign the data and return a signature."""
log.info('please confirm Signify signature on %s for "%s"...',
self.device, identity.to_string())
log.debug('signing data: %s', util.hexlify(data))
with self.device:
sig, pubkey = self.device.sign_with_pubkey(blob=data, identity=identity)
assert len(sig) == 64
assert len(pubkey) == 33
assert pubkey[:1] == b"\x00"
return sig, pubkey[1:]
def format_payload(pubkey, data):
"""See http://www.openbsd.org/papers/bsdcan-signify.html for details."""
keynum = hashlib.sha256(pubkey).digest()[:8]
return binascii.b2a_base64(b"Ed" + keynum + data).decode("ascii")
def run_pubkey(device_type, args):
"""Export hardware-based Signify public key."""
util.setup_logging(verbosity=args.verbose)
log.warning('This Signify tool is still in EXPERIMENTAL mode, '
'so please note that the key derivation, API, and features '
'may change without backwards compatibility!')
identity = _create_identity(user_id=args.user_id)
pubkey = Client(device=device_type()).pubkey(identity=identity)
comment = f'untrusted comment: identity {identity.to_string()}\n'
result = comment + format_payload(pubkey=pubkey, data=pubkey)
print(result, end="")
def run_sign(device_type, args):
"""Sign an input blob using Ed25519."""
util.setup_logging(verbosity=args.verbose)
identity = _create_identity(user_id=args.user_id)
data = sys.stdin.buffer.read()
sig, pubkey = Client(device=device_type()).sign_with_pubkey(identity, data)
pubkey_str = format_payload(pubkey=pubkey, data=pubkey)
comment = f'untrusted comment: pubkey {pubkey_str}'
result = comment + format_payload(pubkey=pubkey, data=sig)
print(result, end="")
def main(device_type):
"""Parse command-line arguments."""
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(title='Action', dest='action')
subparsers.required = True
p = subparsers.add_parser('pubkey')
p.add_argument('user_id')
p.add_argument('-v', '--verbose', default=0, action='count')
p.set_defaults(func=run_pubkey)
p = subparsers.add_parser('sign')
p.add_argument('user_id')
p.add_argument('-v', '--verbose', default=0, action='count')
p.set_defaults(func=run_sign)
args = parser.parse_args()
device_type.ui = ui.UI(device_type=device_type, config=vars(args))
device_type.ui.cached_passphrase_ack = util.ExpiringCache(seconds=float(60))
return args.func(device_type=device_type, args=args)

View File

@@ -11,9 +11,9 @@ import sys
import tempfile
import threading
import pkg_resources
import configargparse
import daemon
import pkg_resources
from .. import device, formats, server, util
from . import client, protocol
@@ -78,8 +78,7 @@ def create_agent_parser(device_type):
p.add_argument('--version', help='print the version info',
action='version', version=versions)
curve_names = [name for name in formats.SUPPORTED_CURVES]
curve_names = ', '.join(sorted(curve_names))
curve_names = ', '.join(sorted(formats.SUPPORTED_CURVES))
p.add_argument('-e', '--ecdsa-curve-name', metavar='CURVE',
default=formats.CURVE_NIST256,
help='specify ECDSA curve name: ' + curve_names)
@@ -270,13 +269,11 @@ def main(device_type):
identities = [device.interface.Identity(
identity_str=args.identity, curve_name=args.ecdsa_curve_name)]
for index, identity in enumerate(identities):
identity.identity_dict['proto'] = u'ssh'
identity.identity_dict['proto'] = 'ssh'
log.info('identity #%d: %s', index, identity.to_string())
# override default PIN/passphrase entry tools (relevant for TREZOR/Keepkey):
device_type.ui = device.ui.UI(device_type=device_type, config=vars(args))
device_type.ui.cached_passphrase_ack = util.ExpiringCache(
args.cache_expiry_seconds)
conn = JustInTimeConnection(
conn_factory=lambda: client.Client(device_type()),

View File

@@ -20,46 +20,63 @@ class Client:
def export_public_keys(self, identities):
"""Export SSH public keys from the device."""
public_keys = []
pubkeys = []
with self.device:
for i in identities:
pubkey = self.device.pubkey(identity=i)
vk = formats.decompress_pubkey(pubkey=pubkey,
curve_name=i.curve_name)
public_key = formats.export_public_key(vk=vk,
label=i.to_string())
public_keys.append(public_key)
return public_keys
vk = self.device.pubkey(identity=i)
label = i.to_string()
pubkey = formats.export_public_key(vk=vk, label=label)
pubkeys.append(pubkey)
return pubkeys
def sign_ssh_challenge(self, blob, identity):
"""Sign given blob using a private key on the device."""
msg = _parse_ssh_blob(blob)
log.debug('%s: user %r via %r (%r)',
msg['conn'], msg['user'], msg['auth'], msg['key_type'])
log.debug('nonce: %r', msg['nonce'])
fp = msg['public_key']['fingerprint']
log.debug('fingerprint: %s', fp)
log.debug('hidden challenge size: %d bytes', len(blob))
log.debug('blob: %r', blob)
msg = parse_ssh_blob(blob)
if msg['sshsig']:
log.info('please confirm "%s" signature for "%s" using %s...',
msg['namespace'], identity.to_string(), self.device)
else:
log.debug('%s: user %r via %r (%r)',
msg['conn'], msg['user'], msg['auth'], msg['key_type'])
log.debug('nonce: %r', msg['nonce'])
fp = msg['public_key']['fingerprint']
log.debug('fingerprint: %s', fp)
log.debug('hidden challenge size: %d bytes', len(blob))
log.info('please confirm user "%s" login to "%s" using %s...',
msg['user'].decode('ascii'), identity.to_string(),
self.device)
log.info('please confirm user "%s" login to "%s" using %s...',
msg['user'].decode('ascii'), identity.to_string(),
self.device)
with self.device:
return self.device.sign(blob=blob, identity=identity)
def _parse_ssh_blob(data):
def parse_ssh_blob(data):
"""Parse binary data into a dict."""
res = {}
i = io.BytesIO(data)
res['nonce'] = util.read_frame(i)
i.read(1) # SSH2_MSG_USERAUTH_REQUEST == 50 (from ssh2.h, line 108)
res['user'] = util.read_frame(i)
res['conn'] = util.read_frame(i)
res['auth'] = util.read_frame(i)
i.read(1) # have_sig == 1 (from sshconnect2.c, line 1056)
res['key_type'] = util.read_frame(i)
public_key = util.read_frame(i)
res['public_key'] = formats.parse_pubkey(public_key)
assert not i.read()
if data.startswith(b'SSHSIG'):
i = io.BytesIO(data[6:])
# https://github.com/openssh/openssh-portable/blob/master/PROTOCOL.sshsig
res['sshsig'] = True
res['namespace'] = util.read_frame(i)
res['reserved'] = util.read_frame(i)
res['hashalg'] = util.read_frame(i)
res['message'] = util.read_frame(i)
else:
i = io.BytesIO(data)
res['sshsig'] = False
res['nonce'] = util.read_frame(i)
i.read(1) # SSH2_MSG_USERAUTH_REQUEST == 50 (from ssh2.h, line 108)
res['user'] = util.read_frame(i)
res['conn'] = util.read_frame(i)
res['auth'] = util.read_frame(i)
i.read(1) # have_sig == 1 (from sshconnect2.c, line 1056)
res['key_type'] = util.read_frame(i)
public_key = util.read_frame(i)
res['public_key'] = formats.parse_pubkey(public_key)
unparsed = i.read()
if unparsed:
log.warning('unparsed blob: %r', unparsed)
return res

View File

@@ -40,6 +40,8 @@ COMMANDS = dict(
SSH_AGENTC_ADD_RSA_ID_CONSTRAINED=24,
SSH2_AGENTC_ADD_ID_CONSTRAINED=25,
SSH_AGENTC_ADD_SMARTCARD_KEY_CONSTRAINED=26,
SSH_AGENTC_EXTENSION=27,
SSH_AGENT_EXTENSION_FAILURE=28,
)
@@ -86,6 +88,7 @@ class Handler:
msg_code('SSH_AGENTC_REQUEST_RSA_IDENTITIES'): _legacy_pubs,
msg_code('SSH2_AGENTC_REQUEST_IDENTITIES'): self.list_pubs,
msg_code('SSH2_AGENTC_SIGN_REQUEST'): self.sign_message,
msg_code('SSH_AGENTC_EXTENSION'): _unsupported_extension,
}
def handle(self, msg):
@@ -144,17 +147,26 @@ class Handler:
signature = self.conn.sign(blob=blob, identity=key['identity'])
except IOError:
return failure()
except Exception:
log.exception('signature with "%s" key failed', label)
raise
log.debug('signature: %r', signature)
try:
sig_bytes = key['verifier'](sig=signature, msg=blob)
log.info('signature status: OK')
except formats.ecdsa.BadSignatureError:
except formats.ecdsa.BadSignatureError as e:
log.exception('signature status: ERROR')
raise ValueError('invalid ECDSA signature')
raise ValueError('invalid ECDSA signature') from e
log.debug('signature size: %d bytes', len(sig_bytes))
data = util.frame(util.frame(key['type']), util.frame(sig_bytes))
code = util.pack('B', msg_code('SSH2_AGENT_SIGN_RESPONSE'))
return util.frame(code, data)
def _unsupported_extension(buf): # pylint: disable=unused-argument
code = util.pack('B', msg_code('SSH_AGENT_EXTENSION_FAILURE'))
return util.frame(code)

View File

@@ -17,12 +17,16 @@ PUBKEY_TEXT = ('ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzd'
class MockDevice(device.interface.Device): # pylint: disable=abstract-method
def connect(self): # pylint: disable=no-self-use
@classmethod
def package_name(cls):
return 'fake-device-agent'
def connect(self):
return mock.Mock()
def pubkey(self, identity, ecdh=False): # pylint: disable=unused-argument
assert self.conn
return PUBKEY
return formats.decompress_pubkey(pubkey=PUBKEY, curve_name=identity.curve_name)
def sign(self, identity, blob):
"""Sign given blob and return the signature (as bytes)."""
@@ -70,3 +74,52 @@ def test_ssh_agent():
c.device.sign = cancel_sign
with pytest.raises(IOError):
c.sign_ssh_challenge(blob=BLOB, identity=identity)
CHALLENGE_BLOB = (
b'\x00\x00\x00 \xe4\x08\x8e"J#\x83 \x05\x90\x1e\xa9\xf9C\xb1\xd2\x8f\xc3\x8c\xea\xd8\xf6E'
b'%q\xff\x07\xfa\xd8\x8b\xdf\xbd2\x00\x00\x00\x03git\x00\x00\x00\x0essh-connection\x00\x00'
b'\x00\tpublickey\x01\x00\x00\x00\x0bssh-ed25519\x00\x00\x003\x00\x00\x00\x0bssh-ed25519'
b'\x00\x00\x00 \xd1q\x1ab\xc6\xf0d\x19\xe2q<\x05\x0b\xdao\xa1\xcb\xae\xad\xc9\x0b\x16\xf3'
b'\xc2m\x84q8qU\xda\xb0'
)
def test_parse_ssh_challenge():
result = client.parse_ssh_blob(CHALLENGE_BLOB)
result['public_key'].pop('verifier')
assert result == {
'auth': b'publickey',
'conn': b'ssh-connection',
'key_type': b'ssh-ed25519',
'nonce': b'\xe4\x08\x8e"J#\x83 \x05\x90\x1e\xa9\xf9C\xb1\xd2\x8f\xc3\x8c\xea'
b'\xd8\xf6E%q\xff\x07\xfa\xd8\x8b\xdf\xbd',
'public_key': {'blob': b'\x00\x00\x00\x0bssh-ed25519\x00\x00\x00 \xd1'
b'q\x1ab\xc6\xf0d\x19\xe2q<\x05\x0b\xdao\xa1\xcb'
b'\xae\xad\xc9\x0b\x16\xf3\xc2m\x84q8qU\xda\xb0',
'curve': 'ed25519',
'fingerprint': '47:a3:26:af:0b:5d:a2:c3:91:ed:26:36:94:be:3a:d5',
'type': b'ssh-ed25519'},
'sshsig': False,
'user': b'git',
}
FILE_SIG_BLOB = (
b"SSHSIG\x00\x00\x00\x04file\x00\x00\x00\x00\x00\x00\x00\x06sha512\x00\x00\x00@r\xb7r\xfeM"
b"\xe5w\xf0#w\x1dbl\xca\to=\x90\xb69\xd1:u{\xe5\xe4\xf1\xb1\xa8C\xb8\xfcM\x91\x9f\x12\xa8"
b"\x1d`\x00\x848C<\x85\x8e\xf0o\xdab\xdcQ\xce\xf2\xda\xc3\xae\xa9\x1e%\x85\xcd\xe3'"
)
def test_parse_ssh_signature():
result = client.parse_ssh_blob(FILE_SIG_BLOB)
assert result == {
'hashalg': b'sha512',
'message': b'r\xb7r\xfeM\xe5w\xf0#w\x1dbl\xca\to=\x90\xb69\xd1:u{'
b'\xe5\xe4\xf1\xb1\xa8C\xb8\xfcM\x91\x9f\x12\xa8\x1d`\x00\x848C<'
b"\x85\x8e\xf0o\xdab\xdcQ\xce\xf2\xda\xc3\xae\xa9\x1e%\x85\xcd\xe3'",
'namespace': b'file',
'reserved': b'',
'sshsig': True,
}

View File

@@ -23,6 +23,26 @@ _public_key = (
'home\n'
)
_public_key_cert = (
'ecdsa-sha2-nistp256-cert-v01@openssh.com '
'AAAAKGVjZHNhLXNoYTItbmlzdHAyNTYtY2VydC12MDFAb3B'
'lbnNzaC5jb20AAAAgohlAP8H3LPYWz3+w/E+RGDxG6tNAEE'
'3Ao9Z6Pc66khEAAAAIbmlzdHAyNTYAAABBBGI2zqveJSB+g'
'eQEWG46OvGs2h3+0qu7tIdsH8WylrV19vttd7GR5rKvTWJt'
'8b9ErthmnFALelAFKOB/u50jsukAAAAAAAAAFQAAAAEAAAA'
'IdW5pdFRlc3QAAAAIAAAABHVzZXIAAAAAAAAAAP////////'
'//AAAAAAAAAIIAAAAVcGVybWl0LVgxMS1mb3J3YXJkaW5nA'
'AAAAAAAABdwZXJtaXQtYWdlbnQtZm9yd2FyZGluZwAAAAAA'
'AAAWcGVybWl0LXBvcnQtZm9yd2FyZGluZwAAAAAAAAAKcGV'
'ybWl0LXB0eQAAAAAAAAAOcGVybWl0LXVzZXItcmMAAAAAAA'
'AAAAAAADMAAAALc3NoLWVkMjU1MTkAAAAgf9gyPrF24CLZc'
'0rHoZuI1+yjBFWt66G8oUmm20yRO8IAAABTAAAAC3NzaC1l'
'ZDI1NTE5AAAAQCEgVgsR7fSgcTxuAWqMW4h42y7pt1BAKR4'
'HTRg178tl7Vx8WoRtQcNirX9eggBcTA+5ILWmeY3uDN+soW'
't7fwk= '
'home\n'
)
def test_parse_public_key():
key = formats.import_public_key(_public_key)
@@ -34,6 +54,16 @@ def test_parse_public_key():
assert key['type'] == b'ecdsa-sha2-nistp256'
def test_parse_public_key_cert():
key = formats.import_public_key(_public_key_cert)
assert key['name'] == b'home'
assert key['point'] == _point
assert key['curve'] == 'nist256p1'
assert key['fingerprint'] == 'ab:ab:5d:9f:f4:33:f4:d0:c3:68:65:3b:94:86:de:22' # nopep8
assert key['type'] == b'ecdsa-sha2-nistp256-cert-v01@openssh.com'
def test_decompress():
blob = '036236ceabde25207e81e404586e3a3af1acda1dfed2abbbb4876c1fc5b296b575'
vk = formats.decompress_pubkey(binascii.unhexlify(blob),

View File

@@ -2,6 +2,6 @@ from ..device import interface
def test_unicode():
i = interface.Identity(u'ko\u017eu\u0161\u010dek@host', 'ed25519')
i = interface.Identity('ko\u017eu\u0161\u010dek@host', 'ed25519')
assert i.to_bytes() == b'kozuscek@host'
assert sorted(i.items()) == [('host', 'host'), ('user', 'kozuscek')]

View File

@@ -78,12 +78,12 @@ def test_server_thread():
quit_event = threading.Event()
class FakeServer:
def accept(self): # pylint: disable=no-self-use
def accept(self):
if not connections:
raise socket.timeout()
return connections.pop(), 'address'
def getsockname(self): # pylint: disable=no-self-use
def getsockname(self):
return 'fake_server'
def handle_conn(conn):

View File

@@ -242,7 +242,7 @@ def which(cmd):
from shutil import which as _which
except ImportError:
# For Python 2
from backports.shutil_which import which as _which # pylint: disable=relative-import
from backports.shutil_which import which as _which
full_path = _which(cmd)
if full_path is None:
raise OSError('Cannot find {!r} in $PATH'.format(cmd))

View File

@@ -3,25 +3,30 @@ from setuptools import setup
setup(
name='libagent',
version='0.13.1',
version='0.14.6',
description='Using hardware wallets as SSH/GPG agent',
author='Roman Zeyde',
author_email='roman.zeyde@gmail.com',
url='http://github.com/romanz/trezor-agent',
packages=[
'libagent',
'libagent.age',
'libagent.device',
'libagent.gpg',
'libagent.ssh'
'libagent.signify',
'libagent.ssh',
],
install_requires=[
'bech32>=1.2.0',
'cryptography>=3.4.6',
'docutils>=0.14',
'python-daemon>=2.3.0',
'wheel>=0.32.3',
'backports.shutil_which>=3.5.1',
'ConfigArgParse>=0.12.1',
'python-daemon>=2.1.2',
'ecdsa>=0.13',
'ed25519>=1.4',
'pynacl>=1.4.0',
'mnemonic>=0.18',
'pymsgbox>=1.0.6',
'semver>=2.2',

View File

@@ -16,7 +16,7 @@ deps=
isort
commands=
pycodestyle libagent
# isort --skip-glob .tox -c -r libagent
isort --skip-glob .tox -c libagent
pylint --reports=no --rcfile .pylintrc libagent
pydocstyle libagent
coverage run --source libagent -m py.test -v libagent