1 # Copyright (C) 2012 W. Trevor King <wking@tremily.us>
3 # This file is part of pgp-mime.
5 # pgp-mime is free software: you can redistribute it and/or modify it under the
6 # terms of the GNU General Public License as published by the Free Software
7 # Foundation, either version 3 of the License, or (at your option) any later
10 # pgp-mime is distributed in the hope that it will be useful, but WITHOUT ANY
11 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
12 # A PARTICULAR PURPOSE. See the GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License along with
15 # pgp-mime. If not, see <http://www.gnu.org/licenses/>.
17 import codecs as _codecs
18 import logging as _logging
20 import os.path as _os_path
22 from pyassuan import client as _client
23 from pyassuan import common as _common
25 from . import LOG as _LOG
26 from . import signature as _signature
# Default socket path used to reach gpgme-tool: ``~/.gnupg/S.gpgme-tool``
# with ``~`` expanded once, at import time.
SOCKET_PATH = _os_path.expanduser(_os_path.join('~', '.gnupg', 'S.gpgme-tool'))
def get_client_params(config):
    r"""Retrieve Assuan client parameters from a config file.

    Looks up the ``socket-path`` option in the ``[gpgme-tool]`` section
    of ``config`` (a ``configparser.ConfigParser``) and returns a dict
    of the form ``{'socket_path': <value>}``.  When either the section
    or the option is missing, ``socket_path`` is ``None``.

    >>> from configparser import ConfigParser
    >>> config = ConfigParser()
    >>> config.read_string('\n'.join([
    ...             '[gpgme-tool]',
    ...             'socket-path: /tmp/S.gpgme-tool',
    ...             ]))
    >>> get_client_params(config)
    {'socket_path': '/tmp/S.gpgme-tool'}
    >>> get_client_params(ConfigParser())
    {'socket_path': None}
    """
    # ``fallback`` covers both a missing section and a missing option, so
    # no configparser exception classes need to be in scope here.
    socket_path = config.get('gpgme-tool', 'socket-path', fallback=None)
    return {'socket_path': socket_path}
def get_client(socket_path=None):
    """Create an Assuan client and connect it to a socket.

    ``socket_path`` is the path of the socket to connect to.  When it
    is ``None``, the module-level ``SOCKET_PATH`` default is used.

    Returns the connected ``AssuanClient`` instance.
    """
    if socket_path is None:
        # Apply the module default; without this the connection would be
        # attempted with ``socket_path=None``.
        socket_path = SOCKET_PATH
    logger = _logging.getLogger('{}.{}'.format(_LOG.name, 'pyassuan'))
    client = _client.AssuanClient(
        name='pgp-mime', logger=logger, use_sublogger=False,
        close_on_disconnect=True)
    client.connect(socket_path=socket_path)
    return client
66 def disconnect(client):
67 client.make_request(_common.Request('BYE'))
71 responses,data = client.get_responses() # get initial 'OK' from server
72 client.make_request(_common.Request('ARMOR', 'true'))
def _read(fd, buffersize=512):
    """Read from file descriptor ``fd`` until end-of-file.

    Data is pulled in chunks of at most ``buffersize`` bytes and
    returned as one ``bytes`` object.  A read error is logged as a
    warning and ends the read; whatever arrived before the error is
    still returned.
    """
    chunks = []
    while True:
        try:
            new = _os.read(fd, buffersize)
        except Exception as e:
            # ``Logger.warning`` replaces the deprecated ``warn`` alias,
            # which no longer exists on Python 3.13+.
            _LOG.warning('error while reading: {}'.format(e))
            break
        if not new:  # an empty read signals EOF
            break
        chunks.append(new)
    return b''.join(chunks)
