mirror of
https://github.com/romanz/amodem.git
synced 2026-05-03 08:27:26 +08:00
Compare commits
231 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
aeda85275d | ||
|
|
e41206b350 | ||
|
|
03650550dd | ||
|
|
f7b07070da | ||
|
|
96eede9c83 | ||
|
|
91146303a3 | ||
|
|
bf598435fb | ||
|
|
998c9ee958 | ||
|
|
d408a592aa | ||
|
|
282e91ace3 | ||
|
|
23c37cf1e3 | ||
|
|
5c5c6f9cbb | ||
|
|
17c8bd0e92 | ||
|
|
016e864503 | ||
|
|
0c4e67c837 | ||
|
|
adcbe6e7b2 | ||
|
|
73bdf417e4 | ||
|
|
ee347252b4 | ||
|
|
d63f048b78 | ||
|
|
05fada91d2 | ||
|
|
27a3fddfa2 | ||
|
|
45f6f1a3d8 | ||
|
|
c4c56b9faf | ||
|
|
1704ae7683 | ||
|
|
a7190223fd | ||
|
|
220735c6ad | ||
|
|
82e08d073b | ||
|
|
8ab0908388 | ||
|
|
fd3183d71c | ||
|
|
295d52ef10 | ||
|
|
8a51099488 | ||
|
|
f4dd1eacdd | ||
|
|
024b5f131f | ||
|
|
b9b7b8dafd | ||
|
|
744696fdee | ||
|
|
ccdbc7abfc | ||
|
|
e70f0ec681 | ||
|
|
aeaf978d8e | ||
|
|
d60fff202a | ||
|
|
9171dd08c8 | ||
|
|
4c5004d838 | ||
|
|
a2e46048a1 | ||
|
|
e66b0f47ed | ||
|
|
db874ad98f | ||
|
|
ed2d71cc08 | ||
|
|
59b39ce81f | ||
|
|
75f879edbb | ||
|
|
45a85a317b | ||
|
|
7b3874e6f7 | ||
|
|
6c96cc37b9 | ||
|
|
c98cb22ba4 | ||
|
|
d9fbfccd35 | ||
|
|
fe4d9ed3c8 | ||
|
|
092445af71 | ||
|
|
602e867c7d | ||
|
|
16de8cdabc | ||
|
|
7bbf11b631 | ||
|
|
3e41fddcef | ||
|
|
8108e5400d | ||
|
|
a1659e0f0d | ||
|
|
3b139314b6 | ||
|
|
a05cff5079 | ||
|
|
694cee17ac | ||
|
|
bc281d4411 | ||
|
|
04af6b737b | ||
|
|
171c746c7e | ||
|
|
8b5ac14150 | ||
|
|
16090cebed | ||
|
|
d2167cd4ff | ||
|
|
10cbe67c9a | ||
|
|
29a984eebb | ||
|
|
a6660fd5c5 | ||
|
|
2acd0bf3b7 | ||
|
|
e9f7894d62 | ||
|
|
56e9d7c776 | ||
|
|
e7bacf829c | ||
|
|
c1c679b541 | ||
|
|
49c343df94 | ||
|
|
7da7f5c256 | ||
|
|
39cb5565bf | ||
|
|
f89c5bb125 | ||
|
|
92649b290f | ||
|
|
d9b07e2ac6 | ||
|
|
6975671cc1 | ||
|
|
f0ea568bb8 | ||
|
|
34c614db6e | ||
|
|
2bbd335f7e | ||
|
|
af8ad99c7a | ||
|
|
313271ac06 | ||
|
|
969e08140b | ||
|
|
39f00af65d | ||
|
|
272759e907 | ||
|
|
4be55156ed | ||
|
|
80a5ea0f2a | ||
|
|
87e50449e5 | ||
|
|
dcf35c4267 | ||
|
|
7570861765 | ||
|
|
339f61c071 | ||
|
|
3c4fb7a17b | ||
|
|
a6a0c05f57 | ||
|
|
4c036d2ce7 | ||
|
|
eaa91cfdbd | ||
|
|
fd61941d0f | ||
|
|
decd3ddf75 | ||
|
|
4c07b360cd | ||
|
|
0b0f60dd89 | ||
|
|
db6903eab7 | ||
|
|
171a0c2f6a | ||
|
|
a535b31a1b | ||
|
|
ee4bcddd22 | ||
|
|
f626d34e21 | ||
|
|
2cf081420f | ||
|
|
0e72e3b7ff | ||
|
|
ce61c8b2ae | ||
|
|
3192e570ed | ||
|
|
bf8f516ef4 | ||
|
|
51f7d6120b | ||
|
|
0cb7cf0746 | ||
|
|
b4ff31f816 | ||
|
|
6e9d6d6430 | ||
|
|
fa9391ede6 | ||
|
|
ad8eafe6f8 | ||
|
|
695079e4b9 | ||
|
|
9888ef971a | ||
|
|
04a878374f | ||
|
|
4270d8464f | ||
|
|
25a427081c | ||
|
|
939fdbe829 | ||
|
|
1f126f3002 | ||
|
|
78526d1379 | ||
|
|
7e3c3b4f77 | ||
|
|
513c19bf1f | ||
|
|
f1e75783c4 | ||
|
|
68637525ea | ||
|
|
fce45832c2 | ||
|
|
df001c4100 | ||
|
|
1a228a1af6 | ||
|
|
7a7c9efc47 | ||
|
|
859cee9757 | ||
|
|
2846c0bf1a | ||
|
|
b2147a8418 | ||
|
|
4cbf8a9f0a | ||
|
|
d9c4e930f3 | ||
|
|
6fd6fe6520 | ||
|
|
4a7fef3011 | ||
|
|
a0e476ea19 | ||
|
|
683aae7aa4 | ||
|
|
d369638c7b | ||
|
|
07c4100618 | ||
|
|
b9f139b74a | ||
|
|
3bf926620b | ||
|
|
ab192619f4 | ||
|
|
f982d785bd | ||
|
|
38c1acf4db | ||
|
|
31c3686fa4 | ||
|
|
87ca33c104 | ||
|
|
c3d23ea7f5 | ||
|
|
5c04d17c43 | ||
|
|
2d2d6efa93 | ||
|
|
131c30acca | ||
|
|
a7ef263954 | ||
|
|
d486c1ee7b | ||
|
|
f35b5be3ac | ||
|
|
9ed9781496 | ||
|
|
5d007260e1 | ||
|
|
7dfa3ab255 | ||
|
|
b8eba72d0b | ||
|
|
492285de1b | ||
|
|
cc326b1f7d | ||
|
|
169ff39b1a | ||
|
|
dcc7ef2600 | ||
|
|
ac2d12b354 | ||
|
|
f3b49ff553 | ||
|
|
12d640c66b | ||
|
|
32984d2d3f | ||
|
|
a45c6c1300 | ||
|
|
1d3ba7e9b7 | ||
|
|
673b1df648 | ||
|
|
e63f03354e | ||
|
|
3c9c1b4e14 | ||
|
|
5caf4728ee | ||
|
|
dde6dcdaeb | ||
|
|
1f3c989884 | ||
|
|
55dea41959 | ||
|
|
ed01c00d0c | ||
|
|
e09571151c | ||
|
|
340aae4fb8 | ||
|
|
9875c9927e | ||
|
|
d9862ae0e1 | ||
|
|
5fb8b0e047 | ||
|
|
324fc21a5c | ||
|
|
e2f5ccafdf | ||
|
|
a0b4776374 | ||
|
|
5abc3dc41b | ||
|
|
3c2eb64e0d | ||
|
|
67d58a5ae0 | ||
|
|
9a435ae23e | ||
|
|
d7913a84d5 | ||
|
|
a114242243 | ||
|
|
b6dbc4aa81 | ||
|
|
6cc3a629a8 | ||
|
|
0c94363595 | ||
|
|
40377fc66b | ||
|
|
489c8fe357 | ||
|
|
6f4f33bfa5 | ||
|
|
76ce25fab1 | ||
|
|
5506310239 | ||
|
|
9dc955aae8 | ||
|
|
80f29469d0 | ||
|
|
fb368d24eb | ||
|
|
8c0848b459 | ||
|
|
276dec5728 | ||
|
|
74f7ebf228 | ||
|
|
7ef0958c33 | ||
|
|
1402918bb3 | ||
|
|
b6cfa0c03f | ||
|
|
33ff9ba667 | ||
|
|
ab64505cdb | ||
|
|
5651452c0d | ||
|
|
af6d0caf33 | ||
|
|
96592269b6 | ||
|
|
b2d078eec6 | ||
|
|
01dafb0ebd | ||
|
|
447faf973c | ||
|
|
add90e3c51 | ||
|
|
34670c601d | ||
|
|
b9ba4a3082 | ||
|
|
4335740abe | ||
|
|
861401e89a | ||
|
|
335d050212 | ||
|
|
6e1b08c27a |
3
.gitignore
vendored
3
.gitignore
vendored
@@ -55,3 +55,6 @@ docs/_build/
|
||||
|
||||
# PyBuilder
|
||||
target/
|
||||
|
||||
# Sublime Text
|
||||
*.sublime-*
|
||||
|
||||
@@ -1,2 +1,2 @@
|
||||
[MESSAGES CONTROL]
|
||||
disable=invalid-name, missing-docstring, locally-disabled
|
||||
disable=fixme, invalid-name, missing-docstring, locally-disabled, unbalanced-tuple-unpacking
|
||||
|
||||
@@ -3,10 +3,11 @@ language: python
|
||||
python:
|
||||
- "2.7"
|
||||
- "3.4"
|
||||
- "3.5"
|
||||
|
||||
install:
|
||||
- pip install ecdsa ed25519 semver # test without trezorlib for now
|
||||
- pip install pylint coverage pep8 pydocstyle
|
||||
- pip install -U pylint coverage pep8 pydocstyle # use latest tools
|
||||
|
||||
script:
|
||||
- pep8 trezor_agent
|
||||
|
||||
104
README-GPG.md
Normal file
104
README-GPG.md
Normal file
@@ -0,0 +1,104 @@
|
||||
Note: the GPG-related code is still under development, so please try the current implementation
|
||||
and feel free to [report any issue](https://github.com/romanz/trezor-agent/issues) you have encountered.
|
||||
Thanks!
|
||||
|
||||
# Installation
|
||||
|
||||
First, verify that you have GPG 2.1+ [installed](https://gist.github.com/vt0r/a2f8c0bcb1400131ff51):
|
||||
|
||||
```
|
||||
$ gpg2 --version | head -n1
|
||||
gpg (GnuPG) 2.1.11
|
||||
```
|
||||
|
||||
Update you TREZOR firmware to the latest version (at least [c720614](https://github.com/trezor/trezor-mcu/commit/c720614f6e9b9c07f446c95bda0257980d942871)).
|
||||
|
||||
Install latest `trezor-agent` package from [gpg-agent](https://github.com/romanz/trezor-agent/commits/gpg-agent) branch:
|
||||
```
|
||||
$ pip install --user git+https://github.com/romanz/trezor-agent.git
|
||||
```
|
||||
|
||||
Define your GPG user ID as an environment variable:
|
||||
```
|
||||
$ export TREZOR_GPG_USER_ID="John Doe <john@doe.bit>"
|
||||
```
|
||||
|
||||
There are two ways to generate TREZOR-based GPG public keys, as described below.
|
||||
|
||||
## 1. generate a new GPG identity:
|
||||
|
||||
```
|
||||
$ trezor-gpg create | gpg2 --import # use the TREZOR to confirm signing the primary key
|
||||
gpg: key 5E4D684D: public key "John Doe <john@doe.bit>" imported
|
||||
gpg: Total number processed: 1
|
||||
gpg: imported: 1
|
||||
|
||||
$ gpg2 --edit "${TREZOR_GPG_USER_ID}" trust # set this key to ultimate trust (option #5)
|
||||
|
||||
$ gpg2 -k
|
||||
/home/roman/.gnupg/pubring.kbx
|
||||
------------------------------
|
||||
pub nistp256/5E4D684D 2016-06-17 [SC]
|
||||
uid [ultimate] John Doe <john@doe.bit>
|
||||
sub nistp256/A31D9E25 2016-06-17 [E]
|
||||
```
|
||||
|
||||
## 2. generate a new subkey for an existing GPG identity:
|
||||
|
||||
```
|
||||
$ gpg2 -k # suppose there is already a GPG primary key
|
||||
/home/roman/.gnupg/pubring.kbx
|
||||
------------------------------
|
||||
pub rsa2048/87BB07B4 2016-06-17 [SC]
|
||||
uid [ultimate] John Doe <john@doe.bit>
|
||||
sub rsa2048/7176D31F 2016-06-17 [E]
|
||||
|
||||
$ trezor-gpg create --subkey | gpg2 --import # use the TREZOR to confirm signing the subkey
|
||||
gpg: key 87BB07B4: "John Doe <john@doe.bit>" 2 new signatures
|
||||
gpg: key 87BB07B4: "John Doe <john@doe.bit>" 2 new subkeys
|
||||
gpg: Total number processed: 1
|
||||
gpg: new subkeys: 2
|
||||
gpg: new signatures: 2
|
||||
|
||||
$ gpg2 -k
|
||||
/home/roman/.gnupg/pubring.kbx
|
||||
------------------------------
|
||||
pub rsa2048/87BB07B4 2016-06-17 [SC]
|
||||
uid [ultimate] John Doe <john@doe.bit>
|
||||
sub rsa2048/7176D31F 2016-06-17 [E]
|
||||
sub nistp256/DDE80B36 2016-06-17 [S]
|
||||
sub nistp256/E3D0BA19 2016-06-17 [E]
|
||||
```
|
||||
|
||||
# Usage examples:
|
||||
|
||||
## Start the TREZOR-based gpg-agent:
|
||||
```
|
||||
$ trezor-gpg agent &
|
||||
```
|
||||
Note: this agent intercepts all GPG requests, so make sure to close it (e.g. by using `killall trezor-gpg`),
|
||||
when you are done with the TREZOR-based GPG operations.
|
||||
|
||||
## Sign and verify GPG messages:
|
||||
```
|
||||
$ echo "Hello World!" | gpg2 --sign | gpg2 --verify
|
||||
gpg: Signature made Fri 17 Jun 2016 08:55:13 PM IDT using ECDSA key ID 5E4D684D
|
||||
gpg: Good signature from "Roman Zeyde <roman.zeyde@gmail.com>" [ultimate]
|
||||
```
|
||||
## Encrypt and decrypt GPG messages:
|
||||
```
|
||||
$ date | gpg2 --encrypt -r "${TREZOR_GPG_USER_ID}" | gpg2 --decrypt
|
||||
gpg: encrypted with 256-bit ECDH key, ID A31D9E25, created 2016-06-17
|
||||
"Roman Zeyde <roman.zeyde@gmail.com>"
|
||||
Fri Jun 17 20:55:31 IDT 2016
|
||||
```
|
||||
|
||||
## Git commit & tag signatures:
|
||||
Git can use GPG to sign and verify commits and tags (see [here](https://git-scm.com/book/en/v2/Git-Tools-Signing-Your-Work)):
|
||||
```
|
||||
$ git config --local gpg.program gpg2
|
||||
$ git commit --gpg-sign # create GPG-signed commit
|
||||
$ git log --show-signature -1 # verify commit signature
|
||||
$ git tag --sign "TAG" # create GPG-signed tag
|
||||
$ git verify-tag "TAG" # verify tag signature
|
||||
```
|
||||
45
README-SSH.md
Normal file
45
README-SSH.md
Normal file
@@ -0,0 +1,45 @@
|
||||
# Screencast demo usage
|
||||
|
||||
## Simple usage (single SSH session)
|
||||
[](https://asciinema.org/a/22959)
|
||||
|
||||
## Advanced usage (multiple SSH sessions from a sub-shell)
|
||||
[](https://asciinema.org/a/33240)
|
||||
|
||||
## Using for GitHub SSH authentication (via `trezor-git` utility)
|
||||
[](https://asciinema.org/a/38337)
|
||||
|
||||
# Public key generation
|
||||
|
||||
Run:
|
||||
|
||||
/tmp $ trezor-agent ssh.hostname.com -v > hostname.pub
|
||||
2015-09-02 15:03:18,929 INFO getting "ssh://ssh.hostname.com" public key from Trezor...
|
||||
2015-09-02 15:03:23,342 INFO disconnected from Trezor
|
||||
/tmp $ cat hostname.pub
|
||||
ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBGSevcDwmT+QaZPUEWUUjTeZRBICChxMKuJ7dRpBSF8+qt+8S1GBK5Zj8Xicc8SHG/SE/EXKUL2UU3kcUzE7ADQ= ssh://ssh.hostname.com
|
||||
|
||||
Append `hostname.pub` contents to `~/.ssh/authorized_keys`
|
||||
configuration file at `ssh.hostname.com`, so the remote server
|
||||
would allow you to login using the corresponding private key signature.
|
||||
|
||||
# Usage
|
||||
|
||||
Run:
|
||||
|
||||
/tmp $ trezor-agent ssh.hostname.com -v -c
|
||||
2015-09-02 15:09:39,782 INFO getting "ssh://ssh.hostname.com" public key from Trezor...
|
||||
2015-09-02 15:09:44,430 INFO please confirm user "roman" login to "ssh://ssh.hostname.com" using Trezor...
|
||||
2015-09-02 15:09:46,152 INFO signature status: OK
|
||||
Linux lmde 3.16.0-4-amd64 #1 SMP Debian 3.16.7-ckt11-1+deb8u3 (2015-08-04) x86_64
|
||||
|
||||
The programs included with the Debian GNU/Linux system are free software;
|
||||
the exact distribution terms for each program are described in the
|
||||
individual files in /usr/share/doc/*/copyright.
|
||||
|
||||
Debian GNU/Linux comes with ABSOLUTELY NO WARRANTY, to the extent
|
||||
permitted by applicable law.
|
||||
Last login: Tue Sep 1 15:57:05 2015 from localhost
|
||||
~ $
|
||||
|
||||
Make sure to confirm SSH signature on the Trezor device when requested.
|
||||
66
README.md
66
README.md
@@ -1,79 +1,47 @@
|
||||
# Using TREZOR as a hardware SSH agent
|
||||
# Using TREZOR as a hardware SSH/GPG agent
|
||||
|
||||
[](https://travis-ci.org/romanz/trezor-agent)
|
||||
[](https://pypi.python.org/pypi/trezor_agent/)
|
||||
[](https://pypi.python.org/pypi/trezor_agent/)
|
||||
[](https://pypi.python.org/pypi/trezor_agent/)
|
||||
[](https://pypi.python.org/pypi/trezor_agent/)
|
||||
[](https://gitter.im/romanz/trezor-agent)
|
||||
|
||||
See SatoshiLabs' blog post about this feature:
|
||||
See SatoshiLabs' blog posts about this feature:
|
||||
|
||||
- https://medium.com/@satoshilabs/trezor-firmware-1-3-4-enables-ssh-login-86a622d7e609
|
||||
|
||||
## Screencast demo usage
|
||||
|
||||
### Simple usage (single SSH session)
|
||||
[](https://asciinema.org/a/22959)
|
||||
|
||||
### Advanced usage (multiple SSH sessions from a sub-shell)
|
||||
[](https://asciinema.org/a/33240)
|
||||
|
||||
### Using for GitHub SSH authentication (via `trezor-git` utility)
|
||||
[](https://asciinema.org/a/38337)
|
||||
- [TREZOR Firmware 1.3.4 enables SSH login](https://medium.com/@satoshilabs/trezor-firmware-1-3-4-enables-ssh-login-86a622d7e609)
|
||||
- [TREZOR Firmware 1.3.6 — GPG Signing, SSH Login Updates and Advanced Transaction Features for Segwit](https://medium.com/@satoshilabs/trezor-firmware-1-3-6-20a7df6e692)
|
||||
|
||||
## Installation
|
||||
|
||||
First, make sure that the latest `trezorlib` Python package
|
||||
First, make sure that the latest [trezorlib](https://pypi.python.org/pypi/trezor) Python package
|
||||
is installed correctly (at least v0.6.6):
|
||||
|
||||
$ apt-get install python-dev libusb-1.0-0-dev libudev-dev
|
||||
$ pip install Cython trezor
|
||||
$ pip install -U setuptools
|
||||
|
||||
Then, install the latest `trezor_agent` package:
|
||||
Then, install the latest [trezor_agent](https://pypi.python.org/pypi/trezor_agent) package:
|
||||
|
||||
$ pip install trezor_agent
|
||||
|
||||
Finally, verify that you are running the latest TREZOR firmware version (at least v1.3.4):
|
||||
Finally, verify that you are running the latest [TREZOR firmware](https://mytrezor.com/data/firmware/releases.json) version (at least v1.4.0):
|
||||
|
||||
$ trezorctl get_features
|
||||
$ trezorctl get_features | head
|
||||
vendor: "bitcointrezor.com"
|
||||
major_version: 1
|
||||
minor_version: 3
|
||||
patch_version: 4
|
||||
minor_version: 4
|
||||
patch_version: 0
|
||||
...
|
||||
|
||||
## Public key generation
|
||||
|
||||
Run:
|
||||
|
||||
/tmp $ trezor-agent ssh.hostname.com -v > hostname.pub
|
||||
2015-09-02 15:03:18,929 INFO getting "ssh://ssh.hostname.com" public key from Trezor...
|
||||
2015-09-02 15:03:23,342 INFO disconnected from Trezor
|
||||
/tmp $ cat hostname.pub
|
||||
ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBGSevcDwmT+QaZPUEWUUjTeZRBICChxMKuJ7dRpBSF8+qt+8S1GBK5Zj8Xicc8SHG/SE/EXKUL2UU3kcUzE7ADQ= ssh://ssh.hostname.com
|
||||
|
||||
Append `hostname.pub` contents to `~/.ssh/authorized_keys`
|
||||
configuration file at `ssh.hostname.com`, so the remote server
|
||||
would allow you to login using the corresponding private key signature.
|
||||
|
||||
## Usage
|
||||
|
||||
Run:
|
||||
For SSH, see the [following instructions](README-SSH.md).
|
||||
|
||||
/tmp $ trezor-agent ssh.hostname.com -v -c
|
||||
2015-09-02 15:09:39,782 INFO getting "ssh://ssh.hostname.com" public key from Trezor...
|
||||
2015-09-02 15:09:44,430 INFO please confirm user "roman" login to "ssh://ssh.hostname.com" using Trezor...
|
||||
2015-09-02 15:09:46,152 INFO signature status: OK
|
||||
Linux lmde 3.16.0-4-amd64 #1 SMP Debian 3.16.7-ckt11-1+deb8u3 (2015-08-04) x86_64
|
||||
For GPG, see the [following instructions](README-GPG.md).
|
||||
|
||||
The programs included with the Debian GNU/Linux system are free software;
|
||||
the exact distribution terms for each program are described in the
|
||||
individual files in /usr/share/doc/*/copyright.
|
||||
Questions, suggestions and discussions are welcome: [](https://gitter.im/romanz/trezor-agent)
|
||||
|
||||
Debian GNU/Linux comes with ABSOLUTELY NO WARRANTY, to the extent
|
||||
permitted by applicable law.
|
||||
Last login: Tue Sep 1 15:57:05 2015 from localhost
|
||||
~ $
|
||||
## Troubleshooting
|
||||
|
||||
Make sure to confirm SSH signature on the Trezor device when requested.
|
||||
If there is an import problem with the installed `protobuf` package,
|
||||
see [this issue](https://github.com/romanz/trezor-agent/issues/28) for fixing it.
|
||||
|
||||
7
setup.py
7
setup.py
@@ -3,13 +3,13 @@ from setuptools import setup
|
||||
|
||||
setup(
|
||||
name='trezor_agent',
|
||||
version='0.6.5',
|
||||
version='0.7.0',
|
||||
description='Using Trezor as hardware SSH agent',
|
||||
author='Roman Zeyde',
|
||||
author_email='roman.zeyde@gmail.com',
|
||||
url='http://github.com/romanz/trezor-agent',
|
||||
packages=['trezor_agent'],
|
||||
install_requires=['ecdsa>=0.13', 'ed25519>=1.4', 'Cython>=0.23.4', 'trezor>=0.6.6', 'keepkey>=0.7.0', 'semver>=2.2'],
|
||||
packages=['trezor_agent', 'trezor_agent.gpg'],
|
||||
install_requires=['ecdsa>=0.13', 'ed25519>=1.4', 'Cython>=0.23.4', 'protobuf>=3.0.0', 'trezor>=0.7.4', 'semver>=2.2'],
|
||||
platforms=['POSIX'],
|
||||
classifiers=[
|
||||
'Environment :: Console',
|
||||
@@ -30,5 +30,6 @@ setup(
|
||||
entry_points={'console_scripts': [
|
||||
'trezor-agent = trezor_agent.__main__:run_agent',
|
||||
'trezor-git = trezor_agent.__main__:run_git',
|
||||
'trezor-gpg = trezor_agent.gpg.__main__:main',
|
||||
]},
|
||||
)
|
||||
|
||||
2
tox.ini
2
tox.ini
@@ -1,5 +1,7 @@
|
||||
[tox]
|
||||
envlist = py27,py34
|
||||
[pep8]
|
||||
max-line-length = 100
|
||||
[testenv]
|
||||
deps=
|
||||
pytest
|
||||
|
||||
@@ -2,11 +2,10 @@
|
||||
import argparse
|
||||
import functools
|
||||
import logging
|
||||
import re
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
|
||||
from . import client, formats, protocol, server
|
||||
|
||||
@@ -31,7 +30,7 @@ def create_parser():
|
||||
p = argparse.ArgumentParser()
|
||||
p.add_argument('-v', '--verbose', default=0, action='count')
|
||||
|
||||
curve_names = [name.decode('ascii') for name in formats.SUPPORTED_CURVES]
|
||||
curve_names = [name for name in formats.SUPPORTED_CURVES]
|
||||
curve_names = ', '.join(sorted(curve_names))
|
||||
p.add_argument('-e', '--ecdsa-curve-name', metavar='CURVE',
|
||||
default=formats.CURVE_NIST256,
|
||||
@@ -98,21 +97,15 @@ def git_host(remote_name, attributes):
|
||||
continue
|
||||
|
||||
url = matches[0].strip()
|
||||
user, url = url.split('@', 1)
|
||||
host, _ = url.split(':', 1) # skip unused path (1 key per user@host)
|
||||
return '{}@{}'.format(user, host)
|
||||
|
||||
|
||||
def ssh_sign(conn, label, blob):
|
||||
"""Perform SSH signature using given hardware device connection."""
|
||||
now = time.strftime('%Y-%m-%d %H:%M:%S')
|
||||
return conn.sign_ssh_challenge(label=label, blob=blob, visual=now)
|
||||
match = re.match('(?P<user>.*?)@(?P<host>.*?):(?P<path>.*)', url)
|
||||
if match:
|
||||
return '{user}@{host}'.format(**match.groupdict())
|
||||
|
||||
|
||||
def run_server(conn, public_key, command, debug, timeout):
|
||||
"""Common code for run_agent and run_git below."""
|
||||
try:
|
||||
signer = functools.partial(ssh_sign, conn=conn)
|
||||
signer = conn.sign_ssh_challenge
|
||||
public_key = formats.import_public_key(public_key)
|
||||
log.info('using SSH public key: %s', public_key['fingerprint'])
|
||||
handler = protocol.Handler(keys=[public_key], signer=signer,
|
||||
@@ -123,6 +116,19 @@ def run_server(conn, public_key, command, debug, timeout):
|
||||
log.info('server stopped')
|
||||
|
||||
|
||||
def handle_connection_error(func):
|
||||
"""Fail with non-zero exit code."""
|
||||
@functools.wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
except IOError as e:
|
||||
log.error('Connection error: %s', e)
|
||||
return 1
|
||||
return wrapper
|
||||
|
||||
|
||||
@handle_connection_error
|
||||
def run_agent(client_factory=client.Client):
|
||||
"""Run ssh-agent using given hardware client factory."""
|
||||
args = create_agent_parser().parse_args()
|
||||
@@ -151,6 +157,7 @@ def run_agent(client_factory=client.Client):
|
||||
debug=args.debug, timeout=args.timeout)
|
||||
|
||||
|
||||
@handle_connection_error
|
||||
def run_git(client_factory=client.Client):
|
||||
"""Run git under ssh-agent using given hardware client factory."""
|
||||
args = create_git_parser().parse_args()
|
||||
@@ -159,7 +166,8 @@ def run_git(client_factory=client.Client):
|
||||
with client_factory(curve=args.ecdsa_curve_name) as conn:
|
||||
label = git_host(args.remote, ['pushurl', 'url'])
|
||||
if not label:
|
||||
log.error('Could not find "%s" remote in .git/config', args.remote)
|
||||
log.error('Could not find "%s" SSH remote in .git/config',
|
||||
args.remote)
|
||||
return
|
||||
|
||||
public_key = conn.get_public_key(label=label)
|
||||
|
||||
@@ -33,9 +33,8 @@ class Client(object):
|
||||
return self
|
||||
|
||||
def __exit__(self, *args):
|
||||
"""Forget PIN, shutdown screen and disconnect."""
|
||||
"""Keep the session open (doesn't forget PIN)."""
|
||||
log.info('disconnected from %s', self.device_name)
|
||||
self.client.clear_session()
|
||||
self.client.close()
|
||||
|
||||
def get_identity(self, label, index=0):
|
||||
@@ -51,7 +50,7 @@ class Client(object):
|
||||
label = identity_to_string(identity) # canonize key label
|
||||
log.info('getting "%s" public key (%s) from %s...',
|
||||
label, self.curve, self.device_name)
|
||||
addr = _get_address(identity)
|
||||
addr = get_address(identity)
|
||||
node = self.client.get_public_node(n=addr,
|
||||
ecdsa_curve_name=self.curve)
|
||||
|
||||
@@ -59,7 +58,7 @@ class Client(object):
|
||||
vk = formats.decompress_pubkey(pubkey=pubkey, curve_name=self.curve)
|
||||
return formats.export_public_key(vk=vk, label=label)
|
||||
|
||||
def sign_ssh_challenge(self, label, blob, visual=''):
|
||||
def sign_ssh_challenge(self, label, blob):
|
||||
"""Sign given blob using a private key, specified by the label."""
|
||||
identity = self.get_identity(label=label)
|
||||
msg = _parse_ssh_blob(blob)
|
||||
@@ -68,7 +67,6 @@ class Client(object):
|
||||
log.debug('nonce: %s', binascii.hexlify(msg['nonce']))
|
||||
log.debug('fingerprint: %s', msg['public_key']['fingerprint'])
|
||||
log.debug('hidden challenge size: %d bytes', len(blob))
|
||||
log.debug('visual challenge size: %d bytes = %r', len(visual), visual)
|
||||
|
||||
log.info('please confirm user "%s" login to "%s" using %s...',
|
||||
msg['user'], label, self.device_name)
|
||||
@@ -76,7 +74,7 @@ class Client(object):
|
||||
try:
|
||||
result = self.client.sign_identity(identity=identity,
|
||||
challenge_hidden=blob,
|
||||
challenge_visual=visual,
|
||||
challenge_visual='',
|
||||
ecdsa_curve_name=self.curve)
|
||||
except self.call_exception as e:
|
||||
code, msg = e.args
|
||||
@@ -129,7 +127,8 @@ def identity_to_string(identity):
|
||||
return ''.join(result)
|
||||
|
||||
|
||||
def _get_address(identity):
|
||||
def get_address(identity, ecdh=False):
|
||||
"""Compute BIP32 derivation address according to SLIP-0013/0017."""
|
||||
index = struct.pack('<L', identity.index)
|
||||
addr = index + identity_to_string(identity).encode('ascii')
|
||||
log.debug('address string: %r', addr)
|
||||
@@ -137,7 +136,8 @@ def _get_address(identity):
|
||||
s = io.BytesIO(bytearray(digest))
|
||||
|
||||
hardened = 0x80000000
|
||||
address_n = [13] + list(util.recv(s, '<LLLL'))
|
||||
addr_0 = [13, 17][bool(ecdh)]
|
||||
address_n = [addr_0] + list(util.recv(s, '<LLLL'))
|
||||
return [(hardened | value) for value in address_n]
|
||||
|
||||
|
||||
|
||||
@@ -44,33 +44,38 @@ def _load_client(name, client_type, hid_transport,
|
||||
|
||||
|
||||
def _load_trezor():
|
||||
# pylint: disable=import-error
|
||||
from trezorlib.client import TrezorClient, CallException
|
||||
from trezorlib.transport_hid import HidTransport
|
||||
from trezorlib.messages_pb2 import PassphraseAck
|
||||
from trezorlib.types_pb2 import IdentityType
|
||||
return _load_client(name='Trezor',
|
||||
client_type=TrezorClient,
|
||||
hid_transport=HidTransport,
|
||||
passphrase_ack=PassphraseAck,
|
||||
identity_type=IdentityType,
|
||||
required_version='>=1.3.4',
|
||||
call_exception=CallException)
|
||||
try:
|
||||
from trezorlib.client import TrezorClient, CallException
|
||||
from trezorlib.transport_hid import HidTransport
|
||||
from trezorlib.messages_pb2 import PassphraseAck
|
||||
from trezorlib.types_pb2 import IdentityType
|
||||
return _load_client(name='Trezor',
|
||||
client_type=TrezorClient,
|
||||
hid_transport=HidTransport,
|
||||
passphrase_ack=PassphraseAck,
|
||||
identity_type=IdentityType,
|
||||
required_version='>=1.4.0',
|
||||
call_exception=CallException)
|
||||
except ImportError:
|
||||
log.exception('Missing module: install via "pip install trezor"')
|
||||
|
||||
|
||||
def _load_keepkey():
|
||||
# pylint: disable=import-error
|
||||
from keepkeylib.client import KeepKeyClient, CallException
|
||||
from keepkeylib.transport_hid import HidTransport
|
||||
from keepkeylib.messages_pb2 import PassphraseAck
|
||||
from keepkeylib.types_pb2 import IdentityType
|
||||
return _load_client(name='KeepKey',
|
||||
client_type=KeepKeyClient,
|
||||
hid_transport=HidTransport,
|
||||
passphrase_ack=PassphraseAck,
|
||||
identity_type=IdentityType,
|
||||
required_version='>=1.0.4',
|
||||
call_exception=CallException)
|
||||
try:
|
||||
from keepkeylib.client import KeepKeyClient, CallException
|
||||
from keepkeylib.transport_hid import HidTransport
|
||||
from keepkeylib.messages_pb2 import PassphraseAck
|
||||
from keepkeylib.types_pb2 import IdentityType
|
||||
return _load_client(name='KeepKey',
|
||||
client_type=KeepKeyClient,
|
||||
hid_transport=HidTransport,
|
||||
passphrase_ack=PassphraseAck,
|
||||
identity_type=IdentityType,
|
||||
required_version='>=1.0.4',
|
||||
call_exception=CallException)
|
||||
except ImportError:
|
||||
log.exception('Missing module: install via "pip install keepkey"')
|
||||
|
||||
|
||||
LOADERS = [
|
||||
_load_trezor,
|
||||
@@ -83,7 +88,9 @@ def load(loaders=None):
|
||||
loaders = loaders if loaders is not None else LOADERS
|
||||
device_list = []
|
||||
for loader in loaders:
|
||||
device_list.extend(loader())
|
||||
device = loader()
|
||||
if device:
|
||||
device_list.extend(device)
|
||||
|
||||
if len(device_list) == 1:
|
||||
return device_list[0]
|
||||
|
||||
@@ -12,8 +12,8 @@ from . import util
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
# Supported ECDSA curves
|
||||
CURVE_NIST256 = b'nist256p1'
|
||||
CURVE_ED25519 = b'ed25519'
|
||||
CURVE_NIST256 = 'nist256p1'
|
||||
CURVE_ED25519 = 'ed25519'
|
||||
SUPPORTED_CURVES = {CURVE_NIST256, CURVE_ED25519}
|
||||
|
||||
# SSH key types
|
||||
@@ -41,7 +41,7 @@ def parse_pubkey(blob):
|
||||
"""
|
||||
Parse SSH public key from given blob.
|
||||
|
||||
Cnstruct a verifier for ECDSA signatures.
|
||||
Construct a verifier for ECDSA signatures.
|
||||
The verifier returns the signatures in the required SSH format.
|
||||
Currently, NIST256P1 and ED25519 elliptic curves are supported.
|
||||
"""
|
||||
|
||||
9
trezor_agent/gpg/__init__.py
Normal file
9
trezor_agent/gpg/__init__.py
Normal file
@@ -0,0 +1,9 @@
|
||||
"""
|
||||
TREZOR support for ECDSA GPG signatures.
|
||||
|
||||
See these links for more details:
|
||||
- https://www.gnupg.org/faq/whats-new-in-2.1.html
|
||||
- https://tools.ietf.org/html/rfc4880
|
||||
- https://tools.ietf.org/html/rfc6637
|
||||
- https://tools.ietf.org/html/draft-irtf-cfrg-eddsa-05
|
||||
"""
|
||||
99
trezor_agent/gpg/__main__.py
Executable file
99
trezor_agent/gpg/__main__.py
Executable file
@@ -0,0 +1,99 @@
|
||||
#!/usr/bin/env python
|
||||
"""Create signatures and export public keys for GPG using TREZOR."""
|
||||
import argparse
|
||||
import contextlib
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
|
||||
from . import agent, encode, keyring, protocol
|
||||
from .. import server
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def run_create(args):
|
||||
"""Generate a new pubkey for a new/existing GPG identity."""
|
||||
user_id = os.environ['TREZOR_GPG_USER_ID']
|
||||
log.warning('NOTE: in order to re-generate the exact same GPG key later, '
|
||||
'run this command with "--time=%d" commandline flag (to set '
|
||||
'the timestamp of the GPG key manually).', args.time)
|
||||
conn = encode.HardwareSigner(user_id=user_id,
|
||||
curve_name=args.ecdsa_curve)
|
||||
verifying_key = conn.pubkey(ecdh=False)
|
||||
decryption_key = conn.pubkey(ecdh=True)
|
||||
|
||||
if args.subkey:
|
||||
primary_bytes = keyring.export_public_key(user_id=user_id)
|
||||
# subkey for signing
|
||||
signing_key = protocol.PublicKey(
|
||||
curve_name=args.ecdsa_curve, created=args.time,
|
||||
verifying_key=verifying_key, ecdh=False)
|
||||
# subkey for encryption
|
||||
encryption_key = protocol.PublicKey(
|
||||
curve_name=args.ecdsa_curve, created=args.time,
|
||||
verifying_key=decryption_key, ecdh=True)
|
||||
result = encode.create_subkey(primary_bytes=primary_bytes,
|
||||
pubkey=signing_key,
|
||||
signer_func=conn.sign)
|
||||
result = encode.create_subkey(primary_bytes=result,
|
||||
pubkey=encryption_key,
|
||||
signer_func=conn.sign)
|
||||
else:
|
||||
# primary key for signing
|
||||
primary = protocol.PublicKey(
|
||||
curve_name=args.ecdsa_curve, created=args.time,
|
||||
verifying_key=verifying_key, ecdh=False)
|
||||
# subkey for encryption
|
||||
subkey = protocol.PublicKey(
|
||||
curve_name=args.ecdsa_curve, created=args.time,
|
||||
verifying_key=decryption_key, ecdh=True)
|
||||
|
||||
result = encode.create_primary(user_id=user_id,
|
||||
pubkey=primary,
|
||||
signer_func=conn.sign)
|
||||
result = encode.create_subkey(primary_bytes=result,
|
||||
pubkey=subkey,
|
||||
signer_func=conn.sign)
|
||||
|
||||
sys.stdout.write(protocol.armor(result, 'PUBLIC KEY BLOCK'))
|
||||
|
||||
|
||||
def run_agent(args): # pylint: disable=unused-argument
|
||||
"""Run a simple GPG-agent server."""
|
||||
sock_path = keyring.get_agent_sock_path()
|
||||
with server.unix_domain_socket_server(sock_path) as sock:
|
||||
for conn in agent.yield_connections(sock):
|
||||
with contextlib.closing(conn):
|
||||
agent.handle_connection(conn)
|
||||
|
||||
|
||||
def main():
|
||||
"""Main function."""
|
||||
p = argparse.ArgumentParser()
|
||||
p.add_argument('-v', '--verbose', action='store_true', default=False)
|
||||
subparsers = p.add_subparsers()
|
||||
subparsers.required = True
|
||||
subparsers.dest = 'command'
|
||||
|
||||
create_cmd = subparsers.add_parser('create')
|
||||
create_cmd.add_argument('-s', '--subkey', action='store_true', default=False)
|
||||
create_cmd.add_argument('-e', '--ecdsa-curve', default='nist256p1')
|
||||
create_cmd.add_argument('-t', '--time', type=int, default=int(time.time()))
|
||||
create_cmd.set_defaults(run=run_create)
|
||||
|
||||
agent_cmd = subparsers.add_parser('agent')
|
||||
agent_cmd.set_defaults(run=run_agent)
|
||||
|
||||
args = p.parse_args()
|
||||
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO,
|
||||
format='%(asctime)s %(levelname)-10s %(message)s')
|
||||
log.warning('This GPG tool is still in EXPERIMENTAL mode, '
|
||||
'so please note that the API and features may '
|
||||
'change without backwards compatibility!')
|
||||
args.run(args)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
136
trezor_agent/gpg/agent.py
Normal file
136
trezor_agent/gpg/agent.py
Normal file
@@ -0,0 +1,136 @@
|
||||
"""GPG-agent utilities."""
|
||||
import binascii
|
||||
import contextlib
|
||||
import logging
|
||||
import os
|
||||
|
||||
from . import decode, encode, keyring
|
||||
from .. import util
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def yield_connections(sock):
|
||||
"""Run a server on the specified socket."""
|
||||
while True:
|
||||
log.debug('waiting for connection on %s', sock.getsockname())
|
||||
try:
|
||||
conn, _ = sock.accept()
|
||||
except KeyboardInterrupt:
|
||||
return
|
||||
conn.settimeout(None)
|
||||
log.debug('accepted connection on %s', sock.getsockname())
|
||||
yield conn
|
||||
|
||||
|
||||
def serialize(data):
|
||||
"""Serialize data according to ASSUAN protocol."""
|
||||
for c in ['%', '\n', '\r']:
|
||||
data = data.replace(c, '%{:02X}'.format(ord(c)))
|
||||
return data
|
||||
|
||||
|
||||
def sig_encode(r, s):
|
||||
"""Serialize ECDSA signature data into GPG S-expression."""
|
||||
r = serialize(util.num2bytes(r, 32))
|
||||
s = serialize(util.num2bytes(s, 32))
|
||||
return '(7:sig-val(5:ecdsa(1:r32:{})(1:s32:{})))'.format(r, s)
|
||||
|
||||
|
||||
def pksign(keygrip, digest, algo):
|
||||
"""Sign a message digest using a private EC key."""
|
||||
assert algo == '8'
|
||||
user_id = os.environ['TREZOR_GPG_USER_ID']
|
||||
pubkey_dict = decode.load_public_key(
|
||||
pubkey_bytes=keyring.export_public_key(user_id=user_id),
|
||||
use_custom=True, ecdh=False)
|
||||
pubkey, conn = encode.load_from_public_key(pubkey_dict=pubkey_dict)
|
||||
with contextlib.closing(conn):
|
||||
assert pubkey.keygrip == binascii.unhexlify(keygrip)
|
||||
r, s = conn.sign(binascii.unhexlify(digest))
|
||||
result = sig_encode(r, s)
|
||||
log.debug('result: %r', result)
|
||||
return result
|
||||
|
||||
|
||||
def _serialize_point(data):
|
||||
data = '{}:'.format(len(data)) + data
|
||||
# https://www.gnupg.org/documentation/manuals/assuan/Server-responses.html
|
||||
for c in ['%', '\n', '\r']:
|
||||
data = data.replace(c, '%{:02X}'.format(ord(c)))
|
||||
return '(5:value' + data + ')'
|
||||
|
||||
|
||||
def parse_ecdh(line):
|
||||
"""Parse ECDH request and return remote public key."""
|
||||
prefix, line = line.split(' ', 1)
|
||||
assert prefix == 'D'
|
||||
exp, leftover = keyring.parse(keyring.unescape(line))
|
||||
log.debug('ECDH s-exp: %r', exp)
|
||||
assert not leftover
|
||||
label, exp = exp
|
||||
assert label == b'enc-val'
|
||||
assert exp[0] == b'ecdh'
|
||||
items = exp[1:]
|
||||
log.debug('ECDH parameters: %r', items)
|
||||
return dict(items)['e']
|
||||
|
||||
|
||||
def pkdecrypt(keygrip, conn):
|
||||
"""Handle decryption using ECDH."""
|
||||
for msg in [b'S INQUIRE_MAXLEN 4096', b'INQUIRE CIPHERTEXT']:
|
||||
keyring.sendline(conn, msg)
|
||||
|
||||
line = keyring.recvline(conn)
|
||||
assert keyring.recvline(conn) == b'END'
|
||||
remote_pubkey = parse_ecdh(line)
|
||||
|
||||
user_id = os.environ['TREZOR_GPG_USER_ID']
|
||||
local_pubkey = decode.load_public_key(
|
||||
pubkey_bytes=keyring.export_public_key(user_id=user_id),
|
||||
use_custom=True, ecdh=True)
|
||||
pubkey, conn = encode.load_from_public_key(pubkey_dict=local_pubkey)
|
||||
with contextlib.closing(conn):
|
||||
assert pubkey.keygrip == binascii.unhexlify(keygrip)
|
||||
shared_secret = conn.ecdh(remote_pubkey)
|
||||
|
||||
assert len(shared_secret) == 65
|
||||
assert shared_secret[:1] == b'\x04'
|
||||
return _serialize_point(shared_secret)
|
||||
|
||||
|
||||
def handle_connection(conn):
|
||||
"""Handle connection from GPG binary using the ASSUAN protocol."""
|
||||
keygrip = None
|
||||
digest = None
|
||||
algo = None
|
||||
version = keyring.gpg_version()
|
||||
|
||||
keyring.sendline(conn, b'OK')
|
||||
for line in keyring.iterlines(conn):
|
||||
parts = line.split(' ')
|
||||
command = parts[0]
|
||||
args = parts[1:]
|
||||
if command in {'RESET', 'OPTION', 'HAVEKEY', 'SETKEYDESC'}:
|
||||
pass # reply with OK
|
||||
elif command == 'GETINFO':
|
||||
keyring.sendline(conn, b'D ' + version)
|
||||
elif command == 'AGENT_ID':
|
||||
keyring.sendline(conn, b'D TREZOR')
|
||||
elif command in {'SIGKEY', 'SETKEY'}:
|
||||
keygrip, = args
|
||||
elif command == 'SETHASH':
|
||||
algo, digest = args
|
||||
elif command == 'PKSIGN':
|
||||
sig = pksign(keygrip, digest, algo)
|
||||
keyring.sendline(conn, b'D ' + sig)
|
||||
elif command == 'PKDECRYPT':
|
||||
sec = pkdecrypt(keygrip, conn)
|
||||
keyring.sendline(conn, b'D ' + sec)
|
||||
elif command == 'BYE':
|
||||
return
|
||||
else:
|
||||
log.error('unknown request: %r', line)
|
||||
return
|
||||
|
||||
keyring.sendline(conn, b'OK')
|
||||
390
trezor_agent/gpg/decode.py
Normal file
390
trezor_agent/gpg/decode.py
Normal file
@@ -0,0 +1,390 @@
|
||||
"""Decoders for GPG v2 data structures."""
|
||||
import base64
|
||||
import copy
|
||||
import functools
|
||||
import hashlib
|
||||
import io
|
||||
import logging
|
||||
import struct
|
||||
|
||||
import ecdsa
|
||||
import ed25519
|
||||
|
||||
from . import protocol
|
||||
from .. import util
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def parse_subpackets(s):
|
||||
"""See https://tools.ietf.org/html/rfc4880#section-5.2.3.1 for details."""
|
||||
subpackets = []
|
||||
total_size = s.readfmt('>H')
|
||||
data = s.read(total_size)
|
||||
s = util.Reader(io.BytesIO(data))
|
||||
|
||||
while True:
|
||||
try:
|
||||
first = s.readfmt('B')
|
||||
except EOFError:
|
||||
break
|
||||
|
||||
if first < 192:
|
||||
subpacket_len = first
|
||||
elif first < 255:
|
||||
subpacket_len = ((first - 192) << 8) + s.readfmt('B') + 192
|
||||
else: # first == 255
|
||||
subpacket_len = s.readfmt('>L')
|
||||
|
||||
subpackets.append(s.read(subpacket_len))
|
||||
|
||||
return subpackets
|
||||
|
||||
|
||||
def parse_mpi(s):
|
||||
"""See https://tools.ietf.org/html/rfc4880#section-3.2 for details."""
|
||||
bits = s.readfmt('>H')
|
||||
blob = bytearray(s.read(int((bits + 7) // 8)))
|
||||
return sum(v << (8 * i) for i, v in enumerate(reversed(blob)))
|
||||
|
||||
|
||||
def parse_mpis(s, n):
|
||||
"""Parse multiple MPIs from stream."""
|
||||
return [parse_mpi(s) for _ in range(n)]
|
||||
|
||||
|
||||
def _parse_nist256p1_verifier(mpi):
|
||||
prefix, x, y = util.split_bits(mpi, 4, 256, 256)
|
||||
assert prefix == 4
|
||||
point = ecdsa.ellipticcurve.Point(curve=ecdsa.NIST256p.curve,
|
||||
x=x, y=y)
|
||||
vk = ecdsa.VerifyingKey.from_public_point(
|
||||
point=point, curve=ecdsa.curves.NIST256p,
|
||||
hashfunc=hashlib.sha256)
|
||||
|
||||
def _nist256p1_verify(signature, digest):
|
||||
result = vk.verify_digest(signature=signature,
|
||||
digest=digest,
|
||||
sigdecode=lambda rs, order: rs)
|
||||
log.debug('nist256p1 ECDSA signature is OK (%s)', result)
|
||||
return _nist256p1_verify, vk
|
||||
|
||||
|
||||
def _parse_ed25519_verifier(mpi):
|
||||
prefix, value = util.split_bits(mpi, 8, 256)
|
||||
assert prefix == 0x40
|
||||
vk = ed25519.VerifyingKey(util.num2bytes(value, size=32))
|
||||
|
||||
def _ed25519_verify(signature, digest):
|
||||
sig = b''.join(util.num2bytes(val, size=32)
|
||||
for val in signature)
|
||||
result = vk.verify(sig, digest)
|
||||
log.debug('ed25519 ECDSA signature is OK (%s)', result)
|
||||
return _ed25519_verify, vk
|
||||
|
||||
|
||||
SUPPORTED_CURVES = {
|
||||
b'\x2A\x86\x48\xCE\x3D\x03\x01\x07': _parse_nist256p1_verifier,
|
||||
b'\x2B\x06\x01\x04\x01\xDA\x47\x0F\x01': _parse_ed25519_verifier,
|
||||
}
|
||||
|
||||
RSA_ALGO_IDS = {1, 2, 3}
|
||||
ELGAMAL_ALGO_ID = 16
|
||||
DSA_ALGO_ID = 17
|
||||
ECDSA_ALGO_IDS = {18, 19, 22} # {ecdsa, nist256, ed25519}
|
||||
|
||||
|
||||
def _parse_literal(stream):
|
||||
"""See https://tools.ietf.org/html/rfc4880#section-5.9 for details."""
|
||||
p = {'type': 'literal'}
|
||||
p['format'] = stream.readfmt('c')
|
||||
filename_len = stream.readfmt('B')
|
||||
p['filename'] = stream.read(filename_len)
|
||||
p['date'] = stream.readfmt('>L')
|
||||
p['content'] = stream.read()
|
||||
p['_to_hash'] = p['content']
|
||||
return p
|
||||
|
||||
|
||||
def _parse_embedded_signatures(subpackets):
|
||||
for packet in subpackets:
|
||||
data = bytearray(packet)
|
||||
if data[0] == 32:
|
||||
# https://tools.ietf.org/html/rfc4880#section-5.2.3.26
|
||||
stream = io.BytesIO(data[1:])
|
||||
yield _parse_signature(util.Reader(stream))
|
||||
|
||||
|
||||
def _parse_signature(stream):
|
||||
"""See https://tools.ietf.org/html/rfc4880#section-5.2 for details."""
|
||||
p = {'type': 'signature'}
|
||||
|
||||
to_hash = io.BytesIO()
|
||||
with stream.capture(to_hash):
|
||||
p['version'] = stream.readfmt('B')
|
||||
p['sig_type'] = stream.readfmt('B')
|
||||
p['pubkey_alg'] = stream.readfmt('B')
|
||||
p['hash_alg'] = stream.readfmt('B')
|
||||
p['hashed_subpackets'] = parse_subpackets(stream)
|
||||
|
||||
# https://tools.ietf.org/html/rfc4880#section-5.2.4
|
||||
tail_to_hash = b'\x04\xff' + struct.pack('>L', to_hash.tell())
|
||||
|
||||
p['_to_hash'] = to_hash.getvalue() + tail_to_hash
|
||||
|
||||
p['unhashed_subpackets'] = parse_subpackets(stream)
|
||||
embedded = list(_parse_embedded_signatures(p['unhashed_subpackets']))
|
||||
if embedded:
|
||||
log.debug('embedded sigs: %s', embedded)
|
||||
p['embedded'] = embedded
|
||||
|
||||
p['_is_custom'] = (protocol.CUSTOM_SUBPACKET in p['unhashed_subpackets'])
|
||||
|
||||
p['hash_prefix'] = stream.readfmt('2s')
|
||||
if p['pubkey_alg'] in ECDSA_ALGO_IDS:
|
||||
p['sig'] = (parse_mpi(stream), parse_mpi(stream))
|
||||
elif p['pubkey_alg'] in RSA_ALGO_IDS: # RSA
|
||||
p['sig'] = (parse_mpi(stream),)
|
||||
elif p['pubkey_alg'] == DSA_ALGO_ID:
|
||||
p['sig'] = (parse_mpi(stream), parse_mpi(stream))
|
||||
else:
|
||||
log.error('unsupported public key algo: %d', p['pubkey_alg'])
|
||||
|
||||
assert not stream.read()
|
||||
return p
|
||||
|
||||
|
||||
def _parse_pubkey(stream, packet_type='pubkey'):
|
||||
"""See https://tools.ietf.org/html/rfc4880#section-5.5 for details."""
|
||||
p = {'type': packet_type}
|
||||
packet = io.BytesIO()
|
||||
with stream.capture(packet):
|
||||
p['version'] = stream.readfmt('B')
|
||||
p['created'] = stream.readfmt('>L')
|
||||
p['algo'] = stream.readfmt('B')
|
||||
if p['algo'] in ECDSA_ALGO_IDS:
|
||||
log.debug('parsing elliptic curve key')
|
||||
# https://tools.ietf.org/html/rfc6637#section-11
|
||||
oid_size = stream.readfmt('B')
|
||||
oid = stream.read(oid_size)
|
||||
assert oid in SUPPORTED_CURVES, util.hexlify(oid)
|
||||
parser = SUPPORTED_CURVES[oid]
|
||||
|
||||
mpi = parse_mpi(stream)
|
||||
log.debug('mpi: %x (%d bits)', mpi, mpi.bit_length())
|
||||
p['verifier'], p['verifying_key'] = parser(mpi)
|
||||
leftover = stream.read()
|
||||
if leftover:
|
||||
leftover = io.BytesIO(leftover)
|
||||
# https://tools.ietf.org/html/rfc6637#section-8
|
||||
# should be b'\x03\x01\x08\x07': SHA256 + AES128
|
||||
size, = util.readfmt(leftover, 'B')
|
||||
p['kdf'] = leftover.read(size)
|
||||
assert not leftover.read()
|
||||
elif p['algo'] == DSA_ALGO_ID:
|
||||
log.warning('DSA signatures are not verified')
|
||||
parse_mpis(stream, n=4)
|
||||
elif p['algo'] == ELGAMAL_ALGO_ID:
|
||||
log.warning('ElGamal signatures are not verified')
|
||||
parse_mpis(stream, n=3)
|
||||
else: # assume RSA
|
||||
log.warning('RSA signatures are not verified')
|
||||
parse_mpis(stream, n=2)
|
||||
assert not stream.read()
|
||||
|
||||
# https://tools.ietf.org/html/rfc4880#section-12.2
|
||||
packet_data = packet.getvalue()
|
||||
data_to_hash = (b'\x99' + struct.pack('>H', len(packet_data)) +
|
||||
packet_data)
|
||||
p['key_id'] = hashlib.sha1(data_to_hash).digest()[-8:]
|
||||
p['_to_hash'] = data_to_hash
|
||||
log.debug('key ID: %s', util.hexlify(p['key_id']))
|
||||
return p
|
||||
|
||||
_parse_subkey = functools.partial(_parse_pubkey, packet_type='subkey')
|
||||
|
||||
|
||||
def _parse_user_id(stream, packet_type='user_id'):
|
||||
"""See https://tools.ietf.org/html/rfc4880#section-5.11 for details."""
|
||||
value = stream.read()
|
||||
to_hash = b'\xb4' + util.prefix_len('>L', value)
|
||||
return {'type': packet_type, 'value': value, '_to_hash': to_hash}
|
||||
|
||||
# User attribute is handled as an opaque user ID
|
||||
_parse_attribute = functools.partial(_parse_user_id,
|
||||
packet_type='user_attribute')
|
||||
|
||||
PACKET_TYPES = {
|
||||
2: _parse_signature,
|
||||
6: _parse_pubkey,
|
||||
11: _parse_literal,
|
||||
13: _parse_user_id,
|
||||
14: _parse_subkey,
|
||||
17: _parse_attribute,
|
||||
}
|
||||
|
||||
|
||||
def parse_packets(stream):
|
||||
"""
|
||||
Support iterative parsing of available GPG packets.
|
||||
|
||||
See https://tools.ietf.org/html/rfc4880#section-4.2 for details.
|
||||
"""
|
||||
reader = util.Reader(stream)
|
||||
while True:
|
||||
try:
|
||||
value = reader.readfmt('B')
|
||||
except EOFError:
|
||||
return
|
||||
|
||||
log.debug('prefix byte: %s', bin(value))
|
||||
assert util.bit(value, 7) == 1
|
||||
|
||||
tag = util.low_bits(value, 6)
|
||||
if util.bit(value, 6) == 0:
|
||||
length_type = util.low_bits(tag, 2)
|
||||
tag = tag >> 2
|
||||
fmt = {0: '>B', 1: '>H', 2: '>L'}[length_type]
|
||||
packet_size = reader.readfmt(fmt)
|
||||
else:
|
||||
first = reader.readfmt('B')
|
||||
if first < 192:
|
||||
packet_size = first
|
||||
elif first < 224:
|
||||
packet_size = ((first - 192) << 8) + reader.readfmt('B') + 192
|
||||
elif first == 255:
|
||||
packet_size = reader.readfmt('>L')
|
||||
else:
|
||||
log.error('Partial Body Lengths unsupported')
|
||||
|
||||
log.debug('packet length: %d', packet_size)
|
||||
packet_data = reader.read(packet_size)
|
||||
packet_type = PACKET_TYPES.get(tag)
|
||||
|
||||
if packet_type is not None:
|
||||
p = packet_type(util.Reader(io.BytesIO(packet_data)))
|
||||
p['tag'] = tag
|
||||
else:
|
||||
p = {'type': 'unknown', 'tag': tag, 'raw': packet_data}
|
||||
|
||||
log.debug('packet "%s": %s', p['type'], p)
|
||||
yield p
|
||||
|
||||
|
||||
def digest_packets(packets, hasher):
|
||||
"""Compute digest on specified packets, according to '_to_hash' field."""
|
||||
data_to_hash = io.BytesIO()
|
||||
for p in packets:
|
||||
data_to_hash.write(p['_to_hash'])
|
||||
hasher.update(data_to_hash.getvalue())
|
||||
return hasher.digest()
|
||||
|
||||
|
||||
def collect_packets(packets, types_to_collect):
|
||||
"""Collect specified packet types into their leading packet."""
|
||||
packet = None
|
||||
result = []
|
||||
for p in packets:
|
||||
if p['type'] in types_to_collect:
|
||||
packet.setdefault(p['type'], []).append(p)
|
||||
else:
|
||||
packet = copy.deepcopy(p)
|
||||
result.append(packet)
|
||||
return result
|
||||
|
||||
|
||||
def parse_public_keys(stream):
|
||||
"""Parse GPG public key into hierarchy of packets."""
|
||||
packets = list(parse_packets(stream))
|
||||
packets = collect_packets(packets, {'signature'})
|
||||
packets = collect_packets(packets, {'user_id', 'user_attribute'})
|
||||
packets = collect_packets(packets, {'subkey'})
|
||||
return packets
|
||||
|
||||
|
||||
HASH_ALGORITHMS = {
|
||||
1: 'md5',
|
||||
2: 'sha1',
|
||||
3: 'ripemd160',
|
||||
8: 'sha256',
|
||||
9: 'sha384',
|
||||
10: 'sha512',
|
||||
11: 'sha224',
|
||||
}
|
||||
|
||||
|
||||
def load_public_key(pubkey_bytes, use_custom=False, ecdh=False):
|
||||
"""Parse and validate GPG public key from an input stream."""
|
||||
stream = io.BytesIO(pubkey_bytes)
|
||||
packets = list(parse_packets(stream))
|
||||
pubkey, userid, signature = packets[:3]
|
||||
packets = packets[3:]
|
||||
|
||||
hash_alg = HASH_ALGORITHMS.get(signature['hash_alg'])
|
||||
if hash_alg is not None:
|
||||
digest = digest_packets(packets=[pubkey, userid, signature],
|
||||
hasher=hashlib.new(hash_alg))
|
||||
assert signature['hash_prefix'] == digest[:2]
|
||||
|
||||
log.debug('loaded public key "%s"', userid['value'])
|
||||
if hash_alg is not None and pubkey.get('verifier'):
|
||||
verify_digest(pubkey=pubkey, digest=digest,
|
||||
signature=signature['sig'], label='GPG public key')
|
||||
else:
|
||||
log.warning('public key %s is not verified!',
|
||||
util.hexlify(pubkey['key_id']))
|
||||
|
||||
packet = pubkey
|
||||
while use_custom:
|
||||
if packet['type'] in ('pubkey', 'subkey') and signature['_is_custom']:
|
||||
if ecdh == (packet['algo'] == protocol.ECDH_ALGO_ID):
|
||||
log.debug('found custom %s', packet['type'])
|
||||
break
|
||||
|
||||
while packets[1]['type'] != 'signature':
|
||||
packets = packets[1:]
|
||||
packet, signature = packets[:2]
|
||||
packets = packets[2:]
|
||||
|
||||
packet['user_id'] = userid['value']
|
||||
packet['_is_custom'] = signature['_is_custom']
|
||||
return packet
|
||||
|
||||
|
||||
def load_signature(stream, original_data):
|
||||
"""Load signature from stream, and compute GPG digest for verification."""
|
||||
signature, = list(parse_packets((stream)))
|
||||
hash_alg = HASH_ALGORITHMS[signature['hash_alg']]
|
||||
digest = digest_packets([{'_to_hash': original_data}, signature],
|
||||
hasher=hashlib.new(hash_alg))
|
||||
assert signature['hash_prefix'] == digest[:2]
|
||||
return signature, digest
|
||||
|
||||
|
||||
def verify_digest(pubkey, digest, signature, label):
|
||||
"""Verify a digest signature from a specified public key."""
|
||||
verifier = pubkey['verifier']
|
||||
try:
|
||||
verifier(signature, digest)
|
||||
log.debug('%s is OK', label)
|
||||
except ecdsa.keys.BadSignatureError:
|
||||
log.error('Bad %s!', label)
|
||||
raise ValueError('Invalid ECDSA signature for {}'.format(label))
|
||||
|
||||
|
||||
def remove_armor(armored_data):
|
||||
"""Decode armored data into its binary form."""
|
||||
stream = io.BytesIO(armored_data)
|
||||
lines = stream.readlines()[3:-1]
|
||||
data = base64.b64decode(b''.join(lines))
|
||||
payload, checksum = data[:-3], data[-3:]
|
||||
assert util.crc24(payload) == checksum
|
||||
return payload
|
||||
|
||||
|
||||
def verify(pubkey, signature, original_data):
|
||||
"""Verify correctness of public key and signature."""
|
||||
stream = io.BytesIO(remove_armor(signature))
|
||||
signature, digest = load_signature(stream, original_data)
|
||||
verify_digest(pubkey=pubkey, digest=digest,
|
||||
signature=signature['sig'], label='GPG signature')
|
||||
193
trezor_agent/gpg/encode.py
Normal file
193
trezor_agent/gpg/encode.py
Normal file
@@ -0,0 +1,193 @@
|
||||
"""Create GPG ECDSA signatures and public keys using TREZOR device."""
|
||||
import logging
|
||||
import time
|
||||
|
||||
from . import decode, keyring, protocol
|
||||
from .. import client, factory, formats, util
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class HardwareSigner(object):
|
||||
"""Sign messages and get public keys from a hardware device."""
|
||||
|
||||
def __init__(self, user_id, curve_name):
|
||||
"""Connect to the device and retrieve required public key."""
|
||||
self.client_wrapper = factory.load()
|
||||
self.identity = self.client_wrapper.identity_type()
|
||||
self.identity.proto = 'gpg'
|
||||
self.identity.host = user_id
|
||||
self.curve_name = curve_name
|
||||
|
||||
def pubkey(self, ecdh=False):
|
||||
"""Return public key as VerifyingKey object."""
|
||||
addr = client.get_address(identity=self.identity, ecdh=ecdh)
|
||||
public_node = self.client_wrapper.connection.get_public_node(
|
||||
n=addr, ecdsa_curve_name=self.curve_name)
|
||||
|
||||
return formats.decompress_pubkey(
|
||||
pubkey=public_node.node.public_key,
|
||||
curve_name=self.curve_name)
|
||||
|
||||
def sign(self, digest):
|
||||
"""Sign the digest and return a serialized signature."""
|
||||
result = self.client_wrapper.connection.sign_identity(
|
||||
identity=self.identity,
|
||||
challenge_hidden=digest,
|
||||
challenge_visual='',
|
||||
ecdsa_curve_name=self.curve_name)
|
||||
assert result.signature[:1] == b'\x00'
|
||||
sig = result.signature[1:]
|
||||
return (util.bytes2num(sig[:32]), util.bytes2num(sig[32:]))
|
||||
|
||||
def ecdh(self, pubkey):
|
||||
"""Derive shared secret using ECDH from remote public key."""
|
||||
result = self.client_wrapper.connection.get_ecdh_session_key(
|
||||
identity=self.identity,
|
||||
peer_public_key=pubkey,
|
||||
ecdsa_curve_name=self.curve_name)
|
||||
assert len(result.session_key) == 65
|
||||
assert result.session_key[:1] == b'\x04'
|
||||
return result.session_key
|
||||
|
||||
def close(self):
|
||||
"""Close the connection to the device."""
|
||||
self.client_wrapper.connection.clear_session()
|
||||
self.client_wrapper.connection.close()
|
||||
|
||||
|
||||
class AgentSigner(object):
|
||||
"""Sign messages and get public keys using gpg-agent tool."""
|
||||
|
||||
def __init__(self, user_id):
|
||||
"""Connect to the agent and retrieve required public key."""
|
||||
self.sock = keyring.connect_to_agent()
|
||||
self.keygrip = keyring.get_keygrip(user_id)
|
||||
|
||||
def sign(self, digest):
|
||||
"""Sign the digest and return an ECDSA/RSA/DSA signature."""
|
||||
return keyring.sign_digest(sock=self.sock,
|
||||
keygrip=self.keygrip, digest=digest)
|
||||
|
||||
def close(self):
|
||||
"""Close the connection to gpg-agent."""
|
||||
self.sock.close()
|
||||
|
||||
|
||||
def _time_format(t):
|
||||
return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(t))
|
||||
|
||||
|
||||
def create_primary(user_id, pubkey, signer_func):
|
||||
"""Export new primary GPG public key, ready for "gpg2 --import"."""
|
||||
pubkey_packet = protocol.packet(tag=6, blob=pubkey.data())
|
||||
user_id_packet = protocol.packet(tag=13,
|
||||
blob=user_id.encode('ascii'))
|
||||
|
||||
data_to_sign = (pubkey.data_to_hash() +
|
||||
user_id_packet[:1] +
|
||||
util.prefix_len('>L', user_id.encode('ascii')))
|
||||
log.info('creating primary GPG key "%s"', user_id)
|
||||
hashed_subpackets = [
|
||||
protocol.subpacket_time(pubkey.created), # signature time
|
||||
# https://tools.ietf.org/html/rfc4880#section-5.2.3.7
|
||||
protocol.subpacket_byte(0x0B, 9), # preferred symmetric algo (AES-256)
|
||||
# https://tools.ietf.org/html/rfc4880#section-5.2.3.4
|
||||
protocol.subpacket_byte(0x1B, 1 | 2), # key flags (certify & sign)
|
||||
# https://tools.ietf.org/html/rfc4880#section-5.2.3.21
|
||||
protocol.subpacket_byte(0x15, 8), # preferred hash (SHA256)
|
||||
# https://tools.ietf.org/html/rfc4880#section-5.2.3.8
|
||||
protocol.subpacket_byte(0x16, 0), # preferred compression (none)
|
||||
# https://tools.ietf.org/html/rfc4880#section-5.2.3.9
|
||||
protocol.subpacket_byte(0x17, 0x80) # key server prefs (no-modify)
|
||||
# https://tools.ietf.org/html/rfc4880#section-5.2.3.17
|
||||
]
|
||||
unhashed_subpackets = [
|
||||
protocol.subpacket(16, pubkey.key_id()), # issuer key id
|
||||
protocol.CUSTOM_SUBPACKET]
|
||||
|
||||
log.info('confirm signing with primary key')
|
||||
signature = protocol.make_signature(
|
||||
signer_func=signer_func,
|
||||
public_algo=pubkey.algo_id,
|
||||
data_to_sign=data_to_sign,
|
||||
sig_type=0x13, # user id & public key
|
||||
hashed_subpackets=hashed_subpackets,
|
||||
unhashed_subpackets=unhashed_subpackets)
|
||||
|
||||
sign_packet = protocol.packet(tag=2, blob=signature)
|
||||
return pubkey_packet + user_id_packet + sign_packet
|
||||
|
||||
|
||||
def create_subkey(primary_bytes, pubkey, signer_func):
|
||||
"""Export new subkey to GPG primary key."""
|
||||
subkey_packet = protocol.packet(tag=14, blob=pubkey.data())
|
||||
primary = decode.load_public_key(primary_bytes)
|
||||
log.info('adding subkey to primary GPG key "%s"', primary['user_id'])
|
||||
data_to_sign = primary['_to_hash'] + pubkey.data_to_hash()
|
||||
|
||||
if pubkey.ecdh:
|
||||
embedded_sig = None
|
||||
else:
|
||||
# Primary Key Binding Signature
|
||||
hashed_subpackets = [
|
||||
protocol.subpacket_time(pubkey.created)] # signature time
|
||||
unhashed_subpackets = [
|
||||
protocol.subpacket(16, pubkey.key_id())] # issuer key id
|
||||
log.info('confirm signing with new subkey')
|
||||
embedded_sig = protocol.make_signature(
|
||||
signer_func=signer_func,
|
||||
data_to_sign=data_to_sign,
|
||||
public_algo=pubkey.algo_id,
|
||||
sig_type=0x19,
|
||||
hashed_subpackets=hashed_subpackets,
|
||||
unhashed_subpackets=unhashed_subpackets)
|
||||
|
||||
# Subkey Binding Signature
|
||||
|
||||
# Key flags: https://tools.ietf.org/html/rfc4880#section-5.2.3.21
|
||||
# (certify & sign) (encrypt)
|
||||
flags = (2) if (not pubkey.ecdh) else (4 | 8)
|
||||
|
||||
hashed_subpackets = [
|
||||
protocol.subpacket_time(pubkey.created), # signature time
|
||||
protocol.subpacket_byte(0x1B, flags)]
|
||||
|
||||
unhashed_subpackets = []
|
||||
unhashed_subpackets.append(protocol.subpacket(16, primary['key_id']))
|
||||
if embedded_sig is not None:
|
||||
unhashed_subpackets.append(protocol.subpacket(32, embedded_sig))
|
||||
unhashed_subpackets.append(protocol.CUSTOM_SUBPACKET)
|
||||
|
||||
log.info('confirm signing with primary key')
|
||||
if not primary['_is_custom']:
|
||||
signer_func = AgentSigner(primary['user_id']).sign
|
||||
|
||||
signature = protocol.make_signature(
|
||||
signer_func=signer_func,
|
||||
data_to_sign=data_to_sign,
|
||||
public_algo=primary['algo'],
|
||||
sig_type=0x18,
|
||||
hashed_subpackets=hashed_subpackets,
|
||||
unhashed_subpackets=unhashed_subpackets)
|
||||
sign_packet = protocol.packet(tag=2, blob=signature)
|
||||
return primary_bytes + subkey_packet + sign_packet
|
||||
|
||||
|
||||
def load_from_public_key(pubkey_dict):
|
||||
"""Load correct public key from the device."""
|
||||
user_id = pubkey_dict['user_id']
|
||||
created = pubkey_dict['created']
|
||||
curve_name = protocol.find_curve_by_algo_id(pubkey_dict['algo'])
|
||||
assert curve_name in formats.SUPPORTED_CURVES
|
||||
ecdh = (pubkey_dict['algo'] == protocol.ECDH_ALGO_ID)
|
||||
|
||||
conn = HardwareSigner(user_id, curve_name=curve_name)
|
||||
pubkey = protocol.PublicKey(
|
||||
curve_name=curve_name, created=created,
|
||||
verifying_key=conn.pubkey(ecdh=ecdh), ecdh=ecdh)
|
||||
assert pubkey.key_id() == pubkey_dict['key_id']
|
||||
log.info('%s created at %s for "%s"',
|
||||
pubkey, _time_format(pubkey.created), user_id)
|
||||
|
||||
return pubkey, conn
|
||||
205
trezor_agent/gpg/keyring.py
Normal file
205
trezor_agent/gpg/keyring.py
Normal file
@@ -0,0 +1,205 @@
|
||||
"""Tools for doing signature using gpg-agent."""
|
||||
|
||||
import binascii
|
||||
import io
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import socket
|
||||
import subprocess
|
||||
|
||||
from .. import util
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_agent_sock_path(sp=subprocess):
|
||||
"""Parse gpgconf output to find out GPG agent UNIX socket path."""
|
||||
lines = sp.check_output(['gpgconf', '--list-dirs']).strip().split('\n')
|
||||
dirs = dict(line.split(':', 1) for line in lines)
|
||||
return dirs['agent-socket']
|
||||
|
||||
|
||||
def connect_to_agent(sp=subprocess):
|
||||
"""Connect to GPG agent's UNIX socket."""
|
||||
sock_path = get_agent_sock_path(sp=sp)
|
||||
sp.check_call(['gpg-connect-agent', '/bye'])
|
||||
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
sock.connect(sock_path)
|
||||
return sock
|
||||
|
||||
|
||||
def communicate(sock, msg):
|
||||
"""Send a message and receive a single line."""
|
||||
sendline(sock, msg.encode('ascii'))
|
||||
return recvline(sock)
|
||||
|
||||
|
||||
def sendline(sock, msg):
|
||||
"""Send a binary message, followed by EOL."""
|
||||
log.debug('<- %r', msg)
|
||||
sock.sendall(msg + b'\n')
|
||||
|
||||
|
||||
def recvline(sock):
|
||||
"""Receive a single line from the socket."""
|
||||
reply = io.BytesIO()
|
||||
|
||||
while True:
|
||||
c = sock.recv(1)
|
||||
if not c:
|
||||
return None # socket is closed
|
||||
|
||||
if c == b'\n':
|
||||
break
|
||||
reply.write(c)
|
||||
|
||||
result = reply.getvalue()
|
||||
log.debug('-> %r', result)
|
||||
return result
|
||||
|
||||
|
||||
def iterlines(conn):
|
||||
"""Iterate over input, split by lines."""
|
||||
while True:
|
||||
line = recvline(conn)
|
||||
if line is None:
|
||||
break
|
||||
yield line
|
||||
|
||||
|
||||
def unescape(s):
|
||||
"""Unescape ASSUAN message (0xAB <-> '%AB')."""
|
||||
s = bytearray(s)
|
||||
i = 0
|
||||
while i < len(s):
|
||||
if s[i] == ord('%'):
|
||||
hex_bytes = bytes(s[i+1:i+3])
|
||||
value = int(hex_bytes.decode('ascii'), 16)
|
||||
s[i:i+3] = [value]
|
||||
i += 1
|
||||
return bytes(s)
|
||||
|
||||
|
||||
def parse_term(s):
|
||||
"""Parse single s-expr term from bytes."""
|
||||
size, s = s.split(b':', 1)
|
||||
size = int(size)
|
||||
return s[:size], s[size:]
|
||||
|
||||
|
||||
def parse(s):
|
||||
"""Parse full s-expr from bytes."""
|
||||
if s.startswith(b'('):
|
||||
s = s[1:]
|
||||
name, s = parse_term(s)
|
||||
values = [name]
|
||||
while not s.startswith(b')'):
|
||||
value, s = parse(s)
|
||||
values.append(value)
|
||||
return values, s[1:]
|
||||
else:
|
||||
return parse_term(s)
|
||||
|
||||
|
||||
def _parse_ecdsa_sig(args):
|
||||
(r, sig_r), (s, sig_s) = args
|
||||
assert r == b'r'
|
||||
assert s == b's'
|
||||
return (util.bytes2num(sig_r),
|
||||
util.bytes2num(sig_s))
|
||||
|
||||
# DSA and EDDSA happen to have the same structure as ECDSA signatures
|
||||
_parse_dsa_sig = _parse_ecdsa_sig
|
||||
_parse_eddsa_sig = _parse_ecdsa_sig
|
||||
|
||||
|
||||
def _parse_rsa_sig(args):
|
||||
(s, sig_s), = args
|
||||
assert s == b's'
|
||||
return (util.bytes2num(sig_s),)
|
||||
|
||||
|
||||
def parse_sig(sig):
|
||||
"""Parse signature integer values from s-expr."""
|
||||
label, sig = sig
|
||||
assert label == b'sig-val'
|
||||
algo_name = sig[0]
|
||||
parser = {b'rsa': _parse_rsa_sig,
|
||||
b'ecdsa': _parse_ecdsa_sig,
|
||||
b'eddsa': _parse_eddsa_sig,
|
||||
b'dsa': _parse_dsa_sig}[algo_name]
|
||||
return parser(args=sig[1:])
|
||||
|
||||
|
||||
def sign_digest(sock, keygrip, digest, sp=subprocess, environ=None):
|
||||
"""Sign a digest using specified key using GPG agent."""
|
||||
hash_algo = 8 # SHA256
|
||||
assert len(digest) == 32
|
||||
|
||||
assert communicate(sock, 'RESET').startswith(b'OK')
|
||||
|
||||
ttyname = sp.check_output(['tty']).strip()
|
||||
options = ['ttyname={}'.format(ttyname)] # set TTY for passphrase entry
|
||||
|
||||
display = (environ or os.environ).get('DISPLAY')
|
||||
if display is not None:
|
||||
options.append('display={}'.format(display))
|
||||
|
||||
for opt in options:
|
||||
assert communicate(sock, 'OPTION {}'.format(opt)) == b'OK'
|
||||
|
||||
assert communicate(sock, 'SIGKEY {}'.format(keygrip)) == b'OK'
|
||||
hex_digest = binascii.hexlify(digest).upper().decode('ascii')
|
||||
assert communicate(sock, 'SETHASH {} {}'.format(hash_algo,
|
||||
hex_digest)) == b'OK'
|
||||
|
||||
assert communicate(sock, 'SETKEYDESC '
|
||||
'Sign+a+new+TREZOR-based+subkey') == b'OK'
|
||||
assert communicate(sock, 'PKSIGN') == b'OK'
|
||||
line = recvline(sock).strip()
|
||||
line = unescape(line)
|
||||
log.debug('unescaped: %r', line)
|
||||
prefix, sig = line.split(b' ', 1)
|
||||
if prefix != b'D':
|
||||
raise ValueError(prefix)
|
||||
|
||||
sig, leftover = parse(sig)
|
||||
assert not leftover, leftover
|
||||
return parse_sig(sig)
|
||||
|
||||
|
||||
def gpg_command(args, env=None):
|
||||
"""Prepare common GPG command line arguments."""
|
||||
if env is None:
|
||||
env = os.environ
|
||||
cmd = ['gpg2']
|
||||
homedir = env.get('GNUPGHOME')
|
||||
if homedir:
|
||||
cmd.extend(['--homedir', homedir])
|
||||
return cmd + args
|
||||
|
||||
|
||||
def get_keygrip(user_id, sp=subprocess):
|
||||
"""Get a keygrip of the primary GPG key of the specified user."""
|
||||
args = gpg_command(['--list-keys', '--with-keygrip', user_id])
|
||||
output = sp.check_output(args).decode('ascii')
|
||||
return re.findall(r'Keygrip = (\w+)', output)[0]
|
||||
|
||||
|
||||
def gpg_version(sp=subprocess):
|
||||
"""Get a keygrip of the primary GPG key of the specified user."""
|
||||
args = gpg_command(['--version'])
|
||||
output = sp.check_output(args).decode('ascii')
|
||||
line = output.split(b'\n')[0] # b'gpg (GnuPG) 2.1.11'
|
||||
return line.split(b' ')[-1] # b'2.1.11'
|
||||
|
||||
|
||||
def export_public_key(user_id, sp=subprocess):
|
||||
"""Export GPG public key for specified `user_id`."""
|
||||
args = gpg_command(['--export', user_id])
|
||||
result = sp.check_output(args=args)
|
||||
if not result:
|
||||
log.error('could not find public key %r in local GPG keyring', user_id)
|
||||
raise KeyError(user_id)
|
||||
return result
|
||||
252
trezor_agent/gpg/protocol.py
Normal file
252
trezor_agent/gpg/protocol.py
Normal file
@@ -0,0 +1,252 @@
|
||||
"""GPG protocol utilities."""
|
||||
|
||||
import base64
|
||||
import hashlib
|
||||
import logging
|
||||
import struct
|
||||
|
||||
from .. import formats, util
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def packet(tag, blob):
|
||||
"""Create small GPG packet."""
|
||||
assert len(blob) < 2**32
|
||||
|
||||
if len(blob) < 2**8:
|
||||
length_type = 0
|
||||
elif len(blob) < 2**16:
|
||||
length_type = 1
|
||||
else:
|
||||
length_type = 2
|
||||
|
||||
fmt = ['>B', '>H', '>L'][length_type]
|
||||
leading_byte = 0x80 | (tag << 2) | (length_type)
|
||||
return struct.pack('>B', leading_byte) + util.prefix_len(fmt, blob)
|
||||
|
||||
|
||||
def subpacket(subpacket_type, fmt, *values):
|
||||
"""Create GPG subpacket."""
|
||||
blob = struct.pack(fmt, *values) if values else fmt
|
||||
return struct.pack('>B', subpacket_type) + blob
|
||||
|
||||
|
||||
def subpacket_long(subpacket_type, value):
|
||||
"""Create GPG subpacket with 32-bit unsigned integer."""
|
||||
return subpacket(subpacket_type, '>L', value)
|
||||
|
||||
|
||||
def subpacket_time(value):
|
||||
"""Create GPG subpacket with time in seconds (since Epoch)."""
|
||||
return subpacket_long(2, value)
|
||||
|
||||
|
||||
def subpacket_byte(subpacket_type, value):
|
||||
"""Create GPG subpacket with 8-bit unsigned integer."""
|
||||
return subpacket(subpacket_type, '>B', value)
|
||||
|
||||
|
||||
def subpacket_prefix_len(item):
|
||||
"""Prefix subpacket length according to RFC 4880 section-5.2.3.1."""
|
||||
n = len(item)
|
||||
if n >= 8384:
|
||||
prefix = b'\xFF' + struct.pack('>L', n)
|
||||
elif n >= 192:
|
||||
n = n - 192
|
||||
prefix = struct.pack('BB', (n // 256) + 192, n % 256)
|
||||
else:
|
||||
prefix = struct.pack('B', n)
|
||||
return prefix + item
|
||||
|
||||
|
||||
def subpackets(*items):
|
||||
"""Serialize several GPG subpackets."""
|
||||
prefixed = [subpacket_prefix_len(item) for item in items]
|
||||
return util.prefix_len('>H', b''.join(prefixed))
|
||||
|
||||
|
||||
def mpi(value):
|
||||
"""Serialize multipresicion integer using GPG format."""
|
||||
bits = value.bit_length()
|
||||
data_size = (bits + 7) // 8
|
||||
data_bytes = bytearray(data_size)
|
||||
for i in range(data_size):
|
||||
data_bytes[i] = value & 0xFF
|
||||
value = value >> 8
|
||||
|
||||
data_bytes.reverse()
|
||||
return struct.pack('>H', bits) + bytes(data_bytes)
|
||||
|
||||
|
||||
def _serialize_nist256(vk):
|
||||
return mpi((4 << 512) |
|
||||
(vk.pubkey.point.x() << 256) |
|
||||
(vk.pubkey.point.y()))
|
||||
|
||||
|
||||
def _serialize_ed25519(vk):
|
||||
return mpi((0x40 << 256) |
|
||||
util.bytes2num(vk.to_bytes()))
|
||||
|
||||
|
||||
def _compute_keygrip(params):
|
||||
parts = []
|
||||
for name, value in params:
|
||||
exp = '{}:{}{}:'.format(len(name), name, len(value))
|
||||
parts.append(b'(' + exp.encode('ascii') + value + b')')
|
||||
|
||||
return hashlib.sha1(b''.join(parts)).digest()
|
||||
|
||||
|
||||
def _keygrip_nist256(vk):
|
||||
curve = vk.curve.curve
|
||||
gen = vk.curve.generator
|
||||
g = (4 << 512) | (gen.x() << 256) | gen.y()
|
||||
point = vk.pubkey.point
|
||||
q = (4 << 512) | (point.x() << 256) | point.y()
|
||||
|
||||
return _compute_keygrip([
|
||||
['p', util.num2bytes(curve.p(), size=32)],
|
||||
['a', util.num2bytes(curve.a() % curve.p(), size=32)],
|
||||
['b', util.num2bytes(curve.b() % curve.p(), size=32)],
|
||||
['g', util.num2bytes(g, size=65)],
|
||||
['n', util.num2bytes(vk.curve.order, size=32)],
|
||||
['q', util.num2bytes(q, size=65)],
|
||||
])
|
||||
|
||||
|
||||
def _keygrip_ed25519(vk):
|
||||
# pylint: disable=line-too-long
|
||||
return _compute_keygrip([
|
||||
['p', util.num2bytes(0x7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFED, size=32)], # nopep8
|
||||
['a', b'\x01'],
|
||||
['b', util.num2bytes(0x2DFC9311D490018C7338BF8688861767FF8FF5B2BEBE27548A14B235ECA6874A, size=32)], # nopep8
|
||||
['g', util.num2bytes(0x04216936D3CD6E53FEC0A4E231FDD6DC5C692CC7609525A7B2C9562D608F25D51A6666666666666666666666666666666666666666666666666666666666666658, size=65)], # nopep8
|
||||
['n', util.num2bytes(0x1000000000000000000000000000000014DEF9DEA2F79CD65812631A5CF5D3ED, size=32)], # nopep8
|
||||
['q', vk.to_bytes()],
|
||||
])
|
||||
|
||||
|
||||
SUPPORTED_CURVES = {
|
||||
formats.CURVE_NIST256: {
|
||||
# https://tools.ietf.org/html/rfc6637#section-11
|
||||
'oid': b'\x2A\x86\x48\xCE\x3D\x03\x01\x07',
|
||||
'algo_id': 19,
|
||||
'serialize': _serialize_nist256,
|
||||
'keygrip': _keygrip_nist256,
|
||||
},
|
||||
formats.CURVE_ED25519: {
|
||||
'oid': b'\x2B\x06\x01\x04\x01\xDA\x47\x0F\x01',
|
||||
'algo_id': 22,
|
||||
'serialize': _serialize_ed25519,
|
||||
'keygrip': _keygrip_ed25519,
|
||||
}
|
||||
}
|
||||
|
||||
ECDH_ALGO_ID = 18
|
||||
|
||||
CUSTOM_SUBPACKET = subpacket(100, b'TREZOR-GPG') # marks "our" pubkey
|
||||
|
||||
|
||||
def find_curve_by_algo_id(algo_id):
|
||||
"""Find curve name that matches a public key algorith ID."""
|
||||
if algo_id == ECDH_ALGO_ID:
|
||||
return formats.CURVE_NIST256
|
||||
|
||||
curve_name, = [name for name, info in SUPPORTED_CURVES.items()
|
||||
if info['algo_id'] == algo_id]
|
||||
return curve_name
|
||||
|
||||
|
||||
class PublicKey(object):
|
||||
"""GPG representation for public key packets."""
|
||||
|
||||
def __init__(self, curve_name, created, verifying_key, ecdh=False):
|
||||
"""Contruct using a ECDSA VerifyingKey object."""
|
||||
self.curve_info = SUPPORTED_CURVES[curve_name]
|
||||
self.created = int(created) # time since Epoch
|
||||
self.verifying_key = verifying_key
|
||||
self.ecdh = ecdh
|
||||
if ecdh:
|
||||
self.algo_id = ECDH_ALGO_ID
|
||||
self.ecdh_packet = b'\x03\x01\x08\x07'
|
||||
else:
|
||||
self.algo_id = self.curve_info['algo_id']
|
||||
self.ecdh_packet = b''
|
||||
|
||||
hex_key_id = util.hexlify(self.key_id())[-8:]
|
||||
self.desc = 'GPG public key {}/{}'.format(curve_name, hex_key_id)
|
||||
|
||||
@property
|
||||
def keygrip(self):
|
||||
"""Compute GPG2 keygrip."""
|
||||
return self.curve_info['keygrip'](self.verifying_key)
|
||||
|
||||
def data(self):
|
||||
"""Data for packet creation."""
|
||||
header = struct.pack('>BLB',
|
||||
4, # version
|
||||
self.created, # creation
|
||||
self.algo_id) # public key algorithm ID
|
||||
oid = util.prefix_len('>B', self.curve_info['oid'])
|
||||
blob = self.curve_info['serialize'](self.verifying_key)
|
||||
return header + oid + blob + self.ecdh_packet
|
||||
|
||||
def data_to_hash(self):
|
||||
"""Data for digest computation."""
|
||||
return b'\x99' + util.prefix_len('>H', self.data())
|
||||
|
||||
def _fingerprint(self):
|
||||
return hashlib.sha1(self.data_to_hash()).digest()
|
||||
|
||||
def key_id(self):
|
||||
"""Short (8 byte) GPG key ID."""
|
||||
return self._fingerprint()[-8:]
|
||||
|
||||
def __repr__(self):
|
||||
"""Short (8 hexadecimal digits) GPG key ID."""
|
||||
return self.desc
|
||||
|
||||
__str__ = __repr__
|
||||
|
||||
|
||||
def _split_lines(body, size):
|
||||
lines = []
|
||||
for i in range(0, len(body), size):
|
||||
lines.append(body[i:i+size] + '\n')
|
||||
return ''.join(lines)
|
||||
|
||||
|
||||
def armor(blob, type_str):
|
||||
"""See https://tools.ietf.org/html/rfc4880#section-6 for details."""
|
||||
head = '-----BEGIN PGP {}-----\nVersion: GnuPG v2\n\n'.format(type_str)
|
||||
body = base64.b64encode(blob).decode('ascii')
|
||||
checksum = base64.b64encode(util.crc24(blob)).decode('ascii')
|
||||
tail = '-----END PGP {}-----\n'.format(type_str)
|
||||
return head + _split_lines(body, 64) + '=' + checksum + '\n' + tail
|
||||
|
||||
|
||||
def make_signature(signer_func, data_to_sign, public_algo,
|
||||
hashed_subpackets, unhashed_subpackets, sig_type=0):
|
||||
"""Create new GPG signature."""
|
||||
# pylint: disable=too-many-arguments
|
||||
header = struct.pack('>BBBB',
|
||||
4, # version
|
||||
sig_type, # rfc4880 (section-5.2.1)
|
||||
public_algo,
|
||||
8) # hash_alg (SHA256)
|
||||
hashed = subpackets(*hashed_subpackets)
|
||||
unhashed = subpackets(*unhashed_subpackets)
|
||||
tail = b'\x04\xff' + struct.pack('>L', len(header) + len(hashed))
|
||||
data_to_hash = data_to_sign + header + hashed + tail
|
||||
|
||||
log.debug('hashing %d bytes', len(data_to_hash))
|
||||
digest = hashlib.sha256(data_to_hash).digest()
|
||||
log.debug('signing digest: %s', util.hexlify(digest))
|
||||
params = signer_func(digest=digest)
|
||||
sig = b''.join(mpi(p) for p in params)
|
||||
|
||||
return bytes(header + hashed + unhashed +
|
||||
digest[:2] + # used for decoder's sanity check
|
||||
sig) # actual ECDSA signature
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
1
trezor_agent/gpg/tests/__init__.py
Normal file
1
trezor_agent/gpg/tests/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Tests for GPG module."""
|
||||
124
trezor_agent/gpg/tests/test_decode.py
Normal file
124
trezor_agent/gpg/tests/test_decode.py
Normal file
@@ -0,0 +1,124 @@
|
||||
import glob
|
||||
import hashlib
|
||||
import io
|
||||
import os
|
||||
|
||||
import pytest
|
||||
|
||||
from .. import decode, protocol
|
||||
from ... import util
|
||||
|
||||
|
||||
def test_subpackets():
|
||||
s = io.BytesIO(b'\x00\x05\x02\xAB\xCD\x01\xEF')
|
||||
assert decode.parse_subpackets(util.Reader(s)) == [b'\xAB\xCD', b'\xEF']
|
||||
|
||||
|
||||
def test_subpackets_prefix():
|
||||
for n in [0, 1, 2, 4, 5, 10, 191, 192, 193,
|
||||
255, 256, 257, 8383, 8384, 65530]:
|
||||
item = b'?' * n # create dummy subpacket
|
||||
prefixed = protocol.subpackets(item)
|
||||
result = decode.parse_subpackets(util.Reader(io.BytesIO(prefixed)))
|
||||
assert [item] == result
|
||||
|
||||
|
||||
def test_mpi():
|
||||
s = io.BytesIO(b'\x00\x09\x01\x23')
|
||||
assert decode.parse_mpi(util.Reader(s)) == 0x123
|
||||
|
||||
s = io.BytesIO(b'\x00\x09\x01\x23\x00\x03\x05')
|
||||
assert decode.parse_mpis(util.Reader(s), n=2) == [0x123, 5]
|
||||
|
||||
|
||||
def assert_subdict(d, s):
|
||||
for k, v in s.items():
|
||||
assert d[k] == v
|
||||
|
||||
|
||||
def test_primary_nist256p1():
|
||||
# pylint: disable=line-too-long
|
||||
data = b'''-----BEGIN PGP PUBLIC KEY BLOCK-----
|
||||
Version: GnuPG v2
|
||||
|
||||
mFIEV0hI1hMIKoZIzj0DAQcCAwTXY7aq01xMPSU7gTHU9B7Z2CFoCk1Y4WYb8Tiy
|
||||
hurvIZ5la6+UEgAKF9HXpQo0yE+HQOgufoLlCpdE7NoEUb+HtAd0ZXN0aW5niHYE
|
||||
ExMIABIFAldISNYCGwMCFQgCFgACF4AAFgkQTcCehfpEIPILZFRSRVpPUi1HUEeV
|
||||
3QEApHKmBkbLVZNpsB8q9mBzKytxnOHNB3QWDuoKJu/ERi4A/1wRGZ/B0BDazHck
|
||||
zpR9luXTKwMEl+mlZmwEFKZXBmir
|
||||
=oyj0
|
||||
-----END PGP PUBLIC KEY BLOCK-----
|
||||
'''
|
||||
stream = io.BytesIO(decode.remove_armor(data))
|
||||
pubkey, user_id, signature = list(decode.parse_packets(stream))
|
||||
expected_pubkey = {
|
||||
'created': 1464355030, 'type': 'pubkey', 'tag': 6,
|
||||
'version': 4, 'algo': 19, 'key_id': b'M\xc0\x9e\x85\xfaD \xf2',
|
||||
'_to_hash': b'\x99\x00R\x04WHH\xd6\x13\x08*\x86H\xce=\x03\x01\x07\x02\x03\x04\xd7c\xb6\xaa\xd3\\L=%;\x811\xd4\xf4\x1e\xd9\xd8!h\nMX\xe1f\x1b\xf18\xb2\x86\xea\xef!\x9eek\xaf\x94\x12\x00\n\x17\xd1\xd7\xa5\n4\xc8O\x87@\xe8.~\x82\xe5\n\x97D\xec\xda\x04Q\xbf\x87' # nopep8
|
||||
}
|
||||
assert_subdict(pubkey, expected_pubkey)
|
||||
point = pubkey['verifying_key'].pubkey.point
|
||||
assert point.x(), point.y() == (
|
||||
97423441028100245505102979561460969898742433559010922791700160771755342491425,
|
||||
71644624850142103522769833619875243486871666152651730678601507641225861250951
|
||||
)
|
||||
assert_subdict(user_id, {
|
||||
'tag': 13, 'type': 'user_id', 'value': b'testing',
|
||||
'_to_hash': b'\xb4\x00\x00\x00\x07testing'
|
||||
})
|
||||
assert_subdict(signature, {
|
||||
'pubkey_alg': 19, '_is_custom': True, 'hash_alg': 8, 'tag': 2,
|
||||
'sig_type': 19, 'version': 4, 'type': 'signature', 'hash_prefix': b'\x95\xdd',
|
||||
'sig': (74381873592149178031432444136130575481350858387410643140628758456112511206958,
|
||||
41642995320462795718437755373080464775445470754419831653624197847615308982443),
|
||||
'hashed_subpackets': [b'\x02WHH\xd6', b'\x1b\x03', b'\x15\x08', b'\x16\x00', b'\x17\x80'],
|
||||
'unhashed_subpackets': [b'\x10M\xc0\x9e\x85\xfaD \xf2', b'dTREZOR-GPG'],
|
||||
'_to_hash': b'\x04\x13\x13\x08\x00\x12\x05\x02WHH\xd6\x02\x1b\x03\x02\x15\x08\x02\x16\x00\x02\x17\x80\x04\xff\x00\x00\x00\x18' # nopep8
|
||||
})
|
||||
|
||||
digest = decode.digest_packets(packets=[pubkey, user_id, signature],
|
||||
hasher=hashlib.sha256())
|
||||
decode.verify_digest(pubkey=pubkey, digest=digest,
|
||||
signature=signature['sig'],
|
||||
label='GPG primary public key')
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
bad_digest = b'\x00' * len(digest)
|
||||
decode.verify_digest(pubkey=pubkey, digest=bad_digest,
|
||||
signature=signature['sig'],
|
||||
label='GPG primary public key')
|
||||
|
||||
message = b'Hello, World!\n'
|
||||
signature = b'''-----BEGIN PGP SIGNATURE-----
|
||||
Version: GnuPG v2
|
||||
|
||||
iF4EABMIAAYFAldIlfQACgkQTcCehfpEIPKOUgD9FjaeWla4wOuDZ7P6fhkT5nZp
|
||||
KDQU0N5KmNwLlt2kwo4A/jQkBII2cI8tTqOVTLNRXXqIOsMf/fG4jKM/VOFc/01c
|
||||
=dC+z
|
||||
-----END PGP SIGNATURE-----
|
||||
'''
|
||||
decode.verify(pubkey=pubkey, signature=signature, original_data=message)
|
||||
|
||||
pubkey = decode.load_public_key(pubkey_bytes=decode.remove_armor(data))
|
||||
assert_subdict(pubkey, expected_pubkey)
|
||||
assert_subdict(pubkey, {'user_id': b'testing'})
|
||||
|
||||
pubkey = decode.load_public_key(pubkey_bytes=decode.remove_armor(data),
|
||||
use_custom=True)
|
||||
assert_subdict(pubkey, expected_pubkey)
|
||||
assert_subdict(pubkey, {'user_id': b'testing'})
|
||||
|
||||
|
||||
cwd = os.path.join(os.path.dirname(__file__))
|
||||
input_files = glob.glob(os.path.join(cwd, '*.gpg'))
|
||||
|
||||
|
||||
@pytest.fixture(params=input_files)
|
||||
def public_key_path(request):
|
||||
return request.param
|
||||
|
||||
|
||||
def test_gpg_files(public_key_path): # pylint: disable=redefined-outer-name
|
||||
with open(public_key_path, 'rb') as f:
|
||||
keys = decode.parse_public_keys(f)
|
||||
assert len(keys) > 0
|
||||
82
trezor_agent/gpg/tests/test_keyring.py
Normal file
82
trezor_agent/gpg/tests/test_keyring.py
Normal file
@@ -0,0 +1,82 @@
|
||||
import io
|
||||
import mock
|
||||
|
||||
from .. import keyring
|
||||
|
||||
|
||||
def test_unescape_short():
|
||||
assert keyring.unescape(b'abc%0AX%0D %25;.-+()') == b'abc\nX\r %;.-+()'
|
||||
|
||||
|
||||
def test_unescape_long():
|
||||
escaped = (b'D (7:sig-val(3:dsa(1:r32:\x1d\x15.\x12\xe8h\x19\xd9O\xeb\x06'
|
||||
b'yD?a:/\xae\xdb\xac\x93\xa6\x86\xcbs\xb8\x03\xf1\xcb\x89\xc7'
|
||||
b'\x1f)(1:s32:%25\xb5\x04\x94\xc7\xc4X\xc7\xe0%0D\x08\xbb%0DuN'
|
||||
b'\x9c6}[\xc2=t\x8c\xfdD\x81\xe8\xdd\x86=\xe2\xa9)))')
|
||||
unescaped = (b'D (7:sig-val(3:dsa(1:r32:\x1d\x15.\x12\xe8h\x19\xd9O\xeb'
|
||||
b'\x06yD?a:/\xae\xdb\xac\x93\xa6\x86\xcbs\xb8\x03\xf1\xcb\x89'
|
||||
b'\xc7\x1f)(1:s32:%\xb5\x04\x94\xc7\xc4X\xc7\xe0\r\x08\xbb\ru'
|
||||
b'N\x9c6}[\xc2=t\x8c\xfdD\x81\xe8\xdd\x86=\xe2\xa9)))')
|
||||
assert keyring.unescape(escaped) == unescaped
|
||||
|
||||
|
||||
def test_parse_term():
|
||||
assert keyring.parse(b'4:abcdXXX') == (b'abcd', b'XXX')
|
||||
|
||||
|
||||
def test_parse_ecdsa():
|
||||
sig, rest = keyring.parse(b'(7:sig-val(5:ecdsa'
|
||||
b'(1:r2:\x01\x02)(1:s2:\x03\x04)))')
|
||||
values = [[b'r', b'\x01\x02'], [b's', b'\x03\x04']]
|
||||
assert sig == [b'sig-val', [b'ecdsa'] + values]
|
||||
assert rest == b''
|
||||
assert keyring.parse_sig(sig) == (0x102, 0x304)
|
||||
|
||||
|
||||
def test_parse_rsa():
|
||||
sig, rest = keyring.parse(b'(7:sig-val(3:rsa(1:s4:\x01\x02\x03\x04)))')
|
||||
assert sig == [b'sig-val', [b'rsa', [b's', b'\x01\x02\x03\x04']]]
|
||||
assert rest == b''
|
||||
assert keyring.parse_sig(sig) == (0x1020304,)
|
||||
|
||||
|
||||
class FakeSocket(object):
|
||||
def __init__(self):
|
||||
self.rx = io.BytesIO()
|
||||
self.tx = io.BytesIO()
|
||||
|
||||
def recv(self, n):
|
||||
return self.rx.read(n)
|
||||
|
||||
def sendall(self, data):
|
||||
self.tx.write(data)
|
||||
|
||||
|
||||
def test_sign_digest():
|
||||
sock = FakeSocket()
|
||||
sock.rx.write(b'OK Pleased to meet you, process XYZ\n')
|
||||
sock.rx.write(b'OK\n' * 6)
|
||||
sock.rx.write(b'D (7:sig-val(3:rsa(1:s16:0123456789ABCDEF)))\n')
|
||||
sock.rx.seek(0)
|
||||
keygrip = '1234'
|
||||
digest = b'A' * 32
|
||||
sp = mock.Mock(spec=['check_output'])
|
||||
sp.check_output.return_value = '/dev/pts/0'
|
||||
sig = keyring.sign_digest(sock=sock, keygrip=keygrip,
|
||||
digest=digest, sp=sp,
|
||||
environ={'DISPLAY': ':0'})
|
||||
assert sig == (0x30313233343536373839414243444546,)
|
||||
assert sock.tx.getvalue() == b'''RESET
|
||||
OPTION ttyname=/dev/pts/0
|
||||
OPTION display=:0
|
||||
SIGKEY 1234
|
||||
SETHASH 8 4141414141414141414141414141414141414141414141414141414141414141
|
||||
SETKEYDESC Sign+a+new+TREZOR-based+subkey
|
||||
PKSIGN
|
||||
'''
|
||||
|
||||
|
||||
def test_iterlines():
|
||||
sock = FakeSocket()
|
||||
sock.rx.write(b'foo\nbar\nxyz')
|
||||
assert list(keyring.iterlines(sock)) == []
|
||||
95
trezor_agent/gpg/tests/test_protocol.py
Normal file
95
trezor_agent/gpg/tests/test_protocol.py
Normal file
@@ -0,0 +1,95 @@
|
||||
import ecdsa
|
||||
import ed25519
|
||||
|
||||
from .. import protocol
|
||||
from ... import formats
|
||||
|
||||
|
||||
def test_packet():
|
||||
assert protocol.packet(1, b'') == b'\x84\x00'
|
||||
assert protocol.packet(2, b'A') == b'\x88\x01A'
|
||||
blob = b'B' * 0xAB
|
||||
assert protocol.packet(3, blob) == b'\x8c\xAB' + blob
|
||||
blob = b'C' * 0x1234
|
||||
assert protocol.packet(3, blob) == b'\x8d\x12\x34' + blob
|
||||
blob = b'D' * 0x12345678
|
||||
assert protocol.packet(4, blob) == b'\x92\x12\x34\x56\x78' + blob
|
||||
|
||||
|
||||
def test_subpackets():
|
||||
assert protocol.subpacket(1, b'') == b'\x01'
|
||||
assert protocol.subpacket(2, '>H', 0x0304) == b'\x02\x03\x04'
|
||||
assert protocol.subpacket_long(9, 0x12345678) == b'\x09\x12\x34\x56\x78'
|
||||
assert protocol.subpacket_time(0x12345678) == b'\x02\x12\x34\x56\x78'
|
||||
assert protocol.subpacket_byte(0xAB, 0xCD) == b'\xAB\xCD'
|
||||
assert protocol.subpackets() == b'\x00\x00'
|
||||
assert protocol.subpackets(b'ABC', b'12345') == b'\x00\x0A\x03ABC\x0512345'
|
||||
|
||||
|
||||
def test_mpi():
|
||||
assert protocol.mpi(0x123) == b'\x00\x09\x01\x23'
|
||||
|
||||
|
||||
def test_find():
|
||||
assert protocol.find_curve_by_algo_id(19) == formats.CURVE_NIST256
|
||||
assert protocol.find_curve_by_algo_id(22) == formats.CURVE_ED25519
|
||||
|
||||
|
||||
def test_armor():
|
||||
data = bytearray(range(256))
|
||||
assert protocol.armor(data, 'TEST') == '''-----BEGIN PGP TEST-----
|
||||
Version: GnuPG v2
|
||||
|
||||
AAECAwQFBgcICQoLDA0ODxAREhMUFRYXGBkaGxwdHh8gISIjJCUmJygpKissLS4v
|
||||
MDEyMzQ1Njc4OTo7PD0+P0BBQkNERUZHSElKS0xNTk9QUVJTVFVWV1hZWltcXV5f
|
||||
YGFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6e3x9fn+AgYKDhIWGh4iJiouMjY6P
|
||||
kJGSk5SVlpeYmZqbnJ2en6ChoqOkpaanqKmqq6ytrq+wsbKztLW2t7i5uru8vb6/
|
||||
wMHCw8TFxsfIycrLzM3Oz9DR0tPU1dbX2Nna29zd3t/g4eLj5OXm5+jp6uvs7e7v
|
||||
8PHy8/T19vf4+fr7/P3+/w==
|
||||
=W700
|
||||
-----END PGP TEST-----
|
||||
'''
|
||||
|
||||
|
||||
def test_make_signature():
|
||||
def signer_func(digest):
|
||||
assert digest == (b'\xd0\xe5]|\x8bP\xe6\x91\xb3\xe8+\xf4A\xf0`(\xb1'
|
||||
b'\xc7\xf4;\x86\x97s\xdb\x9a\xda\xee< \xcb\x9e\x00')
|
||||
return (7, 8)
|
||||
|
||||
sig = protocol.make_signature(
|
||||
signer_func=signer_func,
|
||||
data_to_sign=b'Hello World!',
|
||||
public_algo=22,
|
||||
hashed_subpackets=[protocol.subpacket_time(1)],
|
||||
unhashed_subpackets=[],
|
||||
sig_type=25)
|
||||
assert sig == (b'\x04\x19\x16\x08\x00\x06\x05\x02'
|
||||
b'\x00\x00\x00\x01\x00\x00\xd0\xe5\x00\x03\x07\x00\x04\x08')
|
||||
|
||||
|
||||
def test_nist256p1():
|
||||
sk = ecdsa.SigningKey.from_secret_exponent(secexp=1, curve=ecdsa.NIST256p)
|
||||
vk = sk.get_verifying_key()
|
||||
pk = protocol.PublicKey(curve_name=formats.CURVE_NIST256,
|
||||
created=42, verifying_key=vk)
|
||||
assert repr(pk) == 'GPG public key nist256p1/F82361D9'
|
||||
assert pk.keygrip == b'\x95\x85.\x91\x7f\xe2\xc3\x91R\xba\x99\x81\x92\xb5y\x1d\xb1\\\xdc\xf0'
|
||||
|
||||
|
||||
def test_nist256p1_ecdh():
|
||||
sk = ecdsa.SigningKey.from_secret_exponent(secexp=1, curve=ecdsa.NIST256p)
|
||||
vk = sk.get_verifying_key()
|
||||
pk = protocol.PublicKey(curve_name=formats.CURVE_NIST256,
|
||||
created=42, verifying_key=vk, ecdh=True)
|
||||
assert repr(pk) == 'GPG public key nist256p1/5811DF46'
|
||||
assert pk.keygrip == b'\x95\x85.\x91\x7f\xe2\xc3\x91R\xba\x99\x81\x92\xb5y\x1d\xb1\\\xdc\xf0'
|
||||
|
||||
|
||||
def test_ed25519():
|
||||
sk = ed25519.SigningKey(b'\x00' * 32)
|
||||
vk = sk.get_verifying_key()
|
||||
pk = protocol.PublicKey(curve_name=formats.CURVE_ED25519,
|
||||
created=42, verifying_key=vk)
|
||||
assert repr(pk) == 'GPG public key ed25519/36B40FE6'
|
||||
assert pk.keygrip == b'\xbf\x01\x90l\x17\xb64\xa3-\xf4\xc0gr\x99\x18<\xddBQ?'
|
||||
@@ -29,7 +29,7 @@ class FakeConnection(object):
|
||||
def get_public_node(self, n, ecdsa_curve_name=b'secp256k1'):
|
||||
assert not self.closed
|
||||
assert n == ADDR
|
||||
assert ecdsa_curve_name in {b'secp256k1', b'nist256p1'}
|
||||
assert ecdsa_curve_name in {'secp256k1', 'nist256p1'}
|
||||
result = mock.Mock(spec=[])
|
||||
result.node = mock.Mock(spec=[])
|
||||
result.node.public_key = PUBKEY
|
||||
@@ -90,8 +90,8 @@ def test_ssh_agent():
|
||||
assert (client.identity_to_string(identity) ==
|
||||
client.identity_to_string(ident))
|
||||
assert challenge_hidden == BLOB
|
||||
assert challenge_visual == 'VISUAL'
|
||||
assert ecdsa_curve_name == b'nist256p1'
|
||||
assert challenge_visual == ''
|
||||
assert ecdsa_curve_name == 'nist256p1'
|
||||
|
||||
result = mock.Mock(spec=[])
|
||||
result.public_key = PUBKEY
|
||||
@@ -99,8 +99,7 @@ def test_ssh_agent():
|
||||
return result
|
||||
|
||||
c.client.sign_identity = ssh_sign_identity
|
||||
signature = c.sign_ssh_challenge(label=label, blob=BLOB,
|
||||
visual='VISUAL')
|
||||
signature = c.sign_ssh_challenge(label=label, blob=BLOB)
|
||||
|
||||
key = formats.import_public_key(PUBKEY_TEXT)
|
||||
serialized_sig = key['verifier'](sig=signature, msg=BLOB)
|
||||
@@ -122,7 +121,7 @@ def test_ssh_agent():
|
||||
|
||||
c.client.sign_identity = cancel_sign_identity
|
||||
with pytest.raises(IOError):
|
||||
c.sign_ssh_challenge(label=label, blob=BLOB, visual='VISUAL')
|
||||
c.sign_ssh_challenge(label=label, blob=BLOB)
|
||||
|
||||
|
||||
def test_utils():
|
||||
|
||||
@@ -29,7 +29,7 @@ def test_parse_public_key():
|
||||
assert key['name'] == b'home'
|
||||
assert key['point'] == _point
|
||||
|
||||
assert key['curve'] == b'nist256p1'
|
||||
assert key['curve'] == 'nist256p1'
|
||||
assert key['fingerprint'] == '4b:19:bc:0f:c8:7e:dc:fa:1a:e3:c2:ff:6f:e0:80:a2' # nopep8
|
||||
assert key['type'] == b'ecdsa-sha2-nistp256'
|
||||
|
||||
@@ -46,7 +46,7 @@ def test_parse_ed25519():
|
||||
'fSO8nLIi736is+f0erq28RTc7CkM11NZtTKR hello\n')
|
||||
p = formats.import_public_key(pubkey)
|
||||
assert p['name'] == b'hello'
|
||||
assert p['curve'] == b'ed25519'
|
||||
assert p['curve'] == 'ed25519'
|
||||
|
||||
BLOB = (b'\x00\x00\x00\x0bssh-ed25519\x00\x00\x00 P]\x17kc}#'
|
||||
b'\xbc\x9c\xb2"\xef~\xa2\xb3\xe7\xf4z\xba\xb6\xf1\x14'
|
||||
|
||||
@@ -46,3 +46,54 @@ def test_send_recv():
|
||||
assert util.recv(s, 2) == b'3*'
|
||||
|
||||
pytest.raises(EOFError, util.recv, s, 1)
|
||||
|
||||
|
||||
def test_crc24():
|
||||
assert util.crc24(b'') == b'\xb7\x04\xce'
|
||||
assert util.crc24(b'1234567890') == b'\x8c\x00\x72'
|
||||
|
||||
|
||||
def test_bit():
|
||||
assert util.bit(6, 3) == 0
|
||||
assert util.bit(6, 2) == 1
|
||||
assert util.bit(6, 1) == 1
|
||||
assert util.bit(6, 0) == 0
|
||||
|
||||
|
||||
def test_split_bits():
|
||||
assert util.split_bits(0x1234, 4, 8, 4) == [0x1, 0x23, 0x4]
|
||||
|
||||
|
||||
def test_hexlify():
|
||||
assert util.hexlify(b'\x12\x34\xab\xcd') == '1234ABCD'
|
||||
|
||||
|
||||
def test_low_bits():
|
||||
assert util.low_bits(0x1234, 12) == 0x234
|
||||
assert util.low_bits(0x1234, 32) == 0x1234
|
||||
assert util.low_bits(0x1234, 0) == 0
|
||||
|
||||
|
||||
def test_readfmt():
|
||||
stream = io.BytesIO(b'ABC\x12\x34')
|
||||
assert util.readfmt(stream, 'B') == (65,)
|
||||
assert util.readfmt(stream, '>2sH') == (b'BC', 0x1234)
|
||||
|
||||
|
||||
def test_prefix_len():
|
||||
assert util.prefix_len('>H', b'ABCD') == b'\x00\x04ABCD'
|
||||
|
||||
|
||||
def test_reader():
|
||||
stream = io.BytesIO(b'ABC\x12\x34')
|
||||
r = util.Reader(stream)
|
||||
assert r.read(1) == b'A'
|
||||
assert r.readfmt('2s') == b'BC'
|
||||
|
||||
dst = io.BytesIO()
|
||||
with r.capture(dst):
|
||||
assert r.readfmt('>H') == 0x1234
|
||||
assert dst.getvalue() == b'\x12\x34'
|
||||
|
||||
with pytest.raises(EOFError):
|
||||
r.read(1)
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
"""Various I/O and serialization utilities."""
|
||||
import binascii
|
||||
import contextlib
|
||||
import io
|
||||
import struct
|
||||
|
||||
@@ -60,7 +62,7 @@ def num2bytes(value, size):
|
||||
res.append(value & 0xFF)
|
||||
value = value >> 8
|
||||
assert value == 0
|
||||
return bytearray(list(reversed(res)))
|
||||
return bytes(bytearray(list(reversed(res))))
|
||||
|
||||
|
||||
def pack(fmt, *args):
|
||||
@@ -75,3 +77,99 @@ def frame(*msgs):
|
||||
res.write(msg)
|
||||
msg = res.getvalue()
|
||||
return pack('L', len(msg)) + msg
|
||||
|
||||
|
||||
def crc24(blob):
|
||||
"""See https://tools.ietf.org/html/rfc4880#section-6.1 for details."""
|
||||
CRC24_INIT = 0x0B704CE
|
||||
CRC24_POLY = 0x1864CFB
|
||||
|
||||
crc = CRC24_INIT
|
||||
for octet in bytearray(blob):
|
||||
crc ^= (octet << 16)
|
||||
for _ in range(8):
|
||||
crc <<= 1
|
||||
if crc & 0x1000000:
|
||||
crc ^= CRC24_POLY
|
||||
assert 0 <= crc < 0x1000000
|
||||
crc_bytes = struct.pack('>L', crc)
|
||||
assert crc_bytes[:1] == b'\x00'
|
||||
return crc_bytes[1:]
|
||||
|
||||
|
||||
def bit(value, i):
|
||||
"""Extract the i-th bit out of value."""
|
||||
return 1 if value & (1 << i) else 0
|
||||
|
||||
|
||||
def low_bits(value, n):
|
||||
"""Extract the lowest n bits out of value."""
|
||||
return value & ((1 << n) - 1)
|
||||
|
||||
|
||||
def split_bits(value, *bits):
|
||||
"""
|
||||
Split integer value into list of ints, according to `bits` list.
|
||||
|
||||
For example, split_bits(0x1234, 4, 8, 4) == [0x1, 0x23, 0x4]
|
||||
"""
|
||||
result = []
|
||||
for b in reversed(bits):
|
||||
mask = (1 << b) - 1
|
||||
result.append(value & mask)
|
||||
value = value >> b
|
||||
assert value == 0
|
||||
|
||||
result.reverse()
|
||||
return result
|
||||
|
||||
|
||||
def readfmt(stream, fmt):
|
||||
"""Read and unpack an object from stream, using a struct format string."""
|
||||
size = struct.calcsize(fmt)
|
||||
blob = stream.read(size)
|
||||
return struct.unpack(fmt, blob)
|
||||
|
||||
|
||||
def prefix_len(fmt, blob):
|
||||
"""Prefix `blob` with its size, serialized using `fmt` format."""
|
||||
return struct.pack(fmt, len(blob)) + blob
|
||||
|
||||
|
||||
def hexlify(blob):
|
||||
"""Utility for consistent hexadecimal formatting."""
|
||||
return binascii.hexlify(blob).decode('ascii').upper()
|
||||
|
||||
|
||||
class Reader(object):
|
||||
"""Read basic type objects out of given stream."""
|
||||
|
||||
def __init__(self, stream):
|
||||
"""Create a non-capturing reader."""
|
||||
self.s = stream
|
||||
self._captured = None
|
||||
|
||||
def readfmt(self, fmt):
|
||||
"""Read a specified object, using a struct format string."""
|
||||
size = struct.calcsize(fmt)
|
||||
blob = self.read(size)
|
||||
obj, = struct.unpack(fmt, blob)
|
||||
return obj
|
||||
|
||||
def read(self, size=None):
|
||||
"""Read `size` bytes from stream."""
|
||||
blob = self.s.read(size)
|
||||
if size is not None and len(blob) < size:
|
||||
raise EOFError
|
||||
if self._captured:
|
||||
self._captured.write(blob)
|
||||
return blob
|
||||
|
||||
@contextlib.contextmanager
|
||||
def capture(self, stream):
|
||||
"""Capture all data read during this context."""
|
||||
self._captured = stream
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
self._captured = None
|
||||
|
||||
Reference in New Issue
Block a user