1 # Copyright (C) 2012 W. Trevor King <wking@tremily.us>
3 # This file is part of pgp-mime.
5 # pgp-mime is free software: you can redistribute it and/or modify it under the
6 # terms of the GNU General Public License as published by the Free Software
7 # Foundation, either version 3 of the License, or (at your option) any later
10 # pgp-mime is distributed in the hope that it will be useful, but WITHOUT ANY
11 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
12 # A PARTICULAR PURPOSE. See the GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License along with
15 # pgp-mime. If not, see <http://www.gnu.org/licenses/>.
import codecs as _codecs
import configparser as _configparser
import logging as _logging
import os as _os
import os.path as _os_path

from pyassuan import client as _client
from pyassuan import common as _common

from . import LOG as _LOG
from . import signature as _signature
# Default gpgme-tool socket path (``~/.gnupg/S.gpgme-tool`` with ``~``
# expanded); used by ``get_client()`` when no explicit path is given.
SOCKET_PATH = _os_path.expanduser(
    _os_path.sep.join(['~', '.gnupg', 'S.gpgme-tool']))
def get_client_params(config):
    r"""Retrieve Assuan client parameters from a config file.

    Looks up the ``socket-path`` option in the ``[gpgme-tool]`` section
    of ``config`` (a ``configparser.ConfigParser``-style object).

    Returns a dict with a single ``'socket_path'`` key.  Its value is
    ``None`` when either the section or the option is missing.

    >>> from configparser import ConfigParser
    >>> config = ConfigParser()
    >>> config.read_string('\n'.join([
    ...             '[gpgme-tool]',
    ...             'socket-path: /tmp/S.gpgme-tool',
    ...             ]))
    >>> get_client_params(config)
    {'socket_path': '/tmp/S.gpgme-tool'}
    >>> get_client_params(ConfigParser())
    {'socket_path': None}
    """
    params = {'socket_path': None}
    try:
        params['socket_path'] = config.get('gpgme-tool', 'socket-path')
    except (_configparser.NoSectionError, _configparser.NoOptionError):
        # Nothing configured: keep the ``None`` default.
        pass
    return params
def get_client(socket_path=None):
    """Create an Assuan client and connect it to a socket.

    ``socket_path`` is the path of the socket to connect to; when it is
    ``None`` the module-level ``SOCKET_PATH`` is used instead.

    Returns the connected ``AssuanClient`` instance.
    """
    if socket_path is None:
        socket_path = SOCKET_PATH
    # Log under a 'pyassuan' child of the package logger's name.
    logger = _logging.getLogger('{}.{}'.format(_LOG.name, 'pyassuan'))
    client = _client.AssuanClient(
        name='pgp-mime', logger=logger, use_sublogger=False,
        close_on_disconnect=True)
    client.connect(socket_path=socket_path)
    # Callers bind the result (``client = get_client(**kwargs)``).
    return client
def disconnect(client):
    """End the Assuan session held by ``client``.

    Sends the ``BYE`` request and then disconnects the client so the
    connection does not stay open after the session is over.
    """
    client.make_request(_common.Request('BYE'))
    client.disconnect()
72 responses,data = client.get_responses() # get initial 'OK' from server
73 client.make_request(_common.Request('ARMOR', 'true'))
def _read(fd, buffersize=512):
    """Read from file descriptor ``fd`` until end-of-file.

    Data is pulled in chunks of at most ``buffersize`` bytes.  A read
    error is logged as a warning and ends the read early; whatever was
    received before the error is still returned.

    Returns everything that was read as a single ``bytes`` object.
    """
    chunks = []
    while True:
        try:
            new = _os.read(fd, buffersize)
        except Exception as e:
            # Best effort: report the problem and return what we have.
            # ``Logger.warn`` is deprecated (removed in Python 3.13).
            _LOG.warning('error while reading: {}'.format(e))
            break
        if not new:  # an empty read means EOF
            break
        chunks.append(new)
    return b''.join(chunks)
def _write(fd, data):
    """Write all of ``data`` to file descriptor ``fd``.

    ``os.write`` may accept only part of the buffer, so keep writing
    from where the previous call stopped until everything is sent.
    """
    # Slice through a memoryview so the unsent tail is not copied on
    # every partial write.
    view = memoryview(data)
    i = 0
    while i < len(view):
        i += _os.write(fd, view[i:])
