Compare commits

...

231 Commits

Author SHA1 Message Date
Roman Zeyde
aeda85275d bump version 2016-09-28 18:32:19 +03:00
Roman Zeyde
e41206b350 setup: update trezorlib version 2016-09-28 18:31:13 +03:00
Roman Zeyde
03650550dd Use latest protobuf library (for native Python 3 support) 2016-09-28 18:18:09 +03:00
Roman Zeyde
f7b07070da README: update setuptools to the latest version 2016-09-28 18:08:09 +03:00
Roman Zeyde
96eede9c83 Merge branch 'np-encode-subpackets' 2016-09-28 17:27:48 +03:00
Roman Zeyde
91146303a3 Follow GPG implementation for subpacket prefix encoding.
Conflicts:
	trezor_agent/gpg/protocol.py
2016-09-28 17:26:50 +03:00
Roman Zeyde
bf598435fb client: keep the session open (doesn't forget PIN) 2016-09-26 22:27:47 +03:00
Roman Zeyde
998c9ee958 README: update usage section 2016-09-11 23:38:21 +03:00
Roman Zeyde
d408a592aa README: get only the first lines of 'trezorctl get_features' 2016-09-11 23:35:10 +03:00
Roman Zeyde
282e91ace3 update README about protobuf issueOF 2016-09-11 23:22:10 +03:00
Roman Zeyde
23c37cf1e3 README: update TREZOR's required version 2016-09-11 23:17:47 +03:00
Roman Zeyde
5c5c6f9cbb bump version 2016-09-11 23:08:11 +03:00
Roman Zeyde
17c8bd0e92 gpg: add experimental warning 2016-09-11 23:06:47 +03:00
Nicolas Pouillard
016e864503 Attempt at fixing issue #32 2016-09-06 00:45:51 +02:00
Roman Zeyde
0c4e67c837 Merge pull request #30 from np/decode-subpackets
gpg/decode/parse_subpackets: parse subpacket length according to RFC
2016-09-05 20:52:48 +03:00
Nicolas Pouillard
adcbe6e7b2 gpg/decode/parse_subpackets: parse subpacket length according to RFC 2016-09-05 17:13:33 +02:00
Roman Zeyde
73bdf417e4 factory: require TREZOR firmware v1.4.0+ for GPG signatures and decryption 2016-09-02 11:38:59 +03:00
Roman Zeyde
ee347252b4 README: update gitter badge position 2016-09-01 22:02:32 +03:00
Roman Zeyde
d63f048b78 gpg: update trezor-agent installation instruction (using pip) 2016-08-27 20:59:25 +03:00
Roman Zeyde
05fada91d2 gpg: use gpgconf to get correct GPG agent UNIX socket path 2016-08-15 21:54:01 +03:00
Roman Zeyde
27a3fddfa2 gpg: add a note about restoring GPG keys with --time command-line flag 2016-08-15 21:39:34 +03:00
Roman Zeyde
45f6f1a3d8 gpg: allow setting GPG home directory via $GNUPGHOME 2016-08-05 13:48:47 +03:00
Roman Zeyde
c4c56b9faf gpg: no support for empty user_id 2016-08-05 11:57:12 +03:00
Roman Zeyde
1704ae7683 gpg: add '--user' flag to pip install command 2016-07-30 14:53:54 +03:00
Roman Zeyde
a7190223fd gpg: note WIP status in README 2016-07-30 14:52:50 +03:00
Roman Zeyde
220735c6ad gpg: note WIP status in README 2016-07-30 14:45:50 +03:00
Roman Zeyde
82e08d073b gpg: rename proto -> protocol 2016-07-26 19:37:42 +03:00
Roman Zeyde
8ab0908388 proto: don't hardcode name length 2016-07-26 17:59:06 +03:00
Roman Zeyde
fd3183d71c gitignore: sublime text project files 2016-07-26 17:57:34 +03:00
Roman Zeyde
295d52ef10 gpg: move 'iterlines' to keyring 2016-07-26 17:50:49 +03:00
Roman Zeyde
8a51099488 gpg: remove unused "sign_message" 2016-07-26 17:42:14 +03:00
Roman Zeyde
f4dd1eacdd gpg: allow parsing multiple keys 2016-07-26 17:35:07 +03:00
Roman Zeyde
024b5f131f README: reformat links 2016-07-22 23:14:25 +03:00
Roman Zeyde
b9b7b8dafd gpg: re-structure public key packets for easier parsing 2016-07-22 23:14:25 +03:00
Roman Zeyde
744696fdee gpg: decode user_attribute packets 2016-07-22 22:44:54 +03:00
Roman Zeyde
ccdbc7abfc gpg: parse_packets() should get file-like stream
and wrap it with util.Reader()
2016-07-22 21:46:38 +03:00
Roman Zeyde
e70f0ec681 gpg: refactor hash algorithm handling 2016-07-09 12:25:05 +03:00
Roman Zeyde
aeaf978d8e gpg: add mulitple GPG public keys as test vectors 2016-07-09 12:08:07 +03:00
Roman Zeyde
d60fff202a gpg: don't validate non-ECDSA signatures 2016-07-09 11:26:48 +03:00
Roman Zeyde
9171dd08c8 README: update posts 2016-06-23 22:25:27 +03:00
Roman Zeyde
4c5004d838 Merge pull request #16 from jhoenicke/master
More robust gpg key parsing
2016-06-22 20:58:34 +03:00
Jochen Hoenicke
a2e46048a1 Use TREZOR_GPG_USER_ID in agent 2016-06-22 02:31:57 +02:00
Jochen Hoenicke
e66b0f47ed More robust gpg key parsing
Handle new packet format.
Ignore unknown packets.
Handle packets that are not immediately followed by signature.
Handle other hash algorithms.
2016-06-22 02:31:22 +02:00
Roman Zeyde
db874ad98f README: add GPG part 2016-06-20 22:43:13 +03:00
Roman Zeyde
ed2d71cc08 README: split into main and SSH parts 2016-06-20 22:36:07 +03:00
Roman Zeyde
59b39ce81f Merge branch 'gpg-agent' 2016-06-20 22:19:12 +03:00
Roman Zeyde
75f879edbb gpg: update README.md 2016-06-20 22:18:03 +03:00
Roman Zeyde
45a85a317b gpg: allow setting UNIX socket from command-line 2016-06-18 20:10:52 +03:00
Roman Zeyde
7b3874e6f7 gpg: fixup logging during key creation 2016-06-17 22:05:13 +03:00
Roman Zeyde
6c96cc37b9 gpg: add support for adding subkeys to EdDSA primary GPG keys 2016-06-17 21:59:13 +03:00
Roman Zeyde
c98cb22ba4 gpg: use separate derivations for GPG keys 2016-06-17 19:51:49 +03:00
Roman Zeyde
d9fbfccd35 gpg: load correct key if ECDH is requested 2016-06-17 19:51:49 +03:00
Roman Zeyde
fe4d9ed3c8 gpg: add SLIP-0017 support for ECDH session key generation 2016-06-17 09:29:53 +03:00
Roman Zeyde
092445af71 agent: handle connection errors 2016-06-11 20:26:10 +03:00
Roman Zeyde
602e867c7d gpg: add test for keygrip 2016-06-11 20:18:07 +03:00
Roman Zeyde
16de8cdabc agent: refactor signature and ECDH 2016-06-11 15:06:35 +03:00
Roman Zeyde
7bbf11b631 gpg: refactor key creation 2016-06-11 14:46:24 +03:00
Roman Zeyde
3e41fddcef gpg: add test for ECDH pubkey generation 2016-06-11 14:02:12 +03:00
Roman Zeyde
8108e5400d gpg: support TREZOR-based primary key 2016-06-11 13:47:56 +03:00
Roman Zeyde
a1659e0f0d gpg: add preferred symmetric algo 2016-06-11 10:34:59 +03:00
Roman Zeyde
3b139314b6 gpg: refactor sign_message method 2016-06-06 23:02:14 +03:00
Roman Zeyde
a05cff5079 gpg: use "gpg2" for 'git config --local gpg.program' 2016-06-06 23:02:14 +03:00
Roman Zeyde
694cee17ac gpg: refactor create_* methods 2016-06-04 20:54:07 +03:00
Roman Zeyde
bc281d4411 gpg: use local version 2016-06-04 19:45:03 +03:00
Roman Zeyde
04af6b737b gpg: remove extra param from Factory.from_public_key() 2016-06-04 09:53:43 +03:00
Roman Zeyde
171c746c7e gpg: move agent main code to __main__ 2016-06-04 09:53:23 +03:00
Roman Zeyde
8b5ac14150 gpg: add docstrings 2016-06-03 22:44:25 +03:00
Roman Zeyde
16090cebed pylint: ignore TODOs 2016-06-03 22:40:17 +03:00
Roman Zeyde
d2167cd4ff gpg: check keygrip on ECDH 2016-06-03 22:39:31 +03:00
Roman Zeyde
10cbe67c9a gpg: add TODO 2016-06-03 21:53:31 +03:00
Roman Zeyde
29a984eebb gpg: improve flags selection 2016-06-03 20:17:53 +03:00
Roman Zeyde
a6660fd5c5 gpg: handle BYE command 2016-06-03 17:43:46 +03:00
Roman Zeyde
2acd0bf3b7 gpg: fix keygrip computation 2016-06-03 17:41:31 +03:00
Roman Zeyde
e9f7894d62 ecdh: fixup pubkey ID 2016-06-03 15:05:45 +03:00
Roman Zeyde
56e9d7c776 gpg: allow graceful exit via Ctrl+C 2016-06-03 14:42:40 +03:00
Roman Zeyde
e7bacf829c gpg: refactor ecdh case 2016-06-03 14:39:16 +03:00
Roman Zeyde
c1c679b541 HACK: support ECDH in agent - note keygrip and ID errors. 2016-06-02 23:24:36 +03:00
Roman Zeyde
49c343df94 HACK: create subkey with ECDH support 2016-06-02 22:54:08 +03:00
Roman Zeyde
7da7f5c256 HACK: fixup tests 2016-06-02 22:25:44 +03:00
Roman Zeyde
39cb5565bf HACK: better line iteration 2016-06-02 21:39:48 +03:00
Roman Zeyde
f89c5bb125 HACK: better logging 2016-06-02 21:38:48 +03:00
Roman Zeyde
92649b290f HACK: add preliminary gpg support 2016-05-30 21:57:10 +03:00
Roman Zeyde
d9b07e2ac6 gpg: hack agent prototype 2016-05-28 23:02:45 +03:00
Roman Zeyde
6975671cc1 setup: fix protobuf dependency to allow 2.6.1+
protobuf 3.0+ is needed for Python 3 support.
2016-05-28 08:43:55 +03:00
Roman Zeyde
f0ea568bb8 gpg: add more UTs for decode 2016-05-27 22:07:50 +03:00
Roman Zeyde
34c614db6e gpg: add more UTs for decode 2016-05-27 21:52:00 +03:00
Roman Zeyde
2bbd335f7e pep8: use tox.ini for configuration 2016-05-27 19:48:28 +03:00
Roman Zeyde
af8ad99c7a gpg: add UTs for decode 2016-05-27 16:59:10 +03:00
Roman Zeyde
313271ac06 gpg: move signer.py to __main__.py 2016-05-27 14:38:38 +03:00
Roman Zeyde
969e08140b gpg: add more tests for keyring 2016-05-27 13:43:55 +03:00
Roman Zeyde
39f00af65d gpg: add help for sign arguments 2016-05-27 12:20:33 +03:00
Roman Zeyde
272759e907 gpg: allow dependency injection for subprocess module 2016-05-27 12:20:33 +03:00
Roman Zeyde
4be55156ed gpg: refactor pubkeys' parsing code 2016-05-27 11:28:23 +03:00
Roman Zeyde
80a5ea0f2a gpg: add UTs for keyring 2016-05-26 23:16:08 +03:00
Roman Zeyde
87e50449e5 travis: add Python 3.5 2016-05-26 22:31:39 +03:00
Roman Zeyde
dcf35c4267 decode: split _remove_armor() from verify() 2016-05-26 22:29:19 +03:00
Roman Zeyde
7570861765 gpg: fixup signer docstring 2016-05-26 22:29:19 +03:00
Roman Zeyde
339f61c071 gpg: better __repr__ and logging for public keys 2016-05-26 22:29:19 +03:00
Roman Zeyde
3c4fb7a17b gpg: allow pinentry UI via "display=" option 2016-05-25 18:52:20 +03:00
Roman Zeyde
a6a0c05f57 keyring: fix more Python 2/3 issues 2016-05-23 23:03:02 +03:00
Roman Zeyde
4c036d2ce7 gpg: fixup str/bytes issues 2016-05-22 23:10:12 +03:00
Roman Zeyde
eaa91cfdbd gpg: add tests for basic protocol utils 2016-05-22 23:06:12 +03:00
Roman Zeyde
fd61941d0f gpg: fixup subcommand for Python 3
http://bugs.python.org/issue9253#msg186387
2016-05-22 22:28:07 +03:00
Roman Zeyde
decd3ddf75 gpg: fixup str/bytes issues 2016-05-22 22:20:55 +03:00
Roman Zeyde
4c07b360cd gpg: fix pep8/pylint warning 2016-05-22 08:07:51 +03:00
Roman Zeyde
0b0f60dd89 gpg: rename load_from_gpg -> get_public_key 2016-05-21 20:23:48 +03:00
Roman Zeyde
db6903eab7 gpg: rename agent -> keyring 2016-05-21 20:17:58 +03:00
Roman Zeyde
171a0c2f6a gpg: remove agent's main 2016-05-21 20:12:41 +03:00
Roman Zeyde
a535b31a1b gpg: fixup lint/pep8 2016-05-21 20:00:38 +03:00
Roman Zeyde
ee4bcddd22 gpg: rename main API 2016-05-21 17:32:15 +03:00
Roman Zeyde
f626d34e21 gpg: using closing() context handler 2016-05-21 17:21:16 +03:00
Roman Zeyde
2cf081420f gpg: move armor to proto 2016-05-21 17:15:42 +03:00
Roman Zeyde
0e72e3b7ff gpg: move PublicKey to proto 2016-05-21 17:10:17 +03:00
Roman Zeyde
ce61c8b2ae gpg: move timeformat from util 2016-05-21 17:04:18 +03:00
Roman Zeyde
3192e570ed gpg: initial support for ElGamal and DSA
Doesn't verify anything (yet).
2016-05-21 16:54:38 +03:00
Roman Zeyde
bf8f516ef4 gpg: no visual challenge 2016-05-21 07:44:27 +03:00
Roman Zeyde
51f7d6120b client: not visual challength for SSH 2016-05-21 07:43:10 +03:00
Roman Zeyde
0cb7cf0746 Merge branch 'python3' 2016-05-18 18:42:00 +03:00
Roman Zeyde
b4ff31f816 gpg: handle ECDH keys 2016-05-12 22:15:05 +03:00
Roman Zeyde
6e9d6d6430 gpg: add URLs for subpackets 2016-05-12 21:55:26 +03:00
Roman Zeyde
fa9391ede6 gpg: update required firmware version 2016-05-08 21:19:28 +03:00
Roman Zeyde
ad8eafe6f8 Merge branch 'master' into python3
Conflicts:
	setup.py
2016-05-07 21:14:20 +03:00
Roman Zeyde
695079e4b9 agent: raise explicit error when signature fails 2016-05-07 20:49:51 +03:00
Roman Zeyde
9888ef971a gpg: add installation command to README 2016-05-07 20:41:34 +03:00
Roman Zeyde
04a878374f setup: add gpg subpackage 2016-05-07 20:38:19 +03:00
Roman Zeyde
4270d8464f gpg: add screencasts 2016-05-07 20:29:07 +03:00
Roman Zeyde
25a427081c gpg: add more output examples 2016-05-07 13:24:25 +03:00
Roman Zeyde
939fdbe829 gpg: add output examples 2016-05-07 13:15:29 +03:00
Roman Zeyde
1f126f3002 gpg: better logging 2016-05-07 13:05:25 +03:00
Roman Zeyde
78526d1379 gpg: install gpg-git wrapper script 2016-05-07 13:02:16 +03:00
Roman Zeyde
7e3c3b4f77 gpg: fixup README 2016-05-07 12:49:01 +03:00
Roman Zeyde
513c19bf1f gpg: remove unused files 2016-05-07 09:54:55 +03:00
Roman Zeyde
f1e75783c4 gpg: use environment variable for user_id 2016-05-07 09:41:58 +03:00
Roman Zeyde
68637525ea Merge branch 'master' into python3 2016-05-06 22:24:17 +03:00
Roman Zeyde
fce45832c2 gpg: fix small typo 2016-05-06 22:22:02 +03:00
Roman Zeyde
df001c4100 gpg: rename README 2016-05-06 22:20:50 +03:00
Roman Zeyde
1a228a1af6 gpg: refactor cli 2016-05-06 22:19:46 +03:00
Roman Zeyde
7a7c9efc47 pylint: disable unbalanced-tuple-unpacking warning 2016-05-06 21:17:10 +03:00
Roman Zeyde
859cee9757 travis: use latest tools 2016-05-06 21:15:04 +03:00
Roman Zeyde
2846c0bf1a util: add tests for gpg-related code 2016-05-06 14:28:15 +03:00
Roman Zeyde
b2147a8418 formats: curve name should be a string 2016-05-05 22:31:07 +03:00
Roman Zeyde
4cbf8a9f0a setup: WiP for Python 3 support 2016-05-05 22:17:04 +03:00
Roman Zeyde
d9c4e930f3 main: fixup str/bytes issue for curve_name 2016-05-05 21:42:11 +03:00
Roman Zeyde
6fd6fe6520 handle missing imports 2016-05-04 23:05:43 +03:00
Roman Zeyde
4a7fef3011 gpg: fix logging and arguments in demo 2016-04-30 22:20:50 +03:00
Roman Zeyde
a0e476ea19 gpg: remove unused code 2016-04-30 22:15:15 +03:00
Roman Zeyde
683aae7aa4 gpg: add logging for digest 2016-04-30 22:11:51 +03:00
Roman Zeyde
d369638c7b gpg: add a script for faster commit verification 2016-04-30 22:07:46 +03:00
Roman Zeyde
07c4100618 gpg: fixup logging and make sure it works with git 2016-04-30 21:55:37 +03:00
Roman Zeyde
b9f139b74a gpg: refactor subkey as pubkey 2016-04-30 21:34:12 +03:00
Roman Zeyde
3bf926620b gpg: handle multiple packets 2016-04-30 21:07:19 +03:00
Roman Zeyde
ab192619f4 gpg: move protocol utils to proto.py 2016-04-30 16:50:01 +03:00
Roman Zeyde
f982d785bd gpg: add marker to our pubkey signature packets 2016-04-30 16:27:43 +03:00
Roman Zeyde
38c1acf4db Merge branch 'subkey' 2016-04-30 15:42:21 +03:00
Roman Zeyde
31c3686fa4 gpg: small fixes 2016-04-30 15:39:32 +03:00
Roman Zeyde
87ca33c104 gpg: fixup encoding for large packets 2016-04-30 15:34:18 +03:00
Roman Zeyde
c3d23ea7f5 gpg: allow longer packets 2016-04-30 14:47:32 +03:00
Roman Zeyde
5c04d17c43 gpg: demo with ed25519 TREZOR-based keys 2016-04-30 13:32:20 +03:00
Roman Zeyde
2d2d6efa93 gpg: small refactoring 2016-04-30 13:25:14 +03:00
Roman Zeyde
131c30acca gpg: use explicit public key algo_id 2016-04-30 13:20:06 +03:00
Roman Zeyde
a7ef263954 gpg: generalize RSA/ECDSA signatures 2016-04-30 13:01:40 +03:00
Roman Zeyde
d486c1ee7b gpg: refactor agent rsa/ecdsa signature parsing 2016-04-30 12:33:01 +03:00
Roman Zeyde
f35b5be3ac gpg: 1st try for RSA primary key support 2016-04-30 11:40:02 +03:00
Roman Zeyde
9ed9781496 gpg: support RSA decode and verify 2016-04-30 11:02:41 +03:00
Roman Zeyde
5d007260e1 gpg: add docstrings 2016-04-30 10:04:44 +03:00
Roman Zeyde
7dfa3ab255 gpg: replace PublicKey.curve_name attribute 2016-04-30 09:29:04 +03:00
Roman Zeyde
b8eba72d0b gpg: fixup subkey/export handling 2016-04-29 22:46:02 +03:00
Roman Zeyde
492285de1b gpg: rename pubkey methods 2016-04-29 22:28:41 +03:00
Roman Zeyde
cc326b1f7d gpg: pubkey is not needed for make_signature 2016-04-29 22:25:08 +03:00
Roman Zeyde
169ff39b1a gpg: remove visual keyword for now 2016-04-29 22:23:12 +03:00
Roman Zeyde
dcc7ef2600 minor fixes 2016-04-29 22:10:04 +03:00
Roman Zeyde
ac2d12b354 It works again! 2016-04-29 17:45:16 +03:00
Roman Zeyde
f3b49ff553 gpg: use strict bash mode for demo 2016-04-29 11:14:27 +03:00
Roman Zeyde
12d640c66b fixup pep8 2016-04-29 10:25:46 +03:00
Roman Zeyde
32984d2d3f agent: add support for gpg passphrase entry 2016-04-29 10:16:58 +03:00
Roman Zeyde
a45c6c1300 horrible hack - but IT WORKS!!! 2016-04-28 22:17:08 +03:00
Roman Zeyde
1d3ba7e9b7 subkey: add backsig 2016-04-28 22:10:40 +03:00
Roman Zeyde
673b1df648 1st try 2016-04-28 21:31:01 +03:00
Roman Zeyde
e63f03354e gpg: refactor signing providers from actual Signer class 2016-04-28 14:56:58 +03:00
Roman Zeyde
3c9c1b4e14 gpg: export verifying_key from parsing 2016-04-28 14:44:52 +03:00
Roman Zeyde
5caf4728ee gpg: fixup comment 2016-04-28 12:56:06 +03:00
Roman Zeyde
dde6dcdaeb gpg: fix unpacking for subkey-case 2016-04-28 12:55:48 +03:00
Roman Zeyde
1f3c989884 gpg: 'dump' -> 'serialize' 2016-04-28 12:34:00 +03:00
Roman Zeyde
55dea41959 gpg: make sure gpg-agent is running before connecting 2016-04-28 12:09:45 +03:00
Roman Zeyde
ed01c00d0c gpg: add agent-signing tool 2016-04-27 21:01:21 +03:00
Roman Zeyde
e09571151c gpg: remove length type logging 2016-04-26 21:46:39 +03:00
Roman Zeyde
340aae4fb8 gpg: refactor decode to functional style 2016-04-26 21:38:59 +03:00
Roman Zeyde
9875c9927e gpg: demo for subkeys decoding 2016-04-26 21:12:02 +03:00
Roman Zeyde
d9862ae0e1 gpg: debug logging for ECDSA verification 2016-04-26 12:57:27 +03:00
Roman Zeyde
5fb8b0e047 decode: parse GPG subkeys 2016-04-26 12:54:10 +03:00
Roman Zeyde
324fc21a5c decode: refactor digest calculation 2016-04-26 12:34:50 +03:00
Roman Zeyde
e2f5ccafdf signer: allow importing to local keyring (using "-o" flag) 2016-04-26 10:26:12 +03:00
Roman Zeyde
a0b4776374 gpg: fixup exception message 2016-04-25 18:19:08 +03:00
Roman Zeyde
5abc3dc41b gpg: fix check script -v option 2016-04-24 21:56:09 +03:00
Roman Zeyde
3c2eb64e0d gpg: fixup demo script 2016-04-24 14:25:01 +03:00
Roman Zeyde
67d58a5ae0 Merge pull request #10 from romanz/gpg
GPG v2.1 support
2016-04-24 14:07:22 +03:00
Roman Zeyde
9a435ae23e gpg: minor renames and code refactoring 2016-04-24 14:05:30 +03:00
Roman Zeyde
d7913a84d5 gpg: pydocstyle fixes 2016-04-24 12:22:02 +03:00
Roman Zeyde
a114242243 gpg: small fixes before merging to master 2016-04-24 10:58:32 +03:00
Roman Zeyde
b6dbc4aa81 gpg: small fixes before merging to master 2016-04-23 23:37:11 +03:00
Roman Zeyde
6cc3a629a8 gpg: export git-gpg wrapper
should be used as 'gpg.program' in .git/config
2016-04-23 23:13:06 +03:00
Roman Zeyde
0c94363595 gpg: export command-line tool 2016-04-23 23:12:32 +03:00
Roman Zeyde
40377fc66b gpg: add __init__.py 2016-04-23 22:46:24 +03:00
Roman Zeyde
489c8fe357 gpg: rename git wrapper 2016-04-23 22:45:11 +03:00
Roman Zeyde
6f4f33bfa5 gpg: verify signature after signing 2016-04-23 22:41:43 +03:00
Roman Zeyde
76ce25fab1 gpg: fixup imports 2016-04-23 22:30:12 +03:00
Roman Zeyde
5506310239 gpg: move under trezor_agent 2016-04-23 21:47:30 +03:00
Roman Zeyde
9dc955aae8 gpg: fix signer logging 2016-04-23 19:31:23 +03:00
Roman Zeyde
80f29469d0 gpg: deduce curve name from existing pubkey information 2016-04-23 00:08:45 +03:00
Roman Zeyde
fb368d24eb gpg: use subprocess.call() 2016-04-22 23:48:01 +03:00
Roman Zeyde
8c0848b459 gpg: debug during check 2016-04-22 23:48:01 +03:00
Roman Zeyde
276dec5728 gpg: support ed25519 public keys and signatures 2016-04-22 23:47:25 +03:00
Roman Zeyde
74f7ebf228 gpg: support ed25519 decoding 2016-04-22 22:39:03 +03:00
Roman Zeyde
7ef0958c33 gpg: minor fixes 2016-04-22 21:37:23 +03:00
Roman Zeyde
1402918bb3 gpg: use user name instead of key id 2016-04-22 21:36:56 +03:00
Roman Zeyde
b6cfa0c03f main: show better error when no SSH remote is found 2016-04-22 11:31:00 +03:00
Roman Zeyde
33ff9ba667 signer: update required patch link 2016-04-20 22:42:47 +03:00
Roman Zeyde
ab64505cdb gpg: refactor hexlification of key_id 2016-04-20 21:41:46 +03:00
Roman Zeyde
5651452c0d gpg: rename GPG public key file 2016-04-20 21:41:31 +03:00
Roman Zeyde
af6d0caf33 Add GPG-wrapper script for Git 2016-04-18 23:02:14 +03:00
Roman Zeyde
96592269b6 signer: refactor a bit 2016-04-18 22:10:00 +03:00
Roman Zeyde
b2d078eec6 simplify signer usage
and make less INFO loggin
2016-04-18 21:55:23 +03:00
Roman Zeyde
01dafb0ebd signer: show key ID on TREZOR screen 2016-04-17 23:11:18 +03:00
Roman Zeyde
447faf973c signer should export public key or sign a file 2016-04-17 23:04:23 +03:00
Roman Zeyde
add90e3c51 signer: support armoring public keys 2016-04-17 22:40:02 +03:00
Roman Zeyde
34670c601d Fix PEP8 warnings 2016-04-17 22:19:55 +03:00
Roman Zeyde
b9ba4a3082 split decoding functionality 2016-04-17 22:18:42 +03:00
Roman Zeyde
4335740abe Add experimental support for GPG signing via TREZOR
In order to use this feature, GPG "modern" (v2.1) is required [1].
Also, since TREZOR protocol does not support arbitrary long fields,
TREZOR firmware needs to be adapted  with the following patch [2],
to support signing fixed-size digests of GPG messages of arbitrary size.

[1] https://gist.github.com/vt0r/a2f8c0bcb1400131ff51
[2] https://gist.github.com/romanz/b66f5df1ca8ef15641df8ea5bb09fd47
2016-04-16 21:21:12 +03:00
Roman Zeyde
861401e89a client: make get_address() public 2016-04-09 21:09:11 +03:00
Roman Zeyde
335d050212 formats: fixup comment 2016-04-09 20:40:32 +03:00
Roman Zeyde
6e1b08c27a README: fix links 2016-03-12 21:18:13 +02:00
36 changed files with 1986 additions and 113 deletions

3
.gitignore vendored
View File

@@ -55,3 +55,6 @@ docs/_build/
# PyBuilder
target/
# Sublime Text
*.sublime-*

View File

@@ -1,2 +1,2 @@
[MESSAGES CONTROL]
disable=invalid-name, missing-docstring, locally-disabled
disable=fixme, invalid-name, missing-docstring, locally-disabled, unbalanced-tuple-unpacking

View File

@@ -3,10 +3,11 @@ language: python
python:
- "2.7"
- "3.4"
- "3.5"
install:
- pip install ecdsa ed25519 semver # test without trezorlib for now
- pip install pylint coverage pep8 pydocstyle
- pip install -U pylint coverage pep8 pydocstyle # use latest tools
script:
- pep8 trezor_agent

104
README-GPG.md Normal file
View File

@@ -0,0 +1,104 @@
Note: the GPG-related code is still under development, so please try the current implementation
and feel free to [report any issue](https://github.com/romanz/trezor-agent/issues) you have encountered.
Thanks!
# Installation
First, verify that you have GPG 2.1+ [installed](https://gist.github.com/vt0r/a2f8c0bcb1400131ff51):
```
$ gpg2 --version | head -n1
gpg (GnuPG) 2.1.11
```
Update you TREZOR firmware to the latest version (at least [c720614](https://github.com/trezor/trezor-mcu/commit/c720614f6e9b9c07f446c95bda0257980d942871)).
Install latest `trezor-agent` package from [gpg-agent](https://github.com/romanz/trezor-agent/commits/gpg-agent) branch:
```
$ pip install --user git+https://github.com/romanz/trezor-agent.git
```
Define your GPG user ID as an environment variable:
```
$ export TREZOR_GPG_USER_ID="John Doe <john@doe.bit>"
```
There are two ways to generate TREZOR-based GPG public keys, as described below.
## 1. generate a new GPG identity:
```
$ trezor-gpg create | gpg2 --import # use the TREZOR to confirm signing the primary key
gpg: key 5E4D684D: public key "John Doe <john@doe.bit>" imported
gpg: Total number processed: 1
gpg: imported: 1
$ gpg2 --edit "${TREZOR_GPG_USER_ID}" trust # set this key to ultimate trust (option #5)
$ gpg2 -k
/home/roman/.gnupg/pubring.kbx
------------------------------
pub nistp256/5E4D684D 2016-06-17 [SC]
uid [ultimate] John Doe <john@doe.bit>
sub nistp256/A31D9E25 2016-06-17 [E]
```
## 2. generate a new subkey for an existing GPG identity:
```
$ gpg2 -k # suppose there is already a GPG primary key
/home/roman/.gnupg/pubring.kbx
------------------------------
pub rsa2048/87BB07B4 2016-06-17 [SC]
uid [ultimate] John Doe <john@doe.bit>
sub rsa2048/7176D31F 2016-06-17 [E]
$ trezor-gpg create --subkey | gpg2 --import # use the TREZOR to confirm signing the subkey
gpg: key 87BB07B4: "John Doe <john@doe.bit>" 2 new signatures
gpg: key 87BB07B4: "John Doe <john@doe.bit>" 2 new subkeys
gpg: Total number processed: 1
gpg: new subkeys: 2
gpg: new signatures: 2
$ gpg2 -k
/home/roman/.gnupg/pubring.kbx
------------------------------
pub rsa2048/87BB07B4 2016-06-17 [SC]
uid [ultimate] John Doe <john@doe.bit>
sub rsa2048/7176D31F 2016-06-17 [E]
sub nistp256/DDE80B36 2016-06-17 [S]
sub nistp256/E3D0BA19 2016-06-17 [E]
```
# Usage examples:
## Start the TREZOR-based gpg-agent:
```
$ trezor-gpg agent &
```
Note: this agent intercepts all GPG requests, so make sure to close it (e.g. by using `killall trezor-gpg`),
when you are done with the TREZOR-based GPG operations.
## Sign and verify GPG messages:
```
$ echo "Hello World!" | gpg2 --sign | gpg2 --verify
gpg: Signature made Fri 17 Jun 2016 08:55:13 PM IDT using ECDSA key ID 5E4D684D
gpg: Good signature from "Roman Zeyde <roman.zeyde@gmail.com>" [ultimate]
```
## Encrypt and decrypt GPG messages:
```
$ date | gpg2 --encrypt -r "${TREZOR_GPG_USER_ID}" | gpg2 --decrypt
gpg: encrypted with 256-bit ECDH key, ID A31D9E25, created 2016-06-17
"Roman Zeyde <roman.zeyde@gmail.com>"
Fri Jun 17 20:55:31 IDT 2016
```
## Git commit & tag signatures:
Git can use GPG to sign and verify commits and tags (see [here](https://git-scm.com/book/en/v2/Git-Tools-Signing-Your-Work)):
```
$ git config --local gpg.program gpg2
$ git commit --gpg-sign # create GPG-signed commit
$ git log --show-signature -1 # verify commit signature
$ git tag --sign "TAG" # create GPG-signed tag
$ git verify-tag "TAG" # verify tag signature
```

45
README-SSH.md Normal file
View File

@@ -0,0 +1,45 @@
# Screencast demo usage
## Simple usage (single SSH session)
[![Demo](https://asciinema.org/a/22959.png)](https://asciinema.org/a/22959)
## Advanced usage (multiple SSH sessions from a sub-shell)
[![Subshell](https://asciinema.org/a/33240.png)](https://asciinema.org/a/33240)
## Using for GitHub SSH authentication (via `trezor-git` utility)
[![GitHub](https://asciinema.org/a/38337.png)](https://asciinema.org/a/38337)
# Public key generation
Run:
/tmp $ trezor-agent ssh.hostname.com -v > hostname.pub
2015-09-02 15:03:18,929 INFO getting "ssh://ssh.hostname.com" public key from Trezor...
2015-09-02 15:03:23,342 INFO disconnected from Trezor
/tmp $ cat hostname.pub
ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBGSevcDwmT+QaZPUEWUUjTeZRBICChxMKuJ7dRpBSF8+qt+8S1GBK5Zj8Xicc8SHG/SE/EXKUL2UU3kcUzE7ADQ= ssh://ssh.hostname.com
Append `hostname.pub` contents to `~/.ssh/authorized_keys`
configuration file at `ssh.hostname.com`, so the remote server
would allow you to login using the corresponding private key signature.
# Usage
Run:
/tmp $ trezor-agent ssh.hostname.com -v -c
2015-09-02 15:09:39,782 INFO getting "ssh://ssh.hostname.com" public key from Trezor...
2015-09-02 15:09:44,430 INFO please confirm user "roman" login to "ssh://ssh.hostname.com" using Trezor...
2015-09-02 15:09:46,152 INFO signature status: OK
Linux lmde 3.16.0-4-amd64 #1 SMP Debian 3.16.7-ckt11-1+deb8u3 (2015-08-04) x86_64
The programs included with the Debian GNU/Linux system are free software;
the exact distribution terms for each program are described in the
individual files in /usr/share/doc/*/copyright.
Debian GNU/Linux comes with ABSOLUTELY NO WARRANTY, to the extent
permitted by applicable law.
Last login: Tue Sep 1 15:57:05 2015 from localhost
~ $
Make sure to confirm SSH signature on the Trezor device when requested.

View File

@@ -1,79 +1,47 @@
# Using TREZOR as a hardware SSH agent
# Using TREZOR as a hardware SSH/GPG agent
[![Build Status](https://travis-ci.org/romanz/trezor-agent.svg?branch=master)](https://travis-ci.org/romanz/trezor-agent)
[![Python Versions](https://img.shields.io/pypi/pyversions/trezor_agent.svg)](https://pypi.python.org/pypi/trezor_agent/)
[![Package Version](https://img.shields.io/pypi/v/trezor_agent.svg)](https://pypi.python.org/pypi/trezor_agent/)
[![Development Status](https://img.shields.io/pypi/status/trezor_agent.svg)](https://pypi.python.org/pypi/trezor_agent/)
[![Downloads](https://img.shields.io/pypi/dm/trezor_agent.svg)](https://pypi.python.org/pypi/trezor_agent/)
[![Chat](https://badges.gitter.im/romanz/trezor-agent.svg)](https://gitter.im/romanz/trezor-agent)
See SatoshiLabs' blog post about this feature:
See SatoshiLabs' blog posts about this feature:
- https://medium.com/@satoshilabs/trezor-firmware-1-3-4-enables-ssh-login-86a622d7e609
## Screencast demo usage
### Simple usage (single SSH session)
[![Demo](https://asciinema.org/a/22959.png)](https://asciinema.org/a/22959)
### Advanced usage (multiple SSH sessions from a sub-shell)
[![Subshell](https://asciinema.org/a/33240.png)](https://asciinema.org/a/33240)
### Using for GitHub SSH authentication (via `trezor-git` utility)
[![GitHub](https://asciinema.org/a/38337.png)](https://asciinema.org/a/38337)
- [TREZOR Firmware 1.3.4 enables SSH login](https://medium.com/@satoshilabs/trezor-firmware-1-3-4-enables-ssh-login-86a622d7e609)
- [TREZOR Firmware 1.3.6GPG Signing, SSH Login Updates and Advanced Transaction Features for Segwit](https://medium.com/@satoshilabs/trezor-firmware-1-3-6-20a7df6e692)
## Installation
First, make sure that the latest `trezorlib` Python package
First, make sure that the latest [trezorlib](https://pypi.python.org/pypi/trezor) Python package
is installed correctly (at least v0.6.6):
$ apt-get install python-dev libusb-1.0-0-dev libudev-dev
$ pip install Cython trezor
$ pip install -U setuptools
Then, install the latest `trezor_agent` package:
Then, install the latest [trezor_agent](https://pypi.python.org/pypi/trezor_agent) package:
$ pip install trezor_agent
Finally, verify that you are running the latest TREZOR firmware version (at least v1.3.4):
Finally, verify that you are running the latest [TREZOR firmware](https://mytrezor.com/data/firmware/releases.json) version (at least v1.4.0):
$ trezorctl get_features
$ trezorctl get_features | head
vendor: "bitcointrezor.com"
major_version: 1
minor_version: 3
patch_version: 4
minor_version: 4
patch_version: 0
...
## Public key generation
Run:
/tmp $ trezor-agent ssh.hostname.com -v > hostname.pub
2015-09-02 15:03:18,929 INFO getting "ssh://ssh.hostname.com" public key from Trezor...
2015-09-02 15:03:23,342 INFO disconnected from Trezor
/tmp $ cat hostname.pub
ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBGSevcDwmT+QaZPUEWUUjTeZRBICChxMKuJ7dRpBSF8+qt+8S1GBK5Zj8Xicc8SHG/SE/EXKUL2UU3kcUzE7ADQ= ssh://ssh.hostname.com
Append `hostname.pub` contents to `~/.ssh/authorized_keys`
configuration file at `ssh.hostname.com`, so the remote server
would allow you to login using the corresponding private key signature.
## Usage
Run:
For SSH, see the [following instructions](README-SSH.md).
/tmp $ trezor-agent ssh.hostname.com -v -c
2015-09-02 15:09:39,782 INFO getting "ssh://ssh.hostname.com" public key from Trezor...
2015-09-02 15:09:44,430 INFO please confirm user "roman" login to "ssh://ssh.hostname.com" using Trezor...
2015-09-02 15:09:46,152 INFO signature status: OK
Linux lmde 3.16.0-4-amd64 #1 SMP Debian 3.16.7-ckt11-1+deb8u3 (2015-08-04) x86_64
For GPG, see the [following instructions](README-GPG.md).
The programs included with the Debian GNU/Linux system are free software;
the exact distribution terms for each program are described in the
individual files in /usr/share/doc/*/copyright.
Questions, suggestions and discussions are welcome: [![Chat](https://badges.gitter.im/romanz/trezor-agent.svg)](https://gitter.im/romanz/trezor-agent)
Debian GNU/Linux comes with ABSOLUTELY NO WARRANTY, to the extent
permitted by applicable law.
Last login: Tue Sep 1 15:57:05 2015 from localhost
~ $
## Troubleshooting
Make sure to confirm SSH signature on the Trezor device when requested.
If there is an import problem with the installed `protobuf` package,
see [this issue](https://github.com/romanz/trezor-agent/issues/28) for fixing it.

View File

@@ -3,13 +3,13 @@ from setuptools import setup
setup(
name='trezor_agent',
version='0.6.5',
version='0.7.0',
description='Using Trezor as hardware SSH agent',
author='Roman Zeyde',
author_email='roman.zeyde@gmail.com',
url='http://github.com/romanz/trezor-agent',
packages=['trezor_agent'],
install_requires=['ecdsa>=0.13', 'ed25519>=1.4', 'Cython>=0.23.4', 'trezor>=0.6.6', 'keepkey>=0.7.0', 'semver>=2.2'],
packages=['trezor_agent', 'trezor_agent.gpg'],
install_requires=['ecdsa>=0.13', 'ed25519>=1.4', 'Cython>=0.23.4', 'protobuf>=3.0.0', 'trezor>=0.7.4', 'semver>=2.2'],
platforms=['POSIX'],
classifiers=[
'Environment :: Console',
@@ -30,5 +30,6 @@ setup(
entry_points={'console_scripts': [
'trezor-agent = trezor_agent.__main__:run_agent',
'trezor-git = trezor_agent.__main__:run_git',
'trezor-gpg = trezor_agent.gpg.__main__:main',
]},
)

View File

@@ -1,5 +1,7 @@
[tox]
envlist = py27,py34
[pep8]
max-line-length = 100
[testenv]
deps=
pytest

View File

@@ -2,11 +2,10 @@
import argparse
import functools
import logging
import re
import os
import re
import subprocess
import sys
import time
from . import client, formats, protocol, server
@@ -31,7 +30,7 @@ def create_parser():
p = argparse.ArgumentParser()
p.add_argument('-v', '--verbose', default=0, action='count')
curve_names = [name.decode('ascii') for name in formats.SUPPORTED_CURVES]
curve_names = [name for name in formats.SUPPORTED_CURVES]
curve_names = ', '.join(sorted(curve_names))
p.add_argument('-e', '--ecdsa-curve-name', metavar='CURVE',
default=formats.CURVE_NIST256,
@@ -98,21 +97,15 @@ def git_host(remote_name, attributes):
continue
url = matches[0].strip()
user, url = url.split('@', 1)
host, _ = url.split(':', 1) # skip unused path (1 key per user@host)
return '{}@{}'.format(user, host)
def ssh_sign(conn, label, blob):
"""Perform SSH signature using given hardware device connection."""
now = time.strftime('%Y-%m-%d %H:%M:%S')
return conn.sign_ssh_challenge(label=label, blob=blob, visual=now)
match = re.match('(?P<user>.*?)@(?P<host>.*?):(?P<path>.*)', url)
if match:
return '{user}@{host}'.format(**match.groupdict())
def run_server(conn, public_key, command, debug, timeout):
"""Common code for run_agent and run_git below."""
try:
signer = functools.partial(ssh_sign, conn=conn)
signer = conn.sign_ssh_challenge
public_key = formats.import_public_key(public_key)
log.info('using SSH public key: %s', public_key['fingerprint'])
handler = protocol.Handler(keys=[public_key], signer=signer,
@@ -123,6 +116,19 @@ def run_server(conn, public_key, command, debug, timeout):
log.info('server stopped')
def handle_connection_error(func):
"""Fail with non-zero exit code."""
@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except IOError as e:
log.error('Connection error: %s', e)
return 1
return wrapper
@handle_connection_error
def run_agent(client_factory=client.Client):
"""Run ssh-agent using given hardware client factory."""
args = create_agent_parser().parse_args()
@@ -151,6 +157,7 @@ def run_agent(client_factory=client.Client):
debug=args.debug, timeout=args.timeout)
@handle_connection_error
def run_git(client_factory=client.Client):
"""Run git under ssh-agent using given hardware client factory."""
args = create_git_parser().parse_args()
@@ -159,7 +166,8 @@ def run_git(client_factory=client.Client):
with client_factory(curve=args.ecdsa_curve_name) as conn:
label = git_host(args.remote, ['pushurl', 'url'])
if not label:
log.error('Could not find "%s" remote in .git/config', args.remote)
log.error('Could not find "%s" SSH remote in .git/config',
args.remote)
return
public_key = conn.get_public_key(label=label)

View File

@@ -33,9 +33,8 @@ class Client(object):
return self
def __exit__(self, *args):
"""Forget PIN, shutdown screen and disconnect."""
"""Keep the session open (doesn't forget PIN)."""
log.info('disconnected from %s', self.device_name)
self.client.clear_session()
self.client.close()
def get_identity(self, label, index=0):
@@ -51,7 +50,7 @@ class Client(object):
label = identity_to_string(identity) # canonize key label
log.info('getting "%s" public key (%s) from %s...',
label, self.curve, self.device_name)
addr = _get_address(identity)
addr = get_address(identity)
node = self.client.get_public_node(n=addr,
ecdsa_curve_name=self.curve)
@@ -59,7 +58,7 @@ class Client(object):
vk = formats.decompress_pubkey(pubkey=pubkey, curve_name=self.curve)
return formats.export_public_key(vk=vk, label=label)
def sign_ssh_challenge(self, label, blob, visual=''):
def sign_ssh_challenge(self, label, blob):
"""Sign given blob using a private key, specified by the label."""
identity = self.get_identity(label=label)
msg = _parse_ssh_blob(blob)
@@ -68,7 +67,6 @@ class Client(object):
log.debug('nonce: %s', binascii.hexlify(msg['nonce']))
log.debug('fingerprint: %s', msg['public_key']['fingerprint'])
log.debug('hidden challenge size: %d bytes', len(blob))
log.debug('visual challenge size: %d bytes = %r', len(visual), visual)
log.info('please confirm user "%s" login to "%s" using %s...',
msg['user'], label, self.device_name)
@@ -76,7 +74,7 @@ class Client(object):
try:
result = self.client.sign_identity(identity=identity,
challenge_hidden=blob,
challenge_visual=visual,
challenge_visual='',
ecdsa_curve_name=self.curve)
except self.call_exception as e:
code, msg = e.args
@@ -129,7 +127,8 @@ def identity_to_string(identity):
return ''.join(result)
def _get_address(identity):
def get_address(identity, ecdh=False):
"""Compute BIP32 derivation address according to SLIP-0013/0017."""
index = struct.pack('<L', identity.index)
addr = index + identity_to_string(identity).encode('ascii')
log.debug('address string: %r', addr)
@@ -137,7 +136,8 @@ def _get_address(identity):
s = io.BytesIO(bytearray(digest))
hardened = 0x80000000
address_n = [13] + list(util.recv(s, '<LLLL'))
addr_0 = [13, 17][bool(ecdh)]
address_n = [addr_0] + list(util.recv(s, '<LLLL'))
return [(hardened | value) for value in address_n]

View File

@@ -44,33 +44,38 @@ def _load_client(name, client_type, hid_transport,
def _load_trezor():
# pylint: disable=import-error
from trezorlib.client import TrezorClient, CallException
from trezorlib.transport_hid import HidTransport
from trezorlib.messages_pb2 import PassphraseAck
from trezorlib.types_pb2 import IdentityType
return _load_client(name='Trezor',
client_type=TrezorClient,
hid_transport=HidTransport,
passphrase_ack=PassphraseAck,
identity_type=IdentityType,
required_version='>=1.3.4',
call_exception=CallException)
try:
from trezorlib.client import TrezorClient, CallException
from trezorlib.transport_hid import HidTransport
from trezorlib.messages_pb2 import PassphraseAck
from trezorlib.types_pb2 import IdentityType
return _load_client(name='Trezor',
client_type=TrezorClient,
hid_transport=HidTransport,
passphrase_ack=PassphraseAck,
identity_type=IdentityType,
required_version='>=1.4.0',
call_exception=CallException)
except ImportError:
log.exception('Missing module: install via "pip install trezor"')
def _load_keepkey():
# pylint: disable=import-error
from keepkeylib.client import KeepKeyClient, CallException
from keepkeylib.transport_hid import HidTransport
from keepkeylib.messages_pb2 import PassphraseAck
from keepkeylib.types_pb2 import IdentityType
return _load_client(name='KeepKey',
client_type=KeepKeyClient,
hid_transport=HidTransport,
passphrase_ack=PassphraseAck,
identity_type=IdentityType,
required_version='>=1.0.4',
call_exception=CallException)
try:
from keepkeylib.client import KeepKeyClient, CallException
from keepkeylib.transport_hid import HidTransport
from keepkeylib.messages_pb2 import PassphraseAck
from keepkeylib.types_pb2 import IdentityType
return _load_client(name='KeepKey',
client_type=KeepKeyClient,
hid_transport=HidTransport,
passphrase_ack=PassphraseAck,
identity_type=IdentityType,
required_version='>=1.0.4',
call_exception=CallException)
except ImportError:
log.exception('Missing module: install via "pip install keepkey"')
LOADERS = [
_load_trezor,
@@ -83,7 +88,9 @@ def load(loaders=None):
loaders = loaders if loaders is not None else LOADERS
device_list = []
for loader in loaders:
device_list.extend(loader())
device = loader()
if device:
device_list.extend(device)
if len(device_list) == 1:
return device_list[0]

View File

@@ -12,8 +12,8 @@ from . import util
log = logging.getLogger(__name__)
# Supported ECDSA curves
CURVE_NIST256 = b'nist256p1'
CURVE_ED25519 = b'ed25519'
CURVE_NIST256 = 'nist256p1'
CURVE_ED25519 = 'ed25519'
SUPPORTED_CURVES = {CURVE_NIST256, CURVE_ED25519}
# SSH key types
@@ -41,7 +41,7 @@ def parse_pubkey(blob):
"""
Parse SSH public key from given blob.
Cnstruct a verifier for ECDSA signatures.
Construct a verifier for ECDSA signatures.
The verifier returns the signatures in the required SSH format.
Currently, NIST256P1 and ED25519 elliptic curves are supported.
"""

View File

@@ -0,0 +1,9 @@
"""
TREZOR support for ECDSA GPG signatures.
See these links for more details:
- https://www.gnupg.org/faq/whats-new-in-2.1.html
- https://tools.ietf.org/html/rfc4880
- https://tools.ietf.org/html/rfc6637
- https://tools.ietf.org/html/draft-irtf-cfrg-eddsa-05
"""

99
trezor_agent/gpg/__main__.py Executable file
View File

@@ -0,0 +1,99 @@
#!/usr/bin/env python
"""Create signatures and export public keys for GPG using TREZOR."""
import argparse
import contextlib
import logging
import os
import sys
import time
from . import agent, encode, keyring, protocol
from .. import server
log = logging.getLogger(__name__)
def run_create(args):
"""Generate a new pubkey for a new/existing GPG identity."""
user_id = os.environ['TREZOR_GPG_USER_ID']
log.warning('NOTE: in order to re-generate the exact same GPG key later, '
'run this command with "--time=%d" commandline flag (to set '
'the timestamp of the GPG key manually).', args.time)
conn = encode.HardwareSigner(user_id=user_id,
curve_name=args.ecdsa_curve)
verifying_key = conn.pubkey(ecdh=False)
decryption_key = conn.pubkey(ecdh=True)
if args.subkey:
primary_bytes = keyring.export_public_key(user_id=user_id)
# subkey for signing
signing_key = protocol.PublicKey(
curve_name=args.ecdsa_curve, created=args.time,
verifying_key=verifying_key, ecdh=False)
# subkey for encryption
encryption_key = protocol.PublicKey(
curve_name=args.ecdsa_curve, created=args.time,
verifying_key=decryption_key, ecdh=True)
result = encode.create_subkey(primary_bytes=primary_bytes,
pubkey=signing_key,
signer_func=conn.sign)
result = encode.create_subkey(primary_bytes=result,
pubkey=encryption_key,
signer_func=conn.sign)
else:
# primary key for signing
primary = protocol.PublicKey(
curve_name=args.ecdsa_curve, created=args.time,
verifying_key=verifying_key, ecdh=False)
# subkey for encryption
subkey = protocol.PublicKey(
curve_name=args.ecdsa_curve, created=args.time,
verifying_key=decryption_key, ecdh=True)
result = encode.create_primary(user_id=user_id,
pubkey=primary,
signer_func=conn.sign)
result = encode.create_subkey(primary_bytes=result,
pubkey=subkey,
signer_func=conn.sign)
sys.stdout.write(protocol.armor(result, 'PUBLIC KEY BLOCK'))
def run_agent(args): # pylint: disable=unused-argument
"""Run a simple GPG-agent server."""
sock_path = keyring.get_agent_sock_path()
with server.unix_domain_socket_server(sock_path) as sock:
for conn in agent.yield_connections(sock):
with contextlib.closing(conn):
agent.handle_connection(conn)
def main():
"""Main function."""
p = argparse.ArgumentParser()
p.add_argument('-v', '--verbose', action='store_true', default=False)
subparsers = p.add_subparsers()
subparsers.required = True
subparsers.dest = 'command'
create_cmd = subparsers.add_parser('create')
create_cmd.add_argument('-s', '--subkey', action='store_true', default=False)
create_cmd.add_argument('-e', '--ecdsa-curve', default='nist256p1')
create_cmd.add_argument('-t', '--time', type=int, default=int(time.time()))
create_cmd.set_defaults(run=run_create)
agent_cmd = subparsers.add_parser('agent')
agent_cmd.set_defaults(run=run_agent)
args = p.parse_args()
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO,
format='%(asctime)s %(levelname)-10s %(message)s')
log.warning('This GPG tool is still in EXPERIMENTAL mode, '
'so please note that the API and features may '
'change without backwards compatibility!')
args.run(args)
if __name__ == '__main__':
main()

136
trezor_agent/gpg/agent.py Normal file
View File

@@ -0,0 +1,136 @@
"""GPG-agent utilities."""
import binascii
import contextlib
import logging
import os
from . import decode, encode, keyring
from .. import util
log = logging.getLogger(__name__)
def yield_connections(sock):
"""Run a server on the specified socket."""
while True:
log.debug('waiting for connection on %s', sock.getsockname())
try:
conn, _ = sock.accept()
except KeyboardInterrupt:
return
conn.settimeout(None)
log.debug('accepted connection on %s', sock.getsockname())
yield conn
def serialize(data):
"""Serialize data according to ASSUAN protocol."""
for c in ['%', '\n', '\r']:
data = data.replace(c, '%{:02X}'.format(ord(c)))
return data
def sig_encode(r, s):
"""Serialize ECDSA signature data into GPG S-expression."""
r = serialize(util.num2bytes(r, 32))
s = serialize(util.num2bytes(s, 32))
return '(7:sig-val(5:ecdsa(1:r32:{})(1:s32:{})))'.format(r, s)
def pksign(keygrip, digest, algo):
"""Sign a message digest using a private EC key."""
assert algo == '8'
user_id = os.environ['TREZOR_GPG_USER_ID']
pubkey_dict = decode.load_public_key(
pubkey_bytes=keyring.export_public_key(user_id=user_id),
use_custom=True, ecdh=False)
pubkey, conn = encode.load_from_public_key(pubkey_dict=pubkey_dict)
with contextlib.closing(conn):
assert pubkey.keygrip == binascii.unhexlify(keygrip)
r, s = conn.sign(binascii.unhexlify(digest))
result = sig_encode(r, s)
log.debug('result: %r', result)
return result
def _serialize_point(data):
data = '{}:'.format(len(data)) + data
# https://www.gnupg.org/documentation/manuals/assuan/Server-responses.html
for c in ['%', '\n', '\r']:
data = data.replace(c, '%{:02X}'.format(ord(c)))
return '(5:value' + data + ')'
def parse_ecdh(line):
"""Parse ECDH request and return remote public key."""
prefix, line = line.split(' ', 1)
assert prefix == 'D'
exp, leftover = keyring.parse(keyring.unescape(line))
log.debug('ECDH s-exp: %r', exp)
assert not leftover
label, exp = exp
assert label == b'enc-val'
assert exp[0] == b'ecdh'
items = exp[1:]
log.debug('ECDH parameters: %r', items)
return dict(items)['e']
def pkdecrypt(keygrip, conn):
"""Handle decryption using ECDH."""
for msg in [b'S INQUIRE_MAXLEN 4096', b'INQUIRE CIPHERTEXT']:
keyring.sendline(conn, msg)
line = keyring.recvline(conn)
assert keyring.recvline(conn) == b'END'
remote_pubkey = parse_ecdh(line)
user_id = os.environ['TREZOR_GPG_USER_ID']
local_pubkey = decode.load_public_key(
pubkey_bytes=keyring.export_public_key(user_id=user_id),
use_custom=True, ecdh=True)
pubkey, conn = encode.load_from_public_key(pubkey_dict=local_pubkey)
with contextlib.closing(conn):
assert pubkey.keygrip == binascii.unhexlify(keygrip)
shared_secret = conn.ecdh(remote_pubkey)
assert len(shared_secret) == 65
assert shared_secret[:1] == b'\x04'
return _serialize_point(shared_secret)
def handle_connection(conn):
"""Handle connection from GPG binary using the ASSUAN protocol."""
keygrip = None
digest = None
algo = None
version = keyring.gpg_version()
keyring.sendline(conn, b'OK')
for line in keyring.iterlines(conn):
parts = line.split(' ')
command = parts[0]
args = parts[1:]
if command in {'RESET', 'OPTION', 'HAVEKEY', 'SETKEYDESC'}:
pass # reply with OK
elif command == 'GETINFO':
keyring.sendline(conn, b'D ' + version)
elif command == 'AGENT_ID':
keyring.sendline(conn, b'D TREZOR')
elif command in {'SIGKEY', 'SETKEY'}:
keygrip, = args
elif command == 'SETHASH':
algo, digest = args
elif command == 'PKSIGN':
sig = pksign(keygrip, digest, algo)
keyring.sendline(conn, b'D ' + sig)
elif command == 'PKDECRYPT':
sec = pkdecrypt(keygrip, conn)
keyring.sendline(conn, b'D ' + sec)
elif command == 'BYE':
return
else:
log.error('unknown request: %r', line)
return
keyring.sendline(conn, b'OK')

390
trezor_agent/gpg/decode.py Normal file
View File

@@ -0,0 +1,390 @@
"""Decoders for GPG v2 data structures."""
import base64
import copy
import functools
import hashlib
import io
import logging
import struct
import ecdsa
import ed25519
from . import protocol
from .. import util
log = logging.getLogger(__name__)
def parse_subpackets(s):
"""See https://tools.ietf.org/html/rfc4880#section-5.2.3.1 for details."""
subpackets = []
total_size = s.readfmt('>H')
data = s.read(total_size)
s = util.Reader(io.BytesIO(data))
while True:
try:
first = s.readfmt('B')
except EOFError:
break
if first < 192:
subpacket_len = first
elif first < 255:
subpacket_len = ((first - 192) << 8) + s.readfmt('B') + 192
else: # first == 255
subpacket_len = s.readfmt('>L')
subpackets.append(s.read(subpacket_len))
return subpackets
def parse_mpi(s):
"""See https://tools.ietf.org/html/rfc4880#section-3.2 for details."""
bits = s.readfmt('>H')
blob = bytearray(s.read(int((bits + 7) // 8)))
return sum(v << (8 * i) for i, v in enumerate(reversed(blob)))
def parse_mpis(s, n):
"""Parse multiple MPIs from stream."""
return [parse_mpi(s) for _ in range(n)]
def _parse_nist256p1_verifier(mpi):
prefix, x, y = util.split_bits(mpi, 4, 256, 256)
assert prefix == 4
point = ecdsa.ellipticcurve.Point(curve=ecdsa.NIST256p.curve,
x=x, y=y)
vk = ecdsa.VerifyingKey.from_public_point(
point=point, curve=ecdsa.curves.NIST256p,
hashfunc=hashlib.sha256)
def _nist256p1_verify(signature, digest):
result = vk.verify_digest(signature=signature,
digest=digest,
sigdecode=lambda rs, order: rs)
log.debug('nist256p1 ECDSA signature is OK (%s)', result)
return _nist256p1_verify, vk
def _parse_ed25519_verifier(mpi):
prefix, value = util.split_bits(mpi, 8, 256)
assert prefix == 0x40
vk = ed25519.VerifyingKey(util.num2bytes(value, size=32))
def _ed25519_verify(signature, digest):
sig = b''.join(util.num2bytes(val, size=32)
for val in signature)
result = vk.verify(sig, digest)
log.debug('ed25519 ECDSA signature is OK (%s)', result)
return _ed25519_verify, vk
SUPPORTED_CURVES = {
b'\x2A\x86\x48\xCE\x3D\x03\x01\x07': _parse_nist256p1_verifier,
b'\x2B\x06\x01\x04\x01\xDA\x47\x0F\x01': _parse_ed25519_verifier,
}
RSA_ALGO_IDS = {1, 2, 3}
ELGAMAL_ALGO_ID = 16
DSA_ALGO_ID = 17
ECDSA_ALGO_IDS = {18, 19, 22} # {ecdsa, nist256, ed25519}
def _parse_literal(stream):
"""See https://tools.ietf.org/html/rfc4880#section-5.9 for details."""
p = {'type': 'literal'}
p['format'] = stream.readfmt('c')
filename_len = stream.readfmt('B')
p['filename'] = stream.read(filename_len)
p['date'] = stream.readfmt('>L')
p['content'] = stream.read()
p['_to_hash'] = p['content']
return p
def _parse_embedded_signatures(subpackets):
for packet in subpackets:
data = bytearray(packet)
if data[0] == 32:
# https://tools.ietf.org/html/rfc4880#section-5.2.3.26
stream = io.BytesIO(data[1:])
yield _parse_signature(util.Reader(stream))
def _parse_signature(stream):
"""See https://tools.ietf.org/html/rfc4880#section-5.2 for details."""
p = {'type': 'signature'}
to_hash = io.BytesIO()
with stream.capture(to_hash):
p['version'] = stream.readfmt('B')
p['sig_type'] = stream.readfmt('B')
p['pubkey_alg'] = stream.readfmt('B')
p['hash_alg'] = stream.readfmt('B')
p['hashed_subpackets'] = parse_subpackets(stream)
# https://tools.ietf.org/html/rfc4880#section-5.2.4
tail_to_hash = b'\x04\xff' + struct.pack('>L', to_hash.tell())
p['_to_hash'] = to_hash.getvalue() + tail_to_hash
p['unhashed_subpackets'] = parse_subpackets(stream)
embedded = list(_parse_embedded_signatures(p['unhashed_subpackets']))
if embedded:
log.debug('embedded sigs: %s', embedded)
p['embedded'] = embedded
p['_is_custom'] = (protocol.CUSTOM_SUBPACKET in p['unhashed_subpackets'])
p['hash_prefix'] = stream.readfmt('2s')
if p['pubkey_alg'] in ECDSA_ALGO_IDS:
p['sig'] = (parse_mpi(stream), parse_mpi(stream))
elif p['pubkey_alg'] in RSA_ALGO_IDS: # RSA
p['sig'] = (parse_mpi(stream),)
elif p['pubkey_alg'] == DSA_ALGO_ID:
p['sig'] = (parse_mpi(stream), parse_mpi(stream))
else:
log.error('unsupported public key algo: %d', p['pubkey_alg'])
assert not stream.read()
return p
def _parse_pubkey(stream, packet_type='pubkey'):
"""See https://tools.ietf.org/html/rfc4880#section-5.5 for details."""
p = {'type': packet_type}
packet = io.BytesIO()
with stream.capture(packet):
p['version'] = stream.readfmt('B')
p['created'] = stream.readfmt('>L')
p['algo'] = stream.readfmt('B')
if p['algo'] in ECDSA_ALGO_IDS:
log.debug('parsing elliptic curve key')
# https://tools.ietf.org/html/rfc6637#section-11
oid_size = stream.readfmt('B')
oid = stream.read(oid_size)
assert oid in SUPPORTED_CURVES, util.hexlify(oid)
parser = SUPPORTED_CURVES[oid]
mpi = parse_mpi(stream)
log.debug('mpi: %x (%d bits)', mpi, mpi.bit_length())
p['verifier'], p['verifying_key'] = parser(mpi)
leftover = stream.read()
if leftover:
leftover = io.BytesIO(leftover)
# https://tools.ietf.org/html/rfc6637#section-8
# should be b'\x03\x01\x08\x07': SHA256 + AES128
size, = util.readfmt(leftover, 'B')
p['kdf'] = leftover.read(size)
assert not leftover.read()
elif p['algo'] == DSA_ALGO_ID:
log.warning('DSA signatures are not verified')
parse_mpis(stream, n=4)
elif p['algo'] == ELGAMAL_ALGO_ID:
log.warning('ElGamal signatures are not verified')
parse_mpis(stream, n=3)
else: # assume RSA
log.warning('RSA signatures are not verified')
parse_mpis(stream, n=2)
assert not stream.read()
# https://tools.ietf.org/html/rfc4880#section-12.2
packet_data = packet.getvalue()
data_to_hash = (b'\x99' + struct.pack('>H', len(packet_data)) +
packet_data)
p['key_id'] = hashlib.sha1(data_to_hash).digest()[-8:]
p['_to_hash'] = data_to_hash
log.debug('key ID: %s', util.hexlify(p['key_id']))
return p
_parse_subkey = functools.partial(_parse_pubkey, packet_type='subkey')
def _parse_user_id(stream, packet_type='user_id'):
"""See https://tools.ietf.org/html/rfc4880#section-5.11 for details."""
value = stream.read()
to_hash = b'\xb4' + util.prefix_len('>L', value)
return {'type': packet_type, 'value': value, '_to_hash': to_hash}
# User attribute is handled as an opaque user ID
_parse_attribute = functools.partial(_parse_user_id,
packet_type='user_attribute')
PACKET_TYPES = {
2: _parse_signature,
6: _parse_pubkey,
11: _parse_literal,
13: _parse_user_id,
14: _parse_subkey,
17: _parse_attribute,
}
def parse_packets(stream):
"""
Support iterative parsing of available GPG packets.
See https://tools.ietf.org/html/rfc4880#section-4.2 for details.
"""
reader = util.Reader(stream)
while True:
try:
value = reader.readfmt('B')
except EOFError:
return
log.debug('prefix byte: %s', bin(value))
assert util.bit(value, 7) == 1
tag = util.low_bits(value, 6)
if util.bit(value, 6) == 0:
length_type = util.low_bits(tag, 2)
tag = tag >> 2
fmt = {0: '>B', 1: '>H', 2: '>L'}[length_type]
packet_size = reader.readfmt(fmt)
else:
first = reader.readfmt('B')
if first < 192:
packet_size = first
elif first < 224:
packet_size = ((first - 192) << 8) + reader.readfmt('B') + 192
elif first == 255:
packet_size = reader.readfmt('>L')
else:
log.error('Partial Body Lengths unsupported')
log.debug('packet length: %d', packet_size)
packet_data = reader.read(packet_size)
packet_type = PACKET_TYPES.get(tag)
if packet_type is not None:
p = packet_type(util.Reader(io.BytesIO(packet_data)))
p['tag'] = tag
else:
p = {'type': 'unknown', 'tag': tag, 'raw': packet_data}
log.debug('packet "%s": %s', p['type'], p)
yield p
def digest_packets(packets, hasher):
"""Compute digest on specified packets, according to '_to_hash' field."""
data_to_hash = io.BytesIO()
for p in packets:
data_to_hash.write(p['_to_hash'])
hasher.update(data_to_hash.getvalue())
return hasher.digest()
def collect_packets(packets, types_to_collect):
"""Collect specified packet types into their leading packet."""
packet = None
result = []
for p in packets:
if p['type'] in types_to_collect:
packet.setdefault(p['type'], []).append(p)
else:
packet = copy.deepcopy(p)
result.append(packet)
return result
def parse_public_keys(stream):
"""Parse GPG public key into hierarchy of packets."""
packets = list(parse_packets(stream))
packets = collect_packets(packets, {'signature'})
packets = collect_packets(packets, {'user_id', 'user_attribute'})
packets = collect_packets(packets, {'subkey'})
return packets
HASH_ALGORITHMS = {
1: 'md5',
2: 'sha1',
3: 'ripemd160',
8: 'sha256',
9: 'sha384',
10: 'sha512',
11: 'sha224',
}
def load_public_key(pubkey_bytes, use_custom=False, ecdh=False):
"""Parse and validate GPG public key from an input stream."""
stream = io.BytesIO(pubkey_bytes)
packets = list(parse_packets(stream))
pubkey, userid, signature = packets[:3]
packets = packets[3:]
hash_alg = HASH_ALGORITHMS.get(signature['hash_alg'])
if hash_alg is not None:
digest = digest_packets(packets=[pubkey, userid, signature],
hasher=hashlib.new(hash_alg))
assert signature['hash_prefix'] == digest[:2]
log.debug('loaded public key "%s"', userid['value'])
if hash_alg is not None and pubkey.get('verifier'):
verify_digest(pubkey=pubkey, digest=digest,
signature=signature['sig'], label='GPG public key')
else:
log.warning('public key %s is not verified!',
util.hexlify(pubkey['key_id']))
packet = pubkey
while use_custom:
if packet['type'] in ('pubkey', 'subkey') and signature['_is_custom']:
if ecdh == (packet['algo'] == protocol.ECDH_ALGO_ID):
log.debug('found custom %s', packet['type'])
break
while packets[1]['type'] != 'signature':
packets = packets[1:]
packet, signature = packets[:2]
packets = packets[2:]
packet['user_id'] = userid['value']
packet['_is_custom'] = signature['_is_custom']
return packet
def load_signature(stream, original_data):
"""Load signature from stream, and compute GPG digest for verification."""
signature, = list(parse_packets((stream)))
hash_alg = HASH_ALGORITHMS[signature['hash_alg']]
digest = digest_packets([{'_to_hash': original_data}, signature],
hasher=hashlib.new(hash_alg))
assert signature['hash_prefix'] == digest[:2]
return signature, digest
def verify_digest(pubkey, digest, signature, label):
"""Verify a digest signature from a specified public key."""
verifier = pubkey['verifier']
try:
verifier(signature, digest)
log.debug('%s is OK', label)
except ecdsa.keys.BadSignatureError:
log.error('Bad %s!', label)
raise ValueError('Invalid ECDSA signature for {}'.format(label))
def remove_armor(armored_data):
"""Decode armored data into its binary form."""
stream = io.BytesIO(armored_data)
lines = stream.readlines()[3:-1]
data = base64.b64decode(b''.join(lines))
payload, checksum = data[:-3], data[-3:]
assert util.crc24(payload) == checksum
return payload
def verify(pubkey, signature, original_data):
"""Verify correctness of public key and signature."""
stream = io.BytesIO(remove_armor(signature))
signature, digest = load_signature(stream, original_data)
verify_digest(pubkey=pubkey, digest=digest,
signature=signature['sig'], label='GPG signature')

193
trezor_agent/gpg/encode.py Normal file
View File

@@ -0,0 +1,193 @@
"""Create GPG ECDSA signatures and public keys using TREZOR device."""
import logging
import time
from . import decode, keyring, protocol
from .. import client, factory, formats, util
log = logging.getLogger(__name__)
class HardwareSigner(object):
"""Sign messages and get public keys from a hardware device."""
def __init__(self, user_id, curve_name):
"""Connect to the device and retrieve required public key."""
self.client_wrapper = factory.load()
self.identity = self.client_wrapper.identity_type()
self.identity.proto = 'gpg'
self.identity.host = user_id
self.curve_name = curve_name
def pubkey(self, ecdh=False):
"""Return public key as VerifyingKey object."""
addr = client.get_address(identity=self.identity, ecdh=ecdh)
public_node = self.client_wrapper.connection.get_public_node(
n=addr, ecdsa_curve_name=self.curve_name)
return formats.decompress_pubkey(
pubkey=public_node.node.public_key,
curve_name=self.curve_name)
def sign(self, digest):
"""Sign the digest and return a serialized signature."""
result = self.client_wrapper.connection.sign_identity(
identity=self.identity,
challenge_hidden=digest,
challenge_visual='',
ecdsa_curve_name=self.curve_name)
assert result.signature[:1] == b'\x00'
sig = result.signature[1:]
return (util.bytes2num(sig[:32]), util.bytes2num(sig[32:]))
def ecdh(self, pubkey):
"""Derive shared secret using ECDH from remote public key."""
result = self.client_wrapper.connection.get_ecdh_session_key(
identity=self.identity,
peer_public_key=pubkey,
ecdsa_curve_name=self.curve_name)
assert len(result.session_key) == 65
assert result.session_key[:1] == b'\x04'
return result.session_key
def close(self):
"""Close the connection to the device."""
self.client_wrapper.connection.clear_session()
self.client_wrapper.connection.close()
class AgentSigner(object):
"""Sign messages and get public keys using gpg-agent tool."""
def __init__(self, user_id):
"""Connect to the agent and retrieve required public key."""
self.sock = keyring.connect_to_agent()
self.keygrip = keyring.get_keygrip(user_id)
def sign(self, digest):
"""Sign the digest and return an ECDSA/RSA/DSA signature."""
return keyring.sign_digest(sock=self.sock,
keygrip=self.keygrip, digest=digest)
def close(self):
"""Close the connection to gpg-agent."""
self.sock.close()
def _time_format(t):
return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(t))
def create_primary(user_id, pubkey, signer_func):
"""Export new primary GPG public key, ready for "gpg2 --import"."""
pubkey_packet = protocol.packet(tag=6, blob=pubkey.data())
user_id_packet = protocol.packet(tag=13,
blob=user_id.encode('ascii'))
data_to_sign = (pubkey.data_to_hash() +
user_id_packet[:1] +
util.prefix_len('>L', user_id.encode('ascii')))
log.info('creating primary GPG key "%s"', user_id)
hashed_subpackets = [
protocol.subpacket_time(pubkey.created), # signature time
# https://tools.ietf.org/html/rfc4880#section-5.2.3.7
protocol.subpacket_byte(0x0B, 9), # preferred symmetric algo (AES-256)
# https://tools.ietf.org/html/rfc4880#section-5.2.3.4
protocol.subpacket_byte(0x1B, 1 | 2), # key flags (certify & sign)
# https://tools.ietf.org/html/rfc4880#section-5.2.3.21
protocol.subpacket_byte(0x15, 8), # preferred hash (SHA256)
# https://tools.ietf.org/html/rfc4880#section-5.2.3.8
protocol.subpacket_byte(0x16, 0), # preferred compression (none)
# https://tools.ietf.org/html/rfc4880#section-5.2.3.9
protocol.subpacket_byte(0x17, 0x80) # key server prefs (no-modify)
# https://tools.ietf.org/html/rfc4880#section-5.2.3.17
]
unhashed_subpackets = [
protocol.subpacket(16, pubkey.key_id()), # issuer key id
protocol.CUSTOM_SUBPACKET]
log.info('confirm signing with primary key')
signature = protocol.make_signature(
signer_func=signer_func,
public_algo=pubkey.algo_id,
data_to_sign=data_to_sign,
sig_type=0x13, # user id & public key
hashed_subpackets=hashed_subpackets,
unhashed_subpackets=unhashed_subpackets)
sign_packet = protocol.packet(tag=2, blob=signature)
return pubkey_packet + user_id_packet + sign_packet
def create_subkey(primary_bytes, pubkey, signer_func):
"""Export new subkey to GPG primary key."""
subkey_packet = protocol.packet(tag=14, blob=pubkey.data())
primary = decode.load_public_key(primary_bytes)
log.info('adding subkey to primary GPG key "%s"', primary['user_id'])
data_to_sign = primary['_to_hash'] + pubkey.data_to_hash()
if pubkey.ecdh:
embedded_sig = None
else:
# Primary Key Binding Signature
hashed_subpackets = [
protocol.subpacket_time(pubkey.created)] # signature time
unhashed_subpackets = [
protocol.subpacket(16, pubkey.key_id())] # issuer key id
log.info('confirm signing with new subkey')
embedded_sig = protocol.make_signature(
signer_func=signer_func,
data_to_sign=data_to_sign,
public_algo=pubkey.algo_id,
sig_type=0x19,
hashed_subpackets=hashed_subpackets,
unhashed_subpackets=unhashed_subpackets)
# Subkey Binding Signature
# Key flags: https://tools.ietf.org/html/rfc4880#section-5.2.3.21
# (certify & sign) (encrypt)
flags = (2) if (not pubkey.ecdh) else (4 | 8)
hashed_subpackets = [
protocol.subpacket_time(pubkey.created), # signature time
protocol.subpacket_byte(0x1B, flags)]
unhashed_subpackets = []
unhashed_subpackets.append(protocol.subpacket(16, primary['key_id']))
if embedded_sig is not None:
unhashed_subpackets.append(protocol.subpacket(32, embedded_sig))
unhashed_subpackets.append(protocol.CUSTOM_SUBPACKET)
log.info('confirm signing with primary key')
if not primary['_is_custom']:
signer_func = AgentSigner(primary['user_id']).sign
signature = protocol.make_signature(
signer_func=signer_func,
data_to_sign=data_to_sign,
public_algo=primary['algo'],
sig_type=0x18,
hashed_subpackets=hashed_subpackets,
unhashed_subpackets=unhashed_subpackets)
sign_packet = protocol.packet(tag=2, blob=signature)
return primary_bytes + subkey_packet + sign_packet
def load_from_public_key(pubkey_dict):
"""Load correct public key from the device."""
user_id = pubkey_dict['user_id']
created = pubkey_dict['created']
curve_name = protocol.find_curve_by_algo_id(pubkey_dict['algo'])
assert curve_name in formats.SUPPORTED_CURVES
ecdh = (pubkey_dict['algo'] == protocol.ECDH_ALGO_ID)
conn = HardwareSigner(user_id, curve_name=curve_name)
pubkey = protocol.PublicKey(
curve_name=curve_name, created=created,
verifying_key=conn.pubkey(ecdh=ecdh), ecdh=ecdh)
assert pubkey.key_id() == pubkey_dict['key_id']
log.info('%s created at %s for "%s"',
pubkey, _time_format(pubkey.created), user_id)
return pubkey, conn

205
trezor_agent/gpg/keyring.py Normal file
View File

@@ -0,0 +1,205 @@
"""Tools for doing signature using gpg-agent."""
import binascii
import io
import logging
import os
import re
import socket
import subprocess
from .. import util
log = logging.getLogger(__name__)
def get_agent_sock_path(sp=subprocess):
"""Parse gpgconf output to find out GPG agent UNIX socket path."""
lines = sp.check_output(['gpgconf', '--list-dirs']).strip().split('\n')
dirs = dict(line.split(':', 1) for line in lines)
return dirs['agent-socket']
def connect_to_agent(sp=subprocess):
"""Connect to GPG agent's UNIX socket."""
sock_path = get_agent_sock_path(sp=sp)
sp.check_call(['gpg-connect-agent', '/bye'])
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(sock_path)
return sock
def communicate(sock, msg):
"""Send a message and receive a single line."""
sendline(sock, msg.encode('ascii'))
return recvline(sock)
def sendline(sock, msg):
"""Send a binary message, followed by EOL."""
log.debug('<- %r', msg)
sock.sendall(msg + b'\n')
def recvline(sock):
"""Receive a single line from the socket."""
reply = io.BytesIO()
while True:
c = sock.recv(1)
if not c:
return None # socket is closed
if c == b'\n':
break
reply.write(c)
result = reply.getvalue()
log.debug('-> %r', result)
return result
def iterlines(conn):
"""Iterate over input, split by lines."""
while True:
line = recvline(conn)
if line is None:
break
yield line
def unescape(s):
"""Unescape ASSUAN message (0xAB <-> '%AB')."""
s = bytearray(s)
i = 0
while i < len(s):
if s[i] == ord('%'):
hex_bytes = bytes(s[i+1:i+3])
value = int(hex_bytes.decode('ascii'), 16)
s[i:i+3] = [value]
i += 1
return bytes(s)
def parse_term(s):
"""Parse single s-expr term from bytes."""
size, s = s.split(b':', 1)
size = int(size)
return s[:size], s[size:]
def parse(s):
"""Parse full s-expr from bytes."""
if s.startswith(b'('):
s = s[1:]
name, s = parse_term(s)
values = [name]
while not s.startswith(b')'):
value, s = parse(s)
values.append(value)
return values, s[1:]
else:
return parse_term(s)
def _parse_ecdsa_sig(args):
(r, sig_r), (s, sig_s) = args
assert r == b'r'
assert s == b's'
return (util.bytes2num(sig_r),
util.bytes2num(sig_s))
# DSA and EDDSA happen to have the same structure as ECDSA signatures
_parse_dsa_sig = _parse_ecdsa_sig
_parse_eddsa_sig = _parse_ecdsa_sig
def _parse_rsa_sig(args):
(s, sig_s), = args
assert s == b's'
return (util.bytes2num(sig_s),)
def parse_sig(sig):
"""Parse signature integer values from s-expr."""
label, sig = sig
assert label == b'sig-val'
algo_name = sig[0]
parser = {b'rsa': _parse_rsa_sig,
b'ecdsa': _parse_ecdsa_sig,
b'eddsa': _parse_eddsa_sig,
b'dsa': _parse_dsa_sig}[algo_name]
return parser(args=sig[1:])
def sign_digest(sock, keygrip, digest, sp=subprocess, environ=None):
"""Sign a digest using specified key using GPG agent."""
hash_algo = 8 # SHA256
assert len(digest) == 32
assert communicate(sock, 'RESET').startswith(b'OK')
ttyname = sp.check_output(['tty']).strip()
options = ['ttyname={}'.format(ttyname)] # set TTY for passphrase entry
display = (environ or os.environ).get('DISPLAY')
if display is not None:
options.append('display={}'.format(display))
for opt in options:
assert communicate(sock, 'OPTION {}'.format(opt)) == b'OK'
assert communicate(sock, 'SIGKEY {}'.format(keygrip)) == b'OK'
hex_digest = binascii.hexlify(digest).upper().decode('ascii')
assert communicate(sock, 'SETHASH {} {}'.format(hash_algo,
hex_digest)) == b'OK'
assert communicate(sock, 'SETKEYDESC '
'Sign+a+new+TREZOR-based+subkey') == b'OK'
assert communicate(sock, 'PKSIGN') == b'OK'
line = recvline(sock).strip()
line = unescape(line)
log.debug('unescaped: %r', line)
prefix, sig = line.split(b' ', 1)
if prefix != b'D':
raise ValueError(prefix)
sig, leftover = parse(sig)
assert not leftover, leftover
return parse_sig(sig)
def gpg_command(args, env=None):
"""Prepare common GPG command line arguments."""
if env is None:
env = os.environ
cmd = ['gpg2']
homedir = env.get('GNUPGHOME')
if homedir:
cmd.extend(['--homedir', homedir])
return cmd + args
def get_keygrip(user_id, sp=subprocess):
"""Get a keygrip of the primary GPG key of the specified user."""
args = gpg_command(['--list-keys', '--with-keygrip', user_id])
output = sp.check_output(args).decode('ascii')
return re.findall(r'Keygrip = (\w+)', output)[0]
def gpg_version(sp=subprocess):
"""Get a keygrip of the primary GPG key of the specified user."""
args = gpg_command(['--version'])
output = sp.check_output(args).decode('ascii')
line = output.split(b'\n')[0] # b'gpg (GnuPG) 2.1.11'
return line.split(b' ')[-1] # b'2.1.11'
def export_public_key(user_id, sp=subprocess):
"""Export GPG public key for specified `user_id`."""
args = gpg_command(['--export', user_id])
result = sp.check_output(args=args)
if not result:
log.error('could not find public key %r in local GPG keyring', user_id)
raise KeyError(user_id)
return result

View File

@@ -0,0 +1,252 @@
"""GPG protocol utilities."""
import base64
import hashlib
import logging
import struct
from .. import formats, util
log = logging.getLogger(__name__)
def packet(tag, blob):
"""Create small GPG packet."""
assert len(blob) < 2**32
if len(blob) < 2**8:
length_type = 0
elif len(blob) < 2**16:
length_type = 1
else:
length_type = 2
fmt = ['>B', '>H', '>L'][length_type]
leading_byte = 0x80 | (tag << 2) | (length_type)
return struct.pack('>B', leading_byte) + util.prefix_len(fmt, blob)
def subpacket(subpacket_type, fmt, *values):
"""Create GPG subpacket."""
blob = struct.pack(fmt, *values) if values else fmt
return struct.pack('>B', subpacket_type) + blob
def subpacket_long(subpacket_type, value):
"""Create GPG subpacket with 32-bit unsigned integer."""
return subpacket(subpacket_type, '>L', value)
def subpacket_time(value):
"""Create GPG subpacket with time in seconds (since Epoch)."""
return subpacket_long(2, value)
def subpacket_byte(subpacket_type, value):
"""Create GPG subpacket with 8-bit unsigned integer."""
return subpacket(subpacket_type, '>B', value)
def subpacket_prefix_len(item):
"""Prefix subpacket length according to RFC 4880 section-5.2.3.1."""
n = len(item)
if n >= 8384:
prefix = b'\xFF' + struct.pack('>L', n)
elif n >= 192:
n = n - 192
prefix = struct.pack('BB', (n // 256) + 192, n % 256)
else:
prefix = struct.pack('B', n)
return prefix + item
def subpackets(*items):
"""Serialize several GPG subpackets."""
prefixed = [subpacket_prefix_len(item) for item in items]
return util.prefix_len('>H', b''.join(prefixed))
def mpi(value):
"""Serialize multipresicion integer using GPG format."""
bits = value.bit_length()
data_size = (bits + 7) // 8
data_bytes = bytearray(data_size)
for i in range(data_size):
data_bytes[i] = value & 0xFF
value = value >> 8
data_bytes.reverse()
return struct.pack('>H', bits) + bytes(data_bytes)
def _serialize_nist256(vk):
return mpi((4 << 512) |
(vk.pubkey.point.x() << 256) |
(vk.pubkey.point.y()))
def _serialize_ed25519(vk):
return mpi((0x40 << 256) |
util.bytes2num(vk.to_bytes()))
def _compute_keygrip(params):
parts = []
for name, value in params:
exp = '{}:{}{}:'.format(len(name), name, len(value))
parts.append(b'(' + exp.encode('ascii') + value + b')')
return hashlib.sha1(b''.join(parts)).digest()
def _keygrip_nist256(vk):
curve = vk.curve.curve
gen = vk.curve.generator
g = (4 << 512) | (gen.x() << 256) | gen.y()
point = vk.pubkey.point
q = (4 << 512) | (point.x() << 256) | point.y()
return _compute_keygrip([
['p', util.num2bytes(curve.p(), size=32)],
['a', util.num2bytes(curve.a() % curve.p(), size=32)],
['b', util.num2bytes(curve.b() % curve.p(), size=32)],
['g', util.num2bytes(g, size=65)],
['n', util.num2bytes(vk.curve.order, size=32)],
['q', util.num2bytes(q, size=65)],
])
def _keygrip_ed25519(vk):
# pylint: disable=line-too-long
return _compute_keygrip([
['p', util.num2bytes(0x7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFED, size=32)], # nopep8
['a', b'\x01'],
['b', util.num2bytes(0x2DFC9311D490018C7338BF8688861767FF8FF5B2BEBE27548A14B235ECA6874A, size=32)], # nopep8
['g', util.num2bytes(0x04216936D3CD6E53FEC0A4E231FDD6DC5C692CC7609525A7B2C9562D608F25D51A6666666666666666666666666666666666666666666666666666666666666658, size=65)], # nopep8
['n', util.num2bytes(0x1000000000000000000000000000000014DEF9DEA2F79CD65812631A5CF5D3ED, size=32)], # nopep8
['q', vk.to_bytes()],
])
SUPPORTED_CURVES = {
formats.CURVE_NIST256: {
# https://tools.ietf.org/html/rfc6637#section-11
'oid': b'\x2A\x86\x48\xCE\x3D\x03\x01\x07',
'algo_id': 19,
'serialize': _serialize_nist256,
'keygrip': _keygrip_nist256,
},
formats.CURVE_ED25519: {
'oid': b'\x2B\x06\x01\x04\x01\xDA\x47\x0F\x01',
'algo_id': 22,
'serialize': _serialize_ed25519,
'keygrip': _keygrip_ed25519,
}
}
ECDH_ALGO_ID = 18
CUSTOM_SUBPACKET = subpacket(100, b'TREZOR-GPG') # marks "our" pubkey
def find_curve_by_algo_id(algo_id):
"""Find curve name that matches a public key algorith ID."""
if algo_id == ECDH_ALGO_ID:
return formats.CURVE_NIST256
curve_name, = [name for name, info in SUPPORTED_CURVES.items()
if info['algo_id'] == algo_id]
return curve_name
class PublicKey(object):
"""GPG representation for public key packets."""
def __init__(self, curve_name, created, verifying_key, ecdh=False):
"""Contruct using a ECDSA VerifyingKey object."""
self.curve_info = SUPPORTED_CURVES[curve_name]
self.created = int(created) # time since Epoch
self.verifying_key = verifying_key
self.ecdh = ecdh
if ecdh:
self.algo_id = ECDH_ALGO_ID
self.ecdh_packet = b'\x03\x01\x08\x07'
else:
self.algo_id = self.curve_info['algo_id']
self.ecdh_packet = b''
hex_key_id = util.hexlify(self.key_id())[-8:]
self.desc = 'GPG public key {}/{}'.format(curve_name, hex_key_id)
@property
def keygrip(self):
"""Compute GPG2 keygrip."""
return self.curve_info['keygrip'](self.verifying_key)
def data(self):
"""Data for packet creation."""
header = struct.pack('>BLB',
4, # version
self.created, # creation
self.algo_id) # public key algorithm ID
oid = util.prefix_len('>B', self.curve_info['oid'])
blob = self.curve_info['serialize'](self.verifying_key)
return header + oid + blob + self.ecdh_packet
def data_to_hash(self):
"""Data for digest computation."""
return b'\x99' + util.prefix_len('>H', self.data())
def _fingerprint(self):
return hashlib.sha1(self.data_to_hash()).digest()
def key_id(self):
"""Short (8 byte) GPG key ID."""
return self._fingerprint()[-8:]
def __repr__(self):
"""Short (8 hexadecimal digits) GPG key ID."""
return self.desc
__str__ = __repr__
def _split_lines(body, size):
lines = []
for i in range(0, len(body), size):
lines.append(body[i:i+size] + '\n')
return ''.join(lines)
def armor(blob, type_str):
"""See https://tools.ietf.org/html/rfc4880#section-6 for details."""
head = '-----BEGIN PGP {}-----\nVersion: GnuPG v2\n\n'.format(type_str)
body = base64.b64encode(blob).decode('ascii')
checksum = base64.b64encode(util.crc24(blob)).decode('ascii')
tail = '-----END PGP {}-----\n'.format(type_str)
return head + _split_lines(body, 64) + '=' + checksum + '\n' + tail
def make_signature(signer_func, data_to_sign, public_algo,
hashed_subpackets, unhashed_subpackets, sig_type=0):
"""Create new GPG signature."""
# pylint: disable=too-many-arguments
header = struct.pack('>BBBB',
4, # version
sig_type, # rfc4880 (section-5.2.1)
public_algo,
8) # hash_alg (SHA256)
hashed = subpackets(*hashed_subpackets)
unhashed = subpackets(*unhashed_subpackets)
tail = b'\x04\xff' + struct.pack('>L', len(header) + len(hashed))
data_to_hash = data_to_sign + header + hashed + tail
log.debug('hashing %d bytes', len(data_to_hash))
digest = hashlib.sha256(data_to_hash).digest()
log.debug('signing digest: %s', util.hexlify(digest))
params = signer_func(digest=digest)
sig = b''.join(mpi(p) for p in params)
return bytes(header + hashed + unhashed +
digest[:2] + # used for decoder's sanity check
sig) # actual ECDSA signature

View File

@@ -0,0 +1 @@
"""Tests for GPG module."""

View File

@@ -0,0 +1,124 @@
import glob
import hashlib
import io
import os
import pytest
from .. import decode, protocol
from ... import util
def test_subpackets():
s = io.BytesIO(b'\x00\x05\x02\xAB\xCD\x01\xEF')
assert decode.parse_subpackets(util.Reader(s)) == [b'\xAB\xCD', b'\xEF']
def test_subpackets_prefix():
for n in [0, 1, 2, 4, 5, 10, 191, 192, 193,
255, 256, 257, 8383, 8384, 65530]:
item = b'?' * n # create dummy subpacket
prefixed = protocol.subpackets(item)
result = decode.parse_subpackets(util.Reader(io.BytesIO(prefixed)))
assert [item] == result
def test_mpi():
s = io.BytesIO(b'\x00\x09\x01\x23')
assert decode.parse_mpi(util.Reader(s)) == 0x123
s = io.BytesIO(b'\x00\x09\x01\x23\x00\x03\x05')
assert decode.parse_mpis(util.Reader(s), n=2) == [0x123, 5]
def assert_subdict(d, s):
for k, v in s.items():
assert d[k] == v
def test_primary_nist256p1():
# pylint: disable=line-too-long
data = b'''-----BEGIN PGP PUBLIC KEY BLOCK-----
Version: GnuPG v2
mFIEV0hI1hMIKoZIzj0DAQcCAwTXY7aq01xMPSU7gTHU9B7Z2CFoCk1Y4WYb8Tiy
hurvIZ5la6+UEgAKF9HXpQo0yE+HQOgufoLlCpdE7NoEUb+HtAd0ZXN0aW5niHYE
ExMIABIFAldISNYCGwMCFQgCFgACF4AAFgkQTcCehfpEIPILZFRSRVpPUi1HUEeV
3QEApHKmBkbLVZNpsB8q9mBzKytxnOHNB3QWDuoKJu/ERi4A/1wRGZ/B0BDazHck
zpR9luXTKwMEl+mlZmwEFKZXBmir
=oyj0
-----END PGP PUBLIC KEY BLOCK-----
'''
stream = io.BytesIO(decode.remove_armor(data))
pubkey, user_id, signature = list(decode.parse_packets(stream))
expected_pubkey = {
'created': 1464355030, 'type': 'pubkey', 'tag': 6,
'version': 4, 'algo': 19, 'key_id': b'M\xc0\x9e\x85\xfaD \xf2',
'_to_hash': b'\x99\x00R\x04WHH\xd6\x13\x08*\x86H\xce=\x03\x01\x07\x02\x03\x04\xd7c\xb6\xaa\xd3\\L=%;\x811\xd4\xf4\x1e\xd9\xd8!h\nMX\xe1f\x1b\xf18\xb2\x86\xea\xef!\x9eek\xaf\x94\x12\x00\n\x17\xd1\xd7\xa5\n4\xc8O\x87@\xe8.~\x82\xe5\n\x97D\xec\xda\x04Q\xbf\x87' # nopep8
}
assert_subdict(pubkey, expected_pubkey)
point = pubkey['verifying_key'].pubkey.point
assert point.x(), point.y() == (
97423441028100245505102979561460969898742433559010922791700160771755342491425,
71644624850142103522769833619875243486871666152651730678601507641225861250951
)
assert_subdict(user_id, {
'tag': 13, 'type': 'user_id', 'value': b'testing',
'_to_hash': b'\xb4\x00\x00\x00\x07testing'
})
assert_subdict(signature, {
'pubkey_alg': 19, '_is_custom': True, 'hash_alg': 8, 'tag': 2,
'sig_type': 19, 'version': 4, 'type': 'signature', 'hash_prefix': b'\x95\xdd',
'sig': (74381873592149178031432444136130575481350858387410643140628758456112511206958,
41642995320462795718437755373080464775445470754419831653624197847615308982443),
'hashed_subpackets': [b'\x02WHH\xd6', b'\x1b\x03', b'\x15\x08', b'\x16\x00', b'\x17\x80'],
'unhashed_subpackets': [b'\x10M\xc0\x9e\x85\xfaD \xf2', b'dTREZOR-GPG'],
'_to_hash': b'\x04\x13\x13\x08\x00\x12\x05\x02WHH\xd6\x02\x1b\x03\x02\x15\x08\x02\x16\x00\x02\x17\x80\x04\xff\x00\x00\x00\x18' # nopep8
})
digest = decode.digest_packets(packets=[pubkey, user_id, signature],
hasher=hashlib.sha256())
decode.verify_digest(pubkey=pubkey, digest=digest,
signature=signature['sig'],
label='GPG primary public key')
with pytest.raises(ValueError):
bad_digest = b'\x00' * len(digest)
decode.verify_digest(pubkey=pubkey, digest=bad_digest,
signature=signature['sig'],
label='GPG primary public key')
message = b'Hello, World!\n'
signature = b'''-----BEGIN PGP SIGNATURE-----
Version: GnuPG v2
iF4EABMIAAYFAldIlfQACgkQTcCehfpEIPKOUgD9FjaeWla4wOuDZ7P6fhkT5nZp
KDQU0N5KmNwLlt2kwo4A/jQkBII2cI8tTqOVTLNRXXqIOsMf/fG4jKM/VOFc/01c
=dC+z
-----END PGP SIGNATURE-----
'''
decode.verify(pubkey=pubkey, signature=signature, original_data=message)
pubkey = decode.load_public_key(pubkey_bytes=decode.remove_armor(data))
assert_subdict(pubkey, expected_pubkey)
assert_subdict(pubkey, {'user_id': b'testing'})
pubkey = decode.load_public_key(pubkey_bytes=decode.remove_armor(data),
use_custom=True)
assert_subdict(pubkey, expected_pubkey)
assert_subdict(pubkey, {'user_id': b'testing'})
cwd = os.path.join(os.path.dirname(__file__))
input_files = glob.glob(os.path.join(cwd, '*.gpg'))
@pytest.fixture(params=input_files)
def public_key_path(request):
return request.param
def test_gpg_files(public_key_path): # pylint: disable=redefined-outer-name
with open(public_key_path, 'rb') as f:
keys = decode.parse_public_keys(f)
assert len(keys) > 0

View File

@@ -0,0 +1,82 @@
import io
import mock
from .. import keyring
def test_unescape_short():
assert keyring.unescape(b'abc%0AX%0D %25;.-+()') == b'abc\nX\r %;.-+()'
def test_unescape_long():
escaped = (b'D (7:sig-val(3:dsa(1:r32:\x1d\x15.\x12\xe8h\x19\xd9O\xeb\x06'
b'yD?a:/\xae\xdb\xac\x93\xa6\x86\xcbs\xb8\x03\xf1\xcb\x89\xc7'
b'\x1f)(1:s32:%25\xb5\x04\x94\xc7\xc4X\xc7\xe0%0D\x08\xbb%0DuN'
b'\x9c6}[\xc2=t\x8c\xfdD\x81\xe8\xdd\x86=\xe2\xa9)))')
unescaped = (b'D (7:sig-val(3:dsa(1:r32:\x1d\x15.\x12\xe8h\x19\xd9O\xeb'
b'\x06yD?a:/\xae\xdb\xac\x93\xa6\x86\xcbs\xb8\x03\xf1\xcb\x89'
b'\xc7\x1f)(1:s32:%\xb5\x04\x94\xc7\xc4X\xc7\xe0\r\x08\xbb\ru'
b'N\x9c6}[\xc2=t\x8c\xfdD\x81\xe8\xdd\x86=\xe2\xa9)))')
assert keyring.unescape(escaped) == unescaped
def test_parse_term():
assert keyring.parse(b'4:abcdXXX') == (b'abcd', b'XXX')
def test_parse_ecdsa():
sig, rest = keyring.parse(b'(7:sig-val(5:ecdsa'
b'(1:r2:\x01\x02)(1:s2:\x03\x04)))')
values = [[b'r', b'\x01\x02'], [b's', b'\x03\x04']]
assert sig == [b'sig-val', [b'ecdsa'] + values]
assert rest == b''
assert keyring.parse_sig(sig) == (0x102, 0x304)
def test_parse_rsa():
sig, rest = keyring.parse(b'(7:sig-val(3:rsa(1:s4:\x01\x02\x03\x04)))')
assert sig == [b'sig-val', [b'rsa', [b's', b'\x01\x02\x03\x04']]]
assert rest == b''
assert keyring.parse_sig(sig) == (0x1020304,)
class FakeSocket(object):
def __init__(self):
self.rx = io.BytesIO()
self.tx = io.BytesIO()
def recv(self, n):
return self.rx.read(n)
def sendall(self, data):
self.tx.write(data)
def test_sign_digest():
sock = FakeSocket()
sock.rx.write(b'OK Pleased to meet you, process XYZ\n')
sock.rx.write(b'OK\n' * 6)
sock.rx.write(b'D (7:sig-val(3:rsa(1:s16:0123456789ABCDEF)))\n')
sock.rx.seek(0)
keygrip = '1234'
digest = b'A' * 32
sp = mock.Mock(spec=['check_output'])
sp.check_output.return_value = '/dev/pts/0'
sig = keyring.sign_digest(sock=sock, keygrip=keygrip,
digest=digest, sp=sp,
environ={'DISPLAY': ':0'})
assert sig == (0x30313233343536373839414243444546,)
assert sock.tx.getvalue() == b'''RESET
OPTION ttyname=/dev/pts/0
OPTION display=:0
SIGKEY 1234
SETHASH 8 4141414141414141414141414141414141414141414141414141414141414141
SETKEYDESC Sign+a+new+TREZOR-based+subkey
PKSIGN
'''
def test_iterlines():
sock = FakeSocket()
sock.rx.write(b'foo\nbar\nxyz')
assert list(keyring.iterlines(sock)) == []

View File

@@ -0,0 +1,95 @@
import ecdsa
import ed25519
from .. import protocol
from ... import formats
def test_packet():
assert protocol.packet(1, b'') == b'\x84\x00'
assert protocol.packet(2, b'A') == b'\x88\x01A'
blob = b'B' * 0xAB
assert protocol.packet(3, blob) == b'\x8c\xAB' + blob
blob = b'C' * 0x1234
assert protocol.packet(3, blob) == b'\x8d\x12\x34' + blob
blob = b'D' * 0x12345678
assert protocol.packet(4, blob) == b'\x92\x12\x34\x56\x78' + blob
def test_subpackets():
assert protocol.subpacket(1, b'') == b'\x01'
assert protocol.subpacket(2, '>H', 0x0304) == b'\x02\x03\x04'
assert protocol.subpacket_long(9, 0x12345678) == b'\x09\x12\x34\x56\x78'
assert protocol.subpacket_time(0x12345678) == b'\x02\x12\x34\x56\x78'
assert protocol.subpacket_byte(0xAB, 0xCD) == b'\xAB\xCD'
assert protocol.subpackets() == b'\x00\x00'
assert protocol.subpackets(b'ABC', b'12345') == b'\x00\x0A\x03ABC\x0512345'
def test_mpi():
assert protocol.mpi(0x123) == b'\x00\x09\x01\x23'
def test_find():
assert protocol.find_curve_by_algo_id(19) == formats.CURVE_NIST256
assert protocol.find_curve_by_algo_id(22) == formats.CURVE_ED25519
def test_armor():
data = bytearray(range(256))
assert protocol.armor(data, 'TEST') == '''-----BEGIN PGP TEST-----
Version: GnuPG v2
AAECAwQFBgcICQoLDA0ODxAREhMUFRYXGBkaGxwdHh8gISIjJCUmJygpKissLS4v
MDEyMzQ1Njc4OTo7PD0+P0BBQkNERUZHSElKS0xNTk9QUVJTVFVWV1hZWltcXV5f
YGFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6e3x9fn+AgYKDhIWGh4iJiouMjY6P
kJGSk5SVlpeYmZqbnJ2en6ChoqOkpaanqKmqq6ytrq+wsbKztLW2t7i5uru8vb6/
wMHCw8TFxsfIycrLzM3Oz9DR0tPU1dbX2Nna29zd3t/g4eLj5OXm5+jp6uvs7e7v
8PHy8/T19vf4+fr7/P3+/w==
=W700
-----END PGP TEST-----
'''
def test_make_signature():
def signer_func(digest):
assert digest == (b'\xd0\xe5]|\x8bP\xe6\x91\xb3\xe8+\xf4A\xf0`(\xb1'
b'\xc7\xf4;\x86\x97s\xdb\x9a\xda\xee< \xcb\x9e\x00')
return (7, 8)
sig = protocol.make_signature(
signer_func=signer_func,
data_to_sign=b'Hello World!',
public_algo=22,
hashed_subpackets=[protocol.subpacket_time(1)],
unhashed_subpackets=[],
sig_type=25)
assert sig == (b'\x04\x19\x16\x08\x00\x06\x05\x02'
b'\x00\x00\x00\x01\x00\x00\xd0\xe5\x00\x03\x07\x00\x04\x08')
def test_nist256p1():
sk = ecdsa.SigningKey.from_secret_exponent(secexp=1, curve=ecdsa.NIST256p)
vk = sk.get_verifying_key()
pk = protocol.PublicKey(curve_name=formats.CURVE_NIST256,
created=42, verifying_key=vk)
assert repr(pk) == 'GPG public key nist256p1/F82361D9'
assert pk.keygrip == b'\x95\x85.\x91\x7f\xe2\xc3\x91R\xba\x99\x81\x92\xb5y\x1d\xb1\\\xdc\xf0'
def test_nist256p1_ecdh():
sk = ecdsa.SigningKey.from_secret_exponent(secexp=1, curve=ecdsa.NIST256p)
vk = sk.get_verifying_key()
pk = protocol.PublicKey(curve_name=formats.CURVE_NIST256,
created=42, verifying_key=vk, ecdh=True)
assert repr(pk) == 'GPG public key nist256p1/5811DF46'
assert pk.keygrip == b'\x95\x85.\x91\x7f\xe2\xc3\x91R\xba\x99\x81\x92\xb5y\x1d\xb1\\\xdc\xf0'
def test_ed25519():
sk = ed25519.SigningKey(b'\x00' * 32)
vk = sk.get_verifying_key()
pk = protocol.PublicKey(curve_name=formats.CURVE_ED25519,
created=42, verifying_key=vk)
assert repr(pk) == 'GPG public key ed25519/36B40FE6'
assert pk.keygrip == b'\xbf\x01\x90l\x17\xb64\xa3-\xf4\xc0gr\x99\x18<\xddBQ?'

View File

@@ -29,7 +29,7 @@ class FakeConnection(object):
def get_public_node(self, n, ecdsa_curve_name=b'secp256k1'):
assert not self.closed
assert n == ADDR
assert ecdsa_curve_name in {b'secp256k1', b'nist256p1'}
assert ecdsa_curve_name in {'secp256k1', 'nist256p1'}
result = mock.Mock(spec=[])
result.node = mock.Mock(spec=[])
result.node.public_key = PUBKEY
@@ -90,8 +90,8 @@ def test_ssh_agent():
assert (client.identity_to_string(identity) ==
client.identity_to_string(ident))
assert challenge_hidden == BLOB
assert challenge_visual == 'VISUAL'
assert ecdsa_curve_name == b'nist256p1'
assert challenge_visual == ''
assert ecdsa_curve_name == 'nist256p1'
result = mock.Mock(spec=[])
result.public_key = PUBKEY
@@ -99,8 +99,7 @@ def test_ssh_agent():
return result
c.client.sign_identity = ssh_sign_identity
signature = c.sign_ssh_challenge(label=label, blob=BLOB,
visual='VISUAL')
signature = c.sign_ssh_challenge(label=label, blob=BLOB)
key = formats.import_public_key(PUBKEY_TEXT)
serialized_sig = key['verifier'](sig=signature, msg=BLOB)
@@ -122,7 +121,7 @@ def test_ssh_agent():
c.client.sign_identity = cancel_sign_identity
with pytest.raises(IOError):
c.sign_ssh_challenge(label=label, blob=BLOB, visual='VISUAL')
c.sign_ssh_challenge(label=label, blob=BLOB)
def test_utils():

View File

@@ -29,7 +29,7 @@ def test_parse_public_key():
assert key['name'] == b'home'
assert key['point'] == _point
assert key['curve'] == b'nist256p1'
assert key['curve'] == 'nist256p1'
assert key['fingerprint'] == '4b:19:bc:0f:c8:7e:dc:fa:1a:e3:c2:ff:6f:e0:80:a2' # nopep8
assert key['type'] == b'ecdsa-sha2-nistp256'
@@ -46,7 +46,7 @@ def test_parse_ed25519():
'fSO8nLIi736is+f0erq28RTc7CkM11NZtTKR hello\n')
p = formats.import_public_key(pubkey)
assert p['name'] == b'hello'
assert p['curve'] == b'ed25519'
assert p['curve'] == 'ed25519'
BLOB = (b'\x00\x00\x00\x0bssh-ed25519\x00\x00\x00 P]\x17kc}#'
b'\xbc\x9c\xb2"\xef~\xa2\xb3\xe7\xf4z\xba\xb6\xf1\x14'

View File

@@ -46,3 +46,54 @@ def test_send_recv():
assert util.recv(s, 2) == b'3*'
pytest.raises(EOFError, util.recv, s, 1)
def test_crc24():
assert util.crc24(b'') == b'\xb7\x04\xce'
assert util.crc24(b'1234567890') == b'\x8c\x00\x72'
def test_bit():
assert util.bit(6, 3) == 0
assert util.bit(6, 2) == 1
assert util.bit(6, 1) == 1
assert util.bit(6, 0) == 0
def test_split_bits():
assert util.split_bits(0x1234, 4, 8, 4) == [0x1, 0x23, 0x4]
def test_hexlify():
assert util.hexlify(b'\x12\x34\xab\xcd') == '1234ABCD'
def test_low_bits():
assert util.low_bits(0x1234, 12) == 0x234
assert util.low_bits(0x1234, 32) == 0x1234
assert util.low_bits(0x1234, 0) == 0
def test_readfmt():
stream = io.BytesIO(b'ABC\x12\x34')
assert util.readfmt(stream, 'B') == (65,)
assert util.readfmt(stream, '>2sH') == (b'BC', 0x1234)
def test_prefix_len():
assert util.prefix_len('>H', b'ABCD') == b'\x00\x04ABCD'
def test_reader():
stream = io.BytesIO(b'ABC\x12\x34')
r = util.Reader(stream)
assert r.read(1) == b'A'
assert r.readfmt('2s') == b'BC'
dst = io.BytesIO()
with r.capture(dst):
assert r.readfmt('>H') == 0x1234
assert dst.getvalue() == b'\x12\x34'
with pytest.raises(EOFError):
r.read(1)

View File

@@ -1,4 +1,6 @@
"""Various I/O and serialization utilities."""
import binascii
import contextlib
import io
import struct
@@ -60,7 +62,7 @@ def num2bytes(value, size):
res.append(value & 0xFF)
value = value >> 8
assert value == 0
return bytearray(list(reversed(res)))
return bytes(bytearray(list(reversed(res))))
def pack(fmt, *args):
@@ -75,3 +77,99 @@ def frame(*msgs):
res.write(msg)
msg = res.getvalue()
return pack('L', len(msg)) + msg
def crc24(blob):
"""See https://tools.ietf.org/html/rfc4880#section-6.1 for details."""
CRC24_INIT = 0x0B704CE
CRC24_POLY = 0x1864CFB
crc = CRC24_INIT
for octet in bytearray(blob):
crc ^= (octet << 16)
for _ in range(8):
crc <<= 1
if crc & 0x1000000:
crc ^= CRC24_POLY
assert 0 <= crc < 0x1000000
crc_bytes = struct.pack('>L', crc)
assert crc_bytes[:1] == b'\x00'
return crc_bytes[1:]
def bit(value, i):
"""Extract the i-th bit out of value."""
return 1 if value & (1 << i) else 0
def low_bits(value, n):
"""Extract the lowest n bits out of value."""
return value & ((1 << n) - 1)
def split_bits(value, *bits):
"""
Split integer value into list of ints, according to `bits` list.
For example, split_bits(0x1234, 4, 8, 4) == [0x1, 0x23, 0x4]
"""
result = []
for b in reversed(bits):
mask = (1 << b) - 1
result.append(value & mask)
value = value >> b
assert value == 0
result.reverse()
return result
def readfmt(stream, fmt):
"""Read and unpack an object from stream, using a struct format string."""
size = struct.calcsize(fmt)
blob = stream.read(size)
return struct.unpack(fmt, blob)
def prefix_len(fmt, blob):
"""Prefix `blob` with its size, serialized using `fmt` format."""
return struct.pack(fmt, len(blob)) + blob
def hexlify(blob):
"""Utility for consistent hexadecimal formatting."""
return binascii.hexlify(blob).decode('ascii').upper()
class Reader(object):
"""Read basic type objects out of given stream."""
def __init__(self, stream):
"""Create a non-capturing reader."""
self.s = stream
self._captured = None
def readfmt(self, fmt):
"""Read a specified object, using a struct format string."""
size = struct.calcsize(fmt)
blob = self.read(size)
obj, = struct.unpack(fmt, blob)
return obj
def read(self, size=None):
"""Read `size` bytes from stream."""
blob = self.s.read(size)
if size is not None and len(blob) < size:
raise EOFError
if self._captured:
self._captured.write(blob)
return blob
@contextlib.contextmanager
def capture(self, stream):
"""Capture all data read during this context."""
self._captured = stream
try:
yield
finally:
self._captured = None