Compare commits

...

63 Commits

Author SHA1 Message Date
Roman Zeyde
7d9b3ff1d0 README: update device-related info 2017-04-29 20:51:18 +03:00
Roman Zeyde
4af881b3cb Split the package into a shared library and separate per-device packages 2017-04-29 18:34:46 +03:00
Roman Zeyde
eb525e1b62 gpg: simplify Python entry point and refactor Bash scripts a bit
Now there is a single 'trezor-gpg' tool, with various subcommands.
2017-04-26 23:12:09 +03:00
Roman Zeyde
02c8e729b7 ssh: retrieve all keys using a single device session 2017-04-25 20:43:19 +03:00
Roman Zeyde
12359938ad keepkey: fix transport import 2017-04-23 21:27:02 +03:00
Roman Zeyde
93cd3e688b travis: add Python 3.6 2017-04-22 20:19:06 +03:00
Tomás Rojas
26d7dd3124 Cache public keys for the duration of the agent
This saves a lot of time when connecting to multiple hosts
simultaneously (e.g., during a deploy) as every time we are asked to sign a
challenge, all public keys are iterated to find the correct one. This
can become especially slow when using the Bridge transport and/or many
identities are defined.
2017-04-22 14:47:30 +03:00
Tomás Rojas
0d5c3a9ca7 Allow using TREZOR bridge (instead of HID transport) 2017-04-22 11:16:30 +03:00
Roman Zeyde
97ec6b2719 travis: fix dependency 2017-04-22 10:53:39 +03:00
Roman Zeyde
8ba9be1780 fix pylint warnings 2017-04-21 22:56:50 +03:00
Roman Zeyde
b2bc87c0c7 fix pydocstyle warnings 2017-04-21 22:51:11 +03:00
Timothy Hobbs
d522d148ef Connection error is confusing (#105)
Hi,

I ran into this connection error:

````
> trezor-agent  timothy@localhost
2017-04-10 00:22:01,818 ERROR        Connection error: open failed                                                                        [__main__.py:130]
````

I didn't know what was going on, whether there was a problem connecting to localhost or what. I eventually logged into trezor wallet and found that there too, my device was not recognized (probably because I did not unplug it/replug it after updating the HID settings.) Unplugging it and replugging it fixed everything.
2017-04-10 10:40:27 +03:00
Roman Zeyde
c796a3b01d README: password manager usage example 2017-03-28 21:21:35 +03:00
Roman Zeyde
a3362bbf3e bump version 2017-03-18 16:49:09 +02:00
Nubis
9a271d115b Allow contents in buffer when using _legacy_pubs 2017-03-09 20:13:23 +02:00
Roman Zeyde
6a7165298f decode: skip invalid pubkeys (instead of crashing) 2017-03-07 21:43:32 +02:00
Roman Zeyde
c4f3fa6e04 agent: reply correctly to HAVEKEY requests 2017-02-21 13:12:02 +02:00
Roman Zeyde
8a77fa519f decode: raise an error when keygrip is missing 2017-02-21 13:12:00 +02:00
Roman Zeyde
59560ec0b0 util: add simple memoization decorator 2017-02-21 13:05:48 +02:00
Roman Zeyde
7a91196dd5 agent: add link to HAVEKEY implementation 2017-02-21 11:48:22 +02:00
Roman Zeyde
43c424a402 ssh: allow "just-in-time" connection for agent-like behaviour
This would allow launching trezor-agent into the background
during the system startup, and the connecting the device
when the cryptographic operations are required.
2017-01-07 18:29:12 +02:00
Roman Zeyde
6672ea9bc4 device: set passphrase from environment 2017-01-06 12:52:45 +02:00
Roman Zeyde
002dc2a0e0 tox: order imports 2017-01-06 12:37:14 +02:00
Roman Zeyde
61ced2808f device: allow non-empty passphrases 2017-01-06 11:59:57 +02:00
Roman Zeyde
71a8930021 bump version 2017-01-05 23:12:54 +02:00
Roman Zeyde
74e8f21a22 gpg: export secret subkey 2017-01-01 18:14:52 +02:00
Roman Zeyde
897236d556 gpg: allow decoding secret keys 2017-01-01 18:14:28 +02:00
Roman Zeyde
5bec0e8382 README: upgrade also pip 2017-01-01 10:08:19 +02:00
Roman Zeyde
3cb7f6fd21 gpg: export secret primary key 2016-12-30 18:55:18 +02:00
Roman Zeyde
cad2ec1239 device: import device-specific defs module lazily
It may fail on unsupported platforms (e.g. keepkeylib does not supoprt Python 3)
2016-12-27 12:34:07 +02:00
Roman Zeyde
604b2b7e99 gpg: allow GPG 2.1.11+ (to support Ubuntu 16.04 & Mint 18) 2016-12-27 10:12:34 +02:00
Roman Zeyde
159bd79b5f gpg: list fingerpints explicitly during init 2016-12-27 10:04:48 +02:00
Roman Zeyde
dde0b60e83 Merge pull request #87 from aceat64/bugfix/mosh
Mosh doesn't support "-l" for user, only user@host for args
2016-12-15 19:25:19 +02:00
Andrew LeCody
109bb3b47f Mosh doesn't support "-l" for user, only user@host for args 2016-12-14 23:59:45 -06:00
Roman Zeyde
0f20bfa239 README: add a note about udev 2016-12-06 18:36:14 +02:00
Roman Zeyde
798597c436 bump version 2016-12-06 10:19:23 +02:00
Roman Zeyde
a13b1103f7 setup: temporary pin dependency for keepkey compatibilty 2016-12-06 10:07:18 +02:00
Roman Zeyde
9fe1a235c1 gpg: check that the configuration is in place 2016-12-02 13:10:33 +02:00
Roman Zeyde
f86aae9a40 gpg: check that GnuPG v2 is installed 2016-12-02 12:48:56 +02:00
Roman Zeyde
fc070e3ca0 GPG: add OS X installation link to README 2016-11-30 23:08:56 +02:00
Roman Zeyde
05fac995eb README: add 'setup.py' installation 2016-11-26 14:01:06 +02:00
Roman Zeyde
188b74b327 gpg: use explicit '--subkey' flag for adding a subkey to an existing GPG key 2016-11-25 19:35:40 +02:00
Roman Zeyde
fc31847f8e decode: add test for custom markers 2016-11-19 20:06:29 +02:00
Roman Zeyde
0faf21a102 README: easier tag signature verification 2016-11-19 13:37:33 +02:00
Roman Zeyde
6b82f8b9b7 keyring: add test for get_agent_sock_path() 2016-11-12 20:51:35 +02:00
Roman Zeyde
fabfcaaae2 keyring: fix test case for iterlines() 2016-11-12 20:51:14 +02:00
Roman Zeyde
f0f89310ac main: add '--mosh' for better SSH client 2016-11-11 22:26:22 +02:00
Roman Zeyde
47ff7c5cb3 gpg: add usage example for GPA 2016-11-11 20:05:47 +02:00
Roman Zeyde
0440025083 gpg: use explicit function to check for custom subpacket marker 2016-11-11 13:02:02 +02:00
Roman Zeyde
c49fe97f63 gpg: remove unused parser for literal packets 2016-11-11 13:01:54 +02:00
Roman Zeyde
7f8abcb5c5 client: remove unused code 2016-11-11 13:01:47 +02:00
Roman Zeyde
e13039e52d gpg: remove property method and unused member variable from PublicKey 2016-11-11 13:01:33 +02:00
Roman Zeyde
c420571eb8 gpg: import test coverage for protocol 2016-11-11 09:14:33 +02:00
Roman Zeyde
827119a18d gpg: handle KILLAGENT command
so `gpg-connect-agent KILLAGENT` should stop the running agent
2016-11-10 23:29:47 +02:00
Roman Zeyde
9be6504658 util: import test coverage 2016-11-10 14:33:41 +02:00
Roman Zeyde
07cbe65875 formats: improve test coverage 2016-11-10 14:33:27 +02:00
Roman Zeyde
180120e787 README: link to trezor-ssh-agent for Windows support 2016-11-10 13:09:53 +02:00
Roman Zeyde
f4ce81fa94 README: add 1.4.0-related post 2016-11-10 13:03:12 +02:00
Roman Zeyde
176bf4ef7c README: add Hg demo 2016-11-08 21:45:42 +02:00
Roman Zeyde
d22cd7512d gpg: launch agent before starting the shell 2016-11-06 22:39:45 +02:00
Roman Zeyde
83f17704cb server: remove 'SSH_AUTH_SOCK=' from logging 2016-11-06 22:02:35 +02:00
Roman Zeyde
92f6751ccb README: multiple SSH identities from config file 2016-11-06 20:46:42 +02:00
Roman Zeyde
abe80533eb README: trim whitespace 2016-11-06 20:43:45 +02:00
57 changed files with 669 additions and 296 deletions

View File

@@ -1,2 +1,2 @@
[MESSAGES CONTROL]
disable=invalid-name, missing-docstring, locally-disabled, unbalanced-tuple-unpacking
disable=invalid-name, missing-docstring, locally-disabled, unbalanced-tuple-unpacking,no-else-return

View File

@@ -4,29 +4,23 @@ python:
- "2.7"
- "3.4"
- "3.5"
- "3.6"
cache:
directories:
- $HOME/.cache/pip
addons:
apt:
packages:
- libudev-dev
- libusb-1.0-0-dev
before_install:
- pip install -U setuptools pylint coverage pep8 pydocstyle "pip>=7.0" wheel
- pip install -e git+https://github.com/keepkey/python-keepkey@6e8baa8b935e830d05f87b6dfd9bc7c927a96dc3#egg=keepkey
install:
- pip install -e .
script:
- pep8 trezor_agent
- pylint --reports=no --rcfile .pylintrc trezor_agent
- pydocstyle trezor_agent
- coverage run --source trezor_agent/ -m py.test -v
- pep8 libagent
- pylint --reports=no --rcfile .pylintrc libagent
- pydocstyle libagent
- coverage run --source libagent/ -m py.test -v
after_success:
- coverage report

View File

@@ -4,13 +4,18 @@ Thanks!
# Installation
First, verify that you have GPG 2.1+ [installed](https://gist.github.com/vt0r/a2f8c0bcb1400131ff51):
First, verify that you have GPG 2.1.11+ installed
([Debian](https://gist.github.com/vt0r/a2f8c0bcb1400131ff51),
[macOS](https://sourceforge.net/p/gpgosx/docu/Download/)):
```
$ gpg2 --version | head -n1
gpg (GnuPG) 2.1.15
```
This GPG version is included in [Ubuntu 16.04](https://launchpad.net/ubuntu/+source/gnupg2)
and [Linux Mint 18](https://community.linuxmint.com/software/view/gnupg2).
Update you TREZOR firmware to the latest version (at least v1.4.0).
Install latest `trezor-agent` package from GitHub:
@@ -26,12 +31,51 @@ $ pip install --user git+https://github.com/romanz/trezor-agent.git
## Sample usage (signature and decryption)
[![asciicast](https://asciinema.org/a/7x0h9tyoyu5ar6jc8y9oih0ba.png)](https://asciinema.org/a/7x0h9tyoyu5ar6jc8y9oih0ba)
You can use GNU Privacy Assistant (GPA) in order to inspect the created keys
and perform signature and decryption operations using:
```
$ sudo apt install gpa
$ ./scripts/gpg-shell gpa
```
[![GPA](https://cloud.githubusercontent.com/assets/9900/20224804/053d7474-a849-11e6-87f3-ab07dc536158.png)](https://www.gnupg.org/related_software/swlist.html#gpa)
## Git commit & tag signatures:
Git can use GPG to sign and verify commits and tags (see [here](https://git-scm.com/book/en/v2/Git-Tools-Signing-Your-Work)):
```
$ git config --local gpg.program $(which gpg2)
$ git commit --gpg-sign # create GPG-signed commit
$ git log --show-signature -1 # verify commit signature
$ git tag --sign "v1.2.3" # create GPG-signed tag
$ git verify-tag "v1.2.3" # verify tag signature
$ git tag v1.2.3 --sign # create GPG-signed tag
$ git tag v1.2.3 --verify # verify tag signature
```
## Password manager
First install `pass` from [passwordstore.org](https://www.passwordstore.org/) and initialize it to use your TREZOR-based GPG identity:
```
$ ./scripts/gpg-shell
$ pass init "Roman Zeyde <roman.zeyde@gmail.com>"
Password store initialized for Roman Zeyde <roman.zeyde@gmail.com>
```
Then, you can generate truly random passwords and save them encrypted using your public key (as separate `.gpg` files under `~/.password-store/`):
```
$ pass generate Dev/github 32
$ pass generate Social/hackernews 32
$ pass generate Social/twitter 32
$ pass generate VPS/linode 32
$ pass
Password Store
├── Dev
│   └── github
├── Social
│   ├── hackernews
│   └── twitter
└── VPS
└── linode
```
In order to paste them into the browser, you'd need to decrypt the password using your hardware device:
```
$ pass --clip VPS/linode
Copied VPS/linode to clipboard. Will clear in 45 seconds.
```

View File

@@ -9,6 +9,9 @@
## Using for GitHub SSH authentication (via `trezor-git` utility)
[![GitHub](https://asciinema.org/a/38337.png)](https://asciinema.org/a/38337)
## Loading multiple SSH identities from configuration file
[![Config](https://asciinema.org/a/bdxxtgctk5syu56yfz8lcp7ny.png)](https://asciinema.org/a/bdxxtgctk5syu56yfz8lcp7ny)
# Public key generation
Run:
@@ -44,7 +47,7 @@ Run:
Make sure to confirm SSH signature on the Trezor device when requested.
## Accessing remote Git repositories
## Accessing remote Git/Mercurial repositories
Use your SSH public key to access your remote repository (e.g. [GitHub](https://help.github.com/articles/adding-a-new-ssh-key-to-your-github-account/)):
@@ -58,6 +61,10 @@ Replace `git` with `git_hub` for remote operations:
$ git_hub push origin master
The same works for Mercurial (e.g. on [BitBucket](https://confluence.atlassian.com/bitbucket/set-up-ssh-for-mercurial-728138122.html)):
$ trezor-agent -v -e ed25519 git@bitbucket.org -- hg push
# Troubleshooting
@@ -69,10 +76,10 @@ with a verbose log attached (by running `trezor-agent -vv`) .
Note that your local SSH configuration may ignore `trezor-agent`, if it has `IdentitiesOnly` option set to `yes`.
IdentitiesOnly
Specifies that ssh(1) should only use the authentication identity files configured in
the ssh_config files, even if ssh-agent(1) or a PKCS11Provider offers more identities.
Specifies that ssh(1) should only use the authentication identity files configured in
the ssh_config files, even if ssh-agent(1) or a PKCS11Provider offers more identities.
The argument to this keyword must be “yes” or “no”.
This option is intended for situations where ssh-agent offers many different identities.
This option is intended for situations where ssh-agent offers many different identities.
The default is “no”.
If you are failing to connect, try running:

View File

@@ -10,34 +10,64 @@ See SatoshiLabs' blog posts about this feature:
- [TREZOR Firmware 1.3.4 enables SSH login](https://medium.com/@satoshilabs/trezor-firmware-1-3-4-enables-ssh-login-86a622d7e609)
- [TREZOR Firmware 1.3.6GPG Signing, SSH Login Updates and Advanced Transaction Features for Segwit](https://medium.com/@satoshilabs/trezor-firmware-1-3-6-20a7df6e692)
- [TREZOR Firmware 1.4.0GPG decryption support](https://www.reddit.com/r/TREZOR/comments/50h8r9/new_trezor_firmware_fidou2f_and_initial_ethereum/d7420q7/)
## Installation
First, make sure that the latest [trezorlib](https://pypi.python.org/pypi/trezor) Python package
is installed correctly (at least v0.6.6):
Install the following packages:
$ apt-get install python-dev libusb-1.0-0-dev libudev-dev
$ pip install -U setuptools
$ pip install Cython trezor
$ pip install -U setuptools pip
Make sure you are running the latest firmware version on your hardware device.
Currently the following firmware versions are supported:
* [TREZOR](https://wallet.trezor.io/data/firmware/releases.json): `1.4.2+`
* [KeepKey](https://github.com/keepkey/keepkey-firmware/releases): `3.0.17+`
* [Ledger Nano S](https://github.com/LedgerHQ/blue-app-ssh-agent): `0.0.3+`
### TREZOR
Make sure that your `udev` rules are configured [correctly](https://doc.satoshilabs.com/trezor-user/settingupchromeonlinux.html#manual-configuration-of-udev-rules).
Then, install the latest [trezor_agent](https://pypi.python.org/pypi/trezor_agent) package:
$ pip install trezor_agent
Finally, verify that you are running the latest [TREZOR firmware](https://wallet.mytrezor.com/data/firmware/releases.json) version (at least v1.4.0):
Or, directly from the latest source code:
$ trezorctl get_features | head
vendor: "bitcointrezor.com"
major_version: 1
minor_version: 4
patch_version: 0
...
$ git clone https://github.com/romanz/trezor-agent
$ pip install --user -e trezor-agent/agents/trezor
If you have an error regarding `protobuf` imports (after installing it), please see [this issue](https://github.com/romanz/trezor-agent/issues/28).
### KeepKey
Make sure that your `udev` rules are configured [correctly](https://support.keepkey.com/support/solutions/articles/6000037796-keepkey-wallet-is-not-being-recognized-by-linux).
Then, install the latest [keepkey_agent](https://pypi.python.org/pypi/keepkey_agent) package:
$ pip install keepkey_agent
Or, directly from the latest source code:
$ git clone https://github.com/romanz/trezor-agent
$ pip install --user -e trezor-agent/agents/keepkey
### Ledger Nano S
Make sure that your `udev` rules are configured [correctly](http://support.ledgerwallet.com/knowledge_base/topics/ledger-wallet-is-not-recognized-on-linux).
Then, install the latest [ledger_agent](https://pypi.python.org/pypi/ledger_agent) package:
$ pip install ledger_agent
Or, directly from the latest source code:
$ git clone https://github.com/romanz/trezor-agent
$ pip install --user -e trezor-agent/agents/ledger
## Usage
For SSH, see the [following instructions](README-SSH.md).
For SSH, see the [following instructions](README-SSH.md) (for Windows support,
see [trezor-ssh-agent](https://github.com/martin-lizner/trezor-ssh-agent) project (by Martin Lízner)).
For GPG, see the [following instructions](README-GPG.md).

View File

@@ -0,0 +1,5 @@
import libagent.gpg
import libagent.ssh
from libagent.device import keepkey
ssh_agent = lambda: libagent.ssh.main(keepkey.KeepKey)

34
agents/keepkey/setup.py Normal file
View File

@@ -0,0 +1,34 @@
#!/usr/bin/env python
from setuptools import setup
setup(
name='keepkey_agent',
version='0.9.0',
description='Using KeepKey as hardware SSH/GPG agent',
author='Roman Zeyde',
author_email='roman.zeyde@gmail.com',
url='http://github.com/romanz/trezor-agent',
scripts=['keepkey_agent.py'],
install_requires=['libagent>=0.9.0', 'keepkey>=0.7.3'],
platforms=['POSIX'],
classifiers=[
'Environment :: Console',
'Development Status :: 4 - Beta',
'Intended Audience :: Developers',
'Intended Audience :: Information Technology',
'Intended Audience :: System Administrators',
'License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)',
'Operating System :: POSIX',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Topic :: Software Development :: Libraries :: Python Modules',
'Topic :: System :: Networking',
'Topic :: Communications',
'Topic :: Security',
'Topic :: Utilities',
],
entry_points={'console_scripts': [
'keepkey-agent = keepkey_agent:ssh_agent',
]},
)

View File

@@ -0,0 +1,7 @@
import libagent.gpg
import libagent.ssh
from libagent.device.ledger import LedgerNanoS as DeviceType
ssh_agent = lambda: libagent.ssh.main(DeviceType)
gpg_tool = lambda: libagent.gpg.main(DeviceType)
gpg_agent = lambda: libagent.gpg.run_agent(DeviceType)

36
agents/ledger/setup.py Normal file
View File

@@ -0,0 +1,36 @@
#!/usr/bin/env python
from setuptools import setup
setup(
name='ledger_agent',
version='0.9.0',
description='Using Ledger as hardware SSH/GPG agent',
author='Roman Zeyde',
author_email='roman.zeyde@gmail.com',
url='http://github.com/romanz/trezor-agent',
scripts=['ledger_agent.py'],
install_requires=['libagent>=0.9.0', 'ledgerblue>=0.1.8'],
platforms=['POSIX'],
classifiers=[
'Environment :: Console',
'Development Status :: 4 - Beta',
'Intended Audience :: Developers',
'Intended Audience :: Information Technology',
'Intended Audience :: System Administrators',
'License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)',
'Operating System :: POSIX',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Topic :: Software Development :: Libraries :: Python Modules',
'Topic :: System :: Networking',
'Topic :: Communications',
'Topic :: Security',
'Topic :: Utilities',
],
entry_points={'console_scripts': [
'ledger-agent = ledger_agent:ssh_agent',
'ledger-gpg = ledger_agent:gpg_tool',
'ledger-gpg-agent = ledger_agent:gpg_agent',
]},
)

36
agents/trezor/setup.py Normal file
View File

@@ -0,0 +1,36 @@
#!/usr/bin/env python
from setuptools import setup
setup(
name='trezor_agent',
version='0.9.0',
description='Using Trezor as hardware SSH/GPG agent',
author='Roman Zeyde',
author_email='roman.zeyde@gmail.com',
url='http://github.com/romanz/trezor-agent',
scripts=['trezor_agent.py'],
install_requires=['libagent>=0.9.0', 'trezor>=0.7.6'],
platforms=['POSIX'],
classifiers=[
'Environment :: Console',
'Development Status :: 4 - Beta',
'Intended Audience :: Developers',
'Intended Audience :: Information Technology',
'Intended Audience :: System Administrators',
'License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)',
'Operating System :: POSIX',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Topic :: Software Development :: Libraries :: Python Modules',
'Topic :: System :: Networking',
'Topic :: Communications',
'Topic :: Security',
'Topic :: Utilities',
],
entry_points={'console_scripts': [
'trezor-agent = trezor_agent:ssh_agent',
'trezor-gpg = trezor_agent:gpg_tool',
'trezor-gpg-agent = trezor_agent:gpg_agent',
]},
)

View File

@@ -0,0 +1,7 @@
import libagent.gpg
import libagent.ssh
from libagent.device.trezor import Trezor as DeviceType
ssh_agent = lambda: libagent.ssh.main(DeviceType)
gpg_tool = lambda: libagent.gpg.main(DeviceType)
gpg_agent = lambda: libagent.gpg.run_agent(DeviceType)

View File

@@ -18,15 +18,17 @@ class Client(object):
"""Connect to hardware device."""
self.device = device
def get_public_key(self, identity):
"""Get SSH public key from the device."""
def export_public_keys(self, identities):
"""Export SSH public keys from the device."""
public_keys = []
with self.device:
pubkey = self.device.pubkey(identity)
vk = formats.decompress_pubkey(pubkey=pubkey,
curve_name=identity.curve_name)
return formats.export_public_key(vk=vk,
label=str(identity))
for i in identities:
pubkey = self.device.pubkey(identity=i)
vk = formats.decompress_pubkey(pubkey=pubkey,
curve_name=i.curve_name)
public_keys.append(formats.export_public_key(vk=vk,
label=str(i)))
return public_keys
def sign_ssh_challenge(self, blob, identity):
"""Sign given blob using a private key on the device."""

View File

@@ -0,0 +1,3 @@
"""Cryptographic hardware device management."""
from . import interface

View File

@@ -54,7 +54,7 @@ class NotFoundError(Error):
class DeviceError(Error):
""""Error during device operation."""
"""Error during device operation."""
class Identity(object):
@@ -82,7 +82,7 @@ class Identity(object):
s = io.BytesIO(bytearray(digest))
hardened = 0x80000000
addr_0 = [13, 17][bool(ecdh)]
addr_0 = 17 if bool(ecdh) else 13
address_n = [addr_0] + list(util.recv(s, '<LLLL'))
return [(hardened | value) for value in address_n]

View File

@@ -20,7 +20,10 @@ def _verify_support(identity, ecdh):
class KeepKey(trezor.Trezor):
"""Connection to KeepKey device."""
from . import keepkey_defs as defs
@property
def _defs(self):
from . import keepkey_defs
return keepkey_defs
required_version = '>=1.0.4'

View File

@@ -1,8 +1,9 @@
"""KeepKey-related definitions."""
# pylint: disable=unused-import
# pylint: disable=unused-import,import-error
from keepkeylib.client import CallException as Error
from keepkeylib.client import KeepKeyClient as Client
from keepkeylib.client import CallException
from keepkeylib.transport_hid import HidTransport
from keepkeylib.messages_pb2 import PassphraseAck
from keepkeylib.transport_hid import HidTransport as Transport
from keepkeylib.types_pb2 import IdentityType

View File

@@ -4,7 +4,7 @@ import binascii
import logging
import struct
from ledgerblue import comm
from ledgerblue import comm # pylint: disable=import-error
from . import interface

View File

@@ -2,6 +2,8 @@
import binascii
import logging
import os
import semver
from . import interface
@@ -12,20 +14,30 @@ log = logging.getLogger(__name__)
class Trezor(interface.Device):
"""Connection to TREZOR device."""
from . import trezor_defs as defs
@property
def _defs(self):
from . import trezor_defs
# Allow using TREZOR bridge transport (instead of the HID default)
trezor_defs.Transport = {
'bridge': trezor_defs.BridgeTransport,
}.get(os.environ.get('TREZOR_TRANSPORT'), trezor_defs.HidTransport)
return trezor_defs
required_version = '>=1.4.0'
passphrase = os.environ.get('TREZOR_PASSPHRASE', '')
def connect(self):
"""Enumerate and connect to the first USB HID interface."""
def empty_passphrase_handler(_):
return self.defs.PassphraseAck(passphrase='')
def passphrase_handler(_):
log.debug('using %s passphrase for %s',
'non-empty' if self.passphrase else 'empty', self)
return self._defs.PassphraseAck(passphrase=self.passphrase)
for d in self.defs.HidTransport.enumerate():
for d in self._defs.Transport.enumerate():
log.debug('endpoint: %s', d)
transport = self.defs.HidTransport(d)
connection = self.defs.Client(transport)
connection.callback_PassphraseRequest = empty_passphrase_handler
transport = self._defs.Transport(d)
connection = self._defs.Client(transport)
connection.callback_PassphraseRequest = passphrase_handler
f = connection.features
log.debug('connected to %s %s', self, f.device_id)
log.debug('label : %s', f.label)
@@ -60,7 +72,7 @@ class Trezor(interface.Device):
return result.node.public_key
def _identity_proto(self, identity):
result = self.defs.IdentityType()
result = self._defs.IdentityType()
for name, value in identity.items():
setattr(result, name, value)
return result
@@ -80,7 +92,7 @@ class Trezor(interface.Device):
assert len(result.signature) == 65
assert result.signature[:1] == b'\x00'
return result.signature[1:]
except self.defs.CallException as e:
except self._defs.Error as e:
msg = '{} error: {}'.format(self, e)
log.debug(msg, exc_info=True)
raise interface.DeviceError(msg)
@@ -99,7 +111,7 @@ class Trezor(interface.Device):
assert len(result.session_key) in {65, 33} # NIST256 or Curve25519
assert result.session_key[:1] == b'\x04'
return result.session_key
except self.defs.CallException as e:
except self._defs.Error as e:
msg = '{} error: {}'.format(self, e)
log.debug(msg, exc_info=True)
raise interface.DeviceError(msg)

View File

@@ -1,8 +1,10 @@
"""TREZOR-related definitions."""
# pylint: disable=unused-import
# pylint: disable=unused-import,import-error
from trezorlib.client import CallException as Error
from trezorlib.client import TrezorClient as Client
from trezorlib.client import CallException
from trezorlib.transport_hid import HidTransport
from trezorlib.messages_pb2 import PassphraseAck
from trezorlib.transport_bridge import BridgeTransport
from trezorlib.transport_hid import HidTransport
from trezorlib.types_pb2 import IdentityType

106
trezor_agent/gpg/__main__.py → libagent/gpg/__init__.py Executable file → Normal file
View File

@@ -1,8 +1,15 @@
#!/usr/bin/env python
"""Create signatures and export public keys for GPG using TREZOR."""
"""
TREZOR support for ECDSA GPG signatures.
See these links for more details:
- https://www.gnupg.org/faq/whats-new-in-2.1.html
- https://tools.ietf.org/html/rfc4880
- https://tools.ietf.org/html/rfc6637
- https://tools.ietf.org/html/draft-irtf-cfrg-eddsa-05
"""
import argparse
import contextlib
import io
import logging
import os
import sys
@@ -10,30 +17,23 @@ import time
import semver
from . import agent, decode, client, encode, keyring, protocol
from . import agent, client, encode, keyring, protocol
from .. import device, formats, server, util
log = logging.getLogger(__name__)
def key_exists(user_id):
"""Return True iff there is a GPG key with specified user ID."""
for p in decode.parse_packets(io.BytesIO(keyring.export_public_keys())):
if p['type'] == 'user_id' and p['value'] == user_id:
return True
return False
def run_create(args):
def export_public_key(device_type, args):
"""Generate a new pubkey for a new/existing GPG identity."""
log.warning('NOTE: in order to re-generate the exact same GPG key later, '
'run this command with "--time=%d" commandline flag (to set '
'the timestamp of the GPG key manually).', args.time)
d = client.Client(user_id=args.user_id, curve_name=args.ecdsa_curve)
verifying_key = d.pubkey(ecdh=False)
decryption_key = d.pubkey(ecdh=True)
c = client.Client(user_id=args.user_id, curve_name=args.ecdsa_curve,
device_type=device_type)
verifying_key = c.pubkey(ecdh=False)
decryption_key = c.pubkey(ecdh=True)
if key_exists(args.user_id): # add as subkey
if args.subkey: # add as subkey
log.info('adding %s GPG subkey for "%s" to existing key',
args.ecdsa_curve, args.user_id)
# subkey for signing
@@ -47,10 +47,10 @@ def run_create(args):
primary_bytes = keyring.export_public_key(args.user_id)
result = encode.create_subkey(primary_bytes=primary_bytes,
subkey=signing_key,
signer_func=d.sign)
signer_func=c.sign)
result = encode.create_subkey(primary_bytes=result,
subkey=encryption_key,
signer_func=d.sign)
signer_func=c.sign)
else: # add as primary
log.info('creating new %s GPG primary key for "%s"',
args.ecdsa_curve, args.user_id)
@@ -65,41 +65,46 @@ def run_create(args):
result = encode.create_primary(user_id=args.user_id,
pubkey=primary,
signer_func=d.sign)
signer_func=c.sign)
result = encode.create_subkey(primary_bytes=result,
subkey=subkey,
signer_func=d.sign)
signer_func=c.sign)
sys.stdout.write(protocol.armor(result, 'PUBLIC KEY BLOCK'))
def main_create():
"""Main function for GPG identity creation."""
p = argparse.ArgumentParser()
p.add_argument('user_id')
p.add_argument('-e', '--ecdsa-curve', default='nist256p1')
p.add_argument('-t', '--time', type=int, default=int(time.time()))
p.add_argument('-v', '--verbose', default=0, action='count')
args = p.parse_args()
def run_create(device_type, args):
"""Export public GPG key."""
util.setup_logging(verbosity=args.verbose)
log.warning('This GPG tool is still in EXPERIMENTAL mode, '
'so please note that the API and features may '
'change without backwards compatibility!')
existing_gpg = keyring.gpg_version().decode('ascii')
required_gpg = '>=2.1.15'
required_gpg = '>=2.1.11'
if semver.match(existing_gpg, required_gpg):
run_create(args)
export_public_key(device_type, args)
else:
log.error('Existing gpg2 has version "%s" (%s required)',
existing_gpg, required_gpg)
def main_agent():
def run_unlock(device_type, args):
"""Unlock hardware device (for future interaction)."""
util.setup_logging(verbosity=args.verbose)
with device_type() as d:
log.info('unlocked %s device', d)
def run_agent(device_type):
"""Run a simple GPG-agent server."""
home_dir = os.environ.get('GNUPGHOME', os.path.expanduser('~/.gnupg/trezor'))
config_file = os.path.join(home_dir, 'gpg-agent.conf')
parser = argparse.ArgumentParser()
parser.add_argument('--homedir', default=os.environ.get('GNUPGHOME'))
args, _ = parser.parse_known_args()
assert args.homedir
config_file = os.path.join(args.homedir, 'gpg-agent.conf')
lines = (line.strip() for line in open(config_file))
lines = (line for line in lines if line and not line.startswith('#'))
config = dict(line.split(' ', 1) for line in lines)
@@ -111,17 +116,30 @@ def main_agent():
for conn in agent.yield_connections(sock):
with contextlib.closing(conn):
try:
agent.handle_connection(conn)
agent.handle_connection(conn=conn, device_type=device_type)
except StopIteration:
log.info('stopping gpg-agent')
return
except Exception as e: # pylint: disable=broad-except
log.exception('gpg-agent failed: %s', e)
def auto_unlock():
"""Automatically unlock first found device (used for `gpg-shell`)."""
p = argparse.ArgumentParser()
p.add_argument('-v', '--verbose', default=0, action='count')
def main(device_type):
"""Parse command-line arguments."""
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers()
args = p.parse_args()
util.setup_logging(verbosity=args.verbose)
d = device.detect()
log.info('unlocked %s device', d)
p = subparsers.add_parser('create', help='Export public GPG key')
p.add_argument('user_id')
p.add_argument('-e', '--ecdsa-curve', default='nist256p1')
p.add_argument('-t', '--time', type=int, default=int(time.time()))
p.add_argument('-v', '--verbose', default=0, action='count')
p.add_argument('-s', '--subkey', default=False, action='store_true')
p.set_defaults(func=run_create)
p = subparsers.add_parser('unlock', help='Unlock the hardware device')
p.add_argument('-v', '--verbose', default=0, action='count')
p.set_defaults(func=run_unlock)
args = parser.parse_args()
return args.func(device_type=device_type, args=args)

View File

@@ -2,7 +2,7 @@
import binascii
import logging
from . import decode, client, keyring, protocol
from . import client, decode, keyring, protocol
from .. import util
log = logging.getLogger(__name__)
@@ -36,7 +36,7 @@ def sig_encode(r, s):
return b'(7:sig-val(5:ecdsa(1:r32:' + r + b')(1:s32:' + s + b')))'
def open_connection(keygrip_bytes):
def open_connection(keygrip_bytes, device_type):
"""
Connect to the device for the specified keygrip.
@@ -51,20 +51,20 @@ def open_connection(keygrip_bytes):
curve_name = protocol.get_curve_name_by_oid(pubkey_dict['curve_oid'])
ecdh = (pubkey_dict['algo'] == protocol.ECDH_ALGO_ID)
conn = client.Client(user_id, curve_name=curve_name)
conn = client.Client(user_id, curve_name=curve_name, device_type=device_type)
pubkey = protocol.PublicKey(
curve_name=curve_name, created=pubkey_dict['created'],
verifying_key=conn.pubkey(ecdh=ecdh), ecdh=ecdh)
assert pubkey.key_id() == pubkey_dict['key_id']
assert pubkey.keygrip == keygrip_bytes
assert pubkey.keygrip() == keygrip_bytes
return conn
def pksign(keygrip, digest, algo):
def pksign(keygrip, digest, algo, device_type):
"""Sign a message digest using a private EC key."""
log.debug('signing %r digest (algo #%s)', digest, algo)
keygrip_bytes = binascii.unhexlify(keygrip)
conn = open_connection(keygrip_bytes)
conn = open_connection(keygrip_bytes, device_type=device_type)
r, s = conn.sign(binascii.unhexlify(digest))
result = sig_encode(r, s)
log.debug('result: %r', result)
@@ -92,7 +92,7 @@ def parse_ecdh(line):
return dict(items)[b'e']
def pkdecrypt(keygrip, conn):
def pkdecrypt(keygrip, conn, device_type):
"""Handle decryption using ECDH."""
for msg in [b'S INQUIRE_MAXLEN 4096', b'INQUIRE CIPHERTEXT']:
keyring.sendline(conn, msg)
@@ -102,11 +102,24 @@ def pkdecrypt(keygrip, conn):
remote_pubkey = parse_ecdh(line)
keygrip_bytes = binascii.unhexlify(keygrip)
conn = open_connection(keygrip_bytes)
conn = open_connection(keygrip_bytes, device_type=device_type)
return _serialize_point(conn.ecdh(remote_pubkey))
def handle_connection(conn):
@util.memoize
def have_key(keygrip, device_type):
"""Check if current keygrip correspond to a TREZOR-based key."""
try:
open_connection(keygrip_bytes=binascii.unhexlify(keygrip),
device_type=device_type)
return True
except KeyError as e:
log.warning('HAVEKEY(%s) failed: %s', keygrip, e)
return False
# pylint: disable=too-many-branches
def handle_connection(conn, device_type):
"""Handle connection from GPG binary using the ASSUAN protocol."""
keygrip = None
digest = None
@@ -118,7 +131,7 @@ def handle_connection(conn):
parts = line.split(b' ')
command = parts[0]
args = parts[1:]
if command in {b'RESET', b'OPTION', b'HAVEKEY', b'SETKEYDESC'}:
if command in {b'RESET', b'OPTION', b'SETKEYDESC'}:
pass # reply with OK
elif command == b'GETINFO':
keyring.sendline(conn, b'D ' + version)
@@ -129,19 +142,28 @@ def handle_connection(conn):
elif command == b'SETHASH':
algo, digest = args
elif command == b'PKSIGN':
sig = pksign(keygrip, digest, algo)
sig = pksign(keygrip, digest, algo, device_type=device_type)
keyring.sendline(conn, b'D ' + sig)
elif command == b'PKDECRYPT':
sec = pkdecrypt(keygrip, conn)
sec = pkdecrypt(keygrip, conn, device_type=device_type)
keyring.sendline(conn, b'D ' + sec)
elif command == b'HAVEKEY':
if not have_key(keygrip=args[0], device_type=device_type):
keyring.sendline(conn,
b'ERR 67108881 No secret key <GPG Agent>')
return
elif command == b'KEYINFO':
keygrip, = args
# Dummy reply (mainly for 'gpg --edit' to succeed).
# For details, see GnuPG agent KEYINFO command help.
# https://git.gnupg.org/cgi-bin/gitweb.cgi?p=gnupg.git;a=blob;f=agent/command.c;h=c8b34e9882076b1b724346787781f657cac75499;hb=refs/heads/master#l1082
fmt = 'S KEYINFO {0} X - - - - - - -'
keyring.sendline(conn, fmt.format(keygrip).encode('ascii'))
elif command == b'BYE':
return
elif command == b'KILLAGENT':
keyring.sendline(conn, b'OK')
raise StopIteration
else:
log.error('unknown request: %r', line)
return

View File

@@ -10,9 +10,9 @@ log = logging.getLogger(__name__)
class Client(object):
"""Sign messages and get public keys from a hardware device."""
def __init__(self, user_id, curve_name):
def __init__(self, user_id, curve_name, device_type):
"""Connect to the device and retrieve required public key."""
self.device = device.detect()
self.device = device_type()
self.user_id = user_id
self.identity = device.interface.Identity(
identity_str='gpg://', curve_name=curve_name)

View File

@@ -54,7 +54,8 @@ def parse_mpis(s, n):
def _parse_nist256p1_pubkey(mpi):
prefix, x, y = util.split_bits(mpi, 4, 256, 256)
assert prefix == 4
if prefix != 4:
raise ValueError('Invalid MPI prefix: {}'.format(prefix))
point = ecdsa.ellipticcurve.Point(curve=ecdsa.NIST256p.curve,
x=x, y=y)
return ecdsa.VerifyingKey.from_public_point(
@@ -64,7 +65,8 @@ def _parse_nist256p1_pubkey(mpi):
def _parse_ed25519_pubkey(mpi):
prefix, value = util.split_bits(mpi, 8, 256)
assert prefix == 0x40
if prefix != 0x40:
raise ValueError('Invalid MPI prefix: {}'.format(prefix))
return ed25519.VerifyingKey(util.num2bytes(value, size=32))
@@ -83,18 +85,6 @@ DSA_ALGO_ID = 17
ECDSA_ALGO_IDS = {18, 19, 22} # {ecdsa, nist256, ed25519}
def _parse_literal(stream):
"""See https://tools.ietf.org/html/rfc4880#section-5.9 for details."""
p = {'type': 'literal'}
p['format'] = stream.readfmt('c')
filename_len = stream.readfmt('B')
p['filename'] = stream.read(filename_len)
p['date'] = stream.readfmt('>L')
p['content'] = stream.read()
p['_to_hash'] = p['content']
return p
def _parse_embedded_signatures(subpackets):
for packet in subpackets:
data = bytearray(packet)
@@ -104,6 +94,12 @@ def _parse_embedded_signatures(subpackets):
yield _parse_signature(util.Reader(stream))
def has_custom_subpacket(signature_packet):
"""Detect our custom public keys by matching subpacket data."""
return any(protocol.CUSTOM_KEY_LABEL == subpacket[1:]
for subpacket in signature_packet['unhashed_subpackets'])
def _parse_signature(stream):
"""See https://tools.ietf.org/html/rfc4880#section-5.2 for details."""
p = {'type': 'signature'}
@@ -127,10 +123,6 @@ def _parse_signature(stream):
log.debug('embedded sigs: %s', embedded)
p['embedded'] = embedded
# Detect our custom public keys by matching subpacket data
p['_is_custom'] = any(protocol.CUSTOM_KEY_LABEL == subpacket[1:]
for subpacket in p['unhashed_subpackets'])
p['hash_prefix'] = stream.readfmt('2s')
if p['pubkey_alg'] in ECDSA_ALGO_IDS:
p['sig'] = (parse_mpi(stream), parse_mpi(stream))
@@ -170,7 +162,7 @@ def _parse_pubkey(stream, packet_type='pubkey'):
# should be b'\x03\x01\x08\x07': SHA256 + AES128
size, = util.readfmt(leftover, 'B')
p['kdf'] = leftover.read(size)
assert not leftover.read()
p['secret'] = leftover.read()
parse_func, keygrip_func = SUPPORTED_CURVES[oid]
keygrip = keygrip_func(parse_func(mpi))
@@ -209,8 +201,9 @@ _parse_attribute = functools.partial(_parse_user_id,
PACKET_TYPES = {
2: _parse_signature,
5: _parse_pubkey,
6: _parse_pubkey,
11: _parse_literal,
7: _parse_subkey,
13: _parse_user_id,
14: _parse_subkey,
17: _parse_attribute,
@@ -254,11 +247,13 @@ def parse_packets(stream):
packet_data = reader.read(packet_size)
packet_type = PACKET_TYPES.get(tag)
p = {'type': 'unknown', 'tag': tag, 'raw': packet_data}
if packet_type is not None:
p = packet_type(util.Reader(io.BytesIO(packet_data)))
p['tag'] = tag
else:
p = {'type': 'unknown', 'tag': tag, 'raw': packet_data}
try:
p = packet_type(util.Reader(io.BytesIO(packet_data)))
p['tag'] = tag
except ValueError:
log.exception('Skipping packet: %s', util.hexlify(packet_data))
log.debug('packet "%s": %s', p['type'], p)
yield p
@@ -300,6 +295,7 @@ def load_by_keygrip(pubkey_bytes, keygrip):
for p in packets:
if p.get('keygrip') == keygrip:
return p, user_ids
raise KeyError('{} keygrip not found'.format(util.hexlify(keygrip)))
def load_signature(stream, original_data):

View File

@@ -8,9 +8,10 @@ from .. import util
log = logging.getLogger(__name__)
def create_primary(user_id, pubkey, signer_func):
def create_primary(user_id, pubkey, signer_func, secret_bytes=b''):
"""Export new primary GPG public key, ready for "gpg2 --import"."""
pubkey_packet = protocol.packet(tag=6, blob=pubkey.data())
pubkey_packet = protocol.packet(tag=(5 if secret_bytes else 6),
blob=(pubkey.data() + secret_bytes))
user_id_packet = protocol.packet(tag=13,
blob=user_id.encode('ascii'))
@@ -47,9 +48,10 @@ def create_primary(user_id, pubkey, signer_func):
return pubkey_packet + user_id_packet + sign_packet
def create_subkey(primary_bytes, subkey, signer_func, user_id=None):
def create_subkey(primary_bytes, subkey, signer_func, secret_bytes=b''):
"""Export new subkey to GPG primary key."""
subkey_packet = protocol.packet(tag=14, blob=subkey.data())
subkey_packet = protocol.packet(tag=(7 if secret_bytes else 14),
blob=(subkey.data() + secret_bytes))
packets = list(decode.parse_packets(io.BytesIO(primary_bytes)))
primary, user_id, signature = packets[:3]
@@ -87,7 +89,7 @@ def create_subkey(primary_bytes, subkey, signer_func, user_id=None):
unhashed_subpackets.append(protocol.subpacket(32, embedded_sig))
unhashed_subpackets.append(protocol.CUSTOM_SUBPACKET)
if not signature['_is_custom']:
if not decode.has_custom_subpacket(signature):
signer_func = keyring.create_agent_signer(user_id['value'])
signature = protocol.make_signature(

View File

@@ -185,6 +185,7 @@ class PublicKey(object):
def __init__(self, curve_name, created, verifying_key, ecdh=False):
"""Contruct using a ECDSA VerifyingKey object."""
self.curve_name = curve_name
self.curve_info = SUPPORTED_CURVES[curve_name]
self.created = int(created) # time since Epoch
self.verifying_key = verifying_key
@@ -196,12 +197,8 @@ class PublicKey(object):
self.algo_id = self.curve_info['algo_id']
self.ecdh_packet = b''
hex_key_id = util.hexlify(self.key_id())[-8:]
self.desc = 'GPG public key {}/{}'.format(curve_name, hex_key_id)
@property
def keygrip(self):
"""Compute GPG2 keygrip."""
"""Compute GPG keygrip of the verifying key."""
return self.curve_info['keygrip'](self.verifying_key)
def data(self):
@@ -227,7 +224,8 @@ class PublicKey(object):
def __repr__(self):
"""Short (8 hexadecimal digits) GPG key ID."""
return self.desc
hex_key_id = util.hexlify(self.key_id())[-8:]
return 'GPG public key {}/{}'.format(self.curve_name, hex_key_id)
__str__ = __repr__

View File

@@ -41,5 +41,22 @@ def public_key_path(request):
def test_gpg_files(public_key_path): # pylint: disable=redefined-outer-name
with open(public_key_path, 'rb') as f:
packets = list(decode.parse_packets(f))
assert len(packets) > 0
assert list(decode.parse_packets(f))
def test_has_custom_subpacket():
sig = {'unhashed_subpackets': []}
assert not decode.has_custom_subpacket(sig)
custom_markers = [
protocol.CUSTOM_SUBPACKET,
protocol.subpacket(10, protocol.CUSTOM_KEY_LABEL),
]
for marker in custom_markers:
sig = {'unhashed_subpackets': [marker]}
assert decode.has_custom_subpacket(sig)
def test_load_by_keygrip_missing():
with pytest.raises(KeyError):
decode.load_by_keygrip(pubkey_bytes=b'', keygrip=b'')

View File

@@ -80,4 +80,22 @@ PKSIGN
def test_iterlines():
sock = FakeSocket()
sock.rx.write(b'foo\nbar\nxyz')
assert list(keyring.iterlines(sock)) == []
sock.rx.seek(0)
assert list(keyring.iterlines(sock)) == [b'foo', b'bar']
def test_get_agent_sock_path():
sp = mock.Mock(spec=['check_output'])
sp.check_output.return_value = b'''sysconfdir:/usr/local/etc/gnupg
bindir:/usr/local/bin
libexecdir:/usr/local/libexec
libdir:/usr/local/lib/gnupg
datadir:/usr/local/share/gnupg
localedir:/usr/local/share/locale
dirmngr-socket:/run/user/1000/gnupg/S.dirmngr
agent-ssh-socket:/run/user/1000/gnupg/S.gpg-agent.ssh
agent-socket:/run/user/1000/gnupg/S.gpg-agent
homedir:/home/roman/.gnupg
'''
expected = b'/run/user/1000/gnupg/S.gpg-agent'
assert keyring.get_agent_sock_path(sp=sp) == expected

View File

@@ -1,5 +1,6 @@
import ecdsa
import ed25519
import pytest
from .. import protocol
from ... import formats
@@ -69,7 +70,7 @@ def test_nist256p1():
pk = protocol.PublicKey(curve_name=formats.CURVE_NIST256,
created=42, verifying_key=vk)
assert repr(pk) == 'GPG public key nist256p1/F82361D9'
assert pk.keygrip == b'\x95\x85.\x91\x7f\xe2\xc3\x91R\xba\x99\x81\x92\xb5y\x1d\xb1\\\xdc\xf0'
assert pk.keygrip() == b'\x95\x85.\x91\x7f\xe2\xc3\x91R\xba\x99\x81\x92\xb5y\x1d\xb1\\\xdc\xf0'
def test_nist256p1_ecdh():
@@ -78,7 +79,7 @@ def test_nist256p1_ecdh():
pk = protocol.PublicKey(curve_name=formats.CURVE_NIST256,
created=42, verifying_key=vk, ecdh=True)
assert repr(pk) == 'GPG public key nist256p1/5811DF46'
assert pk.keygrip == b'\x95\x85.\x91\x7f\xe2\xc3\x91R\xba\x99\x81\x92\xb5y\x1d\xb1\\\xdc\xf0'
assert pk.keygrip() == b'\x95\x85.\x91\x7f\xe2\xc3\x91R\xba\x99\x81\x92\xb5y\x1d\xb1\\\xdc\xf0'
def test_ed25519():
@@ -87,4 +88,20 @@ def test_ed25519():
pk = protocol.PublicKey(curve_name=formats.CURVE_ED25519,
created=42, verifying_key=vk)
assert repr(pk) == 'GPG public key ed25519/36B40FE6'
assert pk.keygrip == b'\xbf\x01\x90l\x17\xb64\xa3-\xf4\xc0gr\x99\x18<\xddBQ?'
assert pk.keygrip() == b'\xbf\x01\x90l\x17\xb64\xa3-\xf4\xc0gr\x99\x18<\xddBQ?'
def test_curve25519():
sk = ed25519.SigningKey(b'\x00' * 32)
vk = sk.get_verifying_key()
pk = protocol.PublicKey(curve_name=formats.ECDH_CURVE25519,
created=42, verifying_key=vk)
assert repr(pk) == 'GPG public key curve25519/69460384'
assert pk.keygrip() == b'x\xd6\x86\xe4\xa6\xfc;\x0fY\xe1}Lw\xc4\x9ed\xf1Q\x8a\x00'
def test_get_curve_name_by_oid():
for name, info in protocol.SUPPORTED_CURVES.items():
assert protocol.get_curve_name_by_oid(info['oid']) == name
with pytest.raises(KeyError):
protocol.get_curve_name_by_oid('BAD_OID')

View File

@@ -62,7 +62,9 @@ def failure():
def _legacy_pubs(buf):
"""SSH v1 public keys are not supported."""
assert not buf.read()
leftover = buf.read()
if leftover:
log.warning('skipping leftover: %r', leftover)
code = util.pack('B', msg_code('SSH_AGENT_RSA_IDENTITIES_ANSWER'))
num = util.pack('L', 0) # no SSH v1 keys
return util.frame(code, num)
@@ -71,14 +73,13 @@ def _legacy_pubs(buf):
class Handler(object):
"""ssh-agent protocol handler."""
def __init__(self, keys, signer, debug=False):
def __init__(self, conn, debug=False):
"""
Create a protocol handler with specified public keys.
Use specified signer function to sign SSH authentication requests.
"""
self.public_keys = keys
self.signer = signer
self.conn = conn
self.debug = debug
self.methods = {
@@ -107,7 +108,7 @@ class Handler(object):
def list_pubs(self, buf):
"""SSH v2 public keys are serialized and returned."""
assert not buf.read()
keys = self.public_keys
keys = self.conn.parse_public_keys()
code = util.pack('B', msg_code('SSH2_AGENT_IDENTITIES_ANSWER'))
num = util.pack('L', len(keys))
log.debug('available keys: %s', [k['name'] for k in keys])
@@ -129,7 +130,7 @@ class Handler(object):
assert util.read_frame(buf) == b''
assert not buf.read()
for k in self.public_keys:
for k in self.conn.parse_public_keys():
if (k['fingerprint']) == (key['fingerprint']):
log.debug('using key %r (%s)', k['name'], k['fingerprint'])
key = k
@@ -140,7 +141,7 @@ class Handler(object):
label = key['name'].decode('ascii') # label should be a string
log.debug('signing %d-byte blob with "%s" key', len(blob), label)
try:
signature = self.signer(blob=blob, identity=key['identity'])
signature = self.conn.sign(blob=blob, identity=key['identity'])
except IOError:
return failure()
log.debug('signature: %r', signature)

View File

@@ -31,7 +31,7 @@ def unix_domain_socket_server(sock_path):
Listen on it, and delete it after the generated context is over.
"""
log.debug('serving on SSH_AUTH_SOCK=%s', sock_path)
log.debug('serving on %s', sock_path)
remove_file(sock_path)
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

View File

@@ -7,7 +7,7 @@ import re
import subprocess
import sys
from . import client, device, formats, protocol, server, util
from .. import client, device, formats, protocol, server, util
log = logging.getLogger(__name__)
@@ -22,7 +22,22 @@ def ssh_args(label):
if 'user' in identity:
args += ['-l', identity['user']]
return ['ssh'] + args + [identity['host']]
return args + [identity['host']]
def mosh_args(label):
"""Create SSH command for connecting specified server."""
identity = device.interface.string_to_identity(label)
args = []
if 'port' in identity:
args += ['-p', identity['port']]
if 'user' in identity:
args += [identity['user']+'@'+identity['host']]
else:
args += [identity['host']]
return args
def create_parser():
@@ -52,6 +67,8 @@ def create_agent_parser():
help='run ${SHELL} as subprocess under SSH agent')
g.add_argument('-c', '--connect', default=False, action='store_true',
help='connect to specified host via SSH')
g.add_argument('--mosh', default=False, action='store_true',
help='connect to specified host via using Mosh')
p.add_argument('identity', type=str, default=None,
help='proto://[user@]host[:port][/path]')
@@ -93,12 +110,10 @@ def git_host(remote_name, attributes):
return '{user}@{host}'.format(**match.groupdict())
def run_server(conn, public_keys, command, debug, timeout):
def run_server(conn, command, debug, timeout):
"""Common code for run_agent and run_git below."""
try:
signer = conn.sign_ssh_challenge
handler = protocol.Handler(keys=public_keys, signer=signer,
debug=debug)
handler = protocol.Handler(conn=conn, debug=debug)
with server.serve(handler=handler, timeout=timeout) as env:
return server.run_process(command=command, environ=env)
except KeyboardInterrupt:
@@ -112,7 +127,7 @@ def handle_connection_error(func):
try:
return func(*args, **kwargs)
except IOError as e:
log.error('Connection error: %s', e)
log.error('Connection error (try unplugging and replugging your device): %s', e)
return 1
return wrapper
@@ -125,13 +140,40 @@ def parse_config(fname):
curve_name=curve_name)
class JustInTimeConnection(object):
"""Connect to the device just before the needed operation."""
def __init__(self, conn_factory, identities):
"""Create a JIT connection object."""
self.conn_factory = conn_factory
self.identities = identities
self.public_keys = util.memoize(self._public_keys) # a simple cache
def _public_keys(self):
"""Return a list of SSH public keys (in textual format)."""
conn = self.conn_factory()
return conn.export_public_keys(self.identities)
def parse_public_keys(self):
"""Parse SSH public keys into dictionaries."""
public_keys = [formats.import_public_key(pk)
for pk in self.public_keys()]
for pk, identity in zip(public_keys, self.identities):
pk['identity'] = identity
return public_keys
def sign(self, blob, identity):
"""Sign a given blob using the specified identity on the device."""
conn = self.conn_factory()
return conn.sign_ssh_challenge(blob=blob, identity=identity)
@handle_connection_error
def run_agent(client_factory=client.Client):
def main(device_type):
"""Run ssh-agent using given hardware client factory."""
args = create_agent_parser().parse_args()
util.setup_logging(verbosity=args.verbose)
conn = client_factory(device=device.detect())
if args.identity.startswith('/'):
identities = list(parse_config(fname=args.identity))
else:
@@ -141,26 +183,23 @@ def run_agent(client_factory=client.Client):
identity.identity_dict['proto'] = 'ssh'
log.info('identity #%d: %s', index, identity)
command = args.command
public_keys = [conn.get_public_key(i) for i in identities]
if args.connect:
command = ssh_args(args.identity) + args.command
log.debug('SSH connect: %r', command)
command = ['ssh'] + ssh_args(args.identity) + args.command
elif args.mosh:
command = ['mosh'] + mosh_args(args.identity) + args.command
else:
command = args.command
use_shell = bool(args.shell)
if use_shell:
command = os.environ['SHELL']
log.debug('using shell: %r', command)
if not command:
for pk in public_keys:
conn = JustInTimeConnection(
conn_factory=lambda: client.Client(device_type()),
identities=identities)
if command:
return run_server(conn=conn, command=command, debug=args.debug,
timeout=args.timeout)
else:
for pk in conn.public_keys():
sys.stdout.write(pk)
return
public_keys = [formats.import_public_key(pk) for pk in public_keys]
for pk, identity in zip(public_keys, identities):
pk['identity'] = identity
return run_server(conn=conn, public_keys=public_keys, command=command,
debug=args.debug, timeout=args.timeout)

View File

@@ -20,9 +20,6 @@ class MockDevice(device.interface.Device): # pylint: disable=abstract-method
def connect(self): # pylint: disable=no-self-use
return mock.Mock()
def close(self):
self.conn = None
def pubkey(self, identity, ecdh=False): # pylint: disable=unused-argument
assert self.conn
return PUBKEY
@@ -34,16 +31,6 @@ class MockDevice(device.interface.Device): # pylint: disable=abstract-method
return SIG
def identity_type(**kwargs):
result = mock.Mock(spec=[])
result.index = 0
result.proto = result.user = result.host = result.port = None
result.path = None
for k, v in kwargs.items():
setattr(result, k, v)
return result
BLOB = (b'\x00\x00\x00 \xce\xe0\xc9\xd5\xceu/\xe8\xc5\xf2\xbfR+x\xa1\xcf\xb0'
b'\x8e;R\xd3)m\x96\x1b\xb4\xd8s\xf1\x99\x16\xaa2\x00\x00\x00\x05roman'
b'\x00\x00\x00\x0essh-connection\x00\x00\x00\tpublickey'
@@ -62,7 +49,7 @@ def test_ssh_agent():
identity = device.interface.Identity(identity_str='localhost:22',
curve_name=CURVE)
c = client.Client(device=MockDevice())
assert c.get_public_key(identity) == PUBKEY_TEXT
assert c.export_public_keys([identity]) == [PUBKEY_TEXT]
signature = c.sign_ssh_challenge(blob=BLOB, identity=identity)
key = formats.import_public_key(PUBKEY_TEXT)

View File

@@ -93,3 +93,11 @@ def test_curve_mismatch():
def test_serialize_error():
with pytest.raises(TypeError):
formats.serialize_verifying_key(None)
def test_get_ecdh_curve_name():
for c in [formats.CURVE_NIST256, formats.ECDH_CURVE25519]:
assert c == formats.get_ecdh_curve_name(c)
assert (formats.ECDH_CURVE25519 ==
formats.get_ecdh_curve_name(formats.CURVE_ED25519))

View File

@@ -1,3 +1,4 @@
import mock
import pytest
from .. import device, formats, protocol
@@ -15,16 +16,30 @@ NIST256_SIGN_MSG = b'\r\x00\x00\x00h\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\
NIST256_SIGN_REPLY = b'\x00\x00\x00j\x0e\x00\x00\x00e\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00J\x00\x00\x00!\x00\x88G!\x0c\n\x16:\xbeF\xbe\xb9\xd2\xa9&e\x89\xad\xc4}\x10\xf8\xbc\xdc\xef\x0e\x8d_\x8a6.\xb6\x1f\x00\x00\x00!\x00q\xf0\x16>,\x9a\xde\xe7(\xd6\xd7\x93\x1f\xed\xf9\x94ddw\xfe\xbdq\x13\xbb\xfc\xa9K\xea\x9dC\xa1\xe9' # nopep8
def fake_connection(keys, signer):
c = mock.Mock()
c.parse_public_keys.return_value = keys
c.sign = signer
return c
def test_list():
key = formats.import_public_key(NIST256_KEY)
key['identity'] = device.interface.Identity('ssh://localhost', 'nist256p1')
h = protocol.Handler(keys=[key], signer=None)
h = protocol.Handler(fake_connection(keys=[key], signer=None))
reply = h.handle(LIST_MSG)
assert reply == LIST_NIST256_REPLY
def test_list_legacy_pubs_with_suffix():
h = protocol.Handler(fake_connection(keys=[], signer=None))
suffix = b'\x00\x00\x00\x06foobar'
reply = h.handle(b'\x01' + suffix)
assert reply == b'\x00\x00\x00\x05\x02\x00\x00\x00\x00' # no legacy keys
def test_unsupported():
h = protocol.Handler(keys=[], signer=None)
h = protocol.Handler(fake_connection(keys=[], signer=None))
reply = h.handle(b'\x09')
assert reply == b'\x00\x00\x00\x01\x05'
@@ -38,13 +53,13 @@ def ecdsa_signer(identity, blob):
def test_ecdsa_sign():
key = formats.import_public_key(NIST256_KEY)
key['identity'] = device.interface.Identity('ssh://localhost', 'nist256p1')
h = protocol.Handler(keys=[key], signer=ecdsa_signer)
h = protocol.Handler(fake_connection(keys=[key], signer=ecdsa_signer))
reply = h.handle(NIST256_SIGN_MSG)
assert reply == NIST256_SIGN_REPLY
def test_sign_missing():
h = protocol.Handler(keys=[], signer=ecdsa_signer)
h = protocol.Handler(fake_connection(keys=[], signer=ecdsa_signer))
with pytest.raises(KeyError):
h.handle(NIST256_SIGN_MSG)
@@ -57,7 +72,7 @@ def test_sign_wrong():
key = formats.import_public_key(NIST256_KEY)
key['identity'] = device.interface.Identity('ssh://localhost', 'nist256p1')
h = protocol.Handler(keys=[key], signer=wrong_signature)
h = protocol.Handler(fake_connection(keys=[key], signer=wrong_signature))
with pytest.raises(ValueError):
h.handle(NIST256_SIGN_MSG)
@@ -68,7 +83,7 @@ def test_sign_cancel():
key = formats.import_public_key(NIST256_KEY)
key['identity'] = device.interface.Identity('ssh://localhost', 'nist256p1')
h = protocol.Handler(keys=[key], signer=cancel_signature)
h = protocol.Handler(fake_connection(keys=[key], signer=cancel_signature))
assert h.handle(NIST256_SIGN_MSG) == protocol.failure()
@@ -89,6 +104,6 @@ def ed25519_signer(identity, blob):
def test_ed25519_sign():
key = formats.import_public_key(ED25519_KEY)
key['identity'] = device.interface.Identity('ssh://localhost', 'ed25519')
h = protocol.Handler(keys=[key], signer=ed25519_signer)
h = protocol.Handler(fake_connection(keys=[key], signer=ed25519_signer))
reply = h.handle(ED25519_SIGN_MSG)
assert reply == ED25519_SIGN_REPLY

View File

@@ -37,10 +37,16 @@ class FakeSocket(object):
pass
def empty_device():
c = mock.Mock(spec=['parse_public_keys'])
c.parse_public_keys.return_value = []
return c
def test_handle():
mutex = threading.Lock()
handler = protocol.Handler(keys=[], signer=None)
handler = protocol.Handler(conn=empty_device())
conn = FakeSocket()
server.handle_connection(conn, handler, mutex)
@@ -67,7 +73,6 @@ def test_handle():
def test_server_thread():
connections = [FakeSocket()]
quit_event = threading.Event()
@@ -81,8 +86,10 @@ def test_server_thread():
def getsockname(self): # pylint: disable=no-self-use
return 'fake_server'
handler = protocol.Handler(keys=[], signer=None),
handle_conn = functools.partial(server.handle_connection, handler=handler)
handler = protocol.Handler(conn=empty_device()),
handle_conn = functools.partial(server.handle_connection,
handler=handler,
mutex=None)
server.server_thread(sock=FakeServer(),
handle_conn=handle_conn,
quit_event=quit_event)
@@ -111,7 +118,7 @@ def test_run():
def test_serve_main():
handler = protocol.Handler(keys=[], signer=None)
handler = protocol.Handler(conn=empty_device())
with server.serve(handler=handler, sock_path=None):
pass

View File

@@ -1,5 +1,6 @@
import io
import mock
import pytest
from .. import util
@@ -97,3 +98,20 @@ def test_reader():
with pytest.raises(EOFError):
r.read(1)
def test_setup_logging():
util.setup_logging(verbosity=10)
def test_memoize():
f = mock.Mock(side_effect=lambda x: x)
def func(x):
# mock.Mock doesn't work with functools.wraps()
return f(x)
g = util.memoize(func)
assert g(1) == g(1)
assert g(1) != g(2)
assert f.mock_calls == [mock.call(1), mock.call(2)]

View File

@@ -1,6 +1,7 @@
"""Various I/O and serialization utilities."""
import binascii
import contextlib
import functools
import io
import logging
import struct
@@ -185,3 +186,21 @@ def setup_logging(verbosity, **kwargs):
levels = [logging.WARNING, logging.INFO, logging.DEBUG]
level = levels[min(verbosity, len(levels) - 1)]
logging.basicConfig(format=fmt, level=level, **kwargs)
def memoize(func):
"""Simple caching decorator."""
cache = {}
@functools.wraps(func)
def wrapper(*args, **kwargs):
"""Caching wrapper."""
key = (args, tuple(sorted(kwargs.items())))
if key in cache:
return cache[key]
else:
result = func(*args, **kwargs)
cache[key] = result
return result
return wrapper

View File

@@ -1,32 +1,55 @@
#!/bin/bash
set -eu
gpg2 --version >/dev/null # verify that GnuPG 2 is installed
USER_ID="${1}"
HOMEDIR=~/.gnupg/trezor
DEVICE=${DEVICE:="trezor"} # or "ledger"
CURVE=${CURVE:="nist256p1"} # or "ed25519"
TIMESTAMP=${TIMESTAMP:=`date +%s`} # key creation timestamp
HOMEDIR=~/.gnupg/${DEVICE}
# Prepare new GPG home directory for TREZOR-based identity
# Prepare new GPG home directory for hardware-based identity
rm -rf "${HOMEDIR}"
mkdir -p "${HOMEDIR}"
chmod 700 "${HOMEDIR}"
# Generate new GPG identity and import into GPG keyring
trezor-gpg-create -v "${USER_ID}" -t "${TIMESTAMP}" -e "${CURVE}" > "${HOMEDIR}/pubkey.asc"
gpg2 --homedir "${HOMEDIR}" --import < "${HOMEDIR}/pubkey.asc"
$DEVICE-gpg create -v "${USER_ID}" -t "${TIMESTAMP}" -e "${CURVE}" > "${HOMEDIR}/pubkey.asc"
gpg2 --homedir "${HOMEDIR}" --import < "${HOMEDIR}/pubkey.asc" 2> /dev/null
rm -f "${HOMEDIR}/S.gpg-agent" # (otherwise, our agent won't be started automatically)
# Make new GPG identity with "ultimate" trust (via its fingerprint)
FINGERPRINT=$(gpg2 --homedir "${HOMEDIR}" --list-public-keys --with-colons | sed -n -E 's/^fpr:::::::::([0-9A-F]+):$/\1/p' | head -n1)
echo "${FINGERPRINT}:6" | gpg2 --homedir "${HOMEDIR}" --import-ownertrust
FINGERPRINT=$(gpg2 --homedir "${HOMEDIR}" --list-public-keys --with-fingerprint --with-colons | sed -n -E 's/^fpr:::::::::([0-9A-F]+):$/\1/p' | head -n1)
echo "${FINGERPRINT}:6" | gpg2 --homedir "${HOMEDIR}" --import-ownertrust 2> /dev/null
AGENT_PATH="$(which ${DEVICE}-gpg-agent)"
# Prepare GPG configuration file
echo "# TREZOR-based GPG configuration
agent-program $(which trezor-gpg-agent)
echo "# Hardware-based GPG configuration
agent-program ${AGENT_PATH}
personal-digest-preferences SHA512
" | tee "${HOMEDIR}/gpg.conf"
" > "${HOMEDIR}/gpg.conf"
echo "# TREZOR-based GPG agent emulator
# Prepare GPG agent configuration file
echo "# Hardware-based GPG agent emulator
log-file ${HOMEDIR}/gpg-agent.log
verbosity 2
" | tee "${HOMEDIR}/gpg-agent.conf"
" > "${HOMEDIR}/gpg-agent.conf"
# Prepare a helper script for setting up the new identity
echo "#!/bin/bash
set -eu
export GNUPGHOME=${HOMEDIR}
COMMAND=\$*
if [ -z \"\${COMMAND}\" ]
then
\${SHELL}
else
\${COMMAND}
fi
" > "${HOMEDIR}/env"
chmod u+x "${HOMEDIR}/env"
# Load agent and make sure it responds with the new identity
GNUPGHOME="$HOMEDIR" gpg2 -K 2> /dev/null

View File

@@ -1,16 +0,0 @@
#!/bin/bash
set -eu
export GNUPGHOME=~/.gnupg/trezor
# Make sure that the device is unlocked before starting the shell
trezor-gpg-unlock
COMMAND=$*
if [ -z "${COMMAND}" ]
then
gpg2 --list-public-keys
${SHELL}
else
${COMMAND}
fi

17
setup.py Normal file → Executable file
View File

@@ -2,15 +2,14 @@
from setuptools import setup
setup(
name='trezor_agent',
version='0.8.0',
description='Using Trezor as hardware SSH agent',
name='libagent',
version='0.9.0',
description='Using hardware wallets as SSH/GPG agent',
author='Roman Zeyde',
author_email='roman.zeyde@gmail.com',
url='http://github.com/romanz/trezor-agent',
packages=['trezor_agent', 'trezor_agent.device', 'trezor_agent.gpg'],
install_requires=['ecdsa>=0.13', 'ed25519>=1.4', 'Cython>=0.23.4', 'protobuf>=3.0.0', 'semver>=2.2',
'trezor>=0.7.6', 'keepkey>=0.7.3', 'ledgerblue>=0.1.8'],
packages=['libagent', 'libagent.device', 'libagent.gpg', 'libagent.ssh'],
install_requires=['ecdsa>=0.13', 'ed25519>=1.4', 'semver>=2.2'],
platforms=['POSIX'],
classifiers=[
'Environment :: Console',
@@ -29,10 +28,4 @@ setup(
'Topic :: Security',
'Topic :: Utilities',
],
entry_points={'console_scripts': [
'trezor-agent = trezor_agent.__main__:run_agent',
'trezor-gpg-create = trezor_agent.gpg.__main__:main_create',
'trezor-gpg-agent = trezor_agent.gpg.__main__:main_agent',
'trezor-gpg-unlock = trezor_agent.gpg.__main__:auto_unlock',
]},
)

12
tox.ini
View File

@@ -2,6 +2,8 @@
envlist = py27,py3
[pep8]
max-line-length = 100
[pep257]
add-ignore = D401
[testenv]
deps=
pytest
@@ -11,10 +13,12 @@ deps=
pylint
semver
pydocstyle
isort
commands=
pep8 trezor_agent
pylint --reports=no --rcfile .pylintrc trezor_agent
pydocstyle trezor_agent
coverage run --omit='trezor_agent/__main__.py' --source trezor_agent -m py.test -v trezor_agent
pep8 libagent
isort --skip-glob .tox -c -r libagent
pylint --reports=no --rcfile .pylintrc libagent
pydocstyle libagent
coverage run --source libagent -m py.test -v libagent
coverage report
coverage html

View File

@@ -1,27 +0,0 @@
"""Cryptographic hardware device management."""
import logging
from . import trezor
from . import keepkey
from . import ledger
from . import interface
log = logging.getLogger(__name__)
DEVICE_TYPES = [
trezor.Trezor,
keepkey.KeepKey,
ledger.LedgerNanoS,
]
def detect():
"""Detect the first available device and return it to the user."""
for device_type in DEVICE_TYPES:
try:
with device_type() as d:
return d
except interface.NotFoundError as e:
log.debug('device not found: %s', e)
raise IOError('No device found!')

View File

@@ -1,9 +0,0 @@
"""
TREZOR support for ECDSA GPG signatures.
See these links for more details:
- https://www.gnupg.org/faq/whats-new-in-2.1.html
- https://tools.ietf.org/html/rfc4880
- https://tools.ietf.org/html/rfc6637
- https://tools.ietf.org/html/draft-irtf-cfrg-eddsa-05
"""