90 i += _os.write(fd, data[i:])
93 def sign_and_encrypt_bytes(data, signers=None, recipients=None,
94 always_trust=False, mode='detach',
95 allow_default_signer=False, **kwargs):
96 r"""Sign ``data`` with ``signers`` and encrypt to ``recipients``.
98 Just sign (with a detached signature):
100 >>> print(sign_and_encrypt_bytes(
101 ... bytes(b'Hello'), signers=['pgp-mime@invalid.com']))
102 ... # doctest: +ELLIPSIS
103 b'-----BEGIN PGP SIGNATURE-----\n...-----END PGP SIGNATURE-----\n'
107 >>> sign_and_encrypt_bytes(
108 ... bytes(b'Hello'), recipients=['pgp-mime@invalid.com'],
109 ... always_trust=True)
110 ... # doctest: +ELLIPSIS
111 b'-----BEGIN PGP MESSAGE-----\n...-----END PGP MESSAGE-----\n'
115 >>> sign_and_encrypt_bytes(
116 ... bytes(b'Hello'), signers=['pgp-mime@invalid.com'],
117 ... recipients=['pgp-mime@invalid.com'], always_trust=True)
118 ... # doctest: +ELLIPSIS
119 b'-----BEGIN PGP MESSAGE-----\n...-----END PGP MESSAGE-----\n'
121 Sign and encrypt with a specific subkey:
123 >>> sign_and_encrypt_bytes(
124 ... bytes(b'Hello'), signers=['0x2F73DE2E'],
125 ... recipients=['pgp-mime@invalid.com'], always_trust=True)
126 ... # doctest: +ELLIPSIS
127 b'-----BEGIN PGP MESSAGE-----\n...-----END PGP MESSAGE-----\n'
129 input_read,input_write = _os.pipe()
130 output_read,output_write = _os.pipe()
131 client = get_client(**kwargs)
135 for signer in signers:
136 client.make_request(_common.Request('SIGNER', signer))
138 for recipient in recipients:
139 client.make_request(_common.Request('RECIPIENT', recipient))
140 client.send_fds([input_read])
141 client.make_request(_common.Request('INPUT', 'FD'))
142 _os.close(input_read)
144 client.send_fds([output_write])
145 client.make_request(_common.Request('OUTPUT', 'FD'))
146 _os.close(output_write)
149 if signers or allow_default_signer:
151 command = 'SIGN_ENCRYPT'
154 parameters.append('--{}'.format(mode))
158 raise ValueError('must specify at least one signer or recipient')
160 parameters.append('--always-trust')
161 _write(input_write, data)
162 _os.close(input_write)
165 _common.Request(command, ' '.join(parameters)))
166 d = _read(output_read)
169 for fd in [input_read, input_write, output_read, output_write]:
174 def decrypt_bytes(data, **kwargs):
175 r"""Decrypt ``data``.
178 ... '-----BEGIN PGP MESSAGE-----',
179 ... 'Version: GnuPG v2.0.17 (GNU/Linux)',
181 ... 'hQEMA1Ea7aZDMrbjAQf/TAqLjksZSJxSqkBxYT5gtLQoXY6isvRZg2apjs7CW0y2',
182 ... 'tFK/ptnVYAq2OtWQFhbiJXj8hmwJyyFfb3lghpeu4ihO52JgkkwOpmJb6dxjOi83',
183 ... 'qDwaGOogEPH38BNLuwdrMCW0jmNROwvS796PtqSGUaJTuIiKUB8lETwPwIHrDc11',
184 ... 'N3RWStE5uShNkXXQXplUoeCKf3N4XguXym+GQCqJQzlEMrkkDdr4l7mzvt3Nf8EA',
185 ... 'SgSak086tUoo9x8IN5PJCuOJkcXcjQzFcpqOsA7dyZKO8NeQUZv2JvlZuorckNvN',
186 ... 'xx3PwW0a8VeJgTQrh64ZK/d3F3gNHUTzXkq/UIn25tJFAcmSUwxtsBal7p8zAeCV',
187 ... '8zefsHRQ5Y03IBeYBcVJBhDS9XfvwLQTJiGGstPCxzKTwSUT1MzV5t5twG/STDCc',
190 ... '-----END PGP MESSAGE-----',
192 ... ]).encode('us-ascii')
196 input_read,input_write = _os.pipe()
197 output_read,output_write = _os.pipe()
198 client = get_client(**kwargs)
201 client.send_fds([input_read])
202 client.make_request(_common.Request('INPUT', 'FD'))
203 _os.close(input_read)
205 client.send_fds([output_write])
206 client.make_request(_common.Request('OUTPUT', 'FD'))
207 _os.close(output_write)
209 _write(input_write, data)
210 _os.close(input_write)
212 client.make_request(_common.Request('DECRYPT'))
213 d = _read(output_read)
216 for fd in [input_read, input_write, output_read, output_write]:
221 def verify_bytes(data, signature=None, always_trust=False, **kwargs):
222 r"""Verify a signature on ``data``, possibly decrypting first.
224 These tests assume you didn't trust the distributed test key.
227 ... '-----BEGIN PGP MESSAGE-----',
228 ... 'Version: GnuPG v2.0.17 (GNU/Linux)',
230 ... 'hQEMA1Ea7aZDMrbjAQf/YM1SeFzNGz0DnUynaEyhfGCvcqmjtbN1PtZMpT7VaQLN',
231 ... 'a+c0faskr79Atz0+2IBR7CDOlcETrRtH2EnrWukbRIDtmffNFGuhMRTNfnQ15OIN',
232 ... 'qrmt2P5gXznsgnm2XjzTK7S/Cc3Aq+zjaDrDt7bIedEdz+EyNgaKuL/lB9cAB8xL',
233 ... 'YYp/yn55Myjair2idgzsa7w/QXdE3RhpyRLqR2Jgz4P1I1xOgUYnylbpIZL9FOKN',
234 ... 'NR3RQhkGdANBku8otfthb5ZUGsNMV45ct4V8PE+xChjFb9gcwpaf1hhoIF/sYHD5',
235 ... 'Bkf+v/J8F40KGYY16b0DjQIUlnra9y7q9jj0h2bvc9LAtgHtVUso133LLcVYl7RP',
236 ... 'Vjyz9Ps366BtIdPlAL4CoF5hEcMKS5J3h1vRlyAKN4uHENl5vKvoxn7ID3JhhWQc',
237 ... '6QrPGis64zi3OnYor34HPh/KNJvkgOQkekmtYuTxnkiONA4lhMDJgeaVZ9WZq+GV',
238 ... 'MaCvCFGNYU2TV4V8wMlnUbF8d5bDQ83g8MxIVKdDcnBzzYLZha+qmz4Spry9iB53',
239 ... 'Sg/sM5H8gWWSl7Oj1lxVg7o7IscpQfVt6zL6jD2VjL3L3Hu7WEXIrcGZtvrP4d+C',
240 ... 'TGYWiGlh5B2UCFk2bVctfw8W/QfaVvJYD4Rfqta2V2p14KIJLFRSGa1g26W4ixrH',
241 ... 'XKxgaA3AIfJ+6c5RoisRLuYCxvQi91wkE9hAXR+inXK4Hq4SmiHoeITZFhHP3hh3',
242 ... 'rbpp8mopiMNxWqCbuqgILP6pShn4oPclu9aR8uJ1ziDxISTGYC71mvLUERUjFn2L',
243 ... 'fu6C0+TCC9RmeyL+eNdM6cjs1G7YR6yX',
245 ... '-----END PGP MESSAGE-----',
247 ... ]).encode('us-ascii')
248 >>> output,verified,signatures = verify_bytes(b)
253 >>> for s in signatures:
255 ... # doctest: +REPORT_UDIFF
256 B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3 signature:
266 signature expired: False
270 timestamp: Wed Mar 21 19:13:57 2012
271 expiration timestamp: None
272 wrong key usage: False
273 pka trust: not available
276 validity reason: success
277 public key algorithm: RSA
278 hash algorithm: SHA256
279 >>> b = b'Success!\n'
280 >>> signature = '\n'.join([
281 ... '-----BEGIN PGP SIGNATURE-----',
282 ... 'Version: GnuPG v2.0.17 (GNU/Linux)',
284 ... 'iQEcBAEBAgAGBQJPaiw/AAoJEFEa7aZDMrbj93gH/1fQPXLjUTpONJUTmvGoMLNA',
285 ... 'W9ZhjpUL5i6rRqYGUvQ4kTEDuPMxkMrCyFCDHEhSDHufMek6Nso5/HeJn3aqxlgs',
286 ... 'hmNlvAq4FI6JQyFL7eCp/XG9cPx1p42dTI7JAih8FuK21sS4m/H5XP3R/6KXC99D',
287 ... '39rrXCvvR+yNgKe2dxuJwmKuLteVlcWxiIQwVrYK70GtJHC5BO79G8yGccWoEy9C',
288 ... '9JkJiyNptqZyFjGBNmMmrCSFZ7ZFA02RB+laRmwuIiozw4TJYEksxPrgZMbbcFzx',
289 ... 'zs3JHyV23+Fz1ftalvwskHE7tJkX9Ub8iBMNZ/KxJXXdPdpuMdEYVjoUehkQBQE=',
291 ... '-----END PGP SIGNATURE-----',
293 ... ]).encode('us-ascii')
294 >>> output,verified,signatures = verify_bytes(b, signature=signature)
299 >>> for s in signatures:
301 ... # doctest: +REPORT_UDIFF
302 B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3 signature:
312 signature expired: False
316 timestamp: Wed Mar 21 19:30:07 2012
317 expiration timestamp: None
318 wrong key usage: False
319 pka trust: not available
322 validity reason: success
323 public key algorithm: RSA
326 Data signed by a subkey returns the subkey fingerprint. To find
327 the primary key for a given subkey, use
328 ``pgp_mime.key.lookup_keys()``.
331 ... '-----BEGIN PGP MESSAGE-----',
332 ... 'Version: GnuPG v2.0.19 (GNU/Linux)',
334 ... 'hQEMAxcQCLovc94uAQf9ErTZnr0lYRlLLZIk1VcpNNTHrMro+BmqpFC0jprA4/2m',
335 ... '92klBF4TIS1A9bU5oxzQquaAIDV42P3sXrbxu/YhHLmPGH+dc2JVSfPLL0XOL5GC',
336 ... 'qpQYe5lglRBReFSRktrfhukjHBoXvh3c8T4xYK2r+nIV4gsp+FrSQMIOdhhBoC36',
337 ... 'U1MOk+R+I0JDbWdzZzJONs7ZcAcNDVKqxmAXZUqVgkhPpnGBSBuF9ExKRT3S6e5N',
338 ... 'Rsorb/DjGIUHSZuH2EaWAUz1jJ3nSta7TnveT/avfJiAV7cRS4oVgyyFyuHO5gkI',
339 ... 'o0obeJaut3enVgpq2TUUk0M4L8TX4jjKvDGAYNyuPNLAsQFHLj5eLmJSudGStWuA',
340 ... 'WjKLqBHD0M8/OcwnrTMleJl+h50ZsHO1tvvkXelH+w/jD5SMS+ktxq2Te8Vj7BmM',
341 ... '0WQn3Ys7ViA5PgcSpbqNNLdgc1EMcpPI/sfJAORPKVWRPBKDXX/irY2onAMSe5gH',
342 ... 'teNX6bZd/gaoLWqD/1ZhsOCnlV7LY1R929TJ9vxnJcfKKAKwBDfAaSbecUUMECVw',
343 ... 's4u3ZT1pmNslBmH6XSy3ifLYWu/2xsJuhPradT88BJOBARMGg81gOE6zxGRrMLJa',
344 ... 'KojFgqaF2y4nlZAyaJ1Ld4qCaoQogaL9qE1BbmgtBehZ2FNQiIBSLC0fUUl8A4Py',
345 ... '4d9ZxUoSp7nZmgTN5pUH1N9DIC4ntp/Rak2WnpS7+dRPlp9A2SF0RkeLY+JD9gNm',
346 ... 'j44zBkI79KlgaE/cMt6xUXAF/1ZR/Hv/6GUazGx0l23CnSGuqzLpex2uKOxfKiJt',
347 ... 'jfgyZRhIdFJnRuEXt8dTTDiiYA==',
349 ... '-----END PGP MESSAGE-----',
351 ... ]).encode('us-ascii')
352 >>> output,verified,signatures = verify_bytes(b)
357 >>> for s in signatures:
359 ... # doctest: +REPORT_UDIFF
360 DECC812C8795ADD60538B0CD171008BA2F73DE2E signature:
370 signature expired: False
374 timestamp: Thu Sep 20 15:29:28 2012
375 expiration timestamp: None
376 wrong key usage: False
377 pka trust: not available
380 validity reason: success
381 public key algorithm: RSA
382 hash algorithm: SHA256
384 input_read,input_write = _os.pipe()
386 message_read,message_write = _os.pipe()
387 output_read = output_write = -1
389 message_read = message_write = -1
390 output_read,output_write = _os.pipe()
391 client = get_client(**kwargs)
396 client.send_fds([input_read])
397 client.make_request(_common.Request('INPUT', 'FD'))
398 _os.close(input_read)
401 client.send_fds([message_read])
402 client.make_request(_common.Request('MESSAGE', 'FD'))
403 _os.close(message_read)
406 client.send_fds([output_write])
407 client.make_request(_common.Request('OUTPUT', 'FD'))
408 _os.close(output_write)
411 _write(input_write, signature)
412 _os.close(input_write)
414 _write(message_write, data)
415 _os.close(message_write)
418 _write(input_write, data)
419 _os.close(input_write)
421 client.make_request(_common.Request('VERIFY'))
425 plain = _read(output_read)
426 rs,result = client.make_request(_common.Request('RESULT'))
427 signatures = list(_signature.verify_result_signatures(result))
429 for signature in signatures:
430 if signature.status != 'success':
432 elif signature.pka_trust != 'good':
436 for fd in [input_read, input_write, message_read, message_write,
437 output_read, output_write]:
440 return (plain, verified, signatures)