mirror of
https://github.com/romanz/amodem.git
synced 2026-05-03 08:27:26 +08:00
Compare commits
63 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7d9b3ff1d0 | ||
|
|
4af881b3cb | ||
|
|
eb525e1b62 | ||
|
|
02c8e729b7 | ||
|
|
12359938ad | ||
|
|
93cd3e688b | ||
|
|
26d7dd3124 | ||
|
|
0d5c3a9ca7 | ||
|
|
97ec6b2719 | ||
|
|
8ba9be1780 | ||
|
|
b2bc87c0c7 | ||
|
|
d522d148ef | ||
|
|
c796a3b01d | ||
|
|
a3362bbf3e | ||
|
|
9a271d115b | ||
|
|
6a7165298f | ||
|
|
c4f3fa6e04 | ||
|
|
8a77fa519f | ||
|
|
59560ec0b0 | ||
|
|
7a91196dd5 | ||
|
|
43c424a402 | ||
|
|
6672ea9bc4 | ||
|
|
002dc2a0e0 | ||
|
|
61ced2808f | ||
|
|
71a8930021 | ||
|
|
74e8f21a22 | ||
|
|
897236d556 | ||
|
|
5bec0e8382 | ||
|
|
3cb7f6fd21 | ||
|
|
cad2ec1239 | ||
|
|
604b2b7e99 | ||
|
|
159bd79b5f | ||
|
|
dde0b60e83 | ||
|
|
109bb3b47f | ||
|
|
0f20bfa239 | ||
|
|
798597c436 | ||
|
|
a13b1103f7 | ||
|
|
9fe1a235c1 | ||
|
|
f86aae9a40 | ||
|
|
fc070e3ca0 | ||
|
|
05fac995eb | ||
|
|
188b74b327 | ||
|
|
fc31847f8e | ||
|
|
0faf21a102 | ||
|
|
6b82f8b9b7 | ||
|
|
fabfcaaae2 | ||
|
|
f0f89310ac | ||
|
|
47ff7c5cb3 | ||
|
|
0440025083 | ||
|
|
c49fe97f63 | ||
|
|
7f8abcb5c5 | ||
|
|
e13039e52d | ||
|
|
c420571eb8 | ||
|
|
827119a18d | ||
|
|
9be6504658 | ||
|
|
07cbe65875 | ||
|
|
180120e787 | ||
|
|
f4ce81fa94 | ||
|
|
176bf4ef7c | ||
|
|
d22cd7512d | ||
|
|
83f17704cb | ||
|
|
92f6751ccb | ||
|
|
abe80533eb |
@@ -1,2 +1,2 @@
|
||||
[MESSAGES CONTROL]
|
||||
disable=invalid-name, missing-docstring, locally-disabled, unbalanced-tuple-unpacking
|
||||
disable=invalid-name, missing-docstring, locally-disabled, unbalanced-tuple-unpacking,no-else-return
|
||||
|
||||
16
.travis.yml
16
.travis.yml
@@ -4,29 +4,23 @@ python:
|
||||
- "2.7"
|
||||
- "3.4"
|
||||
- "3.5"
|
||||
- "3.6"
|
||||
|
||||
cache:
|
||||
directories:
|
||||
- $HOME/.cache/pip
|
||||
|
||||
addons:
|
||||
apt:
|
||||
packages:
|
||||
- libudev-dev
|
||||
- libusb-1.0-0-dev
|
||||
|
||||
before_install:
|
||||
- pip install -U setuptools pylint coverage pep8 pydocstyle "pip>=7.0" wheel
|
||||
- pip install -e git+https://github.com/keepkey/python-keepkey@6e8baa8b935e830d05f87b6dfd9bc7c927a96dc3#egg=keepkey
|
||||
|
||||
install:
|
||||
- pip install -e .
|
||||
|
||||
script:
|
||||
- pep8 trezor_agent
|
||||
- pylint --reports=no --rcfile .pylintrc trezor_agent
|
||||
- pydocstyle trezor_agent
|
||||
- coverage run --source trezor_agent/ -m py.test -v
|
||||
- pep8 libagent
|
||||
- pylint --reports=no --rcfile .pylintrc libagent
|
||||
- pydocstyle libagent
|
||||
- coverage run --source libagent/ -m py.test -v
|
||||
|
||||
after_success:
|
||||
- coverage report
|
||||
|
||||
@@ -4,13 +4,18 @@ Thanks!
|
||||
|
||||
# Installation
|
||||
|
||||
First, verify that you have GPG 2.1+ [installed](https://gist.github.com/vt0r/a2f8c0bcb1400131ff51):
|
||||
First, verify that you have GPG 2.1.11+ installed
|
||||
([Debian](https://gist.github.com/vt0r/a2f8c0bcb1400131ff51),
|
||||
[macOS](https://sourceforge.net/p/gpgosx/docu/Download/)):
|
||||
|
||||
```
|
||||
$ gpg2 --version | head -n1
|
||||
gpg (GnuPG) 2.1.15
|
||||
```
|
||||
|
||||
This GPG version is included in [Ubuntu 16.04](https://launchpad.net/ubuntu/+source/gnupg2)
|
||||
and [Linux Mint 18](https://community.linuxmint.com/software/view/gnupg2).
|
||||
|
||||
Update you TREZOR firmware to the latest version (at least v1.4.0).
|
||||
|
||||
Install latest `trezor-agent` package from GitHub:
|
||||
@@ -26,12 +31,51 @@ $ pip install --user git+https://github.com/romanz/trezor-agent.git
|
||||
## Sample usage (signature and decryption)
|
||||
[](https://asciinema.org/a/7x0h9tyoyu5ar6jc8y9oih0ba)
|
||||
|
||||
You can use GNU Privacy Assistant (GPA) in order to inspect the created keys
|
||||
and perform signature and decryption operations using:
|
||||
|
||||
```
|
||||
$ sudo apt install gpa
|
||||
$ ./scripts/gpg-shell gpa
|
||||
```
|
||||
[](https://www.gnupg.org/related_software/swlist.html#gpa)
|
||||
|
||||
## Git commit & tag signatures:
|
||||
Git can use GPG to sign and verify commits and tags (see [here](https://git-scm.com/book/en/v2/Git-Tools-Signing-Your-Work)):
|
||||
```
|
||||
$ git config --local gpg.program $(which gpg2)
|
||||
$ git commit --gpg-sign # create GPG-signed commit
|
||||
$ git log --show-signature -1 # verify commit signature
|
||||
$ git tag --sign "v1.2.3" # create GPG-signed tag
|
||||
$ git verify-tag "v1.2.3" # verify tag signature
|
||||
$ git tag v1.2.3 --sign # create GPG-signed tag
|
||||
$ git tag v1.2.3 --verify # verify tag signature
|
||||
```
|
||||
|
||||
## Password manager
|
||||
|
||||
First install `pass` from [passwordstore.org](https://www.passwordstore.org/) and initialize it to use your TREZOR-based GPG identity:
|
||||
```
|
||||
$ ./scripts/gpg-shell
|
||||
$ pass init "Roman Zeyde <roman.zeyde@gmail.com>"
|
||||
Password store initialized for Roman Zeyde <roman.zeyde@gmail.com>
|
||||
```
|
||||
Then, you can generate truly random passwords and save them encrypted using your public key (as separate `.gpg` files under `~/.password-store/`):
|
||||
```
|
||||
$ pass generate Dev/github 32
|
||||
$ pass generate Social/hackernews 32
|
||||
$ pass generate Social/twitter 32
|
||||
$ pass generate VPS/linode 32
|
||||
$ pass
|
||||
Password Store
|
||||
├── Dev
|
||||
│ └── github
|
||||
├── Social
|
||||
│ ├── hackernews
|
||||
│ └── twitter
|
||||
└── VPS
|
||||
└── linode
|
||||
```
|
||||
In order to paste them into the browser, you'd need to decrypt the password using your hardware device:
|
||||
```
|
||||
$ pass --clip VPS/linode
|
||||
Copied VPS/linode to clipboard. Will clear in 45 seconds.
|
||||
```
|
||||
|
||||
@@ -9,6 +9,9 @@
|
||||
## Using for GitHub SSH authentication (via `trezor-git` utility)
|
||||
[](https://asciinema.org/a/38337)
|
||||
|
||||
## Loading multiple SSH identities from configuration file
|
||||
[](https://asciinema.org/a/bdxxtgctk5syu56yfz8lcp7ny)
|
||||
|
||||
# Public key generation
|
||||
|
||||
Run:
|
||||
@@ -44,7 +47,7 @@ Run:
|
||||
|
||||
Make sure to confirm SSH signature on the Trezor device when requested.
|
||||
|
||||
## Accessing remote Git repositories
|
||||
## Accessing remote Git/Mercurial repositories
|
||||
|
||||
Use your SSH public key to access your remote repository (e.g. [GitHub](https://help.github.com/articles/adding-a-new-ssh-key-to-your-github-account/)):
|
||||
|
||||
@@ -58,6 +61,10 @@ Replace `git` with `git_hub` for remote operations:
|
||||
|
||||
$ git_hub push origin master
|
||||
|
||||
The same works for Mercurial (e.g. on [BitBucket](https://confluence.atlassian.com/bitbucket/set-up-ssh-for-mercurial-728138122.html)):
|
||||
|
||||
$ trezor-agent -v -e ed25519 git@bitbucket.org -- hg push
|
||||
|
||||
|
||||
# Troubleshooting
|
||||
|
||||
@@ -69,10 +76,10 @@ with a verbose log attached (by running `trezor-agent -vv`) .
|
||||
Note that your local SSH configuration may ignore `trezor-agent`, if it has `IdentitiesOnly` option set to `yes`.
|
||||
|
||||
IdentitiesOnly
|
||||
Specifies that ssh(1) should only use the authentication identity files configured in
|
||||
the ssh_config files, even if ssh-agent(1) or a PKCS11Provider offers more identities.
|
||||
Specifies that ssh(1) should only use the authentication identity files configured in
|
||||
the ssh_config files, even if ssh-agent(1) or a PKCS11Provider offers more identities.
|
||||
The argument to this keyword must be “yes” or “no”.
|
||||
This option is intended for situations where ssh-agent offers many different identities.
|
||||
This option is intended for situations where ssh-agent offers many different identities.
|
||||
The default is “no”.
|
||||
|
||||
If you are failing to connect, try running:
|
||||
|
||||
54
README.md
54
README.md
@@ -10,34 +10,64 @@ See SatoshiLabs' blog posts about this feature:
|
||||
|
||||
- [TREZOR Firmware 1.3.4 enables SSH login](https://medium.com/@satoshilabs/trezor-firmware-1-3-4-enables-ssh-login-86a622d7e609)
|
||||
- [TREZOR Firmware 1.3.6 — GPG Signing, SSH Login Updates and Advanced Transaction Features for Segwit](https://medium.com/@satoshilabs/trezor-firmware-1-3-6-20a7df6e692)
|
||||
- [TREZOR Firmware 1.4.0 — GPG decryption support](https://www.reddit.com/r/TREZOR/comments/50h8r9/new_trezor_firmware_fidou2f_and_initial_ethereum/d7420q7/)
|
||||
|
||||
## Installation
|
||||
|
||||
First, make sure that the latest [trezorlib](https://pypi.python.org/pypi/trezor) Python package
|
||||
is installed correctly (at least v0.6.6):
|
||||
Install the following packages:
|
||||
|
||||
$ apt-get install python-dev libusb-1.0-0-dev libudev-dev
|
||||
$ pip install -U setuptools
|
||||
$ pip install Cython trezor
|
||||
$ pip install -U setuptools pip
|
||||
|
||||
Make sure you are running the latest firmware version on your hardware device.
|
||||
Currently the following firmware versions are supported:
|
||||
|
||||
* [TREZOR](https://wallet.trezor.io/data/firmware/releases.json): `1.4.2+`
|
||||
* [KeepKey](https://github.com/keepkey/keepkey-firmware/releases): `3.0.17+`
|
||||
* [Ledger Nano S](https://github.com/LedgerHQ/blue-app-ssh-agent): `0.0.3+`
|
||||
|
||||
### TREZOR
|
||||
|
||||
Make sure that your `udev` rules are configured [correctly](https://doc.satoshilabs.com/trezor-user/settingupchromeonlinux.html#manual-configuration-of-udev-rules).
|
||||
Then, install the latest [trezor_agent](https://pypi.python.org/pypi/trezor_agent) package:
|
||||
|
||||
$ pip install trezor_agent
|
||||
|
||||
Finally, verify that you are running the latest [TREZOR firmware](https://wallet.mytrezor.com/data/firmware/releases.json) version (at least v1.4.0):
|
||||
Or, directly from the latest source code:
|
||||
|
||||
$ trezorctl get_features | head
|
||||
vendor: "bitcointrezor.com"
|
||||
major_version: 1
|
||||
minor_version: 4
|
||||
patch_version: 0
|
||||
...
|
||||
$ git clone https://github.com/romanz/trezor-agent
|
||||
$ pip install --user -e trezor-agent/agents/trezor
|
||||
|
||||
If you have an error regarding `protobuf` imports (after installing it), please see [this issue](https://github.com/romanz/trezor-agent/issues/28).
|
||||
|
||||
### KeepKey
|
||||
|
||||
Make sure that your `udev` rules are configured [correctly](https://support.keepkey.com/support/solutions/articles/6000037796-keepkey-wallet-is-not-being-recognized-by-linux).
|
||||
Then, install the latest [keepkey_agent](https://pypi.python.org/pypi/keepkey_agent) package:
|
||||
|
||||
$ pip install keepkey_agent
|
||||
|
||||
Or, directly from the latest source code:
|
||||
|
||||
$ git clone https://github.com/romanz/trezor-agent
|
||||
$ pip install --user -e trezor-agent/agents/keepkey
|
||||
|
||||
### Ledger Nano S
|
||||
|
||||
Make sure that your `udev` rules are configured [correctly](http://support.ledgerwallet.com/knowledge_base/topics/ledger-wallet-is-not-recognized-on-linux).
|
||||
Then, install the latest [ledger_agent](https://pypi.python.org/pypi/ledger_agent) package:
|
||||
|
||||
$ pip install ledger_agent
|
||||
|
||||
Or, directly from the latest source code:
|
||||
|
||||
$ git clone https://github.com/romanz/trezor-agent
|
||||
$ pip install --user -e trezor-agent/agents/ledger
|
||||
|
||||
## Usage
|
||||
|
||||
For SSH, see the [following instructions](README-SSH.md).
|
||||
For SSH, see the [following instructions](README-SSH.md) (for Windows support,
|
||||
see [trezor-ssh-agent](https://github.com/martin-lizner/trezor-ssh-agent) project (by Martin Lízner)).
|
||||
|
||||
For GPG, see the [following instructions](README-GPG.md).
|
||||
|
||||
|
||||
5
agents/keepkey/keepkey_agent.py
Normal file
5
agents/keepkey/keepkey_agent.py
Normal file
@@ -0,0 +1,5 @@
|
||||
import libagent.gpg
|
||||
import libagent.ssh
|
||||
from libagent.device import keepkey
|
||||
|
||||
ssh_agent = lambda: libagent.ssh.main(keepkey.KeepKey)
|
||||
34
agents/keepkey/setup.py
Normal file
34
agents/keepkey/setup.py
Normal file
@@ -0,0 +1,34 @@
|
||||
#!/usr/bin/env python
|
||||
from setuptools import setup
|
||||
|
||||
setup(
|
||||
name='keepkey_agent',
|
||||
version='0.9.0',
|
||||
description='Using KeepKey as hardware SSH/GPG agent',
|
||||
author='Roman Zeyde',
|
||||
author_email='roman.zeyde@gmail.com',
|
||||
url='http://github.com/romanz/trezor-agent',
|
||||
scripts=['keepkey_agent.py'],
|
||||
install_requires=['libagent>=0.9.0', 'keepkey>=0.7.3'],
|
||||
platforms=['POSIX'],
|
||||
classifiers=[
|
||||
'Environment :: Console',
|
||||
'Development Status :: 4 - Beta',
|
||||
'Intended Audience :: Developers',
|
||||
'Intended Audience :: Information Technology',
|
||||
'Intended Audience :: System Administrators',
|
||||
'License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)',
|
||||
'Operating System :: POSIX',
|
||||
'Programming Language :: Python :: 2.7',
|
||||
'Programming Language :: Python :: 3.4',
|
||||
'Programming Language :: Python :: 3.5',
|
||||
'Topic :: Software Development :: Libraries :: Python Modules',
|
||||
'Topic :: System :: Networking',
|
||||
'Topic :: Communications',
|
||||
'Topic :: Security',
|
||||
'Topic :: Utilities',
|
||||
],
|
||||
entry_points={'console_scripts': [
|
||||
'keepkey-agent = keepkey_agent:ssh_agent',
|
||||
]},
|
||||
)
|
||||
7
agents/ledger/ledger_agent.py
Normal file
7
agents/ledger/ledger_agent.py
Normal file
@@ -0,0 +1,7 @@
|
||||
import libagent.gpg
|
||||
import libagent.ssh
|
||||
from libagent.device.ledger import LedgerNanoS as DeviceType
|
||||
|
||||
ssh_agent = lambda: libagent.ssh.main(DeviceType)
|
||||
gpg_tool = lambda: libagent.gpg.main(DeviceType)
|
||||
gpg_agent = lambda: libagent.gpg.run_agent(DeviceType)
|
||||
36
agents/ledger/setup.py
Normal file
36
agents/ledger/setup.py
Normal file
@@ -0,0 +1,36 @@
|
||||
#!/usr/bin/env python
|
||||
from setuptools import setup
|
||||
|
||||
setup(
|
||||
name='ledger_agent',
|
||||
version='0.9.0',
|
||||
description='Using Ledger as hardware SSH/GPG agent',
|
||||
author='Roman Zeyde',
|
||||
author_email='roman.zeyde@gmail.com',
|
||||
url='http://github.com/romanz/trezor-agent',
|
||||
scripts=['ledger_agent.py'],
|
||||
install_requires=['libagent>=0.9.0', 'ledgerblue>=0.1.8'],
|
||||
platforms=['POSIX'],
|
||||
classifiers=[
|
||||
'Environment :: Console',
|
||||
'Development Status :: 4 - Beta',
|
||||
'Intended Audience :: Developers',
|
||||
'Intended Audience :: Information Technology',
|
||||
'Intended Audience :: System Administrators',
|
||||
'License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)',
|
||||
'Operating System :: POSIX',
|
||||
'Programming Language :: Python :: 2.7',
|
||||
'Programming Language :: Python :: 3.4',
|
||||
'Programming Language :: Python :: 3.5',
|
||||
'Topic :: Software Development :: Libraries :: Python Modules',
|
||||
'Topic :: System :: Networking',
|
||||
'Topic :: Communications',
|
||||
'Topic :: Security',
|
||||
'Topic :: Utilities',
|
||||
],
|
||||
entry_points={'console_scripts': [
|
||||
'ledger-agent = ledger_agent:ssh_agent',
|
||||
'ledger-gpg = ledger_agent:gpg_tool',
|
||||
'ledger-gpg-agent = ledger_agent:gpg_agent',
|
||||
]},
|
||||
)
|
||||
36
agents/trezor/setup.py
Normal file
36
agents/trezor/setup.py
Normal file
@@ -0,0 +1,36 @@
|
||||
#!/usr/bin/env python
|
||||
from setuptools import setup
|
||||
|
||||
setup(
|
||||
name='trezor_agent',
|
||||
version='0.9.0',
|
||||
description='Using Trezor as hardware SSH/GPG agent',
|
||||
author='Roman Zeyde',
|
||||
author_email='roman.zeyde@gmail.com',
|
||||
url='http://github.com/romanz/trezor-agent',
|
||||
scripts=['trezor_agent.py'],
|
||||
install_requires=['libagent>=0.9.0', 'trezor>=0.7.6'],
|
||||
platforms=['POSIX'],
|
||||
classifiers=[
|
||||
'Environment :: Console',
|
||||
'Development Status :: 4 - Beta',
|
||||
'Intended Audience :: Developers',
|
||||
'Intended Audience :: Information Technology',
|
||||
'Intended Audience :: System Administrators',
|
||||
'License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)',
|
||||
'Operating System :: POSIX',
|
||||
'Programming Language :: Python :: 2.7',
|
||||
'Programming Language :: Python :: 3.4',
|
||||
'Programming Language :: Python :: 3.5',
|
||||
'Topic :: Software Development :: Libraries :: Python Modules',
|
||||
'Topic :: System :: Networking',
|
||||
'Topic :: Communications',
|
||||
'Topic :: Security',
|
||||
'Topic :: Utilities',
|
||||
],
|
||||
entry_points={'console_scripts': [
|
||||
'trezor-agent = trezor_agent:ssh_agent',
|
||||
'trezor-gpg = trezor_agent:gpg_tool',
|
||||
'trezor-gpg-agent = trezor_agent:gpg_agent',
|
||||
]},
|
||||
)
|
||||
7
agents/trezor/trezor_agent.py
Normal file
7
agents/trezor/trezor_agent.py
Normal file
@@ -0,0 +1,7 @@
|
||||
import libagent.gpg
|
||||
import libagent.ssh
|
||||
from libagent.device.trezor import Trezor as DeviceType
|
||||
|
||||
ssh_agent = lambda: libagent.ssh.main(DeviceType)
|
||||
gpg_tool = lambda: libagent.gpg.main(DeviceType)
|
||||
gpg_agent = lambda: libagent.gpg.run_agent(DeviceType)
|
||||
@@ -18,15 +18,17 @@ class Client(object):
|
||||
"""Connect to hardware device."""
|
||||
self.device = device
|
||||
|
||||
def get_public_key(self, identity):
|
||||
"""Get SSH public key from the device."""
|
||||
def export_public_keys(self, identities):
|
||||
"""Export SSH public keys from the device."""
|
||||
public_keys = []
|
||||
with self.device:
|
||||
pubkey = self.device.pubkey(identity)
|
||||
|
||||
vk = formats.decompress_pubkey(pubkey=pubkey,
|
||||
curve_name=identity.curve_name)
|
||||
return formats.export_public_key(vk=vk,
|
||||
label=str(identity))
|
||||
for i in identities:
|
||||
pubkey = self.device.pubkey(identity=i)
|
||||
vk = formats.decompress_pubkey(pubkey=pubkey,
|
||||
curve_name=i.curve_name)
|
||||
public_keys.append(formats.export_public_key(vk=vk,
|
||||
label=str(i)))
|
||||
return public_keys
|
||||
|
||||
def sign_ssh_challenge(self, blob, identity):
|
||||
"""Sign given blob using a private key on the device."""
|
||||
3
libagent/device/__init__.py
Normal file
3
libagent/device/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
"""Cryptographic hardware device management."""
|
||||
|
||||
from . import interface
|
||||
@@ -54,7 +54,7 @@ class NotFoundError(Error):
|
||||
|
||||
|
||||
class DeviceError(Error):
|
||||
""""Error during device operation."""
|
||||
"""Error during device operation."""
|
||||
|
||||
|
||||
class Identity(object):
|
||||
@@ -82,7 +82,7 @@ class Identity(object):
|
||||
s = io.BytesIO(bytearray(digest))
|
||||
|
||||
hardened = 0x80000000
|
||||
addr_0 = [13, 17][bool(ecdh)]
|
||||
addr_0 = 17 if bool(ecdh) else 13
|
||||
address_n = [addr_0] + list(util.recv(s, '<LLLL'))
|
||||
return [(hardened | value) for value in address_n]
|
||||
|
||||
@@ -20,7 +20,10 @@ def _verify_support(identity, ecdh):
|
||||
class KeepKey(trezor.Trezor):
|
||||
"""Connection to KeepKey device."""
|
||||
|
||||
from . import keepkey_defs as defs
|
||||
@property
|
||||
def _defs(self):
|
||||
from . import keepkey_defs
|
||||
return keepkey_defs
|
||||
|
||||
required_version = '>=1.0.4'
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
"""KeepKey-related definitions."""
|
||||
|
||||
# pylint: disable=unused-import
|
||||
# pylint: disable=unused-import,import-error
|
||||
|
||||
from keepkeylib.client import CallException as Error
|
||||
from keepkeylib.client import KeepKeyClient as Client
|
||||
from keepkeylib.client import CallException
|
||||
from keepkeylib.transport_hid import HidTransport
|
||||
from keepkeylib.messages_pb2 import PassphraseAck
|
||||
from keepkeylib.transport_hid import HidTransport as Transport
|
||||
from keepkeylib.types_pb2 import IdentityType
|
||||
@@ -4,7 +4,7 @@ import binascii
|
||||
import logging
|
||||
import struct
|
||||
|
||||
from ledgerblue import comm
|
||||
from ledgerblue import comm # pylint: disable=import-error
|
||||
|
||||
from . import interface
|
||||
|
||||
@@ -2,6 +2,8 @@
|
||||
|
||||
import binascii
|
||||
import logging
|
||||
import os
|
||||
|
||||
import semver
|
||||
|
||||
from . import interface
|
||||
@@ -12,20 +14,30 @@ log = logging.getLogger(__name__)
|
||||
class Trezor(interface.Device):
|
||||
"""Connection to TREZOR device."""
|
||||
|
||||
from . import trezor_defs as defs
|
||||
@property
|
||||
def _defs(self):
|
||||
from . import trezor_defs
|
||||
# Allow using TREZOR bridge transport (instead of the HID default)
|
||||
trezor_defs.Transport = {
|
||||
'bridge': trezor_defs.BridgeTransport,
|
||||
}.get(os.environ.get('TREZOR_TRANSPORT'), trezor_defs.HidTransport)
|
||||
return trezor_defs
|
||||
|
||||
required_version = '>=1.4.0'
|
||||
passphrase = os.environ.get('TREZOR_PASSPHRASE', '')
|
||||
|
||||
def connect(self):
|
||||
"""Enumerate and connect to the first USB HID interface."""
|
||||
def empty_passphrase_handler(_):
|
||||
return self.defs.PassphraseAck(passphrase='')
|
||||
def passphrase_handler(_):
|
||||
log.debug('using %s passphrase for %s',
|
||||
'non-empty' if self.passphrase else 'empty', self)
|
||||
return self._defs.PassphraseAck(passphrase=self.passphrase)
|
||||
|
||||
for d in self.defs.HidTransport.enumerate():
|
||||
for d in self._defs.Transport.enumerate():
|
||||
log.debug('endpoint: %s', d)
|
||||
transport = self.defs.HidTransport(d)
|
||||
connection = self.defs.Client(transport)
|
||||
connection.callback_PassphraseRequest = empty_passphrase_handler
|
||||
transport = self._defs.Transport(d)
|
||||
connection = self._defs.Client(transport)
|
||||
connection.callback_PassphraseRequest = passphrase_handler
|
||||
f = connection.features
|
||||
log.debug('connected to %s %s', self, f.device_id)
|
||||
log.debug('label : %s', f.label)
|
||||
@@ -60,7 +72,7 @@ class Trezor(interface.Device):
|
||||
return result.node.public_key
|
||||
|
||||
def _identity_proto(self, identity):
|
||||
result = self.defs.IdentityType()
|
||||
result = self._defs.IdentityType()
|
||||
for name, value in identity.items():
|
||||
setattr(result, name, value)
|
||||
return result
|
||||
@@ -80,7 +92,7 @@ class Trezor(interface.Device):
|
||||
assert len(result.signature) == 65
|
||||
assert result.signature[:1] == b'\x00'
|
||||
return result.signature[1:]
|
||||
except self.defs.CallException as e:
|
||||
except self._defs.Error as e:
|
||||
msg = '{} error: {}'.format(self, e)
|
||||
log.debug(msg, exc_info=True)
|
||||
raise interface.DeviceError(msg)
|
||||
@@ -99,7 +111,7 @@ class Trezor(interface.Device):
|
||||
assert len(result.session_key) in {65, 33} # NIST256 or Curve25519
|
||||
assert result.session_key[:1] == b'\x04'
|
||||
return result.session_key
|
||||
except self.defs.CallException as e:
|
||||
except self._defs.Error as e:
|
||||
msg = '{} error: {}'.format(self, e)
|
||||
log.debug(msg, exc_info=True)
|
||||
raise interface.DeviceError(msg)
|
||||
@@ -1,8 +1,10 @@
|
||||
"""TREZOR-related definitions."""
|
||||
|
||||
# pylint: disable=unused-import
|
||||
# pylint: disable=unused-import,import-error
|
||||
|
||||
from trezorlib.client import CallException as Error
|
||||
from trezorlib.client import TrezorClient as Client
|
||||
from trezorlib.client import CallException
|
||||
from trezorlib.transport_hid import HidTransport
|
||||
from trezorlib.messages_pb2 import PassphraseAck
|
||||
from trezorlib.transport_bridge import BridgeTransport
|
||||
from trezorlib.transport_hid import HidTransport
|
||||
from trezorlib.types_pb2 import IdentityType
|
||||
106
trezor_agent/gpg/__main__.py → libagent/gpg/__init__.py
Executable file → Normal file
106
trezor_agent/gpg/__main__.py → libagent/gpg/__init__.py
Executable file → Normal file
@@ -1,8 +1,15 @@
|
||||
#!/usr/bin/env python
|
||||
"""Create signatures and export public keys for GPG using TREZOR."""
|
||||
"""
|
||||
TREZOR support for ECDSA GPG signatures.
|
||||
|
||||
See these links for more details:
|
||||
- https://www.gnupg.org/faq/whats-new-in-2.1.html
|
||||
- https://tools.ietf.org/html/rfc4880
|
||||
- https://tools.ietf.org/html/rfc6637
|
||||
- https://tools.ietf.org/html/draft-irtf-cfrg-eddsa-05
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import contextlib
|
||||
import io
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
@@ -10,30 +17,23 @@ import time
|
||||
|
||||
import semver
|
||||
|
||||
from . import agent, decode, client, encode, keyring, protocol
|
||||
from . import agent, client, encode, keyring, protocol
|
||||
from .. import device, formats, server, util
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def key_exists(user_id):
|
||||
"""Return True iff there is a GPG key with specified user ID."""
|
||||
for p in decode.parse_packets(io.BytesIO(keyring.export_public_keys())):
|
||||
if p['type'] == 'user_id' and p['value'] == user_id:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def run_create(args):
|
||||
def export_public_key(device_type, args):
|
||||
"""Generate a new pubkey for a new/existing GPG identity."""
|
||||
log.warning('NOTE: in order to re-generate the exact same GPG key later, '
|
||||
'run this command with "--time=%d" commandline flag (to set '
|
||||
'the timestamp of the GPG key manually).', args.time)
|
||||
d = client.Client(user_id=args.user_id, curve_name=args.ecdsa_curve)
|
||||
verifying_key = d.pubkey(ecdh=False)
|
||||
decryption_key = d.pubkey(ecdh=True)
|
||||
c = client.Client(user_id=args.user_id, curve_name=args.ecdsa_curve,
|
||||
device_type=device_type)
|
||||
verifying_key = c.pubkey(ecdh=False)
|
||||
decryption_key = c.pubkey(ecdh=True)
|
||||
|
||||
if key_exists(args.user_id): # add as subkey
|
||||
if args.subkey: # add as subkey
|
||||
log.info('adding %s GPG subkey for "%s" to existing key',
|
||||
args.ecdsa_curve, args.user_id)
|
||||
# subkey for signing
|
||||
@@ -47,10 +47,10 @@ def run_create(args):
|
||||
primary_bytes = keyring.export_public_key(args.user_id)
|
||||
result = encode.create_subkey(primary_bytes=primary_bytes,
|
||||
subkey=signing_key,
|
||||
signer_func=d.sign)
|
||||
signer_func=c.sign)
|
||||
result = encode.create_subkey(primary_bytes=result,
|
||||
subkey=encryption_key,
|
||||
signer_func=d.sign)
|
||||
signer_func=c.sign)
|
||||
else: # add as primary
|
||||
log.info('creating new %s GPG primary key for "%s"',
|
||||
args.ecdsa_curve, args.user_id)
|
||||
@@ -65,41 +65,46 @@ def run_create(args):
|
||||
|
||||
result = encode.create_primary(user_id=args.user_id,
|
||||
pubkey=primary,
|
||||
signer_func=d.sign)
|
||||
signer_func=c.sign)
|
||||
result = encode.create_subkey(primary_bytes=result,
|
||||
subkey=subkey,
|
||||
signer_func=d.sign)
|
||||
signer_func=c.sign)
|
||||
|
||||
sys.stdout.write(protocol.armor(result, 'PUBLIC KEY BLOCK'))
|
||||
|
||||
|
||||
def main_create():
|
||||
"""Main function for GPG identity creation."""
|
||||
p = argparse.ArgumentParser()
|
||||
p.add_argument('user_id')
|
||||
p.add_argument('-e', '--ecdsa-curve', default='nist256p1')
|
||||
p.add_argument('-t', '--time', type=int, default=int(time.time()))
|
||||
p.add_argument('-v', '--verbose', default=0, action='count')
|
||||
|
||||
args = p.parse_args()
|
||||
def run_create(device_type, args):
|
||||
"""Export public GPG key."""
|
||||
util.setup_logging(verbosity=args.verbose)
|
||||
log.warning('This GPG tool is still in EXPERIMENTAL mode, '
|
||||
'so please note that the API and features may '
|
||||
'change without backwards compatibility!')
|
||||
|
||||
existing_gpg = keyring.gpg_version().decode('ascii')
|
||||
required_gpg = '>=2.1.15'
|
||||
required_gpg = '>=2.1.11'
|
||||
if semver.match(existing_gpg, required_gpg):
|
||||
run_create(args)
|
||||
export_public_key(device_type, args)
|
||||
else:
|
||||
log.error('Existing gpg2 has version "%s" (%s required)',
|
||||
existing_gpg, required_gpg)
|
||||
|
||||
|
||||
def main_agent():
|
||||
def run_unlock(device_type, args):
|
||||
"""Unlock hardware device (for future interaction)."""
|
||||
util.setup_logging(verbosity=args.verbose)
|
||||
with device_type() as d:
|
||||
log.info('unlocked %s device', d)
|
||||
|
||||
|
||||
def run_agent(device_type):
|
||||
"""Run a simple GPG-agent server."""
|
||||
home_dir = os.environ.get('GNUPGHOME', os.path.expanduser('~/.gnupg/trezor'))
|
||||
config_file = os.path.join(home_dir, 'gpg-agent.conf')
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument('--homedir', default=os.environ.get('GNUPGHOME'))
|
||||
args, _ = parser.parse_known_args()
|
||||
|
||||
assert args.homedir
|
||||
config_file = os.path.join(args.homedir, 'gpg-agent.conf')
|
||||
|
||||
lines = (line.strip() for line in open(config_file))
|
||||
lines = (line for line in lines if line and not line.startswith('#'))
|
||||
config = dict(line.split(' ', 1) for line in lines)
|
||||
@@ -111,17 +116,30 @@ def main_agent():
|
||||
for conn in agent.yield_connections(sock):
|
||||
with contextlib.closing(conn):
|
||||
try:
|
||||
agent.handle_connection(conn)
|
||||
agent.handle_connection(conn=conn, device_type=device_type)
|
||||
except StopIteration:
|
||||
log.info('stopping gpg-agent')
|
||||
return
|
||||
except Exception as e: # pylint: disable=broad-except
|
||||
log.exception('gpg-agent failed: %s', e)
|
||||
|
||||
|
||||
def auto_unlock():
|
||||
"""Automatically unlock first found device (used for `gpg-shell`)."""
|
||||
p = argparse.ArgumentParser()
|
||||
p.add_argument('-v', '--verbose', default=0, action='count')
|
||||
def main(device_type):
|
||||
"""Parse command-line arguments."""
|
||||
parser = argparse.ArgumentParser()
|
||||
subparsers = parser.add_subparsers()
|
||||
|
||||
args = p.parse_args()
|
||||
util.setup_logging(verbosity=args.verbose)
|
||||
d = device.detect()
|
||||
log.info('unlocked %s device', d)
|
||||
p = subparsers.add_parser('create', help='Export public GPG key')
|
||||
p.add_argument('user_id')
|
||||
p.add_argument('-e', '--ecdsa-curve', default='nist256p1')
|
||||
p.add_argument('-t', '--time', type=int, default=int(time.time()))
|
||||
p.add_argument('-v', '--verbose', default=0, action='count')
|
||||
p.add_argument('-s', '--subkey', default=False, action='store_true')
|
||||
p.set_defaults(func=run_create)
|
||||
|
||||
p = subparsers.add_parser('unlock', help='Unlock the hardware device')
|
||||
p.add_argument('-v', '--verbose', default=0, action='count')
|
||||
p.set_defaults(func=run_unlock)
|
||||
|
||||
args = parser.parse_args()
|
||||
return args.func(device_type=device_type, args=args)
|
||||
@@ -2,7 +2,7 @@
|
||||
import binascii
|
||||
import logging
|
||||
|
||||
from . import decode, client, keyring, protocol
|
||||
from . import client, decode, keyring, protocol
|
||||
from .. import util
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@@ -36,7 +36,7 @@ def sig_encode(r, s):
|
||||
return b'(7:sig-val(5:ecdsa(1:r32:' + r + b')(1:s32:' + s + b')))'
|
||||
|
||||
|
||||
def open_connection(keygrip_bytes):
|
||||
def open_connection(keygrip_bytes, device_type):
|
||||
"""
|
||||
Connect to the device for the specified keygrip.
|
||||
|
||||
@@ -51,20 +51,20 @@ def open_connection(keygrip_bytes):
|
||||
curve_name = protocol.get_curve_name_by_oid(pubkey_dict['curve_oid'])
|
||||
ecdh = (pubkey_dict['algo'] == protocol.ECDH_ALGO_ID)
|
||||
|
||||
conn = client.Client(user_id, curve_name=curve_name)
|
||||
conn = client.Client(user_id, curve_name=curve_name, device_type=device_type)
|
||||
pubkey = protocol.PublicKey(
|
||||
curve_name=curve_name, created=pubkey_dict['created'],
|
||||
verifying_key=conn.pubkey(ecdh=ecdh), ecdh=ecdh)
|
||||
assert pubkey.key_id() == pubkey_dict['key_id']
|
||||
assert pubkey.keygrip == keygrip_bytes
|
||||
assert pubkey.keygrip() == keygrip_bytes
|
||||
return conn
|
||||
|
||||
|
||||
def pksign(keygrip, digest, algo):
|
||||
def pksign(keygrip, digest, algo, device_type):
|
||||
"""Sign a message digest using a private EC key."""
|
||||
log.debug('signing %r digest (algo #%s)', digest, algo)
|
||||
keygrip_bytes = binascii.unhexlify(keygrip)
|
||||
conn = open_connection(keygrip_bytes)
|
||||
conn = open_connection(keygrip_bytes, device_type=device_type)
|
||||
r, s = conn.sign(binascii.unhexlify(digest))
|
||||
result = sig_encode(r, s)
|
||||
log.debug('result: %r', result)
|
||||
@@ -92,7 +92,7 @@ def parse_ecdh(line):
|
||||
return dict(items)[b'e']
|
||||
|
||||
|
||||
def pkdecrypt(keygrip, conn):
|
||||
def pkdecrypt(keygrip, conn, device_type):
|
||||
"""Handle decryption using ECDH."""
|
||||
for msg in [b'S INQUIRE_MAXLEN 4096', b'INQUIRE CIPHERTEXT']:
|
||||
keyring.sendline(conn, msg)
|
||||
@@ -102,11 +102,24 @@ def pkdecrypt(keygrip, conn):
|
||||
remote_pubkey = parse_ecdh(line)
|
||||
|
||||
keygrip_bytes = binascii.unhexlify(keygrip)
|
||||
conn = open_connection(keygrip_bytes)
|
||||
conn = open_connection(keygrip_bytes, device_type=device_type)
|
||||
return _serialize_point(conn.ecdh(remote_pubkey))
|
||||
|
||||
|
||||
def handle_connection(conn):
|
||||
@util.memoize
|
||||
def have_key(keygrip, device_type):
|
||||
"""Check if current keygrip correspond to a TREZOR-based key."""
|
||||
try:
|
||||
open_connection(keygrip_bytes=binascii.unhexlify(keygrip),
|
||||
device_type=device_type)
|
||||
return True
|
||||
except KeyError as e:
|
||||
log.warning('HAVEKEY(%s) failed: %s', keygrip, e)
|
||||
return False
|
||||
|
||||
|
||||
# pylint: disable=too-many-branches
|
||||
def handle_connection(conn, device_type):
|
||||
"""Handle connection from GPG binary using the ASSUAN protocol."""
|
||||
keygrip = None
|
||||
digest = None
|
||||
@@ -118,7 +131,7 @@ def handle_connection(conn):
|
||||
parts = line.split(b' ')
|
||||
command = parts[0]
|
||||
args = parts[1:]
|
||||
if command in {b'RESET', b'OPTION', b'HAVEKEY', b'SETKEYDESC'}:
|
||||
if command in {b'RESET', b'OPTION', b'SETKEYDESC'}:
|
||||
pass # reply with OK
|
||||
elif command == b'GETINFO':
|
||||
keyring.sendline(conn, b'D ' + version)
|
||||
@@ -129,19 +142,28 @@ def handle_connection(conn):
|
||||
elif command == b'SETHASH':
|
||||
algo, digest = args
|
||||
elif command == b'PKSIGN':
|
||||
sig = pksign(keygrip, digest, algo)
|
||||
sig = pksign(keygrip, digest, algo, device_type=device_type)
|
||||
keyring.sendline(conn, b'D ' + sig)
|
||||
elif command == b'PKDECRYPT':
|
||||
sec = pkdecrypt(keygrip, conn)
|
||||
sec = pkdecrypt(keygrip, conn, device_type=device_type)
|
||||
keyring.sendline(conn, b'D ' + sec)
|
||||
elif command == b'HAVEKEY':
|
||||
if not have_key(keygrip=args[0], device_type=device_type):
|
||||
keyring.sendline(conn,
|
||||
b'ERR 67108881 No secret key <GPG Agent>')
|
||||
return
|
||||
elif command == b'KEYINFO':
|
||||
keygrip, = args
|
||||
# Dummy reply (mainly for 'gpg --edit' to succeed).
|
||||
# For details, see GnuPG agent KEYINFO command help.
|
||||
# https://git.gnupg.org/cgi-bin/gitweb.cgi?p=gnupg.git;a=blob;f=agent/command.c;h=c8b34e9882076b1b724346787781f657cac75499;hb=refs/heads/master#l1082
|
||||
fmt = 'S KEYINFO {0} X - - - - - - -'
|
||||
keyring.sendline(conn, fmt.format(keygrip).encode('ascii'))
|
||||
elif command == b'BYE':
|
||||
return
|
||||
elif command == b'KILLAGENT':
|
||||
keyring.sendline(conn, b'OK')
|
||||
raise StopIteration
|
||||
else:
|
||||
log.error('unknown request: %r', line)
|
||||
return
|
||||
@@ -10,9 +10,9 @@ log = logging.getLogger(__name__)
|
||||
class Client(object):
|
||||
"""Sign messages and get public keys from a hardware device."""
|
||||
|
||||
def __init__(self, user_id, curve_name):
|
||||
def __init__(self, user_id, curve_name, device_type):
|
||||
"""Connect to the device and retrieve required public key."""
|
||||
self.device = device.detect()
|
||||
self.device = device_type()
|
||||
self.user_id = user_id
|
||||
self.identity = device.interface.Identity(
|
||||
identity_str='gpg://', curve_name=curve_name)
|
||||
@@ -54,7 +54,8 @@ def parse_mpis(s, n):
|
||||
|
||||
def _parse_nist256p1_pubkey(mpi):
|
||||
prefix, x, y = util.split_bits(mpi, 4, 256, 256)
|
||||
assert prefix == 4
|
||||
if prefix != 4:
|
||||
raise ValueError('Invalid MPI prefix: {}'.format(prefix))
|
||||
point = ecdsa.ellipticcurve.Point(curve=ecdsa.NIST256p.curve,
|
||||
x=x, y=y)
|
||||
return ecdsa.VerifyingKey.from_public_point(
|
||||
@@ -64,7 +65,8 @@ def _parse_nist256p1_pubkey(mpi):
|
||||
|
||||
def _parse_ed25519_pubkey(mpi):
|
||||
prefix, value = util.split_bits(mpi, 8, 256)
|
||||
assert prefix == 0x40
|
||||
if prefix != 0x40:
|
||||
raise ValueError('Invalid MPI prefix: {}'.format(prefix))
|
||||
return ed25519.VerifyingKey(util.num2bytes(value, size=32))
|
||||
|
||||
|
||||
@@ -83,18 +85,6 @@ DSA_ALGO_ID = 17
|
||||
ECDSA_ALGO_IDS = {18, 19, 22} # {ecdsa, nist256, ed25519}
|
||||
|
||||
|
||||
def _parse_literal(stream):
|
||||
"""See https://tools.ietf.org/html/rfc4880#section-5.9 for details."""
|
||||
p = {'type': 'literal'}
|
||||
p['format'] = stream.readfmt('c')
|
||||
filename_len = stream.readfmt('B')
|
||||
p['filename'] = stream.read(filename_len)
|
||||
p['date'] = stream.readfmt('>L')
|
||||
p['content'] = stream.read()
|
||||
p['_to_hash'] = p['content']
|
||||
return p
|
||||
|
||||
|
||||
def _parse_embedded_signatures(subpackets):
|
||||
for packet in subpackets:
|
||||
data = bytearray(packet)
|
||||
@@ -104,6 +94,12 @@ def _parse_embedded_signatures(subpackets):
|
||||
yield _parse_signature(util.Reader(stream))
|
||||
|
||||
|
||||
def has_custom_subpacket(signature_packet):
|
||||
"""Detect our custom public keys by matching subpacket data."""
|
||||
return any(protocol.CUSTOM_KEY_LABEL == subpacket[1:]
|
||||
for subpacket in signature_packet['unhashed_subpackets'])
|
||||
|
||||
|
||||
def _parse_signature(stream):
|
||||
"""See https://tools.ietf.org/html/rfc4880#section-5.2 for details."""
|
||||
p = {'type': 'signature'}
|
||||
@@ -127,10 +123,6 @@ def _parse_signature(stream):
|
||||
log.debug('embedded sigs: %s', embedded)
|
||||
p['embedded'] = embedded
|
||||
|
||||
# Detect our custom public keys by matching subpacket data
|
||||
p['_is_custom'] = any(protocol.CUSTOM_KEY_LABEL == subpacket[1:]
|
||||
for subpacket in p['unhashed_subpackets'])
|
||||
|
||||
p['hash_prefix'] = stream.readfmt('2s')
|
||||
if p['pubkey_alg'] in ECDSA_ALGO_IDS:
|
||||
p['sig'] = (parse_mpi(stream), parse_mpi(stream))
|
||||
@@ -170,7 +162,7 @@ def _parse_pubkey(stream, packet_type='pubkey'):
|
||||
# should be b'\x03\x01\x08\x07': SHA256 + AES128
|
||||
size, = util.readfmt(leftover, 'B')
|
||||
p['kdf'] = leftover.read(size)
|
||||
assert not leftover.read()
|
||||
p['secret'] = leftover.read()
|
||||
|
||||
parse_func, keygrip_func = SUPPORTED_CURVES[oid]
|
||||
keygrip = keygrip_func(parse_func(mpi))
|
||||
@@ -209,8 +201,9 @@ _parse_attribute = functools.partial(_parse_user_id,
|
||||
|
||||
PACKET_TYPES = {
|
||||
2: _parse_signature,
|
||||
5: _parse_pubkey,
|
||||
6: _parse_pubkey,
|
||||
11: _parse_literal,
|
||||
7: _parse_subkey,
|
||||
13: _parse_user_id,
|
||||
14: _parse_subkey,
|
||||
17: _parse_attribute,
|
||||
@@ -254,11 +247,13 @@ def parse_packets(stream):
|
||||
packet_data = reader.read(packet_size)
|
||||
packet_type = PACKET_TYPES.get(tag)
|
||||
|
||||
p = {'type': 'unknown', 'tag': tag, 'raw': packet_data}
|
||||
if packet_type is not None:
|
||||
p = packet_type(util.Reader(io.BytesIO(packet_data)))
|
||||
p['tag'] = tag
|
||||
else:
|
||||
p = {'type': 'unknown', 'tag': tag, 'raw': packet_data}
|
||||
try:
|
||||
p = packet_type(util.Reader(io.BytesIO(packet_data)))
|
||||
p['tag'] = tag
|
||||
except ValueError:
|
||||
log.exception('Skipping packet: %s', util.hexlify(packet_data))
|
||||
|
||||
log.debug('packet "%s": %s', p['type'], p)
|
||||
yield p
|
||||
@@ -300,6 +295,7 @@ def load_by_keygrip(pubkey_bytes, keygrip):
|
||||
for p in packets:
|
||||
if p.get('keygrip') == keygrip:
|
||||
return p, user_ids
|
||||
raise KeyError('{} keygrip not found'.format(util.hexlify(keygrip)))
|
||||
|
||||
|
||||
def load_signature(stream, original_data):
|
||||
@@ -8,9 +8,10 @@ from .. import util
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def create_primary(user_id, pubkey, signer_func):
|
||||
def create_primary(user_id, pubkey, signer_func, secret_bytes=b''):
|
||||
"""Export new primary GPG public key, ready for "gpg2 --import"."""
|
||||
pubkey_packet = protocol.packet(tag=6, blob=pubkey.data())
|
||||
pubkey_packet = protocol.packet(tag=(5 if secret_bytes else 6),
|
||||
blob=(pubkey.data() + secret_bytes))
|
||||
user_id_packet = protocol.packet(tag=13,
|
||||
blob=user_id.encode('ascii'))
|
||||
|
||||
@@ -47,9 +48,10 @@ def create_primary(user_id, pubkey, signer_func):
|
||||
return pubkey_packet + user_id_packet + sign_packet
|
||||
|
||||
|
||||
def create_subkey(primary_bytes, subkey, signer_func, user_id=None):
|
||||
def create_subkey(primary_bytes, subkey, signer_func, secret_bytes=b''):
|
||||
"""Export new subkey to GPG primary key."""
|
||||
subkey_packet = protocol.packet(tag=14, blob=subkey.data())
|
||||
subkey_packet = protocol.packet(tag=(7 if secret_bytes else 14),
|
||||
blob=(subkey.data() + secret_bytes))
|
||||
packets = list(decode.parse_packets(io.BytesIO(primary_bytes)))
|
||||
primary, user_id, signature = packets[:3]
|
||||
|
||||
@@ -87,7 +89,7 @@ def create_subkey(primary_bytes, subkey, signer_func, user_id=None):
|
||||
unhashed_subpackets.append(protocol.subpacket(32, embedded_sig))
|
||||
unhashed_subpackets.append(protocol.CUSTOM_SUBPACKET)
|
||||
|
||||
if not signature['_is_custom']:
|
||||
if not decode.has_custom_subpacket(signature):
|
||||
signer_func = keyring.create_agent_signer(user_id['value'])
|
||||
|
||||
signature = protocol.make_signature(
|
||||
@@ -185,6 +185,7 @@ class PublicKey(object):
|
||||
|
||||
def __init__(self, curve_name, created, verifying_key, ecdh=False):
|
||||
"""Contruct using a ECDSA VerifyingKey object."""
|
||||
self.curve_name = curve_name
|
||||
self.curve_info = SUPPORTED_CURVES[curve_name]
|
||||
self.created = int(created) # time since Epoch
|
||||
self.verifying_key = verifying_key
|
||||
@@ -196,12 +197,8 @@ class PublicKey(object):
|
||||
self.algo_id = self.curve_info['algo_id']
|
||||
self.ecdh_packet = b''
|
||||
|
||||
hex_key_id = util.hexlify(self.key_id())[-8:]
|
||||
self.desc = 'GPG public key {}/{}'.format(curve_name, hex_key_id)
|
||||
|
||||
@property
|
||||
def keygrip(self):
|
||||
"""Compute GPG2 keygrip."""
|
||||
"""Compute GPG keygrip of the verifying key."""
|
||||
return self.curve_info['keygrip'](self.verifying_key)
|
||||
|
||||
def data(self):
|
||||
@@ -227,7 +224,8 @@ class PublicKey(object):
|
||||
|
||||
def __repr__(self):
|
||||
"""Short (8 hexadecimal digits) GPG key ID."""
|
||||
return self.desc
|
||||
hex_key_id = util.hexlify(self.key_id())[-8:]
|
||||
return 'GPG public key {}/{}'.format(self.curve_name, hex_key_id)
|
||||
|
||||
__str__ = __repr__
|
||||
|
||||
@@ -41,5 +41,22 @@ def public_key_path(request):
|
||||
|
||||
def test_gpg_files(public_key_path): # pylint: disable=redefined-outer-name
|
||||
with open(public_key_path, 'rb') as f:
|
||||
packets = list(decode.parse_packets(f))
|
||||
assert len(packets) > 0
|
||||
assert list(decode.parse_packets(f))
|
||||
|
||||
|
||||
def test_has_custom_subpacket():
|
||||
sig = {'unhashed_subpackets': []}
|
||||
assert not decode.has_custom_subpacket(sig)
|
||||
|
||||
custom_markers = [
|
||||
protocol.CUSTOM_SUBPACKET,
|
||||
protocol.subpacket(10, protocol.CUSTOM_KEY_LABEL),
|
||||
]
|
||||
for marker in custom_markers:
|
||||
sig = {'unhashed_subpackets': [marker]}
|
||||
assert decode.has_custom_subpacket(sig)
|
||||
|
||||
|
||||
def test_load_by_keygrip_missing():
|
||||
with pytest.raises(KeyError):
|
||||
decode.load_by_keygrip(pubkey_bytes=b'', keygrip=b'')
|
||||
@@ -80,4 +80,22 @@ PKSIGN
|
||||
def test_iterlines():
|
||||
sock = FakeSocket()
|
||||
sock.rx.write(b'foo\nbar\nxyz')
|
||||
assert list(keyring.iterlines(sock)) == []
|
||||
sock.rx.seek(0)
|
||||
assert list(keyring.iterlines(sock)) == [b'foo', b'bar']
|
||||
|
||||
|
||||
def test_get_agent_sock_path():
|
||||
sp = mock.Mock(spec=['check_output'])
|
||||
sp.check_output.return_value = b'''sysconfdir:/usr/local/etc/gnupg
|
||||
bindir:/usr/local/bin
|
||||
libexecdir:/usr/local/libexec
|
||||
libdir:/usr/local/lib/gnupg
|
||||
datadir:/usr/local/share/gnupg
|
||||
localedir:/usr/local/share/locale
|
||||
dirmngr-socket:/run/user/1000/gnupg/S.dirmngr
|
||||
agent-ssh-socket:/run/user/1000/gnupg/S.gpg-agent.ssh
|
||||
agent-socket:/run/user/1000/gnupg/S.gpg-agent
|
||||
homedir:/home/roman/.gnupg
|
||||
'''
|
||||
expected = b'/run/user/1000/gnupg/S.gpg-agent'
|
||||
assert keyring.get_agent_sock_path(sp=sp) == expected
|
||||
@@ -1,5 +1,6 @@
|
||||
import ecdsa
|
||||
import ed25519
|
||||
import pytest
|
||||
|
||||
from .. import protocol
|
||||
from ... import formats
|
||||
@@ -69,7 +70,7 @@ def test_nist256p1():
|
||||
pk = protocol.PublicKey(curve_name=formats.CURVE_NIST256,
|
||||
created=42, verifying_key=vk)
|
||||
assert repr(pk) == 'GPG public key nist256p1/F82361D9'
|
||||
assert pk.keygrip == b'\x95\x85.\x91\x7f\xe2\xc3\x91R\xba\x99\x81\x92\xb5y\x1d\xb1\\\xdc\xf0'
|
||||
assert pk.keygrip() == b'\x95\x85.\x91\x7f\xe2\xc3\x91R\xba\x99\x81\x92\xb5y\x1d\xb1\\\xdc\xf0'
|
||||
|
||||
|
||||
def test_nist256p1_ecdh():
|
||||
@@ -78,7 +79,7 @@ def test_nist256p1_ecdh():
|
||||
pk = protocol.PublicKey(curve_name=formats.CURVE_NIST256,
|
||||
created=42, verifying_key=vk, ecdh=True)
|
||||
assert repr(pk) == 'GPG public key nist256p1/5811DF46'
|
||||
assert pk.keygrip == b'\x95\x85.\x91\x7f\xe2\xc3\x91R\xba\x99\x81\x92\xb5y\x1d\xb1\\\xdc\xf0'
|
||||
assert pk.keygrip() == b'\x95\x85.\x91\x7f\xe2\xc3\x91R\xba\x99\x81\x92\xb5y\x1d\xb1\\\xdc\xf0'
|
||||
|
||||
|
||||
def test_ed25519():
|
||||
@@ -87,4 +88,20 @@ def test_ed25519():
|
||||
pk = protocol.PublicKey(curve_name=formats.CURVE_ED25519,
|
||||
created=42, verifying_key=vk)
|
||||
assert repr(pk) == 'GPG public key ed25519/36B40FE6'
|
||||
assert pk.keygrip == b'\xbf\x01\x90l\x17\xb64\xa3-\xf4\xc0gr\x99\x18<\xddBQ?'
|
||||
assert pk.keygrip() == b'\xbf\x01\x90l\x17\xb64\xa3-\xf4\xc0gr\x99\x18<\xddBQ?'
|
||||
|
||||
|
||||
def test_curve25519():
|
||||
sk = ed25519.SigningKey(b'\x00' * 32)
|
||||
vk = sk.get_verifying_key()
|
||||
pk = protocol.PublicKey(curve_name=formats.ECDH_CURVE25519,
|
||||
created=42, verifying_key=vk)
|
||||
assert repr(pk) == 'GPG public key curve25519/69460384'
|
||||
assert pk.keygrip() == b'x\xd6\x86\xe4\xa6\xfc;\x0fY\xe1}Lw\xc4\x9ed\xf1Q\x8a\x00'
|
||||
|
||||
|
||||
def test_get_curve_name_by_oid():
|
||||
for name, info in protocol.SUPPORTED_CURVES.items():
|
||||
assert protocol.get_curve_name_by_oid(info['oid']) == name
|
||||
with pytest.raises(KeyError):
|
||||
protocol.get_curve_name_by_oid('BAD_OID')
|
||||
@@ -62,7 +62,9 @@ def failure():
|
||||
|
||||
def _legacy_pubs(buf):
|
||||
"""SSH v1 public keys are not supported."""
|
||||
assert not buf.read()
|
||||
leftover = buf.read()
|
||||
if leftover:
|
||||
log.warning('skipping leftover: %r', leftover)
|
||||
code = util.pack('B', msg_code('SSH_AGENT_RSA_IDENTITIES_ANSWER'))
|
||||
num = util.pack('L', 0) # no SSH v1 keys
|
||||
return util.frame(code, num)
|
||||
@@ -71,14 +73,13 @@ def _legacy_pubs(buf):
|
||||
class Handler(object):
|
||||
"""ssh-agent protocol handler."""
|
||||
|
||||
def __init__(self, keys, signer, debug=False):
|
||||
def __init__(self, conn, debug=False):
|
||||
"""
|
||||
Create a protocol handler with specified public keys.
|
||||
|
||||
Use specified signer function to sign SSH authentication requests.
|
||||
"""
|
||||
self.public_keys = keys
|
||||
self.signer = signer
|
||||
self.conn = conn
|
||||
self.debug = debug
|
||||
|
||||
self.methods = {
|
||||
@@ -107,7 +108,7 @@ class Handler(object):
|
||||
def list_pubs(self, buf):
|
||||
"""SSH v2 public keys are serialized and returned."""
|
||||
assert not buf.read()
|
||||
keys = self.public_keys
|
||||
keys = self.conn.parse_public_keys()
|
||||
code = util.pack('B', msg_code('SSH2_AGENT_IDENTITIES_ANSWER'))
|
||||
num = util.pack('L', len(keys))
|
||||
log.debug('available keys: %s', [k['name'] for k in keys])
|
||||
@@ -129,7 +130,7 @@ class Handler(object):
|
||||
assert util.read_frame(buf) == b''
|
||||
assert not buf.read()
|
||||
|
||||
for k in self.public_keys:
|
||||
for k in self.conn.parse_public_keys():
|
||||
if (k['fingerprint']) == (key['fingerprint']):
|
||||
log.debug('using key %r (%s)', k['name'], k['fingerprint'])
|
||||
key = k
|
||||
@@ -140,7 +141,7 @@ class Handler(object):
|
||||
label = key['name'].decode('ascii') # label should be a string
|
||||
log.debug('signing %d-byte blob with "%s" key', len(blob), label)
|
||||
try:
|
||||
signature = self.signer(blob=blob, identity=key['identity'])
|
||||
signature = self.conn.sign(blob=blob, identity=key['identity'])
|
||||
except IOError:
|
||||
return failure()
|
||||
log.debug('signature: %r', signature)
|
||||
@@ -31,7 +31,7 @@ def unix_domain_socket_server(sock_path):
|
||||
|
||||
Listen on it, and delete it after the generated context is over.
|
||||
"""
|
||||
log.debug('serving on SSH_AUTH_SOCK=%s', sock_path)
|
||||
log.debug('serving on %s', sock_path)
|
||||
remove_file(sock_path)
|
||||
|
||||
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
@@ -7,7 +7,7 @@ import re
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
from . import client, device, formats, protocol, server, util
|
||||
from .. import client, device, formats, protocol, server, util
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
@@ -22,7 +22,22 @@ def ssh_args(label):
|
||||
if 'user' in identity:
|
||||
args += ['-l', identity['user']]
|
||||
|
||||
return ['ssh'] + args + [identity['host']]
|
||||
return args + [identity['host']]
|
||||
|
||||
|
||||
def mosh_args(label):
|
||||
"""Create SSH command for connecting specified server."""
|
||||
identity = device.interface.string_to_identity(label)
|
||||
|
||||
args = []
|
||||
if 'port' in identity:
|
||||
args += ['-p', identity['port']]
|
||||
if 'user' in identity:
|
||||
args += [identity['user']+'@'+identity['host']]
|
||||
else:
|
||||
args += [identity['host']]
|
||||
|
||||
return args
|
||||
|
||||
|
||||
def create_parser():
|
||||
@@ -52,6 +67,8 @@ def create_agent_parser():
|
||||
help='run ${SHELL} as subprocess under SSH agent')
|
||||
g.add_argument('-c', '--connect', default=False, action='store_true',
|
||||
help='connect to specified host via SSH')
|
||||
g.add_argument('--mosh', default=False, action='store_true',
|
||||
help='connect to specified host via using Mosh')
|
||||
|
||||
p.add_argument('identity', type=str, default=None,
|
||||
help='proto://[user@]host[:port][/path]')
|
||||
@@ -93,12 +110,10 @@ def git_host(remote_name, attributes):
|
||||
return '{user}@{host}'.format(**match.groupdict())
|
||||
|
||||
|
||||
def run_server(conn, public_keys, command, debug, timeout):
|
||||
def run_server(conn, command, debug, timeout):
|
||||
"""Common code for run_agent and run_git below."""
|
||||
try:
|
||||
signer = conn.sign_ssh_challenge
|
||||
handler = protocol.Handler(keys=public_keys, signer=signer,
|
||||
debug=debug)
|
||||
handler = protocol.Handler(conn=conn, debug=debug)
|
||||
with server.serve(handler=handler, timeout=timeout) as env:
|
||||
return server.run_process(command=command, environ=env)
|
||||
except KeyboardInterrupt:
|
||||
@@ -112,7 +127,7 @@ def handle_connection_error(func):
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
except IOError as e:
|
||||
log.error('Connection error: %s', e)
|
||||
log.error('Connection error (try unplugging and replugging your device): %s', e)
|
||||
return 1
|
||||
return wrapper
|
||||
|
||||
@@ -125,13 +140,40 @@ def parse_config(fname):
|
||||
curve_name=curve_name)
|
||||
|
||||
|
||||
class JustInTimeConnection(object):
|
||||
"""Connect to the device just before the needed operation."""
|
||||
|
||||
def __init__(self, conn_factory, identities):
|
||||
"""Create a JIT connection object."""
|
||||
self.conn_factory = conn_factory
|
||||
self.identities = identities
|
||||
self.public_keys = util.memoize(self._public_keys) # a simple cache
|
||||
|
||||
def _public_keys(self):
|
||||
"""Return a list of SSH public keys (in textual format)."""
|
||||
conn = self.conn_factory()
|
||||
return conn.export_public_keys(self.identities)
|
||||
|
||||
def parse_public_keys(self):
|
||||
"""Parse SSH public keys into dictionaries."""
|
||||
public_keys = [formats.import_public_key(pk)
|
||||
for pk in self.public_keys()]
|
||||
for pk, identity in zip(public_keys, self.identities):
|
||||
pk['identity'] = identity
|
||||
return public_keys
|
||||
|
||||
def sign(self, blob, identity):
|
||||
"""Sign a given blob using the specified identity on the device."""
|
||||
conn = self.conn_factory()
|
||||
return conn.sign_ssh_challenge(blob=blob, identity=identity)
|
||||
|
||||
|
||||
@handle_connection_error
|
||||
def run_agent(client_factory=client.Client):
|
||||
def main(device_type):
|
||||
"""Run ssh-agent using given hardware client factory."""
|
||||
args = create_agent_parser().parse_args()
|
||||
util.setup_logging(verbosity=args.verbose)
|
||||
|
||||
conn = client_factory(device=device.detect())
|
||||
if args.identity.startswith('/'):
|
||||
identities = list(parse_config(fname=args.identity))
|
||||
else:
|
||||
@@ -141,26 +183,23 @@ def run_agent(client_factory=client.Client):
|
||||
identity.identity_dict['proto'] = 'ssh'
|
||||
log.info('identity #%d: %s', index, identity)
|
||||
|
||||
command = args.command
|
||||
|
||||
public_keys = [conn.get_public_key(i) for i in identities]
|
||||
|
||||
if args.connect:
|
||||
command = ssh_args(args.identity) + args.command
|
||||
log.debug('SSH connect: %r', command)
|
||||
command = ['ssh'] + ssh_args(args.identity) + args.command
|
||||
elif args.mosh:
|
||||
command = ['mosh'] + mosh_args(args.identity) + args.command
|
||||
else:
|
||||
command = args.command
|
||||
|
||||
use_shell = bool(args.shell)
|
||||
if use_shell:
|
||||
command = os.environ['SHELL']
|
||||
log.debug('using shell: %r', command)
|
||||
|
||||
if not command:
|
||||
for pk in public_keys:
|
||||
conn = JustInTimeConnection(
|
||||
conn_factory=lambda: client.Client(device_type()),
|
||||
identities=identities)
|
||||
if command:
|
||||
return run_server(conn=conn, command=command, debug=args.debug,
|
||||
timeout=args.timeout)
|
||||
else:
|
||||
for pk in conn.public_keys():
|
||||
sys.stdout.write(pk)
|
||||
return
|
||||
|
||||
public_keys = [formats.import_public_key(pk) for pk in public_keys]
|
||||
for pk, identity in zip(public_keys, identities):
|
||||
pk['identity'] = identity
|
||||
return run_server(conn=conn, public_keys=public_keys, command=command,
|
||||
debug=args.debug, timeout=args.timeout)
|
||||
@@ -20,9 +20,6 @@ class MockDevice(device.interface.Device): # pylint: disable=abstract-method
|
||||
def connect(self): # pylint: disable=no-self-use
|
||||
return mock.Mock()
|
||||
|
||||
def close(self):
|
||||
self.conn = None
|
||||
|
||||
def pubkey(self, identity, ecdh=False): # pylint: disable=unused-argument
|
||||
assert self.conn
|
||||
return PUBKEY
|
||||
@@ -34,16 +31,6 @@ class MockDevice(device.interface.Device): # pylint: disable=abstract-method
|
||||
return SIG
|
||||
|
||||
|
||||
def identity_type(**kwargs):
|
||||
result = mock.Mock(spec=[])
|
||||
result.index = 0
|
||||
result.proto = result.user = result.host = result.port = None
|
||||
result.path = None
|
||||
for k, v in kwargs.items():
|
||||
setattr(result, k, v)
|
||||
return result
|
||||
|
||||
|
||||
BLOB = (b'\x00\x00\x00 \xce\xe0\xc9\xd5\xceu/\xe8\xc5\xf2\xbfR+x\xa1\xcf\xb0'
|
||||
b'\x8e;R\xd3)m\x96\x1b\xb4\xd8s\xf1\x99\x16\xaa2\x00\x00\x00\x05roman'
|
||||
b'\x00\x00\x00\x0essh-connection\x00\x00\x00\tpublickey'
|
||||
@@ -62,7 +49,7 @@ def test_ssh_agent():
|
||||
identity = device.interface.Identity(identity_str='localhost:22',
|
||||
curve_name=CURVE)
|
||||
c = client.Client(device=MockDevice())
|
||||
assert c.get_public_key(identity) == PUBKEY_TEXT
|
||||
assert c.export_public_keys([identity]) == [PUBKEY_TEXT]
|
||||
signature = c.sign_ssh_challenge(blob=BLOB, identity=identity)
|
||||
|
||||
key = formats.import_public_key(PUBKEY_TEXT)
|
||||
@@ -93,3 +93,11 @@ def test_curve_mismatch():
|
||||
def test_serialize_error():
|
||||
with pytest.raises(TypeError):
|
||||
formats.serialize_verifying_key(None)
|
||||
|
||||
|
||||
def test_get_ecdh_curve_name():
|
||||
for c in [formats.CURVE_NIST256, formats.ECDH_CURVE25519]:
|
||||
assert c == formats.get_ecdh_curve_name(c)
|
||||
|
||||
assert (formats.ECDH_CURVE25519 ==
|
||||
formats.get_ecdh_curve_name(formats.CURVE_ED25519))
|
||||
@@ -1,3 +1,4 @@
|
||||
import mock
|
||||
import pytest
|
||||
|
||||
from .. import device, formats, protocol
|
||||
@@ -15,16 +16,30 @@ NIST256_SIGN_MSG = b'\r\x00\x00\x00h\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\
|
||||
NIST256_SIGN_REPLY = b'\x00\x00\x00j\x0e\x00\x00\x00e\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00J\x00\x00\x00!\x00\x88G!\x0c\n\x16:\xbeF\xbe\xb9\xd2\xa9&e\x89\xad\xc4}\x10\xf8\xbc\xdc\xef\x0e\x8d_\x8a6.\xb6\x1f\x00\x00\x00!\x00q\xf0\x16>,\x9a\xde\xe7(\xd6\xd7\x93\x1f\xed\xf9\x94ddw\xfe\xbdq\x13\xbb\xfc\xa9K\xea\x9dC\xa1\xe9' # nopep8
|
||||
|
||||
|
||||
def fake_connection(keys, signer):
|
||||
c = mock.Mock()
|
||||
c.parse_public_keys.return_value = keys
|
||||
c.sign = signer
|
||||
return c
|
||||
|
||||
|
||||
def test_list():
|
||||
key = formats.import_public_key(NIST256_KEY)
|
||||
key['identity'] = device.interface.Identity('ssh://localhost', 'nist256p1')
|
||||
h = protocol.Handler(keys=[key], signer=None)
|
||||
h = protocol.Handler(fake_connection(keys=[key], signer=None))
|
||||
reply = h.handle(LIST_MSG)
|
||||
assert reply == LIST_NIST256_REPLY
|
||||
|
||||
|
||||
def test_list_legacy_pubs_with_suffix():
|
||||
h = protocol.Handler(fake_connection(keys=[], signer=None))
|
||||
suffix = b'\x00\x00\x00\x06foobar'
|
||||
reply = h.handle(b'\x01' + suffix)
|
||||
assert reply == b'\x00\x00\x00\x05\x02\x00\x00\x00\x00' # no legacy keys
|
||||
|
||||
|
||||
def test_unsupported():
|
||||
h = protocol.Handler(keys=[], signer=None)
|
||||
h = protocol.Handler(fake_connection(keys=[], signer=None))
|
||||
reply = h.handle(b'\x09')
|
||||
assert reply == b'\x00\x00\x00\x01\x05'
|
||||
|
||||
@@ -38,13 +53,13 @@ def ecdsa_signer(identity, blob):
|
||||
def test_ecdsa_sign():
|
||||
key = formats.import_public_key(NIST256_KEY)
|
||||
key['identity'] = device.interface.Identity('ssh://localhost', 'nist256p1')
|
||||
h = protocol.Handler(keys=[key], signer=ecdsa_signer)
|
||||
h = protocol.Handler(fake_connection(keys=[key], signer=ecdsa_signer))
|
||||
reply = h.handle(NIST256_SIGN_MSG)
|
||||
assert reply == NIST256_SIGN_REPLY
|
||||
|
||||
|
||||
def test_sign_missing():
|
||||
h = protocol.Handler(keys=[], signer=ecdsa_signer)
|
||||
h = protocol.Handler(fake_connection(keys=[], signer=ecdsa_signer))
|
||||
with pytest.raises(KeyError):
|
||||
h.handle(NIST256_SIGN_MSG)
|
||||
|
||||
@@ -57,7 +72,7 @@ def test_sign_wrong():
|
||||
|
||||
key = formats.import_public_key(NIST256_KEY)
|
||||
key['identity'] = device.interface.Identity('ssh://localhost', 'nist256p1')
|
||||
h = protocol.Handler(keys=[key], signer=wrong_signature)
|
||||
h = protocol.Handler(fake_connection(keys=[key], signer=wrong_signature))
|
||||
with pytest.raises(ValueError):
|
||||
h.handle(NIST256_SIGN_MSG)
|
||||
|
||||
@@ -68,7 +83,7 @@ def test_sign_cancel():
|
||||
|
||||
key = formats.import_public_key(NIST256_KEY)
|
||||
key['identity'] = device.interface.Identity('ssh://localhost', 'nist256p1')
|
||||
h = protocol.Handler(keys=[key], signer=cancel_signature)
|
||||
h = protocol.Handler(fake_connection(keys=[key], signer=cancel_signature))
|
||||
assert h.handle(NIST256_SIGN_MSG) == protocol.failure()
|
||||
|
||||
|
||||
@@ -89,6 +104,6 @@ def ed25519_signer(identity, blob):
|
||||
def test_ed25519_sign():
|
||||
key = formats.import_public_key(ED25519_KEY)
|
||||
key['identity'] = device.interface.Identity('ssh://localhost', 'ed25519')
|
||||
h = protocol.Handler(keys=[key], signer=ed25519_signer)
|
||||
h = protocol.Handler(fake_connection(keys=[key], signer=ed25519_signer))
|
||||
reply = h.handle(ED25519_SIGN_MSG)
|
||||
assert reply == ED25519_SIGN_REPLY
|
||||
@@ -37,10 +37,16 @@ class FakeSocket(object):
|
||||
pass
|
||||
|
||||
|
||||
def empty_device():
|
||||
c = mock.Mock(spec=['parse_public_keys'])
|
||||
c.parse_public_keys.return_value = []
|
||||
return c
|
||||
|
||||
|
||||
def test_handle():
|
||||
mutex = threading.Lock()
|
||||
|
||||
handler = protocol.Handler(keys=[], signer=None)
|
||||
handler = protocol.Handler(conn=empty_device())
|
||||
conn = FakeSocket()
|
||||
server.handle_connection(conn, handler, mutex)
|
||||
|
||||
@@ -67,7 +73,6 @@ def test_handle():
|
||||
|
||||
|
||||
def test_server_thread():
|
||||
|
||||
connections = [FakeSocket()]
|
||||
quit_event = threading.Event()
|
||||
|
||||
@@ -81,8 +86,10 @@ def test_server_thread():
|
||||
def getsockname(self): # pylint: disable=no-self-use
|
||||
return 'fake_server'
|
||||
|
||||
handler = protocol.Handler(keys=[], signer=None),
|
||||
handle_conn = functools.partial(server.handle_connection, handler=handler)
|
||||
handler = protocol.Handler(conn=empty_device()),
|
||||
handle_conn = functools.partial(server.handle_connection,
|
||||
handler=handler,
|
||||
mutex=None)
|
||||
server.server_thread(sock=FakeServer(),
|
||||
handle_conn=handle_conn,
|
||||
quit_event=quit_event)
|
||||
@@ -111,7 +118,7 @@ def test_run():
|
||||
|
||||
|
||||
def test_serve_main():
|
||||
handler = protocol.Handler(keys=[], signer=None)
|
||||
handler = protocol.Handler(conn=empty_device())
|
||||
with server.serve(handler=handler, sock_path=None):
|
||||
pass
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import io
|
||||
|
||||
import mock
|
||||
import pytest
|
||||
|
||||
from .. import util
|
||||
@@ -97,3 +98,20 @@ def test_reader():
|
||||
|
||||
with pytest.raises(EOFError):
|
||||
r.read(1)
|
||||
|
||||
|
||||
def test_setup_logging():
|
||||
util.setup_logging(verbosity=10)
|
||||
|
||||
|
||||
def test_memoize():
|
||||
f = mock.Mock(side_effect=lambda x: x)
|
||||
|
||||
def func(x):
|
||||
# mock.Mock doesn't work with functools.wraps()
|
||||
return f(x)
|
||||
|
||||
g = util.memoize(func)
|
||||
assert g(1) == g(1)
|
||||
assert g(1) != g(2)
|
||||
assert f.mock_calls == [mock.call(1), mock.call(2)]
|
||||
@@ -1,6 +1,7 @@
|
||||
"""Various I/O and serialization utilities."""
|
||||
import binascii
|
||||
import contextlib
|
||||
import functools
|
||||
import io
|
||||
import logging
|
||||
import struct
|
||||
@@ -185,3 +186,21 @@ def setup_logging(verbosity, **kwargs):
|
||||
levels = [logging.WARNING, logging.INFO, logging.DEBUG]
|
||||
level = levels[min(verbosity, len(levels) - 1)]
|
||||
logging.basicConfig(format=fmt, level=level, **kwargs)
|
||||
|
||||
|
||||
def memoize(func):
|
||||
"""Simple caching decorator."""
|
||||
cache = {}
|
||||
|
||||
@functools.wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
"""Caching wrapper."""
|
||||
key = (args, tuple(sorted(kwargs.items())))
|
||||
if key in cache:
|
||||
return cache[key]
|
||||
else:
|
||||
result = func(*args, **kwargs)
|
||||
cache[key] = result
|
||||
return result
|
||||
|
||||
return wrapper
|
||||
@@ -1,32 +1,55 @@
|
||||
#!/bin/bash
|
||||
set -eu
|
||||
|
||||
gpg2 --version >/dev/null # verify that GnuPG 2 is installed
|
||||
|
||||
USER_ID="${1}"
|
||||
HOMEDIR=~/.gnupg/trezor
|
||||
DEVICE=${DEVICE:="trezor"} # or "ledger"
|
||||
CURVE=${CURVE:="nist256p1"} # or "ed25519"
|
||||
TIMESTAMP=${TIMESTAMP:=`date +%s`} # key creation timestamp
|
||||
HOMEDIR=~/.gnupg/${DEVICE}
|
||||
|
||||
# Prepare new GPG home directory for TREZOR-based identity
|
||||
# Prepare new GPG home directory for hardware-based identity
|
||||
rm -rf "${HOMEDIR}"
|
||||
mkdir -p "${HOMEDIR}"
|
||||
chmod 700 "${HOMEDIR}"
|
||||
|
||||
# Generate new GPG identity and import into GPG keyring
|
||||
trezor-gpg-create -v "${USER_ID}" -t "${TIMESTAMP}" -e "${CURVE}" > "${HOMEDIR}/pubkey.asc"
|
||||
gpg2 --homedir "${HOMEDIR}" --import < "${HOMEDIR}/pubkey.asc"
|
||||
$DEVICE-gpg create -v "${USER_ID}" -t "${TIMESTAMP}" -e "${CURVE}" > "${HOMEDIR}/pubkey.asc"
|
||||
gpg2 --homedir "${HOMEDIR}" --import < "${HOMEDIR}/pubkey.asc" 2> /dev/null
|
||||
rm -f "${HOMEDIR}/S.gpg-agent" # (otherwise, our agent won't be started automatically)
|
||||
|
||||
# Make new GPG identity with "ultimate" trust (via its fingerprint)
|
||||
FINGERPRINT=$(gpg2 --homedir "${HOMEDIR}" --list-public-keys --with-colons | sed -n -E 's/^fpr:::::::::([0-9A-F]+):$/\1/p' | head -n1)
|
||||
echo "${FINGERPRINT}:6" | gpg2 --homedir "${HOMEDIR}" --import-ownertrust
|
||||
FINGERPRINT=$(gpg2 --homedir "${HOMEDIR}" --list-public-keys --with-fingerprint --with-colons | sed -n -E 's/^fpr:::::::::([0-9A-F]+):$/\1/p' | head -n1)
|
||||
echo "${FINGERPRINT}:6" | gpg2 --homedir "${HOMEDIR}" --import-ownertrust 2> /dev/null
|
||||
|
||||
AGENT_PATH="$(which ${DEVICE}-gpg-agent)"
|
||||
|
||||
# Prepare GPG configuration file
|
||||
echo "# TREZOR-based GPG configuration
|
||||
agent-program $(which trezor-gpg-agent)
|
||||
echo "# Hardware-based GPG configuration
|
||||
agent-program ${AGENT_PATH}
|
||||
personal-digest-preferences SHA512
|
||||
" | tee "${HOMEDIR}/gpg.conf"
|
||||
" > "${HOMEDIR}/gpg.conf"
|
||||
|
||||
echo "# TREZOR-based GPG agent emulator
|
||||
# Prepare GPG agent configuration file
|
||||
echo "# Hardware-based GPG agent emulator
|
||||
log-file ${HOMEDIR}/gpg-agent.log
|
||||
verbosity 2
|
||||
" | tee "${HOMEDIR}/gpg-agent.conf"
|
||||
" > "${HOMEDIR}/gpg-agent.conf"
|
||||
|
||||
# Prepare a helper script for setting up the new identity
|
||||
echo "#!/bin/bash
|
||||
set -eu
|
||||
export GNUPGHOME=${HOMEDIR}
|
||||
COMMAND=\$*
|
||||
if [ -z \"\${COMMAND}\" ]
|
||||
then
|
||||
\${SHELL}
|
||||
else
|
||||
\${COMMAND}
|
||||
fi
|
||||
" > "${HOMEDIR}/env"
|
||||
chmod u+x "${HOMEDIR}/env"
|
||||
|
||||
# Load agent and make sure it responds with the new identity
|
||||
GNUPGHOME="$HOMEDIR" gpg2 -K 2> /dev/null
|
||||
|
||||
@@ -1,16 +0,0 @@
|
||||
#!/bin/bash
|
||||
set -eu
|
||||
|
||||
export GNUPGHOME=~/.gnupg/trezor
|
||||
|
||||
# Make sure that the device is unlocked before starting the shell
|
||||
trezor-gpg-unlock
|
||||
|
||||
COMMAND=$*
|
||||
if [ -z "${COMMAND}" ]
|
||||
then
|
||||
gpg2 --list-public-keys
|
||||
${SHELL}
|
||||
else
|
||||
${COMMAND}
|
||||
fi
|
||||
17
setup.py
Normal file → Executable file
17
setup.py
Normal file → Executable file
@@ -2,15 +2,14 @@
|
||||
from setuptools import setup
|
||||
|
||||
setup(
|
||||
name='trezor_agent',
|
||||
version='0.8.0',
|
||||
description='Using Trezor as hardware SSH agent',
|
||||
name='libagent',
|
||||
version='0.9.0',
|
||||
description='Using hardware wallets as SSH/GPG agent',
|
||||
author='Roman Zeyde',
|
||||
author_email='roman.zeyde@gmail.com',
|
||||
url='http://github.com/romanz/trezor-agent',
|
||||
packages=['trezor_agent', 'trezor_agent.device', 'trezor_agent.gpg'],
|
||||
install_requires=['ecdsa>=0.13', 'ed25519>=1.4', 'Cython>=0.23.4', 'protobuf>=3.0.0', 'semver>=2.2',
|
||||
'trezor>=0.7.6', 'keepkey>=0.7.3', 'ledgerblue>=0.1.8'],
|
||||
packages=['libagent', 'libagent.device', 'libagent.gpg', 'libagent.ssh'],
|
||||
install_requires=['ecdsa>=0.13', 'ed25519>=1.4', 'semver>=2.2'],
|
||||
platforms=['POSIX'],
|
||||
classifiers=[
|
||||
'Environment :: Console',
|
||||
@@ -29,10 +28,4 @@ setup(
|
||||
'Topic :: Security',
|
||||
'Topic :: Utilities',
|
||||
],
|
||||
entry_points={'console_scripts': [
|
||||
'trezor-agent = trezor_agent.__main__:run_agent',
|
||||
'trezor-gpg-create = trezor_agent.gpg.__main__:main_create',
|
||||
'trezor-gpg-agent = trezor_agent.gpg.__main__:main_agent',
|
||||
'trezor-gpg-unlock = trezor_agent.gpg.__main__:auto_unlock',
|
||||
]},
|
||||
)
|
||||
|
||||
12
tox.ini
12
tox.ini
@@ -2,6 +2,8 @@
|
||||
envlist = py27,py3
|
||||
[pep8]
|
||||
max-line-length = 100
|
||||
[pep257]
|
||||
add-ignore = D401
|
||||
[testenv]
|
||||
deps=
|
||||
pytest
|
||||
@@ -11,10 +13,12 @@ deps=
|
||||
pylint
|
||||
semver
|
||||
pydocstyle
|
||||
isort
|
||||
commands=
|
||||
pep8 trezor_agent
|
||||
pylint --reports=no --rcfile .pylintrc trezor_agent
|
||||
pydocstyle trezor_agent
|
||||
coverage run --omit='trezor_agent/__main__.py' --source trezor_agent -m py.test -v trezor_agent
|
||||
pep8 libagent
|
||||
isort --skip-glob .tox -c -r libagent
|
||||
pylint --reports=no --rcfile .pylintrc libagent
|
||||
pydocstyle libagent
|
||||
coverage run --source libagent -m py.test -v libagent
|
||||
coverage report
|
||||
coverage html
|
||||
|
||||
@@ -1,27 +0,0 @@
|
||||
"""Cryptographic hardware device management."""
|
||||
|
||||
import logging
|
||||
|
||||
from . import trezor
|
||||
from . import keepkey
|
||||
from . import ledger
|
||||
from . import interface
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
DEVICE_TYPES = [
|
||||
trezor.Trezor,
|
||||
keepkey.KeepKey,
|
||||
ledger.LedgerNanoS,
|
||||
]
|
||||
|
||||
|
||||
def detect():
|
||||
"""Detect the first available device and return it to the user."""
|
||||
for device_type in DEVICE_TYPES:
|
||||
try:
|
||||
with device_type() as d:
|
||||
return d
|
||||
except interface.NotFoundError as e:
|
||||
log.debug('device not found: %s', e)
|
||||
raise IOError('No device found!')
|
||||
@@ -1,9 +0,0 @@
|
||||
"""
|
||||
TREZOR support for ECDSA GPG signatures.
|
||||
|
||||
See these links for more details:
|
||||
- https://www.gnupg.org/faq/whats-new-in-2.1.html
|
||||
- https://tools.ietf.org/html/rfc4880
|
||||
- https://tools.ietf.org/html/rfc6637
|
||||
- https://tools.ietf.org/html/draft-irtf-cfrg-eddsa-05
|
||||
"""
|
||||
Reference in New Issue
Block a user