Compare commits

...

47 Commits

Author SHA1 Message Date
Roman Zeyde
4f05d51e9b bump version 2017-07-16 15:29:49 +03:00
Roman Zeyde
9d38c26a0f gpg: add 'fake' device for GPG-integration testing 2017-07-16 15:29:47 +03:00
Roman Zeyde
3a9330b995 gpg: return SCD version from agent 2017-07-16 15:29:21 +03:00
Roman Zeyde
f904aac92e Allow unicode in identity string for SSH and GPG 2017-06-24 14:26:51 +03:00
Roman Zeyde
ca67923fe8 INSTALL: add instructions for Fedora/RedHat 2017-06-15 21:13:42 +03:00
Roman Zeyde
ce90e61eb2 README: add troubleshooting section for GPG 2017-06-14 21:49:20 +03:00
Roman Zeyde
90dc124e8d install: note '~/.local/bin' issue with pip 2017-06-12 22:39:11 +03:00
Roman Zeyde
442bf725ef gpg: fail SCD-related requests 2017-06-07 23:11:57 +03:00
Roman Zeyde
5820480052 GPG: set default user ID for signing during gpg-init 2017-06-05 22:04:43 +03:00
Roman Zeyde
ae2a84e168 INSTALL: comment about Ledger's 'SSH/PGP Agent' app requirement 2017-05-26 09:59:23 +03:00
Roman Zeyde
f6911a0016 pin: use PyQt only when running with no TTY 2017-05-21 21:03:04 +03:00
Roman Zeyde
69c54eb425 device: allow Qt-based PIN entry for Trezor/Keepkey 2017-05-21 17:29:57 +03:00
Roman Zeyde
931573f32b gpg: echo during identity initialization 2017-05-20 12:22:10 +03:00
Roman Zeyde
1e8363d4fc gpg: refactor client usage at agent module
This allows caching public keys (as done in the SSH agent).
2017-05-20 12:22:08 +03:00
Roman Zeyde
b7113083b4 README: add 'git config' for enabling commit signing 2017-05-19 21:29:57 +03:00
Roman Zeyde
b5c4eca0d2 README: elaborate trezor/ledger usage for GPG 2017-05-17 22:01:59 +03:00
Roman Zeyde
8aa08d0862 INSTALL: remove duplicate line 2017-05-17 21:53:35 +03:00
Roman Zeyde
b452b49f4c README: note qtpass for password management 2017-05-15 20:13:13 +03:00
Roman Zeyde
639c4efb6d README: add links to products 2017-05-14 21:08:55 +03:00
Roman Zeyde
6f1686c614 README: remove extra parentheses 2017-05-14 14:06:19 +03:00
Roman Zeyde
300d9a7140 README: update GPG screencasts and other examples 2017-05-14 10:58:56 +03:00
Roman Zeyde
b143bafc70 README: remove PyPI badges 2017-05-14 10:51:07 +03:00
Roman Zeyde
f2c6b6b9c1 README: move installation to a separate file 2017-05-14 10:44:50 +03:00
Roman Zeyde
e0507b1508 tests: cover file-based logging case 2017-05-14 10:04:01 +03:00
Roman Zeyde
85274d8374 bump version 2017-05-13 13:18:25 +03:00
Roman Zeyde
f358ca29d4 Allow loading previously exported SSH public keys from a file 2017-05-13 12:47:48 +03:00
Roman Zeyde
53d43cba29 gpg: update README after re-packaging 2017-05-06 21:13:07 +03:00
Roman Zeyde
214b556f83 ssh: simplify argparse creation 2017-05-06 20:55:38 +03:00
Roman Zeyde
051a3fd4ab ssh: remove unused git-related code 2017-05-06 20:40:23 +03:00
Roman Zeyde
91050ee64a ssh: remove unused git wrapper 2017-05-06 15:02:46 +03:00
Roman Zeyde
257992d04c ssh: move related code to a separate subdirectory 2017-05-05 11:22:00 +03:00
Roman Zeyde
6c2273387d util: extend logging.basicConfig() 2017-05-03 21:30:33 +03:00
Roman Zeyde
b6ad8207ba setup: add Python 3.6 support 2017-05-01 21:37:32 +03:00
Roman Zeyde
3a93fc859d setup: fix indentation for lists 2017-05-01 21:35:05 +03:00
Roman Zeyde
7d9b3ff1d0 README: update device-related info 2017-04-29 20:51:18 +03:00
Roman Zeyde
4af881b3cb Split the package into a shared library and separate per-device packages 2017-04-29 18:34:46 +03:00
Roman Zeyde
eb525e1b62 gpg: simplify Python entry point and refactor Bash scripts a bit
Now there is a single 'trezor-gpg' tool, with various subcommands.
2017-04-26 23:12:09 +03:00
Roman Zeyde
02c8e729b7 ssh: retrieve all keys using a single device session 2017-04-25 20:43:19 +03:00
Roman Zeyde
12359938ad keepkey: fix transport import 2017-04-23 21:27:02 +03:00
Roman Zeyde
93cd3e688b travis: add Python 3.6 2017-04-22 20:19:06 +03:00
Tomás Rojas
26d7dd3124 Cache public keys for the duration of the agent
This saves a lot of time when connecting to multiple hosts
simultaneously (e.g., during a deploy) as every time we are asked to sign a
challenge, all public keys are iterated to find the correct one. This
can become especially slow when using the Bridge transport and/or many
identities are defined.
2017-04-22 14:47:30 +03:00
Tomás Rojas
0d5c3a9ca7 Allow using TREZOR bridge (instead of HID transport) 2017-04-22 11:16:30 +03:00
Roman Zeyde
97ec6b2719 travis: fix dependency 2017-04-22 10:53:39 +03:00
Roman Zeyde
8ba9be1780 fix pylint warnings 2017-04-21 22:56:50 +03:00
Roman Zeyde
b2bc87c0c7 fix pydocstyle warnings 2017-04-21 22:51:11 +03:00
Timothy Hobbs
d522d148ef Connection error is confusing (#105)
Hi,

I ran into this connection error:

````
> trezor-agent  timothy@localhost
2017-04-10 00:22:01,818 ERROR        Connection error: open failed                                                                        [__main__.py:130]
````

I didn't know what was going on, whether there was a problem connecting to localhost or what. I eventually logged into trezor wallet and found that there too, my device was not recognized (probably because I did not unplug it/replug it after updating the HID settings.) Unplugging it and replugging it fixed everything.
2017-04-10 10:40:27 +03:00
Roman Zeyde
c796a3b01d README: password manager usage example 2017-03-28 21:21:35 +03:00
64 changed files with 928 additions and 533 deletions

View File

@@ -1,2 +1,2 @@
[MESSAGES CONTROL]
disable=invalid-name, missing-docstring, locally-disabled, unbalanced-tuple-unpacking
disable=invalid-name, missing-docstring, locally-disabled, unbalanced-tuple-unpacking,no-else-return

View File

@@ -4,29 +4,23 @@ python:
- "2.7"
- "3.4"
- "3.5"
- "3.6"
cache:
directories:
- $HOME/.cache/pip
addons:
apt:
packages:
- libudev-dev
- libusb-1.0-0-dev
before_install:
- pip install -U setuptools pylint coverage pep8 pydocstyle "pip>=7.0" wheel
- pip install -e git+https://github.com/keepkey/python-keepkey@6e8baa8b935e830d05f87b6dfd9bc7c927a96dc3#egg=keepkey
install:
- pip install -e .
script:
- pep8 trezor_agent
- pylint --reports=no --rcfile .pylintrc trezor_agent
- pydocstyle trezor_agent
- coverage run --source trezor_agent/ -m py.test -v
- pep8 libagent
- pylint --reports=no --rcfile .pylintrc libagent
- pydocstyle libagent
- coverage run --source libagent/ -m py.test -v
after_success:
- coverage report

70
INSTALL.md Normal file
View File

@@ -0,0 +1,70 @@
# Installation
Install the following packages (depending on your distribution):
### Debian
$ apt update && apt upgrade
$ apt install python-pip python-dev libusb-1.0-0-dev libudev-dev
### Fedora/RedHat
$ yum update
$ yum install python-pip python-devel libusb-devel libudev-devel \
gcc redhat-rpm-config
Also, update Python packages before starting the installation:
$ pip install -U setuptools pip
Make sure you are running the latest firmware version on your hardware device.
Currently the following firmware versions are supported:
* [TREZOR](https://wallet.trezor.io/data/firmware/releases.json): `1.4.2+`
* [KeepKey](https://github.com/keepkey/keepkey-firmware/releases): `3.0.17+`
* [Ledger Nano S](https://github.com/LedgerHQ/blue-app-ssh-agent): `0.0.3+` (install [SSH/PGP Agent](https://www.ledgerwallet.com/images/apps/chrome-mngr-apps.png) app)
## TREZOR
Make sure that your `udev` rules are configured [correctly](https://doc.satoshilabs.com/trezor-user/settingupchromeonlinux.html#manual-configuration-of-udev-rules).
Then, install the latest [trezor_agent](https://pypi.python.org/pypi/trezor_agent) package:
$ pip install trezor_agent
Or, directly from the latest source code:
$ git clone https://github.com/romanz/trezor-agent
$ pip install --user -e trezor-agent/agents/trezor
## KeepKey
Make sure that your `udev` rules are configured [correctly](https://support.keepkey.com/support/solutions/articles/6000037796-keepkey-wallet-is-not-being-recognized-by-linux).
Then, install the latest [keepkey_agent](https://pypi.python.org/pypi/keepkey_agent) package:
$ pip install keepkey_agent
Or, directly from the latest source code:
$ git clone https://github.com/romanz/trezor-agent
$ pip install --user -e trezor-agent/agents/keepkey
## Ledger Nano S
Make sure that your `udev` rules are configured [correctly](http://support.ledgerwallet.com/knowledge_base/topics/ledger-wallet-is-not-recognized-on-linux).
Then, install the latest [ledger_agent](https://pypi.python.org/pypi/ledger_agent) package:
$ pip install ledger_agent
Or, directly from the latest source code:
$ git clone https://github.com/romanz/trezor-agent
$ pip install --user -e trezor-agent/agents/ledger
## Troubleshooting
If there is an import problem with the installed `protobuf` package,
see [this issue](https://github.com/romanz/trezor-agent/issues/28) for fixing it.
If you can't find the command-line utilities (after running `pip install --user`),
please make sure that `~/.local/bin` is on your `PATH` variable
(see a [relevant](https://github.com/pypa/pip/issues/3813) issue).

View File

@@ -1,5 +1,10 @@
Note: the GPG-related code is still under development, so please try the current implementation
and feel free to [report any issue](https://github.com/romanz/trezor-agent/issues) you have encountered.
and please let me [know](https://github.com/romanz/trezor-agent/issues/new) if something doesn't
work well for you. If possible:
* record the session (e.g. using [asciinema](asciinema.org))
* attach the GPG agent log from `~/.gnupg/{trezor,ledger}/gpg-agent.log`
Thanks!
# Installation
@@ -16,36 +21,82 @@ gpg (GnuPG) 2.1.15
This GPG version is included in [Ubuntu 16.04](https://launchpad.net/ubuntu/+source/gnupg2)
and [Linux Mint 18](https://community.linuxmint.com/software/view/gnupg2).
Update you TREZOR firmware to the latest version (at least v1.4.0).
Update you device firmware to the latest version and install your specific `agent` package:
Install latest `trezor-agent` package from GitHub:
```
$ pip install --user git+https://github.com/romanz/trezor-agent.git
$ pip install --user (trezor|keepkey|ledger)_agent
```
# Quickstart
## Identity creation
[![asciicast](https://asciinema.org/a/c2yodst21h9obttkn9wgf3783.png)](https://asciinema.org/a/c2yodst21h9obttkn9wgf3783)
[![asciicast](https://asciinema.org/a/90416.png)](https://asciinema.org/a/90416)
In order to use specific device type for GPG indentity creation, use either command:
```
$ DEVICE=(trezor,ledger) ./scripts/gpg-init "John Doe <john@doe.bit>"
```
## Sample usage (signature and decryption)
[![asciicast](https://asciinema.org/a/7x0h9tyoyu5ar6jc8y9oih0ba.png)](https://asciinema.org/a/7x0h9tyoyu5ar6jc8y9oih0ba)
[![asciicast](https://asciinema.org/a/120441.png)](https://asciinema.org/a/120441)
In order to use specific device type for GPG operations, set the following environment variable to either:
```
$ export GNUPGHOME=~/.gnupg/{trezor,ledger}
```
You can use GNU Privacy Assistant (GPA) in order to inspect the created keys
and perform signature and decryption operations using:
```
$ sudo apt install gpa
$ ./scripts/gpg-shell gpa
$ GNUPGHOME=~/.gnupg/trezor gpa
```
[![GPA](https://cloud.githubusercontent.com/assets/9900/20224804/053d7474-a849-11e6-87f3-ab07dc536158.png)](https://www.gnupg.org/related_software/swlist.html#gpa)
## Git commit & tag signatures:
Git can use GPG to sign and verify commits and tags (see [here](https://git-scm.com/book/en/v2/Git-Tools-Signing-Your-Work)):
```
$ git config --local commit.gpgsign 1
$ git config --local gpg.program $(which gpg2)
$ git commit --gpg-sign # create GPG-signed commit
$ git log --show-signature -1 # verify commit signature
$ git tag v1.2.3 --sign # create GPG-signed tag
$ git tag v1.2.3 --verify # verify tag signature
```
## Password manager
First install `pass` from [passwordstore.org](https://www.passwordstore.org/) and initialize it to use your TREZOR-based GPG identity:
```
$ export GNUPGHOME=~/.gnupg/trezor
$ pass init "Roman Zeyde <roman.zeyde@gmail.com>"
Password store initialized for Roman Zeyde <roman.zeyde@gmail.com>
```
Then, you can generate truly random passwords and save them encrypted using your public key (as separate `.gpg` files under `~/.password-store/`):
```
$ pass generate Dev/github 32
$ pass generate Social/hackernews 32
$ pass generate Social/twitter 32
$ pass generate VPS/linode 32
$ pass
Password Store
├── Dev
│   └── github
├── Social
│   ├── hackernews
│   └── twitter
└── VPS
└── linode
```
In order to paste them into the browser, you'd need to decrypt the password using your hardware device:
```
$ pass --clip VPS/linode
Copied VPS/linode to clipboard. Will clear in 45 seconds.
```
You can also use the following [Qt-based UI](https://qtpass.org/) for `pass`:
```
$ sudo apt install qtpass
$ GNUPGHOME=~/.gnupg/trezor qtpass
```

View File

@@ -1,10 +1,7 @@
# Using TREZOR as a hardware SSH/GPG agent
[![Build Status](https://travis-ci.org/romanz/trezor-agent.svg?branch=master)](https://travis-ci.org/romanz/trezor-agent)
[![Python Versions](https://img.shields.io/pypi/pyversions/trezor_agent.svg)](https://pypi.python.org/pypi/trezor_agent/)
[![Package Version](https://img.shields.io/pypi/v/trezor_agent.svg)](https://pypi.python.org/pypi/trezor_agent/)
[![Development Status](https://img.shields.io/pypi/status/trezor_agent.svg)](https://pypi.python.org/pypi/trezor_agent/)
[![Downloads](https://img.shields.io/pypi/dm/trezor_agent.svg)](https://pypi.python.org/pypi/trezor_agent/)
[![Chat](https://badges.gitter.im/romanz/trezor-agent.svg)](https://gitter.im/romanz/trezor-agent)
See SatoshiLabs' blog posts about this feature:
@@ -14,48 +11,15 @@ See SatoshiLabs' blog posts about this feature:
## Installation
First, make sure that the latest [trezorlib](https://pypi.python.org/pypi/trezor) Python package
is installed correctly (at least v0.6.6):
$ apt-get install python-dev libusb-1.0-0-dev libudev-dev
$ pip install -U setuptools pip
$ pip install Cython trezor
Make sure that your `udev` rules are configured [correctly](https://doc.satoshilabs.com/trezor-user/settingupchromeonlinux.html#manual-configuration-of-udev-rules).
Then, install the latest [trezor_agent](https://pypi.python.org/pypi/trezor_agent) package:
$ pip install trezor_agent
Or, directly from the latest source code (if `pip` doesn't work for you):
$ git clone https://github.com/romanz/trezor-agent && cd trezor-agent
$ python setup.py build && python setup.py install
Finally, verify that you are running the latest [TREZOR firmware](https://wallet.mytrezor.com/data/firmware/releases.json) version (at least v1.4.0):
$ trezorctl get_features | head
vendor: "bitcointrezor.com"
major_version: 1
minor_version: 4
patch_version: 0
...
If you have an error regarding `protobuf` imports (after installing it), please see [this issue](https://github.com/romanz/trezor-agent/issues/28).
See the [following instructions](INSTALL.md) for the
[TREZOR](https://trezor.io/), [Keepkey](https://www.keepkey.com/) and
[Ledger Nano S](https://www.ledgerwallet.com/products/ledger-nano-s) devices.
## Usage
For SSH, see the [following instructions](README-SSH.md) (for Windows support,
see [trezor-ssh-agent](https://github.com/martin-lizner/trezor-ssh-agent) project (by Martin Lízner)).
see [trezor-ssh-agent](https://github.com/martin-lizner/trezor-ssh-agent) project by Martin Lízner).
For GPG, see the [following instructions](README-GPG.md).
See [here](https://github.com/romanz/python-trezor#pin-entering) for PIN entering instructions.
## Troubleshooting
If there is an import problem with the installed `protobuf` package,
see [this issue](https://github.com/romanz/trezor-agent/issues/28) for fixing it.
### Gitter
Questions, suggestions and discussions are welcome: [![Chat](https://badges.gitter.im/romanz/trezor-agent.svg)](https://gitter.im/romanz/trezor-agent)

View File

@@ -0,0 +1,7 @@
import libagent.gpg
import libagent.ssh
from libagent.device.fake_device import FakeDevice as DeviceType
ssh_agent = lambda: libagent.ssh.main(DeviceType)
gpg_tool = lambda: libagent.gpg.main(DeviceType)
gpg_agent = lambda: libagent.gpg.run_agent(DeviceType)

42
agents/fake/setup.py Normal file
View File

@@ -0,0 +1,42 @@
#!/usr/bin/env python
from setuptools import setup
print('NEVER USE THIS CODE FOR REAL-LIFE USE-CASES!!!')
print('ONLY FOR DEBUGGING AND TESTING!!!')
setup(
name='fake_device_agent',
version='0.9.0',
description='Testing trezor_agent with a fake device - NOT SAFE!!!',
author='Roman Zeyde',
author_email='roman.zeyde@gmail.com',
url='http://github.com/romanz/trezor-agent',
scripts=['fake_device_agent.py'],
install_requires=[
'libagent>=0.9.0',
],
platforms=['POSIX'],
classifiers=[
'Environment :: Console',
'Development Status :: 4 - Beta',
'Intended Audience :: Developers',
'Intended Audience :: Information Technology',
'Intended Audience :: System Administrators',
'License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)',
'Operating System :: POSIX',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Topic :: Software Development :: Libraries :: Python Modules',
'Topic :: System :: Networking',
'Topic :: Communications',
'Topic :: Security',
'Topic :: Utilities',
],
entry_points={'console_scripts': [
'fake-device-agent = fake_device_agent:ssh_agent',
'fake-device-gpg = fake_device_agent:gpg_tool',
'fake-device-gpg-agent = fake_device_agent:gpg_agent',
]},
)

View File

@@ -0,0 +1,5 @@
import libagent.gpg
import libagent.ssh
from libagent.device import keepkey
ssh_agent = lambda: libagent.ssh.main(keepkey.KeepKey)

38
agents/keepkey/setup.py Normal file
View File

@@ -0,0 +1,38 @@
#!/usr/bin/env python
from setuptools import setup
setup(
name='keepkey_agent',
version='0.9.0',
description='Using KeepKey as hardware SSH/GPG agent',
author='Roman Zeyde',
author_email='roman.zeyde@gmail.com',
url='http://github.com/romanz/trezor-agent',
scripts=['keepkey_agent.py'],
install_requires=[
'libagent>=0.9.0',
'keepkey>=0.7.3'
],
platforms=['POSIX'],
classifiers=[
'Environment :: Console',
'Development Status :: 4 - Beta',
'Intended Audience :: Developers',
'Intended Audience :: Information Technology',
'Intended Audience :: System Administrators',
'License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)',
'Operating System :: POSIX',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Topic :: Software Development :: Libraries :: Python Modules',
'Topic :: System :: Networking',
'Topic :: Communications',
'Topic :: Security',
'Topic :: Utilities',
],
entry_points={'console_scripts': [
'keepkey-agent = keepkey_agent:ssh_agent',
]},
)

View File

@@ -0,0 +1,7 @@
import libagent.gpg
import libagent.ssh
from libagent.device.ledger import LedgerNanoS as DeviceType
ssh_agent = lambda: libagent.ssh.main(DeviceType)
gpg_tool = lambda: libagent.gpg.main(DeviceType)
gpg_agent = lambda: libagent.gpg.run_agent(DeviceType)

40
agents/ledger/setup.py Normal file
View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python
from setuptools import setup
setup(
name='ledger_agent',
version='0.9.0',
description='Using Ledger as hardware SSH/GPG agent',
author='Roman Zeyde',
author_email='roman.zeyde@gmail.com',
url='http://github.com/romanz/trezor-agent',
scripts=['ledger_agent.py'],
install_requires=[
'libagent>=0.9.0',
'ledgerblue>=0.1.8'
],
platforms=['POSIX'],
classifiers=[
'Environment :: Console',
'Development Status :: 4 - Beta',
'Intended Audience :: Developers',
'Intended Audience :: Information Technology',
'Intended Audience :: System Administrators',
'License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)',
'Operating System :: POSIX',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Topic :: Software Development :: Libraries :: Python Modules',
'Topic :: System :: Networking',
'Topic :: Communications',
'Topic :: Security',
'Topic :: Utilities',
],
entry_points={'console_scripts': [
'ledger-agent = ledger_agent:ssh_agent',
'ledger-gpg = ledger_agent:gpg_tool',
'ledger-gpg-agent = ledger_agent:gpg_agent',
]},
)

40
agents/trezor/setup.py Normal file
View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python
from setuptools import setup
setup(
name='trezor_agent',
version='0.9.0',
description='Using Trezor as hardware SSH/GPG agent',
author='Roman Zeyde',
author_email='roman.zeyde@gmail.com',
url='http://github.com/romanz/trezor-agent',
scripts=['trezor_agent.py'],
install_requires=[
'libagent>=0.9.0',
'trezor>=0.7.6'
],
platforms=['POSIX'],
classifiers=[
'Environment :: Console',
'Development Status :: 4 - Beta',
'Intended Audience :: Developers',
'Intended Audience :: Information Technology',
'Intended Audience :: System Administrators',
'License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)',
'Operating System :: POSIX',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Topic :: Software Development :: Libraries :: Python Modules',
'Topic :: System :: Networking',
'Topic :: Communications',
'Topic :: Security',
'Topic :: Utilities',
],
entry_points={'console_scripts': [
'trezor-agent = trezor_agent:ssh_agent',
'trezor-gpg = trezor_agent:gpg_tool',
'trezor-gpg-agent = trezor_agent:gpg_agent',
]},
)

View File

@@ -0,0 +1,7 @@
import libagent.gpg
import libagent.ssh
from libagent.device.trezor import Trezor as DeviceType
ssh_agent = lambda: libagent.ssh.main(DeviceType)
gpg_tool = lambda: libagent.gpg.main(DeviceType)
gpg_agent = lambda: libagent.gpg.run_agent(DeviceType)

View File

@@ -0,0 +1,3 @@
"""Cryptographic hardware device management."""
from . import interface

View File

@@ -0,0 +1,69 @@
"""Fake device - ONLY FOR TESTS!!! (NEVER USE WITH REAL DATA)."""
import hashlib
import logging
import ecdsa
from . import interface
from .. import formats
log = logging.getLogger(__name__)
def _verify_support(identity):
"""Make sure the device supports given configuration."""
if identity.curve_name not in {formats.CURVE_NIST256}:
raise NotImplementedError(
'Unsupported elliptic curve: {}'.format(identity.curve_name))
class FakeDevice(interface.Device):
"""Connection to TREZOR device."""
def connect(self):
"""Return "dummy" connection."""
log.critical('NEVER USE THIS CODE FOR REAL-LIFE USE-CASES!!!')
log.critical('ONLY FOR DEBUGGING AND TESTING!!!')
# The code below uses HARD-CODED secret key - and should be used ONLY
# for GnuPG integration tests (e.g. when no real device is available).
# pylint: disable=attribute-defined-outside-init
self.secexp = 1
self.sk = ecdsa.SigningKey.from_secret_exponent(
secexp=self.secexp, curve=ecdsa.curves.NIST256p, hashfunc=hashlib.sha256)
self.vk = self.sk.get_verifying_key()
return self
def close(self):
"""Close connection."""
self.conn = None
def pubkey(self, identity, ecdh=False):
"""Return public key."""
_verify_support(identity)
data = self.vk.to_string()
x, y = data[:32], data[32:]
prefix = bytearray([2 + (bytearray(y)[0] & 1)])
return bytes(prefix) + x
def sign(self, identity, blob):
"""Sign given blob and return the signature (as bytes)."""
if identity.identity_dict['proto'] in {'ssh'}:
digest = hashlib.sha256(blob).digest()
else:
digest = blob
return self.sk.sign_digest_deterministic(digest=digest,
hashfunc=hashlib.sha256)
def ecdh(self, identity, pubkey):
"""Get shared session key using Elliptic Curve Diffie-Hellman."""
assert pubkey[:1] == b'\x04'
peer = ecdsa.VerifyingKey.from_string(
pubkey[1:],
curve=ecdsa.curves.NIST256p,
hashfunc=hashlib.sha256)
shared = ecdsa.VerifyingKey.from_public_point(
point=(peer.pubkey.point * self.secexp),
curve=ecdsa.curves.NIST256p,
hashfunc=hashlib.sha256)
return shared.to_string()

View File

@@ -6,6 +6,8 @@ import logging
import re
import struct
import unidecode
from .. import formats, util
log = logging.getLogger(__name__)
@@ -54,7 +56,7 @@ class NotFoundError(Error):
class DeviceError(Error):
""""Error during device operation."""
"""Error during device operation."""
class Identity(object):
@@ -67,7 +69,13 @@ class Identity(object):
def items(self):
"""Return a copy of identity_dict items."""
return self.identity_dict.items()
return [(k, unidecode.unidecode(v))
for k, v in self.identity_dict.items()]
def to_bytes(self):
"""Transliterate Unicode into ASCII."""
s = identity_to_string(self.identity_dict)
return unidecode.unidecode(s).encode('ascii')
def __str__(self):
"""Return identity serialized to string."""
@@ -76,13 +84,13 @@ class Identity(object):
def get_bip32_address(self, ecdh=False):
"""Compute BIP32 derivation address according to SLIP-0013/0017."""
index = struct.pack('<L', self.identity_dict.get('index', 0))
addr = index + identity_to_string(self.identity_dict).encode('ascii')
addr = index + self.to_bytes()
log.debug('bip32 address string: %r', addr)
digest = hashlib.sha256(addr).digest()
s = io.BytesIO(bytearray(digest))
hardened = 0x80000000
addr_0 = [13, 17][bool(ecdh)]
addr_0 = 17 if bool(ecdh) else 13
address_n = [addr_0] + list(util.recv(s, '<LLLL'))
return [(hardened | value) for value in address_n]

View File

@@ -1,9 +1,9 @@
"""KeepKey-related definitions."""
# pylint: disable=unused-import
# pylint: disable=unused-import,import-error
from keepkeylib.client import CallException as Error
from keepkeylib.client import KeepKeyClient as Client
from keepkeylib.messages_pb2 import PassphraseAck
from keepkeylib.transport_hid import HidTransport
from keepkeylib.messages_pb2 import PassphraseAck, PinMatrixAck
from keepkeylib.transport_hid import HidTransport as Transport
from keepkeylib.types_pb2 import IdentityType

View File

@@ -4,7 +4,7 @@ import binascii
import logging
import struct
from ledgerblue import comm
from ledgerblue import comm # pylint: disable=import-error
from . import interface

View File

@@ -3,6 +3,7 @@
import binascii
import logging
import os
import sys
import semver
@@ -17,6 +18,10 @@ class Trezor(interface.Device):
@property
def _defs(self):
from . import trezor_defs
# Allow using TREZOR bridge transport (instead of the HID default)
trezor_defs.Transport = {
'bridge': trezor_defs.BridgeTransport,
}.get(os.environ.get('TREZOR_TRANSPORT'), trezor_defs.HidTransport)
return trezor_defs
required_version = '>=1.4.0'
@@ -29,11 +34,35 @@ class Trezor(interface.Device):
'non-empty' if self.passphrase else 'empty', self)
return self._defs.PassphraseAck(passphrase=self.passphrase)
for d in self._defs.HidTransport.enumerate():
def create_pin_handler(conn):
if os.isatty(sys.stdin.fileno()):
return conn.callback_PinMatrixRequest # CLI-based PIN handler
def qt_handler(_):
# pylint: disable=import-error
from PyQt5.QtWidgets import QApplication, QInputDialog, QLineEdit
label = ('Use the numeric keypad to describe number positions.\n'
'The layout is:\n'
' 7 8 9\n'
' 4 5 6\n'
' 1 2 3\n'
'Please enter PIN:')
app = QApplication([])
qd = QInputDialog()
qd.setTextEchoMode(QLineEdit.Password)
qd.setLabelText(label)
qd.show()
app.exec_()
return self._defs.PinMatrixAck(pin=qd.textValue())
return qt_handler
for d in self._defs.Transport.enumerate():
log.debug('endpoint: %s', d)
transport = self._defs.HidTransport(d)
transport = self._defs.Transport(d)
connection = self._defs.Client(transport)
connection.callback_PassphraseRequest = passphrase_handler
connection.callback_PinMatrixRequest = create_pin_handler(connection)
f = connection.features
log.debug('connected to %s %s', self, f.device_id)
log.debug('label : %s', f.label)

View File

@@ -1,9 +1,10 @@
"""TREZOR-related definitions."""
# pylint: disable=unused-import
# pylint: disable=unused-import,import-error
from trezorlib.client import CallException as Error
from trezorlib.client import TrezorClient as Client
from trezorlib.messages_pb2 import PassphraseAck
from trezorlib.messages_pb2 import PassphraseAck, PinMatrixAck
from trezorlib.transport_bridge import BridgeTransport
from trezorlib.transport_hid import HidTransport
from trezorlib.types_pb2 import IdentityType

View File

@@ -193,7 +193,7 @@ def import_public_key(line):
file_type, base64blob, name = line.split()
blob = base64.b64decode(base64blob)
result = parse_pubkey(blob)
result['name'] = name.encode('ascii')
result['name'] = name.encode('utf-8')
assert result['type'] == file_type.encode('ascii')
log.debug('loaded %s public key: %s', file_type, result['fingerprint'])
return result

View File

@@ -1,7 +1,16 @@
#!/usr/bin/env python
"""Create signatures and export public keys for GPG using TREZOR."""
"""
TREZOR support for ECDSA GPG signatures.
See these links for more details:
- https://www.gnupg.org/faq/whats-new-in-2.1.html
- https://tools.ietf.org/html/rfc4880
- https://tools.ietf.org/html/rfc6637
- https://tools.ietf.org/html/draft-irtf-cfrg-eddsa-05
"""
import argparse
import contextlib
import functools
import logging
import os
import sys
@@ -15,14 +24,17 @@ from .. import device, formats, server, util
log = logging.getLogger(__name__)
def run_create(args):
def export_public_key(device_type, args):
"""Generate a new pubkey for a new/existing GPG identity."""
log.warning('NOTE: in order to re-generate the exact same GPG key later, '
'run this command with "--time=%d" commandline flag (to set '
'the timestamp of the GPG key manually).', args.time)
d = client.Client(user_id=args.user_id, curve_name=args.ecdsa_curve)
verifying_key = d.pubkey(ecdh=False)
decryption_key = d.pubkey(ecdh=True)
c = client.Client(device=device_type())
identity = client.create_identity(user_id=args.user_id,
curve_name=args.ecdsa_curve)
verifying_key = c.pubkey(identity=identity, ecdh=False)
decryption_key = c.pubkey(identity=identity, ecdh=True)
signer_func = functools.partial(c.sign, identity=identity)
if args.subkey: # add as subkey
log.info('adding %s GPG subkey for "%s" to existing key',
@@ -38,10 +50,10 @@ def run_create(args):
primary_bytes = keyring.export_public_key(args.user_id)
result = encode.create_subkey(primary_bytes=primary_bytes,
subkey=signing_key,
signer_func=d.sign)
signer_func=signer_func)
result = encode.create_subkey(primary_bytes=result,
subkey=encryption_key,
signer_func=d.sign)
signer_func=signer_func)
else: # add as primary
log.info('creating new %s GPG primary key for "%s"',
args.ecdsa_curve, args.user_id)
@@ -56,24 +68,16 @@ def run_create(args):
result = encode.create_primary(user_id=args.user_id,
pubkey=primary,
signer_func=d.sign)
signer_func=signer_func)
result = encode.create_subkey(primary_bytes=result,
subkey=subkey,
signer_func=d.sign)
signer_func=signer_func)
sys.stdout.write(protocol.armor(result, 'PUBLIC KEY BLOCK'))
def main_create():
"""Main function for GPG identity creation."""
p = argparse.ArgumentParser()
p.add_argument('user_id')
p.add_argument('-e', '--ecdsa-curve', default='nist256p1')
p.add_argument('-t', '--time', type=int, default=int(time.time()))
p.add_argument('-v', '--verbose', default=0, action='count')
p.add_argument('-s', '--subkey', default=False, action='store_true')
args = p.parse_args()
def run_create(device_type, args):
"""Export public GPG key."""
util.setup_logging(verbosity=args.verbose)
log.warning('This GPG tool is still in EXPERIMENTAL mode, '
'so please note that the API and features may '
@@ -82,19 +86,27 @@ def main_create():
existing_gpg = keyring.gpg_version().decode('ascii')
required_gpg = '>=2.1.11'
if semver.match(existing_gpg, required_gpg):
run_create(args)
export_public_key(device_type, args)
else:
log.error('Existing gpg2 has version "%s" (%s required)',
existing_gpg, required_gpg)
def main_agent():
def run_unlock(device_type, args):
"""Unlock hardware device (for future interaction)."""
util.setup_logging(verbosity=args.verbose)
with device_type() as d:
log.info('unlocked %s device', d)
def run_agent(device_type):
"""Run a simple GPG-agent server."""
home_dir = os.environ.get('GNUPGHOME', os.path.expanduser('~/.gnupg/trezor'))
config_file = os.path.join(home_dir, 'gpg-agent.conf')
if not os.path.exists(config_file):
msg = 'No configuration file found: {}'.format(config_file)
raise IOError(msg)
parser = argparse.ArgumentParser()
parser.add_argument('--homedir', default=os.environ.get('GNUPGHOME'))
args, _ = parser.parse_known_args()
assert args.homedir
config_file = os.path.join(args.homedir, 'gpg-agent.conf')
lines = (line.strip() for line in open(config_file))
lines = (line for line in lines if line and not line.startswith('#'))
@@ -103,24 +115,35 @@ def main_agent():
util.setup_logging(verbosity=int(config['verbosity']),
filename=config['log-file'])
sock_path = keyring.get_agent_sock_path()
handler = agent.Handler(device=device_type())
with server.unix_domain_socket_server(sock_path) as sock:
for conn in agent.yield_connections(sock):
with contextlib.closing(conn):
try:
agent.handle_connection(conn)
except StopIteration:
handler.handle(conn)
except agent.AgentStop:
log.info('stopping gpg-agent')
return
except Exception as e: # pylint: disable=broad-except
log.exception('gpg-agent failed: %s', e)
def auto_unlock():
"""Automatically unlock first found device (used for `gpg-shell`)."""
p = argparse.ArgumentParser()
p.add_argument('-v', '--verbose', default=0, action='count')
def main(device_type):
"""Parse command-line arguments."""
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers()
args = p.parse_args()
util.setup_logging(verbosity=args.verbose)
d = device.detect()
log.info('unlocked %s device', d)
p = subparsers.add_parser('create', help='Export public GPG key')
p.add_argument('user_id')
p.add_argument('-e', '--ecdsa-curve', default='nist256p1')
p.add_argument('-t', '--time', type=int, default=int(time.time()))
p.add_argument('-v', '--verbose', default=0, action='count')
p.add_argument('-s', '--subkey', default=False, action='store_true')
p.set_defaults(func=run_create)
p = subparsers.add_parser('unlock', help='Unlock the hardware device')
p.add_argument('-v', '--verbose', default=0, action='count')
p.set_defaults(func=run_unlock)
args = parser.parse_args()
return args.func(device_type=device_type, args=args)

209
libagent/gpg/agent.py Normal file
View File

@@ -0,0 +1,209 @@
"""GPG-agent utilities."""
import binascii
import logging
from . import client, decode, keyring, protocol
from .. import util
log = logging.getLogger(__name__)
def yield_connections(sock):
"""Run a server on the specified socket."""
while True:
log.debug('waiting for connection on %s', sock.getsockname())
try:
conn, _ = sock.accept()
except KeyboardInterrupt:
return
conn.settimeout(None)
log.debug('accepted connection on %s', sock.getsockname())
yield conn
def serialize(data):
"""Serialize data according to ASSUAN protocol."""
for c in [b'%', b'\n', b'\r']:
escaped = '%{:02X}'.format(ord(c)).encode('ascii')
data = data.replace(c, escaped)
return data
def sig_encode(r, s):
"""Serialize ECDSA signature data into GPG S-expression."""
r = serialize(util.num2bytes(r, 32))
s = serialize(util.num2bytes(s, 32))
return b'(7:sig-val(5:ecdsa(1:r32:' + r + b')(1:s32:' + s + b')))'
def _serialize_point(data):
prefix = '{}:'.format(len(data)).encode('ascii')
# https://www.gnupg.org/documentation/manuals/assuan/Server-responses.html
return b'(5:value' + serialize(prefix + data) + b')'
def parse_ecdh(line):
"""Parse ECDH request and return remote public key."""
prefix, line = line.split(b' ', 1)
assert prefix == b'D'
exp, leftover = keyring.parse(keyring.unescape(line))
log.debug('ECDH s-exp: %r', exp)
assert not leftover
label, exp = exp
assert label == b'enc-val'
assert exp[0] == b'ecdh'
items = exp[1:]
log.debug('ECDH parameters: %r', items)
return dict(items)[b'e']
class AgentError(Exception):
"""GnuPG agent-related error."""
class AgentStop(Exception):
"""Raised to close the agent."""
class Handler(object):
"""GPG agent requests' handler."""
def __init__(self, device):
"""C-tor."""
self.client = client.Client(device=device)
# Cache ASSUAN commands' arguments between commands
self.keygrip = None
self.digest = None
self.algo = None
# Cache public keys from GnuPG
self.pubkey_bytes = keyring.export_public_keys()
# "Clone" existing GPG version
self.version = keyring.gpg_version()
self.handlers = {
b'RESET': None,
b'OPTION': None,
b'SETKEYDESC': None,
b'GETINFO': lambda conn, _: keyring.sendline(conn, b'D ' + self.version),
b'AGENT_ID': lambda conn, _: keyring.sendline(conn, b'D TREZOR'), # "Fake" agent ID
b'SIGKEY': lambda _, args: self.set_key(*args),
b'SETKEY': lambda _, args: self.set_key(*args),
b'SETHASH': lambda _, args: self.set_hash(*args),
b'PKSIGN': lambda conn, _: self.pksign(conn),
b'PKDECRYPT': lambda conn, _: self.pkdecrypt(conn),
b'HAVEKEY': lambda _, args: self.have_key(*args),
b'KEYINFO': lambda conn, _: self.key_info(conn),
b'SCD': self.handle_scd,
}
def handle_scd(self, conn, args):
"""No support for smart-card device protocol."""
reply = {
(b'GETINFO', b'version'): self.version,
}.get(args)
if reply is None:
raise AgentError(b'ERR 100696144 No such device <SCD>')
keyring.sendline(conn, b'D ' + reply)
@util.memoize
def get_identity(self, keygrip):
"""
Returns device.interface.Identity that matches specified keygrip.
In case of missing keygrip, KeyError will be raised.
"""
keygrip_bytes = binascii.unhexlify(keygrip)
pubkey_dict, user_ids = decode.load_by_keygrip(
pubkey_bytes=self.pubkey_bytes, keygrip=keygrip_bytes)
# We assume the first user ID is used to generate TREZOR-based GPG keys.
user_id = user_ids[0]['value'].decode('utf-8')
curve_name = protocol.get_curve_name_by_oid(pubkey_dict['curve_oid'])
ecdh = (pubkey_dict['algo'] == protocol.ECDH_ALGO_ID)
identity = client.create_identity(user_id=user_id, curve_name=curve_name)
verifying_key = self.client.pubkey(identity=identity, ecdh=ecdh)
pubkey = protocol.PublicKey(
curve_name=curve_name, created=pubkey_dict['created'],
verifying_key=verifying_key, ecdh=ecdh)
assert pubkey.key_id() == pubkey_dict['key_id']
assert pubkey.keygrip() == keygrip_bytes
return identity
def pksign(self, conn):
"""Sign a message digest using a private EC key."""
log.debug('signing %r digest (algo #%s)', self.digest, self.algo)
identity = self.get_identity(keygrip=self.keygrip)
r, s = self.client.sign(identity=identity,
digest=binascii.unhexlify(self.digest))
result = sig_encode(r, s)
log.debug('result: %r', result)
keyring.sendline(conn, b'D ' + result)
def pkdecrypt(self, conn):
"""Handle decryption using ECDH."""
for msg in [b'S INQUIRE_MAXLEN 4096', b'INQUIRE CIPHERTEXT']:
keyring.sendline(conn, msg)
line = keyring.recvline(conn)
assert keyring.recvline(conn) == b'END'
remote_pubkey = parse_ecdh(line)
identity = self.get_identity(keygrip=self.keygrip)
ec_point = self.client.ecdh(identity=identity, pubkey=remote_pubkey)
keyring.sendline(conn, b'D ' + _serialize_point(ec_point))
@util.memoize
def have_key(self, *keygrips):
"""Check if current keygrip correspond to a TREZOR-based key."""
try:
self.get_identity(keygrip=keygrips[0])
except KeyError as e:
log.warning('HAVEKEY(%s) failed: %s', keygrips, e)
raise AgentError(b'ERR 67108881 No secret key <GPG Agent>')
def key_info(self, conn):
"""
Dummy reply (mainly for 'gpg --edit' to succeed).
For details, see GnuPG agent KEYINFO command help.
https://git.gnupg.org/cgi-bin/gitweb.cgi?p=gnupg.git;a=blob;f=agent/command.c;h=c8b34e9882076b1b724346787781f657cac75499;hb=refs/heads/master#l1082
"""
fmt = 'S KEYINFO {0} X - - - - - - -'
keyring.sendline(conn, fmt.format(self.keygrip).encode('ascii'))
def set_key(self, keygrip):
"""Set hexadecimal keygrip for next operation."""
self.keygrip = keygrip
def set_hash(self, algo, digest):
"""Set algorithm ID and hexadecimal digest for next operation."""
self.algo = algo
self.digest = digest
def handle(self, conn):
"""Handle connection from GPG binary using the ASSUAN protocol."""
keyring.sendline(conn, b'OK')
for line in keyring.iterlines(conn):
parts = line.split(b' ')
command = parts[0]
args = tuple(parts[1:])
if command == b'BYE':
return
elif command == b'KILLAGENT':
keyring.sendline(conn, b'OK')
raise AgentStop()
if command not in self.handlers:
log.error('unknown request: %r', line)
continue
handler = self.handlers[command]
if handler:
try:
handler(conn, args)
except AgentError as e:
msg, = e.args
keyring.sendline(conn, msg)
continue
keyring.sendline(conn, b'OK')

48
libagent/gpg/client.py Normal file
View File

@@ -0,0 +1,48 @@
"""Device abstraction layer for GPG operations."""
import logging
from .. import formats, util
from ..device import interface
log = logging.getLogger(__name__)
def create_identity(user_id, curve_name):
"""Create GPG identity for hardware device."""
result = interface.Identity(identity_str='gpg://', curve_name=curve_name)
result.identity_dict['host'] = user_id
return result
class Client(object):
"""Sign messages and get public keys from a hardware device."""
def __init__(self, device):
"""C-tor."""
self.device = device
def pubkey(self, identity, ecdh=False):
"""Return public key as VerifyingKey object."""
with self.device:
pubkey = self.device.pubkey(ecdh=ecdh, identity=identity)
return formats.decompress_pubkey(
pubkey=pubkey, curve_name=identity.curve_name)
def sign(self, identity, digest):
"""Sign the digest and return a serialized signature."""
log.info('please confirm GPG signature on %s for "%s"...',
self.device, identity)
if identity.curve_name == formats.CURVE_NIST256:
digest = digest[:32] # sign the first 256 bits
log.debug('signing digest: %s', util.hexlify(digest))
with self.device:
sig = self.device.sign(blob=digest, identity=identity)
return (util.bytes2num(sig[:32]), util.bytes2num(sig[32:]))
def ecdh(self, identity, pubkey):
"""Derive shared secret using ECDH from remote public key."""
log.info('please confirm GPG decryption on %s for "%s"...',
self.device, identity)
with self.device:
return self.device.ecdh(pubkey=pubkey, identity=identity)

View File

@@ -12,12 +12,10 @@ def create_primary(user_id, pubkey, signer_func, secret_bytes=b''):
"""Export new primary GPG public key, ready for "gpg2 --import"."""
pubkey_packet = protocol.packet(tag=(5 if secret_bytes else 6),
blob=(pubkey.data() + secret_bytes))
user_id_packet = protocol.packet(tag=13,
blob=user_id.encode('ascii'))
data_to_sign = (pubkey.data_to_hash() +
user_id_packet[:1] +
util.prefix_len('>L', user_id.encode('ascii')))
user_id_bytes = user_id.encode('utf-8')
user_id_packet = protocol.packet(tag=13, blob=user_id_bytes)
data_to_sign = (pubkey.data_to_hash() + user_id_packet[:1] +
util.prefix_len('>L', user_id_bytes))
hashed_subpackets = [
protocol.subpacket_time(pubkey.created), # signature time
# https://tools.ietf.org/html/rfc4880#section-5.2.3.7

View File

@@ -41,8 +41,7 @@ def public_key_path(request):
def test_gpg_files(public_key_path): # pylint: disable=redefined-outer-name
with open(public_key_path, 'rb') as f:
packets = list(decode.parse_packets(f))
assert len(packets) > 0
assert list(decode.parse_packets(f))
def test_has_custom_subpacket():

View File

@@ -1,19 +1,15 @@
"""UNIX-domain socket server for ssh-agent implementation."""
import contextlib
import functools
import logging
import os
import socket
import subprocess
import tempfile
import threading
from . import util
log = logging.getLogger(__name__)
UNIX_SOCKET_TIMEOUT = 0.1
def remove_file(path, remove=os.remove, exists=os.path.exists):
"""Remove file, and raise OSError if still exists."""
@@ -114,39 +110,6 @@ def spawn(func, kwargs):
t.join()
@contextlib.contextmanager
def serve(handler, sock_path=None, timeout=UNIX_SOCKET_TIMEOUT):
"""
Start the ssh-agent server on a UNIX-domain socket.
If no connection is made during the specified timeout,
retry until the context is over.
"""
ssh_version = subprocess.check_output(['ssh', '-V'],
stderr=subprocess.STDOUT)
log.debug('local SSH version: %r', ssh_version)
if sock_path is None:
sock_path = tempfile.mktemp(prefix='trezor-ssh-agent-')
environ = {'SSH_AUTH_SOCK': sock_path, 'SSH_AGENT_PID': str(os.getpid())}
device_mutex = threading.Lock()
with unix_domain_socket_server(sock_path) as sock:
sock.settimeout(timeout)
quit_event = threading.Event()
handle_conn = functools.partial(handle_connection,
handler=handler,
mutex=device_mutex)
kwargs = dict(sock=sock,
handle_conn=handle_conn,
quit_event=quit_event)
with spawn(server_thread, kwargs):
try:
yield environ
finally:
log.debug('closing server')
quit_event.set()
def run_process(command, environ):
"""
Run the specified process and wait until it finishes.

View File

@@ -1,16 +1,24 @@
"""SSH-agent implementation using hardware authentication devices."""
import argparse
import contextlib
import functools
import io
import logging
import os
import re
import subprocess
import sys
import tempfile
import threading
from . import client, device, formats, protocol, server, util
from .. import device, formats, server, util
from . import client, protocol
log = logging.getLogger(__name__)
UNIX_SOCKET_TIMEOUT = 0.1
def ssh_args(label):
"""Create SSH command for connecting specified server."""
@@ -40,8 +48,8 @@ def mosh_args(label):
return args
def create_parser():
"""Create argparse.ArgumentParser for this tool."""
def create_agent_parser():
"""Create an ArgumentParser for this tool."""
p = argparse.ArgumentParser()
p.add_argument('-v', '--verbose', default=0, action='count')
@@ -51,16 +59,10 @@ def create_parser():
default=formats.CURVE_NIST256,
help='specify ECDSA curve name: ' + curve_names)
p.add_argument('--timeout',
default=server.UNIX_SOCKET_TIMEOUT, type=float,
default=UNIX_SOCKET_TIMEOUT, type=float,
help='Timeout for accepting SSH client connections')
p.add_argument('--debug', default=False, action='store_true',
help='Log SSH protocol messages for debugging.')
return p
def create_agent_parser():
"""Specific parser for SSH connection."""
p = create_parser()
g = p.add_mutually_exclusive_group()
g.add_argument('-s', '--shell', default=False, action='store_true',
@@ -77,44 +79,44 @@ def create_agent_parser():
return p
def create_git_parser():
"""Specific parser for git commands."""
p = create_parser()
@contextlib.contextmanager
def serve(handler, sock_path=None, timeout=UNIX_SOCKET_TIMEOUT):
"""
Start the ssh-agent server on a UNIX-domain socket.
p.add_argument('-r', '--remote', default='origin',
help='use this git remote URL to generate SSH identity')
p.add_argument('-t', '--test', action='store_true',
help='test connection using `ssh -T user@host` command')
p.add_argument('command', type=str, nargs='*', metavar='ARGUMENT',
help='Git command to run under the SSH agent')
return p
If no connection is made during the specified timeout,
retry until the context is over.
"""
ssh_version = subprocess.check_output(['ssh', '-V'],
stderr=subprocess.STDOUT)
log.debug('local SSH version: %r', ssh_version)
if sock_path is None:
sock_path = tempfile.mktemp(prefix='trezor-ssh-agent-')
def git_host(remote_name, attributes):
"""Extract git SSH host for specified remote name."""
try:
output = subprocess.check_output('git config --local --list'.split())
except subprocess.CalledProcessError:
return
for attribute in attributes:
name = r'remote.{0}.{1}'.format(remote_name, attribute)
matches = re.findall(re.escape(name) + '=(.*)', output)
log.debug('%r: %r', name, matches)
if not matches:
continue
url = matches[0].strip()
match = re.match('(?P<user>.*?)@(?P<host>.*?):(?P<path>.*)', url)
if match:
return '{user}@{host}'.format(**match.groupdict())
environ = {'SSH_AUTH_SOCK': sock_path, 'SSH_AGENT_PID': str(os.getpid())}
device_mutex = threading.Lock()
with server.unix_domain_socket_server(sock_path) as sock:
sock.settimeout(timeout)
quit_event = threading.Event()
handle_conn = functools.partial(server.handle_connection,
handler=handler,
mutex=device_mutex)
kwargs = dict(sock=sock,
handle_conn=handle_conn,
quit_event=quit_event)
with server.spawn(server.server_thread, kwargs):
try:
yield environ
finally:
log.debug('closing server')
quit_event.set()
def run_server(conn, command, debug, timeout):
"""Common code for run_agent and run_git below."""
try:
handler = protocol.Handler(conn=conn, debug=debug)
with server.serve(handler=handler, timeout=timeout) as env:
with serve(handler=handler, timeout=timeout) as env:
return server.run_process(command=command, environ=env)
except KeyboardInterrupt:
log.info('server stopped')
@@ -127,31 +129,41 @@ def handle_connection_error(func):
try:
return func(*args, **kwargs)
except IOError as e:
log.error('Connection error: %s', e)
log.error('Connection error (try unplugging and replugging your device): %s', e)
return 1
return wrapper
def parse_config(fname):
def parse_config(contents):
"""Parse config file into a list of Identity objects."""
contents = open(fname).read()
for identity_str, curve_name in re.findall(r'\<(.*?)\|(.*?)\>', contents):
yield device.interface.Identity(identity_str=identity_str,
curve_name=curve_name)
def import_public_keys(contents):
"""Load (previously exported) SSH public keys from a file's contents."""
for line in io.StringIO(contents):
# Verify this line represents valid SSH public key
formats.import_public_key(line)
yield line
class JustInTimeConnection(object):
"""Connect to the device just before the needed operation."""
def __init__(self, conn_factory, identities):
def __init__(self, conn_factory, identities, public_keys=None):
"""Create a JIT connection object."""
self.conn_factory = conn_factory
self.identities = identities
self.public_keys_cache = public_keys
def public_keys(self):
"""Return a list of SSH public keys (in textual format)."""
conn = self.conn_factory()
return [conn.get_public_key(i) for i in self.identities]
if not self.public_keys_cache:
conn = self.conn_factory()
self.public_keys_cache = conn.export_public_keys(self.identities)
return self.public_keys_cache
def parse_public_keys(self):
"""Parse SSH public keys into dictionaries."""
@@ -168,13 +180,19 @@ class JustInTimeConnection(object):
@handle_connection_error
def run_agent(client_factory=client.Client):
def main(device_type):
"""Run ssh-agent using given hardware client factory."""
args = create_agent_parser().parse_args()
util.setup_logging(verbosity=args.verbose)
public_keys = None
if args.identity.startswith('/'):
identities = list(parse_config(fname=args.identity))
filename = args.identity
contents = open(filename, 'rb').read().decode('utf-8')
# Allow loading previously exported SSH public keys
if filename.endswith('.pub'):
public_keys = list(import_public_keys(contents))
identities = list(parse_config(contents))
else:
identities = [device.interface.Identity(
identity_str=args.identity, curve_name=args.ecdsa_curve_name)]
@@ -194,8 +212,8 @@ def run_agent(client_factory=client.Client):
command = os.environ['SHELL']
conn = JustInTimeConnection(
conn_factory=lambda: client_factory(device.detect()),
identities=identities)
conn_factory=lambda: client.Client(device_type()),
identities=identities, public_keys=public_keys)
if command:
return run_server(conn=conn, command=command, debug=args.debug,
timeout=args.timeout)

View File

@@ -18,15 +18,17 @@ class Client(object):
"""Connect to hardware device."""
self.device = device
def get_public_key(self, identity):
"""Get SSH public key from the device."""
def export_public_keys(self, identities):
"""Export SSH public keys from the device."""
public_keys = []
with self.device:
pubkey = self.device.pubkey(identity)
vk = formats.decompress_pubkey(pubkey=pubkey,
curve_name=identity.curve_name)
return formats.export_public_key(vk=vk,
label=str(identity))
for i in identities:
pubkey = self.device.pubkey(identity=i)
vk = formats.decompress_pubkey(pubkey=pubkey,
curve_name=i.curve_name)
public_keys.append(formats.export_public_key(vk=vk,
label=str(i)))
return public_keys
def sign_ssh_challenge(self, blob, identity):
"""Sign given blob using a private key on the device."""

View File

@@ -138,7 +138,7 @@ class Handler(object):
else:
raise KeyError('key not found')
label = key['name'].decode('ascii') # label should be a string
label = key['name'].decode('utf-8')
log.debug('signing %d-byte blob with "%s" key', len(blob), label)
try:
signature = self.conn.sign(blob=blob, identity=key['identity'])

View File

@@ -49,7 +49,7 @@ def test_ssh_agent():
identity = device.interface.Identity(identity_str='localhost:22',
curve_name=CURVE)
c = client.Client(device=MockDevice())
assert c.get_public_key(identity) == PUBKEY_TEXT
assert c.export_public_keys([identity]) == [PUBKEY_TEXT]
signature = c.sign_ssh_challenge(blob=BLOB, identity=identity)
key = formats.import_public_key(PUBKEY_TEXT)

View File

@@ -0,0 +1 @@
"""Unit-tests for this package."""

View File

@@ -0,0 +1,7 @@
from ..device import interface
def test_unicode():
i = interface.Identity(u'ko\u017eu\u0161\u010dek@host', 'ed25519')
assert i.to_bytes() == b'kozuscek@host'
assert sorted(i.items()) == [('host', 'host'), ('user', 'kozuscek')]

View File

@@ -8,7 +8,8 @@ import threading
import mock
import pytest
from .. import protocol, server, util
from .. import server, util
from ..ssh import protocol
def test_socket():
@@ -117,12 +118,6 @@ def test_run():
server.run_process([''], environ={})
def test_serve_main():
handler = protocol.Handler(conn=empty_device())
with server.serve(handler=handler, sock_path=None):
pass
def test_remove():
path = 'foo.bar'

View File

@@ -101,7 +101,7 @@ def test_reader():
def test_setup_logging():
util.setup_logging(verbosity=10)
util.setup_logging(verbosity=10, filename='/dev/null')
def test_memoize():

View File

@@ -179,13 +179,22 @@ class Reader(object):
self._captured = None
def setup_logging(verbosity, **kwargs):
def setup_logging(verbosity, filename=None):
"""Configure logging for this tool."""
fmt = ('%(asctime)s %(levelname)-12s %(message)-100s '
'[%(filename)s:%(lineno)d]')
levels = [logging.WARNING, logging.INFO, logging.DEBUG]
level = levels[min(verbosity, len(levels) - 1)]
logging.basicConfig(format=fmt, level=level, **kwargs)
logging.root.setLevel(level)
fmt = logging.Formatter('%(asctime)s %(levelname)-12s %(message)-100s '
'[%(filename)s:%(lineno)d]')
hdlr = logging.StreamHandler() # stderr
hdlr.setFormatter(fmt)
logging.root.addHandler(hdlr)
if filename:
hdlr = logging.FileHandler(filename, 'a')
hdlr.setFormatter(fmt)
logging.root.addHandler(hdlr)
def memoize(func):

View File

@@ -4,31 +4,54 @@ set -eu
gpg2 --version >/dev/null # verify that GnuPG 2 is installed
USER_ID="${1}"
HOMEDIR=~/.gnupg/trezor
DEVICE=${DEVICE:="trezor"} # or "ledger"
CURVE=${CURVE:="nist256p1"} # or "ed25519"
TIMESTAMP=${TIMESTAMP:=`date +%s`} # key creation timestamp
HOMEDIR=~/.gnupg/${DEVICE}
# Prepare new GPG home directory for TREZOR-based identity
# Prepare new GPG home directory for hardware-based identity
rm -rf "${HOMEDIR}"
mkdir -p "${HOMEDIR}"
chmod 700 "${HOMEDIR}"
# Generate new GPG identity and import into GPG keyring
trezor-gpg-create -v "${USER_ID}" -t "${TIMESTAMP}" -e "${CURVE}" > "${HOMEDIR}/pubkey.asc"
gpg2 --homedir "${HOMEDIR}" --import < "${HOMEDIR}/pubkey.asc"
$DEVICE-gpg create -v "${USER_ID}" -t "${TIMESTAMP}" -e "${CURVE}" > "${HOMEDIR}/pubkey.asc"
gpg2 --homedir "${HOMEDIR}" --import < "${HOMEDIR}/pubkey.asc" 2> /dev/null
rm -f "${HOMEDIR}/S.gpg-agent" # (otherwise, our agent won't be started automatically)
# Make new GPG identity with "ultimate" trust (via its fingerprint)
FINGERPRINT=$(gpg2 --homedir "${HOMEDIR}" --list-public-keys --with-fingerprint --with-colons | sed -n -E 's/^fpr:::::::::([0-9A-F]+):$/\1/p' | head -n1)
echo "${FINGERPRINT}:6" | gpg2 --homedir "${HOMEDIR}" --import-ownertrust
echo "${FINGERPRINT}:6" | gpg2 --homedir "${HOMEDIR}" --import-ownertrust 2> /dev/null
AGENT_PATH="$(which ${DEVICE}-gpg-agent)"
# Prepare GPG configuration file
echo "# TREZOR-based GPG configuration
agent-program $(which trezor-gpg-agent)
echo "# Hardware-based GPG configuration
agent-program ${AGENT_PATH}
personal-digest-preferences SHA512
" | tee "${HOMEDIR}/gpg.conf"
default-key \"${USER_ID}\"
" > "${HOMEDIR}/gpg.conf"
echo "# TREZOR-based GPG agent emulator
# Prepare GPG agent configuration file
echo "# Hardware-based GPG agent emulator
log-file ${HOMEDIR}/gpg-agent.log
verbosity 2
" | tee "${HOMEDIR}/gpg-agent.conf"
" > "${HOMEDIR}/gpg-agent.conf"
# Prepare a helper script for setting up the new identity
echo "#!/bin/bash
set -eu
export GNUPGHOME=${HOMEDIR}
COMMAND=\$*
if [ -z \"\${COMMAND}\" ]
then
\${SHELL}
else
\${COMMAND}
fi
" > "${HOMEDIR}/env"
chmod u+x "${HOMEDIR}/env"
echo "Starting ${DEVICE}-gpg-agent at ${HOMEDIR}..."
# Load agent and make sure it responds with the new identity
GNUPGHOME="${HOMEDIR}" gpg2 -K 2> /dev/null

View File

@@ -1,28 +0,0 @@
#!/bin/bash
set -eu
gpg2 --version >/dev/null # verify that GnuPG 2 is installed
export GNUPGHOME=~/.gnupg/trezor
CONFIG_PATH="${GNUPGHOME}/gpg-agent.conf"
if [ ! -f ${CONFIG_PATH} ]
then
echo "No configuration found: ${CONFIG_PATH}"
exit 1
fi
# Make sure that the device is unlocked before starting the shell
trezor-gpg-unlock
# Make sure TREZOR-based gpg-agent is running
gpg-connect-agent --agent-program "$(which trezor-gpg-agent)" </dev/null
COMMAND=$*
if [ -z "${COMMAND}" ]
then
gpg2 --list-public-keys
${SHELL}
else
${COMMAND}
fi

27
setup.py Normal file → Executable file
View File

@@ -2,17 +2,23 @@
from setuptools import setup
setup(
name='trezor_agent',
version='0.8.3',
description='Using Trezor as hardware SSH agent',
name='libagent',
version='0.9.2',
description='Using hardware wallets as SSH/GPG agent',
author='Roman Zeyde',
author_email='roman.zeyde@gmail.com',
url='http://github.com/romanz/trezor-agent',
packages=['trezor_agent', 'trezor_agent.device', 'trezor_agent.gpg'],
packages=[
'libagent',
'libagent.device',
'libagent.gpg',
'libagent.ssh'
],
install_requires=[
'ecdsa>=0.13', 'ed25519>=1.4', 'Cython>=0.23.4', 'protobuf>=3.0.0', 'semver>=2.2',
'trezor>=0.7.6', 'keepkey>=0.7.3', 'ledgerblue>=0.1.8',
'hidapi==0.7.99.post15' # until https://github.com/keepkey/python-keepkey/pull/8 is merged
'ecdsa>=0.13',
'ed25519>=1.4',
'semver>=2.2',
'unidecode>=0.4.20',
],
platforms=['POSIX'],
classifiers=[
@@ -26,16 +32,11 @@ setup(
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Topic :: Software Development :: Libraries :: Python Modules',
'Topic :: System :: Networking',
'Topic :: Communications',
'Topic :: Security',
'Topic :: Utilities',
],
entry_points={'console_scripts': [
'trezor-agent = trezor_agent.__main__:run_agent',
'trezor-gpg-create = trezor_agent.gpg.__main__:main_create',
'trezor-gpg-agent = trezor_agent.gpg.__main__:main_agent',
'trezor-gpg-unlock = trezor_agent.gpg.__main__:auto_unlock',
]},
)

12
tox.ini
View File

@@ -2,6 +2,8 @@
envlist = py27,py3
[pep8]
max-line-length = 100
[pep257]
add-ignore = D401
[testenv]
deps=
pytest
@@ -13,10 +15,10 @@ deps=
pydocstyle
isort
commands=
pep8 trezor_agent
isort --skip-glob .tox -c -r trezor_agent
pylint --reports=no --rcfile .pylintrc trezor_agent
pydocstyle trezor_agent
coverage run --omit='trezor_agent/__main__.py' --source trezor_agent -m py.test -v trezor_agent
pep8 libagent
isort --skip-glob .tox -c -r libagent
pylint --reports=no --rcfile .pylintrc libagent
pydocstyle libagent
coverage run --source libagent -m py.test -v libagent
coverage report
coverage html

View File

@@ -1,27 +0,0 @@
"""Cryptographic hardware device management."""
import logging
from . import trezor
from . import keepkey
from . import ledger
from . import interface
log = logging.getLogger(__name__)
DEVICE_TYPES = [
trezor.Trezor,
keepkey.KeepKey,
ledger.LedgerNanoS,
]
def detect():
"""Detect the first available device and return it to the user."""
for device_type in DEVICE_TYPES:
try:
with device_type() as d:
return d
except interface.NotFoundError as e:
log.debug('device not found: %s', e)
raise IOError('No device found!')

View File

@@ -1,9 +0,0 @@
"""
TREZOR support for ECDSA GPG signatures.
See these links for more details:
- https://www.gnupg.org/faq/whats-new-in-2.1.html
- https://tools.ietf.org/html/rfc4880
- https://tools.ietf.org/html/rfc6637
- https://tools.ietf.org/html/draft-irtf-cfrg-eddsa-05
"""

View File

@@ -1,170 +0,0 @@
"""GPG-agent utilities."""
import binascii
import logging
from . import client, decode, keyring, protocol
from .. import util
log = logging.getLogger(__name__)
def yield_connections(sock):
"""Run a server on the specified socket."""
while True:
log.debug('waiting for connection on %s', sock.getsockname())
try:
conn, _ = sock.accept()
except KeyboardInterrupt:
return
conn.settimeout(None)
log.debug('accepted connection on %s', sock.getsockname())
yield conn
def serialize(data):
"""Serialize data according to ASSUAN protocol."""
for c in [b'%', b'\n', b'\r']:
escaped = '%{:02X}'.format(ord(c)).encode('ascii')
data = data.replace(c, escaped)
return data
def sig_encode(r, s):
"""Serialize ECDSA signature data into GPG S-expression."""
r = serialize(util.num2bytes(r, 32))
s = serialize(util.num2bytes(s, 32))
return b'(7:sig-val(5:ecdsa(1:r32:' + r + b')(1:s32:' + s + b')))'
def open_connection(keygrip_bytes):
"""
Connect to the device for the specified keygrip.
Parse GPG public key to find the first user ID, which is used to
specify the correct signature/decryption key on the device.
"""
pubkey_dict, user_ids = decode.load_by_keygrip(
pubkey_bytes=keyring.export_public_keys(),
keygrip=keygrip_bytes)
# We assume the first user ID is used to generate TREZOR-based GPG keys.
user_id = user_ids[0]['value'].decode('ascii')
curve_name = protocol.get_curve_name_by_oid(pubkey_dict['curve_oid'])
ecdh = (pubkey_dict['algo'] == protocol.ECDH_ALGO_ID)
conn = client.Client(user_id, curve_name=curve_name)
pubkey = protocol.PublicKey(
curve_name=curve_name, created=pubkey_dict['created'],
verifying_key=conn.pubkey(ecdh=ecdh), ecdh=ecdh)
assert pubkey.key_id() == pubkey_dict['key_id']
assert pubkey.keygrip() == keygrip_bytes
return conn
def pksign(keygrip, digest, algo):
"""Sign a message digest using a private EC key."""
log.debug('signing %r digest (algo #%s)', digest, algo)
keygrip_bytes = binascii.unhexlify(keygrip)
conn = open_connection(keygrip_bytes)
r, s = conn.sign(binascii.unhexlify(digest))
result = sig_encode(r, s)
log.debug('result: %r', result)
return result
def _serialize_point(data):
prefix = '{}:'.format(len(data)).encode('ascii')
# https://www.gnupg.org/documentation/manuals/assuan/Server-responses.html
return b'(5:value' + serialize(prefix + data) + b')'
def parse_ecdh(line):
"""Parse ECDH request and return remote public key."""
prefix, line = line.split(b' ', 1)
assert prefix == b'D'
exp, leftover = keyring.parse(keyring.unescape(line))
log.debug('ECDH s-exp: %r', exp)
assert not leftover
label, exp = exp
assert label == b'enc-val'
assert exp[0] == b'ecdh'
items = exp[1:]
log.debug('ECDH parameters: %r', items)
return dict(items)[b'e']
def pkdecrypt(keygrip, conn):
"""Handle decryption using ECDH."""
for msg in [b'S INQUIRE_MAXLEN 4096', b'INQUIRE CIPHERTEXT']:
keyring.sendline(conn, msg)
line = keyring.recvline(conn)
assert keyring.recvline(conn) == b'END'
remote_pubkey = parse_ecdh(line)
keygrip_bytes = binascii.unhexlify(keygrip)
conn = open_connection(keygrip_bytes)
return _serialize_point(conn.ecdh(remote_pubkey))
@util.memoize
def have_key(keygrip):
"""Check if current keygrip correspond to a TREZOR-based key."""
try:
open_connection(keygrip_bytes=binascii.unhexlify(keygrip))
return True
except KeyError as e:
log.warning('HAVEKEY(%s) failed: %s', keygrip, e)
return False
# pylint: disable=too-many-branches
def handle_connection(conn):
"""Handle connection from GPG binary using the ASSUAN protocol."""
keygrip = None
digest = None
algo = None
version = keyring.gpg_version() # "Clone" existing GPG version
keyring.sendline(conn, b'OK')
for line in keyring.iterlines(conn):
parts = line.split(b' ')
command = parts[0]
args = parts[1:]
if command in {b'RESET', b'OPTION', b'SETKEYDESC'}:
pass # reply with OK
elif command == b'GETINFO':
keyring.sendline(conn, b'D ' + version)
elif command == b'AGENT_ID':
keyring.sendline(conn, b'D TREZOR') # "Fake" agent ID
elif command in {b'SIGKEY', b'SETKEY'}:
keygrip, = args
elif command == b'SETHASH':
algo, digest = args
elif command == b'PKSIGN':
sig = pksign(keygrip, digest, algo)
keyring.sendline(conn, b'D ' + sig)
elif command == b'PKDECRYPT':
sec = pkdecrypt(keygrip, conn)
keyring.sendline(conn, b'D ' + sec)
elif command == b'HAVEKEY':
if not have_key(keygrip=args[0]):
keyring.sendline(conn,
b'ERR 67108881 No secret key <GPG Agent>')
return
elif command == b'KEYINFO':
keygrip, = args
# Dummy reply (mainly for 'gpg --edit' to succeed).
# For details, see GnuPG agent KEYINFO command help.
# https://git.gnupg.org/cgi-bin/gitweb.cgi?p=gnupg.git;a=blob;f=agent/command.c;h=c8b34e9882076b1b724346787781f657cac75499;hb=refs/heads/master#l1082
fmt = 'S KEYINFO {0} X - - - - - - -'
keyring.sendline(conn, fmt.format(keygrip).encode('ascii'))
elif command == b'BYE':
return
elif command == b'KILLAGENT':
keyring.sendline(conn, b'OK')
raise StopIteration
else:
log.error('unknown request: %r', line)
return
keyring.sendline(conn, b'OK')

View File

@@ -1,44 +0,0 @@
"""Device abstraction layer for GPG operations."""
import logging
from .. import device, formats, util
log = logging.getLogger(__name__)
class Client(object):
"""Sign messages and get public keys from a hardware device."""
def __init__(self, user_id, curve_name):
"""Connect to the device and retrieve required public key."""
self.device = device.detect()
self.user_id = user_id
self.identity = device.interface.Identity(
identity_str='gpg://', curve_name=curve_name)
self.identity.identity_dict['host'] = user_id
def pubkey(self, ecdh=False):
"""Return public key as VerifyingKey object."""
with self.device:
pubkey = self.device.pubkey(ecdh=ecdh, identity=self.identity)
return formats.decompress_pubkey(
pubkey=pubkey, curve_name=self.identity.curve_name)
def sign(self, digest):
"""Sign the digest and return a serialized signature."""
log.info('please confirm GPG signature on %s for "%s"...',
self.device, self.user_id)
if self.identity.curve_name == formats.CURVE_NIST256:
digest = digest[:32] # sign the first 256 bits
log.debug('signing digest: %s', util.hexlify(digest))
with self.device:
sig = self.device.sign(blob=digest, identity=self.identity)
return (util.bytes2num(sig[:32]), util.bytes2num(sig[32:]))
def ecdh(self, pubkey):
"""Derive shared secret using ECDH from remote public key."""
log.info('please confirm GPG decryption on %s for "%s"...',
self.device, self.user_id)
with self.device:
return self.device.ecdh(pubkey=pubkey, identity=self.identity)