1 # Copyright (C) 2012 W. Trevor King <wking@tremily.us>
3 # This file is part of pgp-mime.
5 # pgp-mime is free software: you can redistribute it and/or modify it under the
6 # terms of the GNU General Public License as published by the Free Software
7 # Foundation, either version 3 of the License, or (at your option) any later
10 # pgp-mime is distributed in the hope that it will be useful, but WITHOUT ANY
11 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
12 # A PARTICULAR PURPOSE. See the GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License along with
15 # pgp-mime. If not, see <http://www.gnu.org/licenses/>.
17 import codecs as _codecs
18 import logging as _logging
20 import os.path as _os_path
22 from pyassuan import client as _client
23 from pyassuan import common as _common
25 from . import LOG as _LOG
26 from . import signature as _signature
29 def get_client(**kwargs):
30 logger = _logging.getLogger('{}.{}'.format(_LOG.name, 'pyassuan'))
31 client = _client.AssuanClient(
32 name='pgp-mime', logger=logger, use_sublogger=False,
33 close_on_disconnect=True)
34 client.connect(socket_path='/tmp/gpgme-tool.sock')
37 def disconnect(client):
38 client.make_request(_common.Request('BYE'))
42 responses,data = client.get_responses() # get initial 'OK' from server
43 client.make_request(_common.Request('ARMOR', 'true'))
45 def _read(fd, buffersize=512):
49 new = _os.read(fd, buffersize)
50 except Exception as e:
51 _LOG.warn('error while reading: {}'.format(e))
61 i += _os.write(fd, data[i:])
64 def sign_and_encrypt_bytes(data, signers=None, recipients=None,
65 always_trust=False, mode='detach',
66 allow_default_signer=False):
67 r"""Sign ``data`` with ``signers`` and encrypt to ``recipients``.
69 Just sign (with a detached signature):
71 >>> print(sign_and_encrypt_bytes(
72 ... bytes(b'Hello'), signers=['pgp-mime@invalid.com']))
73 ... # doctest: +ELLIPSIS
74 b'-----BEGIN PGP SIGNATURE-----\n...-----END PGP SIGNATURE-----\n'
78 >>> sign_and_encrypt_bytes(
79 ... bytes(b'Hello'), recipients=['pgp-mime@invalid.com'],
80 ... always_trust=True)
81 ... # doctest: +ELLIPSIS
82 b'-----BEGIN PGP MESSAGE-----\n...-----END PGP MESSAGE-----\n'
86 >>> sign_and_encrypt_bytes(
87 ... bytes(b'Hello'), signers=['pgp-mime@invalid.com'],
88 ... recipients=['pgp-mime@invalid.com'], always_trust=True)
89 ... # doctest: +ELLIPSIS
90 b'-----BEGIN PGP MESSAGE-----\n...-----END PGP MESSAGE-----\n'
92 Sign and encrypt with a specific subkey:
94 >>> sign_and_encrypt_bytes(
95 ... bytes(b'Hello'), signers=['0x2F73DE2E'],
96 ... recipients=['pgp-mime@invalid.com'], always_trust=True)
97 ... # doctest: +ELLIPSIS
98 b'-----BEGIN PGP MESSAGE-----\n...-----END PGP MESSAGE-----\n'
100 input_read,input_write = _os.pipe()
101 output_read,output_write = _os.pipe()
102 client = get_client()
106 for signer in signers:
107 client.make_request(_common.Request('SIGNER', signer))
109 for recipient in recipients:
110 client.make_request(_common.Request('RECIPIENT', recipient))
111 client.send_fds([input_read])
112 client.make_request(_common.Request('INPUT', 'FD'))
113 _os.close(input_read)
115 client.send_fds([output_write])
116 client.make_request(_common.Request('OUTPUT', 'FD'))
117 _os.close(output_write)
120 if signers or allow_default_signer:
122 command = 'SIGN_ENCRYPT'
125 parameters.append('--{}'.format(mode))
129 raise ValueError('must specify at least one signer or recipient')
131 parameters.append('--always-trust')
132 _write(input_write, data)
133 _os.close(input_write)
136 _common.Request(command, ' '.join(parameters)))
137 d = _read(output_read)
140 for fd in [input_read, input_write, output_read, output_write]:
145 def decrypt_bytes(data):
146 r"""Decrypt ``data``.
149 ... '-----BEGIN PGP MESSAGE-----',
150 ... 'Version: GnuPG v2.0.17 (GNU/Linux)',
152 ... 'hQEMA1Ea7aZDMrbjAQf/TAqLjksZSJxSqkBxYT5gtLQoXY6isvRZg2apjs7CW0y2',
153 ... 'tFK/ptnVYAq2OtWQFhbiJXj8hmwJyyFfb3lghpeu4ihO52JgkkwOpmJb6dxjOi83',
154 ... 'qDwaGOogEPH38BNLuwdrMCW0jmNROwvS796PtqSGUaJTuIiKUB8lETwPwIHrDc11',
155 ... 'N3RWStE5uShNkXXQXplUoeCKf3N4XguXym+GQCqJQzlEMrkkDdr4l7mzvt3Nf8EA',
156 ... 'SgSak086tUoo9x8IN5PJCuOJkcXcjQzFcpqOsA7dyZKO8NeQUZv2JvlZuorckNvN',
157 ... 'xx3PwW0a8VeJgTQrh64ZK/d3F3gNHUTzXkq/UIn25tJFAcmSUwxtsBal7p8zAeCV',
158 ... '8zefsHRQ5Y03IBeYBcVJBhDS9XfvwLQTJiGGstPCxzKTwSUT1MzV5t5twG/STDCc',
161 ... '-----END PGP MESSAGE-----',
163 ... ]).encode('us-ascii')
167 input_read,input_write = _os.pipe()
168 output_read,output_write = _os.pipe()
169 client = get_client()
172 client.send_fds([input_read])
173 client.make_request(_common.Request('INPUT', 'FD'))
174 _os.close(input_read)
176 client.send_fds([output_write])
177 client.make_request(_common.Request('OUTPUT', 'FD'))
178 _os.close(output_write)
180 _write(input_write, data)
181 _os.close(input_write)
183 client.make_request(_common.Request('DECRYPT'))
184 d = _read(output_read)
187 for fd in [input_read, input_write, output_read, output_write]:
192 def verify_bytes(data, signature=None, always_trust=False):
193 r"""Verify a signature on ``data``, possibly decrypting first.
195 These tests assume you didn't trust the distributed test key.
198 ... '-----BEGIN PGP MESSAGE-----',
199 ... 'Version: GnuPG v2.0.17 (GNU/Linux)',
201 ... 'hQEMA1Ea7aZDMrbjAQf/YM1SeFzNGz0DnUynaEyhfGCvcqmjtbN1PtZMpT7VaQLN',
202 ... 'a+c0faskr79Atz0+2IBR7CDOlcETrRtH2EnrWukbRIDtmffNFGuhMRTNfnQ15OIN',
203 ... 'qrmt2P5gXznsgnm2XjzTK7S/Cc3Aq+zjaDrDt7bIedEdz+EyNgaKuL/lB9cAB8xL',
204 ... 'YYp/yn55Myjair2idgzsa7w/QXdE3RhpyRLqR2Jgz4P1I1xOgUYnylbpIZL9FOKN',
205 ... 'NR3RQhkGdANBku8otfthb5ZUGsNMV45ct4V8PE+xChjFb9gcwpaf1hhoIF/sYHD5',
206 ... 'Bkf+v/J8F40KGYY16b0DjQIUlnra9y7q9jj0h2bvc9LAtgHtVUso133LLcVYl7RP',
207 ... 'Vjyz9Ps366BtIdPlAL4CoF5hEcMKS5J3h1vRlyAKN4uHENl5vKvoxn7ID3JhhWQc',
208 ... '6QrPGis64zi3OnYor34HPh/KNJvkgOQkekmtYuTxnkiONA4lhMDJgeaVZ9WZq+GV',
209 ... 'MaCvCFGNYU2TV4V8wMlnUbF8d5bDQ83g8MxIVKdDcnBzzYLZha+qmz4Spry9iB53',
210 ... 'Sg/sM5H8gWWSl7Oj1lxVg7o7IscpQfVt6zL6jD2VjL3L3Hu7WEXIrcGZtvrP4d+C',
211 ... 'TGYWiGlh5B2UCFk2bVctfw8W/QfaVvJYD4Rfqta2V2p14KIJLFRSGa1g26W4ixrH',
212 ... 'XKxgaA3AIfJ+6c5RoisRLuYCxvQi91wkE9hAXR+inXK4Hq4SmiHoeITZFhHP3hh3',
213 ... 'rbpp8mopiMNxWqCbuqgILP6pShn4oPclu9aR8uJ1ziDxISTGYC71mvLUERUjFn2L',
214 ... 'fu6C0+TCC9RmeyL+eNdM6cjs1G7YR6yX',
216 ... '-----END PGP MESSAGE-----',
218 ... ]).encode('us-ascii')
219 >>> output,verified,signatures = verify_bytes(b)
224 >>> for s in signatures:
226 ... # doctest: +REPORT_UDIFF
227 B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3 signature:
237 signature expired: False
241 timestamp: Wed Mar 21 19:13:57 2012
242 expiration timestamp: None
243 wrong key usage: False
244 pka trust: not available
247 validity reason: success
248 public key algorithm: RSA
249 hash algorithm: SHA256
250 >>> b = b'Success!\n'
251 >>> signature = '\n'.join([
252 ... '-----BEGIN PGP SIGNATURE-----',
253 ... 'Version: GnuPG v2.0.17 (GNU/Linux)',
255 ... 'iQEcBAEBAgAGBQJPaiw/AAoJEFEa7aZDMrbj93gH/1fQPXLjUTpONJUTmvGoMLNA',
256 ... 'W9ZhjpUL5i6rRqYGUvQ4kTEDuPMxkMrCyFCDHEhSDHufMek6Nso5/HeJn3aqxlgs',
257 ... 'hmNlvAq4FI6JQyFL7eCp/XG9cPx1p42dTI7JAih8FuK21sS4m/H5XP3R/6KXC99D',
258 ... '39rrXCvvR+yNgKe2dxuJwmKuLteVlcWxiIQwVrYK70GtJHC5BO79G8yGccWoEy9C',
259 ... '9JkJiyNptqZyFjGBNmMmrCSFZ7ZFA02RB+laRmwuIiozw4TJYEksxPrgZMbbcFzx',
260 ... 'zs3JHyV23+Fz1ftalvwskHE7tJkX9Ub8iBMNZ/KxJXXdPdpuMdEYVjoUehkQBQE=',
262 ... '-----END PGP SIGNATURE-----',
264 ... ]).encode('us-ascii')
265 >>> output,verified,signatures = verify_bytes(b, signature=signature)
270 >>> for s in signatures:
272 ... # doctest: +REPORT_UDIFF
273 B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3 signature:
283 signature expired: False
287 timestamp: Wed Mar 21 19:30:07 2012
288 expiration timestamp: None
289 wrong key usage: False
290 pka trust: not available
293 validity reason: success
294 public key algorithm: RSA
297 Data signed by a subkey returns the subkey fingerprint. To find
298 the primary key for a given subkey, use
299 ``pgp_mime.key.lookup_keys()``.
302 ... '-----BEGIN PGP MESSAGE-----',
303 ... 'Version: GnuPG v2.0.19 (GNU/Linux)',
305 ... 'hQEMAxcQCLovc94uAQf9ErTZnr0lYRlLLZIk1VcpNNTHrMro+BmqpFC0jprA4/2m',
306 ... '92klBF4TIS1A9bU5oxzQquaAIDV42P3sXrbxu/YhHLmPGH+dc2JVSfPLL0XOL5GC',
307 ... 'qpQYe5lglRBReFSRktrfhukjHBoXvh3c8T4xYK2r+nIV4gsp+FrSQMIOdhhBoC36',
308 ... 'U1MOk+R+I0JDbWdzZzJONs7ZcAcNDVKqxmAXZUqVgkhPpnGBSBuF9ExKRT3S6e5N',
309 ... 'Rsorb/DjGIUHSZuH2EaWAUz1jJ3nSta7TnveT/avfJiAV7cRS4oVgyyFyuHO5gkI',
310 ... 'o0obeJaut3enVgpq2TUUk0M4L8TX4jjKvDGAYNyuPNLAsQFHLj5eLmJSudGStWuA',
311 ... 'WjKLqBHD0M8/OcwnrTMleJl+h50ZsHO1tvvkXelH+w/jD5SMS+ktxq2Te8Vj7BmM',
312 ... '0WQn3Ys7ViA5PgcSpbqNNLdgc1EMcpPI/sfJAORPKVWRPBKDXX/irY2onAMSe5gH',
313 ... 'teNX6bZd/gaoLWqD/1ZhsOCnlV7LY1R929TJ9vxnJcfKKAKwBDfAaSbecUUMECVw',
314 ... 's4u3ZT1pmNslBmH6XSy3ifLYWu/2xsJuhPradT88BJOBARMGg81gOE6zxGRrMLJa',
315 ... 'KojFgqaF2y4nlZAyaJ1Ld4qCaoQogaL9qE1BbmgtBehZ2FNQiIBSLC0fUUl8A4Py',
316 ... '4d9ZxUoSp7nZmgTN5pUH1N9DIC4ntp/Rak2WnpS7+dRPlp9A2SF0RkeLY+JD9gNm',
317 ... 'j44zBkI79KlgaE/cMt6xUXAF/1ZR/Hv/6GUazGx0l23CnSGuqzLpex2uKOxfKiJt',
318 ... 'jfgyZRhIdFJnRuEXt8dTTDiiYA==',
320 ... '-----END PGP MESSAGE-----',
322 ... ]).encode('us-ascii')
323 >>> output,verified,signatures = verify_bytes(b)
328 >>> for s in signatures:
330 ... # doctest: +REPORT_UDIFF
331 DECC812C8795ADD60538B0CD171008BA2F73DE2E signature:
341 signature expired: False
345 timestamp: Thu Sep 20 15:29:28 2012
346 expiration timestamp: None
347 wrong key usage: False
348 pka trust: not available
351 validity reason: success
352 public key algorithm: RSA
353 hash algorithm: SHA256
355 input_read,input_write = _os.pipe()
357 message_read,message_write = _os.pipe()
358 output_read = output_write = -1
360 message_read = message_write = -1
361 output_read,output_write = _os.pipe()
362 client = get_client()
367 client.send_fds([input_read])
368 client.make_request(_common.Request('INPUT', 'FD'))
369 _os.close(input_read)
372 client.send_fds([message_read])
373 client.make_request(_common.Request('MESSAGE', 'FD'))
374 _os.close(message_read)
377 client.send_fds([output_write])
378 client.make_request(_common.Request('OUTPUT', 'FD'))
379 _os.close(output_write)
382 _write(input_write, signature)
383 _os.close(input_write)
385 _write(message_write, data)
386 _os.close(message_write)
389 _write(input_write, data)
390 _os.close(input_write)
392 client.make_request(_common.Request('VERIFY'))
396 plain = _read(output_read)
397 rs,result = client.make_request(_common.Request('RESULT'))
398 signatures = list(_signature.verify_result_signatures(result))
400 for signature in signatures:
401 if signature.status != 'success':
403 elif signature.pka_trust != 'good':
407 for fd in [input_read, input_write, message_read, message_write,
408 output_read, output_write]:
411 return (plain, verified, signatures)