mirror of
https://github.com/romanz/amodem.git
synced 2026-05-03 08:27:26 +08:00
Compare commits
78 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2af1086ed8 | ||
|
|
7e95179128 | ||
|
|
ac8898a434 | ||
|
|
0b829636e1 | ||
|
|
7598f6cdbf | ||
|
|
c8bf57cbcc | ||
|
|
62af49236c | ||
|
|
af3f669780 | ||
|
|
1520dbd8b9 | ||
|
|
4f05d51e9b | ||
|
|
9d38c26a0f | ||
|
|
3a9330b995 | ||
|
|
f904aac92e | ||
|
|
ca67923fe8 | ||
|
|
ce90e61eb2 | ||
|
|
90dc124e8d | ||
|
|
442bf725ef | ||
|
|
5820480052 | ||
|
|
ae2a84e168 | ||
|
|
f6911a0016 | ||
|
|
69c54eb425 | ||
|
|
931573f32b | ||
|
|
1e8363d4fc | ||
|
|
b7113083b4 | ||
|
|
b5c4eca0d2 | ||
|
|
8aa08d0862 | ||
|
|
b452b49f4c | ||
|
|
639c4efb6d | ||
|
|
6f1686c614 | ||
|
|
300d9a7140 | ||
|
|
b143bafc70 | ||
|
|
f2c6b6b9c1 | ||
|
|
e0507b1508 | ||
|
|
85274d8374 | ||
|
|
f358ca29d4 | ||
|
|
53d43cba29 | ||
|
|
214b556f83 | ||
|
|
051a3fd4ab | ||
|
|
91050ee64a | ||
|
|
257992d04c | ||
|
|
6c2273387d | ||
|
|
b6ad8207ba | ||
|
|
3a93fc859d | ||
|
|
7d9b3ff1d0 | ||
|
|
4af881b3cb | ||
|
|
eb525e1b62 | ||
|
|
02c8e729b7 | ||
|
|
12359938ad | ||
|
|
93cd3e688b | ||
|
|
26d7dd3124 | ||
|
|
0d5c3a9ca7 | ||
|
|
97ec6b2719 | ||
|
|
8ba9be1780 | ||
|
|
b2bc87c0c7 | ||
|
|
d522d148ef | ||
|
|
c796a3b01d | ||
|
|
a3362bbf3e | ||
|
|
9a271d115b | ||
|
|
6a7165298f | ||
|
|
c4f3fa6e04 | ||
|
|
8a77fa519f | ||
|
|
59560ec0b0 | ||
|
|
7a91196dd5 | ||
|
|
43c424a402 | ||
|
|
6672ea9bc4 | ||
|
|
002dc2a0e0 | ||
|
|
61ced2808f | ||
|
|
71a8930021 | ||
|
|
74e8f21a22 | ||
|
|
897236d556 | ||
|
|
5bec0e8382 | ||
|
|
3cb7f6fd21 | ||
|
|
cad2ec1239 | ||
|
|
604b2b7e99 | ||
|
|
159bd79b5f | ||
|
|
dde0b60e83 | ||
|
|
109bb3b47f | ||
|
|
0f20bfa239 |
@@ -1,2 +1,2 @@
|
||||
[MESSAGES CONTROL]
|
||||
disable=invalid-name, missing-docstring, locally-disabled, unbalanced-tuple-unpacking
|
||||
disable=invalid-name, missing-docstring, locally-disabled, unbalanced-tuple-unpacking,no-else-return
|
||||
|
||||
20
.travis.yml
20
.travis.yml
@@ -4,29 +4,25 @@ python:
|
||||
- "2.7"
|
||||
- "3.4"
|
||||
- "3.5"
|
||||
- "3.6"
|
||||
|
||||
cache:
|
||||
directories:
|
||||
- $HOME/.cache/pip
|
||||
|
||||
addons:
|
||||
apt:
|
||||
packages:
|
||||
- libudev-dev
|
||||
- libusb-1.0-0-dev
|
||||
|
||||
before_install:
|
||||
- pip install -U setuptools pylint coverage pep8 pydocstyle "pip>=7.0" wheel
|
||||
- pip install -e git+https://github.com/keepkey/python-keepkey@6e8baa8b935e830d05f87b6dfd9bc7c927a96dc3#egg=keepkey
|
||||
- pip install -U pip wheel
|
||||
- pip install -U setuptools
|
||||
- pip install -U pylint coverage pep8 pydocstyle
|
||||
|
||||
install:
|
||||
- pip install -e .
|
||||
|
||||
script:
|
||||
- pep8 trezor_agent
|
||||
- pylint --reports=no --rcfile .pylintrc trezor_agent
|
||||
- pydocstyle trezor_agent
|
||||
- coverage run --source trezor_agent/ -m py.test -v
|
||||
- pep8 libagent
|
||||
- pylint --reports=no --rcfile .pylintrc libagent
|
||||
- pydocstyle libagent
|
||||
- coverage run --source libagent/ -m py.test -v
|
||||
|
||||
after_success:
|
||||
- coverage report
|
||||
|
||||
70
INSTALL.md
Normal file
70
INSTALL.md
Normal file
@@ -0,0 +1,70 @@
|
||||
# Installation
|
||||
|
||||
Install the following packages (depending on your distribution):
|
||||
|
||||
### Debian
|
||||
|
||||
$ apt update && apt upgrade
|
||||
$ apt install python-pip python-dev libusb-1.0-0-dev libudev-dev
|
||||
|
||||
### Fedora/RedHat
|
||||
|
||||
$ yum update
|
||||
$ yum install python-pip python-devel libusb-devel libudev-devel \
|
||||
gcc redhat-rpm-config
|
||||
|
||||
Also, update Python packages before starting the installation:
|
||||
|
||||
$ pip install -U setuptools pip
|
||||
|
||||
Make sure you are running the latest firmware version on your hardware device.
|
||||
Currently the following firmware versions are supported:
|
||||
|
||||
* [TREZOR](https://wallet.trezor.io/data/firmware/releases.json): `1.4.2+`
|
||||
* [KeepKey](https://github.com/keepkey/keepkey-firmware/releases): `3.0.17+`
|
||||
* [Ledger Nano S](https://github.com/LedgerHQ/blue-app-ssh-agent): `0.0.3+` (install [SSH/PGP Agent](https://www.ledgerwallet.com/images/apps/chrome-mngr-apps.png) app)
|
||||
|
||||
## TREZOR
|
||||
|
||||
Make sure that your `udev` rules are configured [correctly](https://doc.satoshilabs.com/trezor-user/settingupchromeonlinux.html#manual-configuration-of-udev-rules).
|
||||
Then, install the latest [trezor_agent](https://pypi.python.org/pypi/trezor_agent) package:
|
||||
|
||||
$ pip install trezor_agent
|
||||
|
||||
Or, directly from the latest source code:
|
||||
|
||||
$ git clone https://github.com/romanz/trezor-agent
|
||||
$ pip install --user -e trezor-agent/agents/trezor
|
||||
|
||||
## KeepKey
|
||||
|
||||
Make sure that your `udev` rules are configured [correctly](https://support.keepkey.com/support/solutions/articles/6000037796-keepkey-wallet-is-not-being-recognized-by-linux).
|
||||
Then, install the latest [keepkey_agent](https://pypi.python.org/pypi/keepkey_agent) package:
|
||||
|
||||
$ pip install keepkey_agent
|
||||
|
||||
Or, directly from the latest source code:
|
||||
|
||||
$ git clone https://github.com/romanz/trezor-agent
|
||||
$ pip install --user -e trezor-agent/agents/keepkey
|
||||
|
||||
## Ledger Nano S
|
||||
|
||||
Make sure that your `udev` rules are configured [correctly](http://support.ledgerwallet.com/knowledge_base/topics/ledger-wallet-is-not-recognized-on-linux).
|
||||
Then, install the latest [ledger_agent](https://pypi.python.org/pypi/ledger_agent) package:
|
||||
|
||||
$ pip install ledger_agent
|
||||
|
||||
Or, directly from the latest source code:
|
||||
|
||||
$ git clone https://github.com/romanz/trezor-agent
|
||||
$ pip install --user -e trezor-agent/agents/ledger
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
If there is an import problem with the installed `protobuf` package,
|
||||
see [this issue](https://github.com/romanz/trezor-agent/issues/28) for fixing it.
|
||||
|
||||
If you can't find the command-line utilities (after running `pip install --user`),
|
||||
please make sure that `~/.local/bin` is on your `PATH` variable
|
||||
(see a [relevant](https://github.com/pypa/pip/issues/3813) issue).
|
||||
@@ -1,10 +1,15 @@
|
||||
Note: the GPG-related code is still under development, so please try the current implementation
|
||||
and feel free to [report any issue](https://github.com/romanz/trezor-agent/issues) you have encountered.
|
||||
and please let me [know](https://github.com/romanz/trezor-agent/issues/new) if something doesn't
|
||||
work well for you. If possible:
|
||||
|
||||
* record the session (e.g. using [asciinema](https://asciinema.org))
|
||||
* attach the GPG agent log from `~/.gnupg/{trezor,ledger}/gpg-agent.log`
|
||||
|
||||
Thanks!
|
||||
|
||||
# Installation
|
||||
|
||||
First, verify that you have GPG 2.1+ installed
|
||||
First, verify that you have GPG 2.1.11+ installed
|
||||
([Debian](https://gist.github.com/vt0r/a2f8c0bcb1400131ff51),
|
||||
[macOS](https://sourceforge.net/p/gpgosx/docu/Download/)):
|
||||
|
||||
@@ -13,36 +18,88 @@ $ gpg2 --version | head -n1
|
||||
gpg (GnuPG) 2.1.15
|
||||
```
|
||||
|
||||
Update you TREZOR firmware to the latest version (at least v1.4.0).
|
||||
This GPG version is included in [Ubuntu 16.04](https://launchpad.net/ubuntu/+source/gnupg2)
|
||||
and [Linux Mint 18](https://community.linuxmint.com/software/view/gnupg2).
|
||||
|
||||
Update you device firmware to the latest version and install your specific `agent` package:
|
||||
|
||||
Install latest `trezor-agent` package from GitHub:
|
||||
```
|
||||
$ pip install --user git+https://github.com/romanz/trezor-agent.git
|
||||
$ pip install --user (trezor|keepkey|ledger)_agent
|
||||
```
|
||||
|
||||
# Quickstart
|
||||
|
||||
## Identity creation
|
||||
[](https://asciinema.org/a/c2yodst21h9obttkn9wgf3783)
|
||||
[](https://asciinema.org/a/90416)
|
||||
|
||||
In order to use specific device type for GPG indentity creation, use either command:
|
||||
```
|
||||
$ DEVICE=(trezor,ledger) ./scripts/gpg-init "John Doe <john@doe.bit>"
|
||||
```
|
||||
|
||||
## Sample usage (signature and decryption)
|
||||
[](https://asciinema.org/a/7x0h9tyoyu5ar6jc8y9oih0ba)
|
||||
[](https://asciinema.org/a/120441)
|
||||
|
||||
In order to use specific device type for GPG operations, set the following environment variable to either:
|
||||
```
|
||||
$ export GNUPGHOME=~/.gnupg/{trezor,ledger}
|
||||
```
|
||||
|
||||
You can use GNU Privacy Assistant (GPA) in order to inspect the created keys
|
||||
and perform signature and decryption operations using:
|
||||
|
||||
```
|
||||
$ sudo apt install gpa
|
||||
$ ./scripts/gpg-shell gpa
|
||||
$ GNUPGHOME=~/.gnupg/trezor gpa
|
||||
```
|
||||
[](https://www.gnupg.org/related_software/swlist.html#gpa)
|
||||
|
||||
## Git commit & tag signatures:
|
||||
Git can use GPG to sign and verify commits and tags (see [here](https://git-scm.com/book/en/v2/Git-Tools-Signing-Your-Work)):
|
||||
```
|
||||
$ git config --local commit.gpgsign 1
|
||||
$ git config --local gpg.program $(which gpg2)
|
||||
$ git commit --gpg-sign # create GPG-signed commit
|
||||
$ git log --show-signature -1 # verify commit signature
|
||||
$ git tag v1.2.3 --sign # create GPG-signed tag
|
||||
$ git tag v1.2.3 --verify # verify tag signature
|
||||
```
|
||||
|
||||
## Password manager
|
||||
|
||||
First install `pass` from [passwordstore.org](https://www.passwordstore.org/) and initialize it to use your TREZOR-based GPG identity:
|
||||
```
|
||||
$ export GNUPGHOME=~/.gnupg/trezor
|
||||
$ pass init "Roman Zeyde <roman.zeyde@gmail.com>"
|
||||
Password store initialized for Roman Zeyde <roman.zeyde@gmail.com>
|
||||
```
|
||||
Then, you can generate truly random passwords and save them encrypted using your public key (as separate `.gpg` files under `~/.password-store/`):
|
||||
```
|
||||
$ pass generate Dev/github 32
|
||||
$ pass generate Social/hackernews 32
|
||||
$ pass generate Social/twitter 32
|
||||
$ pass generate VPS/linode 32
|
||||
$ pass
|
||||
Password Store
|
||||
├── Dev
|
||||
│ └── github
|
||||
├── Social
|
||||
│ ├── hackernews
|
||||
│ └── twitter
|
||||
└── VPS
|
||||
└── linode
|
||||
```
|
||||
In order to paste them into the browser, you'd need to decrypt the password using your hardware device:
|
||||
```
|
||||
$ pass --clip VPS/linode
|
||||
Copied VPS/linode to clipboard. Will clear in 45 seconds.
|
||||
```
|
||||
|
||||
You can also use the following [Qt-based UI](https://qtpass.org/) for `pass`:
|
||||
```
|
||||
$ sudo apt install qtpass
|
||||
$ GNUPGHOME=~/.gnupg/trezor qtpass
|
||||
```
|
||||
|
||||
## Re-generation of an existing GPG identity
|
||||
[](https://asciinema.org/a/M4lRjEmGJ2RreQiHBGWT9pzp4)
|
||||
|
||||
45
README.md
45
README.md
@@ -1,10 +1,7 @@
|
||||
# Using TREZOR as a hardware SSH/GPG agent
|
||||
|
||||
[](https://travis-ci.org/romanz/trezor-agent)
|
||||
[](https://pypi.python.org/pypi/trezor_agent/)
|
||||
[](https://pypi.python.org/pypi/trezor_agent/)
|
||||
[](https://pypi.python.org/pypi/trezor_agent/)
|
||||
[](https://pypi.python.org/pypi/trezor_agent/)
|
||||
[](https://gitter.im/romanz/trezor-agent)
|
||||
|
||||
See SatoshiLabs' blog posts about this feature:
|
||||
|
||||
@@ -14,47 +11,15 @@ See SatoshiLabs' blog posts about this feature:
|
||||
|
||||
## Installation
|
||||
|
||||
First, make sure that the latest [trezorlib](https://pypi.python.org/pypi/trezor) Python package
|
||||
is installed correctly (at least v0.6.6):
|
||||
|
||||
$ apt-get install python-dev libusb-1.0-0-dev libudev-dev
|
||||
$ pip install -U setuptools
|
||||
$ pip install Cython trezor
|
||||
|
||||
Then, install the latest [trezor_agent](https://pypi.python.org/pypi/trezor_agent) package:
|
||||
|
||||
$ pip install trezor_agent
|
||||
|
||||
Or, directly from the latest source code (if `pip` doesn't work for you):
|
||||
|
||||
$ git clone https://github.com/romanz/trezor-agent && cd trezor-agent
|
||||
$ python setup.py build && python setup.py install
|
||||
|
||||
Finally, verify that you are running the latest [TREZOR firmware](https://wallet.mytrezor.com/data/firmware/releases.json) version (at least v1.4.0):
|
||||
|
||||
$ trezorctl get_features | head
|
||||
vendor: "bitcointrezor.com"
|
||||
major_version: 1
|
||||
minor_version: 4
|
||||
patch_version: 0
|
||||
...
|
||||
|
||||
If you have an error regarding `protobuf` imports (after installing it), please see [this issue](https://github.com/romanz/trezor-agent/issues/28).
|
||||
See the [following instructions](INSTALL.md) for the
|
||||
[TREZOR](https://trezor.io/), [Keepkey](https://www.keepkey.com/) and
|
||||
[Ledger Nano S](https://www.ledgerwallet.com/products/ledger-nano-s) devices.
|
||||
|
||||
## Usage
|
||||
|
||||
For SSH, see the [following instructions](README-SSH.md) (for Windows support,
|
||||
see [trezor-ssh-agent](https://github.com/martin-lizner/trezor-ssh-agent) project (by Martin Lízner)).
|
||||
see [trezor-ssh-agent](https://github.com/martin-lizner/trezor-ssh-agent) project by Martin Lízner).
|
||||
|
||||
For GPG, see the [following instructions](README-GPG.md).
|
||||
|
||||
See [here](https://github.com/romanz/python-trezor#pin-entering) for PIN entering instructions.
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
If there is an import problem with the installed `protobuf` package,
|
||||
see [this issue](https://github.com/romanz/trezor-agent/issues/28) for fixing it.
|
||||
|
||||
### Gitter
|
||||
|
||||
Questions, suggestions and discussions are welcome: [](https://gitter.im/romanz/trezor-agent)
|
||||
|
||||
7
agents/fake/fake_device_agent.py
Normal file
7
agents/fake/fake_device_agent.py
Normal file
@@ -0,0 +1,7 @@
|
||||
import libagent.gpg
|
||||
import libagent.ssh
|
||||
from libagent.device.fake_device import FakeDevice as DeviceType
|
||||
|
||||
ssh_agent = lambda: libagent.ssh.main(DeviceType)
|
||||
gpg_tool = lambda: libagent.gpg.main(DeviceType)
|
||||
gpg_agent = lambda: libagent.gpg.run_agent(DeviceType)
|
||||
42
agents/fake/setup.py
Normal file
42
agents/fake/setup.py
Normal file
@@ -0,0 +1,42 @@
|
||||
#!/usr/bin/env python
|
||||
from setuptools import setup
|
||||
|
||||
print('NEVER USE THIS CODE FOR REAL-LIFE USE-CASES!!!')
|
||||
print('ONLY FOR DEBUGGING AND TESTING!!!')
|
||||
|
||||
setup(
|
||||
name='fake_device_agent',
|
||||
version='0.9.0',
|
||||
description='Testing trezor_agent with a fake device - NOT SAFE!!!',
|
||||
author='Roman Zeyde',
|
||||
author_email='roman.zeyde@gmail.com',
|
||||
url='http://github.com/romanz/trezor-agent',
|
||||
scripts=['fake_device_agent.py'],
|
||||
install_requires=[
|
||||
'libagent>=0.9.0',
|
||||
],
|
||||
platforms=['POSIX'],
|
||||
classifiers=[
|
||||
'Environment :: Console',
|
||||
'Development Status :: 4 - Beta',
|
||||
'Intended Audience :: Developers',
|
||||
'Intended Audience :: Information Technology',
|
||||
'Intended Audience :: System Administrators',
|
||||
'License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)',
|
||||
'Operating System :: POSIX',
|
||||
'Programming Language :: Python :: 2.7',
|
||||
'Programming Language :: Python :: 3.4',
|
||||
'Programming Language :: Python :: 3.5',
|
||||
'Programming Language :: Python :: 3.6',
|
||||
'Topic :: Software Development :: Libraries :: Python Modules',
|
||||
'Topic :: System :: Networking',
|
||||
'Topic :: Communications',
|
||||
'Topic :: Security',
|
||||
'Topic :: Utilities',
|
||||
],
|
||||
entry_points={'console_scripts': [
|
||||
'fake-device-agent = fake_device_agent:ssh_agent',
|
||||
'fake-device-gpg = fake_device_agent:gpg_tool',
|
||||
'fake-device-gpg-agent = fake_device_agent:gpg_agent',
|
||||
]},
|
||||
)
|
||||
5
agents/keepkey/keepkey_agent.py
Normal file
5
agents/keepkey/keepkey_agent.py
Normal file
@@ -0,0 +1,5 @@
|
||||
import libagent.gpg
|
||||
import libagent.ssh
|
||||
from libagent.device import keepkey
|
||||
|
||||
ssh_agent = lambda: libagent.ssh.main(keepkey.KeepKey)
|
||||
38
agents/keepkey/setup.py
Normal file
38
agents/keepkey/setup.py
Normal file
@@ -0,0 +1,38 @@
|
||||
#!/usr/bin/env python
|
||||
from setuptools import setup
|
||||
|
||||
setup(
|
||||
name='keepkey_agent',
|
||||
version='0.9.0',
|
||||
description='Using KeepKey as hardware SSH/GPG agent',
|
||||
author='Roman Zeyde',
|
||||
author_email='roman.zeyde@gmail.com',
|
||||
url='http://github.com/romanz/trezor-agent',
|
||||
scripts=['keepkey_agent.py'],
|
||||
install_requires=[
|
||||
'libagent>=0.9.0',
|
||||
'keepkey>=0.7.3'
|
||||
],
|
||||
platforms=['POSIX'],
|
||||
classifiers=[
|
||||
'Environment :: Console',
|
||||
'Development Status :: 4 - Beta',
|
||||
'Intended Audience :: Developers',
|
||||
'Intended Audience :: Information Technology',
|
||||
'Intended Audience :: System Administrators',
|
||||
'License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)',
|
||||
'Operating System :: POSIX',
|
||||
'Programming Language :: Python :: 2.7',
|
||||
'Programming Language :: Python :: 3.4',
|
||||
'Programming Language :: Python :: 3.5',
|
||||
'Programming Language :: Python :: 3.6',
|
||||
'Topic :: Software Development :: Libraries :: Python Modules',
|
||||
'Topic :: System :: Networking',
|
||||
'Topic :: Communications',
|
||||
'Topic :: Security',
|
||||
'Topic :: Utilities',
|
||||
],
|
||||
entry_points={'console_scripts': [
|
||||
'keepkey-agent = keepkey_agent:ssh_agent',
|
||||
]},
|
||||
)
|
||||
7
agents/ledger/ledger_agent.py
Normal file
7
agents/ledger/ledger_agent.py
Normal file
@@ -0,0 +1,7 @@
|
||||
import libagent.gpg
|
||||
import libagent.ssh
|
||||
from libagent.device.ledger import LedgerNanoS as DeviceType
|
||||
|
||||
ssh_agent = lambda: libagent.ssh.main(DeviceType)
|
||||
gpg_tool = lambda: libagent.gpg.main(DeviceType)
|
||||
gpg_agent = lambda: libagent.gpg.run_agent(DeviceType)
|
||||
40
agents/ledger/setup.py
Normal file
40
agents/ledger/setup.py
Normal file
@@ -0,0 +1,40 @@
|
||||
#!/usr/bin/env python
|
||||
from setuptools import setup
|
||||
|
||||
setup(
|
||||
name='ledger_agent',
|
||||
version='0.9.0',
|
||||
description='Using Ledger as hardware SSH/GPG agent',
|
||||
author='Roman Zeyde',
|
||||
author_email='roman.zeyde@gmail.com',
|
||||
url='http://github.com/romanz/trezor-agent',
|
||||
scripts=['ledger_agent.py'],
|
||||
install_requires=[
|
||||
'libagent>=0.9.0',
|
||||
'ledgerblue>=0.1.8'
|
||||
],
|
||||
platforms=['POSIX'],
|
||||
classifiers=[
|
||||
'Environment :: Console',
|
||||
'Development Status :: 4 - Beta',
|
||||
'Intended Audience :: Developers',
|
||||
'Intended Audience :: Information Technology',
|
||||
'Intended Audience :: System Administrators',
|
||||
'License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)',
|
||||
'Operating System :: POSIX',
|
||||
'Programming Language :: Python :: 2.7',
|
||||
'Programming Language :: Python :: 3.4',
|
||||
'Programming Language :: Python :: 3.5',
|
||||
'Programming Language :: Python :: 3.6',
|
||||
'Topic :: Software Development :: Libraries :: Python Modules',
|
||||
'Topic :: System :: Networking',
|
||||
'Topic :: Communications',
|
||||
'Topic :: Security',
|
||||
'Topic :: Utilities',
|
||||
],
|
||||
entry_points={'console_scripts': [
|
||||
'ledger-agent = ledger_agent:ssh_agent',
|
||||
'ledger-gpg = ledger_agent:gpg_tool',
|
||||
'ledger-gpg-agent = ledger_agent:gpg_agent',
|
||||
]},
|
||||
)
|
||||
40
agents/trezor/setup.py
Normal file
40
agents/trezor/setup.py
Normal file
@@ -0,0 +1,40 @@
|
||||
#!/usr/bin/env python
|
||||
from setuptools import setup
|
||||
|
||||
setup(
|
||||
name='trezor_agent',
|
||||
version='0.9.0',
|
||||
description='Using Trezor as hardware SSH/GPG agent',
|
||||
author='Roman Zeyde',
|
||||
author_email='roman.zeyde@gmail.com',
|
||||
url='http://github.com/romanz/trezor-agent',
|
||||
scripts=['trezor_agent.py'],
|
||||
install_requires=[
|
||||
'libagent>=0.9.0',
|
||||
'trezor>=0.7.6'
|
||||
],
|
||||
platforms=['POSIX'],
|
||||
classifiers=[
|
||||
'Environment :: Console',
|
||||
'Development Status :: 4 - Beta',
|
||||
'Intended Audience :: Developers',
|
||||
'Intended Audience :: Information Technology',
|
||||
'Intended Audience :: System Administrators',
|
||||
'License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)',
|
||||
'Operating System :: POSIX',
|
||||
'Programming Language :: Python :: 2.7',
|
||||
'Programming Language :: Python :: 3.4',
|
||||
'Programming Language :: Python :: 3.5',
|
||||
'Programming Language :: Python :: 3.6',
|
||||
'Topic :: Software Development :: Libraries :: Python Modules',
|
||||
'Topic :: System :: Networking',
|
||||
'Topic :: Communications',
|
||||
'Topic :: Security',
|
||||
'Topic :: Utilities',
|
||||
],
|
||||
entry_points={'console_scripts': [
|
||||
'trezor-agent = trezor_agent:ssh_agent',
|
||||
'trezor-gpg = trezor_agent:gpg_tool',
|
||||
'trezor-gpg-agent = trezor_agent:gpg_agent',
|
||||
]},
|
||||
)
|
||||
7
agents/trezor/trezor_agent.py
Normal file
7
agents/trezor/trezor_agent.py
Normal file
@@ -0,0 +1,7 @@
|
||||
import libagent.gpg
|
||||
import libagent.ssh
|
||||
from libagent.device.trezor import Trezor as DeviceType
|
||||
|
||||
ssh_agent = lambda: libagent.ssh.main(DeviceType)
|
||||
gpg_tool = lambda: libagent.gpg.main(DeviceType)
|
||||
gpg_agent = lambda: libagent.gpg.run_agent(DeviceType)
|
||||
3
libagent/device/__init__.py
Normal file
3
libagent/device/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
"""Cryptographic hardware device management."""
|
||||
|
||||
from . import interface
|
||||
69
libagent/device/fake_device.py
Normal file
69
libagent/device/fake_device.py
Normal file
@@ -0,0 +1,69 @@
|
||||
"""Fake device - ONLY FOR TESTS!!! (NEVER USE WITH REAL DATA)."""
|
||||
|
||||
import hashlib
|
||||
import logging
|
||||
|
||||
import ecdsa
|
||||
|
||||
from . import interface
|
||||
from .. import formats
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _verify_support(identity):
|
||||
"""Make sure the device supports given configuration."""
|
||||
if identity.curve_name not in {formats.CURVE_NIST256}:
|
||||
raise NotImplementedError(
|
||||
'Unsupported elliptic curve: {}'.format(identity.curve_name))
|
||||
|
||||
|
||||
class FakeDevice(interface.Device):
|
||||
"""Connection to TREZOR device."""
|
||||
|
||||
def connect(self):
|
||||
"""Return "dummy" connection."""
|
||||
log.critical('NEVER USE THIS CODE FOR REAL-LIFE USE-CASES!!!')
|
||||
log.critical('ONLY FOR DEBUGGING AND TESTING!!!')
|
||||
# The code below uses HARD-CODED secret key - and should be used ONLY
|
||||
# for GnuPG integration tests (e.g. when no real device is available).
|
||||
# pylint: disable=attribute-defined-outside-init
|
||||
self.secexp = 1
|
||||
self.sk = ecdsa.SigningKey.from_secret_exponent(
|
||||
secexp=self.secexp, curve=ecdsa.curves.NIST256p, hashfunc=hashlib.sha256)
|
||||
self.vk = self.sk.get_verifying_key()
|
||||
return self
|
||||
|
||||
def close(self):
|
||||
"""Close connection."""
|
||||
self.conn = None
|
||||
|
||||
def pubkey(self, identity, ecdh=False):
|
||||
"""Return public key."""
|
||||
_verify_support(identity)
|
||||
data = self.vk.to_string()
|
||||
x, y = data[:32], data[32:]
|
||||
prefix = bytearray([2 + (bytearray(y)[0] & 1)])
|
||||
return bytes(prefix) + x
|
||||
|
||||
def sign(self, identity, blob):
|
||||
"""Sign given blob and return the signature (as bytes)."""
|
||||
if identity.identity_dict['proto'] in {'ssh'}:
|
||||
digest = hashlib.sha256(blob).digest()
|
||||
else:
|
||||
digest = blob
|
||||
return self.sk.sign_digest_deterministic(digest=digest,
|
||||
hashfunc=hashlib.sha256)
|
||||
|
||||
def ecdh(self, identity, pubkey):
|
||||
"""Get shared session key using Elliptic Curve Diffie-Hellman."""
|
||||
assert pubkey[:1] == b'\x04'
|
||||
peer = ecdsa.VerifyingKey.from_string(
|
||||
pubkey[1:],
|
||||
curve=ecdsa.curves.NIST256p,
|
||||
hashfunc=hashlib.sha256)
|
||||
shared = ecdsa.VerifyingKey.from_public_point(
|
||||
point=(peer.pubkey.point * self.secexp),
|
||||
curve=ecdsa.curves.NIST256p,
|
||||
hashfunc=hashlib.sha256)
|
||||
return shared.to_string()
|
||||
@@ -6,6 +6,8 @@ import logging
|
||||
import re
|
||||
import struct
|
||||
|
||||
import unidecode
|
||||
|
||||
from .. import formats, util
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@@ -54,7 +56,7 @@ class NotFoundError(Error):
|
||||
|
||||
|
||||
class DeviceError(Error):
|
||||
""""Error during device operation."""
|
||||
"""Error during device operation."""
|
||||
|
||||
|
||||
class Identity(object):
|
||||
@@ -67,22 +69,28 @@ class Identity(object):
|
||||
|
||||
def items(self):
|
||||
"""Return a copy of identity_dict items."""
|
||||
return self.identity_dict.items()
|
||||
return [(k, unidecode.unidecode(v))
|
||||
for k, v in self.identity_dict.items()]
|
||||
|
||||
def __str__(self):
|
||||
def to_bytes(self):
|
||||
"""Transliterate Unicode into ASCII."""
|
||||
s = identity_to_string(self.identity_dict)
|
||||
return unidecode.unidecode(s).encode('ascii')
|
||||
|
||||
def to_string(self):
|
||||
"""Return identity serialized to string."""
|
||||
return '<{}|{}>'.format(identity_to_string(self.identity_dict), self.curve_name)
|
||||
return u'<{}|{}>'.format(identity_to_string(self.identity_dict), self.curve_name)
|
||||
|
||||
def get_bip32_address(self, ecdh=False):
|
||||
"""Compute BIP32 derivation address according to SLIP-0013/0017."""
|
||||
index = struct.pack('<L', self.identity_dict.get('index', 0))
|
||||
addr = index + identity_to_string(self.identity_dict).encode('ascii')
|
||||
addr = index + self.to_bytes()
|
||||
log.debug('bip32 address string: %r', addr)
|
||||
digest = hashlib.sha256(addr).digest()
|
||||
s = io.BytesIO(bytearray(digest))
|
||||
|
||||
hardened = 0x80000000
|
||||
addr_0 = [13, 17][bool(ecdh)]
|
||||
addr_0 = 17 if bool(ecdh) else 13
|
||||
address_n = [addr_0] + list(util.recv(s, '<LLLL'))
|
||||
return [(hardened | value) for value in address_n]
|
||||
|
||||
@@ -20,7 +20,10 @@ def _verify_support(identity, ecdh):
|
||||
class KeepKey(trezor.Trezor):
|
||||
"""Connection to KeepKey device."""
|
||||
|
||||
from . import keepkey_defs as defs
|
||||
@property
|
||||
def _defs(self):
|
||||
from . import keepkey_defs
|
||||
return keepkey_defs
|
||||
|
||||
required_version = '>=1.0.4'
|
||||
|
||||
9
libagent/device/keepkey_defs.py
Normal file
9
libagent/device/keepkey_defs.py
Normal file
@@ -0,0 +1,9 @@
|
||||
"""KeepKey-related definitions."""
|
||||
|
||||
# pylint: disable=unused-import,import-error
|
||||
|
||||
from keepkeylib.client import CallException as Error
|
||||
from keepkeylib.client import KeepKeyClient as Client
|
||||
from keepkeylib.messages_pb2 import PassphraseAck, PinMatrixAck
|
||||
from keepkeylib.transport_hid import HidTransport as Transport
|
||||
from keepkeylib.types_pb2 import IdentityType
|
||||
@@ -4,7 +4,7 @@ import binascii
|
||||
import logging
|
||||
import struct
|
||||
|
||||
from ledgerblue import comm
|
||||
from ledgerblue import comm # pylint: disable=import-error
|
||||
|
||||
from . import interface
|
||||
|
||||
@@ -2,6 +2,9 @@
|
||||
|
||||
import binascii
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
|
||||
import semver
|
||||
|
||||
from . import interface
|
||||
@@ -12,20 +15,54 @@ log = logging.getLogger(__name__)
|
||||
class Trezor(interface.Device):
|
||||
"""Connection to TREZOR device."""
|
||||
|
||||
from . import trezor_defs as defs
|
||||
@property
|
||||
def _defs(self):
|
||||
from . import trezor_defs
|
||||
# Allow using TREZOR bridge transport (instead of the HID default)
|
||||
trezor_defs.Transport = {
|
||||
'bridge': trezor_defs.BridgeTransport,
|
||||
}.get(os.environ.get('TREZOR_TRANSPORT'), trezor_defs.HidTransport)
|
||||
return trezor_defs
|
||||
|
||||
required_version = '>=1.4.0'
|
||||
passphrase = os.environ.get('TREZOR_PASSPHRASE', '')
|
||||
|
||||
def connect(self):
|
||||
"""Enumerate and connect to the first USB HID interface."""
|
||||
def empty_passphrase_handler(_):
|
||||
return self.defs.PassphraseAck(passphrase='')
|
||||
def passphrase_handler(_):
|
||||
log.debug('using %s passphrase for %s',
|
||||
'non-empty' if self.passphrase else 'empty', self)
|
||||
return self._defs.PassphraseAck(passphrase=self.passphrase)
|
||||
|
||||
for d in self.defs.HidTransport.enumerate():
|
||||
def create_pin_handler(conn):
|
||||
if not sys.stdin.closed and os.isatty(sys.stdin.fileno()):
|
||||
return conn.callback_PinMatrixRequest # CLI-based PIN handler
|
||||
|
||||
def qt_handler(_):
|
||||
# pylint: disable=import-error
|
||||
from PyQt5.QtWidgets import QApplication, QInputDialog, QLineEdit
|
||||
label = ('Use the numeric keypad to describe number positions.\n'
|
||||
'The layout is:\n'
|
||||
' 7 8 9\n'
|
||||
' 4 5 6\n'
|
||||
' 1 2 3\n'
|
||||
'Please enter PIN:')
|
||||
app = QApplication([])
|
||||
qd = QInputDialog()
|
||||
qd.setTextEchoMode(QLineEdit.Password)
|
||||
qd.setLabelText(label)
|
||||
qd.show()
|
||||
app.exec_()
|
||||
return self._defs.PinMatrixAck(pin=qd.textValue())
|
||||
|
||||
return qt_handler
|
||||
|
||||
for d in self._defs.Transport.enumerate():
|
||||
log.debug('endpoint: %s', d)
|
||||
transport = self.defs.HidTransport(d)
|
||||
connection = self.defs.Client(transport)
|
||||
connection.callback_PassphraseRequest = empty_passphrase_handler
|
||||
transport = self._defs.Transport(d)
|
||||
connection = self._defs.Client(transport)
|
||||
connection.callback_PassphraseRequest = passphrase_handler
|
||||
connection.callback_PinMatrixRequest = create_pin_handler(connection)
|
||||
f = connection.features
|
||||
log.debug('connected to %s %s', self, f.device_id)
|
||||
log.debug('label : %s', f.label)
|
||||
@@ -60,7 +97,7 @@ class Trezor(interface.Device):
|
||||
return result.node.public_key
|
||||
|
||||
def _identity_proto(self, identity):
|
||||
result = self.defs.IdentityType()
|
||||
result = self._defs.IdentityType()
|
||||
for name, value in identity.items():
|
||||
setattr(result, name, value)
|
||||
return result
|
||||
@@ -80,7 +117,7 @@ class Trezor(interface.Device):
|
||||
assert len(result.signature) == 65
|
||||
assert result.signature[:1] == b'\x00'
|
||||
return result.signature[1:]
|
||||
except self.defs.CallException as e:
|
||||
except self._defs.Error as e:
|
||||
msg = '{} error: {}'.format(self, e)
|
||||
log.debug(msg, exc_info=True)
|
||||
raise interface.DeviceError(msg)
|
||||
@@ -99,7 +136,7 @@ class Trezor(interface.Device):
|
||||
assert len(result.session_key) in {65, 33} # NIST256 or Curve25519
|
||||
assert result.session_key[:1] == b'\x04'
|
||||
return result.session_key
|
||||
except self.defs.CallException as e:
|
||||
except self._defs.Error as e:
|
||||
msg = '{} error: {}'.format(self, e)
|
||||
log.debug(msg, exc_info=True)
|
||||
raise interface.DeviceError(msg)
|
||||
10
libagent/device/trezor_defs.py
Normal file
10
libagent/device/trezor_defs.py
Normal file
@@ -0,0 +1,10 @@
|
||||
"""TREZOR-related definitions."""
|
||||
|
||||
# pylint: disable=unused-import,import-error
|
||||
|
||||
from trezorlib.client import CallException as Error
|
||||
from trezorlib.client import TrezorClient as Client
|
||||
from trezorlib.messages_pb2 import PassphraseAck, PinMatrixAck
|
||||
from trezorlib.transport_bridge import BridgeTransport
|
||||
from trezorlib.transport_hid import HidTransport
|
||||
from trezorlib.types_pb2 import IdentityType
|
||||
@@ -184,7 +184,7 @@ def export_public_key(vk, label):
|
||||
key_type, blob = serialize_verifying_key(vk)
|
||||
log.debug('fingerprint: %s', fingerprint(blob))
|
||||
b64 = base64.b64encode(blob).decode('ascii')
|
||||
return '{} {} {}\n'.format(key_type.decode('ascii'), b64, label)
|
||||
return u'{} {} {}\n'.format(key_type.decode('ascii'), b64, label)
|
||||
|
||||
|
||||
def import_public_key(line):
|
||||
@@ -193,7 +193,7 @@ def import_public_key(line):
|
||||
file_type, base64blob, name = line.split()
|
||||
blob = base64.b64decode(base64blob)
|
||||
result = parse_pubkey(blob)
|
||||
result['name'] = name.encode('ascii')
|
||||
result['name'] = name.encode('utf-8')
|
||||
assert result['type'] == file_type.encode('ascii')
|
||||
log.debug('loaded %s public key: %s', file_type, result['fingerprint'])
|
||||
return result
|
||||
99
trezor_agent/gpg/__main__.py → libagent/gpg/__init__.py
Executable file → Normal file
99
trezor_agent/gpg/__main__.py → libagent/gpg/__init__.py
Executable file → Normal file
@@ -1,7 +1,16 @@
|
||||
#!/usr/bin/env python
|
||||
"""Create signatures and export public keys for GPG using TREZOR."""
|
||||
"""
|
||||
TREZOR support for ECDSA GPG signatures.
|
||||
|
||||
See these links for more details:
|
||||
- https://www.gnupg.org/faq/whats-new-in-2.1.html
|
||||
- https://tools.ietf.org/html/rfc4880
|
||||
- https://tools.ietf.org/html/rfc6637
|
||||
- https://tools.ietf.org/html/draft-irtf-cfrg-eddsa-05
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import contextlib
|
||||
import functools
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
@@ -15,14 +24,17 @@ from .. import device, formats, server, util
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def run_create(args):
|
||||
def export_public_key(device_type, args):
|
||||
"""Generate a new pubkey for a new/existing GPG identity."""
|
||||
log.warning('NOTE: in order to re-generate the exact same GPG key later, '
|
||||
'run this command with "--time=%d" commandline flag (to set '
|
||||
'the timestamp of the GPG key manually).', args.time)
|
||||
d = client.Client(user_id=args.user_id, curve_name=args.ecdsa_curve)
|
||||
verifying_key = d.pubkey(ecdh=False)
|
||||
decryption_key = d.pubkey(ecdh=True)
|
||||
c = client.Client(device=device_type())
|
||||
identity = client.create_identity(user_id=args.user_id,
|
||||
curve_name=args.ecdsa_curve)
|
||||
verifying_key = c.pubkey(identity=identity, ecdh=False)
|
||||
decryption_key = c.pubkey(identity=identity, ecdh=True)
|
||||
signer_func = functools.partial(c.sign, identity=identity)
|
||||
|
||||
if args.subkey: # add as subkey
|
||||
log.info('adding %s GPG subkey for "%s" to existing key',
|
||||
@@ -38,10 +50,10 @@ def run_create(args):
|
||||
primary_bytes = keyring.export_public_key(args.user_id)
|
||||
result = encode.create_subkey(primary_bytes=primary_bytes,
|
||||
subkey=signing_key,
|
||||
signer_func=d.sign)
|
||||
signer_func=signer_func)
|
||||
result = encode.create_subkey(primary_bytes=result,
|
||||
subkey=encryption_key,
|
||||
signer_func=d.sign)
|
||||
signer_func=signer_func)
|
||||
else: # add as primary
|
||||
log.info('creating new %s GPG primary key for "%s"',
|
||||
args.ecdsa_curve, args.user_id)
|
||||
@@ -56,45 +68,45 @@ def run_create(args):
|
||||
|
||||
result = encode.create_primary(user_id=args.user_id,
|
||||
pubkey=primary,
|
||||
signer_func=d.sign)
|
||||
signer_func=signer_func)
|
||||
result = encode.create_subkey(primary_bytes=result,
|
||||
subkey=subkey,
|
||||
signer_func=d.sign)
|
||||
signer_func=signer_func)
|
||||
|
||||
sys.stdout.write(protocol.armor(result, 'PUBLIC KEY BLOCK'))
|
||||
|
||||
|
||||
def main_create():
|
||||
"""Main function for GPG identity creation."""
|
||||
p = argparse.ArgumentParser()
|
||||
p.add_argument('user_id')
|
||||
p.add_argument('-e', '--ecdsa-curve', default='nist256p1')
|
||||
p.add_argument('-t', '--time', type=int, default=int(time.time()))
|
||||
p.add_argument('-v', '--verbose', default=0, action='count')
|
||||
p.add_argument('-s', '--subkey', default=False, action='store_true')
|
||||
|
||||
args = p.parse_args()
|
||||
def run_create(device_type, args):
|
||||
"""Export public GPG key."""
|
||||
util.setup_logging(verbosity=args.verbose)
|
||||
log.warning('This GPG tool is still in EXPERIMENTAL mode, '
|
||||
'so please note that the API and features may '
|
||||
'change without backwards compatibility!')
|
||||
|
||||
existing_gpg = keyring.gpg_version().decode('ascii')
|
||||
required_gpg = '>=2.1.15'
|
||||
required_gpg = '>=2.1.11'
|
||||
if semver.match(existing_gpg, required_gpg):
|
||||
run_create(args)
|
||||
export_public_key(device_type, args)
|
||||
else:
|
||||
log.error('Existing gpg2 has version "%s" (%s required)',
|
||||
existing_gpg, required_gpg)
|
||||
|
||||
|
||||
def main_agent():
|
||||
def run_unlock(device_type, args):
|
||||
"""Unlock hardware device (for future interaction)."""
|
||||
util.setup_logging(verbosity=args.verbose)
|
||||
with device_type() as d:
|
||||
log.info('unlocked %s device', d)
|
||||
|
||||
|
||||
def run_agent(device_type):
|
||||
"""Run a simple GPG-agent server."""
|
||||
home_dir = os.environ.get('GNUPGHOME', os.path.expanduser('~/.gnupg/trezor'))
|
||||
config_file = os.path.join(home_dir, 'gpg-agent.conf')
|
||||
if not os.path.exists(config_file):
|
||||
msg = 'No configuration file found: {}'.format(config_file)
|
||||
raise IOError(msg)
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument('--homedir', default=os.environ.get('GNUPGHOME'))
|
||||
args, _ = parser.parse_known_args()
|
||||
|
||||
assert args.homedir
|
||||
config_file = os.path.join(args.homedir, 'gpg-agent.conf')
|
||||
|
||||
lines = (line.strip() for line in open(config_file))
|
||||
lines = (line for line in lines if line and not line.startswith('#'))
|
||||
@@ -103,24 +115,35 @@ def main_agent():
|
||||
util.setup_logging(verbosity=int(config['verbosity']),
|
||||
filename=config['log-file'])
|
||||
sock_path = keyring.get_agent_sock_path()
|
||||
handler = agent.Handler(device=device_type())
|
||||
with server.unix_domain_socket_server(sock_path) as sock:
|
||||
for conn in agent.yield_connections(sock):
|
||||
with contextlib.closing(conn):
|
||||
try:
|
||||
agent.handle_connection(conn)
|
||||
except StopIteration:
|
||||
handler.handle(conn)
|
||||
except agent.AgentStop:
|
||||
log.info('stopping gpg-agent')
|
||||
return
|
||||
except Exception as e: # pylint: disable=broad-except
|
||||
log.exception('gpg-agent failed: %s', e)
|
||||
|
||||
|
||||
def auto_unlock():
|
||||
"""Automatically unlock first found device (used for `gpg-shell`)."""
|
||||
p = argparse.ArgumentParser()
|
||||
p.add_argument('-v', '--verbose', default=0, action='count')
|
||||
def main(device_type):
|
||||
"""Parse command-line arguments."""
|
||||
parser = argparse.ArgumentParser()
|
||||
subparsers = parser.add_subparsers()
|
||||
|
||||
args = p.parse_args()
|
||||
util.setup_logging(verbosity=args.verbose)
|
||||
d = device.detect()
|
||||
log.info('unlocked %s device', d)
|
||||
p = subparsers.add_parser('create', help='Export public GPG key')
|
||||
p.add_argument('user_id')
|
||||
p.add_argument('-e', '--ecdsa-curve', default='nist256p1')
|
||||
p.add_argument('-t', '--time', type=int, default=int(time.time()))
|
||||
p.add_argument('-v', '--verbose', default=0, action='count')
|
||||
p.add_argument('-s', '--subkey', default=False, action='store_true')
|
||||
p.set_defaults(func=run_create)
|
||||
|
||||
p = subparsers.add_parser('unlock', help='Unlock the hardware device')
|
||||
p.add_argument('-v', '--verbose', default=0, action='count')
|
||||
p.set_defaults(func=run_unlock)
|
||||
|
||||
args = parser.parse_args()
|
||||
return args.func(device_type=device_type, args=args)
|
||||
209
libagent/gpg/agent.py
Normal file
209
libagent/gpg/agent.py
Normal file
@@ -0,0 +1,209 @@
|
||||
"""GPG-agent utilities."""
|
||||
import binascii
|
||||
import logging
|
||||
|
||||
from . import client, decode, keyring, protocol
|
||||
from .. import util
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def yield_connections(sock):
|
||||
"""Run a server on the specified socket."""
|
||||
while True:
|
||||
log.debug('waiting for connection on %s', sock.getsockname())
|
||||
try:
|
||||
conn, _ = sock.accept()
|
||||
except KeyboardInterrupt:
|
||||
return
|
||||
conn.settimeout(None)
|
||||
log.debug('accepted connection on %s', sock.getsockname())
|
||||
yield conn
|
||||
|
||||
|
||||
def serialize(data):
|
||||
"""Serialize data according to ASSUAN protocol."""
|
||||
for c in [b'%', b'\n', b'\r']:
|
||||
escaped = '%{:02X}'.format(ord(c)).encode('ascii')
|
||||
data = data.replace(c, escaped)
|
||||
return data
|
||||
|
||||
|
||||
def sig_encode(r, s):
|
||||
"""Serialize ECDSA signature data into GPG S-expression."""
|
||||
r = serialize(util.num2bytes(r, 32))
|
||||
s = serialize(util.num2bytes(s, 32))
|
||||
return b'(7:sig-val(5:ecdsa(1:r32:' + r + b')(1:s32:' + s + b')))'
|
||||
|
||||
|
||||
def _serialize_point(data):
|
||||
prefix = '{}:'.format(len(data)).encode('ascii')
|
||||
# https://www.gnupg.org/documentation/manuals/assuan/Server-responses.html
|
||||
return b'(5:value' + serialize(prefix + data) + b')'
|
||||
|
||||
|
||||
def parse_ecdh(line):
|
||||
"""Parse ECDH request and return remote public key."""
|
||||
prefix, line = line.split(b' ', 1)
|
||||
assert prefix == b'D'
|
||||
exp, leftover = keyring.parse(keyring.unescape(line))
|
||||
log.debug('ECDH s-exp: %r', exp)
|
||||
assert not leftover
|
||||
label, exp = exp
|
||||
assert label == b'enc-val'
|
||||
assert exp[0] == b'ecdh'
|
||||
items = exp[1:]
|
||||
log.debug('ECDH parameters: %r', items)
|
||||
return dict(items)[b'e']
|
||||
|
||||
|
||||
class AgentError(Exception):
|
||||
"""GnuPG agent-related error."""
|
||||
|
||||
|
||||
class AgentStop(Exception):
|
||||
"""Raised to close the agent."""
|
||||
|
||||
|
||||
class Handler(object):
|
||||
"""GPG agent requests' handler."""
|
||||
|
||||
def __init__(self, device):
|
||||
"""C-tor."""
|
||||
self.client = client.Client(device=device)
|
||||
# Cache ASSUAN commands' arguments between commands
|
||||
self.keygrip = None
|
||||
self.digest = None
|
||||
self.algo = None
|
||||
# Cache public keys from GnuPG
|
||||
self.pubkey_bytes = keyring.export_public_keys()
|
||||
# "Clone" existing GPG version
|
||||
self.version = keyring.gpg_version()
|
||||
|
||||
self.handlers = {
|
||||
b'RESET': None,
|
||||
b'OPTION': None,
|
||||
b'SETKEYDESC': None,
|
||||
b'GETINFO': lambda conn, _: keyring.sendline(conn, b'D ' + self.version),
|
||||
b'AGENT_ID': lambda conn, _: keyring.sendline(conn, b'D TREZOR'), # "Fake" agent ID
|
||||
b'SIGKEY': lambda _, args: self.set_key(*args),
|
||||
b'SETKEY': lambda _, args: self.set_key(*args),
|
||||
b'SETHASH': lambda _, args: self.set_hash(*args),
|
||||
b'PKSIGN': lambda conn, _: self.pksign(conn),
|
||||
b'PKDECRYPT': lambda conn, _: self.pkdecrypt(conn),
|
||||
b'HAVEKEY': lambda _, args: self.have_key(*args),
|
||||
b'KEYINFO': lambda conn, _: self.key_info(conn),
|
||||
b'SCD': self.handle_scd,
|
||||
}
|
||||
|
||||
def handle_scd(self, conn, args):
|
||||
"""No support for smart-card device protocol."""
|
||||
reply = {
|
||||
(b'GETINFO', b'version'): self.version,
|
||||
}.get(args)
|
||||
if reply is None:
|
||||
raise AgentError(b'ERR 100696144 No such device <SCD>')
|
||||
keyring.sendline(conn, b'D ' + reply)
|
||||
|
||||
@util.memoize
|
||||
def get_identity(self, keygrip):
|
||||
"""
|
||||
Returns device.interface.Identity that matches specified keygrip.
|
||||
|
||||
In case of missing keygrip, KeyError will be raised.
|
||||
"""
|
||||
keygrip_bytes = binascii.unhexlify(keygrip)
|
||||
pubkey_dict, user_ids = decode.load_by_keygrip(
|
||||
pubkey_bytes=self.pubkey_bytes, keygrip=keygrip_bytes)
|
||||
# We assume the first user ID is used to generate TREZOR-based GPG keys.
|
||||
user_id = user_ids[0]['value'].decode('utf-8')
|
||||
curve_name = protocol.get_curve_name_by_oid(pubkey_dict['curve_oid'])
|
||||
ecdh = (pubkey_dict['algo'] == protocol.ECDH_ALGO_ID)
|
||||
|
||||
identity = client.create_identity(user_id=user_id, curve_name=curve_name)
|
||||
verifying_key = self.client.pubkey(identity=identity, ecdh=ecdh)
|
||||
pubkey = protocol.PublicKey(
|
||||
curve_name=curve_name, created=pubkey_dict['created'],
|
||||
verifying_key=verifying_key, ecdh=ecdh)
|
||||
assert pubkey.key_id() == pubkey_dict['key_id']
|
||||
assert pubkey.keygrip() == keygrip_bytes
|
||||
return identity
|
||||
|
||||
def pksign(self, conn):
|
||||
"""Sign a message digest using a private EC key."""
|
||||
log.debug('signing %r digest (algo #%s)', self.digest, self.algo)
|
||||
identity = self.get_identity(keygrip=self.keygrip)
|
||||
r, s = self.client.sign(identity=identity,
|
||||
digest=binascii.unhexlify(self.digest))
|
||||
result = sig_encode(r, s)
|
||||
log.debug('result: %r', result)
|
||||
keyring.sendline(conn, b'D ' + result)
|
||||
|
||||
def pkdecrypt(self, conn):
|
||||
"""Handle decryption using ECDH."""
|
||||
for msg in [b'S INQUIRE_MAXLEN 4096', b'INQUIRE CIPHERTEXT']:
|
||||
keyring.sendline(conn, msg)
|
||||
|
||||
line = keyring.recvline(conn)
|
||||
assert keyring.recvline(conn) == b'END'
|
||||
remote_pubkey = parse_ecdh(line)
|
||||
|
||||
identity = self.get_identity(keygrip=self.keygrip)
|
||||
ec_point = self.client.ecdh(identity=identity, pubkey=remote_pubkey)
|
||||
keyring.sendline(conn, b'D ' + _serialize_point(ec_point))
|
||||
|
||||
@util.memoize
|
||||
def have_key(self, *keygrips):
|
||||
"""Check if current keygrip correspond to a TREZOR-based key."""
|
||||
try:
|
||||
self.get_identity(keygrip=keygrips[0])
|
||||
except KeyError as e:
|
||||
log.warning('HAVEKEY(%s) failed: %s', keygrips, e)
|
||||
raise AgentError(b'ERR 67108881 No secret key <GPG Agent>')
|
||||
|
||||
def key_info(self, conn):
|
||||
"""
|
||||
Dummy reply (mainly for 'gpg --edit' to succeed).
|
||||
|
||||
For details, see GnuPG agent KEYINFO command help.
|
||||
https://git.gnupg.org/cgi-bin/gitweb.cgi?p=gnupg.git;a=blob;f=agent/command.c;h=c8b34e9882076b1b724346787781f657cac75499;hb=refs/heads/master#l1082
|
||||
"""
|
||||
fmt = 'S KEYINFO {0} X - - - - - - -'
|
||||
keyring.sendline(conn, fmt.format(self.keygrip).encode('ascii'))
|
||||
|
||||
def set_key(self, keygrip):
|
||||
"""Set hexadecimal keygrip for next operation."""
|
||||
self.keygrip = keygrip
|
||||
|
||||
def set_hash(self, algo, digest):
|
||||
"""Set algorithm ID and hexadecimal digest for next operation."""
|
||||
self.algo = algo
|
||||
self.digest = digest
|
||||
|
||||
def handle(self, conn):
|
||||
"""Handle connection from GPG binary using the ASSUAN protocol."""
|
||||
keyring.sendline(conn, b'OK')
|
||||
for line in keyring.iterlines(conn):
|
||||
parts = line.split(b' ')
|
||||
command = parts[0]
|
||||
args = tuple(parts[1:])
|
||||
|
||||
if command == b'BYE':
|
||||
return
|
||||
elif command == b'KILLAGENT':
|
||||
keyring.sendline(conn, b'OK')
|
||||
raise AgentStop()
|
||||
|
||||
if command not in self.handlers:
|
||||
log.error('unknown request: %r', line)
|
||||
continue
|
||||
|
||||
handler = self.handlers[command]
|
||||
if handler:
|
||||
try:
|
||||
handler(conn, args)
|
||||
except AgentError as e:
|
||||
msg, = e.args
|
||||
keyring.sendline(conn, msg)
|
||||
continue
|
||||
keyring.sendline(conn, b'OK')
|
||||
48
libagent/gpg/client.py
Normal file
48
libagent/gpg/client.py
Normal file
@@ -0,0 +1,48 @@
|
||||
"""Device abstraction layer for GPG operations."""
|
||||
|
||||
import logging
|
||||
|
||||
from .. import formats, util
|
||||
from ..device import interface
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def create_identity(user_id, curve_name):
|
||||
"""Create GPG identity for hardware device."""
|
||||
result = interface.Identity(identity_str='gpg://', curve_name=curve_name)
|
||||
result.identity_dict['host'] = user_id
|
||||
return result
|
||||
|
||||
|
||||
class Client(object):
|
||||
"""Sign messages and get public keys from a hardware device."""
|
||||
|
||||
def __init__(self, device):
|
||||
"""C-tor."""
|
||||
self.device = device
|
||||
|
||||
def pubkey(self, identity, ecdh=False):
|
||||
"""Return public key as VerifyingKey object."""
|
||||
with self.device:
|
||||
pubkey = self.device.pubkey(ecdh=ecdh, identity=identity)
|
||||
return formats.decompress_pubkey(
|
||||
pubkey=pubkey, curve_name=identity.curve_name)
|
||||
|
||||
def sign(self, identity, digest):
|
||||
"""Sign the digest and return a serialized signature."""
|
||||
log.info('please confirm GPG signature on %s for "%s"...',
|
||||
self.device, identity)
|
||||
if identity.curve_name == formats.CURVE_NIST256:
|
||||
digest = digest[:32] # sign the first 256 bits
|
||||
log.debug('signing digest: %s', util.hexlify(digest))
|
||||
with self.device:
|
||||
sig = self.device.sign(blob=digest, identity=identity)
|
||||
return (util.bytes2num(sig[:32]), util.bytes2num(sig[32:]))
|
||||
|
||||
def ecdh(self, identity, pubkey):
|
||||
"""Derive shared secret using ECDH from remote public key."""
|
||||
log.info('please confirm GPG decryption on %s for "%s"...',
|
||||
self.device, identity)
|
||||
with self.device:
|
||||
return self.device.ecdh(pubkey=pubkey, identity=identity)
|
||||
@@ -54,7 +54,8 @@ def parse_mpis(s, n):
|
||||
|
||||
def _parse_nist256p1_pubkey(mpi):
|
||||
prefix, x, y = util.split_bits(mpi, 4, 256, 256)
|
||||
assert prefix == 4
|
||||
if prefix != 4:
|
||||
raise ValueError('Invalid MPI prefix: {}'.format(prefix))
|
||||
point = ecdsa.ellipticcurve.Point(curve=ecdsa.NIST256p.curve,
|
||||
x=x, y=y)
|
||||
return ecdsa.VerifyingKey.from_public_point(
|
||||
@@ -64,7 +65,8 @@ def _parse_nist256p1_pubkey(mpi):
|
||||
|
||||
def _parse_ed25519_pubkey(mpi):
|
||||
prefix, value = util.split_bits(mpi, 8, 256)
|
||||
assert prefix == 0x40
|
||||
if prefix != 0x40:
|
||||
raise ValueError('Invalid MPI prefix: {}'.format(prefix))
|
||||
return ed25519.VerifyingKey(util.num2bytes(value, size=32))
|
||||
|
||||
|
||||
@@ -160,7 +162,7 @@ def _parse_pubkey(stream, packet_type='pubkey'):
|
||||
# should be b'\x03\x01\x08\x07': SHA256 + AES128
|
||||
size, = util.readfmt(leftover, 'B')
|
||||
p['kdf'] = leftover.read(size)
|
||||
assert not leftover.read()
|
||||
p['secret'] = leftover.read()
|
||||
|
||||
parse_func, keygrip_func = SUPPORTED_CURVES[oid]
|
||||
keygrip = keygrip_func(parse_func(mpi))
|
||||
@@ -199,7 +201,9 @@ _parse_attribute = functools.partial(_parse_user_id,
|
||||
|
||||
PACKET_TYPES = {
|
||||
2: _parse_signature,
|
||||
5: _parse_pubkey,
|
||||
6: _parse_pubkey,
|
||||
7: _parse_subkey,
|
||||
13: _parse_user_id,
|
||||
14: _parse_subkey,
|
||||
17: _parse_attribute,
|
||||
@@ -243,11 +247,13 @@ def parse_packets(stream):
|
||||
packet_data = reader.read(packet_size)
|
||||
packet_type = PACKET_TYPES.get(tag)
|
||||
|
||||
p = {'type': 'unknown', 'tag': tag, 'raw': packet_data}
|
||||
if packet_type is not None:
|
||||
p = packet_type(util.Reader(io.BytesIO(packet_data)))
|
||||
p['tag'] = tag
|
||||
else:
|
||||
p = {'type': 'unknown', 'tag': tag, 'raw': packet_data}
|
||||
try:
|
||||
p = packet_type(util.Reader(io.BytesIO(packet_data)))
|
||||
p['tag'] = tag
|
||||
except ValueError:
|
||||
log.exception('Skipping packet: %s', util.hexlify(packet_data))
|
||||
|
||||
log.debug('packet "%s": %s', p['type'], p)
|
||||
yield p
|
||||
@@ -289,6 +295,7 @@ def load_by_keygrip(pubkey_bytes, keygrip):
|
||||
for p in packets:
|
||||
if p.get('keygrip') == keygrip:
|
||||
return p, user_ids
|
||||
raise KeyError('{} keygrip not found'.format(util.hexlify(keygrip)))
|
||||
|
||||
|
||||
def load_signature(stream, original_data):
|
||||
@@ -8,15 +8,14 @@ from .. import util
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def create_primary(user_id, pubkey, signer_func):
|
||||
def create_primary(user_id, pubkey, signer_func, secret_bytes=b''):
|
||||
"""Export new primary GPG public key, ready for "gpg2 --import"."""
|
||||
pubkey_packet = protocol.packet(tag=6, blob=pubkey.data())
|
||||
user_id_packet = protocol.packet(tag=13,
|
||||
blob=user_id.encode('ascii'))
|
||||
|
||||
data_to_sign = (pubkey.data_to_hash() +
|
||||
user_id_packet[:1] +
|
||||
util.prefix_len('>L', user_id.encode('ascii')))
|
||||
pubkey_packet = protocol.packet(tag=(5 if secret_bytes else 6),
|
||||
blob=(pubkey.data() + secret_bytes))
|
||||
user_id_bytes = user_id.encode('utf-8')
|
||||
user_id_packet = protocol.packet(tag=13, blob=user_id_bytes)
|
||||
data_to_sign = (pubkey.data_to_hash() + user_id_packet[:1] +
|
||||
util.prefix_len('>L', user_id_bytes))
|
||||
hashed_subpackets = [
|
||||
protocol.subpacket_time(pubkey.created), # signature time
|
||||
# https://tools.ietf.org/html/rfc4880#section-5.2.3.7
|
||||
@@ -47,9 +46,10 @@ def create_primary(user_id, pubkey, signer_func):
|
||||
return pubkey_packet + user_id_packet + sign_packet
|
||||
|
||||
|
||||
def create_subkey(primary_bytes, subkey, signer_func, user_id=None):
|
||||
def create_subkey(primary_bytes, subkey, signer_func, secret_bytes=b''):
|
||||
"""Export new subkey to GPG primary key."""
|
||||
subkey_packet = protocol.packet(tag=14, blob=subkey.data())
|
||||
subkey_packet = protocol.packet(tag=(7 if secret_bytes else 14),
|
||||
blob=(subkey.data() + secret_bytes))
|
||||
packets = list(decode.parse_packets(io.BytesIO(primary_bytes)))
|
||||
primary, user_id, signature = packets[:3]
|
||||
|
||||
@@ -41,8 +41,7 @@ def public_key_path(request):
|
||||
|
||||
def test_gpg_files(public_key_path): # pylint: disable=redefined-outer-name
|
||||
with open(public_key_path, 'rb') as f:
|
||||
packets = list(decode.parse_packets(f))
|
||||
assert len(packets) > 0
|
||||
assert list(decode.parse_packets(f))
|
||||
|
||||
|
||||
def test_has_custom_subpacket():
|
||||
@@ -56,3 +55,8 @@ def test_has_custom_subpacket():
|
||||
for marker in custom_markers:
|
||||
sig = {'unhashed_subpackets': [marker]}
|
||||
assert decode.has_custom_subpacket(sig)
|
||||
|
||||
|
||||
def test_load_by_keygrip_missing():
|
||||
with pytest.raises(KeyError):
|
||||
decode.load_by_keygrip(pubkey_bytes=b'', keygrip=b'')
|
||||
@@ -1,19 +1,15 @@
|
||||
"""UNIX-domain socket server for ssh-agent implementation."""
|
||||
import contextlib
|
||||
import functools
|
||||
import logging
|
||||
import os
|
||||
import socket
|
||||
import subprocess
|
||||
import tempfile
|
||||
import threading
|
||||
|
||||
from . import util
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
UNIX_SOCKET_TIMEOUT = 0.1
|
||||
|
||||
|
||||
def remove_file(path, remove=os.remove, exists=os.path.exists):
|
||||
"""Remove file, and raise OSError if still exists."""
|
||||
@@ -114,39 +110,6 @@ def spawn(func, kwargs):
|
||||
t.join()
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def serve(handler, sock_path=None, timeout=UNIX_SOCKET_TIMEOUT):
|
||||
"""
|
||||
Start the ssh-agent server on a UNIX-domain socket.
|
||||
|
||||
If no connection is made during the specified timeout,
|
||||
retry until the context is over.
|
||||
"""
|
||||
ssh_version = subprocess.check_output(['ssh', '-V'],
|
||||
stderr=subprocess.STDOUT)
|
||||
log.debug('local SSH version: %r', ssh_version)
|
||||
if sock_path is None:
|
||||
sock_path = tempfile.mktemp(prefix='trezor-ssh-agent-')
|
||||
|
||||
environ = {'SSH_AUTH_SOCK': sock_path, 'SSH_AGENT_PID': str(os.getpid())}
|
||||
device_mutex = threading.Lock()
|
||||
with unix_domain_socket_server(sock_path) as sock:
|
||||
sock.settimeout(timeout)
|
||||
quit_event = threading.Event()
|
||||
handle_conn = functools.partial(handle_connection,
|
||||
handler=handler,
|
||||
mutex=device_mutex)
|
||||
kwargs = dict(sock=sock,
|
||||
handle_conn=handle_conn,
|
||||
quit_event=quit_event)
|
||||
with spawn(server_thread, kwargs):
|
||||
try:
|
||||
yield environ
|
||||
finally:
|
||||
log.debug('closing server')
|
||||
quit_event.set()
|
||||
|
||||
|
||||
def run_process(command, environ):
|
||||
"""
|
||||
Run the specified process and wait until it finishes.
|
||||
230
libagent/ssh/__init__.py
Normal file
230
libagent/ssh/__init__.py
Normal file
@@ -0,0 +1,230 @@
|
||||
"""SSH-agent implementation using hardware authentication devices."""
|
||||
import argparse
|
||||
import contextlib
|
||||
import functools
|
||||
import io
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import threading
|
||||
|
||||
|
||||
from .. import device, formats, server, util
|
||||
from . import client, protocol
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
UNIX_SOCKET_TIMEOUT = 0.1
|
||||
|
||||
|
||||
def ssh_args(label):
|
||||
"""Create SSH command for connecting specified server."""
|
||||
identity = device.interface.string_to_identity(label)
|
||||
|
||||
args = []
|
||||
if 'port' in identity:
|
||||
args += ['-p', identity['port']]
|
||||
if 'user' in identity:
|
||||
args += ['-l', identity['user']]
|
||||
|
||||
return args + [identity['host']]
|
||||
|
||||
|
||||
def mosh_args(label):
|
||||
"""Create SSH command for connecting specified server."""
|
||||
identity = device.interface.string_to_identity(label)
|
||||
|
||||
args = []
|
||||
if 'port' in identity:
|
||||
args += ['-p', identity['port']]
|
||||
if 'user' in identity:
|
||||
args += [identity['user']+'@'+identity['host']]
|
||||
else:
|
||||
args += [identity['host']]
|
||||
|
||||
return args
|
||||
|
||||
|
||||
def _to_unicode(s):
|
||||
try:
|
||||
return unicode(s, 'utf-8')
|
||||
except NameError:
|
||||
return s
|
||||
|
||||
|
||||
def create_agent_parser():
|
||||
"""Create an ArgumentParser for this tool."""
|
||||
p = argparse.ArgumentParser()
|
||||
p.add_argument('-v', '--verbose', default=0, action='count')
|
||||
|
||||
curve_names = [name for name in formats.SUPPORTED_CURVES]
|
||||
curve_names = ', '.join(sorted(curve_names))
|
||||
p.add_argument('-e', '--ecdsa-curve-name', metavar='CURVE',
|
||||
default=formats.CURVE_NIST256,
|
||||
help='specify ECDSA curve name: ' + curve_names)
|
||||
p.add_argument('--timeout',
|
||||
default=UNIX_SOCKET_TIMEOUT, type=float,
|
||||
help='Timeout for accepting SSH client connections')
|
||||
p.add_argument('--debug', default=False, action='store_true',
|
||||
help='Log SSH protocol messages for debugging.')
|
||||
|
||||
g = p.add_mutually_exclusive_group()
|
||||
g.add_argument('-s', '--shell', default=False, action='store_true',
|
||||
help='run ${SHELL} as subprocess under SSH agent')
|
||||
g.add_argument('-c', '--connect', default=False, action='store_true',
|
||||
help='connect to specified host via SSH')
|
||||
g.add_argument('--mosh', default=False, action='store_true',
|
||||
help='connect to specified host via using Mosh')
|
||||
|
||||
p.add_argument('identity', type=_to_unicode, default=None,
|
||||
help='proto://[user@]host[:port][/path]')
|
||||
p.add_argument('command', type=str, nargs='*', metavar='ARGUMENT',
|
||||
help='command to run under the SSH agent')
|
||||
return p
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def serve(handler, sock_path=None, timeout=UNIX_SOCKET_TIMEOUT):
|
||||
"""
|
||||
Start the ssh-agent server on a UNIX-domain socket.
|
||||
|
||||
If no connection is made during the specified timeout,
|
||||
retry until the context is over.
|
||||
"""
|
||||
ssh_version = subprocess.check_output(['ssh', '-V'],
|
||||
stderr=subprocess.STDOUT)
|
||||
log.debug('local SSH version: %r', ssh_version)
|
||||
if sock_path is None:
|
||||
sock_path = tempfile.mktemp(prefix='trezor-ssh-agent-')
|
||||
|
||||
environ = {'SSH_AUTH_SOCK': sock_path, 'SSH_AGENT_PID': str(os.getpid())}
|
||||
device_mutex = threading.Lock()
|
||||
with server.unix_domain_socket_server(sock_path) as sock:
|
||||
sock.settimeout(timeout)
|
||||
quit_event = threading.Event()
|
||||
handle_conn = functools.partial(server.handle_connection,
|
||||
handler=handler,
|
||||
mutex=device_mutex)
|
||||
kwargs = dict(sock=sock,
|
||||
handle_conn=handle_conn,
|
||||
quit_event=quit_event)
|
||||
with server.spawn(server.server_thread, kwargs):
|
||||
try:
|
||||
yield environ
|
||||
finally:
|
||||
log.debug('closing server')
|
||||
quit_event.set()
|
||||
|
||||
|
||||
def run_server(conn, command, debug, timeout):
|
||||
"""Common code for run_agent and run_git below."""
|
||||
try:
|
||||
handler = protocol.Handler(conn=conn, debug=debug)
|
||||
with serve(handler=handler, timeout=timeout) as env:
|
||||
return server.run_process(command=command, environ=env)
|
||||
except KeyboardInterrupt:
|
||||
log.info('server stopped')
|
||||
|
||||
|
||||
def handle_connection_error(func):
|
||||
"""Fail with non-zero exit code."""
|
||||
@functools.wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
except IOError as e:
|
||||
log.error('Connection error (try unplugging and replugging your device): %s', e)
|
||||
return 1
|
||||
return wrapper
|
||||
|
||||
|
||||
def parse_config(contents):
|
||||
"""Parse config file into a list of Identity objects."""
|
||||
for identity_str, curve_name in re.findall(r'\<(.*?)\|(.*?)\>', contents):
|
||||
yield device.interface.Identity(identity_str=identity_str,
|
||||
curve_name=curve_name)
|
||||
|
||||
|
||||
def import_public_keys(contents):
|
||||
"""Load (previously exported) SSH public keys from a file's contents."""
|
||||
for line in io.StringIO(contents):
|
||||
# Verify this line represents valid SSH public key
|
||||
formats.import_public_key(line)
|
||||
yield line
|
||||
|
||||
|
||||
class JustInTimeConnection(object):
|
||||
"""Connect to the device just before the needed operation."""
|
||||
|
||||
def __init__(self, conn_factory, identities, public_keys=None):
|
||||
"""Create a JIT connection object."""
|
||||
self.conn_factory = conn_factory
|
||||
self.identities = identities
|
||||
self.public_keys_cache = public_keys
|
||||
|
||||
def public_keys(self):
|
||||
"""Return a list of SSH public keys (in textual format)."""
|
||||
if not self.public_keys_cache:
|
||||
conn = self.conn_factory()
|
||||
self.public_keys_cache = conn.export_public_keys(self.identities)
|
||||
return self.public_keys_cache
|
||||
|
||||
def parse_public_keys(self):
|
||||
"""Parse SSH public keys into dictionaries."""
|
||||
public_keys = [formats.import_public_key(pk)
|
||||
for pk in self.public_keys()]
|
||||
for pk, identity in zip(public_keys, self.identities):
|
||||
pk['identity'] = identity
|
||||
return public_keys
|
||||
|
||||
def sign(self, blob, identity):
|
||||
"""Sign a given blob using the specified identity on the device."""
|
||||
conn = self.conn_factory()
|
||||
return conn.sign_ssh_challenge(blob=blob, identity=identity)
|
||||
|
||||
|
||||
@handle_connection_error
|
||||
def main(device_type):
|
||||
"""Run ssh-agent using given hardware client factory."""
|
||||
args = create_agent_parser().parse_args()
|
||||
util.setup_logging(verbosity=args.verbose)
|
||||
|
||||
public_keys = None
|
||||
if args.identity.startswith('/'):
|
||||
filename = args.identity
|
||||
contents = open(filename, 'rb').read().decode('utf-8')
|
||||
# Allow loading previously exported SSH public keys
|
||||
if filename.endswith('.pub'):
|
||||
public_keys = list(import_public_keys(contents))
|
||||
identities = list(parse_config(contents))
|
||||
else:
|
||||
identities = [device.interface.Identity(
|
||||
identity_str=args.identity, curve_name=args.ecdsa_curve_name)]
|
||||
for index, identity in enumerate(identities):
|
||||
identity.identity_dict['proto'] = u'ssh'
|
||||
log.info('identity #%d: %s', index, identity)
|
||||
|
||||
if args.connect:
|
||||
command = ['ssh'] + ssh_args(args.identity) + args.command
|
||||
elif args.mosh:
|
||||
command = ['mosh'] + mosh_args(args.identity) + args.command
|
||||
else:
|
||||
command = args.command
|
||||
|
||||
use_shell = bool(args.shell)
|
||||
if use_shell:
|
||||
command = os.environ['SHELL']
|
||||
sys.stdin.close()
|
||||
|
||||
conn = JustInTimeConnection(
|
||||
conn_factory=lambda: client.Client(device_type()),
|
||||
identities=identities, public_keys=public_keys)
|
||||
if command:
|
||||
return run_server(conn=conn, command=command, debug=args.debug,
|
||||
timeout=args.timeout)
|
||||
else:
|
||||
for pk in conn.public_keys():
|
||||
sys.stdout.write(pk)
|
||||
@@ -18,15 +18,18 @@ class Client(object):
|
||||
"""Connect to hardware device."""
|
||||
self.device = device
|
||||
|
||||
def get_public_key(self, identity):
|
||||
"""Get SSH public key from the device."""
|
||||
def export_public_keys(self, identities):
|
||||
"""Export SSH public keys from the device."""
|
||||
public_keys = []
|
||||
with self.device:
|
||||
pubkey = self.device.pubkey(identity)
|
||||
|
||||
vk = formats.decompress_pubkey(pubkey=pubkey,
|
||||
curve_name=identity.curve_name)
|
||||
return formats.export_public_key(vk=vk,
|
||||
label=str(identity))
|
||||
for i in identities:
|
||||
pubkey = self.device.pubkey(identity=i)
|
||||
vk = formats.decompress_pubkey(pubkey=pubkey,
|
||||
curve_name=i.curve_name)
|
||||
public_key = formats.export_public_key(vk=vk,
|
||||
label=i.to_string())
|
||||
public_keys.append(public_key)
|
||||
return public_keys
|
||||
|
||||
def sign_ssh_challenge(self, blob, identity):
|
||||
"""Sign given blob using a private key on the device."""
|
||||
@@ -62,7 +62,9 @@ def failure():
|
||||
|
||||
def _legacy_pubs(buf):
|
||||
"""SSH v1 public keys are not supported."""
|
||||
assert not buf.read()
|
||||
leftover = buf.read()
|
||||
if leftover:
|
||||
log.warning('skipping leftover: %r', leftover)
|
||||
code = util.pack('B', msg_code('SSH_AGENT_RSA_IDENTITIES_ANSWER'))
|
||||
num = util.pack('L', 0) # no SSH v1 keys
|
||||
return util.frame(code, num)
|
||||
@@ -71,14 +73,13 @@ def _legacy_pubs(buf):
|
||||
class Handler(object):
|
||||
"""ssh-agent protocol handler."""
|
||||
|
||||
def __init__(self, keys, signer, debug=False):
|
||||
def __init__(self, conn, debug=False):
|
||||
"""
|
||||
Create a protocol handler with specified public keys.
|
||||
|
||||
Use specified signer function to sign SSH authentication requests.
|
||||
"""
|
||||
self.public_keys = keys
|
||||
self.signer = signer
|
||||
self.conn = conn
|
||||
self.debug = debug
|
||||
|
||||
self.methods = {
|
||||
@@ -107,7 +108,7 @@ class Handler(object):
|
||||
def list_pubs(self, buf):
|
||||
"""SSH v2 public keys are serialized and returned."""
|
||||
assert not buf.read()
|
||||
keys = self.public_keys
|
||||
keys = self.conn.parse_public_keys()
|
||||
code = util.pack('B', msg_code('SSH2_AGENT_IDENTITIES_ANSWER'))
|
||||
num = util.pack('L', len(keys))
|
||||
log.debug('available keys: %s', [k['name'] for k in keys])
|
||||
@@ -129,7 +130,7 @@ class Handler(object):
|
||||
assert util.read_frame(buf) == b''
|
||||
assert not buf.read()
|
||||
|
||||
for k in self.public_keys:
|
||||
for k in self.conn.parse_public_keys():
|
||||
if (k['fingerprint']) == (key['fingerprint']):
|
||||
log.debug('using key %r (%s)', k['name'], k['fingerprint'])
|
||||
key = k
|
||||
@@ -137,10 +138,10 @@ class Handler(object):
|
||||
else:
|
||||
raise KeyError('key not found')
|
||||
|
||||
label = key['name'].decode('ascii') # label should be a string
|
||||
label = key['name'].decode('utf-8')
|
||||
log.debug('signing %d-byte blob with "%s" key', len(blob), label)
|
||||
try:
|
||||
signature = self.signer(blob=blob, identity=key['identity'])
|
||||
signature = self.conn.sign(blob=blob, identity=key['identity'])
|
||||
except IOError:
|
||||
return failure()
|
||||
log.debug('signature: %r', signature)
|
||||
@@ -49,7 +49,7 @@ def test_ssh_agent():
|
||||
identity = device.interface.Identity(identity_str='localhost:22',
|
||||
curve_name=CURVE)
|
||||
c = client.Client(device=MockDevice())
|
||||
assert c.get_public_key(identity) == PUBKEY_TEXT
|
||||
assert c.export_public_keys([identity]) == [PUBKEY_TEXT]
|
||||
signature = c.sign_ssh_challenge(blob=BLOB, identity=identity)
|
||||
|
||||
key = formats.import_public_key(PUBKEY_TEXT)
|
||||
@@ -1,3 +1,4 @@
|
||||
import mock
|
||||
import pytest
|
||||
|
||||
from .. import device, formats, protocol
|
||||
@@ -15,22 +16,36 @@ NIST256_SIGN_MSG = b'\r\x00\x00\x00h\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\
|
||||
NIST256_SIGN_REPLY = b'\x00\x00\x00j\x0e\x00\x00\x00e\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00J\x00\x00\x00!\x00\x88G!\x0c\n\x16:\xbeF\xbe\xb9\xd2\xa9&e\x89\xad\xc4}\x10\xf8\xbc\xdc\xef\x0e\x8d_\x8a6.\xb6\x1f\x00\x00\x00!\x00q\xf0\x16>,\x9a\xde\xe7(\xd6\xd7\x93\x1f\xed\xf9\x94ddw\xfe\xbdq\x13\xbb\xfc\xa9K\xea\x9dC\xa1\xe9' # nopep8
|
||||
|
||||
|
||||
def fake_connection(keys, signer):
|
||||
c = mock.Mock()
|
||||
c.parse_public_keys.return_value = keys
|
||||
c.sign = signer
|
||||
return c
|
||||
|
||||
|
||||
def test_list():
|
||||
key = formats.import_public_key(NIST256_KEY)
|
||||
key['identity'] = device.interface.Identity('ssh://localhost', 'nist256p1')
|
||||
h = protocol.Handler(keys=[key], signer=None)
|
||||
h = protocol.Handler(fake_connection(keys=[key], signer=None))
|
||||
reply = h.handle(LIST_MSG)
|
||||
assert reply == LIST_NIST256_REPLY
|
||||
|
||||
|
||||
def test_list_legacy_pubs_with_suffix():
|
||||
h = protocol.Handler(fake_connection(keys=[], signer=None))
|
||||
suffix = b'\x00\x00\x00\x06foobar'
|
||||
reply = h.handle(b'\x01' + suffix)
|
||||
assert reply == b'\x00\x00\x00\x05\x02\x00\x00\x00\x00' # no legacy keys
|
||||
|
||||
|
||||
def test_unsupported():
|
||||
h = protocol.Handler(keys=[], signer=None)
|
||||
h = protocol.Handler(fake_connection(keys=[], signer=None))
|
||||
reply = h.handle(b'\x09')
|
||||
assert reply == b'\x00\x00\x00\x01\x05'
|
||||
|
||||
|
||||
def ecdsa_signer(identity, blob):
|
||||
assert str(identity) == '<ssh://localhost|nist256p1>'
|
||||
assert identity.to_string() == '<ssh://localhost|nist256p1>'
|
||||
assert blob == NIST256_BLOB
|
||||
return NIST256_SIG
|
||||
|
||||
@@ -38,26 +53,26 @@ def ecdsa_signer(identity, blob):
|
||||
def test_ecdsa_sign():
|
||||
key = formats.import_public_key(NIST256_KEY)
|
||||
key['identity'] = device.interface.Identity('ssh://localhost', 'nist256p1')
|
||||
h = protocol.Handler(keys=[key], signer=ecdsa_signer)
|
||||
h = protocol.Handler(fake_connection(keys=[key], signer=ecdsa_signer))
|
||||
reply = h.handle(NIST256_SIGN_MSG)
|
||||
assert reply == NIST256_SIGN_REPLY
|
||||
|
||||
|
||||
def test_sign_missing():
|
||||
h = protocol.Handler(keys=[], signer=ecdsa_signer)
|
||||
h = protocol.Handler(fake_connection(keys=[], signer=ecdsa_signer))
|
||||
with pytest.raises(KeyError):
|
||||
h.handle(NIST256_SIGN_MSG)
|
||||
|
||||
|
||||
def test_sign_wrong():
|
||||
def wrong_signature(identity, blob):
|
||||
assert str(identity) == '<ssh://localhost|nist256p1>'
|
||||
assert identity.to_string() == '<ssh://localhost|nist256p1>'
|
||||
assert blob == NIST256_BLOB
|
||||
return b'\x00' * 64
|
||||
|
||||
key = formats.import_public_key(NIST256_KEY)
|
||||
key['identity'] = device.interface.Identity('ssh://localhost', 'nist256p1')
|
||||
h = protocol.Handler(keys=[key], signer=wrong_signature)
|
||||
h = protocol.Handler(fake_connection(keys=[key], signer=wrong_signature))
|
||||
with pytest.raises(ValueError):
|
||||
h.handle(NIST256_SIGN_MSG)
|
||||
|
||||
@@ -68,7 +83,7 @@ def test_sign_cancel():
|
||||
|
||||
key = formats.import_public_key(NIST256_KEY)
|
||||
key['identity'] = device.interface.Identity('ssh://localhost', 'nist256p1')
|
||||
h = protocol.Handler(keys=[key], signer=cancel_signature)
|
||||
h = protocol.Handler(fake_connection(keys=[key], signer=cancel_signature))
|
||||
assert h.handle(NIST256_SIGN_MSG) == protocol.failure()
|
||||
|
||||
|
||||
@@ -81,7 +96,7 @@ ED25519_SIG = b'''\x8eb)\xa6\xe9P\x83VE\xfbq\xc6\xbf\x1dV3\xe3<O\x11\xc0\xfa\xe4
|
||||
|
||||
|
||||
def ed25519_signer(identity, blob):
|
||||
assert str(identity) == '<ssh://localhost|ed25519>'
|
||||
assert identity.to_string() == '<ssh://localhost|ed25519>'
|
||||
assert blob == ED25519_BLOB
|
||||
return ED25519_SIG
|
||||
|
||||
@@ -89,6 +104,6 @@ def ed25519_signer(identity, blob):
|
||||
def test_ed25519_sign():
|
||||
key = formats.import_public_key(ED25519_KEY)
|
||||
key['identity'] = device.interface.Identity('ssh://localhost', 'ed25519')
|
||||
h = protocol.Handler(keys=[key], signer=ed25519_signer)
|
||||
h = protocol.Handler(fake_connection(keys=[key], signer=ed25519_signer))
|
||||
reply = h.handle(ED25519_SIGN_MSG)
|
||||
assert reply == ED25519_SIGN_REPLY
|
||||
1
libagent/tests/__init__.py
Normal file
1
libagent/tests/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Unit-tests for this package."""
|
||||
7
libagent/tests/test_interface.py
Normal file
7
libagent/tests/test_interface.py
Normal file
@@ -0,0 +1,7 @@
|
||||
from ..device import interface
|
||||
|
||||
|
||||
def test_unicode():
|
||||
i = interface.Identity(u'ko\u017eu\u0161\u010dek@host', 'ed25519')
|
||||
assert i.to_bytes() == b'kozuscek@host'
|
||||
assert sorted(i.items()) == [('host', 'host'), ('user', 'kozuscek')]
|
||||
@@ -8,7 +8,8 @@ import threading
|
||||
import mock
|
||||
import pytest
|
||||
|
||||
from .. import protocol, server, util
|
||||
from .. import server, util
|
||||
from ..ssh import protocol
|
||||
|
||||
|
||||
def test_socket():
|
||||
@@ -37,10 +38,16 @@ class FakeSocket(object):
|
||||
pass
|
||||
|
||||
|
||||
def empty_device():
|
||||
c = mock.Mock(spec=['parse_public_keys'])
|
||||
c.parse_public_keys.return_value = []
|
||||
return c
|
||||
|
||||
|
||||
def test_handle():
|
||||
mutex = threading.Lock()
|
||||
|
||||
handler = protocol.Handler(keys=[], signer=None)
|
||||
handler = protocol.Handler(conn=empty_device())
|
||||
conn = FakeSocket()
|
||||
server.handle_connection(conn, handler, mutex)
|
||||
|
||||
@@ -67,7 +74,6 @@ def test_handle():
|
||||
|
||||
|
||||
def test_server_thread():
|
||||
|
||||
connections = [FakeSocket()]
|
||||
quit_event = threading.Event()
|
||||
|
||||
@@ -81,8 +87,10 @@ def test_server_thread():
|
||||
def getsockname(self): # pylint: disable=no-self-use
|
||||
return 'fake_server'
|
||||
|
||||
handler = protocol.Handler(keys=[], signer=None),
|
||||
handle_conn = functools.partial(server.handle_connection, handler=handler)
|
||||
handler = protocol.Handler(conn=empty_device()),
|
||||
handle_conn = functools.partial(server.handle_connection,
|
||||
handler=handler,
|
||||
mutex=None)
|
||||
server.server_thread(sock=FakeServer(),
|
||||
handle_conn=handle_conn,
|
||||
quit_event=quit_event)
|
||||
@@ -110,12 +118,6 @@ def test_run():
|
||||
server.run_process([''], environ={})
|
||||
|
||||
|
||||
def test_serve_main():
|
||||
handler = protocol.Handler(keys=[], signer=None)
|
||||
with server.serve(handler=handler, sock_path=None):
|
||||
pass
|
||||
|
||||
|
||||
def test_remove():
|
||||
path = 'foo.bar'
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import io
|
||||
|
||||
import mock
|
||||
import pytest
|
||||
|
||||
from .. import util
|
||||
@@ -100,4 +101,17 @@ def test_reader():
|
||||
|
||||
|
||||
def test_setup_logging():
|
||||
util.setup_logging(verbosity=10)
|
||||
util.setup_logging(verbosity=10, filename='/dev/null')
|
||||
|
||||
|
||||
def test_memoize():
|
||||
f = mock.Mock(side_effect=lambda x: x)
|
||||
|
||||
def func(x):
|
||||
# mock.Mock doesn't work with functools.wraps()
|
||||
return f(x)
|
||||
|
||||
g = util.memoize(func)
|
||||
assert g(1) == g(1)
|
||||
assert g(1) != g(2)
|
||||
assert f.mock_calls == [mock.call(1), mock.call(2)]
|
||||
@@ -1,6 +1,7 @@
|
||||
"""Various I/O and serialization utilities."""
|
||||
import binascii
|
||||
import contextlib
|
||||
import functools
|
||||
import io
|
||||
import logging
|
||||
import struct
|
||||
@@ -178,10 +179,37 @@ class Reader(object):
|
||||
self._captured = None
|
||||
|
||||
|
||||
def setup_logging(verbosity, **kwargs):
|
||||
def setup_logging(verbosity, filename=None):
|
||||
"""Configure logging for this tool."""
|
||||
fmt = ('%(asctime)s %(levelname)-12s %(message)-100s '
|
||||
'[%(filename)s:%(lineno)d]')
|
||||
levels = [logging.WARNING, logging.INFO, logging.DEBUG]
|
||||
level = levels[min(verbosity, len(levels) - 1)]
|
||||
logging.basicConfig(format=fmt, level=level, **kwargs)
|
||||
logging.root.setLevel(level)
|
||||
|
||||
fmt = logging.Formatter('%(asctime)s %(levelname)-12s %(message)-100s '
|
||||
'[%(filename)s:%(lineno)d]')
|
||||
hdlr = logging.StreamHandler() # stderr
|
||||
hdlr.setFormatter(fmt)
|
||||
logging.root.addHandler(hdlr)
|
||||
|
||||
if filename:
|
||||
hdlr = logging.FileHandler(filename, 'a')
|
||||
hdlr.setFormatter(fmt)
|
||||
logging.root.addHandler(hdlr)
|
||||
|
||||
|
||||
def memoize(func):
|
||||
"""Simple caching decorator."""
|
||||
cache = {}
|
||||
|
||||
@functools.wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
"""Caching wrapper."""
|
||||
key = (args, tuple(sorted(kwargs.items())))
|
||||
if key in cache:
|
||||
return cache[key]
|
||||
else:
|
||||
result = func(*args, **kwargs)
|
||||
cache[key] = result
|
||||
return result
|
||||
|
||||
return wrapper
|
||||
@@ -1,34 +1,58 @@
|
||||
#!/bin/bash
|
||||
set -eu
|
||||
|
||||
gpg2 --version >/dev/null # verify that GnuPG 2 is installed
|
||||
|
||||
USER_ID="${1}"
|
||||
HOMEDIR=~/.gnupg/trezor
|
||||
DEVICE=${DEVICE:="trezor"} # or "ledger"
|
||||
CURVE=${CURVE:="nist256p1"} # or "ed25519"
|
||||
TIMESTAMP=${TIMESTAMP:=`date +%s`} # key creation timestamp
|
||||
GPG_BINARY=${GPG_BINARY:="gpg2"} # starting from GnuPG 2.2, gpg2 -> gpg
|
||||
HOMEDIR=~/.gnupg/${DEVICE}
|
||||
|
||||
# Prepare new GPG home directory for TREZOR-based identity
|
||||
${GPG_BINARY} --version # verify that GnuPG 2.1+ is installed
|
||||
|
||||
# Prepare new GPG home directory for hardware-based identity
|
||||
rm -rf "${HOMEDIR}"
|
||||
mkdir -p "${HOMEDIR}"
|
||||
chmod 700 "${HOMEDIR}"
|
||||
|
||||
# Generate new GPG identity and import into GPG keyring
|
||||
trezor-gpg-create -v "${USER_ID}" -t "${TIMESTAMP}" -e "${CURVE}" > "${HOMEDIR}/pubkey.asc"
|
||||
gpg2 --homedir "${HOMEDIR}" --import < "${HOMEDIR}/pubkey.asc"
|
||||
$DEVICE-gpg create -v "${USER_ID}" -t "${TIMESTAMP}" -e "${CURVE}" > "${HOMEDIR}/pubkey.asc"
|
||||
${GPG_BINARY} --homedir "${HOMEDIR}" --import < "${HOMEDIR}/pubkey.asc" 2> /dev/null
|
||||
rm -f "${HOMEDIR}/S.gpg-agent" # (otherwise, our agent won't be started automatically)
|
||||
|
||||
# Make new GPG identity with "ultimate" trust (via its fingerprint)
|
||||
FINGERPRINT=$(gpg2 --homedir "${HOMEDIR}" --list-public-keys --with-colons | sed -n -E 's/^fpr:::::::::([0-9A-F]+):$/\1/p' | head -n1)
|
||||
echo "${FINGERPRINT}:6" | gpg2 --homedir "${HOMEDIR}" --import-ownertrust
|
||||
FINGERPRINT=$(${GPG_BINARY} --homedir "${HOMEDIR}" --list-public-keys --with-fingerprint --with-colons | sed -n -E 's/^fpr:::::::::([0-9A-F]+):$/\1/p' | head -n1)
|
||||
echo "${FINGERPRINT}:6" | ${GPG_BINARY} --homedir "${HOMEDIR}" --import-ownertrust 2> /dev/null
|
||||
|
||||
AGENT_PATH="$(which ${DEVICE}-gpg-agent)"
|
||||
|
||||
# Prepare GPG configuration file
|
||||
echo "# TREZOR-based GPG configuration
|
||||
agent-program $(which trezor-gpg-agent)
|
||||
echo "# Hardware-based GPG configuration
|
||||
agent-program ${AGENT_PATH}
|
||||
personal-digest-preferences SHA512
|
||||
" | tee "${HOMEDIR}/gpg.conf"
|
||||
default-key \"${USER_ID}\"
|
||||
" > "${HOMEDIR}/gpg.conf"
|
||||
|
||||
echo "# TREZOR-based GPG agent emulator
|
||||
# Prepare GPG agent configuration file
|
||||
echo "# Hardware-based GPG agent emulator
|
||||
log-file ${HOMEDIR}/gpg-agent.log
|
||||
verbosity 2
|
||||
" | tee "${HOMEDIR}/gpg-agent.conf"
|
||||
" > "${HOMEDIR}/gpg-agent.conf"
|
||||
|
||||
# Prepare a helper script for setting up the new identity
|
||||
echo "#!/bin/bash
|
||||
set -eu
|
||||
export GNUPGHOME=${HOMEDIR}
|
||||
COMMAND=\$*
|
||||
if [ -z \"\${COMMAND}\" ]
|
||||
then
|
||||
\${SHELL}
|
||||
else
|
||||
\${COMMAND}
|
||||
fi
|
||||
" > "${HOMEDIR}/env"
|
||||
chmod u+x "${HOMEDIR}/env"
|
||||
|
||||
echo "Starting ${DEVICE}-gpg-agent at ${HOMEDIR}..."
|
||||
# Load agent and make sure it responds with the new identity
|
||||
GNUPGHOME="${HOMEDIR}" ${GPG_BINARY} -K 2> /dev/null
|
||||
|
||||
@@ -1,28 +0,0 @@
|
||||
#!/bin/bash
|
||||
set -eu
|
||||
|
||||
gpg2 --version >/dev/null # verify that GnuPG 2 is installed
|
||||
|
||||
export GNUPGHOME=~/.gnupg/trezor
|
||||
|
||||
CONFIG_PATH="${GNUPGHOME}/gpg-agent.conf"
|
||||
if [ ! -f ${CONFIG_PATH} ]
|
||||
then
|
||||
echo "No configuration found: ${CONFIG_PATH}"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Make sure that the device is unlocked before starting the shell
|
||||
trezor-gpg-unlock
|
||||
|
||||
# Make sure TREZOR-based gpg-agent is running
|
||||
gpg-connect-agent --agent-program "$(which trezor-gpg-agent)" </dev/null
|
||||
|
||||
COMMAND=$*
|
||||
if [ -z "${COMMAND}" ]
|
||||
then
|
||||
gpg2 --list-public-keys
|
||||
${SHELL}
|
||||
else
|
||||
${COMMAND}
|
||||
fi
|
||||
27
setup.py
Normal file → Executable file
27
setup.py
Normal file → Executable file
@@ -2,17 +2,23 @@
|
||||
from setuptools import setup
|
||||
|
||||
setup(
|
||||
name='trezor_agent',
|
||||
version='0.8.1',
|
||||
description='Using Trezor as hardware SSH agent',
|
||||
name='libagent',
|
||||
version='0.9.3',
|
||||
description='Using hardware wallets as SSH/GPG agent',
|
||||
author='Roman Zeyde',
|
||||
author_email='roman.zeyde@gmail.com',
|
||||
url='http://github.com/romanz/trezor-agent',
|
||||
packages=['trezor_agent', 'trezor_agent.device', 'trezor_agent.gpg'],
|
||||
packages=[
|
||||
'libagent',
|
||||
'libagent.device',
|
||||
'libagent.gpg',
|
||||
'libagent.ssh'
|
||||
],
|
||||
install_requires=[
|
||||
'ecdsa>=0.13', 'ed25519>=1.4', 'Cython>=0.23.4', 'protobuf>=3.0.0', 'semver>=2.2',
|
||||
'trezor>=0.7.6', 'keepkey>=0.7.3', 'ledgerblue>=0.1.8',
|
||||
'hidapi==0.7.99.post15' # until https://github.com/keepkey/python-keepkey/pull/8 is merged
|
||||
'ecdsa>=0.13',
|
||||
'ed25519>=1.4',
|
||||
'semver>=2.2',
|
||||
'unidecode>=0.4.20',
|
||||
],
|
||||
platforms=['POSIX'],
|
||||
classifiers=[
|
||||
@@ -26,16 +32,11 @@ setup(
|
||||
'Programming Language :: Python :: 2.7',
|
||||
'Programming Language :: Python :: 3.4',
|
||||
'Programming Language :: Python :: 3.5',
|
||||
'Programming Language :: Python :: 3.6',
|
||||
'Topic :: Software Development :: Libraries :: Python Modules',
|
||||
'Topic :: System :: Networking',
|
||||
'Topic :: Communications',
|
||||
'Topic :: Security',
|
||||
'Topic :: Utilities',
|
||||
],
|
||||
entry_points={'console_scripts': [
|
||||
'trezor-agent = trezor_agent.__main__:run_agent',
|
||||
'trezor-gpg-create = trezor_agent.gpg.__main__:main_create',
|
||||
'trezor-gpg-agent = trezor_agent.gpg.__main__:main_agent',
|
||||
'trezor-gpg-unlock = trezor_agent.gpg.__main__:auto_unlock',
|
||||
]},
|
||||
)
|
||||
|
||||
12
tox.ini
12
tox.ini
@@ -2,6 +2,8 @@
|
||||
envlist = py27,py3
|
||||
[pep8]
|
||||
max-line-length = 100
|
||||
[pep257]
|
||||
add-ignore = D401
|
||||
[testenv]
|
||||
deps=
|
||||
pytest
|
||||
@@ -11,10 +13,12 @@ deps=
|
||||
pylint
|
||||
semver
|
||||
pydocstyle
|
||||
isort
|
||||
commands=
|
||||
pep8 trezor_agent
|
||||
pylint --reports=no --rcfile .pylintrc trezor_agent
|
||||
pydocstyle trezor_agent
|
||||
coverage run --omit='trezor_agent/__main__.py' --source trezor_agent -m py.test -v trezor_agent
|
||||
pep8 libagent
|
||||
isort --skip-glob .tox -c -r libagent
|
||||
pylint --reports=no --rcfile .pylintrc libagent
|
||||
pydocstyle libagent
|
||||
coverage run --source libagent -m py.test -v libagent
|
||||
coverage report
|
||||
coverage html
|
||||
|
||||
@@ -1,168 +0,0 @@
|
||||
"""SSH-agent implementation using hardware authentication devices."""
|
||||
import argparse
|
||||
import functools
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
from . import client, device, formats, protocol, server, util
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def ssh_args(label):
|
||||
"""Create SSH command for connecting specified server."""
|
||||
identity = device.interface.string_to_identity(label)
|
||||
|
||||
args = []
|
||||
if 'port' in identity:
|
||||
args += ['-p', identity['port']]
|
||||
if 'user' in identity:
|
||||
args += ['-l', identity['user']]
|
||||
|
||||
return args + [identity['host']]
|
||||
|
||||
|
||||
def create_parser():
|
||||
"""Create argparse.ArgumentParser for this tool."""
|
||||
p = argparse.ArgumentParser()
|
||||
p.add_argument('-v', '--verbose', default=0, action='count')
|
||||
|
||||
curve_names = [name for name in formats.SUPPORTED_CURVES]
|
||||
curve_names = ', '.join(sorted(curve_names))
|
||||
p.add_argument('-e', '--ecdsa-curve-name', metavar='CURVE',
|
||||
default=formats.CURVE_NIST256,
|
||||
help='specify ECDSA curve name: ' + curve_names)
|
||||
p.add_argument('--timeout',
|
||||
default=server.UNIX_SOCKET_TIMEOUT, type=float,
|
||||
help='Timeout for accepting SSH client connections')
|
||||
p.add_argument('--debug', default=False, action='store_true',
|
||||
help='Log SSH protocol messages for debugging.')
|
||||
return p
|
||||
|
||||
|
||||
def create_agent_parser():
|
||||
"""Specific parser for SSH connection."""
|
||||
p = create_parser()
|
||||
|
||||
g = p.add_mutually_exclusive_group()
|
||||
g.add_argument('-s', '--shell', default=False, action='store_true',
|
||||
help='run ${SHELL} as subprocess under SSH agent')
|
||||
g.add_argument('-c', '--connect', default=False, action='store_true',
|
||||
help='connect to specified host via SSH')
|
||||
g.add_argument('--mosh', default=False, action='store_true',
|
||||
help='connect to specified host via using Mosh')
|
||||
|
||||
p.add_argument('identity', type=str, default=None,
|
||||
help='proto://[user@]host[:port][/path]')
|
||||
p.add_argument('command', type=str, nargs='*', metavar='ARGUMENT',
|
||||
help='command to run under the SSH agent')
|
||||
return p
|
||||
|
||||
|
||||
def create_git_parser():
|
||||
"""Specific parser for git commands."""
|
||||
p = create_parser()
|
||||
|
||||
p.add_argument('-r', '--remote', default='origin',
|
||||
help='use this git remote URL to generate SSH identity')
|
||||
p.add_argument('-t', '--test', action='store_true',
|
||||
help='test connection using `ssh -T user@host` command')
|
||||
p.add_argument('command', type=str, nargs='*', metavar='ARGUMENT',
|
||||
help='Git command to run under the SSH agent')
|
||||
return p
|
||||
|
||||
|
||||
def git_host(remote_name, attributes):
|
||||
"""Extract git SSH host for specified remote name."""
|
||||
try:
|
||||
output = subprocess.check_output('git config --local --list'.split())
|
||||
except subprocess.CalledProcessError:
|
||||
return
|
||||
|
||||
for attribute in attributes:
|
||||
name = r'remote.{0}.{1}'.format(remote_name, attribute)
|
||||
matches = re.findall(re.escape(name) + '=(.*)', output)
|
||||
log.debug('%r: %r', name, matches)
|
||||
if not matches:
|
||||
continue
|
||||
|
||||
url = matches[0].strip()
|
||||
match = re.match('(?P<user>.*?)@(?P<host>.*?):(?P<path>.*)', url)
|
||||
if match:
|
||||
return '{user}@{host}'.format(**match.groupdict())
|
||||
|
||||
|
||||
def run_server(conn, public_keys, command, debug, timeout):
|
||||
"""Common code for run_agent and run_git below."""
|
||||
try:
|
||||
signer = conn.sign_ssh_challenge
|
||||
handler = protocol.Handler(keys=public_keys, signer=signer,
|
||||
debug=debug)
|
||||
with server.serve(handler=handler, timeout=timeout) as env:
|
||||
return server.run_process(command=command, environ=env)
|
||||
except KeyboardInterrupt:
|
||||
log.info('server stopped')
|
||||
|
||||
|
||||
def handle_connection_error(func):
|
||||
"""Fail with non-zero exit code."""
|
||||
@functools.wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
except IOError as e:
|
||||
log.error('Connection error: %s', e)
|
||||
return 1
|
||||
return wrapper
|
||||
|
||||
|
||||
def parse_config(fname):
|
||||
"""Parse config file into a list of Identity objects."""
|
||||
contents = open(fname).read()
|
||||
for identity_str, curve_name in re.findall(r'\<(.*?)\|(.*?)\>', contents):
|
||||
yield device.interface.Identity(identity_str=identity_str,
|
||||
curve_name=curve_name)
|
||||
|
||||
|
||||
@handle_connection_error
|
||||
def run_agent(client_factory=client.Client):
|
||||
"""Run ssh-agent using given hardware client factory."""
|
||||
args = create_agent_parser().parse_args()
|
||||
util.setup_logging(verbosity=args.verbose)
|
||||
|
||||
conn = client_factory(device=device.detect())
|
||||
if args.identity.startswith('/'):
|
||||
identities = list(parse_config(fname=args.identity))
|
||||
else:
|
||||
identities = [device.interface.Identity(
|
||||
identity_str=args.identity, curve_name=args.ecdsa_curve_name)]
|
||||
for index, identity in enumerate(identities):
|
||||
identity.identity_dict['proto'] = 'ssh'
|
||||
log.info('identity #%d: %s', index, identity)
|
||||
|
||||
public_keys = [conn.get_public_key(i) for i in identities]
|
||||
|
||||
if args.connect:
|
||||
command = ['ssh'] + ssh_args(args.identity) + args.command
|
||||
elif args.mosh:
|
||||
command = ['mosh'] + ssh_args(args.identity) + args.command
|
||||
else:
|
||||
command = args.command
|
||||
|
||||
use_shell = bool(args.shell)
|
||||
if use_shell:
|
||||
command = os.environ['SHELL']
|
||||
|
||||
if not command:
|
||||
for pk in public_keys:
|
||||
sys.stdout.write(pk)
|
||||
return
|
||||
|
||||
public_keys = [formats.import_public_key(pk) for pk in public_keys]
|
||||
for pk, identity in zip(public_keys, identities):
|
||||
pk['identity'] = identity
|
||||
return run_server(conn=conn, public_keys=public_keys, command=command,
|
||||
debug=args.debug, timeout=args.timeout)
|
||||
@@ -1,27 +0,0 @@
|
||||
"""Cryptographic hardware device management."""
|
||||
|
||||
import logging
|
||||
|
||||
from . import trezor
|
||||
from . import keepkey
|
||||
from . import ledger
|
||||
from . import interface
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
DEVICE_TYPES = [
|
||||
trezor.Trezor,
|
||||
keepkey.KeepKey,
|
||||
ledger.LedgerNanoS,
|
||||
]
|
||||
|
||||
|
||||
def detect():
|
||||
"""Detect the first available device and return it to the user."""
|
||||
for device_type in DEVICE_TYPES:
|
||||
try:
|
||||
with device_type() as d:
|
||||
return d
|
||||
except interface.NotFoundError as e:
|
||||
log.debug('device not found: %s', e)
|
||||
raise IOError('No device found!')
|
||||
@@ -1,8 +0,0 @@
|
||||
"""KeepKey-related definitions."""
|
||||
|
||||
# pylint: disable=unused-import
|
||||
from keepkeylib.client import KeepKeyClient as Client
|
||||
from keepkeylib.client import CallException
|
||||
from keepkeylib.transport_hid import HidTransport
|
||||
from keepkeylib.messages_pb2 import PassphraseAck
|
||||
from keepkeylib.types_pb2 import IdentityType
|
||||
@@ -1,8 +0,0 @@
|
||||
"""TREZOR-related definitions."""
|
||||
|
||||
# pylint: disable=unused-import
|
||||
from trezorlib.client import TrezorClient as Client
|
||||
from trezorlib.client import CallException
|
||||
from trezorlib.transport_hid import HidTransport
|
||||
from trezorlib.messages_pb2 import PassphraseAck
|
||||
from trezorlib.types_pb2 import IdentityType
|
||||
@@ -1,9 +0,0 @@
|
||||
"""
|
||||
TREZOR support for ECDSA GPG signatures.
|
||||
|
||||
See these links for more details:
|
||||
- https://www.gnupg.org/faq/whats-new-in-2.1.html
|
||||
- https://tools.ietf.org/html/rfc4880
|
||||
- https://tools.ietf.org/html/rfc6637
|
||||
- https://tools.ietf.org/html/draft-irtf-cfrg-eddsa-05
|
||||
"""
|
||||
@@ -1,152 +0,0 @@
|
||||
"""GPG-agent utilities."""
|
||||
import binascii
|
||||
import logging
|
||||
|
||||
from . import decode, client, keyring, protocol
|
||||
from .. import util
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def yield_connections(sock):
|
||||
"""Run a server on the specified socket."""
|
||||
while True:
|
||||
log.debug('waiting for connection on %s', sock.getsockname())
|
||||
try:
|
||||
conn, _ = sock.accept()
|
||||
except KeyboardInterrupt:
|
||||
return
|
||||
conn.settimeout(None)
|
||||
log.debug('accepted connection on %s', sock.getsockname())
|
||||
yield conn
|
||||
|
||||
|
||||
def serialize(data):
|
||||
"""Serialize data according to ASSUAN protocol."""
|
||||
for c in [b'%', b'\n', b'\r']:
|
||||
escaped = '%{:02X}'.format(ord(c)).encode('ascii')
|
||||
data = data.replace(c, escaped)
|
||||
return data
|
||||
|
||||
|
||||
def sig_encode(r, s):
|
||||
"""Serialize ECDSA signature data into GPG S-expression."""
|
||||
r = serialize(util.num2bytes(r, 32))
|
||||
s = serialize(util.num2bytes(s, 32))
|
||||
return b'(7:sig-val(5:ecdsa(1:r32:' + r + b')(1:s32:' + s + b')))'
|
||||
|
||||
|
||||
def open_connection(keygrip_bytes):
|
||||
"""
|
||||
Connect to the device for the specified keygrip.
|
||||
|
||||
Parse GPG public key to find the first user ID, which is used to
|
||||
specify the correct signature/decryption key on the device.
|
||||
"""
|
||||
pubkey_dict, user_ids = decode.load_by_keygrip(
|
||||
pubkey_bytes=keyring.export_public_keys(),
|
||||
keygrip=keygrip_bytes)
|
||||
# We assume the first user ID is used to generate TREZOR-based GPG keys.
|
||||
user_id = user_ids[0]['value'].decode('ascii')
|
||||
curve_name = protocol.get_curve_name_by_oid(pubkey_dict['curve_oid'])
|
||||
ecdh = (pubkey_dict['algo'] == protocol.ECDH_ALGO_ID)
|
||||
|
||||
conn = client.Client(user_id, curve_name=curve_name)
|
||||
pubkey = protocol.PublicKey(
|
||||
curve_name=curve_name, created=pubkey_dict['created'],
|
||||
verifying_key=conn.pubkey(ecdh=ecdh), ecdh=ecdh)
|
||||
assert pubkey.key_id() == pubkey_dict['key_id']
|
||||
assert pubkey.keygrip() == keygrip_bytes
|
||||
return conn
|
||||
|
||||
|
||||
def pksign(keygrip, digest, algo):
|
||||
"""Sign a message digest using a private EC key."""
|
||||
log.debug('signing %r digest (algo #%s)', digest, algo)
|
||||
keygrip_bytes = binascii.unhexlify(keygrip)
|
||||
conn = open_connection(keygrip_bytes)
|
||||
r, s = conn.sign(binascii.unhexlify(digest))
|
||||
result = sig_encode(r, s)
|
||||
log.debug('result: %r', result)
|
||||
return result
|
||||
|
||||
|
||||
def _serialize_point(data):
|
||||
prefix = '{}:'.format(len(data)).encode('ascii')
|
||||
# https://www.gnupg.org/documentation/manuals/assuan/Server-responses.html
|
||||
return b'(5:value' + serialize(prefix + data) + b')'
|
||||
|
||||
|
||||
def parse_ecdh(line):
|
||||
"""Parse ECDH request and return remote public key."""
|
||||
prefix, line = line.split(b' ', 1)
|
||||
assert prefix == b'D'
|
||||
exp, leftover = keyring.parse(keyring.unescape(line))
|
||||
log.debug('ECDH s-exp: %r', exp)
|
||||
assert not leftover
|
||||
label, exp = exp
|
||||
assert label == b'enc-val'
|
||||
assert exp[0] == b'ecdh'
|
||||
items = exp[1:]
|
||||
log.debug('ECDH parameters: %r', items)
|
||||
return dict(items)[b'e']
|
||||
|
||||
|
||||
def pkdecrypt(keygrip, conn):
|
||||
"""Handle decryption using ECDH."""
|
||||
for msg in [b'S INQUIRE_MAXLEN 4096', b'INQUIRE CIPHERTEXT']:
|
||||
keyring.sendline(conn, msg)
|
||||
|
||||
line = keyring.recvline(conn)
|
||||
assert keyring.recvline(conn) == b'END'
|
||||
remote_pubkey = parse_ecdh(line)
|
||||
|
||||
keygrip_bytes = binascii.unhexlify(keygrip)
|
||||
conn = open_connection(keygrip_bytes)
|
||||
return _serialize_point(conn.ecdh(remote_pubkey))
|
||||
|
||||
|
||||
def handle_connection(conn):
|
||||
"""Handle connection from GPG binary using the ASSUAN protocol."""
|
||||
keygrip = None
|
||||
digest = None
|
||||
algo = None
|
||||
version = keyring.gpg_version() # "Clone" existing GPG version
|
||||
|
||||
keyring.sendline(conn, b'OK')
|
||||
for line in keyring.iterlines(conn):
|
||||
parts = line.split(b' ')
|
||||
command = parts[0]
|
||||
args = parts[1:]
|
||||
if command in {b'RESET', b'OPTION', b'HAVEKEY', b'SETKEYDESC'}:
|
||||
pass # reply with OK
|
||||
elif command == b'GETINFO':
|
||||
keyring.sendline(conn, b'D ' + version)
|
||||
elif command == b'AGENT_ID':
|
||||
keyring.sendline(conn, b'D TREZOR') # "Fake" agent ID
|
||||
elif command in {b'SIGKEY', b'SETKEY'}:
|
||||
keygrip, = args
|
||||
elif command == b'SETHASH':
|
||||
algo, digest = args
|
||||
elif command == b'PKSIGN':
|
||||
sig = pksign(keygrip, digest, algo)
|
||||
keyring.sendline(conn, b'D ' + sig)
|
||||
elif command == b'PKDECRYPT':
|
||||
sec = pkdecrypt(keygrip, conn)
|
||||
keyring.sendline(conn, b'D ' + sec)
|
||||
elif command == b'KEYINFO':
|
||||
keygrip, = args
|
||||
# Dummy reply (mainly for 'gpg --edit' to succeed).
|
||||
# For details, see GnuPG agent KEYINFO command help.
|
||||
fmt = 'S KEYINFO {0} X - - - - - - -'
|
||||
keyring.sendline(conn, fmt.format(keygrip).encode('ascii'))
|
||||
elif command == b'BYE':
|
||||
return
|
||||
elif command == b'KILLAGENT':
|
||||
keyring.sendline(conn, b'OK')
|
||||
raise StopIteration
|
||||
else:
|
||||
log.error('unknown request: %r', line)
|
||||
return
|
||||
|
||||
keyring.sendline(conn, b'OK')
|
||||
@@ -1,44 +0,0 @@
|
||||
"""Device abstraction layer for GPG operations."""
|
||||
|
||||
import logging
|
||||
|
||||
from .. import device, formats, util
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Client(object):
|
||||
"""Sign messages and get public keys from a hardware device."""
|
||||
|
||||
def __init__(self, user_id, curve_name):
|
||||
"""Connect to the device and retrieve required public key."""
|
||||
self.device = device.detect()
|
||||
self.user_id = user_id
|
||||
self.identity = device.interface.Identity(
|
||||
identity_str='gpg://', curve_name=curve_name)
|
||||
self.identity.identity_dict['host'] = user_id
|
||||
|
||||
def pubkey(self, ecdh=False):
|
||||
"""Return public key as VerifyingKey object."""
|
||||
with self.device:
|
||||
pubkey = self.device.pubkey(ecdh=ecdh, identity=self.identity)
|
||||
return formats.decompress_pubkey(
|
||||
pubkey=pubkey, curve_name=self.identity.curve_name)
|
||||
|
||||
def sign(self, digest):
|
||||
"""Sign the digest and return a serialized signature."""
|
||||
log.info('please confirm GPG signature on %s for "%s"...',
|
||||
self.device, self.user_id)
|
||||
if self.identity.curve_name == formats.CURVE_NIST256:
|
||||
digest = digest[:32] # sign the first 256 bits
|
||||
log.debug('signing digest: %s', util.hexlify(digest))
|
||||
with self.device:
|
||||
sig = self.device.sign(blob=digest, identity=self.identity)
|
||||
return (util.bytes2num(sig[:32]), util.bytes2num(sig[32:]))
|
||||
|
||||
def ecdh(self, pubkey):
|
||||
"""Derive shared secret using ECDH from remote public key."""
|
||||
log.info('please confirm GPG decryption on %s for "%s"...',
|
||||
self.device, self.user_id)
|
||||
with self.device:
|
||||
return self.device.ecdh(pubkey=pubkey, identity=self.identity)
|
||||
Reference in New Issue
Block a user