mirror of
https://github.com/romanz/amodem.git
synced 2026-05-03 08:27:26 +08:00
Compare commits
86 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
58bbb24fb4 | ||
|
|
f28ea64294 | ||
|
|
b782b610a9 | ||
|
|
f4202095f6 | ||
|
|
a9bcabc664 | ||
|
|
83b4aeebf4 | ||
|
|
2e61c84ca6 | ||
|
|
e17ba3f42e | ||
|
|
cd972ed4f3 | ||
|
|
54e7ffb6a5 | ||
|
|
acd52cd771 | ||
|
|
c7384d7e0e | ||
|
|
7c76bb4df7 | ||
|
|
d3817a4eec | ||
|
|
debcda4ce8 | ||
|
|
22c309cd03 | ||
|
|
c300610824 | ||
|
|
bfccf879ce | ||
|
|
3044cfe932 | ||
|
|
0bbb52f24e | ||
|
|
7cf3c520d3 | ||
|
|
91e7970d53 | ||
|
|
8322cf455e | ||
|
|
3fbb2c624e | ||
|
|
1cfdddc33a | ||
|
|
523dcb139a | ||
|
|
751ef7321b | ||
|
|
38a3131a07 | ||
|
|
7c01789529 | ||
|
|
1763b0ea0c | ||
|
|
8c5a9bfe02 | ||
|
|
807c25a9fc | ||
|
|
b177da9ee8 | ||
|
|
4242599114 | ||
|
|
54e670c7ee | ||
|
|
5832d4a67b | ||
|
|
8d4536b37a | ||
|
|
b1b3e4b7ea | ||
|
|
80bfda7899 | ||
|
|
1166917461 | ||
|
|
d7f6ceb429 | ||
|
|
a8f2d74d02 | ||
|
|
df84c4c15f | ||
|
|
0662ced2f4 | ||
|
|
471d0e03e7 | ||
|
|
e4d16a361a | ||
|
|
c6f30083ff | ||
|
|
23f8ef09a5 | ||
|
|
f0769655ad | ||
|
|
2a6a47f400 | ||
|
|
47c827519e | ||
|
|
a9117c965c | ||
|
|
8107e6378c | ||
|
|
85d2da5460 | ||
|
|
5e5a96b96f | ||
|
|
69c5c57489 | ||
|
|
b9db213912 | ||
|
|
6c2b880b7d | ||
|
|
37510a2d75 | ||
|
|
b6de68e95c | ||
|
|
ee4b1fcdb6 | ||
|
|
6d55512619 | ||
|
|
b902f43ba1 | ||
|
|
338a075ed5 | ||
|
|
bcea720e95 | ||
|
|
1c6d2cb65a | ||
|
|
53fe6cd5ad | ||
|
|
a0e7aae1d2 | ||
|
|
7f4269ab88 | ||
|
|
36e7afde17 | ||
|
|
020572ef5f | ||
|
|
dbae284487 | ||
|
|
f5b99c0794 | ||
|
|
f66da28cc3 | ||
|
|
c3853e97c7 | ||
|
|
32eff19bb6 | ||
|
|
fd182e744f | ||
|
|
a12202d809 | ||
|
|
d0e7fa7cca | ||
|
|
e1bbdb4bcc | ||
|
|
4d9d6c0741 | ||
|
|
4c3c5a7c53 | ||
|
|
362ddcc707 | ||
|
|
88ff57187f | ||
|
|
52d840cbbb | ||
|
|
8c22e5030b |
@@ -1,7 +1,6 @@
|
||||
[bumpversion]
|
||||
commit = True
|
||||
tag = True
|
||||
current_version = 0.14.0
|
||||
current_version = 0.14.7
|
||||
|
||||
[bumpversion:file:setup.py]
|
||||
|
||||
|
||||
24
.github/workflows/ci.yml
vendored
Normal file
24
.github/workflows/ci.yml
vendored
Normal file
@@ -0,0 +1,24 @@
|
||||
name: Build
|
||||
|
||||
on: [push]
|
||||
|
||||
jobs:
|
||||
build:
|
||||
|
||||
runs-on: ubuntu-latest
|
||||
strategy:
|
||||
matrix:
|
||||
python-version: ['3.7', '3.8', '3.9', '3.10', '3.11']
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
- name: Set up Python ${{ matrix.python-version }}
|
||||
uses: actions/setup-python@v2
|
||||
with:
|
||||
python-version: ${{ matrix.python-version }}
|
||||
- name: Install dependencies
|
||||
run: |
|
||||
python -m pip install --upgrade pip tox
|
||||
- name: Build and test
|
||||
run: |
|
||||
tox
|
||||
@@ -1,5 +1,4 @@
|
||||
[MESSAGES CONTROL]
|
||||
disable=invalid-name, missing-docstring, locally-disabled, unbalanced-tuple-unpacking,no-else-return,fixme,duplicate-code,cyclic-import,import-outside-toplevel
|
||||
|
||||
disable=invalid-name, missing-docstring, locally-disabled, unbalanced-tuple-unpacking,no-else-return,fixme,duplicate-code,cyclic-import,import-outside-toplevel,consider-using-with,consider-using-f-string,unspecified-encoding
|
||||
[SIMILARITIES]
|
||||
min-similarity-lines=5
|
||||
|
||||
29
.travis.yml
29
.travis.yml
@@ -1,29 +0,0 @@
|
||||
sudo: false
|
||||
language: python
|
||||
python:
|
||||
- "3.5"
|
||||
- "3.6"
|
||||
- "3.7"
|
||||
- "3.8"
|
||||
|
||||
cache:
|
||||
directories:
|
||||
- $HOME/.cache/pip
|
||||
|
||||
before_install:
|
||||
- pip install -U pip wheel
|
||||
- pip install -U setuptools
|
||||
- pip install -U pylint coverage pycodestyle pydocstyle
|
||||
|
||||
install:
|
||||
- pip install -U -e .
|
||||
|
||||
script:
|
||||
- pycodestyle libagent
|
||||
- pylint --reports=no --rcfile .pylintrc libagent
|
||||
- pydocstyle libagent
|
||||
- coverage run --source libagent/ -m py.test -v
|
||||
|
||||
after_success:
|
||||
- coverage report
|
||||
|
||||
15
README.md
15
README.md
@@ -1,11 +1,11 @@
|
||||
# Hardware-based SSH/GPG agent
|
||||
# Hardware-based SSH/GPG/age agent
|
||||
|
||||
[](https://travis-ci.org/romanz/trezor-agent)
|
||||
[](https://github.com/romanz/trezor-agent/actions)
|
||||
[](https://gitter.im/romanz/trezor-agent)
|
||||
|
||||
This project allows you to use various hardware security devices to operate GPG and SSH. Instead of keeping your key on your computer and decrypting it with a passphrase when you want to use it, the key is generated and stored on the device and never reaches your computer. Read more about the design [here](doc/DESIGN.md).
|
||||
This project allows you to use various hardware security devices to operate GPG, SSH and age. Instead of keeping your key on your computer and decrypting it with a passphrase when you want to use it, the key is generated and stored on the device and never reaches your computer. Read more about the design [here](doc/DESIGN.md).
|
||||
|
||||
You can do things like sign your emails, git commits, and software packages, manage your passwords (with [pass](https://www.passwordstore.org/) and [gopass](https://www.justwatch.com/gopass/), among others), authenticate web tunnels and file transfers, and more.
|
||||
You can do things like sign your emails, git commits, and software packages, manage your passwords (with [pass](https://www.passwordstore.org/) and [passage](https://github.com/FiloSottile/passage), among others), authenticate web tunnels and file transfers, and more.
|
||||
|
||||
See the following blog posts about this tool:
|
||||
|
||||
@@ -14,7 +14,7 @@ See the following blog posts about this tool:
|
||||
- [TREZOR Firmware 1.4.0 — GPG decryption support](https://www.reddit.com/r/TREZOR/comments/50h8r9/new_trezor_firmware_fidou2f_and_initial_ethereum/d7420q7/)
|
||||
- [A Step by Step Guide to Securing your SSH Keys with the Ledger Nano S](https://thoughts.t37.net/a-step-by-step-guide-to-securing-your-ssh-keys-with-the-ledger-nano-s-92e58c64a005)
|
||||
|
||||
Currently [TREZOR One](https://trezor.io/), [TREZOR Model T](https://trezor.io/), [Keepkey](https://www.keepkey.com/), and [Ledger Nano S](https://www.ledgerwallet.com/products/ledger-nano-s) are supported.
|
||||
Currently [TREZOR One](https://trezor.io/), [TREZOR Model T](https://trezor.io/), [Keepkey](https://www.keepkey.com/), [Ledger Nano S](https://www.ledgerwallet.com/products/ledger-nano-s), and [OnlyKey](https://onlykey.io) are supported.
|
||||
|
||||
## Components
|
||||
|
||||
@@ -22,9 +22,11 @@ This repository contains source code for one library as well as
|
||||
agents to interact with several different hardware devices:
|
||||
|
||||
* [`libagent`](https://pypi.org/project/libagent/): shared library
|
||||
* [`trezor-agent`](https://pypi.org/project/trezor-agent/): Using Trezor as hardware-based SSH/PGP agent
|
||||
* [`trezor-agent`](https://pypi.org/project/trezor-agent/): Using Trezor as hardware-based SSH/PGP/age agent
|
||||
* [`ledger_agent`](https://pypi.org/project/ledger_agent/): Using Ledger as hardware-based SSH/PGP agent
|
||||
* [`jade_agent`](https://pypi.org/project/jade_agent/): Using Blockstream Jade as hardware-based SSH/PGP agent
|
||||
* [`keepkey_agent`](https://pypi.org/project/keepkey_agent/): Using KeepKey as hardware-based SSH/PGP agent
|
||||
* [`onlykey-agent`](https://pypi.org/project/onlykey-agent/): Using OnlyKey as hardware-based SSH/PGP agent
|
||||
|
||||
|
||||
The [/releases](/releases) page on Github contains the `libagent`
|
||||
@@ -38,4 +40,5 @@ releases.
|
||||
Note: If you're using Windows, see [trezor-ssh-agent](https://github.com/martin-lizner/trezor-ssh-agent) by Martin Lízner.
|
||||
|
||||
* **GPG** instructions and common use cases are [here](doc/README-GPG.md)
|
||||
* **age** instructions and common use cases are [here](doc/README-age.md)
|
||||
* Instructions to configure a Trezor-style **PIN entry** program are [here](doc/README-PINENTRY.md)
|
||||
|
||||
7
agents/jade/jade_agent.py
Normal file
7
agents/jade/jade_agent.py
Normal file
@@ -0,0 +1,7 @@
|
||||
import libagent.gpg
|
||||
import libagent.ssh
|
||||
from libagent.device.jade import BlockstreamJade as DeviceType
|
||||
|
||||
ssh_agent = lambda: libagent.ssh.main(DeviceType)
|
||||
gpg_tool = lambda: libagent.gpg.main(DeviceType)
|
||||
gpg_agent = lambda: libagent.gpg.run_agent(DeviceType)
|
||||
41
agents/jade/setup.py
Normal file
41
agents/jade/setup.py
Normal file
@@ -0,0 +1,41 @@
|
||||
#!/usr/bin/env python
|
||||
from setuptools import setup
|
||||
|
||||
setup(
|
||||
name='jade_agent',
|
||||
version='0.1.0',
|
||||
description='Using Blockstream Jade as hardware SSH agent',
|
||||
author='Jamie C. Driver',
|
||||
author_email='jamie@blockstream.com',
|
||||
url='http://github.com/romanz/trezor-agent',
|
||||
scripts=['jade_agent.py'],
|
||||
install_requires=[
|
||||
'libagent>=0.14.5',
|
||||
# Jade py api from github source, v0.1.37
|
||||
'jadepy[requests] @ git+https://github.com/Blockstream/Jade.git@0.1.37'
|
||||
],
|
||||
platforms=['POSIX'],
|
||||
classifiers=[
|
||||
'Environment :: Console',
|
||||
'Development Status :: 4 - Beta',
|
||||
'Intended Audience :: Developers',
|
||||
'Intended Audience :: Information Technology',
|
||||
'Intended Audience :: System Administrators',
|
||||
'License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)',
|
||||
'Operating System :: POSIX',
|
||||
'Programming Language :: Python :: 2.7',
|
||||
'Programming Language :: Python :: 3.4',
|
||||
'Programming Language :: Python :: 3.5',
|
||||
'Programming Language :: Python :: 3.6',
|
||||
'Topic :: Software Development :: Libraries :: Python Modules',
|
||||
'Topic :: System :: Networking',
|
||||
'Topic :: Communications',
|
||||
'Topic :: Security',
|
||||
'Topic :: Utilities',
|
||||
],
|
||||
entry_points={'console_scripts': [
|
||||
'jade-agent = jade_agent:ssh_agent',
|
||||
'jade-gpg = jade_agent:gpg_tool',
|
||||
'jade-gpg-agent = jade_agent:gpg_agent',
|
||||
]},
|
||||
)
|
||||
7
agents/onlykey/onlykey_agent.py
Normal file
7
agents/onlykey/onlykey_agent.py
Normal file
@@ -0,0 +1,7 @@
|
||||
import libagent.gpg
|
||||
import libagent.ssh
|
||||
from libagent.device.onlykey import OnlyKey as DeviceType
|
||||
|
||||
ssh_agent = lambda: libagent.ssh.main(DeviceType)
|
||||
gpg_tool = lambda: libagent.gpg.main(DeviceType)
|
||||
gpg_agent = lambda: libagent.gpg.run_agent(DeviceType)
|
||||
40
agents/onlykey/setup.py
Normal file
40
agents/onlykey/setup.py
Normal file
@@ -0,0 +1,40 @@
|
||||
#!/usr/bin/env python
|
||||
from setuptools import setup
|
||||
|
||||
setup(
|
||||
name='onlykey-agent',
|
||||
version='1.2.0',
|
||||
description='Using onlykey as hardware SSH/GPG agent',
|
||||
author='CryptoTrust',
|
||||
author_email='t@crp.to',
|
||||
url='http://github.com/trustcrypto/onlykey-agent',
|
||||
scripts=['onlykey_agent.py'],
|
||||
install_requires=[
|
||||
'libagent>=0.14.2',
|
||||
'onlykey>=1.2.0'
|
||||
],
|
||||
platforms=['POSIX'],
|
||||
classifiers=[
|
||||
'Environment :: Console',
|
||||
'Development Status :: 4 - Beta',
|
||||
'Intended Audience :: Developers',
|
||||
'Intended Audience :: Information Technology',
|
||||
'Intended Audience :: System Administrators',
|
||||
'License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)',
|
||||
'Operating System :: POSIX',
|
||||
'Programming Language :: Python :: 3.5',
|
||||
'Programming Language :: Python :: 3.6',
|
||||
'Programming Language :: Python :: 3.7',
|
||||
'Programming Language :: Python :: 3.8',
|
||||
'Topic :: Software Development :: Libraries :: Python Modules',
|
||||
'Topic :: System :: Networking',
|
||||
'Topic :: Communications',
|
||||
'Topic :: Security',
|
||||
'Topic :: Utilities',
|
||||
],
|
||||
entry_points={'console_scripts': [
|
||||
'onlykey-agent = onlykey_agent:ssh_agent',
|
||||
'onlykey-gpg = onlykey_agent:gpg_tool',
|
||||
'onlykey-gpg-agent = onlykey_agent:gpg_agent',
|
||||
]},
|
||||
)
|
||||
@@ -3,15 +3,15 @@ from setuptools import setup
|
||||
|
||||
setup(
|
||||
name='trezor_agent',
|
||||
version='0.10.0',
|
||||
version='0.12.0',
|
||||
description='Using Trezor as hardware SSH/GPG agent',
|
||||
author='Roman Zeyde',
|
||||
author_email='roman.zeyde@gmail.com',
|
||||
url='http://github.com/romanz/trezor-agent',
|
||||
scripts=['trezor_agent.py'],
|
||||
install_requires=[
|
||||
'libagent>=0.13.0',
|
||||
'trezor[hidapi]>=0.12.0,<0.13'
|
||||
'libagent>=0.14.0',
|
||||
'trezor[hidapi]>=0.13'
|
||||
],
|
||||
platforms=['POSIX'],
|
||||
classifiers=[
|
||||
@@ -24,6 +24,8 @@ setup(
|
||||
'Operating System :: POSIX',
|
||||
'Programming Language :: Python :: 3.5',
|
||||
'Programming Language :: Python :: 3.6',
|
||||
'Programming Language :: Python :: 3.7',
|
||||
'Programming Language :: Python :: 3.8',
|
||||
'Topic :: Software Development :: Libraries :: Python Modules',
|
||||
'Topic :: System :: Networking',
|
||||
'Topic :: Communications',
|
||||
@@ -34,5 +36,7 @@ setup(
|
||||
'trezor-agent = trezor_agent:ssh_agent',
|
||||
'trezor-gpg = trezor_agent:gpg_tool',
|
||||
'trezor-gpg-agent = trezor_agent:gpg_agent',
|
||||
'trezor-signify = trezor_agent:signify_tool',
|
||||
'age-plugin-trezor = trezor_agent:age_tool', # see https://github.com/str4d/rage/blob/main/age-plugin/README.md
|
||||
]},
|
||||
)
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
import libagent.gpg
|
||||
import libagent.ssh
|
||||
from libagent import age, signify, gpg, ssh
|
||||
from libagent.device.trezor import Trezor as DeviceType
|
||||
|
||||
ssh_agent = lambda: libagent.ssh.main(DeviceType)
|
||||
gpg_tool = lambda: libagent.gpg.main(DeviceType)
|
||||
gpg_agent = lambda: libagent.gpg.run_agent(DeviceType)
|
||||
age_tool = lambda: age.main(DeviceType)
|
||||
ssh_agent = lambda: ssh.main(DeviceType)
|
||||
gpg_tool = lambda: gpg.main(DeviceType)
|
||||
gpg_agent = lambda: gpg.run_agent(DeviceType)
|
||||
signify_tool = lambda: signify.main(DeviceType)
|
||||
|
||||
@@ -6,7 +6,7 @@ SSH and GPG do this by means of a simple interprocess communication protocol (us
|
||||
|
||||
These two agents make the connection between the front end (e.g. a `gpg --sign` command, or an `ssh user@fqdn`). And then they wait for a request from the 'front end', and then do the actual asking for a password and subsequent using the private key to sign or decrypt something.
|
||||
|
||||
The various hardware wallets (Trezor, KeepKey and Ledger) each have the ability (as of Firmware 1.3.4) to use the NIST P-256 elliptic curve to sign, encrypt or decrypt. This curve can be used with S/MIME, GPG and SSH.
|
||||
The various hardware wallets (Trezor, KeepKey, Ledger and Jade) each have the ability (as of Firmware 1.3.4) to use the NIST P-256 elliptic curve to sign, encrypt or decrypt. This curve can be used with S/MIME, GPG and SSH.
|
||||
|
||||
So when you `ssh` to a machine - rather than consult the normal ssh-agent (which in turn will use your private SSH key in files such as `~/.ssh/id_rsa`) -- the trezor-agent will aks your hardware wallet to use its private key to sign the challenge.
|
||||
|
||||
@@ -30,7 +30,7 @@ So taking a commmand such as:
|
||||
|
||||
The `trezor-agent` will take the `user`@`fqdn.com`; canonicalise it (e.g. to add the ssh default port number if none was specified) and then apply some simple hashing (See [SLIP-0013 : Authentication using deterministic hierarchy][2]). The resulting 128bit hash is then used to construct a lead 'HD node' that contains an extened public private *child* key.
|
||||
|
||||
This way they keypair is specific to the server/hostname/port and protocol combination used. And it is this private key that is used to sign the nonce passed by the SSH server (as opposed to the master key).
|
||||
This way the keypair is specific to the server/hostname/port and protocol combination used. And it is this private key that is used to sign the nonce passed by the SSH server (as opposed to the master key).
|
||||
|
||||
The `trezor-agent` then instructs SSH to connect to the server. It will then engage in the normal challenge response process, ask the hardware wallet to blindly sign any nonce flashed by the server with the derived child private key and return this to the server. It then hands over to normal SSH for the rest of the logged in session.
|
||||
|
||||
|
||||
@@ -61,7 +61,7 @@ gpg (GnuPG) 2.1.15
|
||||
|
||||
* [TREZOR firmware releases](https://wallet.trezor.io/data/firmware/releases.json): `1.4.2+`
|
||||
|
||||
2. Make sure that your `udev` rules are configured [correctly](https://doc.satoshilabs.com/trezor-user/settingupchromeonlinux.html#manual-configuration-of-udev-rules).
|
||||
2. Make sure that your `udev` rules are configured [correctly](https://wiki.trezor.io/Udev_rules).
|
||||
|
||||
3. Then, install the latest [trezor_agent](https://pypi.python.org/pypi/trezor_agent) package:
|
||||
|
||||
@@ -131,7 +131,52 @@ Then, install the latest [keepkey_agent](https://pypi.python.org/pypi/keepkey_ag
|
||||
$ pip3 install --user -e trezor-agent/agents/ledger
|
||||
```
|
||||
|
||||
# 5. Installation Troubleshooting
|
||||
# 5. Install the OnlyKey agent
|
||||
|
||||
1. Make sure you are running the latest firmware version on your OnlyKey:
|
||||
|
||||
* [OnlyKey Firmware Upgrade Guide](https://docs.crp.to/upgradeguide.html)
|
||||
|
||||
2. Make sure that your `udev` rules are configured [correctly](https://docs.crp.to/linux.html#udev-rule).
|
||||
3. Then, install the latest [onlykey-agent](https://pypi.python.org/pypi/onlykey-agent) package:
|
||||
|
||||
```
|
||||
$ pip3 install onlykey-agent
|
||||
```
|
||||
|
||||
Or, directly from the latest source code:
|
||||
|
||||
```
|
||||
$ git clone https://github.com/romanz/trezor-agent
|
||||
$ pip3 install --user -e trezor-agent
|
||||
$ pip3 install --user -e trezor-agent/agents/onlykey
|
||||
```
|
||||
|
||||
# 6. Install the Blockstream Jade agent
|
||||
|
||||
1. Make sure you are running the latest firmware version on your Blockstream Jade:
|
||||
|
||||
* [Jade firmware releases](https://github.com/Blockstream/Jade/blob/master/CHANGELOG.md): `0.1.33+`
|
||||
|
||||
2. Make sure that your `udev` rules are configured [correctly](https://github.com/bitcoin-core/HWI/blob/master/hwilib/udev/55-usb-jade.rules).
|
||||
|
||||
3. If necessary, ensure the user is added to the [`dialout` group](https://help.blockstream.com/hc/en-us/articles/900005443223-My-Blockstream-Jade-is-not-recognized-by-my-computer)
|
||||
|
||||
4. Then, install the latest [jade-agent](https://pypi.python.org/pypi/jade-agent) package:
|
||||
|
||||
```
|
||||
$ pip3 install jade-agent
|
||||
```
|
||||
|
||||
Or, directly from the latest source code:
|
||||
|
||||
```
|
||||
$ git clone https://github.com/romanz/trezor-agent
|
||||
$ pip3 install --user -e trezor-agent
|
||||
$ pip3 install --user -e trezor-agent/agents/jade
|
||||
```
|
||||
|
||||
# 7. Installation Troubleshooting
|
||||
|
||||
If there is an import problem with the installed `protobuf` package,
|
||||
see [this issue](https://github.com/romanz/trezor-agent/issues/28) for fixing it.
|
||||
|
||||
@@ -5,7 +5,7 @@ and please let me [know](https://github.com/romanz/trezor-agent/issues/new) if s
|
||||
work well for you. If possible:
|
||||
|
||||
* record the session (e.g. using [asciinema](https://asciinema.org))
|
||||
* attach the GPG agent log from `~/.gnupg/{trezor,ledger}/gpg-agent.log` (can be [encrypted](https://keybase.io/romanz))
|
||||
* attach the GPG agent log from `~/.gnupg/{trezor,ledger,jade}/gpg-agent.log` (can be [encrypted](https://keybase.io/romanz))
|
||||
|
||||
Thanks!
|
||||
|
||||
@@ -18,14 +18,14 @@ Thanks!
|
||||
Run
|
||||
|
||||
```
|
||||
$ (trezor|keepkey|ledger)-gpg init "Roman Zeyde <roman.zeyde@gmail.com>"
|
||||
$ (trezor|keepkey|ledger|jade|onlykey)-gpg init "Roman Zeyde <roman.zeyde@gmail.com>"
|
||||
```
|
||||
|
||||
Follow the instructions provided to complete the setup. Keep note of the timestamp value which you'll need if you want to regenerate the key later.
|
||||
|
||||
If you'd like a Trezor-style PIN entry program, follow [these instructions](README-PINENTRY.md).
|
||||
|
||||
2. Add `export GNUPGHOME=~/.gnupg/(trezor|keepkey|ledger)` to your `.bashrc` or other environment file.
|
||||
2. Add `export GNUPGHOME=~/.gnupg/(trezor|keepkey|ledger|jade|onlykey)` to your `.bashrc` or other environment file.
|
||||
|
||||
This `GNUPGHOME` contains your hardware keyring and agent settings. This agent software assumes all keys are backed by hardware devices so you can't use standard GPG keys in `GNUPGHOME` (if you do mix keys you'll receive an error when you attempt to use them).
|
||||
|
||||
@@ -87,7 +87,7 @@ when committing to git.
|
||||
|
||||
### Manage passwords
|
||||
|
||||
Password managers such as [pass](https://www.passwordstore.org/) and [gopass](https://www.justwatch.com/gopass/) rely on GPG for encryption so you can use your device with them too.
|
||||
Password managers such as [pass](https://www.passwordstore.org/) rely on GPG for encryption so you can use your device with them too.
|
||||
|
||||
##### With `pass`:
|
||||
|
||||
@@ -203,7 +203,7 @@ Follow [these instructions](enigmail.md) to set up Enigmail in Thunderbird.
|
||||
|
||||
##### 1. Create these files in `~/.config/systemd/user`
|
||||
|
||||
Replace `trezor` with `keepkey` or `ledger` as required.
|
||||
Replace `trezor` with `keepkey` or `ledger` or `jade` or `onlykey` as required.
|
||||
|
||||
###### `trezor-gpg-agent.service`
|
||||
|
||||
@@ -213,7 +213,7 @@ Description=trezor-gpg-agent
|
||||
Requires=trezor-gpg-agent.socket
|
||||
|
||||
[Service]
|
||||
Type=Simple
|
||||
Type=simple
|
||||
Environment="GNUPGHOME=%h/.gnupg/trezor"
|
||||
Environment="PATH=/bin:/usr/bin:/usr/local/bin:%h/.local/bin"
|
||||
ExecStart=/usr/bin/trezor-gpg-agent -vv
|
||||
|
||||
@@ -1,16 +1,16 @@
|
||||
# SSH Agent
|
||||
|
||||
## 1. Configuration
|
||||
## Configuration
|
||||
|
||||
SSH requires no configuration, but you may put common command line options in `~/.ssh/agent.conf` to avoid repeating them in every invocation.
|
||||
|
||||
See `(trezor|keepkey|ledger)-agent -h` for details on supported options and the configuration file format.
|
||||
See `(trezor|keepkey|ledger|jade|onlykey)-agent -h` for details on supported options and the configuration file format.
|
||||
|
||||
If you'd like a Trezor-style PIN entry program, follow [these instructions](README-PINENTRY.md).
|
||||
|
||||
## 2. Usage
|
||||
## Usage
|
||||
|
||||
Use the `(trezor|keepkey|ledger)-agent` program to work with SSH. It has three main modes of operation:
|
||||
Use the `(trezor|keepkey|ledger|jade|onlykey)-agent` program to work with SSH. It has three main modes of operation:
|
||||
|
||||
##### 1. Export public keys
|
||||
|
||||
@@ -18,7 +18,7 @@ To get your public key so you can add it to `authorized_hosts` or allow
|
||||
ssh access to a service that supports it, run:
|
||||
|
||||
```
|
||||
(trezor|keepkey|ledger)-agent identity@myhost
|
||||
(trezor|keepkey|ledger|jade|onlykey)-agent identity@myhost
|
||||
```
|
||||
|
||||
The identity (ex: `identity@myhost`) is used to derive the public key and is added as a comment to the exported key string.
|
||||
@@ -28,7 +28,7 @@ The identity (ex: `identity@myhost`) is used to derive the public key and is add
|
||||
Run
|
||||
|
||||
```
|
||||
$ (trezor|keepkey|ledger)-agent identity@myhost -- COMMAND --WITH --ARGUMENTS
|
||||
$ (trezor|keepkey|ledger|jade|onlykey)-agent identity@myhost -- COMMAND --WITH --ARGUMENTS
|
||||
```
|
||||
|
||||
to start the agent in the background and execute the command with environment variables set up to use the SSH agent. The specified identity is used for all SSH connections. The agent will exit after the command completes.
|
||||
@@ -36,28 +36,28 @@ Note the `--` separator, which is used to separate `trezor-agent`'s arguments fr
|
||||
|
||||
Example:
|
||||
```
|
||||
(trezor|keepkey|ledger)-agent -e ed25519 bob@example.com -- rsync up/ bob@example.com:/home/bob
|
||||
(trezor|keepkey|ledger|jade|onlykey)-agent -e ed25519 bob@example.com -- rsync up/ bob@example.com:/home/bob
|
||||
```
|
||||
|
||||
As a shortcut you can run
|
||||
|
||||
```
|
||||
$ (trezor|keepkey|ledger)-agent identity@myhost -s
|
||||
$ (trezor|keepkey|ledger|jade|onlykey)-agent identity@myhost -s
|
||||
```
|
||||
|
||||
to start a shell with the proper environment.
|
||||
|
||||
##### 3. Connect to a server directly via `(trezor|keepkey|ledger)-agent`
|
||||
##### 3. Connect to a server directly via `(trezor|keepkey|ledger|jade|onlykey)-agent`
|
||||
|
||||
If you just want to connect to a server this is the simplest way to do it:
|
||||
|
||||
```
|
||||
$ (trezor|keepkey|ledger)-agent user@remotehost -c
|
||||
$ (trezor|keepkey|ledger|jade|onlykey)-agent user@remotehost -c
|
||||
```
|
||||
|
||||
The identity `user@remotehost` is used as both the destination user and host as well as for key derivation, so you must generate a separate key for each host you connect to.
|
||||
|
||||
## 3. Common Use Cases
|
||||
## Common Use Cases
|
||||
|
||||
### Start a single SSH session
|
||||
[](https://asciinema.org/a/22959)
|
||||
@@ -114,11 +114,47 @@ The same works for Mercurial (e.g. on [BitBucket](https://confluence.atlassian.c
|
||||
$ ssh-shell
|
||||
$ hg push
|
||||
|
||||
### Git commit signing
|
||||
|
||||
For more details, see the following great blog post: https://calebhearth.com/sign-git-with-ssh
|
||||
|
||||
$ trezor-agent -e ed25519 user@host --shell
|
||||
$ ssh-add -L
|
||||
ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIDeAmtnhHlyg4dzGP3/OF4WHX7NoYhClS98EK22q/O5+ <ssh://user@host|ed25519>
|
||||
$ git config --local user.signingkey "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIDeAmtnhHlyg4dzGP3/OF4WHX7NoYhClS98EK22q/O5+"
|
||||
$ git config --local gpg.format ssh
|
||||
$ git config --local commit.gpgsign true
|
||||
|
||||
$ git config --local gpg.ssh.allowedSignersFile $PWD/.git/allowed-signers
|
||||
$ echo "user@host ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIDeAmtnhHlyg4dzGP3/OF4WHX7NoYhClS98EK22q/O5+" >> $PWD/.git/allowed-signers
|
||||
|
||||
$ git commit --allow-empty --message="Testing SSH signing"
|
||||
[master 4a1f730] Testing SSH signing
|
||||
|
||||
$ git log --show-signature -1
|
||||
commit 4a1f730d7f70fd31a0bda334734d0ac4dc9d97ad (HEAD -> master)
|
||||
Good "git" signature for user@host with ED25519 key SHA256:aESFjLsydJHQg1vnAkq42jQDkCcn4Tde4J+v+0XFmwM
|
||||
Author: Roman Zeyde <me@romanzey.de>
|
||||
Date: Fri Oct 21 18:34:09 2022 +0300
|
||||
|
||||
Testing SSH signing
|
||||
|
||||
$ cat .git/config
|
||||
[user]
|
||||
signingkey = ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIDeAmtnhHlyg4dzGP3/OF4WHX7NoYhClS98EK22q/O5+
|
||||
[gpg]
|
||||
format = ssh
|
||||
[commit]
|
||||
gpgsign = true
|
||||
[gpg "ssh"]
|
||||
allowedSignersFile = /home/user/Code/test-git-ssh-sig/.git/allowed-signers
|
||||
|
||||
|
||||
### Start the agent as a systemd unit
|
||||
|
||||
##### 1. Create these files in `~/.config/systemd/user`
|
||||
|
||||
Replace `trezor` with `keepkey` or `ledger` as required.
|
||||
Replace `trezor` with `keepkey` or `ledger` or `jade` or `onlykey` as required.
|
||||
|
||||
###### `trezor-ssh-agent.service`
|
||||
|
||||
@@ -139,9 +175,9 @@ If you've installed `trezor-agent` locally you may have to change the path in `E
|
||||
|
||||
Replace `IDENTITY` with the identity you used when exporting the public key.
|
||||
|
||||
`IDENTITY` can be a path (starting with `/`) to a file containing a list of public keys
|
||||
generated by Trezor. I.e. `/home/myUser/.ssh/trezor.conf` with one public key per line.
|
||||
This is a more convenient way to have a systemd setup that has to handle multiple
|
||||
`IDENTITY` can be a path (starting with `/`) to a file containing a list of public keys
|
||||
generated by Trezor. I.e. `/home/myUser/.ssh/trezor.conf` with one public key per line.
|
||||
This is a more convenient way to have a systemd setup that has to handle multiple
|
||||
keys/hosts.
|
||||
|
||||
When updating the file, make sure to restart trezor-agent.
|
||||
@@ -185,17 +221,55 @@ export SSH_AUTH_SOCK=$(systemctl show --user --property=Listen trezor-ssh-agent.
|
||||
```
|
||||
|
||||
Make sure the SSH_AUTH_SOCK variable matches the location of the socket that trezor-agent
|
||||
is listening on: `ps -x | grep trezor-agent`. In this setup trezor-agent should start
|
||||
is listening on: `ps -x | grep trezor-agent`. In this setup trezor-agent should start
|
||||
automatically when the socket is opened.
|
||||
|
||||
##### 4. SSH will now automatically use your device key in all terminals.
|
||||
|
||||
## 4. Troubleshooting
|
||||
## SSH Signatures
|
||||
|
||||
SSH and ssh-keygen can make and verify signatures, see https://www.agwa.name/blog/post/ssh_signatures.
|
||||
|
||||
See here for more ssh protocol details:
|
||||
|
||||
- https://github.com/openssh/openssh-portable/blob/master/PROTOCOL.sshsig
|
||||
- https://github.com/openssh/openssh-portable/blob/master/sshsig.c
|
||||
- https://github.com/openssh/openssh-portable/commit/2a9c9f7272c1e8665155118fe6536bebdafb6166
|
||||
|
||||
|
||||
##### generate SSH public key
|
||||
```
|
||||
$ trezor-agent -e ed25519 git@github.com | tee ~/.ssh/trezor-github.pub
|
||||
ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIIvcbhXyaXXNytCLTDfEMlLuwEhtfo0XmPP1U5RsnOZ4 <ssh://git@github.com|ed25519>
|
||||
```
|
||||
##### sign the given file using TREZOR
|
||||
```
|
||||
$ trezor-agent -e ed25519 git@github.com -- ssh-keygen -Y sign -f ~/.ssh/trezor-github.pub -n file README.md
|
||||
Signing file README.md
|
||||
Write signature to README.md.sig
|
||||
```
|
||||
##### set allowed identities for verification (using the above public key)
|
||||
```
|
||||
$ cat allowed
|
||||
git@github.com ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIIvcbhXyaXXNytCLTDfEMlLuwEhtfo0XmPP1U5RsnOZ4 <ssh://git@github.com|ed25519>
|
||||
```
|
||||
##### verify the above signature
|
||||
```
|
||||
$ ssh-keygen -Y verify -f allowed -I git@github.com -n file -s README.md.sig -vvv < README.md
|
||||
debug1: sshsig_verify_fd: signature made with hash "sha512"
|
||||
debug1: sshsig_wrap_verify: verify message length 64
|
||||
debug1: Valid (unverified) signature from key SHA256:6UBhPb5SOoCUfasGC1/aCBegYov0/P3ajd6eNbYg77A
|
||||
debug1: parse_principals_key_and_options: allowed:1: matched principal "git@github.com"
|
||||
debug1: allowed:1: matched key and principal
|
||||
Good "file" signature for git@github.com with ED25519 key SHA256:6UBhPb5SOoCUfasGC1/aCBegYov0/P3ajd6eNbYg77A
|
||||
```
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
If SSH connection fails to work, please open an [issue](https://github.com/romanz/trezor-agent/issues)
|
||||
with a verbose log attached (by running `trezor-agent -vv`) .
|
||||
|
||||
##### `IdentitiesOnly` SSH option
|
||||
#### `IdentitiesOnly` SSH option
|
||||
|
||||
Note that your local SSH configuration may ignore `trezor-agent`, if it has `IdentitiesOnly` option set to `yes`.
|
||||
|
||||
|
||||
77
doc/README-age.md
Normal file
77
doc/README-age.md
Normal file
@@ -0,0 +1,77 @@
|
||||
# `age` Agent
|
||||
|
||||
Note: the age-related code is still under development, so please try the current implementation
|
||||
and please let me [know](https://github.com/romanz/trezor-agent/issues/new) if something doesn't
|
||||
work well for you. If possible:
|
||||
|
||||
* record the session (e.g. using [asciinema](https://asciinema.org))
|
||||
* collect the agent log by setting `TREZOR_AGE_PLUGIN_LOG` environment variable
|
||||
|
||||
Thanks!
|
||||
|
||||
## 1. Configuration
|
||||
|
||||
Install [age 1.1.0+](https://github.com/FiloSottile/age/releases/tag/v1.1.0) or [rage 0.9.0+](https://github.com/str4d/rage/releases/tag/v0.9.0).
|
||||
|
||||
Generate an identity by running:
|
||||
|
||||
```
|
||||
$ age-plugin-trezor -i "John Doe" | tee identity
|
||||
# recipient: age1wpl78afms4x36mucnd4j65sanrtj9873up47qq39h68q0aw7n4xqdlw6tk
|
||||
# SLIP-0017: John Doe
|
||||
AGE-PLUGIN-TREZOR-1FFHKSM3QG3HK2PEPD5D
|
||||
```
|
||||
|
||||
## 2. Usage
|
||||
|
||||
### Encrypt
|
||||
|
||||
Use the recipient ID from above (see [age](https://github.com/FiloSottile/age#usage)/[rage](https://github.com/str4d/rage#usage) instructions):
|
||||
```
|
||||
$ date | age -r age1wpl78afms4x36mucnd4j65sanrtj9873up47qq39h68q0aw7n4xqdlw6tk > secret.age
|
||||
```
|
||||
|
||||
### Decrypt
|
||||
|
||||
Make sure `age-plugin-trezor` is installed and available (it will be invoked by `age` for decryption):
|
||||
|
||||
```
|
||||
$ age -d -i identity < secret.age
|
||||
Mon 26 Dec 2022 21:10:26 IST
|
||||
```
|
||||
|
||||
### Manage passwords with `passage`
|
||||
|
||||
First install `passage` from https://github.com/FiloSottile/passage and initialize it to use your hardware-based identity:
|
||||
```
|
||||
$ mkdir -p ~/.passage/store
|
||||
|
||||
$ age-plugin-trezor -i "John Doe" | tee ~/.passage/identities
|
||||
# recipient: age1wpl78afms4x36mucnd4j65sanrtj9873up47qq39h68q0aw7n4xqdlw6tk
|
||||
# SLIP-0017: John Doe
|
||||
AGE-PLUGIN-TREZOR-1FFHKSM3QG3HK2PEPD5D
|
||||
|
||||
$ awk '/# recipient:/ {print $3}' ~/.passage/identities | tee -a ~/.passage/store/.age-recipients
|
||||
age1wpl78afms4x36mucnd4j65sanrtj9873up47qq39h68q0aw7n4xqdlw6tk
|
||||
```
|
||||
|
||||
```
|
||||
$ passage generate Dev/github 32
|
||||
$ passage generate Social/hackernews 32
|
||||
$ passage generate Social/twitter 32
|
||||
$ passage generate VPS/linode 32
|
||||
$ passage
|
||||
Passage
|
||||
├── Dev
|
||||
│ └── github
|
||||
├── Social
|
||||
│ ├── hackernews
|
||||
│ └── twitter
|
||||
└── VPS
|
||||
└── linode
|
||||
```
|
||||
In order to paste them into the browser, you'd need to decrypt the password using your hardware device:
|
||||
```
|
||||
$ passage --clip VPS/linode
|
||||
Copied VPS/linode to clipboard. Will clear in 45 seconds.
|
||||
```
|
||||
184
libagent/age/__init__.py
Normal file
184
libagent/age/__init__.py
Normal file
@@ -0,0 +1,184 @@
|
||||
"""
|
||||
TREZOR support for AGE format.
|
||||
|
||||
See these links for more details:
|
||||
- https://age-encryption.org/v1
|
||||
- https://github.com/FiloSottile/age
|
||||
- https://github.com/str4d/rage/
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import base64
|
||||
import contextlib
|
||||
import datetime
|
||||
import io
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import traceback
|
||||
|
||||
import bech32
|
||||
import pkg_resources
|
||||
import semver
|
||||
from cryptography.exceptions import InvalidTag
|
||||
from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305
|
||||
|
||||
from .. import device, server, util
|
||||
from . import client
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def bech32_decode(prefix, encoded):
|
||||
"""Decode Bech32-encoded data."""
|
||||
hrp, data = bech32.bech32_decode(encoded)
|
||||
assert prefix == hrp
|
||||
return bytes(bech32.convertbits(data, 5, 8, pad=False))
|
||||
|
||||
|
||||
def bech32_encode(prefix, data):
|
||||
"""Encode data using Bech32."""
|
||||
return bech32.bech32_encode(prefix, bech32.convertbits(bytes(data), 8, 5))
|
||||
|
||||
|
||||
def run_pubkey(device_type, args):
|
||||
"""Initialize hardware-based GnuPG identity."""
|
||||
log.warning('This AGE tool is still in EXPERIMENTAL mode, '
|
||||
'so please note that the API and features may '
|
||||
'change without backwards compatibility!')
|
||||
|
||||
c = client.Client(device=device_type())
|
||||
pubkey = c.pubkey(identity=client.create_identity(args.identity), ecdh=True)
|
||||
recipient = bech32_encode(prefix="age", data=pubkey)
|
||||
print(f"# recipient: {recipient}")
|
||||
print(f"# SLIP-0017: {args.identity}")
|
||||
data = args.identity.encode()
|
||||
encoded = bech32_encode(prefix="age-plugin-trezor-", data=data).upper()
|
||||
decoded = bech32_decode(prefix="age-plugin-trezor-", encoded=encoded)
|
||||
assert decoded.startswith(data)
|
||||
print(encoded)
|
||||
|
||||
|
||||
def base64_decode(encoded: str) -> bytes:
|
||||
"""Decode Base64-encoded data (after padding correctly with '=')."""
|
||||
k = len(encoded) % 4
|
||||
pad = (4 - k) if k else 0
|
||||
return base64.b64decode(encoded + ("=" * pad))
|
||||
|
||||
|
||||
# https://github.com/FiloSottile/age/blob/v1.1.0-rc.1/internal/format/format.go#L45
|
||||
BYTES_PER_LINE = 48
|
||||
|
||||
|
||||
def base64_encode(data: bytes) -> str:
|
||||
"""Encode data using Base64 (and remove '=')."""
|
||||
reader = io.BytesIO(data)
|
||||
chunks = map(base64.b64encode, iter(lambda: reader.read(BYTES_PER_LINE), b""))
|
||||
chunks = (chunk.replace(b"=", b"") for chunk in chunks)
|
||||
return b"\n".join(chunks).decode()
|
||||
|
||||
|
||||
def decrypt(key, encrypted):
|
||||
"""Decrypt age-encrypted data."""
|
||||
cipher = ChaCha20Poly1305(key)
|
||||
try:
|
||||
return cipher.decrypt(
|
||||
nonce=(b"\x00" * 12),
|
||||
data=encrypted,
|
||||
associated_data=None)
|
||||
except InvalidTag:
|
||||
return None
|
||||
|
||||
|
||||
def run_decrypt(device_type, args):
|
||||
"""Unlock hardware device (for future interaction)."""
|
||||
# pylint: disable=too-many-locals
|
||||
c = client.Client(device=device_type())
|
||||
|
||||
lines = (line.strip() for line in sys.stdin) # strip whitespace
|
||||
lines = (line for line in lines if line) # skip empty lines
|
||||
|
||||
identities = []
|
||||
stanza_map = {}
|
||||
|
||||
for line in lines:
|
||||
log.debug("got %r", line)
|
||||
if line == "-> done":
|
||||
break
|
||||
|
||||
if line.startswith("-> add-identity "):
|
||||
encoded = line.split(" ")[-1].lower()
|
||||
data = bech32_decode("age-plugin-trezor-", encoded)
|
||||
identity = client.create_identity(data.decode())
|
||||
identities.append(identity)
|
||||
|
||||
elif line.startswith("-> recipient-stanza "):
|
||||
file_index, tag, *args = line.split(" ")[2:]
|
||||
body = next(lines)
|
||||
if tag != "X25519":
|
||||
continue
|
||||
|
||||
peer_pubkey = base64_decode(args[0])
|
||||
encrypted = base64_decode(body)
|
||||
stanza_map.setdefault(file_index, []).append((peer_pubkey, encrypted))
|
||||
|
||||
for file_index, stanzas in stanza_map.items():
|
||||
_handle_single_file(file_index, stanzas, identities, c)
|
||||
|
||||
sys.stdout.write('-> done\n\n')
|
||||
sys.stdout.flush()
|
||||
sys.stdout.close()
|
||||
|
||||
|
||||
def _handle_single_file(file_index, stanzas, identities, c):
|
||||
d = c.device.__class__.__name__
|
||||
for peer_pubkey, encrypted in stanzas:
|
||||
for identity in identities:
|
||||
id_str = identity.to_string()
|
||||
msg = base64_encode(f'Please confirm {id_str} decryption on {d} device...'.encode())
|
||||
sys.stdout.write(f'-> msg\n{msg}\n')
|
||||
sys.stdout.flush()
|
||||
|
||||
key = c.ecdh(identity=identity, peer_pubkey=peer_pubkey)
|
||||
result = decrypt(key=key, encrypted=encrypted)
|
||||
if not result:
|
||||
continue
|
||||
|
||||
sys.stdout.write(f'-> file-key {file_index}\n{base64_encode(result)}\n')
|
||||
sys.stdout.flush()
|
||||
return
|
||||
|
||||
|
||||
def main(device_type):
|
||||
"""Parse command-line arguments."""
|
||||
p = argparse.ArgumentParser()
|
||||
|
||||
agent_package = device_type.package_name()
|
||||
resources_map = {r.key: r for r in pkg_resources.require(agent_package)}
|
||||
resources = [resources_map[agent_package], resources_map['libagent']]
|
||||
versions = '\n'.join('{}={}'.format(r.key, r.version) for r in resources)
|
||||
p.add_argument('--version', help='print the version info',
|
||||
action='version', version=versions)
|
||||
|
||||
p.add_argument('-i', '--identity')
|
||||
p.add_argument('-v', '--verbose', default=0, action='count')
|
||||
p.add_argument('--age-plugin')
|
||||
|
||||
args = p.parse_args()
|
||||
|
||||
log_path = os.environ.get("TREZOR_AGE_PLUGIN_LOG")
|
||||
util.setup_logging(verbosity=args.verbose, filename=log_path)
|
||||
|
||||
log.debug("starting age plugin: %s", args)
|
||||
|
||||
device_type.ui = device.ui.UI(device_type=device_type, config=vars(args))
|
||||
|
||||
try:
|
||||
if args.identity:
|
||||
run_pubkey(device_type=device_type, args=args)
|
||||
elif args.age_plugin:
|
||||
run_decrypt(device_type=device_type, args=args)
|
||||
except Exception as e: # pylint: disable=broad-except
|
||||
log.exception("age plugin failed: %s", e)
|
||||
|
||||
log.debug("closing age plugin")
|
||||
48
libagent/age/client.py
Normal file
48
libagent/age/client.py
Normal file
@@ -0,0 +1,48 @@
|
||||
"""Device abstraction layer for AGE operations."""
|
||||
|
||||
import logging
|
||||
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
|
||||
|
||||
from ..device import interface
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def create_identity(user_id):
|
||||
"""Create AGE identity for hardware device."""
|
||||
result = interface.Identity(identity_str='age://', curve_name="ed25519")
|
||||
result.identity_dict['host'] = user_id
|
||||
return result
|
||||
|
||||
|
||||
class Client:
|
||||
"""Sign messages and get public keys from a hardware device."""
|
||||
|
||||
def __init__(self, device):
|
||||
"""C-tor."""
|
||||
self.device = device
|
||||
|
||||
def pubkey(self, identity, ecdh=False):
|
||||
"""Return public key as VerifyingKey object."""
|
||||
with self.device:
|
||||
pubkey = bytes(self.device.pubkey(ecdh=ecdh, identity=identity))
|
||||
assert len(pubkey) == 32
|
||||
return pubkey
|
||||
|
||||
def ecdh(self, identity, peer_pubkey):
|
||||
"""Derive shared secret using ECDH from peer public key."""
|
||||
log.info('please confirm AGE decryption on %s for "%s"...',
|
||||
self.device, identity.to_string())
|
||||
with self.device:
|
||||
assert len(peer_pubkey) == 32
|
||||
result, self_pubkey = self.device.ecdh_with_pubkey(
|
||||
pubkey=(b"\x40" + peer_pubkey), identity=identity)
|
||||
assert result[:1] == b"\x04"
|
||||
hkdf = HKDF(
|
||||
algorithm=hashes.SHA256(),
|
||||
length=32,
|
||||
salt=((peer_pubkey + self_pubkey)),
|
||||
info=b"age-encryption.org/v1/X25519")
|
||||
return hkdf.derive(result[1:])
|
||||
@@ -5,8 +5,8 @@ import logging
|
||||
|
||||
import ecdsa
|
||||
|
||||
from . import interface
|
||||
from .. import formats
|
||||
from . import interface
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
@@ -39,13 +39,17 @@ class FakeDevice(interface.Device):
|
||||
self.vk = self.sk.get_verifying_key()
|
||||
return self
|
||||
|
||||
def close(self):
|
||||
"""Close the device."""
|
||||
|
||||
def pubkey(self, identity, ecdh=False):
|
||||
"""Return public key."""
|
||||
_verify_support(identity)
|
||||
data = self.vk.to_string()
|
||||
x, y = data[:32], data[32:]
|
||||
prefix = bytearray([2 + (bytearray(y)[0] & 1)])
|
||||
return bytes(prefix) + x
|
||||
pubkey = bytes(prefix) + x
|
||||
return formats.decompress_pubkey(pubkey=pubkey, curve_name=identity.curve_name)
|
||||
|
||||
def sign(self, identity, blob):
|
||||
"""Sign given blob and return the signature (as bytes)."""
|
||||
|
||||
@@ -79,7 +79,7 @@ class Identity:
|
||||
|
||||
def to_string(self):
|
||||
"""Return identity serialized to string."""
|
||||
return u'<{}|{}>'.format(identity_to_string(self.identity_dict), self.curve_name)
|
||||
return '<{}|{}>'.format(identity_to_string(self.identity_dict), self.curve_name)
|
||||
|
||||
def get_bip32_address(self, ecdh=False):
|
||||
"""Compute BIP32 derivation address according to SLIP-0013/0017."""
|
||||
|
||||
143
libagent/device/jade.py
Normal file
143
libagent/device/jade.py
Normal file
@@ -0,0 +1,143 @@
|
||||
"""Jade-related code (see https://blockstream.com/jade/)."""
|
||||
|
||||
import logging
|
||||
|
||||
import ecdsa
|
||||
import semver
|
||||
|
||||
from .. import formats, util
|
||||
from . import interface
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _verify_support(identity, ecdh):
|
||||
"""Make sure the device supports given configuration."""
|
||||
if identity.get_curve_name(ecdh=ecdh) != formats.CURVE_NIST256:
|
||||
raise NotImplementedError(
|
||||
'Unsupported elliptic curve: {}'.format(identity.curve_name))
|
||||
|
||||
|
||||
class BlockstreamJade(interface.Device):
|
||||
"""Connection to Blockstream Jade device."""
|
||||
|
||||
MIN_SUPPORTED_FW_VERSION = semver.VersionInfo(0, 1, 33)
|
||||
DEVICE_IDS = [(0x10c4, 0xea60), (0x1a86, 0x55d4)]
|
||||
connection = None
|
||||
|
||||
@classmethod
|
||||
def package_name(cls):
|
||||
"""Python package name (at PyPI)."""
|
||||
return 'jade-agent'
|
||||
|
||||
def connect(self):
|
||||
"""Connect to the first matching device."""
|
||||
# pylint: disable=import-error
|
||||
from jadepy import JadeAPI
|
||||
from serial.tools import list_ports
|
||||
|
||||
# Return the existing connection if we have one
|
||||
if BlockstreamJade.connection is not None:
|
||||
return BlockstreamJade.connection
|
||||
|
||||
# Jade is a serial (over usb) device, it shows as a serial/com port device.
|
||||
# Scan com ports looking for the relevant vid and pid, and connect to the
|
||||
# first matching device. Then call 'auth_user' - this usually requires network
|
||||
# access in order to unlock the device with a PIN and the remote blind pinserver.
|
||||
for devinfo in list_ports.comports():
|
||||
device_product_key = (devinfo.vid, devinfo.pid)
|
||||
if device_product_key in self.DEVICE_IDS:
|
||||
try:
|
||||
jade = JadeAPI.create_serial(devinfo.device)
|
||||
|
||||
# Monkey-patch a no-op 'close()' method to suppress logged errors
|
||||
jade.close = lambda: log.debug("Close called")
|
||||
|
||||
# Connect and fetch version info
|
||||
jade.connect()
|
||||
verinfo = jade.get_version_info()
|
||||
|
||||
# Check minimum supported firmware version (ignore candidate/build parts)
|
||||
fwversion = semver.VersionInfo.parse(verinfo['JADE_VERSION'])
|
||||
if self.MIN_SUPPORTED_FW_VERSION > fwversion.finalize_version():
|
||||
msg = ('Outdated {} firmware for device. Please update using'
|
||||
' a Blockstream Green companion app')
|
||||
raise ValueError(msg.format(fwversion))
|
||||
|
||||
# Authenticate the user (unlock with pin)
|
||||
# NOTE: usually requires network access unless already unlocked
|
||||
# (or temporary 'Emergency Restore' wallet is already in use).
|
||||
network = 'testnet' if verinfo.get('JADE_NETWORKS') == 'TEST' else 'mainnet'
|
||||
while not jade.auth_user(network):
|
||||
log.warning("PIN incorrect, please try again")
|
||||
|
||||
# Cache the connection to jade
|
||||
BlockstreamJade.connection = jade
|
||||
return jade
|
||||
except Exception as e:
|
||||
raise interface.NotFoundError(
|
||||
'{} not connected: "{}"'.format(self, e))
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _get_identity_string(identity):
|
||||
return interface.identity_to_string(identity.identity_dict)
|
||||
|
||||
@staticmethod
|
||||
def _load_uncompressed_pubkey(pubkey, curve_name):
|
||||
assert curve_name == formats.CURVE_NIST256
|
||||
assert len(pubkey) == 65 and pubkey[0] == 0x04
|
||||
curve = ecdsa.NIST256p
|
||||
point = ecdsa.ellipticcurve.Point(curve.curve,
|
||||
util.bytes2num(pubkey[1:33]),
|
||||
util.bytes2num(pubkey[33:65]))
|
||||
return ecdsa.VerifyingKey.from_public_point(point, curve=curve,
|
||||
hashfunc=formats.hashfunc)
|
||||
|
||||
def pubkey(self, identity, ecdh=False):
|
||||
"""Get PublicKey object for specified BIP32 address and elliptic curve."""
|
||||
_verify_support(identity, ecdh)
|
||||
identity_string = self._get_identity_string(identity)
|
||||
curve_name = identity.get_curve_name(ecdh=ecdh)
|
||||
key_type = 'slip-0017' if ecdh else 'slip-0013'
|
||||
|
||||
log.debug('"%s" getting %s public key (%s) from %s',
|
||||
identity_string, key_type, curve_name, self)
|
||||
result = self.conn.get_identity_pubkey(identity_string, curve_name, key_type)
|
||||
log.debug('result: %s', result)
|
||||
|
||||
assert len(result) == 33 or len(result) == 65
|
||||
convert_pubkey = (formats.decompress_pubkey
|
||||
if len(result) == 33 else
|
||||
self._load_uncompressed_pubkey)
|
||||
return convert_pubkey(pubkey=result, curve_name=curve_name)
|
||||
|
||||
def sign(self, identity, blob):
|
||||
"""Sign given blob and return the signature (as bytes)."""
|
||||
_verify_support(identity, ecdh=False)
|
||||
identity_string = self._get_identity_string(identity)
|
||||
curve_name = identity.get_curve_name(ecdh=False)
|
||||
|
||||
log.debug('"%s" signing %r (%s) on %s',
|
||||
identity_string, blob, curve_name, self)
|
||||
result = self.conn.sign_identity(identity_string, curve_name, blob)
|
||||
log.debug('result: %s', result)
|
||||
|
||||
signature = result['signature']
|
||||
assert len(signature) == 64 or (len(signature) == 65 and signature[0] == 0x00)
|
||||
if len(signature) == 65:
|
||||
signature = signature[1:]
|
||||
return signature
|
||||
|
||||
def ecdh(self, identity, pubkey):
|
||||
"""Get shared session key using Elliptic Curve Diffie-Hellman."""
|
||||
_verify_support(identity, ecdh=True)
|
||||
identity_string = self._get_identity_string(identity)
|
||||
curve_name = identity.get_curve_name(ecdh=True)
|
||||
|
||||
log.debug('"%s" shared session key (%s) for %r from %s',
|
||||
identity_string, curve_name, pubkey, self)
|
||||
result = self.conn.get_identity_shared_key(identity_string, curve_name, pubkey)
|
||||
log.debug('result: %s', result)
|
||||
|
||||
return result
|
||||
@@ -1,7 +1,7 @@
|
||||
"""KeepKey-related code (see https://www.keepkey.com/)."""
|
||||
|
||||
from . import trezor
|
||||
from .. import formats
|
||||
from . import trezor
|
||||
|
||||
|
||||
def _verify_support(identity, ecdh):
|
||||
|
||||
@@ -2,8 +2,9 @@
|
||||
|
||||
# pylint: disable=unused-import,import-error
|
||||
|
||||
from keepkeylib.client import CallException, PinException
|
||||
from keepkeylib.client import CallException
|
||||
from keepkeylib.client import KeepKeyClient as Client
|
||||
from keepkeylib.client import PinException
|
||||
from keepkeylib.messages_pb2 import PassphraseAck, PinMatrixAck
|
||||
from keepkeylib.transport_hid import HidTransport
|
||||
from keepkeylib.transport_webusb import WebUsbTransport
|
||||
|
||||
@@ -6,6 +6,7 @@ import struct
|
||||
|
||||
from ledgerblue import comm # pylint: disable=import-error
|
||||
|
||||
from .. import formats
|
||||
from . import interface
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@@ -36,6 +37,24 @@ def _convert_public_key(ecdsa_curve_name, result):
|
||||
class LedgerNanoS(interface.Device):
|
||||
"""Connection to Ledger Nano S device."""
|
||||
|
||||
LEDGER_APP_NAME = "SSH/PGP Agent"
|
||||
ledger_app_version = None
|
||||
ledger_app_supports_end_of_frame_byte = True
|
||||
|
||||
def get_app_name_and_version(self, dongle):
|
||||
"""Retrieve currently running Ledger application name and its version string."""
|
||||
device_version_answer = dongle.exchange(binascii.unhexlify('B001000000'))
|
||||
offset = 1
|
||||
app_name_length = struct.unpack_from("B", device_version_answer, offset)[0]
|
||||
offset += 1
|
||||
app_name = device_version_answer[offset: offset + app_name_length]
|
||||
offset += app_name_length
|
||||
app_version_length = struct.unpack_from("B", device_version_answer, offset)[0]
|
||||
offset += 1
|
||||
app_version = device_version_answer[offset: offset + app_version_length]
|
||||
log.debug("running app %s, version %s", app_name, app_version)
|
||||
return (app_name.decode(), app_version.decode())
|
||||
|
||||
@classmethod
|
||||
def package_name(cls):
|
||||
"""Python package name (at PyPI)."""
|
||||
@@ -44,10 +63,21 @@ class LedgerNanoS(interface.Device):
|
||||
def connect(self):
|
||||
"""Enumerate and connect to the first USB HID interface."""
|
||||
try:
|
||||
return comm.getDongle()
|
||||
dongle = comm.getDongle(debug=True)
|
||||
(app_name, self.ledger_app_version) = self.get_app_name_and_version(dongle)
|
||||
|
||||
version_parts = self.ledger_app_version.split(".")
|
||||
if (version_parts[0] == "0" and version_parts[1] == "0" and int(version_parts[2]) <= 7):
|
||||
self.ledger_app_supports_end_of_frame_byte = False
|
||||
|
||||
if app_name != LedgerNanoS.LEDGER_APP_NAME:
|
||||
# we could launch the app here if we are in the dashboard
|
||||
raise interface.DeviceError(f'{self} is not running {LedgerNanoS.LEDGER_APP_NAME}')
|
||||
|
||||
return dongle
|
||||
except comm.CommException as e:
|
||||
raise interface.NotFoundError(
|
||||
'{} not connected: "{}"'.format(self, e))
|
||||
raise interface.DeviceError(
|
||||
'Error ({}) communicating with {}'.format(e, self))
|
||||
|
||||
def pubkey(self, identity, ecdh=False):
|
||||
"""Get PublicKey object for specified BIP32 address and elliptic curve."""
|
||||
@@ -64,28 +94,50 @@ class LedgerNanoS(interface.Device):
|
||||
log.debug('apdu: %r', apdu)
|
||||
result = bytearray(self.conn.exchange(bytes(apdu)))
|
||||
log.debug('result: %r', result)
|
||||
return _convert_public_key(curve_name, result[1:])
|
||||
return formats.decompress_pubkey(
|
||||
pubkey=_convert_public_key(curve_name, result[1:]),
|
||||
curve_name=identity.curve_name)
|
||||
|
||||
def sign(self, identity, blob):
|
||||
"""Sign given blob and return the signature (as bytes)."""
|
||||
# pylint: disable=too-many-locals,too-many-branches
|
||||
path = _expand_path(identity.get_bip32_address(ecdh=False))
|
||||
if identity.identity_dict['proto'] == 'ssh':
|
||||
ins = '04'
|
||||
p1 = '00'
|
||||
else:
|
||||
ins = '08'
|
||||
p1 = '00'
|
||||
if identity.curve_name == 'nist256p1':
|
||||
p2 = '81' if identity.identity_dict['proto'] == 'ssh' else '01'
|
||||
else:
|
||||
p2 = '82' if identity.identity_dict['proto'] == 'ssh' else '02'
|
||||
apdu = '80' + ins + p1 + p2
|
||||
apdu = binascii.unhexlify(apdu)
|
||||
apdu += bytearray([len(blob) + len(path) + 1])
|
||||
apdu += bytearray([len(path) // 4]) + path
|
||||
apdu += blob
|
||||
log.debug('apdu: %r', apdu)
|
||||
result = bytearray(self.conn.exchange(bytes(apdu)))
|
||||
offset = 0
|
||||
result = None
|
||||
while offset != len(blob):
|
||||
data = bytes()
|
||||
if offset == 0:
|
||||
data += bytearray([len(path) // 4]) + path
|
||||
chunk_size = min(len(blob) - offset, 255 - len(data))
|
||||
data += blob[offset:offset + chunk_size]
|
||||
|
||||
if identity.identity_dict['proto'] == 'ssh':
|
||||
ins = '04'
|
||||
else:
|
||||
ins = '08'
|
||||
|
||||
if identity.curve_name == 'nist256p1':
|
||||
p2 = '81' if identity.identity_dict['proto'] == 'ssh' else '01'
|
||||
else:
|
||||
p2 = '82' if identity.identity_dict['proto'] == 'ssh' else '02'
|
||||
|
||||
if offset + chunk_size == len(blob) and self.ledger_app_supports_end_of_frame_byte:
|
||||
# mark that we are at the end of the frame
|
||||
p1 = "80" if offset == 0 else "81"
|
||||
else:
|
||||
p1 = "00" if offset == 0 else "01"
|
||||
|
||||
apdu = binascii.unhexlify('80' + ins + p1 + p2) + len(data).to_bytes(1, 'little') + data
|
||||
|
||||
log.debug('apdu: %r', apdu)
|
||||
try:
|
||||
result = bytearray(self.conn.exchange(bytes(apdu)))
|
||||
except comm.CommException as e:
|
||||
raise interface.DeviceError(
|
||||
'Error ({}) communicating with {}'.format(e, self))
|
||||
|
||||
offset += chunk_size
|
||||
|
||||
log.debug('result: %r', result)
|
||||
if identity.curve_name == 'nist256p1':
|
||||
offset = 3
|
||||
@@ -116,7 +168,11 @@ class LedgerNanoS(interface.Device):
|
||||
apdu += bytearray([len(path) // 4]) + path
|
||||
apdu += pubkey
|
||||
log.debug('apdu: %r', apdu)
|
||||
result = bytearray(self.conn.exchange(bytes(apdu)))
|
||||
try:
|
||||
result = bytearray(self.conn.exchange(bytes(apdu)))
|
||||
except comm.CommException as e:
|
||||
raise interface.DeviceError(
|
||||
'Error ({}) communicating with {}'.format(e, self))
|
||||
log.debug('result: %r', result)
|
||||
assert result[0] == 0x04
|
||||
return bytes(result)
|
||||
|
||||
377
libagent/device/onlykey.py
Normal file
377
libagent/device/onlykey.py
Normal file
@@ -0,0 +1,377 @@
|
||||
# pylint: disable=too-many-locals,too-many-statements,too-many-branches
|
||||
# pylint: disable=attribute-defined-outside-init
|
||||
"""OnlyKey-related code (see https://www.onlykey.io/)."""
|
||||
|
||||
import codecs
|
||||
import hashlib
|
||||
import logging
|
||||
import time
|
||||
|
||||
import ecdsa
|
||||
import nacl.signing
|
||||
import unidecode
|
||||
|
||||
from . import interface
|
||||
|
||||
# import pgpy
|
||||
# from pgpy import PGPKey
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class OnlyKey(interface.Device):
|
||||
"""Connection to OnlyKey device."""
|
||||
|
||||
@classmethod
|
||||
def package_name(cls):
|
||||
"""Python package name (at PyPI)."""
|
||||
return 'onlykey-agent'
|
||||
|
||||
@property
|
||||
def _defs(self):
|
||||
from . import onlykey_defs
|
||||
return onlykey_defs
|
||||
|
||||
def connect(self):
|
||||
"""Enumerate and connect to the first USB HID interface."""
|
||||
try:
|
||||
self.device_name = 'OnlyKey'
|
||||
self.ok = self._defs.OnlyKey()
|
||||
self.ok.set_time(time.time())
|
||||
self.okversion = self.ok.read_string(timeout_ms=500)
|
||||
self.okversion = self.okversion[8:]
|
||||
self.skeyslot = 132
|
||||
self.dkeyslot = 132
|
||||
except Exception as e:
|
||||
raise interface.NotFoundError('{} not connected: "{}"') from e
|
||||
|
||||
def set_skey(self, skey):
|
||||
"""Set signing key to use."""
|
||||
self.skeyslot = skey
|
||||
log.debug('Setting skey slot = %s', skey)
|
||||
|
||||
def set_dkey(self, dkey):
|
||||
"""Set decryption key to use."""
|
||||
self.dkeyslot = dkey
|
||||
log.debug('Setting dkey slot = %s', dkey)
|
||||
|
||||
def import_pub(self, pubkey):
|
||||
"""Import PGP public key."""
|
||||
self.import_pubkey = pubkey
|
||||
log.debug('Public key to import = %s', pubkey)
|
||||
# self.import_pubkey_obj, _ = pgpy.PGPKey.from_blob(pubkey)
|
||||
# self.import_pubkey_bytes = bytes(self.import_pubkey_obj)
|
||||
|
||||
def get_sk_dk(self):
|
||||
"""Get default signing key and decryption key slots."""
|
||||
self.set_skey(132)
|
||||
self.set_dkey(132)
|
||||
|
||||
def sig_hash(self, sighash):
|
||||
"""Set signature hashing algorithm to use."""
|
||||
if sighash in (b'rsa-sha2-512', b'rsa-sha2-256'):
|
||||
self.sighash = sighash
|
||||
log.info('Setting RSA signature Hash Type =%s', sighash)
|
||||
|
||||
def close(self):
|
||||
"""Close connection."""
|
||||
log.info('disconnected from %s', self.device_name)
|
||||
self.ok.close()
|
||||
|
||||
def pubkey(self, identity, ecdh=False):
|
||||
"""Return public key."""
|
||||
curve_name = identity.get_curve_name(ecdh=ecdh)
|
||||
if identity.identity_dict['proto'] != 'ssh' and hasattr('self', 'skeyslot') is False:
|
||||
self.get_sk_dk()
|
||||
if identity.identity_dict['proto'] != 'ssh' and self.dkeyslot < 132 and ecdh is True:
|
||||
this_slot_id = self.dkeyslot
|
||||
log.info('Key Slot =%s', this_slot_id)
|
||||
elif self.skeyslot < 132 and ecdh is False:
|
||||
this_slot_id = self.skeyslot
|
||||
log.info('Key Slot =%s', this_slot_id)
|
||||
else:
|
||||
this_slot_id = 132
|
||||
|
||||
log.info('Requesting public key from key slot =%s', this_slot_id)
|
||||
|
||||
log.debug('"%s" getting public key (%s) from %s',
|
||||
identity.to_string(), curve_name, self)
|
||||
|
||||
# Calculate hash for key derivation input data
|
||||
if identity.identity_dict['proto'] == 'ssh':
|
||||
if identity.identity_dict.get('user'):
|
||||
id_parts = unidecode.unidecode(identity.identity_dict['user'] + '@' +
|
||||
identity.identity_dict['host']).encode('ascii')
|
||||
else:
|
||||
id_parts = unidecode.unidecode(identity.identity_dict['host']).encode('ascii')
|
||||
else:
|
||||
id_parts = identity.to_bytes()
|
||||
log.info('Identity to hash =%s', id_parts)
|
||||
h1 = hashlib.sha256()
|
||||
h1.update(id_parts)
|
||||
data = h1.hexdigest()
|
||||
log.info('Identity hash =%s', data)
|
||||
|
||||
if this_slot_id > 100:
|
||||
if curve_name == 'curve25519':
|
||||
data = '04' + data
|
||||
elif curve_name == 'secp256k1':
|
||||
# Not currently supported by agent, for future use
|
||||
data = '03' + data
|
||||
elif curve_name == 'nist256p1':
|
||||
data = '02' + data
|
||||
elif curve_name == 'ed25519':
|
||||
data = '01' + data
|
||||
else:
|
||||
data = '00' + data
|
||||
|
||||
self.ok.send_message(msg=self._defs.Message.OKGETPUBKEY, slot_id=this_slot_id, payload=data)
|
||||
log.info('curve name= %s', repr(curve_name))
|
||||
t_end = time.time() + 1.5
|
||||
if curve_name != 'rsa':
|
||||
while time.time() < t_end:
|
||||
try:
|
||||
ok_pubkey = self.ok.read_bytes(timeout_ms=100)
|
||||
if len(ok_pubkey) == 64 and len(set(ok_pubkey[0:63])) != 1:
|
||||
break
|
||||
except Exception as e:
|
||||
raise interface.DeviceError(e)
|
||||
|
||||
log.info('received= %s', repr(ok_pubkey))
|
||||
if len(set(ok_pubkey[34:63])) == 1:
|
||||
if curve_name in ('nist256p1', 'secp256k1'):
|
||||
raise interface.DeviceError("Public key curve does not match requested type")
|
||||
ok_pubkey = bytearray(ok_pubkey[0:32])
|
||||
log.info('Received Public Key generated by OnlyKey= %s', repr(ok_pubkey.hex()))
|
||||
vk = nacl.signing.VerifyKey(bytes(ok_pubkey),
|
||||
encoder=nacl.encoding.RawEncoder)
|
||||
log.info('vk= %s', repr(vk))
|
||||
# time.sleep(3)
|
||||
return vk
|
||||
elif len(ok_pubkey) == 64:
|
||||
ok_pubkey = bytearray(ok_pubkey[0:64])
|
||||
if curve_name in ('ed25519', 'curve25519'):
|
||||
raise interface.DeviceError("Public key curve does not match requested type")
|
||||
log.info('Received Public Key generated by OnlyKey= %s', repr(ok_pubkey))
|
||||
if identity.curve_name == 'nist256p1':
|
||||
vk = ecdsa.VerifyingKey.from_string(ok_pubkey, curve=ecdsa.NIST256p)
|
||||
else:
|
||||
vk = ecdsa.VerifyingKey.from_string(ok_pubkey, curve=ecdsa.SECP256k1)
|
||||
return vk
|
||||
else:
|
||||
ok_pubkey = []
|
||||
while time.time() < t_end:
|
||||
try:
|
||||
ok_pub_part = self.ok.read_bytes(timeout_ms=100)
|
||||
if len(ok_pub_part) == 64 and len(set(ok_pub_part[0:63])) != 1:
|
||||
log.info('received part= %s', repr(ok_pub_part))
|
||||
ok_pubkey += ok_pub_part
|
||||
# Todo know RSA type to know how many packets
|
||||
except Exception as e:
|
||||
raise interface.DeviceError(e)
|
||||
|
||||
log.info('received= %s', repr(ok_pubkey))
|
||||
if len(ok_pubkey) == 256:
|
||||
# https://security.stackexchange.com/questions/42268/how-do-i-get-the-rsa-bit-length-with-the-pubkey-and-openssl
|
||||
ok_pubkey = b'\x00\x00\x00\x07' + b'\x73\x73\x68\x2d\x72\x73\x61' + \
|
||||
b'\x00\x00\x00\x03' + b'\x01\x00\x01' + \
|
||||
b'\x00\x00\x01\x01' + b'\x00' + bytes(ok_pubkey)
|
||||
# ok_pubkey = b'\x00\x00\x00\x07' + b'\x72\x73\x61\x2d\x73\x68\x61\x32\x2d\x32\x35\x
|
||||
# 36' + b'\x00\x00\x00\x03' + b'\x01\x00\x01' + b'\x00\x00\x01\x01' + b'\x00' + byte
|
||||
# s(ok_pubkey)
|
||||
elif len(ok_pubkey) == 512:
|
||||
ok_pubkey = b'\x00\x00\x00\x07' + b'\x73\x73\x68\x2d\x72\x73\x61' + \
|
||||
b'\x00\x00\x00\x03' + b'\x01\x00\x01' + \
|
||||
b'\x00\x00\x02\x01' + b'\x00' + bytes(ok_pubkey)
|
||||
else:
|
||||
raise interface.DeviceError("Error response length is not a valid public key")
|
||||
log.info('pubkey len = %s', len(ok_pubkey))
|
||||
return ok_pubkey
|
||||
|
||||
def sign(self, identity, blob):
|
||||
"""Sign given blob and return the signature (as bytes)."""
|
||||
curve_name = identity.get_curve_name(ecdh=False)
|
||||
log.debug('"%s" signing %r (%s) on %s',
|
||||
identity.to_string(), blob, curve_name, self)
|
||||
if identity.identity_dict['proto'] != 'ssh' and hasattr('self', 'skeyslot') is False:
|
||||
self.get_sk_dk()
|
||||
# Calculate hash for SSH signing
|
||||
if curve_name == 'rsa':
|
||||
if self.sighash == b'rsa-sha2-512':
|
||||
log.info('rsa-sha2-512')
|
||||
h1 = hashlib.sha512()
|
||||
h1.update(blob)
|
||||
data = h1.hexdigest()
|
||||
data = codecs.decode(data, 'hex_codec')
|
||||
elif self.sighash == b'rsa-sha2-256':
|
||||
log.info('rsa-sha2-256')
|
||||
h1 = hashlib.sha256()
|
||||
h1.update(blob)
|
||||
data = h1.hexdigest()
|
||||
data = codecs.decode(data, 'hex_codec')
|
||||
else:
|
||||
# Calculate hash for key derivation input data
|
||||
h1 = hashlib.sha256()
|
||||
if identity.identity_dict['proto'] == 'ssh':
|
||||
if identity.identity_dict.get('user'):
|
||||
id_parts = unidecode.unidecode(identity.identity_dict['user'] + '@' +
|
||||
identity.identity_dict['host']).encode('ascii')
|
||||
else:
|
||||
id_parts = unidecode.unidecode(identity.identity_dict['host']).encode('ascii')
|
||||
else:
|
||||
id_parts = identity.to_bytes()
|
||||
h1.update(id_parts)
|
||||
data = h1.hexdigest()
|
||||
data = codecs.decode(data, 'hex_codec')
|
||||
log.info('Identity to hash =%s', id_parts)
|
||||
log.info('Identity hash =%s', data)
|
||||
# Determine type of key to derive on OnlyKey for signature
|
||||
# Slot 132 used for derived key, slots 101-116 used for stored ecc keys
|
||||
# slots 1-4 used for stored RSA keys
|
||||
if self.skeyslot == 132:
|
||||
if curve_name == 'ed25519':
|
||||
this_slot_id = 201
|
||||
log.info('Key type ed25519')
|
||||
elif curve_name == 'nist256p1':
|
||||
this_slot_id = 202
|
||||
log.info('Key type nistp256')
|
||||
else:
|
||||
this_slot_id = 203
|
||||
log.info('Key type secp256k1')
|
||||
# Send data and identity hash
|
||||
raw_message = blob + data
|
||||
else:
|
||||
this_slot_id = self.skeyslot
|
||||
# Send just data to sign
|
||||
raw_message = blob
|
||||
h2 = hashlib.sha256()
|
||||
h2.update(raw_message)
|
||||
d = h2.digest()
|
||||
assert len(d) == 32
|
||||
b1, b2, b3 = get_button(self, d[0]), get_button(self, d[15]), get_button(self, d[31])
|
||||
log.info('Key Slot =%s', this_slot_id)
|
||||
print('Enter the 3 digit challenge code on OnlyKey to authorize '+identity.to_string())
|
||||
print('{} {} {}'.format(b1, b2, b3))
|
||||
t_end = time.time() + 22
|
||||
if curve_name != 'rsa':
|
||||
self.ok.send_large_message2(msg=self._defs.Message.OKSIGN, payload=raw_message,
|
||||
slot_id=this_slot_id)
|
||||
while time.time() < t_end:
|
||||
try:
|
||||
result = self.ok.read_bytes(timeout_ms=100)
|
||||
if len(result) == 64 and len(set(result[0:63])) != 1:
|
||||
break
|
||||
except Exception as e:
|
||||
raise interface.DeviceError(e)
|
||||
|
||||
if len(result) >= 60:
|
||||
log.info('received= %s', repr(result))
|
||||
while len(result) < 64:
|
||||
result.append(0)
|
||||
log.info('disconnected from %s', self.device_name)
|
||||
self.ok.close()
|
||||
return bytes(result)
|
||||
else:
|
||||
self.ok.send_large_message2(msg=self._defs.Message.OKSIGN, payload=data,
|
||||
slot_id=this_slot_id)
|
||||
result = []
|
||||
while time.time() < t_end:
|
||||
try:
|
||||
sig_part = self.ok.read_bytes(timeout_ms=100)
|
||||
if len(sig_part) == 64 and len(set(sig_part[0:63])) != 1:
|
||||
log.info('received part= %s', repr(sig_part))
|
||||
result += sig_part
|
||||
t_end = time.time() + 1
|
||||
# Todo know RSA type to know how many packets
|
||||
except Exception as e:
|
||||
raise interface.DeviceError(e)
|
||||
|
||||
log.info('received= %s', repr(result))
|
||||
return bytes(result)
|
||||
raise interface.Error('failed to sign challenge')
|
||||
|
||||
def ecdh(self, identity, pubkey):
|
||||
"""Get shared session key using Elliptic Curve Diffie-Hellman."""
|
||||
curve_name = identity.get_curve_name(ecdh=True)
|
||||
log.debug('"%s" shared session key (%s) for %r from %s',
|
||||
identity.to_string(), curve_name, pubkey, self)
|
||||
# Calculate hash for key derivation input data
|
||||
h1 = hashlib.sha256()
|
||||
if identity.identity_dict['proto'] == 'ssh':
|
||||
if identity.identity_dict.get('user'):
|
||||
id_parts = unidecode.unidecode(identity.identity_dict['user'] + '@' +
|
||||
identity.identity_dict['host']).encode('ascii')
|
||||
else:
|
||||
id_parts = unidecode.unidecode(identity.identity_dict['host']).encode('ascii')
|
||||
else:
|
||||
id_parts = identity.to_bytes()
|
||||
h1.update(id_parts)
|
||||
log.info('Identity to hash =%s', id_parts)
|
||||
data = h1.hexdigest()
|
||||
log.info('Identity hash =%s', data)
|
||||
data = codecs.decode(data, 'hex_codec')
|
||||
# Determine type of key to derive on OnlyKey for ecdh
|
||||
# Slot 132 used for derived key, slots 101-116 used for stored ecc keys,
|
||||
# slots 1-4 used for stored RSA keys
|
||||
if self.dkeyslot == 132:
|
||||
if curve_name == 'curve25519':
|
||||
this_slot_id = 204
|
||||
log.info('Key type curve25519')
|
||||
elif curve_name == 'nist256p1':
|
||||
this_slot_id = 202
|
||||
log.info('Key type nistp256')
|
||||
else:
|
||||
this_slot_id = 203
|
||||
log.info('Key type secp256k1')
|
||||
raw_message = pubkey + data
|
||||
else:
|
||||
this_slot_id = self.dkeyslot
|
||||
raw_message = pubkey
|
||||
log.info('Key Slot =%s', this_slot_id)
|
||||
log.info('data hash =%s', data)
|
||||
h2 = hashlib.sha256()
|
||||
h2.update(raw_message)
|
||||
d = h2.digest()
|
||||
assert len(d) == 32
|
||||
b1, b2, b3 = get_button(self, d[0]), get_button(self, d[15]), get_button(self, d[31])
|
||||
self.ok.send_large_message2(msg=self._defs.Message.OKDECRYPT, payload=raw_message,
|
||||
slot_id=this_slot_id)
|
||||
print('Enter the 3 digit challenge code on OnlyKey to authorize ' + identity.to_string())
|
||||
print('{} {} {}'.format(b1, b2, b3))
|
||||
t_end = time.time() + 22
|
||||
if curve_name != 'rsa':
|
||||
while time.time() < t_end:
|
||||
try:
|
||||
result = self.ok.read_bytes(timeout_ms=100)
|
||||
if len(result) == 64 and len(set(result[0:63])) != 1:
|
||||
break
|
||||
except Exception as e:
|
||||
raise interface.DeviceError(e)
|
||||
if len(set(result[34:63])) == 1:
|
||||
result = b'\x04' + bytes(result[0:32])
|
||||
else:
|
||||
result = []
|
||||
while time.time() < t_end:
|
||||
try:
|
||||
dec_part = self.ok.read_bytes(timeout_ms=100)
|
||||
if len(dec_part) == 64 and len(set(dec_part[0:63])) != 1:
|
||||
log.info('received part= %s', repr(dec_part))
|
||||
result += dec_part
|
||||
t_end = time.time() + 1
|
||||
# Todo know RSA type to know how many packets
|
||||
except Exception as e:
|
||||
raise interface.DeviceError(e)
|
||||
|
||||
log.info('received= %s', repr(result))
|
||||
log.info('disconnected from %s', self.device_name)
|
||||
self.ok.close()
|
||||
return bytes(result)
|
||||
|
||||
|
||||
def get_button(self, byte):
|
||||
"""Return button number."""
|
||||
if str(self.okversion) == 'v0.2-beta.8c':
|
||||
return byte % 5 + 1
|
||||
else:
|
||||
return byte % 6 + 1
|
||||
5
libagent/device/onlykey_defs.py
Normal file
5
libagent/device/onlykey_defs.py
Normal file
@@ -0,0 +1,5 @@
|
||||
"""OnlyKey-related definitions."""
|
||||
|
||||
# pylint: disable=unused-import,import-error,no-name-in-module
|
||||
|
||||
from onlykey import Message, OnlyKey
|
||||
@@ -5,6 +5,7 @@ import logging
|
||||
|
||||
import semver
|
||||
|
||||
from .. import formats
|
||||
from . import interface
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@@ -70,6 +71,7 @@ class Trezor(interface.Device):
|
||||
log.exception('ping failed: %s', e)
|
||||
connection.close() # so the next HID open() will succeed
|
||||
raise
|
||||
return None
|
||||
|
||||
def close(self):
|
||||
"""Close connection."""
|
||||
@@ -87,7 +89,8 @@ class Trezor(interface.Device):
|
||||
n=addr,
|
||||
ecdsa_curve_name=curve_name)
|
||||
log.debug('result: %s', result)
|
||||
return bytes(result.node.public_key)
|
||||
pubkey = bytes(result.node.public_key)
|
||||
return formats.decompress_pubkey(pubkey=pubkey, curve_name=identity.curve_name)
|
||||
|
||||
def _identity_proto(self, identity):
|
||||
result = self._defs.IdentityType()
|
||||
@@ -96,6 +99,11 @@ class Trezor(interface.Device):
|
||||
return result
|
||||
|
||||
def sign(self, identity, blob):
|
||||
"""Sign given blob and return the signature (as bytes)."""
|
||||
sig, _ = self.sign_with_pubkey(identity, blob)
|
||||
return sig
|
||||
|
||||
def sign_with_pubkey(self, identity, blob):
|
||||
"""Sign given blob and return the signature (as bytes)."""
|
||||
curve_name = identity.get_curve_name(ecdh=False)
|
||||
log.debug('"%s" signing %r (%s) on %s',
|
||||
@@ -110,7 +118,7 @@ class Trezor(interface.Device):
|
||||
log.debug('result: %s', result)
|
||||
assert len(result.signature) == 65
|
||||
assert result.signature[:1] == b'\x00'
|
||||
return bytes(result.signature[1:])
|
||||
return bytes(result.signature[1:]), bytes(result.public_key)
|
||||
except self._defs.TrezorFailure as e:
|
||||
msg = '{} error: {}'.format(self, e)
|
||||
log.debug(msg, exc_info=True)
|
||||
@@ -118,6 +126,11 @@ class Trezor(interface.Device):
|
||||
|
||||
def ecdh(self, identity, pubkey):
|
||||
"""Get shared session key using Elliptic Curve Diffie-Hellman."""
|
||||
session_key, _ = self.ecdh_with_pubkey(identity, pubkey)
|
||||
return session_key
|
||||
|
||||
def ecdh_with_pubkey(self, identity, pubkey):
|
||||
"""Get shared session key using Elliptic Curve Diffie-Hellman & self public key."""
|
||||
curve_name = identity.get_curve_name(ecdh=True)
|
||||
log.debug('"%s" shared session key (%s) for %r from %s',
|
||||
identity.to_string(), curve_name, pubkey, self)
|
||||
@@ -130,7 +143,11 @@ class Trezor(interface.Device):
|
||||
log.debug('result: %s', result)
|
||||
assert len(result.session_key) in {65, 33} # NIST256 or Curve25519
|
||||
assert result.session_key[:1] == b'\x04'
|
||||
return bytes(result.session_key)
|
||||
self_pubkey = result.public_key
|
||||
if self_pubkey:
|
||||
self_pubkey = bytes(self_pubkey[1:])
|
||||
|
||||
return bytes(result.session_key), self_pubkey
|
||||
except self._defs.TrezorFailure as e:
|
||||
msg = '{} error: {}'.format(self, e)
|
||||
log.debug(msg, exc_info=True)
|
||||
|
||||
@@ -1,20 +1,19 @@
|
||||
"""TREZOR-related definitions."""
|
||||
|
||||
# pylint: disable=unused-import,import-error,no-name-in-module,no-member
|
||||
import os
|
||||
import logging
|
||||
import os
|
||||
|
||||
import mnemonic
|
||||
import semver
|
||||
import trezorlib
|
||||
|
||||
from trezorlib.client import TrezorClient as Client, PASSPHRASE_TEST_PATH
|
||||
from trezorlib.exceptions import TrezorFailure, PinException
|
||||
from trezorlib.transport import get_transport
|
||||
from trezorlib.messages import IdentityType
|
||||
|
||||
from trezorlib.btc import get_address, get_public_node
|
||||
from trezorlib.misc import sign_identity, get_ecdh_session_key
|
||||
from trezorlib.client import PASSPHRASE_TEST_PATH
|
||||
from trezorlib.client import TrezorClient as Client
|
||||
from trezorlib.exceptions import PinException, TrezorFailure
|
||||
from trezorlib.messages import IdentityType
|
||||
from trezorlib.misc import get_ecdh_session_key, sign_identity
|
||||
from trezorlib.transport import get_transport
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
@@ -25,6 +24,7 @@ def find_device():
|
||||
If unset, picks first connected device.
|
||||
"""
|
||||
try:
|
||||
return get_transport(os.environ.get("TREZOR_PATH"))
|
||||
return get_transport(os.environ.get("TREZOR_PATH"), prefix_search=True)
|
||||
except Exception as e: # pylint: disable=broad-except
|
||||
log.debug("Failed to find a Trezor device: %s", e)
|
||||
return None
|
||||
|
||||
@@ -3,13 +3,15 @@
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
from .. import util
|
||||
|
||||
try:
|
||||
from trezorlib.client import PASSPHRASE_ON_DEVICE
|
||||
except ImportError:
|
||||
PASSPHRASE_ON_DEVICE = object()
|
||||
|
||||
from .. import util
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
@@ -28,7 +30,8 @@ class UI:
|
||||
default_pinentry)
|
||||
self.options_getter = create_default_options_getter()
|
||||
self.device_name = device_type.__name__
|
||||
self.cached_passphrase_ack = None
|
||||
self.cached_passphrase_ack = util.ExpiringCache(
|
||||
seconds=float(config.get('cache_expiry_seconds', 'inf')))
|
||||
|
||||
def get_pin(self, _code=None):
|
||||
"""Ask the user for (scrambled) PIN."""
|
||||
@@ -75,11 +78,12 @@ class UI:
|
||||
def create_default_options_getter():
|
||||
"""Return current TTY and DISPLAY settings for GnuPG pinentry."""
|
||||
options = []
|
||||
try:
|
||||
ttyname = subprocess.check_output(args=['tty']).strip()
|
||||
options.append(b'ttyname=' + ttyname)
|
||||
except subprocess.CalledProcessError as e:
|
||||
log.warning('no TTY found: %s', e)
|
||||
if sys.stdin.isatty(): # short-circuit calling `tty`
|
||||
try:
|
||||
ttyname = subprocess.check_output(args=['tty']).strip()
|
||||
options.append(b'ttyname=' + ttyname)
|
||||
except subprocess.CalledProcessError as e:
|
||||
log.warning('no TTY found: %s', e)
|
||||
|
||||
display = os.environ.get('DISPLAY')
|
||||
if display is not None:
|
||||
|
||||
@@ -5,7 +5,7 @@ import io
|
||||
import logging
|
||||
|
||||
import ecdsa
|
||||
import ed25519
|
||||
import nacl.signing
|
||||
|
||||
from . import util
|
||||
|
||||
@@ -25,8 +25,10 @@ SSH_NIST256_DER_OCTET = b'\x04'
|
||||
SSH_NIST256_KEY_PREFIX = b'ecdsa-sha2-'
|
||||
SSH_NIST256_CURVE_NAME = b'nistp256'
|
||||
SSH_NIST256_KEY_TYPE = SSH_NIST256_KEY_PREFIX + SSH_NIST256_CURVE_NAME
|
||||
SSH_NIST256_CERT_POSTFIX = b'-cert-v01@openssh.com'
|
||||
SSH_NIST256_CERT_TYPE = SSH_NIST256_KEY_TYPE + SSH_NIST256_CERT_POSTFIX
|
||||
SSH_ED25519_KEY_TYPE = b'ssh-ed25519'
|
||||
SUPPORTED_KEY_TYPES = {SSH_NIST256_KEY_TYPE, SSH_ED25519_KEY_TYPE}
|
||||
SUPPORTED_KEY_TYPES = {SSH_NIST256_KEY_TYPE, SSH_NIST256_CERT_TYPE, SSH_ED25519_KEY_TYPE}
|
||||
|
||||
hashfunc = hashlib.sha256
|
||||
|
||||
@@ -49,6 +51,7 @@ def parse_pubkey(blob):
|
||||
The verifier returns the signatures in the required SSH format.
|
||||
Currently, NIST256P1 and ED25519 elliptic curves are supported.
|
||||
"""
|
||||
# pylint: disable=too-many-locals
|
||||
fp = fingerprint(blob)
|
||||
s = io.BytesIO(blob)
|
||||
key_type = util.read_frame(s)
|
||||
@@ -57,10 +60,27 @@ def parse_pubkey(blob):
|
||||
|
||||
result = {'blob': blob, 'type': key_type, 'fingerprint': fp}
|
||||
|
||||
if key_type == SSH_NIST256_KEY_TYPE:
|
||||
if key_type in (SSH_NIST256_KEY_TYPE, SSH_NIST256_CERT_TYPE):
|
||||
if key_type == SSH_NIST256_CERT_TYPE:
|
||||
_nonce = util.read_frame(s)
|
||||
|
||||
curve_name = util.read_frame(s)
|
||||
log.debug('curve name: %s', curve_name)
|
||||
point = util.read_frame(s)
|
||||
|
||||
if key_type == SSH_NIST256_CERT_TYPE:
|
||||
_serial_number = util.recv(s, '>Q')
|
||||
_type = util.recv(s, '>L')
|
||||
_key_id = util.read_frame(s)
|
||||
_valid_principals = util.read_frame(s)
|
||||
_valid_after = util.recv(s, '>Q')
|
||||
_valid_before = util.recv(s, '>Q')
|
||||
_critical_options = util.read_frame(s)
|
||||
_extensions = util.read_frame(s)
|
||||
_reserved = util.read_frame(s)
|
||||
_signature_key = util.read_frame(s)
|
||||
_signature = util.read_frame(s)
|
||||
|
||||
assert s.read() == b''
|
||||
_type, point = point[:1], point[1:]
|
||||
assert _type == SSH_NIST256_DER_OCTET
|
||||
@@ -88,8 +108,10 @@ def parse_pubkey(blob):
|
||||
|
||||
def ed25519_verify(sig, msg):
|
||||
assert len(sig) == 64
|
||||
vk = ed25519.VerifyingKey(pubkey)
|
||||
vk.verify(sig, msg)
|
||||
vk = nacl.signing.VerifyKey(bytes(pubkey),
|
||||
encoder=nacl.encoding.RawEncoder)
|
||||
vk.verify(msg, sig)
|
||||
log.debug('verify signature')
|
||||
return sig
|
||||
|
||||
result.update(curve=CURVE_ED25519, verifier=ed25519_verify)
|
||||
@@ -99,9 +121,9 @@ def parse_pubkey(blob):
|
||||
|
||||
def _decompress_ed25519(pubkey):
|
||||
"""Load public key from the serialized blob (stripping the prefix byte)."""
|
||||
if pubkey[:1] == b'\x00':
|
||||
if pubkey[:1] in {b'\x00', b'\x01'}:
|
||||
# set by Trezor fsm_msgSignIdentity() and fsm_msgGetPublicKey()
|
||||
return ed25519.VerifyingKey(pubkey[1:])
|
||||
return nacl.signing.VerifyKey(pubkey[1:], encoder=nacl.encoding.RawEncoder)
|
||||
else:
|
||||
return None
|
||||
|
||||
@@ -161,8 +183,8 @@ def serialize_verifying_key(vk):
|
||||
Currently, NIST256P1 and ED25519 elliptic curves are supported.
|
||||
Raise TypeError on unsupported key format.
|
||||
"""
|
||||
if isinstance(vk, ed25519.keys.VerifyingKey):
|
||||
pubkey = vk.to_bytes()
|
||||
if isinstance(vk, nacl.signing.VerifyKey):
|
||||
pubkey = vk.encode(encoder=nacl.encoding.RawEncoder)
|
||||
key_type = SSH_ED25519_KEY_TYPE
|
||||
blob = util.frame(SSH_ED25519_KEY_TYPE) + util.frame(pubkey)
|
||||
return key_type, blob
|
||||
@@ -188,7 +210,7 @@ def export_public_key(vk, label):
|
||||
key_type, blob = serialize_verifying_key(vk)
|
||||
log.debug('fingerprint: %s', fingerprint(blob))
|
||||
b64 = base64.b64encode(blob).decode('ascii')
|
||||
return u'{} {} {}\n'.format(key_type.decode('ascii'), b64, label)
|
||||
return '{} {} {}\n'.format(key_type.decode('ascii'), b64, label)
|
||||
|
||||
|
||||
def import_public_key(line):
|
||||
|
||||
@@ -18,12 +18,12 @@ import subprocess
|
||||
import sys
|
||||
import time
|
||||
|
||||
import daemon
|
||||
import pkg_resources
|
||||
import semver
|
||||
|
||||
|
||||
from . import agent, client, encode, keyring, protocol
|
||||
from .. import device, formats, server, util
|
||||
from . import agent, client, encode, keyring, protocol
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
@@ -122,7 +122,7 @@ def run_init(device_type, args):
|
||||
verify_gpg_version()
|
||||
|
||||
# Prepare new GPG home directory for hardware-based identity
|
||||
device_name = os.path.basename(sys.argv[0]).rsplit('-', 1)[0]
|
||||
device_name = device_type.package_name().rsplit('-', 1)[0]
|
||||
log.info('device name: %s', device_name)
|
||||
homedir = args.homedir
|
||||
if not homedir:
|
||||
@@ -143,7 +143,7 @@ def run_init(device_type, args):
|
||||
# Prepare GPG agent invocation script (to pass the PATH from environment).
|
||||
with open(os.path.join(homedir, 'run-agent.sh'), 'w') as f:
|
||||
f.write(r"""#!/bin/sh
|
||||
export PATH={0}
|
||||
export PATH="{0}"
|
||||
{1} \
|
||||
-vv \
|
||||
--pin-entry-binary={pin_entry_binary} \
|
||||
@@ -226,6 +226,8 @@ def run_agent(device_type):
|
||||
p.add_argument('-v', '--verbose', default=0, action='count')
|
||||
p.add_argument('--server', default=False, action='store_true',
|
||||
help='Use stdin/stdout for communication with GPG.')
|
||||
p.add_argument('--daemon', default=False, action='store_true',
|
||||
help='Daemonize the agent.')
|
||||
|
||||
p.add_argument('--pin-entry-binary', type=str, default='pinentry',
|
||||
help='Path to PIN entry UI helper.')
|
||||
@@ -236,6 +238,15 @@ def run_agent(device_type):
|
||||
|
||||
args, _ = p.parse_known_args()
|
||||
|
||||
if args.daemon:
|
||||
with daemon.DaemonContext():
|
||||
run_agent_internal(args, device_type)
|
||||
else:
|
||||
run_agent_internal(args, device_type)
|
||||
|
||||
|
||||
def run_agent_internal(args, device_type):
|
||||
"""Actually run the server."""
|
||||
assert args.homedir
|
||||
|
||||
log_file = os.path.join(args.homedir, 'gpg-agent.log')
|
||||
@@ -249,8 +260,6 @@ def run_agent(device_type):
|
||||
pubkey_bytes = keyring.export_public_keys(env=env)
|
||||
device_type.ui = device.ui.UI(device_type=device_type,
|
||||
config=vars(args))
|
||||
device_type.ui.cached_passphrase_ack = util.ExpiringCache(
|
||||
seconds=float(args.cache_expiry_seconds))
|
||||
handler = agent.Handler(device=device_type(),
|
||||
pubkey_bytes=pubkey_bytes)
|
||||
|
||||
@@ -318,7 +327,5 @@ def main(device_type):
|
||||
|
||||
args = parser.parse_args()
|
||||
device_type.ui = device.ui.UI(device_type=device_type, config=vars(args))
|
||||
device_type.ui.cached_passphrase_ack = util.ExpiringCache(
|
||||
seconds=float(args.cache_expiry_seconds))
|
||||
|
||||
return args.func(device_type=device_type, args=args)
|
||||
|
||||
@@ -2,8 +2,8 @@
|
||||
import binascii
|
||||
import logging
|
||||
|
||||
from . import client, decode, keyring, protocol
|
||||
from .. import util
|
||||
from . import client, decode, keyring, protocol
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
@@ -99,7 +99,7 @@ class Handler:
|
||||
b'SETHASH': lambda _, args: self.set_hash(*args),
|
||||
b'PKSIGN': lambda conn, _: self.pksign(conn),
|
||||
b'PKDECRYPT': lambda conn, _: self.pkdecrypt(conn),
|
||||
b'HAVEKEY': lambda _, args: self.have_key(*args),
|
||||
b'HAVEKEY': lambda conn, args: self.have_key(conn, *args),
|
||||
b'KEYINFO': _key_info,
|
||||
b'SCD': self.handle_scd,
|
||||
b'GET_PASSPHRASE': self.handle_get_passphrase,
|
||||
@@ -164,7 +164,7 @@ class Handler:
|
||||
# We assume the first user ID is used to generate TREZOR-based GPG keys.
|
||||
user_id = user_ids[0]['value'].decode('utf-8')
|
||||
curve_name = protocol.get_curve_name_by_oid(pubkey_dict['curve_oid'])
|
||||
ecdh = (pubkey_dict['algo'] == protocol.ECDH_ALGO_ID)
|
||||
ecdh = pubkey_dict['algo'] == protocol.ECDH_ALGO_ID
|
||||
|
||||
identity = client.create_identity(user_id=user_id, curve_name=curve_name)
|
||||
verifying_key = self.client.pubkey(identity=identity, ecdh=ecdh)
|
||||
@@ -198,8 +198,16 @@ class Handler:
|
||||
ec_point = self.client.ecdh(identity=identity, pubkey=remote_pubkey)
|
||||
keyring.sendline(conn, b'D ' + _serialize_point(ec_point))
|
||||
|
||||
def have_key(self, *keygrips):
|
||||
def have_key(self, conn, *keygrips):
|
||||
"""Check if any keygrip corresponds to a TREZOR-based key."""
|
||||
if len(keygrips) == 1 and keygrips[0].startswith(b"--list="):
|
||||
# Support "fast-path" key listing:
|
||||
# https://dev.gnupg.org/rG40da61b89b62dcb77847dc79eb159e885f52f817
|
||||
keygrips = list(decode.iter_keygrips(pubkey_bytes=self.pubkey_bytes))
|
||||
log.debug('keygrips: %r', keygrips)
|
||||
keyring.sendline(conn, b'D ' + util.assuan_serialize(b''.join(keygrips)))
|
||||
return
|
||||
|
||||
for keygrip in keygrips:
|
||||
try:
|
||||
self.get_identity(keygrip=keygrip)
|
||||
|
||||
@@ -25,9 +25,7 @@ class Client:
|
||||
def pubkey(self, identity, ecdh=False):
|
||||
"""Return public key as VerifyingKey object."""
|
||||
with self.device:
|
||||
pubkey = self.device.pubkey(ecdh=ecdh, identity=identity)
|
||||
return formats.decompress_pubkey(
|
||||
pubkey=pubkey, curve_name=identity.curve_name)
|
||||
return self.device.pubkey(ecdh=ecdh, identity=identity)
|
||||
|
||||
def sign(self, identity, digest):
|
||||
"""Sign the digest and return a serialized signature."""
|
||||
|
||||
@@ -7,10 +7,10 @@ import logging
|
||||
import struct
|
||||
|
||||
import ecdsa
|
||||
import ed25519
|
||||
import nacl.signing
|
||||
|
||||
from . import protocol
|
||||
from .. import util
|
||||
from . import protocol
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
@@ -67,7 +67,8 @@ def _parse_ed25519_pubkey(mpi):
|
||||
prefix, value = util.split_bits(mpi, 8, 256)
|
||||
if prefix != 0x40:
|
||||
raise ValueError('Invalid MPI prefix: {}'.format(prefix))
|
||||
return ed25519.VerifyingKey(util.num2bytes(value, size=32))
|
||||
vk = nacl.signing.VerifyKey(util.num2bytes(value, size=32), encoder=nacl.encoding.RawEncoder)
|
||||
return vk
|
||||
|
||||
|
||||
SUPPORTED_CURVES = {
|
||||
@@ -281,18 +282,20 @@ HASH_ALGORITHMS = {
|
||||
}
|
||||
|
||||
|
||||
def load_by_keygrip(pubkey_bytes, keygrip):
|
||||
"""Return public key and first user ID for specified keygrip."""
|
||||
def _parse_pubkey_packets(pubkey_bytes):
|
||||
stream = io.BytesIO(pubkey_bytes)
|
||||
packets = list(parse_packets(stream))
|
||||
packets_per_pubkey = []
|
||||
for p in packets:
|
||||
for p in parse_packets(stream):
|
||||
if p['type'] == 'pubkey':
|
||||
# Add a new packet list for each pubkey.
|
||||
packets_per_pubkey.append([])
|
||||
packets_per_pubkey[-1].append(p)
|
||||
return packets_per_pubkey
|
||||
|
||||
for packets in packets_per_pubkey:
|
||||
|
||||
def load_by_keygrip(pubkey_bytes, keygrip):
|
||||
"""Return public key and first user ID for specified keygrip."""
|
||||
for packets in _parse_pubkey_packets(pubkey_bytes):
|
||||
user_ids = [p for p in packets if p['type'] == 'user_id']
|
||||
for p in packets:
|
||||
if p.get('keygrip') == keygrip:
|
||||
@@ -300,6 +303,15 @@ def load_by_keygrip(pubkey_bytes, keygrip):
|
||||
raise KeyError('{} keygrip not found'.format(util.hexlify(keygrip)))
|
||||
|
||||
|
||||
def iter_keygrips(pubkey_bytes):
|
||||
"""Iterate over all keygrips in this pubkey."""
|
||||
for packets in _parse_pubkey_packets(pubkey_bytes):
|
||||
for p in packets:
|
||||
keygrip = p.get('keygrip')
|
||||
if keygrip:
|
||||
yield keygrip
|
||||
|
||||
|
||||
def load_signature(stream, original_data):
|
||||
"""Load signature from stream, and compute GPG digest for verification."""
|
||||
signature, = list(parse_packets((stream)))
|
||||
|
||||
@@ -2,8 +2,8 @@
|
||||
import io
|
||||
import logging
|
||||
|
||||
from . import decode, keyring, protocol
|
||||
from .. import util
|
||||
from . import decode, keyring, protocol
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
@@ -11,7 +11,7 @@ log = logging.getLogger(__name__)
|
||||
def create_primary(user_id, pubkey, signer_func, secret_bytes=b''):
|
||||
"""Export new primary GPG public key, ready for "gpg2 --import"."""
|
||||
pubkey_packet = protocol.packet(tag=(5 if secret_bytes else 6),
|
||||
blob=(pubkey.data() + secret_bytes))
|
||||
blob=pubkey.data() + secret_bytes)
|
||||
user_id_bytes = user_id.encode('utf-8')
|
||||
user_id_packet = protocol.packet(tag=13, blob=user_id_bytes)
|
||||
data_to_sign = (pubkey.data_to_hash() + user_id_packet[:1] +
|
||||
@@ -51,7 +51,7 @@ def create_primary(user_id, pubkey, signer_func, secret_bytes=b''):
|
||||
def create_subkey(primary_bytes, subkey, signer_func, secret_bytes=b''):
|
||||
"""Export new subkey to GPG primary key."""
|
||||
subkey_packet = protocol.packet(tag=(7 if secret_bytes else 14),
|
||||
blob=(subkey.data() + secret_bytes))
|
||||
blob=subkey.data() + secret_bytes)
|
||||
packets = list(decode.parse_packets(io.BytesIO(primary_bytes)))
|
||||
primary, user_id, signature = packets[:3]
|
||||
|
||||
|
||||
@@ -17,8 +17,11 @@ log = logging.getLogger(__name__)
|
||||
def check_output(args, env=None, sp=subprocess):
|
||||
"""Call an external binary and return its stdout."""
|
||||
log.debug('calling %s with env %s', args, env)
|
||||
output = sp.check_output(args=args, env=env)
|
||||
p = sp.Popen(args=args, env=env, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE)
|
||||
(output, error) = p.communicate()
|
||||
log.debug('output: %r', output)
|
||||
if error:
|
||||
log.debug('error: %r', error)
|
||||
return output
|
||||
|
||||
|
||||
@@ -222,7 +225,7 @@ def gpg_version(sp=subprocess):
|
||||
"""Get a keygrip of the primary GPG key of the specified user."""
|
||||
args = gpg_command(['--version'])
|
||||
output = check_output(args=args, sp=sp)
|
||||
line = output.split(b'\n')[0] # b'gpg (GnuPG) 2.1.11'
|
||||
line = output.split(b'\n', maxsplit=1)[0] # b'gpg (GnuPG) 2.1.11'
|
||||
line = line.split(b' ')[-1] # b'2.1.11'
|
||||
line = line.split(b'-')[0] # remove trailing version parts
|
||||
return line.split(b'v')[-1] # remove 'v' prefix
|
||||
|
||||
@@ -5,6 +5,8 @@ import hashlib
|
||||
import logging
|
||||
import struct
|
||||
|
||||
import nacl.signing
|
||||
|
||||
from .. import formats, util
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@@ -92,7 +94,7 @@ def _serialize_nist256(vk):
|
||||
|
||||
def _serialize_ed25519(vk):
|
||||
return mpi((0x40 << 256) |
|
||||
util.bytes2num(vk.to_bytes()))
|
||||
util.bytes2num(vk.encode(encoder=nacl.encoding.RawEncoder)))
|
||||
|
||||
|
||||
def _compute_keygrip(params):
|
||||
@@ -131,7 +133,7 @@ def keygrip_ed25519(vk):
|
||||
['b', util.num2bytes(0x2DFC9311D490018C7338BF8688861767FF8FF5B2BEBE27548A14B235ECA6874A, size=32)], # nopep8
|
||||
['g', util.num2bytes(0x04216936D3CD6E53FEC0A4E231FDD6DC5C692CC7609525A7B2C9562D608F25D51A6666666666666666666666666666666666666666666666666666666666666658, size=65)], # nopep8
|
||||
['n', util.num2bytes(0x1000000000000000000000000000000014DEF9DEA2F79CD65812631A5CF5D3ED, size=32)], # nopep8
|
||||
['q', vk.to_bytes()],
|
||||
['q', vk.encode(encoder=nacl.encoding.RawEncoder)],
|
||||
])
|
||||
|
||||
|
||||
@@ -144,7 +146,7 @@ def keygrip_curve25519(vk):
|
||||
['b', b'\x01'],
|
||||
['g', util.num2bytes(0x04000000000000000000000000000000000000000000000000000000000000000920ae19a1b8a086b4e01edd2c7748d14c923d4d7e6d7c61b229e9c5a27eced3d9, size=65)], # nopep8
|
||||
['n', util.num2bytes(0x1000000000000000000000000000000014DEF9DEA2F79CD65812631A5CF5D3ED, size=32)], # nopep8
|
||||
['q', vk.to_bytes()],
|
||||
['q', vk.encode(encoder=nacl.encoding.RawEncoder)],
|
||||
])
|
||||
|
||||
|
||||
|
||||
BIN
libagent/gpg/tests/romanz-pubkey.gpg
Normal file
BIN
libagent/gpg/tests/romanz-pubkey.gpg
Normal file
Binary file not shown.
@@ -1,11 +1,10 @@
|
||||
import glob
|
||||
import io
|
||||
import os
|
||||
import pathlib
|
||||
|
||||
import pytest
|
||||
|
||||
from .. import decode, protocol
|
||||
from ... import util
|
||||
from .. import decode, protocol
|
||||
|
||||
|
||||
def test_subpackets():
|
||||
@@ -30,8 +29,8 @@ def test_mpi():
|
||||
assert decode.parse_mpis(util.Reader(s), n=2) == [0x123, 5]
|
||||
|
||||
|
||||
cwd = os.path.join(os.path.dirname(__file__))
|
||||
input_files = glob.glob(os.path.join(cwd, '*.gpg'))
|
||||
cwd = pathlib.Path(__file__).parent
|
||||
input_files = cwd.glob('*.gpg')
|
||||
|
||||
|
||||
@pytest.fixture(params=input_files)
|
||||
@@ -60,3 +59,20 @@ def test_has_custom_subpacket():
|
||||
def test_load_by_keygrip_missing():
|
||||
with pytest.raises(KeyError):
|
||||
decode.load_by_keygrip(pubkey_bytes=b'', keygrip=b'')
|
||||
|
||||
|
||||
def test_keygrips():
|
||||
pubkey_bytes = (cwd / "romanz-pubkey.gpg").open("rb").read()
|
||||
keygrips = list(decode.iter_keygrips(pubkey_bytes))
|
||||
assert [k.hex() for k in keygrips] == [
|
||||
'7b2497258d76bc6539ed88d018cd1c739e2dbb6c',
|
||||
'30ae97f3d8e0e34c5ed80e1715fd442ca24c0a8e',
|
||||
]
|
||||
|
||||
for keygrip in keygrips:
|
||||
pubkey_dict, user_ids = decode.load_by_keygrip(pubkey_bytes, keygrip)
|
||||
assert pubkey_dict['keygrip'] == keygrip
|
||||
assert [u['value'] for u in user_ids] == [
|
||||
b'Roman Zeyde <roman.zeyde@gmail.com>',
|
||||
b'Roman Zeyde <me@romanzey.de>',
|
||||
]
|
||||
|
||||
@@ -53,6 +53,14 @@ class FakeSocket:
|
||||
self.tx.write(data)
|
||||
|
||||
|
||||
def mock_subprocess(output, error=b''):
|
||||
sp = mock.Mock(spec=['Popen', 'PIPE'])
|
||||
p = mock.Mock(spec=['communicate'])
|
||||
sp.Popen.return_value = p
|
||||
p.communicate.return_value = (output, error)
|
||||
return sp
|
||||
|
||||
|
||||
def test_sign_digest():
|
||||
sock = FakeSocket()
|
||||
sock.rx.write(b'OK Pleased to meet you, process XYZ\n')
|
||||
@@ -61,10 +69,8 @@ def test_sign_digest():
|
||||
sock.rx.seek(0)
|
||||
keygrip = '1234'
|
||||
digest = b'A' * 32
|
||||
sp = mock.Mock(spec=['check_output'])
|
||||
sp.check_output.return_value = '/dev/pts/0'
|
||||
sig = keyring.sign_digest(sock=sock, keygrip=keygrip,
|
||||
digest=digest, sp=sp,
|
||||
digest=digest, sp=mock_subprocess('/dev/pts/0'),
|
||||
environ={'DISPLAY': ':0'})
|
||||
assert sig == (0x30313233343536373839414243444546,)
|
||||
assert sock.tx.getvalue() == b'''RESET
|
||||
@@ -85,8 +91,7 @@ def test_iterlines():
|
||||
|
||||
|
||||
def test_get_agent_sock_path():
|
||||
sp = mock.Mock(spec=['check_output'])
|
||||
sp.check_output.return_value = b'''sysconfdir:/usr/local/etc/gnupg
|
||||
sp = mock_subprocess(b'''sysconfdir:/usr/local/etc/gnupg
|
||||
bindir:/usr/local/bin
|
||||
libexecdir:/usr/local/libexec
|
||||
libdir:/usr/local/lib/gnupg
|
||||
@@ -96,6 +101,6 @@ dirmngr-socket:/run/user/1000/gnupg/S.dirmngr
|
||||
agent-ssh-socket:/run/user/1000/gnupg/S.gpg-agent.ssh
|
||||
agent-socket:/run/user/1000/gnupg/S.gpg-agent
|
||||
homedir:/home/roman/.gnupg
|
||||
'''
|
||||
''')
|
||||
expected = b'/run/user/1000/gnupg/S.gpg-agent'
|
||||
assert keyring.get_agent_sock_path(sp=sp) == expected
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
import ecdsa
|
||||
import ed25519
|
||||
import nacl.signing
|
||||
import pytest
|
||||
|
||||
from .. import protocol
|
||||
from ... import formats
|
||||
from .. import protocol
|
||||
|
||||
|
||||
def test_packet():
|
||||
@@ -83,8 +83,8 @@ def test_nist256p1_ecdh():
|
||||
|
||||
|
||||
def test_ed25519():
|
||||
sk = ed25519.SigningKey(b'\x00' * 32)
|
||||
vk = sk.get_verifying_key()
|
||||
sk = nacl.signing.SigningKey(b'\x00'*32, encoder=nacl.encoding.RawEncoder)
|
||||
vk = sk.verify_key
|
||||
pk = protocol.PublicKey(curve_name=formats.CURVE_ED25519,
|
||||
created=42, verifying_key=vk)
|
||||
assert repr(pk) == 'GPG public key ed25519/36B40FE6'
|
||||
@@ -92,8 +92,8 @@ def test_ed25519():
|
||||
|
||||
|
||||
def test_curve25519():
|
||||
sk = ed25519.SigningKey(b'\x00' * 32)
|
||||
vk = sk.get_verifying_key()
|
||||
sk = nacl.signing.SigningKey(b'\x00'*32, encoder=nacl.encoding.RawEncoder)
|
||||
vk = sk.verify_key
|
||||
pk = protocol.PublicKey(curve_name=formats.ECDH_CURVE25519,
|
||||
created=42, verifying_key=vk)
|
||||
assert repr(pk) == 'GPG public key curve25519/69460384'
|
||||
|
||||
@@ -134,7 +134,7 @@ def server_thread(sock, handle_conn, quit_event):
|
||||
break
|
||||
# Handle connections from SSH concurrently.
|
||||
threading.Thread(target=handle_conn,
|
||||
kwargs=dict(conn=conn)).start()
|
||||
kwargs={'conn': conn}).start()
|
||||
log.debug('server thread stopped')
|
||||
|
||||
|
||||
@@ -159,7 +159,7 @@ def run_process(command, environ):
|
||||
try:
|
||||
p = subprocess.Popen(args=command, env=env)
|
||||
except OSError as e:
|
||||
raise OSError('cannot run %r: %s' % (command, e))
|
||||
raise OSError('cannot run %r: %s' % (command, e)) from e
|
||||
log.debug('subprocess %d is running', p.pid)
|
||||
ret = p.wait()
|
||||
log.debug('subprocess %d exited: %d', p.pid, ret)
|
||||
|
||||
128
libagent/signify/__init__.py
Normal file
128
libagent/signify/__init__.py
Normal file
@@ -0,0 +1,128 @@
|
||||
"""TREZOR support for Ed25519 signify/minisign signatures."""
|
||||
|
||||
import argparse
|
||||
import binascii
|
||||
import contextlib
|
||||
import functools
|
||||
import hashlib
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import struct
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
|
||||
import pkg_resources
|
||||
import semver
|
||||
|
||||
from .. import formats, server, util
|
||||
from ..device import interface, ui
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _create_identity(user_id):
|
||||
result = interface.Identity(identity_str='signify://', curve_name='ed25519')
|
||||
result.identity_dict['host'] = user_id
|
||||
return result
|
||||
|
||||
|
||||
class Client:
|
||||
"""Sign messages and get public keys from a hardware device."""
|
||||
|
||||
def __init__(self, device):
|
||||
"""C-tor."""
|
||||
self.device = device
|
||||
|
||||
def pubkey(self, identity):
|
||||
"""Return public key as VerifyingKey object."""
|
||||
with self.device:
|
||||
return bytes(self.device.pubkey(ecdh=False, identity=identity))
|
||||
|
||||
def sign_with_pubkey(self, identity, data):
|
||||
"""Sign the data and return a signature."""
|
||||
log.info('please confirm Signify signature on %s for "%s"...',
|
||||
self.device, identity.to_string())
|
||||
log.debug('signing data: %s', util.hexlify(data))
|
||||
with self.device:
|
||||
sig, pubkey = self.device.sign_with_pubkey(blob=data, identity=identity)
|
||||
assert len(sig) == 64
|
||||
assert len(pubkey) == 33
|
||||
assert pubkey[:1] == b"\x00"
|
||||
return sig, pubkey[1:]
|
||||
|
||||
|
||||
ALG_SIGNIFY = b'Ed'
|
||||
ALG_MINISIGN = b'ED' # prehashes the data before signing
|
||||
|
||||
|
||||
def format_payload(pubkey, data, sig_alg):
|
||||
"""See http://www.openbsd.org/papers/bsdcan-signify.html for details."""
|
||||
keynum = hashlib.sha256(pubkey).digest()[:8]
|
||||
return binascii.b2a_base64(sig_alg + keynum + data).decode("ascii")
|
||||
|
||||
|
||||
def run_pubkey(device_type, args):
|
||||
"""Export hardware-based Signify public key."""
|
||||
util.setup_logging(verbosity=args.verbose)
|
||||
log.warning('This Signify tool is still in EXPERIMENTAL mode, '
|
||||
'so please note that the key derivation, API, and features '
|
||||
'may change without backwards compatibility!')
|
||||
|
||||
identity = _create_identity(user_id=args.user_id)
|
||||
pubkey = Client(device=device_type()).pubkey(identity=identity)
|
||||
comment = f'untrusted comment: identity {identity.to_string()}\n'
|
||||
payload = format_payload(pubkey=pubkey, data=pubkey, sig_alg=ALG_SIGNIFY)
|
||||
print(comment + payload, end="")
|
||||
|
||||
|
||||
def run_sign(device_type, args):
|
||||
"""Prehash & sign an input blob using Ed25519."""
|
||||
util.setup_logging(verbosity=args.verbose)
|
||||
identity = _create_identity(user_id=args.user_id)
|
||||
|
||||
data_to_sign = sys.stdin.buffer.read()
|
||||
sig_alg = ALG_SIGNIFY
|
||||
if args.prehash:
|
||||
# See https://github.com/jedisct1/minisign/commit/6e1023d20758b6fdb2a4b697213b0bf608ba4020
|
||||
# Released in https://github.com/jedisct1/minisign/releases/tag/0.6
|
||||
sig_alg = ALG_MINISIGN
|
||||
data_to_sign = hashlib.blake2b(data_to_sign).digest()
|
||||
|
||||
sig, pubkey = Client(device=device_type()).sign_with_pubkey(identity, data_to_sign)
|
||||
pubkey_str = format_payload(pubkey=pubkey, data=pubkey, sig_alg=sig_alg)
|
||||
sig_str = format_payload(pubkey=pubkey, data=sig, sig_alg=sig_alg)
|
||||
untrusted_comment = f'untrusted comment: pubkey {pubkey_str}'
|
||||
print(untrusted_comment + sig_str, end="")
|
||||
|
||||
comment_to_sign = sig + args.comment.encode()
|
||||
sig, _ = Client(device=device_type()).sign_with_pubkey(identity, comment_to_sign)
|
||||
sig_str = binascii.b2a_base64(sig).decode("ascii")
|
||||
print(f'trusted comment: {args.comment}\n' + sig_str, end="")
|
||||
|
||||
|
||||
def main(device_type):
|
||||
"""Parse command-line arguments."""
|
||||
parser = argparse.ArgumentParser()
|
||||
|
||||
subparsers = parser.add_subparsers(title='Action', dest='action')
|
||||
subparsers.required = True
|
||||
|
||||
p = subparsers.add_parser('pubkey')
|
||||
p.add_argument('user_id')
|
||||
p.add_argument('-v', '--verbose', default=0, action='count')
|
||||
p.set_defaults(func=run_pubkey)
|
||||
|
||||
p = subparsers.add_parser('sign')
|
||||
p.add_argument('user_id')
|
||||
p.add_argument('-v', '--verbose', default=0, action='count')
|
||||
p.add_argument('-c', '--comment', default=time.asctime())
|
||||
p.add_argument('-H', '--prehash', default=False, action='store_true')
|
||||
p.set_defaults(func=run_sign)
|
||||
|
||||
args = parser.parse_args()
|
||||
device_type.ui = ui.UI(device_type=device_type, config=vars(args))
|
||||
device_type.ui.cached_passphrase_ack = util.ExpiringCache(seconds=float(60))
|
||||
|
||||
return args.func(device_type=device_type, args=args)
|
||||
@@ -11,9 +11,9 @@ import sys
|
||||
import tempfile
|
||||
import threading
|
||||
|
||||
import pkg_resources
|
||||
import configargparse
|
||||
import daemon
|
||||
import pkg_resources
|
||||
|
||||
from .. import device, formats, server, util
|
||||
from . import client, protocol
|
||||
@@ -138,9 +138,9 @@ def serve(handler, sock_path, timeout=UNIX_SOCKET_TIMEOUT):
|
||||
handle_conn = functools.partial(server.handle_connection,
|
||||
handler=handler,
|
||||
mutex=device_mutex)
|
||||
kwargs = dict(sock=sock,
|
||||
handle_conn=handle_conn,
|
||||
quit_event=quit_event)
|
||||
kwargs = {'sock': sock,
|
||||
'handle_conn': handle_conn,
|
||||
'quit_event': quit_event}
|
||||
with server.spawn(server.server_thread, kwargs):
|
||||
try:
|
||||
yield environ
|
||||
@@ -269,13 +269,11 @@ def main(device_type):
|
||||
identities = [device.interface.Identity(
|
||||
identity_str=args.identity, curve_name=args.ecdsa_curve_name)]
|
||||
for index, identity in enumerate(identities):
|
||||
identity.identity_dict['proto'] = u'ssh'
|
||||
identity.identity_dict['proto'] = 'ssh'
|
||||
log.info('identity #%d: %s', index, identity.to_string())
|
||||
|
||||
# override default PIN/passphrase entry tools (relevant for TREZOR/Keepkey):
|
||||
device_type.ui = device.ui.UI(device_type=device_type, config=vars(args))
|
||||
device_type.ui.cached_passphrase_ack = util.ExpiringCache(
|
||||
args.cache_expiry_seconds)
|
||||
|
||||
conn = JustInTimeConnection(
|
||||
conn_factory=lambda: client.Client(device_type()),
|
||||
|
||||
@@ -20,46 +20,63 @@ class Client:
|
||||
|
||||
def export_public_keys(self, identities):
|
||||
"""Export SSH public keys from the device."""
|
||||
public_keys = []
|
||||
pubkeys = []
|
||||
with self.device:
|
||||
for i in identities:
|
||||
pubkey = self.device.pubkey(identity=i)
|
||||
vk = formats.decompress_pubkey(pubkey=pubkey,
|
||||
curve_name=i.curve_name)
|
||||
public_key = formats.export_public_key(vk=vk,
|
||||
label=i.to_string())
|
||||
public_keys.append(public_key)
|
||||
return public_keys
|
||||
vk = self.device.pubkey(identity=i)
|
||||
label = i.to_string()
|
||||
pubkey = formats.export_public_key(vk=vk, label=label)
|
||||
pubkeys.append(pubkey)
|
||||
return pubkeys
|
||||
|
||||
def sign_ssh_challenge(self, blob, identity):
|
||||
"""Sign given blob using a private key on the device."""
|
||||
msg = _parse_ssh_blob(blob)
|
||||
log.debug('%s: user %r via %r (%r)',
|
||||
msg['conn'], msg['user'], msg['auth'], msg['key_type'])
|
||||
log.debug('nonce: %r', msg['nonce'])
|
||||
fp = msg['public_key']['fingerprint']
|
||||
log.debug('fingerprint: %s', fp)
|
||||
log.debug('hidden challenge size: %d bytes', len(blob))
|
||||
log.debug('blob: %r', blob)
|
||||
msg = parse_ssh_blob(blob)
|
||||
if msg['sshsig']:
|
||||
log.info('please confirm "%s" signature for "%s" using %s...',
|
||||
msg['namespace'], identity.to_string(), self.device)
|
||||
else:
|
||||
log.debug('%s: user %r via %r (%r)',
|
||||
msg['conn'], msg['user'], msg['auth'], msg['key_type'])
|
||||
log.debug('nonce: %r', msg['nonce'])
|
||||
fp = msg['public_key']['fingerprint']
|
||||
log.debug('fingerprint: %s', fp)
|
||||
log.debug('hidden challenge size: %d bytes', len(blob))
|
||||
|
||||
log.info('please confirm user "%s" login to "%s" using %s...',
|
||||
msg['user'].decode('ascii'), identity.to_string(),
|
||||
self.device)
|
||||
log.info('please confirm user "%s" login to "%s" using %s...',
|
||||
msg['user'].decode('ascii'), identity.to_string(),
|
||||
self.device)
|
||||
|
||||
with self.device:
|
||||
return self.device.sign(blob=blob, identity=identity)
|
||||
|
||||
|
||||
def _parse_ssh_blob(data):
|
||||
def parse_ssh_blob(data):
|
||||
"""Parse binary data into a dict."""
|
||||
res = {}
|
||||
i = io.BytesIO(data)
|
||||
res['nonce'] = util.read_frame(i)
|
||||
i.read(1) # SSH2_MSG_USERAUTH_REQUEST == 50 (from ssh2.h, line 108)
|
||||
res['user'] = util.read_frame(i)
|
||||
res['conn'] = util.read_frame(i)
|
||||
res['auth'] = util.read_frame(i)
|
||||
i.read(1) # have_sig == 1 (from sshconnect2.c, line 1056)
|
||||
res['key_type'] = util.read_frame(i)
|
||||
public_key = util.read_frame(i)
|
||||
res['public_key'] = formats.parse_pubkey(public_key)
|
||||
assert not i.read()
|
||||
if data.startswith(b'SSHSIG'):
|
||||
i = io.BytesIO(data[6:])
|
||||
# https://github.com/openssh/openssh-portable/blob/master/PROTOCOL.sshsig
|
||||
res['sshsig'] = True
|
||||
res['namespace'] = util.read_frame(i)
|
||||
res['reserved'] = util.read_frame(i)
|
||||
res['hashalg'] = util.read_frame(i)
|
||||
res['message'] = util.read_frame(i)
|
||||
else:
|
||||
i = io.BytesIO(data)
|
||||
res['sshsig'] = False
|
||||
res['nonce'] = util.read_frame(i)
|
||||
i.read(1) # SSH2_MSG_USERAUTH_REQUEST == 50 (from ssh2.h, line 108)
|
||||
res['user'] = util.read_frame(i)
|
||||
res['conn'] = util.read_frame(i)
|
||||
res['auth'] = util.read_frame(i)
|
||||
i.read(1) # have_sig == 1 (from sshconnect2.c, line 1056)
|
||||
res['key_type'] = util.read_frame(i)
|
||||
public_key = util.read_frame(i)
|
||||
res['public_key'] = formats.parse_pubkey(public_key)
|
||||
|
||||
unparsed = i.read()
|
||||
if unparsed:
|
||||
log.warning('unparsed blob: %r', unparsed)
|
||||
return res
|
||||
|
||||
@@ -16,31 +16,33 @@ log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Taken from https://github.com/openssh/openssh-portable/blob/master/authfd.h
|
||||
COMMANDS = dict(
|
||||
SSH_AGENTC_REQUEST_RSA_IDENTITIES=1,
|
||||
SSH_AGENT_RSA_IDENTITIES_ANSWER=2,
|
||||
SSH_AGENTC_RSA_CHALLENGE=3,
|
||||
SSH_AGENT_RSA_RESPONSE=4,
|
||||
SSH_AGENT_FAILURE=5,
|
||||
SSH_AGENT_SUCCESS=6,
|
||||
SSH_AGENTC_ADD_RSA_IDENTITY=7,
|
||||
SSH_AGENTC_REMOVE_RSA_IDENTITY=8,
|
||||
SSH_AGENTC_REMOVE_ALL_RSA_IDENTITIES=9,
|
||||
SSH2_AGENTC_REQUEST_IDENTITIES=11,
|
||||
SSH2_AGENT_IDENTITIES_ANSWER=12,
|
||||
SSH2_AGENTC_SIGN_REQUEST=13,
|
||||
SSH2_AGENT_SIGN_RESPONSE=14,
|
||||
SSH2_AGENTC_ADD_IDENTITY=17,
|
||||
SSH2_AGENTC_REMOVE_IDENTITY=18,
|
||||
SSH2_AGENTC_REMOVE_ALL_IDENTITIES=19,
|
||||
SSH_AGENTC_ADD_SMARTCARD_KEY=20,
|
||||
SSH_AGENTC_REMOVE_SMARTCARD_KEY=21,
|
||||
SSH_AGENTC_LOCK=22,
|
||||
SSH_AGENTC_UNLOCK=23,
|
||||
SSH_AGENTC_ADD_RSA_ID_CONSTRAINED=24,
|
||||
SSH2_AGENTC_ADD_ID_CONSTRAINED=25,
|
||||
SSH_AGENTC_ADD_SMARTCARD_KEY_CONSTRAINED=26,
|
||||
)
|
||||
COMMANDS = {
|
||||
"SSH_AGENTC_REQUEST_RSA_IDENTITIES": 1,
|
||||
"SSH_AGENT_RSA_IDENTITIES_ANSWER": 2,
|
||||
"SSH_AGENTC_RSA_CHALLENGE": 3,
|
||||
"SSH_AGENT_RSA_RESPONSE": 4,
|
||||
"SSH_AGENT_FAILURE": 5,
|
||||
"SSH_AGENT_SUCCESS": 6,
|
||||
"SSH_AGENTC_ADD_RSA_IDENTITY": 7,
|
||||
"SSH_AGENTC_REMOVE_RSA_IDENTITY": 8,
|
||||
"SSH_AGENTC_REMOVE_ALL_RSA_IDENTITIES": 9,
|
||||
"SSH2_AGENTC_REQUEST_IDENTITIES": 11,
|
||||
"SSH2_AGENT_IDENTITIES_ANSWER": 12,
|
||||
"SSH2_AGENTC_SIGN_REQUEST": 13,
|
||||
"SSH2_AGENT_SIGN_RESPONSE": 14,
|
||||
"SSH2_AGENTC_ADD_IDENTITY": 17,
|
||||
"SSH2_AGENTC_REMOVE_IDENTITY": 18,
|
||||
"SSH2_AGENTC_REMOVE_ALL_IDENTITIES": 19,
|
||||
"SSH_AGENTC_ADD_SMARTCARD_KEY": 20,
|
||||
"SSH_AGENTC_REMOVE_SMARTCARD_KEY": 21,
|
||||
"SSH_AGENTC_LOCK": 22,
|
||||
"SSH_AGENTC_UNLOCK": 23,
|
||||
"SSH_AGENTC_ADD_RSA_ID_CONSTRAINED": 24,
|
||||
"SSH2_AGENTC_ADD_ID_CONSTRAINED": 25,
|
||||
"SSH_AGENTC_ADD_SMARTCARD_KEY_CONSTRAINED": 26,
|
||||
"SSH_AGENTC_EXTENSION": 27,
|
||||
"SSH_AGENT_EXTENSION_FAILURE": 28,
|
||||
}
|
||||
|
||||
|
||||
def msg_code(name):
|
||||
@@ -86,6 +88,7 @@ class Handler:
|
||||
msg_code('SSH_AGENTC_REQUEST_RSA_IDENTITIES'): _legacy_pubs,
|
||||
msg_code('SSH2_AGENTC_REQUEST_IDENTITIES'): self.list_pubs,
|
||||
msg_code('SSH2_AGENTC_SIGN_REQUEST'): self.sign_message,
|
||||
msg_code('SSH_AGENTC_EXTENSION'): _unsupported_extension,
|
||||
}
|
||||
|
||||
def handle(self, msg):
|
||||
@@ -144,17 +147,26 @@ class Handler:
|
||||
signature = self.conn.sign(blob=blob, identity=key['identity'])
|
||||
except IOError:
|
||||
return failure()
|
||||
except Exception:
|
||||
log.exception('signature with "%s" key failed', label)
|
||||
raise
|
||||
|
||||
log.debug('signature: %r', signature)
|
||||
|
||||
try:
|
||||
sig_bytes = key['verifier'](sig=signature, msg=blob)
|
||||
log.info('signature status: OK')
|
||||
except formats.ecdsa.BadSignatureError:
|
||||
except formats.ecdsa.BadSignatureError as e:
|
||||
log.exception('signature status: ERROR')
|
||||
raise ValueError('invalid ECDSA signature')
|
||||
raise ValueError('invalid ECDSA signature') from e
|
||||
|
||||
log.debug('signature size: %d bytes', len(sig_bytes))
|
||||
|
||||
data = util.frame(util.frame(key['type']), util.frame(sig_bytes))
|
||||
code = util.pack('B', msg_code('SSH2_AGENT_SIGN_RESPONSE'))
|
||||
return util.frame(code, data)
|
||||
|
||||
|
||||
def _unsupported_extension(buf): # pylint: disable=unused-argument
|
||||
code = util.pack('B', msg_code('SSH_AGENT_EXTENSION_FAILURE'))
|
||||
return util.frame(code)
|
||||
|
||||
@@ -17,12 +17,16 @@ PUBKEY_TEXT = ('ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzd'
|
||||
|
||||
class MockDevice(device.interface.Device): # pylint: disable=abstract-method
|
||||
|
||||
def connect(self): # pylint: disable=no-self-use
|
||||
@classmethod
|
||||
def package_name(cls):
|
||||
return 'fake-device-agent'
|
||||
|
||||
def connect(self):
|
||||
return mock.Mock()
|
||||
|
||||
def pubkey(self, identity, ecdh=False): # pylint: disable=unused-argument
|
||||
assert self.conn
|
||||
return PUBKEY
|
||||
return formats.decompress_pubkey(pubkey=PUBKEY, curve_name=identity.curve_name)
|
||||
|
||||
def sign(self, identity, blob):
|
||||
"""Sign given blob and return the signature (as bytes)."""
|
||||
@@ -70,3 +74,52 @@ def test_ssh_agent():
|
||||
c.device.sign = cancel_sign
|
||||
with pytest.raises(IOError):
|
||||
c.sign_ssh_challenge(blob=BLOB, identity=identity)
|
||||
|
||||
|
||||
CHALLENGE_BLOB = (
|
||||
b'\x00\x00\x00 \xe4\x08\x8e"J#\x83 \x05\x90\x1e\xa9\xf9C\xb1\xd2\x8f\xc3\x8c\xea\xd8\xf6E'
|
||||
b'%q\xff\x07\xfa\xd8\x8b\xdf\xbd2\x00\x00\x00\x03git\x00\x00\x00\x0essh-connection\x00\x00'
|
||||
b'\x00\tpublickey\x01\x00\x00\x00\x0bssh-ed25519\x00\x00\x003\x00\x00\x00\x0bssh-ed25519'
|
||||
b'\x00\x00\x00 \xd1q\x1ab\xc6\xf0d\x19\xe2q<\x05\x0b\xdao\xa1\xcb\xae\xad\xc9\x0b\x16\xf3'
|
||||
b'\xc2m\x84q8qU\xda\xb0'
|
||||
)
|
||||
|
||||
|
||||
def test_parse_ssh_challenge():
|
||||
result = client.parse_ssh_blob(CHALLENGE_BLOB)
|
||||
result['public_key'].pop('verifier')
|
||||
assert result == {
|
||||
'auth': b'publickey',
|
||||
'conn': b'ssh-connection',
|
||||
'key_type': b'ssh-ed25519',
|
||||
'nonce': b'\xe4\x08\x8e"J#\x83 \x05\x90\x1e\xa9\xf9C\xb1\xd2\x8f\xc3\x8c\xea'
|
||||
b'\xd8\xf6E%q\xff\x07\xfa\xd8\x8b\xdf\xbd',
|
||||
'public_key': {'blob': b'\x00\x00\x00\x0bssh-ed25519\x00\x00\x00 \xd1'
|
||||
b'q\x1ab\xc6\xf0d\x19\xe2q<\x05\x0b\xdao\xa1\xcb'
|
||||
b'\xae\xad\xc9\x0b\x16\xf3\xc2m\x84q8qU\xda\xb0',
|
||||
'curve': 'ed25519',
|
||||
'fingerprint': '47:a3:26:af:0b:5d:a2:c3:91:ed:26:36:94:be:3a:d5',
|
||||
'type': b'ssh-ed25519'},
|
||||
'sshsig': False,
|
||||
'user': b'git',
|
||||
}
|
||||
|
||||
|
||||
FILE_SIG_BLOB = (
|
||||
b"SSHSIG\x00\x00\x00\x04file\x00\x00\x00\x00\x00\x00\x00\x06sha512\x00\x00\x00@r\xb7r\xfeM"
|
||||
b"\xe5w\xf0#w\x1dbl\xca\to=\x90\xb69\xd1:u{\xe5\xe4\xf1\xb1\xa8C\xb8\xfcM\x91\x9f\x12\xa8"
|
||||
b"\x1d`\x00\x848C<\x85\x8e\xf0o\xdab\xdcQ\xce\xf2\xda\xc3\xae\xa9\x1e%\x85\xcd\xe3'"
|
||||
)
|
||||
|
||||
|
||||
def test_parse_ssh_signature():
|
||||
result = client.parse_ssh_blob(FILE_SIG_BLOB)
|
||||
assert result == {
|
||||
'hashalg': b'sha512',
|
||||
'message': b'r\xb7r\xfeM\xe5w\xf0#w\x1dbl\xca\to=\x90\xb69\xd1:u{'
|
||||
b'\xe5\xe4\xf1\xb1\xa8C\xb8\xfcM\x91\x9f\x12\xa8\x1d`\x00\x848C<'
|
||||
b"\x85\x8e\xf0o\xdab\xdcQ\xce\xf2\xda\xc3\xae\xa9\x1e%\x85\xcd\xe3'",
|
||||
'namespace': b'file',
|
||||
'reserved': b'',
|
||||
'sshsig': True,
|
||||
}
|
||||
|
||||
@@ -23,6 +23,26 @@ _public_key = (
|
||||
'home\n'
|
||||
)
|
||||
|
||||
_public_key_cert = (
|
||||
'ecdsa-sha2-nistp256-cert-v01@openssh.com '
|
||||
'AAAAKGVjZHNhLXNoYTItbmlzdHAyNTYtY2VydC12MDFAb3B'
|
||||
'lbnNzaC5jb20AAAAgohlAP8H3LPYWz3+w/E+RGDxG6tNAEE'
|
||||
'3Ao9Z6Pc66khEAAAAIbmlzdHAyNTYAAABBBGI2zqveJSB+g'
|
||||
'eQEWG46OvGs2h3+0qu7tIdsH8WylrV19vttd7GR5rKvTWJt'
|
||||
'8b9ErthmnFALelAFKOB/u50jsukAAAAAAAAAFQAAAAEAAAA'
|
||||
'IdW5pdFRlc3QAAAAIAAAABHVzZXIAAAAAAAAAAP////////'
|
||||
'//AAAAAAAAAIIAAAAVcGVybWl0LVgxMS1mb3J3YXJkaW5nA'
|
||||
'AAAAAAAABdwZXJtaXQtYWdlbnQtZm9yd2FyZGluZwAAAAAA'
|
||||
'AAAWcGVybWl0LXBvcnQtZm9yd2FyZGluZwAAAAAAAAAKcGV'
|
||||
'ybWl0LXB0eQAAAAAAAAAOcGVybWl0LXVzZXItcmMAAAAAAA'
|
||||
'AAAAAAADMAAAALc3NoLWVkMjU1MTkAAAAgf9gyPrF24CLZc'
|
||||
'0rHoZuI1+yjBFWt66G8oUmm20yRO8IAAABTAAAAC3NzaC1l'
|
||||
'ZDI1NTE5AAAAQCEgVgsR7fSgcTxuAWqMW4h42y7pt1BAKR4'
|
||||
'HTRg178tl7Vx8WoRtQcNirX9eggBcTA+5ILWmeY3uDN+soW'
|
||||
't7fwk= '
|
||||
'home\n'
|
||||
)
|
||||
|
||||
|
||||
def test_parse_public_key():
|
||||
key = formats.import_public_key(_public_key)
|
||||
@@ -34,6 +54,16 @@ def test_parse_public_key():
|
||||
assert key['type'] == b'ecdsa-sha2-nistp256'
|
||||
|
||||
|
||||
def test_parse_public_key_cert():
|
||||
key = formats.import_public_key(_public_key_cert)
|
||||
assert key['name'] == b'home'
|
||||
assert key['point'] == _point
|
||||
|
||||
assert key['curve'] == 'nist256p1'
|
||||
assert key['fingerprint'] == 'ab:ab:5d:9f:f4:33:f4:d0:c3:68:65:3b:94:86:de:22' # nopep8
|
||||
assert key['type'] == b'ecdsa-sha2-nistp256-cert-v01@openssh.com'
|
||||
|
||||
|
||||
def test_decompress():
|
||||
blob = '036236ceabde25207e81e404586e3a3af1acda1dfed2abbbb4876c1fc5b296b575'
|
||||
vk = formats.decompress_pubkey(binascii.unhexlify(blob),
|
||||
|
||||
@@ -2,6 +2,6 @@ from ..device import interface
|
||||
|
||||
|
||||
def test_unicode():
|
||||
i = interface.Identity(u'ko\u017eu\u0161\u010dek@host', 'ed25519')
|
||||
i = interface.Identity('ko\u017eu\u0161\u010dek@host', 'ed25519')
|
||||
assert i.to_bytes() == b'kozuscek@host'
|
||||
assert sorted(i.items()) == [('host', 'host'), ('user', 'kozuscek')]
|
||||
|
||||
@@ -78,12 +78,12 @@ def test_server_thread():
|
||||
quit_event = threading.Event()
|
||||
|
||||
class FakeServer:
|
||||
def accept(self): # pylint: disable=no-self-use
|
||||
def accept(self):
|
||||
if not connections:
|
||||
raise socket.timeout()
|
||||
return connections.pop(), 'address'
|
||||
|
||||
def getsockname(self): # pylint: disable=no-self-use
|
||||
def getsockname(self):
|
||||
return 'fake_server'
|
||||
|
||||
def handle_conn(conn):
|
||||
@@ -102,7 +102,7 @@ def test_spawn():
|
||||
def thread(x):
|
||||
obj.append(x)
|
||||
|
||||
with server.spawn(thread, dict(x=1)):
|
||||
with server.spawn(thread, {'x': 1}):
|
||||
pass
|
||||
|
||||
assert obj == [1]
|
||||
|
||||
11
setup.py
11
setup.py
@@ -3,25 +3,30 @@ from setuptools import setup
|
||||
|
||||
setup(
|
||||
name='libagent',
|
||||
version='0.14.0',
|
||||
version='0.14.7',
|
||||
description='Using hardware wallets as SSH/GPG agent',
|
||||
author='Roman Zeyde',
|
||||
author_email='roman.zeyde@gmail.com',
|
||||
url='http://github.com/romanz/trezor-agent',
|
||||
packages=[
|
||||
'libagent',
|
||||
'libagent.age',
|
||||
'libagent.device',
|
||||
'libagent.gpg',
|
||||
'libagent.ssh'
|
||||
'libagent.signify',
|
||||
'libagent.ssh',
|
||||
],
|
||||
install_requires=[
|
||||
'bech32>=1.2.0',
|
||||
'cryptography>=3.4.6',
|
||||
'docutils>=0.14',
|
||||
'python-daemon>=2.3.0',
|
||||
'wheel>=0.32.3',
|
||||
'backports.shutil_which>=3.5.1',
|
||||
'ConfigArgParse>=0.12.1',
|
||||
'python-daemon>=2.1.2',
|
||||
'ecdsa>=0.13',
|
||||
'ed25519>=1.4',
|
||||
'pynacl>=1.4.0',
|
||||
'mnemonic>=0.18',
|
||||
'pymsgbox>=1.0.6',
|
||||
'semver>=2.2',
|
||||
|
||||
4
tox.ini
4
tox.ini
@@ -16,9 +16,9 @@ deps=
|
||||
isort
|
||||
commands=
|
||||
pycodestyle libagent
|
||||
# isort --skip-glob .tox -c -r libagent
|
||||
isort --skip-glob .tox -c libagent
|
||||
pylint --reports=no --rcfile .pylintrc libagent
|
||||
pydocstyle libagent
|
||||
coverage run --source libagent -m py.test -v libagent
|
||||
coverage run --source libagent -m pytest -v libagent
|
||||
coverage report
|
||||
coverage html
|
||||
|
||||
Reference in New Issue
Block a user