94 def sign_and_encrypt_bytes(data, signers=None, recipients=None,
95 always_trust=False, mode='detach',
96 allow_default_signer=False, **kwargs):
97 r"""Sign ``data`` with ``signers`` and encrypt to ``recipients``.
99 Just sign (with a detached signature):
101 >>> print(sign_and_encrypt_bytes(
102 ... bytes(b'Hello'), signers=['pgp-mime@invalid.com']))
103 ... # doctest: +ELLIPSIS
104 b'-----BEGIN PGP SIGNATURE-----\n...-----END PGP SIGNATURE-----\n'
108 >>> sign_and_encrypt_bytes(
109 ... bytes(b'Hello'), recipients=['pgp-mime@invalid.com'],
110 ... always_trust=True)
111 ... # doctest: +ELLIPSIS
112 b'-----BEGIN PGP MESSAGE-----\n...-----END PGP MESSAGE-----\n'
116 >>> sign_and_encrypt_bytes(
117 ... bytes(b'Hello'), signers=['pgp-mime@invalid.com'],
118 ... recipients=['pgp-mime@invalid.com'], always_trust=True)
119 ... # doctest: +ELLIPSIS
120 b'-----BEGIN PGP MESSAGE-----\n...-----END PGP MESSAGE-----\n'
122 Sign and encrypt with a specific subkey:
124 >>> sign_and_encrypt_bytes(
125 ... bytes(b'Hello'), signers=['0x2F73DE2E'],
126 ... recipients=['pgp-mime@invalid.com'], always_trust=True)
127 ... # doctest: +ELLIPSIS
128 b'-----BEGIN PGP MESSAGE-----\n...-----END PGP MESSAGE-----\n'
130 input_read,input_write = _os.pipe()
131 output_read,output_write = _os.pipe()
132 client = get_client(**kwargs)
136 for signer in signers:
137 client.make_request(_common.Request('SIGNER', signer))
139 for recipient in recipients:
140 client.make_request(_common.Request('RECIPIENT', recipient))
141 client.send_fds([input_read])
142 client.make_request(_common.Request('INPUT', 'FD'))
143 _os.close(input_read)
145 client.send_fds([output_write])
146 client.make_request(_common.Request('OUTPUT', 'FD'))
147 _os.close(output_write)
150 if signers or allow_default_signer:
152 command = 'SIGN_ENCRYPT'
155 parameters.append('--{}'.format(mode))
159 raise ValueError('must specify at least one signer or recipient')
161 parameters.append('--always-trust')
162 _write(input_write, data)
163 _os.close(input_write)
166 _common.Request(command, ' '.join(parameters)))
167 d = _read(output_read)
170 for fd in [input_read, input_write, output_read, output_write]:
175 def decrypt_bytes(data, **kwargs):
176 r"""Decrypt ``data``.
179 ... '-----BEGIN PGP MESSAGE-----',
180 ... 'Version: GnuPG v2.0.17 (GNU/Linux)',
182 ... 'hQEMA1Ea7aZDMrbjAQf/TAqLjksZSJxSqkBxYT5gtLQoXY6isvRZg2apjs7CW0y2',
183 ... 'tFK/ptnVYAq2OtWQFhbiJXj8hmwJyyFfb3lghpeu4ihO52JgkkwOpmJb6dxjOi83',
184 ... 'qDwaGOogEPH38BNLuwdrMCW0jmNROwvS796PtqSGUaJTuIiKUB8lETwPwIHrDc11',
185 ... 'N3RWStE5uShNkXXQXplUoeCKf3N4XguXym+GQCqJQzlEMrkkDdr4l7mzvt3Nf8EA',
186 ... 'SgSak086tUoo9x8IN5PJCuOJkcXcjQzFcpqOsA7dyZKO8NeQUZv2JvlZuorckNvN',
187 ... 'xx3PwW0a8VeJgTQrh64ZK/d3F3gNHUTzXkq/UIn25tJFAcmSUwxtsBal7p8zAeCV',
188 ... '8zefsHRQ5Y03IBeYBcVJBhDS9XfvwLQTJiGGstPCxzKTwSUT1MzV5t5twG/STDCc',
191 ... '-----END PGP MESSAGE-----',
193 ... ]).encode('us-ascii')
197 input_read,input_write = _os.pipe()
198 output_read,output_write = _os.pipe()
199 client = get_client(**kwargs)
202 client.send_fds([input_read])
203 client.make_request(_common.Request('INPUT', 'FD'))
204 _os.close(input_read)
206 client.send_fds([output_write])
207 client.make_request(_common.Request('OUTPUT', 'FD'))
208 _os.close(output_write)
210 _write(input_write, data)
211 _os.close(input_write)
213 client.make_request(_common.Request('DECRYPT'))
214 d = _read(output_read)
217 for fd in [input_read, input_write, output_read, output_write]:
222 def verify_bytes(data, signature=None, always_trust=False, **kwargs):
223 r"""Verify a signature on ``data``, possibly decrypting first.
225 These tests assume you didn't trust the distributed test key.
228 ... '-----BEGIN PGP MESSAGE-----',
229 ... 'Version: GnuPG v2.0.17 (GNU/Linux)',
231 ... 'hQEMA1Ea7aZDMrbjAQf/YM1SeFzNGz0DnUynaEyhfGCvcqmjtbN1PtZMpT7VaQLN',
232 ... 'a+c0faskr79Atz0+2IBR7CDOlcETrRtH2EnrWukbRIDtmffNFGuhMRTNfnQ15OIN',
233 ... 'qrmt2P5gXznsgnm2XjzTK7S/Cc3Aq+zjaDrDt7bIedEdz+EyNgaKuL/lB9cAB8xL',
234 ... 'YYp/yn55Myjair2idgzsa7w/QXdE3RhpyRLqR2Jgz4P1I1xOgUYnylbpIZL9FOKN',
235 ... 'NR3RQhkGdANBku8otfthb5ZUGsNMV45ct4V8PE+xChjFb9gcwpaf1hhoIF/sYHD5',
236 ... 'Bkf+v/J8F40KGYY16b0DjQIUlnra9y7q9jj0h2bvc9LAtgHtVUso133LLcVYl7RP',
237 ... 'Vjyz9Ps366BtIdPlAL4CoF5hEcMKS5J3h1vRlyAKN4uHENl5vKvoxn7ID3JhhWQc',
238 ... '6QrPGis64zi3OnYor34HPh/KNJvkgOQkekmtYuTxnkiONA4lhMDJgeaVZ9WZq+GV',
239 ... 'MaCvCFGNYU2TV4V8wMlnUbF8d5bDQ83g8MxIVKdDcnBzzYLZha+qmz4Spry9iB53',
240 ... 'Sg/sM5H8gWWSl7Oj1lxVg7o7IscpQfVt6zL6jD2VjL3L3Hu7WEXIrcGZtvrP4d+C',
241 ... 'TGYWiGlh5B2UCFk2bVctfw8W/QfaVvJYD4Rfqta2V2p14KIJLFRSGa1g26W4ixrH',
242 ... 'XKxgaA3AIfJ+6c5RoisRLuYCxvQi91wkE9hAXR+inXK4Hq4SmiHoeITZFhHP3hh3',
243 ... 'rbpp8mopiMNxWqCbuqgILP6pShn4oPclu9aR8uJ1ziDxISTGYC71mvLUERUjFn2L',
244 ... 'fu6C0+TCC9RmeyL+eNdM6cjs1G7YR6yX',
246 ... '-----END PGP MESSAGE-----',
248 ... ]).encode('us-ascii')
249 >>> output,verified,signatures = verify_bytes(b)
254 >>> for s in signatures:
256 ... # doctest: +REPORT_UDIFF
257 B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3 signature:
267 signature expired: False
271 timestamp: Wed Mar 21 19:13:57 2012
272 expiration timestamp: None
273 wrong key usage: False
274 pka trust: not available
277 validity reason: success
278 public key algorithm: RSA
279 hash algorithm: SHA256
280 >>> b = b'Success!\n'
281 >>> signature = '\n'.join([
282 ... '-----BEGIN PGP SIGNATURE-----',
283 ... 'Version: GnuPG v2.0.17 (GNU/Linux)',
285 ... 'iQEcBAEBAgAGBQJPaiw/AAoJEFEa7aZDMrbj93gH/1fQPXLjUTpONJUTmvGoMLNA',
286 ... 'W9ZhjpUL5i6rRqYGUvQ4kTEDuPMxkMrCyFCDHEhSDHufMek6Nso5/HeJn3aqxlgs',
287 ... 'hmNlvAq4FI6JQyFL7eCp/XG9cPx1p42dTI7JAih8FuK21sS4m/H5XP3R/6KXC99D',
288 ... '39rrXCvvR+yNgKe2dxuJwmKuLteVlcWxiIQwVrYK70GtJHC5BO79G8yGccWoEy9C',
289 ... '9JkJiyNptqZyFjGBNmMmrCSFZ7ZFA02RB+laRmwuIiozw4TJYEksxPrgZMbbcFzx',
290 ... 'zs3JHyV23+Fz1ftalvwskHE7tJkX9Ub8iBMNZ/KxJXXdPdpuMdEYVjoUehkQBQE=',
292 ... '-----END PGP SIGNATURE-----',
294 ... ]).encode('us-ascii')
295 >>> output,verified,signatures = verify_bytes(b, signature=signature)
300 >>> for s in signatures:
302 ... # doctest: +REPORT_UDIFF
303 B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3 signature:
313 signature expired: False
317 timestamp: Wed Mar 21 19:30:07 2012
318 expiration timestamp: None
319 wrong key usage: False
320 pka trust: not available
323 validity reason: success
324 public key algorithm: RSA
327 Data signed by a subkey returns the subkey fingerprint. To find
328 the primary key for a given subkey, use
329 ``pgp_mime.key.lookup_keys()``.
332 ... '-----BEGIN PGP MESSAGE-----',
333 ... 'Version: GnuPG v2.0.19 (GNU/Linux)',
335 ... 'hQEMAxcQCLovc94uAQf9ErTZnr0lYRlLLZIk1VcpNNTHrMro+BmqpFC0jprA4/2m',
336 ... '92klBF4TIS1A9bU5oxzQquaAIDV42P3sXrbxu/YhHLmPGH+dc2JVSfPLL0XOL5GC',
337 ... 'qpQYe5lglRBReFSRktrfhukjHBoXvh3c8T4xYK2r+nIV4gsp+FrSQMIOdhhBoC36',
338 ... 'U1MOk+R+I0JDbWdzZzJONs7ZcAcNDVKqxmAXZUqVgkhPpnGBSBuF9ExKRT3S6e5N',
339 ... 'Rsorb/DjGIUHSZuH2EaWAUz1jJ3nSta7TnveT/avfJiAV7cRS4oVgyyFyuHO5gkI',
340 ... 'o0obeJaut3enVgpq2TUUk0M4L8TX4jjKvDGAYNyuPNLAsQFHLj5eLmJSudGStWuA',
341 ... 'WjKLqBHD0M8/OcwnrTMleJl+h50ZsHO1tvvkXelH+w/jD5SMS+ktxq2Te8Vj7BmM',
342 ... '0WQn3Ys7ViA5PgcSpbqNNLdgc1EMcpPI/sfJAORPKVWRPBKDXX/irY2onAMSe5gH',
343 ... 'teNX6bZd/gaoLWqD/1ZhsOCnlV7LY1R929TJ9vxnJcfKKAKwBDfAaSbecUUMECVw',
344 ... 's4u3ZT1pmNslBmH6XSy3ifLYWu/2xsJuhPradT88BJOBARMGg81gOE6zxGRrMLJa',
345 ... 'KojFgqaF2y4nlZAyaJ1Ld4qCaoQogaL9qE1BbmgtBehZ2FNQiIBSLC0fUUl8A4Py',
346 ... '4d9ZxUoSp7nZmgTN5pUH1N9DIC4ntp/Rak2WnpS7+dRPlp9A2SF0RkeLY+JD9gNm',
347 ... 'j44zBkI79KlgaE/cMt6xUXAF/1ZR/Hv/6GUazGx0l23CnSGuqzLpex2uKOxfKiJt',
348 ... 'jfgyZRhIdFJnRuEXt8dTTDiiYA==',
350 ... '-----END PGP MESSAGE-----',
352 ... ]).encode('us-ascii')
353 >>> output,verified,signatures = verify_bytes(b)
358 >>> for s in signatures:
360 ... # doctest: +REPORT_UDIFF
361 DECC812C8795ADD60538B0CD171008BA2F73DE2E signature:
371 signature expired: False
375 timestamp: Thu Sep 20 15:29:28 2012
376 expiration timestamp: None
377 wrong key usage: False
378 pka trust: not available
381 validity reason: success
382 public key algorithm: RSA
383 hash algorithm: SHA256
385 input_read,input_write = _os.pipe()
387 message_read,message_write = _os.pipe()
388 output_read = output_write = -1
390 message_read = message_write = -1
391 output_read,output_write = _os.pipe()
392 client = get_client(**kwargs)
397 client.send_fds([input_read])
398 client.make_request(_common.Request('INPUT', 'FD'))
399 _os.close(input_read)
402 client.send_fds([message_read])
403 client.make_request(_common.Request('MESSAGE', 'FD'))
404 _os.close(message_read)
407 client.send_fds([output_write])
408 client.make_request(_common.Request('OUTPUT', 'FD'))
409 _os.close(output_write)
412 _write(input_write, signature)
413 _os.close(input_write)
415 _write(message_write, data)
416 _os.close(message_write)
419 _write(input_write, data)
420 _os.close(input_write)
422 client.make_request(_common.Request('VERIFY'))
426 plain = _read(output_read)
427 rs,result = client.make_request(_common.Request('RESULT'))
428 signatures = list(_signature.verify_result_signatures(result))
430 for signature in signatures:
431 if signature.status != 'success':
433 elif signature.pka_trust != 'good':
437 for fd in [input_read, input_write, message_read, message_write,
438 output_read, output_write]:
441 return (plain, verified, signatures)