Compare commits

...

79 Commits

Author SHA1 Message Date
Roman Zeyde
79e68b29c2 bump version 2016-10-18 22:16:54 +03:00
Roman Zeyde
8265515641 gpg: fix small Python2/3 issue 2016-10-18 22:15:58 +03:00
Roman Zeyde
749799845d bump version 2016-10-18 21:37:01 +03:00
Roman Zeyde
eaea35003e gpg: remove unused function (_time_format) 2016-10-18 21:25:31 +03:00
Roman Zeyde
eefb38ce83 gpg: remove unused function (_verify_keygrip) 2016-10-18 21:19:09 +03:00
Roman Zeyde
0730eb7223 gpg: use same logging configuration as in SSH 2016-10-18 21:02:49 +03:00
Roman Zeyde
5b61702205 gpg: don't crash gpg-agent on error 2016-10-18 20:56:17 +03:00
Roman Zeyde
0ad0ca3b9a README: add a note about SSH incompatible options 2016-10-18 19:46:43 +03:00
Roman Zeyde
2843cdcf41 ssh: pretty-print user name 2016-10-18 18:28:21 +03:00
Roman Zeyde
c7bc78ebe7 Merge pull request #58 from romanz/keygrip-agent
gpg: replace TREZOR_GPG_USER_ID usage in gpg-agent mode
2016-10-18 18:15:00 +03:00
Roman Zeyde
a6d9edcb0b README: update for new user ID specification for GPG 2016-10-18 18:12:42 +03:00
Roman Zeyde
bc64205a85 gpg: replace TREZOR_GPG_USER_ID usage in gpg-agent mode
Use the keygrip to find the correct public key instead.
2016-10-18 18:05:51 +03:00
Roman Zeyde
34dc803856 README: add "user@" for SSH example usage
This should help when local username and remote username are different.
2016-10-18 15:07:40 +03:00
Roman Zeyde
f7ebb02799 isort: fix imports 2016-10-18 12:10:28 +03:00
Roman Zeyde
0ba33a5bc4 gpg: document agent responses 2016-10-18 12:08:28 +03:00
Roman Zeyde
13752ddcd5 gpg: require latest GPG version 2016-10-18 12:05:44 +03:00
Roman Zeyde
487a8e56c4 gpg: add keygrip logic into decoding 2016-10-17 23:30:50 +03:00
Roman Zeyde
ef56ee4602 gpg: remove verifying logic from decoding 2016-10-17 23:08:16 +03:00
Roman Zeyde
ae381a38e5 gpg: export keygrips from protocol 2016-10-17 22:57:40 +03:00
Roman Zeyde
446ec99bf4 gpg: remove complex pubkey parsing code 2016-10-17 22:51:11 +03:00
Roman Zeyde
80c6f10533 README: correct pip commands order 2016-10-17 18:01:38 +03:00
Roman Zeyde
ff984c60e4 README: link to PIN entering instructions 2016-10-17 17:54:59 +03:00
Roman Zeyde
c9bc079dc9 gpg: add file:line to logging format 2016-10-17 11:58:03 +03:00
Roman Zeyde
65d2c04478 gpg: fix agent module to work with Python 3 2016-10-17 11:47:22 +03:00
Roman Zeyde
2d57bf4453 gpg: beter logging while search for GPG key 2016-10-17 11:46:58 +03:00
Roman Zeyde
79b6d31dfe gpg: raise proper exception when keygrip mismatch is detected 2016-10-17 11:08:06 +03:00
Roman Zeyde
7de88a3980 gpg: add comment for stopping current gpg-agent 2016-10-16 22:40:16 +03:00
Roman Zeyde
6f8d0df116 bump version 2016-10-16 22:17:53 +03:00
Roman Zeyde
b4a382d22e Merge pull request #51 from romanz/curve25519
Curve25519
2016-10-15 16:19:50 +03:00
Roman Zeyde
d236f4667e gpg: allow Curve25519 for ECDH 2016-10-15 16:10:16 +03:00
Roman Zeyde
42813ddbb4 gpg: parse curve OID from public key to select curve name 2016-10-15 16:10:16 +03:00
Roman Zeyde
8f19690943 gpg: support Curve25519 for creating encryption subkeys 2016-10-15 16:10:16 +03:00
Roman Zeyde
5047805385 gpg: move HardwareSigner to device module 2016-10-15 16:10:16 +03:00
Roman Zeyde
915b326da7 gpg: simplify AgentSigner and move to keyring module 2016-10-15 15:57:45 +03:00
Roman Zeyde
e7b8379a97 factory: explicitly only the first interface 2016-10-14 20:58:42 +03:00
Roman Zeyde
26435130d7 factory: emit warning (instead of exception) when an import fails 2016-10-12 21:15:21 +03:00
Roman Zeyde
dfde6dbee4 bump version 2016-10-12 19:12:04 +03:00
Cédric Félizard
085a3e81c7 Add explanation about entering PIN (#47)
[ci skip]
2016-10-11 21:31:25 +03:00
Cédric Félizard
3082d61deb Fix typo (#48) 2016-10-11 21:29:54 +03:00
Roman Zeyde
e3286a4510 gpg: don't clear the session after PIN is entered
This would allow single PIN entry when running multiple GPG commands.
2016-10-11 08:43:39 +03:00
Roman Zeyde
fcd5671626 Handle keyinfo request (#44)
gpg: handle KEYINFO request

See https://git.gnupg.org/cgi-bin/gitweb.cgi?p=gnupg.git;a=blob;f=agent/command.c;h=9522f898997e95207d59122d056f0f0be03ccecb;hb=6bee88dd067e03e7767ceacf6a849d9ba38cc11d#l1027 for more details.
2016-10-04 23:11:12 +03:00
Roman Zeyde
1454d2f4d7 Merge pull request #43 from romanz/fix-gpg-manual
GPG: fix installation instructions
2016-10-04 20:01:44 +03:00
Roman Zeyde
9b395363a3 GPG: fix installation instructions 2016-10-04 19:59:08 +03:00
Roman Zeyde
5bb9dd7770 Merge pull request #42 from romanz/ssh-git
README: add an example for remote git repository
2016-10-04 11:43:55 +03:00
Roman Zeyde
51df023a23 README: add an example for remote git repository 2016-10-04 11:40:14 +03:00
Roman Zeyde
d74f375637 Merge pull request #41 from romanz/protobuf-note
README: add note about protobuf permissions issue
2016-10-04 11:27:14 +03:00
Roman Zeyde
1fd0659051 README: add note about protobuf permissions issue 2016-10-04 11:24:39 +03:00
Roman Zeyde
18be290bd6 Merge branch 'fix_agent' of https://github.com/Solution4Future/trezor-agent into Solution4Future-fix_agent 2016-10-04 11:16:56 +03:00
Roman Zeyde
a1ab496bf4 Merge branch 'ledger' 2016-10-04 10:39:08 +03:00
Roman Zeyde
784e14647a Merge branch 'master' into HEAD
Conflicts:
	trezor_agent/factory.py
2016-10-04 10:37:52 +03:00
Dominik Kozaczko
7d2c649e83 don't stop polling for more devices as having more than one inserted raises more problems and we need to keep the check 2016-10-01 12:38:16 +02:00
Dominik Kozaczko
cf27b345f6 better handling of keepkey dependency; fixes #36 2016-10-01 12:30:00 +02:00
Dominik Kozaczko
386ed5a81f Merge pull request #1 from romanz/master
pull changes from upstream
2016-10-01 12:10:18 +02:00
Roman Zeyde
5a64954324 Merge pull request #37 from Solution4Future/fix_agent
removed .decode('ascii') and added missing bytestrings and also some missing deps
2016-10-01 11:42:47 +03:00
Dominik Kozaczko
3aebd137b0 removed .decode('ascii') and added missing bytestrings 2016-10-01 10:02:46 +02:00
Nicolas Pouillard
1fa35e7f1a Fix the URL for the TREZOR firmware 2016-09-30 21:07:35 +03:00
Roman Zeyde
aeda85275d bump version 2016-09-28 18:32:19 +03:00
Roman Zeyde
e41206b350 setup: update trezorlib version 2016-09-28 18:31:13 +03:00
Roman Zeyde
03650550dd Use latest protobuf library (for native Python 3 support) 2016-09-28 18:18:09 +03:00
Roman Zeyde
f7b07070da README: update setuptools to the latest version 2016-09-28 18:08:09 +03:00
Roman Zeyde
96eede9c83 Merge branch 'np-encode-subpackets' 2016-09-28 17:27:48 +03:00
Roman Zeyde
91146303a3 Follow GPG implementation for subpacket prefix encoding.
Conflicts:
	trezor_agent/gpg/protocol.py
2016-09-28 17:26:50 +03:00
Roman Zeyde
bf598435fb client: keep the session open (doesn't forget PIN) 2016-09-26 22:27:47 +03:00
Roman Zeyde
459b882b89 ledger: don't use debug=True 2016-09-14 23:07:27 +03:00
Roman Zeyde
998c9ee958 README: update usage section 2016-09-11 23:38:21 +03:00
Roman Zeyde
d408a592aa README: get only the first lines of 'trezorctl get_features' 2016-09-11 23:35:10 +03:00
Roman Zeyde
282e91ace3 update README about protobuf issueOF 2016-09-11 23:22:10 +03:00
Roman Zeyde
23c37cf1e3 README: update TREZOR's required version 2016-09-11 23:17:47 +03:00
Nicolas Pouillard
016e864503 Attempt at fixing issue #32 2016-09-06 00:45:51 +02:00
Roman Zeyde
57e09248db Merge pull request #31 from romanz/master
Update ledger branch with the latest changes from master branch
2016-09-05 22:28:07 +03:00
Roman Zeyde
030ae4c3f6 gpg: include unsupport hash algorithm ID in exception message 2016-08-13 10:06:52 +03:00
Roman Zeyde
4897b70888 factory: fix pylint import-error warnings 2016-08-11 22:38:12 +03:00
Roman Zeyde
f4ecd47ed6 factory: fix pep8 and pylint warnings 2016-08-11 22:31:24 +03:00
Roman Zeyde
c4bbac0e77 util: move BIP32 address related functions 2016-08-11 22:30:59 +03:00
BTChip
5d0b0f65d3 Merge branch 'ledger' of https://github.com/btchip/trezor-agent into ledger 2016-08-09 13:02:47 +02:00
BTChip
33747592ca Fix eddsa, SSH optimization with signature + key, cleanup 2016-08-09 13:01:57 +02:00
BTChip
adb09cd8ca Ledger integration 2016-08-09 13:01:57 +02:00
BTChip
bc1d7a5448 Fix eddsa, SSH optimization with signature + key, cleanup 2016-08-03 15:17:04 +02:00
BTChip
8fe16d24c2 Ledger integration 2016-07-31 10:00:46 +02:00
22 changed files with 625 additions and 505 deletions

View File

@@ -8,19 +8,19 @@ First, verify that you have GPG 2.1+ [installed](https://gist.github.com/vt0r/a2
```
$ gpg2 --version | head -n1
gpg (GnuPG) 2.1.11
gpg (GnuPG) 2.1.15
```
Update you TREZOR firmware to the latest version (at least [c720614](https://github.com/trezor/trezor-mcu/commit/c720614f6e9b9c07f446c95bda0257980d942871)).
Update you TREZOR firmware to the latest version (at least v1.4.0).
Install latest `trezor-agent` package from [gpg-agent](https://github.com/romanz/trezor-agent/commits/gpg-agent) branch:
Install latest `trezor-agent` package from GitHub:
```
$ pip install --user git+https://github.com/romanz/trezor-agent.git
```
Define your GPG user ID as an environment variable:
```
$ export TREZOR_GPG_USER_ID="John Doe <john@doe.bit>"
$ TREZOR_GPG_USER_ID="John Doe <john@doe.bit>"
```
There are two ways to generate TREZOR-based GPG public keys, as described below.
@@ -28,12 +28,12 @@ There are two ways to generate TREZOR-based GPG public keys, as described below.
## 1. generate a new GPG identity:
```
$ trezor-gpg create | gpg2 --import # use the TREZOR to confirm signing the primary key
$ trezor-gpg create "${TREZOR_GPG_USER_ID}" | gpg2 --import # use the TREZOR to confirm signing the primary key
gpg: key 5E4D684D: public key "John Doe <john@doe.bit>" imported
gpg: Total number processed: 1
gpg: imported: 1
$ gpg2 --edit "${TREZOR_GPG_USER_ID}" trust # set this key to ultimate trust (option #5)
$ gpg2 --edit "${TREZOR_GPG_USER_ID}" trust # set this key to ultimate trust (option #5)
$ gpg2 -k
/home/roman/.gnupg/pubring.kbx
@@ -46,14 +46,14 @@ sub nistp256/A31D9E25 2016-06-17 [E]
## 2. generate a new subkey for an existing GPG identity:
```
$ gpg2 -k # suppose there is already a GPG primary key
$ gpg2 -k # suppose there is already a GPG primary key
/home/roman/.gnupg/pubring.kbx
------------------------------
pub rsa2048/87BB07B4 2016-06-17 [SC]
uid [ultimate] John Doe <john@doe.bit>
sub rsa2048/7176D31F 2016-06-17 [E]
$ trezor-gpg create --subkey | gpg2 --import # use the TREZOR to confirm signing the subkey
$ trezor-gpg create --subkey "${TREZOR_GPG_USER_ID}" | gpg2 --import # use the TREZOR to confirm signing the subkey
gpg: key 87BB07B4: "John Doe <john@doe.bit>" 2 new signatures
gpg: key 87BB07B4: "John Doe <john@doe.bit>" 2 new subkeys
gpg: Total number processed: 1
@@ -83,13 +83,13 @@ when you are done with the TREZOR-based GPG operations.
```
$ echo "Hello World!" | gpg2 --sign | gpg2 --verify
gpg: Signature made Fri 17 Jun 2016 08:55:13 PM IDT using ECDSA key ID 5E4D684D
gpg: Good signature from "Roman Zeyde <roman.zeyde@gmail.com>" [ultimate]
gpg: Good signature from "John Doe <john@doe.bit>" [ultimate]
```
## Encrypt and decrypt GPG messages:
```
$ date | gpg2 --encrypt -r "${TREZOR_GPG_USER_ID}" | gpg2 --decrypt
gpg: encrypted with 256-bit ECDH key, ID A31D9E25, created 2016-06-17
"Roman Zeyde <roman.zeyde@gmail.com>"
"John Doe <john@doe.bit>"
Fri Jun 17 20:55:31 IDT 2016
```
@@ -101,4 +101,4 @@ $ git commit --gpg-sign # create GPG-signed commit
$ git log --show-signature -1 # verify commit signature
$ git tag --sign "TAG" # create GPG-signed tag
$ git verify-tag "TAG" # verify tag signature
```
```

View File

@@ -9,38 +9,17 @@
## Using for GitHub SSH authentication (via `trezor-git` utility)
[![GitHub](https://asciinema.org/a/38337.png)](https://asciinema.org/a/38337)
# Installation
First, make sure that the latest [trezorlib](https://pypi.python.org/pypi/trezor) Python package
is installed correctly (at least v0.6.6):
$ apt-get install python-dev libusb-1.0-0-dev libudev-dev
$ pip install Cython trezor
Then, install the latest [trezor_agent](https://pypi.python.org/pypi/trezor_agent) package:
$ pip install trezor_agent
Finally, verify that you are running the latest [TREZOR firmware](https://mytrezor.com/data/firmware/releases.json) version (at least v1.3.4):
$ trezorctl get_features
vendor: "bitcointrezor.com"
major_version: 1
minor_version: 3
patch_version: 4
...
# Public key generation
Run:
/tmp $ trezor-agent ssh.hostname.com -v > hostname.pub
2015-09-02 15:03:18,929 INFO getting "ssh://ssh.hostname.com" public key from Trezor...
/tmp $ trezor-agent user@ssh.hostname.com -v > hostname.pub
2015-09-02 15:03:18,929 INFO getting "ssh://user@ssh.hostname.com" public key from Trezor...
2015-09-02 15:03:23,342 INFO disconnected from Trezor
/tmp $ cat hostname.pub
ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBGSevcDwmT+QaZPUEWUUjTeZRBICChxMKuJ7dRpBSF8+qt+8S1GBK5Zj8Xicc8SHG/SE/EXKUL2UU3kcUzE7ADQ= ssh://ssh.hostname.com
ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBGSevcDwmT+QaZPUEWUUjTeZRBICChxMKuJ7dRpBSF8+qt+8S1GBK5Zj8Xicc8SHG/SE/EXKUL2UU3kcUzE7ADQ= ssh://user@ssh.hostname.com
Append `hostname.pub` contents to `~/.ssh/authorized_keys`
Append `hostname.pub` contents to `/home/user/.ssh/authorized_keys`
configuration file at `ssh.hostname.com`, so the remote server
would allow you to login using the corresponding private key signature.
@@ -48,9 +27,9 @@ would allow you to login using the corresponding private key signature.
Run:
/tmp $ trezor-agent ssh.hostname.com -v -c
2015-09-02 15:09:39,782 INFO getting "ssh://ssh.hostname.com" public key from Trezor...
2015-09-02 15:09:44,430 INFO please confirm user "roman" login to "ssh://ssh.hostname.com" using Trezor...
/tmp $ trezor-agent user@ssh.hostname.com -v -c
2015-09-02 15:09:39,782 INFO getting "ssh://user@ssh.hostname.com" public key from Trezor...
2015-09-02 15:09:44,430 INFO please confirm user "roman" login to "ssh://user@ssh.hostname.com" using Trezor...
2015-09-02 15:09:46,152 INFO signature status: OK
Linux lmde 3.16.0-4-amd64 #1 SMP Debian 3.16.7-ckt11-1+deb8u3 (2015-08-04) x86_64
@@ -64,3 +43,38 @@ Run:
~ $
Make sure to confirm SSH signature on the Trezor device when requested.
## Accessing remote Git repositories
Use your SSH public key to access your remote repository (e.g. [GitHub](https://help.github.com/articles/adding-a-new-ssh-key-to-your-github-account/)):
$ trezor-agent -v -e ed25519 git@github.com | xclip
Use the following Bash alias for convinient Git operations:
$ alias git_hub='trezor-agent -v -e ed25519 git@github.com -- git'
Replace `git` with `git_hub` for remote operations:
$ git_hub push origin master
# Troubleshooting
If SSH connection fails to work, please open an [issue](https://github.com/romanz/trezor-agent/issues)
with a verbose log attached (by running `trezor-agent -vv`) .
## Incompatible SSH options
Note that your local SSH configuration may ignore `trezor-agent`, if it has `IdentitiesOnly` option set to `yes`.
IdentitiesOnly
Specifies that ssh(1) should only use the authentication identity files configured in
the ssh_config files, even if ssh-agent(1) or a PKCS11Provider offers more identities.
The argument to this keyword must be “yes” or “no”.
This option is intended for situations where ssh-agent offers many different identities.
The default is “no”.
If you are failing to connect, try running:
$ trezor-agent -vv user@host -- ssh -vv -oIdentitiesOnly=no user@host

View File

@@ -11,8 +11,43 @@ See SatoshiLabs' blog posts about this feature:
- [TREZOR Firmware 1.3.4 enables SSH login](https://medium.com/@satoshilabs/trezor-firmware-1-3-4-enables-ssh-login-86a622d7e609)
- [TREZOR Firmware 1.3.6GPG Signing, SSH Login Updates and Advanced Transaction Features for Segwit](https://medium.com/@satoshilabs/trezor-firmware-1-3-6-20a7df6e692)
For usage with SSH, see the [following instructions](README-SSH.md).
## Installation
For usage with GPG, see the [following instructions](README-GPG.md).
First, make sure that the latest [trezorlib](https://pypi.python.org/pypi/trezor) Python package
is installed correctly (at least v0.6.6):
$ apt-get install python-dev libusb-1.0-0-dev libudev-dev
$ pip install -U setuptools
$ pip install Cython trezor
Then, install the latest [trezor_agent](https://pypi.python.org/pypi/trezor_agent) package:
$ pip install trezor_agent
Finally, verify that you are running the latest [TREZOR firmware](https://wallet.mytrezor.com/data/firmware/releases.json) version (at least v1.4.0):
$ trezorctl get_features | head
vendor: "bitcointrezor.com"
major_version: 1
minor_version: 4
patch_version: 0
...
If you have an error regarding `protobuf` imports (after installing it), please see [this issue](https://github.com/romanz/trezor-agent/issues/28).
## Usage
For SSH, see the [following instructions](README-SSH.md).
For GPG, see the [following instructions](README-GPG.md).
See [here](https://github.com/romanz/python-trezor#pin-entering) for PIN entering instructions.
## Troubleshooting
If there is an import problem with the installed `protobuf` package,
see [this issue](https://github.com/romanz/trezor-agent/issues/28) for fixing it.
### Gitter
Questions, suggestions and discussions are welcome: [![Chat](https://badges.gitter.im/romanz/trezor-agent.svg)](https://gitter.im/romanz/trezor-agent)

View File

@@ -3,13 +3,13 @@ from setuptools import setup
setup(
name='trezor_agent',
version='0.6.6',
version='0.7.4',
description='Using Trezor as hardware SSH agent',
author='Roman Zeyde',
author_email='roman.zeyde@gmail.com',
url='http://github.com/romanz/trezor-agent',
packages=['trezor_agent', 'trezor_agent.gpg'],
install_requires=['ecdsa>=0.13', 'ed25519>=1.4', 'Cython>=0.23.4', 'protobuf>=2.6.1', 'trezor>=0.6.12', 'semver>=2.2'],
install_requires=['ecdsa>=0.13', 'ed25519>=1.4', 'Cython>=0.23.4', 'protobuf>=3.0.0', 'trezor>=0.7.4', 'semver>=2.2'],
platforms=['POSIX'],
classifiers=[
'Environment :: Console',
@@ -27,6 +27,10 @@ setup(
'Topic :: Security',
'Topic :: Utilities',
],
extras_require={
'trezorlib': ['python-trezor>=0.7.4'],
'keepkeylib': ['keepkey>=0.7.3'],
},
entry_points={'console_scripts': [
'trezor-agent = trezor_agent.__main__:run_agent',
'trezor-git = trezor_agent.__main__:run_git',

View File

@@ -1,5 +1,5 @@
[tox]
envlist = py27,py34
envlist = py27,py3
[pep8]
max-line-length = 100
[testenv]

View File

@@ -7,14 +7,14 @@ import re
import subprocess
import sys
from . import client, formats, protocol, server
from . import client, formats, protocol, server, util
log = logging.getLogger(__name__)
def ssh_args(label):
"""Create SSH command for connecting specified server."""
identity = client.string_to_identity(label, identity_type=dict)
identity = util.string_to_identity(label, identity_type=dict)
args = []
if 'port' in identity:
@@ -73,15 +73,6 @@ def create_git_parser():
return p
def setup_logging(verbosity):
"""Configure logging for this tool."""
fmt = ('%(asctime)s %(levelname)-12s %(message)-100s '
'[%(filename)s:%(lineno)d]')
levels = [logging.WARNING, logging.INFO, logging.DEBUG]
level = levels[min(verbosity, len(levels) - 1)]
logging.basicConfig(format=fmt, level=level)
def git_host(remote_name, attributes):
"""Extract git SSH host for specified remote name."""
try:
@@ -132,7 +123,7 @@ def handle_connection_error(func):
def run_agent(client_factory=client.Client):
"""Run ssh-agent using given hardware client factory."""
args = create_agent_parser().parse_args()
setup_logging(verbosity=args.verbose)
util.setup_logging(verbosity=args.verbose)
with client_factory(curve=args.ecdsa_curve_name) as conn:
label = args.identity
@@ -161,7 +152,7 @@ def run_agent(client_factory=client.Client):
def run_git(client_factory=client.Client):
"""Run git under ssh-agent using given hardware client factory."""
args = create_git_parser().parse_args()
setup_logging(verbosity=args.verbose)
util.setup_logging(verbosity=args.verbose)
with client_factory(curve=args.ecdsa_curve_name) as conn:
label = git_host(args.remote, ['pushurl', 'url'])

View File

@@ -6,8 +6,6 @@ It is used for getting SSH public keys and ECDSA signing of server requests.
import binascii
import io
import logging
import re
import struct
from . import factory, formats, util
@@ -33,14 +31,13 @@ class Client(object):
return self
def __exit__(self, *args):
"""Forget PIN, shutdown screen and disconnect."""
"""Keep the session open (doesn't forget PIN)."""
log.info('disconnected from %s', self.device_name)
self.client.clear_session()
self.client.close()
def get_identity(self, label, index=0):
"""Parse label string into Identity protobuf."""
identity = string_to_identity(label, self.identity_type)
identity = util.string_to_identity(label, self.identity_type)
identity.proto = 'ssh'
identity.index = index
return identity
@@ -48,10 +45,10 @@ class Client(object):
def get_public_key(self, label):
"""Get SSH public key corresponding to specified by label."""
identity = self.get_identity(label=label)
label = identity_to_string(identity) # canonize key label
label = util.identity_to_string(identity) # canonize key label
log.info('getting "%s" public key (%s) from %s...',
label, self.curve, self.device_name)
addr = get_address(identity)
addr = util.get_bip32_address(identity)
node = self.client.get_public_node(n=addr,
ecdsa_curve_name=self.curve)
@@ -70,7 +67,7 @@ class Client(object):
log.debug('hidden challenge size: %d bytes', len(blob))
log.info('please confirm user "%s" login to "%s" using %s...',
msg['user'], label, self.device_name)
msg['user'].decode('ascii'), label, self.device_name)
try:
result = self.client.sign_identity(identity=identity,
@@ -93,55 +90,6 @@ class Client(object):
return result.signature[1:]
_identity_regexp = re.compile(''.join([
'^'
r'(?:(?P<proto>.*)://)?',
r'(?:(?P<user>.*)@)?',
r'(?P<host>.*?)',
r'(?::(?P<port>\w*))?',
r'(?P<path>/.*)?',
'$'
]))
def string_to_identity(s, identity_type):
"""Parse string into Identity protobuf."""
m = _identity_regexp.match(s)
result = m.groupdict()
log.debug('parsed identity: %s', result)
kwargs = {k: v for k, v in result.items() if v}
return identity_type(**kwargs)
def identity_to_string(identity):
"""Dump Identity protobuf into its string representation."""
result = []
if identity.proto:
result.append(identity.proto + '://')
if identity.user:
result.append(identity.user + '@')
result.append(identity.host)
if identity.port:
result.append(':' + identity.port)
if identity.path:
result.append(identity.path)
return ''.join(result)
def get_address(identity, ecdh=False):
"""Compute BIP32 derivation address according to SLIP-0013/0017."""
index = struct.pack('<L', identity.index)
addr = index + identity_to_string(identity).encode('ascii')
log.debug('address string: %r', addr)
digest = formats.hashfunc(addr).digest()
s = io.BytesIO(bytearray(digest))
hardened = 0x80000000
addr_0 = [13, 17][bool(ecdh)]
address_n = [addr_0] + list(util.recv(s, '<LLLL'))
return [(hardened | value) for value in address_n]
def _parse_ssh_blob(data):
res = {}
i = io.BytesIO(data)

View File

@@ -1,10 +1,14 @@
"""Thin wrapper around trezor/keepkey libraries."""
from __future__ import absolute_import
import binascii
import collections
import logging
import semver
from . import util
log = logging.getLogger(__name__)
ClientWrapper = collections.namedtuple(
@@ -41,6 +45,7 @@ def _load_client(name, client_type, hid_transport,
identity_type=identity_type,
device_name=name,
call_exception=call_exception)
return
def _load_trezor():
@@ -56,8 +61,9 @@ def _load_trezor():
identity_type=IdentityType,
required_version='>=1.4.0',
call_exception=CallException)
except ImportError:
log.exception('Missing module: install via "pip install trezor"')
except ImportError as e:
log.warning('%s: install via "pip install trezor" '
'if you need to support this device', e)
def _load_keepkey():
@@ -73,13 +79,163 @@ def _load_keepkey():
identity_type=IdentityType,
required_version='>=1.0.4',
call_exception=CallException)
except ImportError:
log.exception('Missing module: install via "pip install keepkey"')
except ImportError as e:
log.warning('%s: install via "pip install keepkey" '
'if you need to support this device', e)
def _load_ledger():
import struct
class LedgerClientConnection(object):
def __init__(self, dongle):
self.dongle = dongle
@staticmethod
def expand_path(path):
result = ""
for pathElement in path:
result = result + struct.pack(">I", pathElement)
return result
@staticmethod
def convert_public_key(ecdsa_curve_name, result):
from trezorlib.messages_pb2 import PublicKey # pylint: disable=import-error
if ecdsa_curve_name == "nist256p1":
if (result[64] & 1) != 0:
result = bytearray([0x03]) + result[1:33]
else:
result = bytearray([0x02]) + result[1:33]
else:
result = result[1:]
keyX = bytearray(result[0:32])
keyY = bytearray(result[32:][::-1])
if (keyX[31] & 1) != 0:
keyY[31] |= 0x80
result = chr(0) + str(keyY)
publicKey = PublicKey()
publicKey.node.public_key = str(result)
return publicKey
# pylint: disable=unused-argument
def get_public_node(self, n, ecdsa_curve_name="secp256k1", show_display=False):
donglePath = LedgerClientConnection.expand_path(n)
if ecdsa_curve_name == "nist256p1":
p2 = "01"
else:
p2 = "02"
apdu = "800200" + p2
apdu = apdu.decode('hex')
apdu += chr(len(donglePath) + 1) + chr(len(donglePath) / 4)
apdu += donglePath
result = bytearray(self.dongle.exchange(bytes(apdu)))[1:]
return LedgerClientConnection.convert_public_key(ecdsa_curve_name, result)
# pylint: disable=too-many-locals
def sign_identity(self, identity, challenge_hidden, challenge_visual,
ecdsa_curve_name="secp256k1"):
from trezorlib.messages_pb2 import SignedIdentity # pylint: disable=import-error
n = util.get_bip32_address(identity)
donglePath = LedgerClientConnection.expand_path(n)
if identity.proto == 'ssh':
ins = "04"
p1 = "00"
else:
ins = "08"
p1 = "00"
if ecdsa_curve_name == "nist256p1":
p2 = "81" if identity.proto == 'ssh' else "01"
else:
p2 = "82" if identity.proto == 'ssh' else "02"
apdu = "80" + ins + p1 + p2
apdu = apdu.decode('hex')
apdu += chr(len(challenge_hidden) + len(donglePath) + 1)
apdu += chr(len(donglePath) / 4) + donglePath
apdu += challenge_hidden
result = bytearray(self.dongle.exchange(bytes(apdu)))
if ecdsa_curve_name == "nist256p1":
offset = 3
length = result[offset]
r = result[offset+1:offset+1+length]
if r[0] == 0:
r = r[1:]
offset = offset + 1 + length + 1
length = result[offset]
s = result[offset+1:offset+1+length]
if s[0] == 0:
s = s[1:]
offset = offset + 1 + length
signature = SignedIdentity()
signature.signature = chr(0) + str(r) + str(s)
if identity.proto == 'ssh':
keyData = result[offset:]
pk = LedgerClientConnection.convert_public_key(ecdsa_curve_name, keyData)
signature.public_key = pk.node.public_key
return signature
else:
signature = SignedIdentity()
signature.signature = chr(0) + str(result[0:64])
if identity.proto == 'ssh':
keyData = result[64:]
pk = LedgerClientConnection.convert_public_key(ecdsa_curve_name, keyData)
signature.public_key = pk.node.public_key
return signature
def get_ecdh_session_key(self, identity, peer_public_key, ecdsa_curve_name="secp256k1"):
from trezorlib.messages_pb2 import ECDHSessionKey # pylint: disable=import-error
n = util.get_bip32_address(identity, True)
donglePath = LedgerClientConnection.expand_path(n)
if ecdsa_curve_name == "nist256p1":
p2 = "01"
else:
p2 = "02"
apdu = "800a00" + p2
apdu = apdu.decode('hex')
apdu += chr(len(peer_public_key) + len(donglePath) + 1)
apdu += chr(len(donglePath) / 4) + donglePath
apdu += peer_public_key
result = bytearray(self.dongle.exchange(bytes(apdu)))
sessionKey = ECDHSessionKey()
sessionKey.session_key = str(result)
return sessionKey
def clear_session(self):
pass
def close(self):
self.dongle.close()
# pylint: disable=unused-argument
# pylint: disable=no-self-use
def ping(self, msg, button_protection=False, pin_protection=False,
passphrase_protection=False):
return msg
class CallException(Exception):
def __init__(self, code, message):
super(CallException, self).__init__()
self.args = [code, message]
try:
from ledgerblue.comm import getDongle
except ImportError as e:
log.warning('%s: install via "pip install ledgerblue" '
'if you need to support this device', e)
return
# pylint: disable=bare-except
try:
from trezorlib.types_pb2 import IdentityType # pylint: disable=import-error
dongle = getDongle()
except:
return
yield ClientWrapper(connection=LedgerClientConnection(dongle),
identity_type=IdentityType,
device_name="ledger",
call_exception=CallException)
LOADERS = [
_load_trezor,
_load_keepkey
_load_keepkey,
_load_ledger
]

View File

@@ -11,11 +11,15 @@ from . import util
log = logging.getLogger(__name__)
# Supported ECDSA curves
# Supported ECDSA curves (for SSH and GPG)
CURVE_NIST256 = 'nist256p1'
CURVE_ED25519 = 'ed25519'
SUPPORTED_CURVES = {CURVE_NIST256, CURVE_ED25519}
# Supported ECDH curves (for GPG)
ECDH_NIST256 = 'nist256p1'
ECDH_CURVE25519 = 'curve25519'
# SSH key types
SSH_NIST256_DER_OCTET = b'\x04'
SSH_NIST256_KEY_PREFIX = b'ecdsa-sha2-'
@@ -134,7 +138,8 @@ def decompress_pubkey(pubkey, curve_name):
if len(pubkey) == 33:
decompress = {
CURVE_NIST256: _decompress_nist256,
CURVE_ED25519: _decompress_ed25519
CURVE_ED25519: _decompress_ed25519,
ECDH_CURVE25519: _decompress_ed25519,
}[curve_name]
vk = decompress(pubkey)
@@ -192,3 +197,12 @@ def import_public_key(line):
assert result['type'] == file_type.encode('ascii')
log.debug('loaded %s public key: %s', file_type, result['fingerprint'])
return result
def get_ecdh_curve_name(signature_curve_name):
"""Return appropriate curve for ECDH for specified signing curve."""
return {
CURVE_NIST256: ECDH_NIST256,
CURVE_ED25519: ECDH_CURVE25519,
ECDH_CURVE25519: ECDH_CURVE25519,
}[signature_curve_name]

View File

@@ -3,42 +3,42 @@
import argparse
import contextlib
import logging
import os
import sys
import time
from . import agent, encode, keyring, protocol
from .. import server
import semver
from . import agent, device, encode, keyring, protocol
from .. import formats, server, util
log = logging.getLogger(__name__)
def run_create(args):
"""Generate a new pubkey for a new/existing GPG identity."""
user_id = os.environ['TREZOR_GPG_USER_ID']
log.warning('NOTE: in order to re-generate the exact same GPG key later, '
'run this command with "--time=%d" commandline flag (to set '
'the timestamp of the GPG key manually).', args.time)
conn = encode.HardwareSigner(user_id=user_id,
conn = device.HardwareSigner(user_id=args.user_id,
curve_name=args.ecdsa_curve)
verifying_key = conn.pubkey(ecdh=False)
decryption_key = conn.pubkey(ecdh=True)
if args.subkey:
primary_bytes = keyring.export_public_key(user_id=user_id)
primary_bytes = keyring.export_public_key(user_id=args.user_id)
# subkey for signing
signing_key = protocol.PublicKey(
curve_name=args.ecdsa_curve, created=args.time,
verifying_key=verifying_key, ecdh=False)
# subkey for encryption
encryption_key = protocol.PublicKey(
curve_name=args.ecdsa_curve, created=args.time,
verifying_key=decryption_key, ecdh=True)
curve_name=formats.get_ecdh_curve_name(args.ecdsa_curve),
created=args.time, verifying_key=decryption_key, ecdh=True)
result = encode.create_subkey(primary_bytes=primary_bytes,
pubkey=signing_key,
subkey=signing_key,
signer_func=conn.sign)
result = encode.create_subkey(primary_bytes=result,
pubkey=encryption_key,
subkey=encryption_key,
signer_func=conn.sign)
else:
# primary key for signing
@@ -47,14 +47,14 @@ def run_create(args):
verifying_key=verifying_key, ecdh=False)
# subkey for encryption
subkey = protocol.PublicKey(
curve_name=args.ecdsa_curve, created=args.time,
verifying_key=decryption_key, ecdh=True)
curve_name=formats.get_ecdh_curve_name(args.ecdsa_curve),
created=args.time, verifying_key=decryption_key, ecdh=True)
result = encode.create_primary(user_id=user_id,
result = encode.create_primary(user_id=args.user_id,
pubkey=primary,
signer_func=conn.sign)
result = encode.create_subkey(primary_bytes=result,
pubkey=subkey,
subkey=subkey,
signer_func=conn.sign)
sys.stdout.write(protocol.armor(result, 'PUBLIC KEY BLOCK'))
@@ -66,18 +66,22 @@ def run_agent(args): # pylint: disable=unused-argument
with server.unix_domain_socket_server(sock_path) as sock:
for conn in agent.yield_connections(sock):
with contextlib.closing(conn):
agent.handle_connection(conn)
try:
agent.handle_connection(conn)
except Exception as e: # pylint: disable=broad-except
log.exception('gpg-agent failed: %s', e)
def main():
"""Main function."""
p = argparse.ArgumentParser()
p.add_argument('-v', '--verbose', action='store_true', default=False)
p.add_argument('-v', '--verbose', default=0, action='count')
subparsers = p.add_subparsers()
subparsers.required = True
subparsers.dest = 'command'
create_cmd = subparsers.add_parser('create')
create_cmd.add_argument('user_id')
create_cmd.add_argument('-s', '--subkey', action='store_true', default=False)
create_cmd.add_argument('-e', '--ecdsa-curve', default='nist256p1')
create_cmd.add_argument('-t', '--time', type=int, default=int(time.time()))
@@ -87,12 +91,18 @@ def main():
agent_cmd.set_defaults(run=run_agent)
args = p.parse_args()
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO,
format='%(asctime)s %(levelname)-10s %(message)s')
util.setup_logging(verbosity=args.verbose)
log.warning('This GPG tool is still in EXPERIMENTAL mode, '
'so please note that the API and features may '
'change without backwards compatibility!')
args.run(args)
existing_gpg = keyring.gpg_version().decode('ascii')
required_gpg = '>=2.1.15'
if semver.match(existing_gpg, required_gpg):
args.run(args)
else:
log.error('Existing gpg2 has version "%s" (%s required)',
existing_gpg, required_gpg)
if __name__ == '__main__':

View File

@@ -2,9 +2,8 @@
import binascii
import contextlib
import logging
import os
from . import decode, encode, keyring
from . import decode, device, keyring, protocol
from .. import util
log = logging.getLogger(__name__)
@@ -25,8 +24,9 @@ def yield_connections(sock):
def serialize(data):
"""Serialize data according to ASSUAN protocol."""
for c in ['%', '\n', '\r']:
data = data.replace(c, '%{:02X}'.format(ord(c)))
for c in [b'%', b'\n', b'\r']:
escaped = '%{:02X}'.format(ord(c)).encode('ascii')
data = data.replace(c, escaped)
return data
@@ -34,19 +34,40 @@ def sig_encode(r, s):
"""Serialize ECDSA signature data into GPG S-expression."""
r = serialize(util.num2bytes(r, 32))
s = serialize(util.num2bytes(s, 32))
return '(7:sig-val(5:ecdsa(1:r32:{})(1:s32:{})))'.format(r, s)
return b'(7:sig-val(5:ecdsa(1:r32:' + r + b')(1:s32:' + s + b')))'
@contextlib.contextmanager
def open_connection(keygrip_bytes):
"""
Connect to the device for the specified keygrip.
Parse GPG public key to find the first user ID, which is used to
specify the correct signature/decryption key on the device.
"""
pubkey_dict, user_ids = decode.load_by_keygrip(
pubkey_bytes=keyring.export_public_keys(),
keygrip=keygrip_bytes)
# We assume the first user ID is used to generate TREZOR-based GPG keys.
user_id = user_ids[0]['value']
curve_name = protocol.get_curve_name_by_oid(pubkey_dict['curve_oid'])
ecdh = (pubkey_dict['algo'] == protocol.ECDH_ALGO_ID)
conn = device.HardwareSigner(user_id, curve_name=curve_name)
with contextlib.closing(conn):
pubkey = protocol.PublicKey(
curve_name=curve_name, created=pubkey_dict['created'],
verifying_key=conn.pubkey(ecdh=ecdh), ecdh=ecdh)
assert pubkey.key_id() == pubkey_dict['key_id']
assert pubkey.keygrip == keygrip_bytes
yield conn
def pksign(keygrip, digest, algo):
"""Sign a message digest using a private EC key."""
assert algo == '8'
user_id = os.environ['TREZOR_GPG_USER_ID']
pubkey_dict = decode.load_public_key(
pubkey_bytes=keyring.export_public_key(user_id=user_id),
use_custom=True, ecdh=False)
pubkey, conn = encode.load_from_public_key(pubkey_dict=pubkey_dict)
with contextlib.closing(conn):
assert pubkey.keygrip == binascii.unhexlify(keygrip)
assert algo == b'8', 'Unsupported hash algorithm ID {}'.format(algo)
keygrip_bytes = binascii.unhexlify(keygrip)
with open_connection(keygrip_bytes) as conn:
r, s = conn.sign(binascii.unhexlify(digest))
result = sig_encode(r, s)
log.debug('result: %r', result)
@@ -54,17 +75,15 @@ def pksign(keygrip, digest, algo):
def _serialize_point(data):
data = '{}:'.format(len(data)) + data
prefix = '{}:'.format(len(data)).encode('ascii')
# https://www.gnupg.org/documentation/manuals/assuan/Server-responses.html
for c in ['%', '\n', '\r']:
data = data.replace(c, '%{:02X}'.format(ord(c)))
return '(5:value' + data + ')'
return b'(5:value' + serialize(prefix + data) + b')'
def parse_ecdh(line):
"""Parse ECDH request and return remote public key."""
prefix, line = line.split(' ', 1)
assert prefix == 'D'
prefix, line = line.split(b' ', 1)
assert prefix == b'D'
exp, leftover = keyring.parse(keyring.unescape(line))
log.debug('ECDH s-exp: %r', exp)
assert not leftover
@@ -73,7 +92,7 @@ def parse_ecdh(line):
assert exp[0] == b'ecdh'
items = exp[1:]
log.debug('ECDH parameters: %r', items)
return dict(items)['e']
return dict(items)[b'e']
def pkdecrypt(keygrip, conn):
@@ -85,18 +104,9 @@ def pkdecrypt(keygrip, conn):
assert keyring.recvline(conn) == b'END'
remote_pubkey = parse_ecdh(line)
user_id = os.environ['TREZOR_GPG_USER_ID']
local_pubkey = decode.load_public_key(
pubkey_bytes=keyring.export_public_key(user_id=user_id),
use_custom=True, ecdh=True)
pubkey, conn = encode.load_from_public_key(pubkey_dict=local_pubkey)
with contextlib.closing(conn):
assert pubkey.keygrip == binascii.unhexlify(keygrip)
shared_secret = conn.ecdh(remote_pubkey)
assert len(shared_secret) == 65
assert shared_secret[:1] == b'\x04'
return _serialize_point(shared_secret)
keygrip_bytes = binascii.unhexlify(keygrip)
with open_connection(keygrip_bytes) as conn:
return _serialize_point(conn.ecdh(remote_pubkey))
def handle_connection(conn):
@@ -104,30 +114,36 @@ def handle_connection(conn):
keygrip = None
digest = None
algo = None
version = keyring.gpg_version()
version = keyring.gpg_version() # "Clone" existing GPG version
keyring.sendline(conn, b'OK')
for line in keyring.iterlines(conn):
parts = line.split(' ')
parts = line.split(b' ')
command = parts[0]
args = parts[1:]
if command in {'RESET', 'OPTION', 'HAVEKEY', 'SETKEYDESC'}:
if command in {b'RESET', b'OPTION', b'HAVEKEY', b'SETKEYDESC'}:
pass # reply with OK
elif command == 'GETINFO':
elif command == b'GETINFO':
keyring.sendline(conn, b'D ' + version)
elif command == 'AGENT_ID':
keyring.sendline(conn, b'D TREZOR')
elif command in {'SIGKEY', 'SETKEY'}:
elif command == b'AGENT_ID':
keyring.sendline(conn, b'D TREZOR') # "Fake" agent ID
elif command in {b'SIGKEY', b'SETKEY'}:
keygrip, = args
elif command == 'SETHASH':
elif command == b'SETHASH':
algo, digest = args
elif command == 'PKSIGN':
elif command == b'PKSIGN':
sig = pksign(keygrip, digest, algo)
keyring.sendline(conn, b'D ' + sig)
elif command == 'PKDECRYPT':
elif command == b'PKDECRYPT':
sec = pkdecrypt(keygrip, conn)
keyring.sendline(conn, b'D ' + sec)
elif command == 'BYE':
elif command == b'KEYINFO':
keygrip, = args
# Dummy reply (mainly for 'gpg --edit' to succeed).
# For details, see GnuPG agent KEYINFO command help.
fmt = 'S KEYINFO {0} X - - - - - - -'
keyring.sendline(conn, fmt.format(keygrip).encode('ascii'))
elif command == b'BYE':
return
else:
log.error('unknown request: %r', line)

View File

@@ -1,6 +1,5 @@
"""Decoders for GPG v2 data structures."""
import base64
import copy
import functools
import hashlib
import io
@@ -53,39 +52,29 @@ def parse_mpis(s, n):
return [parse_mpi(s) for _ in range(n)]
def _parse_nist256p1_verifier(mpi):
def _parse_nist256p1_pubkey(mpi):
prefix, x, y = util.split_bits(mpi, 4, 256, 256)
assert prefix == 4
point = ecdsa.ellipticcurve.Point(curve=ecdsa.NIST256p.curve,
x=x, y=y)
vk = ecdsa.VerifyingKey.from_public_point(
return ecdsa.VerifyingKey.from_public_point(
point=point, curve=ecdsa.curves.NIST256p,
hashfunc=hashlib.sha256)
def _nist256p1_verify(signature, digest):
result = vk.verify_digest(signature=signature,
digest=digest,
sigdecode=lambda rs, order: rs)
log.debug('nist256p1 ECDSA signature is OK (%s)', result)
return _nist256p1_verify, vk
def _parse_ed25519_verifier(mpi):
def _parse_ed25519_pubkey(mpi):
prefix, value = util.split_bits(mpi, 8, 256)
assert prefix == 0x40
vk = ed25519.VerifyingKey(util.num2bytes(value, size=32))
def _ed25519_verify(signature, digest):
sig = b''.join(util.num2bytes(val, size=32)
for val in signature)
result = vk.verify(sig, digest)
log.debug('ed25519 ECDSA signature is OK (%s)', result)
return _ed25519_verify, vk
return ed25519.VerifyingKey(util.num2bytes(value, size=32))
SUPPORTED_CURVES = {
b'\x2A\x86\x48\xCE\x3D\x03\x01\x07': _parse_nist256p1_verifier,
b'\x2B\x06\x01\x04\x01\xDA\x47\x0F\x01': _parse_ed25519_verifier,
b'\x2A\x86\x48\xCE\x3D\x03\x01\x07':
(_parse_nist256p1_pubkey, protocol.keygrip_nist256),
b'\x2B\x06\x01\x04\x01\xDA\x47\x0F\x01':
(_parse_ed25519_pubkey, protocol.keygrip_ed25519),
b'\x2B\x06\x01\x04\x01\x97\x55\x01\x05\x01':
(_parse_ed25519_pubkey, protocol.keygrip_curve25519),
}
RSA_ALGO_IDS = {1, 2, 3}
@@ -168,11 +157,10 @@ def _parse_pubkey(stream, packet_type='pubkey'):
oid_size = stream.readfmt('B')
oid = stream.read(oid_size)
assert oid in SUPPORTED_CURVES, util.hexlify(oid)
parser = SUPPORTED_CURVES[oid]
p['curve_oid'] = oid
mpi = parse_mpi(stream)
log.debug('mpi: %x (%d bits)', mpi, mpi.bit_length())
p['verifier'], p['verifying_key'] = parser(mpi)
leftover = stream.read()
if leftover:
leftover = io.BytesIO(leftover)
@@ -181,15 +169,18 @@ def _parse_pubkey(stream, packet_type='pubkey'):
size, = util.readfmt(leftover, 'B')
p['kdf'] = leftover.read(size)
assert not leftover.read()
parse_func, keygrip_func = SUPPORTED_CURVES[oid]
keygrip = keygrip_func(parse_func(mpi))
log.debug('keygrip: %s', util.hexlify(keygrip))
p['keygrip'] = keygrip
elif p['algo'] == DSA_ALGO_ID:
log.warning('DSA signatures are not verified')
parse_mpis(stream, n=4)
parse_mpis(stream, n=4) # DSA keys are not supported
elif p['algo'] == ELGAMAL_ALGO_ID:
log.warning('ElGamal signatures are not verified')
parse_mpis(stream, n=3)
parse_mpis(stream, n=3) # ElGamal keys are not supported
else: # assume RSA
log.warning('RSA signatures are not verified')
parse_mpis(stream, n=2)
parse_mpis(stream, n=2) # RSA keys are not supported
assert not stream.read()
# https://tools.ietf.org/html/rfc4880#section-12.2
@@ -280,28 +271,6 @@ def digest_packets(packets, hasher):
return hasher.digest()
def collect_packets(packets, types_to_collect):
"""Collect specified packet types into their leading packet."""
packet = None
result = []
for p in packets:
if p['type'] in types_to_collect:
packet.setdefault(p['type'], []).append(p)
else:
packet = copy.deepcopy(p)
result.append(packet)
return result
def parse_public_keys(stream):
"""Parse GPG public key into hierarchy of packets."""
packets = list(parse_packets(stream))
packets = collect_packets(packets, {'signature'})
packets = collect_packets(packets, {'user_id', 'user_attribute'})
packets = collect_packets(packets, {'subkey'})
return packets
HASH_ALGORITHMS = {
1: 'md5',
2: 'sha1',
@@ -313,42 +282,22 @@ HASH_ALGORITHMS = {
}
def load_public_key(pubkey_bytes, use_custom=False, ecdh=False):
"""Parse and validate GPG public key from an input stream."""
def load_by_keygrip(pubkey_bytes, keygrip):
"""Return public key and first user ID for specified keygrip."""
stream = io.BytesIO(pubkey_bytes)
packets = list(parse_packets(stream))
pubkey, userid, signature = packets[:3]
packets = packets[3:]
packets_per_pubkey = []
for p in packets:
if p['type'] == 'pubkey':
# Add a new packet list for each pubkey.
packets_per_pubkey.append([])
packets_per_pubkey[-1].append(p)
hash_alg = HASH_ALGORITHMS.get(signature['hash_alg'])
if hash_alg is not None:
digest = digest_packets(packets=[pubkey, userid, signature],
hasher=hashlib.new(hash_alg))
assert signature['hash_prefix'] == digest[:2]
log.debug('loaded public key "%s"', userid['value'])
if hash_alg is not None and pubkey.get('verifier'):
verify_digest(pubkey=pubkey, digest=digest,
signature=signature['sig'], label='GPG public key')
else:
log.warning('public key %s is not verified!',
util.hexlify(pubkey['key_id']))
packet = pubkey
while use_custom:
if packet['type'] in ('pubkey', 'subkey') and signature['_is_custom']:
if ecdh == (packet['algo'] == protocol.ECDH_ALGO_ID):
log.debug('found custom %s', packet['type'])
break
while packets[1]['type'] != 'signature':
packets = packets[1:]
packet, signature = packets[:2]
packets = packets[2:]
packet['user_id'] = userid['value']
packet['_is_custom'] = signature['_is_custom']
return packet
for packets in packets_per_pubkey:
user_ids = [p for p in packets if p['type'] == 'user_id']
for p in packets:
if p.get('keygrip') == keygrip:
return p, user_ids
def load_signature(stream, original_data):
@@ -361,17 +310,6 @@ def load_signature(stream, original_data):
return signature, digest
def verify_digest(pubkey, digest, signature, label):
"""Verify a digest signature from a specified public key."""
verifier = pubkey['verifier']
try:
verifier(signature, digest)
log.debug('%s is OK', label)
except ecdsa.keys.BadSignatureError:
log.error('Bad %s!', label)
raise ValueError('Invalid ECDSA signature for {}'.format(label))
def remove_armor(armored_data):
"""Decode armored data into its binary form."""
stream = io.BytesIO(armored_data)
@@ -380,11 +318,3 @@ def remove_armor(armored_data):
payload, checksum = data[:-3], data[-3:]
assert util.crc24(payload) == checksum
return payload
def verify(pubkey, signature, original_data):
"""Verify correctness of public key and signature."""
stream = io.BytesIO(remove_armor(signature))
signature, digest = load_signature(stream, original_data)
verify_digest(pubkey=pubkey, digest=digest,
signature=signature['sig'], label='GPG signature')

View File

@@ -0,0 +1,54 @@
"""Device abstraction layer for GPG operations."""
from .. import factory, formats, util
class HardwareSigner(object):
"""Sign messages and get public keys from a hardware device."""
def __init__(self, user_id, curve_name):
"""Connect to the device and retrieve required public key."""
self.client_wrapper = factory.load()
self.identity = self.client_wrapper.identity_type()
self.identity.proto = 'gpg'
self.identity.host = user_id
self.curve_name = curve_name
def pubkey(self, ecdh=False):
"""Return public key as VerifyingKey object."""
addr = util.get_bip32_address(identity=self.identity, ecdh=ecdh)
if ecdh:
curve_name = formats.get_ecdh_curve_name(self.curve_name)
else:
curve_name = self.curve_name
public_node = self.client_wrapper.connection.get_public_node(
n=addr, ecdsa_curve_name=curve_name)
return formats.decompress_pubkey(
pubkey=public_node.node.public_key,
curve_name=curve_name)
def sign(self, digest):
"""Sign the digest and return a serialized signature."""
result = self.client_wrapper.connection.sign_identity(
identity=self.identity,
challenge_hidden=digest,
challenge_visual='',
ecdsa_curve_name=self.curve_name)
assert result.signature[:1] == b'\x00'
sig = result.signature[1:]
return (util.bytes2num(sig[:32]), util.bytes2num(sig[32:]))
def ecdh(self, pubkey):
"""Derive shared secret using ECDH from remote public key."""
result = self.client_wrapper.connection.get_ecdh_session_key(
identity=self.identity,
peer_public_key=pubkey,
ecdsa_curve_name=formats.get_ecdh_curve_name(self.curve_name))
assert len(result.session_key) in {65, 33} # NIST256 or Curve25519
assert result.session_key[:1] == b'\x04'
return result.session_key
def close(self):
"""Close the connection to the device."""
self.client_wrapper.connection.close()

View File

@@ -1,83 +1,13 @@
"""Create GPG ECDSA signatures and public keys using TREZOR device."""
import io
import logging
import time
from . import decode, keyring, protocol
from .. import client, factory, formats, util
from .. import util
log = logging.getLogger(__name__)
class HardwareSigner(object):
"""Sign messages and get public keys from a hardware device."""
def __init__(self, user_id, curve_name):
"""Connect to the device and retrieve required public key."""
self.client_wrapper = factory.load()
self.identity = self.client_wrapper.identity_type()
self.identity.proto = 'gpg'
self.identity.host = user_id
self.curve_name = curve_name
def pubkey(self, ecdh=False):
"""Return public key as VerifyingKey object."""
addr = client.get_address(identity=self.identity, ecdh=ecdh)
public_node = self.client_wrapper.connection.get_public_node(
n=addr, ecdsa_curve_name=self.curve_name)
return formats.decompress_pubkey(
pubkey=public_node.node.public_key,
curve_name=self.curve_name)
def sign(self, digest):
"""Sign the digest and return a serialized signature."""
result = self.client_wrapper.connection.sign_identity(
identity=self.identity,
challenge_hidden=digest,
challenge_visual='',
ecdsa_curve_name=self.curve_name)
assert result.signature[:1] == b'\x00'
sig = result.signature[1:]
return (util.bytes2num(sig[:32]), util.bytes2num(sig[32:]))
def ecdh(self, pubkey):
"""Derive shared secret using ECDH from remote public key."""
result = self.client_wrapper.connection.get_ecdh_session_key(
identity=self.identity,
peer_public_key=pubkey,
ecdsa_curve_name=self.curve_name)
assert len(result.session_key) == 65
assert result.session_key[:1] == b'\x04'
return result.session_key
def close(self):
"""Close the connection to the device."""
self.client_wrapper.connection.clear_session()
self.client_wrapper.connection.close()
class AgentSigner(object):
"""Sign messages and get public keys using gpg-agent tool."""
def __init__(self, user_id):
"""Connect to the agent and retrieve required public key."""
self.sock = keyring.connect_to_agent()
self.keygrip = keyring.get_keygrip(user_id)
def sign(self, digest):
"""Sign the digest and return an ECDSA/RSA/DSA signature."""
return keyring.sign_digest(sock=self.sock,
keygrip=self.keygrip, digest=digest)
def close(self):
"""Close the connection to gpg-agent."""
self.sock.close()
def _time_format(t):
return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(t))
def create_primary(user_id, pubkey, signer_func):
"""Export new primary GPG public key, ready for "gpg2 --import"."""
pubkey_packet = protocol.packet(tag=6, blob=pubkey.data())
@@ -87,7 +17,6 @@ def create_primary(user_id, pubkey, signer_func):
data_to_sign = (pubkey.data_to_hash() +
user_id_packet[:1] +
util.prefix_len('>L', user_id.encode('ascii')))
log.info('creating primary GPG key "%s"', user_id)
hashed_subpackets = [
protocol.subpacket_time(pubkey.created), # signature time
# https://tools.ietf.org/html/rfc4880#section-5.2.3.7
@@ -106,7 +35,6 @@ def create_primary(user_id, pubkey, signer_func):
protocol.subpacket(16, pubkey.key_id()), # issuer key id
protocol.CUSTOM_SUBPACKET]
log.info('confirm signing with primary key')
signature = protocol.make_signature(
signer_func=signer_func,
public_algo=pubkey.algo_id,
@@ -119,26 +47,26 @@ def create_primary(user_id, pubkey, signer_func):
return pubkey_packet + user_id_packet + sign_packet
def create_subkey(primary_bytes, pubkey, signer_func):
def create_subkey(primary_bytes, subkey, signer_func, user_id=None):
"""Export new subkey to GPG primary key."""
subkey_packet = protocol.packet(tag=14, blob=pubkey.data())
primary = decode.load_public_key(primary_bytes)
log.info('adding subkey to primary GPG key "%s"', primary['user_id'])
data_to_sign = primary['_to_hash'] + pubkey.data_to_hash()
subkey_packet = protocol.packet(tag=14, blob=subkey.data())
packets = list(decode.parse_packets(io.BytesIO(primary_bytes)))
primary, user_id, signature = packets[:3]
if pubkey.ecdh:
data_to_sign = primary['_to_hash'] + subkey.data_to_hash()
if subkey.ecdh:
embedded_sig = None
else:
# Primary Key Binding Signature
hashed_subpackets = [
protocol.subpacket_time(pubkey.created)] # signature time
protocol.subpacket_time(subkey.created)] # signature time
unhashed_subpackets = [
protocol.subpacket(16, pubkey.key_id())] # issuer key id
log.info('confirm signing with new subkey')
protocol.subpacket(16, subkey.key_id())] # issuer key id
embedded_sig = protocol.make_signature(
signer_func=signer_func,
data_to_sign=data_to_sign,
public_algo=pubkey.algo_id,
public_algo=subkey.algo_id,
sig_type=0x19,
hashed_subpackets=hashed_subpackets,
unhashed_subpackets=unhashed_subpackets)
@@ -147,10 +75,10 @@ def create_subkey(primary_bytes, pubkey, signer_func):
# Key flags: https://tools.ietf.org/html/rfc4880#section-5.2.3.21
# (certify & sign) (encrypt)
flags = (2) if (not pubkey.ecdh) else (4 | 8)
flags = (2) if (not subkey.ecdh) else (4 | 8)
hashed_subpackets = [
protocol.subpacket_time(pubkey.created), # signature time
protocol.subpacket_time(subkey.created), # signature time
protocol.subpacket_byte(0x1B, flags)]
unhashed_subpackets = []
@@ -159,9 +87,8 @@ def create_subkey(primary_bytes, pubkey, signer_func):
unhashed_subpackets.append(protocol.subpacket(32, embedded_sig))
unhashed_subpackets.append(protocol.CUSTOM_SUBPACKET)
log.info('confirm signing with primary key')
if not primary['_is_custom']:
signer_func = AgentSigner(primary['user_id']).sign
if not signature['_is_custom']:
signer_func = keyring.create_agent_signer(user_id['value'])
signature = protocol.make_signature(
signer_func=signer_func,
@@ -172,22 +99,3 @@ def create_subkey(primary_bytes, pubkey, signer_func):
unhashed_subpackets=unhashed_subpackets)
sign_packet = protocol.packet(tag=2, blob=signature)
return primary_bytes + subkey_packet + sign_packet
def load_from_public_key(pubkey_dict):
"""Load correct public key from the device."""
user_id = pubkey_dict['user_id']
created = pubkey_dict['created']
curve_name = protocol.find_curve_by_algo_id(pubkey_dict['algo'])
assert curve_name in formats.SUPPORTED_CURVES
ecdh = (pubkey_dict['algo'] == protocol.ECDH_ALGO_ID)
conn = HardwareSigner(user_id, curve_name=curve_name)
pubkey = protocol.PublicKey(
curve_name=curve_name, created=created,
verifying_key=conn.pubkey(ecdh=ecdh), ecdh=ecdh)
assert pubkey.key_id() == pubkey_dict['key_id']
log.info('%s created at %s for "%s"',
pubkey, _time_format(pubkey.created), user_id)
return pubkey, conn

View File

@@ -1,4 +1,5 @@
"""Tools for doing signature using gpg-agent."""
from __future__ import absolute_import, print_function, unicode_literals
import binascii
import io
@@ -15,15 +16,15 @@ log = logging.getLogger(__name__)
def get_agent_sock_path(sp=subprocess):
"""Parse gpgconf output to find out GPG agent UNIX socket path."""
lines = sp.check_output(['gpgconf', '--list-dirs']).strip().split('\n')
dirs = dict(line.split(':', 1) for line in lines)
return dirs['agent-socket']
lines = sp.check_output(['gpgconf', '--list-dirs']).strip().split(b'\n')
dirs = dict(line.split(b':', 1) for line in lines)
return dirs[b'agent-socket']
def connect_to_agent(sp=subprocess):
"""Connect to GPG agent's UNIX socket."""
sock_path = get_agent_sock_path(sp=sp)
sp.check_call(['gpg-connect-agent', '/bye'])
sp.check_call(['gpg-connect-agent', '/bye']) # Stop current gpg-agent
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(sock_path)
return sock
@@ -190,7 +191,7 @@ def get_keygrip(user_id, sp=subprocess):
def gpg_version(sp=subprocess):
"""Get a keygrip of the primary GPG key of the specified user."""
args = gpg_command(['--version'])
output = sp.check_output(args).decode('ascii')
output = sp.check_output(args)
line = output.split(b'\n')[0] # b'gpg (GnuPG) 2.1.11'
return line.split(b' ')[-1] # b'2.1.11'
@@ -203,3 +204,21 @@ def export_public_key(user_id, sp=subprocess):
log.error('could not find public key %r in local GPG keyring', user_id)
raise KeyError(user_id)
return result
def export_public_keys(sp=subprocess):
"""Export all GPG public keys."""
args = gpg_command(['--export'])
return sp.check_output(args=args)
def create_agent_signer(user_id):
"""Sign digest with existing GPG keys using gpg-agent tool."""
sock = connect_to_agent()
keygrip = get_keygrip(user_id)
def sign(digest):
"""Sign the digest and return an ECDSA/RSA/DSA signature."""
return sign_digest(sock=sock, keygrip=keygrip, digest=digest)
return sign

View File

@@ -47,9 +47,22 @@ def subpacket_byte(subpacket_type, value):
return subpacket(subpacket_type, '>B', value)
def subpacket_prefix_len(item):
"""Prefix subpacket length according to RFC 4880 section-5.2.3.1."""
n = len(item)
if n >= 8384:
prefix = b'\xFF' + struct.pack('>L', n)
elif n >= 192:
n = n - 192
prefix = struct.pack('BB', (n // 256) + 192, n % 256)
else:
prefix = struct.pack('B', n)
return prefix + item
def subpackets(*items):
"""Serialize several GPG subpackets."""
prefixed = [util.prefix_len('>B', item) for item in items]
prefixed = [subpacket_prefix_len(item) for item in items]
return util.prefix_len('>H', b''.join(prefixed))
@@ -86,7 +99,8 @@ def _compute_keygrip(params):
return hashlib.sha1(b''.join(parts)).digest()
def _keygrip_nist256(vk):
def keygrip_nist256(vk):
"""Compute keygrip for NIST256 curve public keys."""
curve = vk.curve.curve
gen = vk.curve.generator
g = (4 << 512) | (gen.x() << 256) | gen.y()
@@ -103,7 +117,8 @@ def _keygrip_nist256(vk):
])
def _keygrip_ed25519(vk):
def keygrip_ed25519(vk):
"""Compute keygrip for Ed25519 public keys."""
# pylint: disable=line-too-long
return _compute_keygrip([
['p', util.num2bytes(0x7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFED, size=32)], # nopep8
@@ -115,20 +130,39 @@ def _keygrip_ed25519(vk):
])
def keygrip_curve25519(vk):
"""Compute keygrip for Curve25519 public keys."""
# pylint: disable=line-too-long
return _compute_keygrip([
['p', util.num2bytes(0x7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFED, size=32)], # nopep8
['a', b'\x01\xDB\x41'],
['b', b'\x01'],
['g', util.num2bytes(0x04000000000000000000000000000000000000000000000000000000000000000920ae19a1b8a086b4e01edd2c7748d14c923d4d7e6d7c61b229e9c5a27eced3d9, size=65)], # nopep8
['n', util.num2bytes(0x1000000000000000000000000000000014DEF9DEA2F79CD65812631A5CF5D3ED, size=32)], # nopep8
['q', vk.to_bytes()],
])
SUPPORTED_CURVES = {
formats.CURVE_NIST256: {
# https://tools.ietf.org/html/rfc6637#section-11
'oid': b'\x2A\x86\x48\xCE\x3D\x03\x01\x07',
'algo_id': 19,
'serialize': _serialize_nist256,
'keygrip': _keygrip_nist256,
'keygrip': keygrip_nist256,
},
formats.CURVE_ED25519: {
'oid': b'\x2B\x06\x01\x04\x01\xDA\x47\x0F\x01',
'algo_id': 22,
'serialize': _serialize_ed25519,
'keygrip': _keygrip_ed25519,
}
'keygrip': keygrip_ed25519,
},
formats.ECDH_CURVE25519: {
'oid': b'\x2B\x06\x01\x04\x01\x97\x55\x01\x05\x01',
'algo_id': 18,
'serialize': _serialize_ed25519,
'keygrip': keygrip_curve25519,
},
}
ECDH_ALGO_ID = 18
@@ -136,14 +170,12 @@ ECDH_ALGO_ID = 18
CUSTOM_SUBPACKET = subpacket(100, b'TREZOR-GPG') # marks "our" pubkey
def find_curve_by_algo_id(algo_id):
"""Find curve name that matches a public key algorith ID."""
if algo_id == ECDH_ALGO_ID:
return formats.CURVE_NIST256
curve_name, = [name for name, info in SUPPORTED_CURVES.items()
if info['algo_id'] == algo_id]
return curve_name
def get_curve_name_by_oid(oid):
"""Return curve name matching specified OID, or raise KeyError."""
for curve_name, info in SUPPORTED_CURVES.items():
if info['oid'] == oid:
return curve_name
raise KeyError('Unknown OID: {!r}'.format(oid))
class PublicKey(object):
@@ -154,7 +186,7 @@ class PublicKey(object):
self.curve_info = SUPPORTED_CURVES[curve_name]
self.created = int(created) # time since Epoch
self.verifying_key = verifying_key
self.ecdh = ecdh
self.ecdh = bool(ecdh)
if ecdh:
self.algo_id = ECDH_ALGO_ID
self.ecdh_packet = b'\x03\x01\x08\x07'

View File

@@ -1,11 +1,10 @@
import glob
import hashlib
import io
import os
import pytest
from .. import decode
from .. import decode, protocol
from ... import util
@@ -14,6 +13,15 @@ def test_subpackets():
assert decode.parse_subpackets(util.Reader(s)) == [b'\xAB\xCD', b'\xEF']
def test_subpackets_prefix():
for n in [0, 1, 2, 4, 5, 10, 191, 192, 193,
255, 256, 257, 8383, 8384, 65530]:
item = b'?' * n # create dummy subpacket
prefixed = protocol.subpackets(item)
result = decode.parse_subpackets(util.Reader(io.BytesIO(prefixed)))
assert [item] == result
def test_mpi():
s = io.BytesIO(b'\x00\x09\x01\x23')
assert decode.parse_mpi(util.Reader(s)) == 0x123
@@ -22,84 +30,6 @@ def test_mpi():
assert decode.parse_mpis(util.Reader(s), n=2) == [0x123, 5]
def assert_subdict(d, s):
for k, v in s.items():
assert d[k] == v
def test_primary_nist256p1():
# pylint: disable=line-too-long
data = b'''-----BEGIN PGP PUBLIC KEY BLOCK-----
Version: GnuPG v2
mFIEV0hI1hMIKoZIzj0DAQcCAwTXY7aq01xMPSU7gTHU9B7Z2CFoCk1Y4WYb8Tiy
hurvIZ5la6+UEgAKF9HXpQo0yE+HQOgufoLlCpdE7NoEUb+HtAd0ZXN0aW5niHYE
ExMIABIFAldISNYCGwMCFQgCFgACF4AAFgkQTcCehfpEIPILZFRSRVpPUi1HUEeV
3QEApHKmBkbLVZNpsB8q9mBzKytxnOHNB3QWDuoKJu/ERi4A/1wRGZ/B0BDazHck
zpR9luXTKwMEl+mlZmwEFKZXBmir
=oyj0
-----END PGP PUBLIC KEY BLOCK-----
'''
stream = io.BytesIO(decode.remove_armor(data))
pubkey, user_id, signature = list(decode.parse_packets(stream))
expected_pubkey = {
'created': 1464355030, 'type': 'pubkey', 'tag': 6,
'version': 4, 'algo': 19, 'key_id': b'M\xc0\x9e\x85\xfaD \xf2',
'_to_hash': b'\x99\x00R\x04WHH\xd6\x13\x08*\x86H\xce=\x03\x01\x07\x02\x03\x04\xd7c\xb6\xaa\xd3\\L=%;\x811\xd4\xf4\x1e\xd9\xd8!h\nMX\xe1f\x1b\xf18\xb2\x86\xea\xef!\x9eek\xaf\x94\x12\x00\n\x17\xd1\xd7\xa5\n4\xc8O\x87@\xe8.~\x82\xe5\n\x97D\xec\xda\x04Q\xbf\x87' # nopep8
}
assert_subdict(pubkey, expected_pubkey)
point = pubkey['verifying_key'].pubkey.point
assert point.x(), point.y() == (
97423441028100245505102979561460969898742433559010922791700160771755342491425,
71644624850142103522769833619875243486871666152651730678601507641225861250951
)
assert_subdict(user_id, {
'tag': 13, 'type': 'user_id', 'value': b'testing',
'_to_hash': b'\xb4\x00\x00\x00\x07testing'
})
assert_subdict(signature, {
'pubkey_alg': 19, '_is_custom': True, 'hash_alg': 8, 'tag': 2,
'sig_type': 19, 'version': 4, 'type': 'signature', 'hash_prefix': b'\x95\xdd',
'sig': (74381873592149178031432444136130575481350858387410643140628758456112511206958,
41642995320462795718437755373080464775445470754419831653624197847615308982443),
'hashed_subpackets': [b'\x02WHH\xd6', b'\x1b\x03', b'\x15\x08', b'\x16\x00', b'\x17\x80'],
'unhashed_subpackets': [b'\x10M\xc0\x9e\x85\xfaD \xf2', b'dTREZOR-GPG'],
'_to_hash': b'\x04\x13\x13\x08\x00\x12\x05\x02WHH\xd6\x02\x1b\x03\x02\x15\x08\x02\x16\x00\x02\x17\x80\x04\xff\x00\x00\x00\x18' # nopep8
})
digest = decode.digest_packets(packets=[pubkey, user_id, signature],
hasher=hashlib.sha256())
decode.verify_digest(pubkey=pubkey, digest=digest,
signature=signature['sig'],
label='GPG primary public key')
with pytest.raises(ValueError):
bad_digest = b'\x00' * len(digest)
decode.verify_digest(pubkey=pubkey, digest=bad_digest,
signature=signature['sig'],
label='GPG primary public key')
message = b'Hello, World!\n'
signature = b'''-----BEGIN PGP SIGNATURE-----
Version: GnuPG v2
iF4EABMIAAYFAldIlfQACgkQTcCehfpEIPKOUgD9FjaeWla4wOuDZ7P6fhkT5nZp
KDQU0N5KmNwLlt2kwo4A/jQkBII2cI8tTqOVTLNRXXqIOsMf/fG4jKM/VOFc/01c
=dC+z
-----END PGP SIGNATURE-----
'''
decode.verify(pubkey=pubkey, signature=signature, original_data=message)
pubkey = decode.load_public_key(pubkey_bytes=decode.remove_armor(data))
assert_subdict(pubkey, expected_pubkey)
assert_subdict(pubkey, {'user_id': b'testing'})
pubkey = decode.load_public_key(pubkey_bytes=decode.remove_armor(data),
use_custom=True)
assert_subdict(pubkey, expected_pubkey)
assert_subdict(pubkey, {'user_id': b'testing'})
cwd = os.path.join(os.path.dirname(__file__))
input_files = glob.glob(os.path.join(cwd, '*.gpg'))
@@ -111,5 +41,5 @@ def public_key_path(request):
def test_gpg_files(public_key_path): # pylint: disable=redefined-outer-name
with open(public_key_path, 'rb') as f:
keys = decode.parse_public_keys(f)
assert len(keys) > 0
packets = list(decode.parse_packets(f))
assert len(packets) > 0

View File

@@ -1,4 +1,5 @@
import io
import mock
from .. import keyring

View File

@@ -30,11 +30,6 @@ def test_mpi():
assert protocol.mpi(0x123) == b'\x00\x09\x01\x23'
def test_find():
assert protocol.find_curve_by_algo_id(19) == formats.CURVE_NIST256
assert protocol.find_curve_by_algo_id(22) == formats.CURVE_ED25519
def test_armor():
data = bytearray(range(256))
assert protocol.armor(data, 'TEST') == '''-----BEGIN PGP TEST-----

View File

@@ -122,7 +122,7 @@ class Handler(object):
SSH v2 public key authentication is performed.
If the required key is not supported, raise KeyError
If the signature is invalid, rause ValueError
If the signature is invalid, raise ValueError
"""
key = formats.parse_pubkey(util.read_frame(buf))
log.debug('looking for %s', key['fingerprint'])

View File

@@ -87,8 +87,8 @@ def test_ssh_agent():
def ssh_sign_identity(identity, challenge_hidden,
challenge_visual, ecdsa_curve_name):
assert (client.identity_to_string(identity) ==
client.identity_to_string(ident))
assert (util.identity_to_string(identity) ==
util.identity_to_string(ident))
assert challenge_hidden == BLOB
assert challenge_visual == ''
assert ecdsa_curve_name == 'nist256p1'
@@ -133,4 +133,4 @@ def test_utils():
identity.path = '/path'
url = 'https://user@host:443/path'
assert client.identity_to_string(identity) == url
assert util.identity_to_string(identity) == url

View File

@@ -1,9 +1,14 @@
"""Various I/O and serialization utilities."""
import binascii
import contextlib
import hashlib
import io
import logging
import re
import struct
log = logging.getLogger(__name__)
def send(conn, data):
"""Send data blob to connection socket."""
@@ -173,3 +178,61 @@ class Reader(object):
yield
finally:
self._captured = None
_identity_regexp = re.compile(''.join([
'^'
r'(?:(?P<proto>.*)://)?',
r'(?:(?P<user>.*)@)?',
r'(?P<host>.*?)',
r'(?::(?P<port>\w*))?',
r'(?P<path>/.*)?',
'$'
]))
def string_to_identity(s, identity_type):
"""Parse string into Identity protobuf."""
m = _identity_regexp.match(s)
result = m.groupdict()
log.debug('parsed identity: %s', result)
kwargs = {k: v for k, v in result.items() if v}
return identity_type(**kwargs)
def identity_to_string(identity):
"""Dump Identity protobuf into its string representation."""
result = []
if identity.proto:
result.append(identity.proto + '://')
if identity.user:
result.append(identity.user + '@')
result.append(identity.host)
if identity.port:
result.append(':' + identity.port)
if identity.path:
result.append(identity.path)
return ''.join(result)
def get_bip32_address(identity, ecdh=False):
"""Compute BIP32 derivation address according to SLIP-0013/0017."""
index = struct.pack('<L', identity.index)
addr = index + identity_to_string(identity).encode('ascii')
log.debug('address string: %r', addr)
digest = hashlib.sha256(addr).digest()
s = io.BytesIO(bytearray(digest))
hardened = 0x80000000
addr_0 = [13, 17][bool(ecdh)]
address_n = [addr_0] + list(recv(s, '<LLLL'))
return [(hardened | value) for value in address_n]
def setup_logging(verbosity):
"""Configure logging for this tool."""
fmt = ('%(asctime)s %(levelname)-12s %(message)-100s '
'[%(filename)s:%(lineno)d]')
levels = [logging.WARNING, logging.INFO, logging.DEBUG]
level = levels[min(verbosity, len(levels) - 1)]
logging.basicConfig(format=fmt, level=level)