Add gpgme-tool.socket-path configuration to smtplib.conf.
[pgp-mime.git] / pgp_mime / crypt.py
1 # Copyright (C) 2012 W. Trevor King <wking@tremily.us>
2 #
3 # This file is part of pgp-mime.
4 #
5 # pgp-mime is free software: you can redistribute it and/or modify it under the
6 # terms of the GNU General Public License as published by the Free Software
7 # Foundation, either version 3 of the License, or (at your option) any later
8 # version.
9 #
10 # pgp-mime is distributed in the hope that it will be useful, but WITHOUT ANY
11 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
12 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License along with
15 # pgp-mime.  If not, see <http://www.gnu.org/licenses/>.
16
17 import codecs as _codecs
18 import logging as _logging
19 import os as _os
20 import os.path as _os_path
21
22 from pyassuan import client as _client
23 from pyassuan import common as _common
24
25 from . import LOG as _LOG
26 from . import signature as _signature
27
28
# Default gpgme-tool socket: ``S.gpgme-tool`` inside the user's ``.gnupg``
# directory.
SOCKET_PATH = _os_path.join(
    _os_path.expanduser('~'), '.gnupg', 'S.gpgme-tool')
30
31
def get_client_params(config):
    r"""Retrieve Assuan client parameters from a config file.

    Reads the ``socket-path`` option from the ``[gpgme-tool]`` section
    of ``config`` and returns it as ``{'socket_path': value}``.  If the
    section or the option is missing, ``socket_path`` stays ``None``.

    >>> from configparser import ConfigParser
    >>> config = ConfigParser()
    >>> config.read_string('\n'.join([
    ...             '[gpgme-tool]',
    ...             'socket-path: /tmp/S.gpgme-tool',
    ...             ]))
    >>> get_client_params(config)
    {'socket_path': '/tmp/S.gpgme-tool'}
    >>> config = ConfigParser()
    >>> get_client_params(config)
    {'socket_path': None}
    """
    # Imported here because the module does not import configparser at
    # the top level; without it the except clauses raise NameError.
    import configparser as _configparser

    params = {'socket_path': None}
    try:
        params['socket_path'] = config.get('gpgme-tool', 'socket-path')
    except (_configparser.NoSectionError, _configparser.NoOptionError):
        pass  # not configured: keep the None default
    return params
55
def get_client(socket_path=None):
    """Return an Assuan client connected to ``socket_path``.

    When ``socket_path`` is ``None`` the module-level ``SOCKET_PATH``
    default is used.  The client logs through a ``pyassuan`` child of
    the package logger.
    """
    if socket_path is None:
        # Fall back to the module default (the original assigned the
        # argument to itself, so the default was never applied).
        socket_path = SOCKET_PATH
    logger = _logging.getLogger('{}.{}'.format(_LOG.name, 'pyassuan'))
    client = _client.AssuanClient(
        name='pgp-mime', logger=logger, use_sublogger=False,
        close_on_disconnect=True)
    client.connect(socket_path=socket_path)
    return client
65
def disconnect(client):
    """Send a ``BYE`` request through ``client``, then disconnect it."""
    farewell = _common.Request('BYE')
    client.make_request(farewell)
    client.disconnect()
69
def hello(client):
    """Read the server's greeting, then send an ``ARMOR true`` request."""
    # The server opens with an initial 'OK'; consume it before issuing
    # any request of our own.
    _responses, _payload = client.get_responses()
    armor_request = _common.Request('ARMOR', 'true')
    client.make_request(armor_request)
73
def _read(fd, buffersize=512):
    """Read from file descriptor ``fd`` until end-of-file.

    Data is pulled with ``os.read`` in chunks of at most ``buffersize``
    bytes and returned as a single ``bytes`` object.  Reading is
    best-effort: if a read raises, a warning is logged and whatever was
    collected so far is returned.
    """
    chunks = []
    while True:
        try:
            chunk = _os.read(fd, buffersize)
        except Exception as e:
            # ``Logger.warn`` is a deprecated alias of ``warning``.
            _LOG.warning('error while reading: {}'.format(e))
            break
        if not chunk:  # an empty read signals end-of-file
            break
        chunks.append(chunk)
    return b''.join(chunks)
86
def _write(fd, data):
    """Write all of ``data`` to file descriptor ``fd``.

    ``os.write`` may accept only part of the buffer, so keep writing
    the unsent tail until everything has been handed over.
    """
    sent = 0
    total = len(data)
    while sent < total:
        sent += _os.write(fd, data[sent:])
91
92
def sign_and_encrypt_bytes(data, signers=None, recipients=None,
                           always_trust=False, mode='detach',
                           allow_default_signer=False, **kwargs):
    r"""Sign ``data`` with ``signers`` and encrypt to ``recipients``.

    The command sent to the server depends on the arguments:

    * ``SIGN_ENCRYPT`` when signing (``signers`` given, or
      ``allow_default_signer`` set) and ``recipients`` is non-empty,
    * ``SIGN`` when signing without recipients; ``mode`` is passed as
      a ``--<mode>`` parameter in this case only,
    * ``ENCRYPT`` when only ``recipients`` is given.

    ``always_trust`` adds ``--always-trust`` to the command.  Extra
    keyword arguments are forwarded to ``get_client()``.  Returns the
    bytes read back from the server's output.

    Raises ``ValueError`` if there is neither a signer (explicit or
    default) nor a recipient; this is checked before any pipe or
    connection is opened.

    Just sign (with a detached signature):

    >>> print(sign_and_encrypt_bytes(
    ...     bytes(b'Hello'), signers=['pgp-mime@invalid.com']))
    ... # doctest: +ELLIPSIS
    b'-----BEGIN PGP SIGNATURE-----\n...-----END PGP SIGNATURE-----\n'

    Just encrypt:

    >>> sign_and_encrypt_bytes(
    ...     bytes(b'Hello'), recipients=['pgp-mime@invalid.com'],
    ...     always_trust=True)
    ... # doctest: +ELLIPSIS
    b'-----BEGIN PGP MESSAGE-----\n...-----END PGP MESSAGE-----\n'

    Sign and encrypt:

    >>> sign_and_encrypt_bytes(
    ...     bytes(b'Hello'), signers=['pgp-mime@invalid.com'],
    ...     recipients=['pgp-mime@invalid.com'], always_trust=True)
    ... # doctest: +ELLIPSIS
    b'-----BEGIN PGP MESSAGE-----\n...-----END PGP MESSAGE-----\n'

    Sign and encrypt with a specific subkey:

    >>> sign_and_encrypt_bytes(
    ...     bytes(b'Hello'), signers=['0x2F73DE2E'],
    ...     recipients=['pgp-mime@invalid.com'], always_trust=True)
    ... # doctest: +ELLIPSIS
    b'-----BEGIN PGP MESSAGE-----\n...-----END PGP MESSAGE-----\n'
    """
    # Choose the command first so bad arguments fail before any pipe or
    # connection has been created.
    parameters = []
    if signers or allow_default_signer:
        if recipients:
            command = 'SIGN_ENCRYPT'
        else:
            command = 'SIGN'
            parameters.append('--{}'.format(mode))
    elif recipients:
        command = 'ENCRYPT'
    else:
        raise ValueError('must specify at least one signer or recipient')
    if always_trust:
        parameters.append('--always-trust')

    # -1 marks "not open"; everything is acquired inside the try so the
    # finally clause can release whatever was actually obtained.
    input_read = input_write = output_read = output_write = -1
    client = None
    try:
        input_read, input_write = _os.pipe()
        output_read, output_write = _os.pipe()
        client = get_client(**kwargs)
        hello(client)
        for signer in signers or ():
            client.make_request(_common.Request('SIGNER', signer))
        for recipient in recipients or ():
            client.make_request(_common.Request('RECIPIENT', recipient))
        # Hand the server its ends of the pipes, closing our copies.
        client.send_fds([input_read])
        client.make_request(_common.Request('INPUT', 'FD'))
        _os.close(input_read)
        input_read = -1
        client.send_fds([output_write])
        client.make_request(_common.Request('OUTPUT', 'FD'))
        _os.close(output_write)
        output_write = -1
        # NOTE(review): the whole payload is written before the command
        # is issued; presumably a payload larger than the pipe buffer
        # could block here -- confirm against the server's behaviour.
        _write(input_write, data)
        _os.close(input_write)
        input_write = -1
        client.make_request(
            _common.Request(command, ' '.join(parameters)))
        d = _read(output_read)
    finally:
        try:
            if client is not None:
                disconnect(client)
        finally:
            # Close the pipe ends even if disconnecting failed.
            for fd in (input_read, input_write, output_read, output_write):
                if fd >= 0:
                    _os.close(fd)
    return d
173
def decrypt_bytes(data, **kwargs):
    r"""Decrypt ``data``.

    ``data`` is written to the server through a pipe, a ``DECRYPT``
    request is issued, and the bytes read back from the output pipe
    are returned.  Extra keyword arguments are forwarded to
    ``get_client()``.

    >>> b = '\n'.join([
    ...     '-----BEGIN PGP MESSAGE-----',
    ...     'Version: GnuPG v2.0.17 (GNU/Linux)',
    ...     '',
    ...     'hQEMA1Ea7aZDMrbjAQf/TAqLjksZSJxSqkBxYT5gtLQoXY6isvRZg2apjs7CW0y2',
    ...     'tFK/ptnVYAq2OtWQFhbiJXj8hmwJyyFfb3lghpeu4ihO52JgkkwOpmJb6dxjOi83',
    ...     'qDwaGOogEPH38BNLuwdrMCW0jmNROwvS796PtqSGUaJTuIiKUB8lETwPwIHrDc11',
    ...     'N3RWStE5uShNkXXQXplUoeCKf3N4XguXym+GQCqJQzlEMrkkDdr4l7mzvt3Nf8EA',
    ...     'SgSak086tUoo9x8IN5PJCuOJkcXcjQzFcpqOsA7dyZKO8NeQUZv2JvlZuorckNvN',
    ...     'xx3PwW0a8VeJgTQrh64ZK/d3F3gNHUTzXkq/UIn25tJFAcmSUwxtsBal7p8zAeCV',
    ...     '8zefsHRQ5Y03IBeYBcVJBhDS9XfvwLQTJiGGstPCxzKTwSUT1MzV5t5twG/STDCc',
    ...     'uxW3wSdo',
    ...     '=bZI+',
    ...     '-----END PGP MESSAGE-----',
    ...     ''
    ...     ]).encode('us-ascii')
    >>> decrypt_bytes(b)
    b'Success!\n'
    """
    # -1 marks "not open"; everything is acquired inside the try so the
    # finally clause can release whatever was actually obtained.
    input_read = input_write = output_read = output_write = -1
    client = None
    try:
        input_read, input_write = _os.pipe()
        output_read, output_write = _os.pipe()
        client = get_client(**kwargs)
        hello(client)
        # Hand the server its ends of the pipes, closing our copies.
        client.send_fds([input_read])
        client.make_request(_common.Request('INPUT', 'FD'))
        _os.close(input_read)
        input_read = -1
        client.send_fds([output_write])
        client.make_request(_common.Request('OUTPUT', 'FD'))
        _os.close(output_write)
        output_write = -1
        _write(input_write, data)
        _os.close(input_write)
        input_write = -1
        client.make_request(_common.Request('DECRYPT'))
        d = _read(output_read)
    finally:
        try:
            if client is not None:
                disconnect(client)
        finally:
            # Close the pipe ends even if disconnecting failed.
            for fd in (input_read, input_write, output_read, output_write):
                if fd >= 0:
                    _os.close(fd)
    return d
220
def verify_bytes(data, signature=None, always_trust=False, **kwargs):
    r"""Verify a signature on ``data``, possibly decrypting first.

    With a detached ``signature``, the signature is sent as the input
    and ``data`` as the signed message; the returned plaintext is
    ``data`` itself.  Without one, ``data`` is sent as the input and
    the plaintext is read back from the server's output.

    Returns a ``(plain, verified, signatures)`` tuple.  ``verified``
    is ``True`` only if every reported signature has a ``status`` of
    ``'success'`` and a ``pka_trust`` of ``'good'``.  ``always_trust``
    is accepted but not used by this function.  Extra keyword
    arguments are forwarded to ``get_client()``.

    These tests assume you didn't trust the distributed test key.

    >>> b = '\n'.join([
    ...     '-----BEGIN PGP MESSAGE-----',
    ...     'Version: GnuPG v2.0.17 (GNU/Linux)',
    ...     '',
    ...     'hQEMA1Ea7aZDMrbjAQf/YM1SeFzNGz0DnUynaEyhfGCvcqmjtbN1PtZMpT7VaQLN',
    ...     'a+c0faskr79Atz0+2IBR7CDOlcETrRtH2EnrWukbRIDtmffNFGuhMRTNfnQ15OIN',
    ...     'qrmt2P5gXznsgnm2XjzTK7S/Cc3Aq+zjaDrDt7bIedEdz+EyNgaKuL/lB9cAB8xL',
    ...     'YYp/yn55Myjair2idgzsa7w/QXdE3RhpyRLqR2Jgz4P1I1xOgUYnylbpIZL9FOKN',
    ...     'NR3RQhkGdANBku8otfthb5ZUGsNMV45ct4V8PE+xChjFb9gcwpaf1hhoIF/sYHD5',
    ...     'Bkf+v/J8F40KGYY16b0DjQIUlnra9y7q9jj0h2bvc9LAtgHtVUso133LLcVYl7RP',
    ...     'Vjyz9Ps366BtIdPlAL4CoF5hEcMKS5J3h1vRlyAKN4uHENl5vKvoxn7ID3JhhWQc',
    ...     '6QrPGis64zi3OnYor34HPh/KNJvkgOQkekmtYuTxnkiONA4lhMDJgeaVZ9WZq+GV',
    ...     'MaCvCFGNYU2TV4V8wMlnUbF8d5bDQ83g8MxIVKdDcnBzzYLZha+qmz4Spry9iB53',
    ...     'Sg/sM5H8gWWSl7Oj1lxVg7o7IscpQfVt6zL6jD2VjL3L3Hu7WEXIrcGZtvrP4d+C',
    ...     'TGYWiGlh5B2UCFk2bVctfw8W/QfaVvJYD4Rfqta2V2p14KIJLFRSGa1g26W4ixrH',
    ...     'XKxgaA3AIfJ+6c5RoisRLuYCxvQi91wkE9hAXR+inXK4Hq4SmiHoeITZFhHP3hh3',
    ...     'rbpp8mopiMNxWqCbuqgILP6pShn4oPclu9aR8uJ1ziDxISTGYC71mvLUERUjFn2L',
    ...     'fu6C0+TCC9RmeyL+eNdM6cjs1G7YR6yX',
    ...     '=phHd',
    ...     '-----END PGP MESSAGE-----',
    ...     '',
    ...     ]).encode('us-ascii')
    >>> output,verified,signatures = verify_bytes(b)
    >>> output
    b'Success!\n'
    >>> verified
    False
    >>> for s in signatures:
    ...     print(s.dumps())
    ... # doctest: +REPORT_UDIFF
    B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3 signature:
      summary:
        CRL missing: False
        CRL too old: False
        bad policy: False
        green: False
        key expired: False
        key missing: False
        key revoked: False
        red: False
        signature expired: False
        system error: False
        valid: False
      status: success
      timestamp: Wed Mar 21 19:13:57 2012
      expiration timestamp: None
      wrong key usage: False
      pka trust: not available
      chain model: False
      validity: unknown
      validity reason: success
      public key algorithm: RSA
      hash algorithm: SHA256
    >>> b = b'Success!\n'
    >>> signature = '\n'.join([
    ...     '-----BEGIN PGP SIGNATURE-----',
    ...     'Version: GnuPG v2.0.17 (GNU/Linux)',
    ...     '',
    ...     'iQEcBAEBAgAGBQJPaiw/AAoJEFEa7aZDMrbj93gH/1fQPXLjUTpONJUTmvGoMLNA',
    ...     'W9ZhjpUL5i6rRqYGUvQ4kTEDuPMxkMrCyFCDHEhSDHufMek6Nso5/HeJn3aqxlgs',
    ...     'hmNlvAq4FI6JQyFL7eCp/XG9cPx1p42dTI7JAih8FuK21sS4m/H5XP3R/6KXC99D',
    ...     '39rrXCvvR+yNgKe2dxuJwmKuLteVlcWxiIQwVrYK70GtJHC5BO79G8yGccWoEy9C',
    ...     '9JkJiyNptqZyFjGBNmMmrCSFZ7ZFA02RB+laRmwuIiozw4TJYEksxPrgZMbbcFzx',
    ...     'zs3JHyV23+Fz1ftalvwskHE7tJkX9Ub8iBMNZ/KxJXXdPdpuMdEYVjoUehkQBQE=',
    ...     '=rRBP',
    ...     '-----END PGP SIGNATURE-----',
    ...     '',
    ...     ]).encode('us-ascii')
    >>> output,verified,signatures = verify_bytes(b, signature=signature)
    >>> output
    b'Success!\n'
    >>> verified
    False
    >>> for s in signatures:
    ...     print(s.dumps())
    ... # doctest: +REPORT_UDIFF
    B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3 signature:
      summary:
        CRL missing: False
        CRL too old: False
        bad policy: False
        green: False
        key expired: False
        key missing: False
        key revoked: False
        red: False
        signature expired: False
        system error: False
        valid: False
      status: success
      timestamp: Wed Mar 21 19:30:07 2012
      expiration timestamp: None
      wrong key usage: False
      pka trust: not available
      chain model: False
      validity: unknown
      validity reason: success
      public key algorithm: RSA
      hash algorithm: SHA1

    Data signed by a subkey returns the subkey fingerprint.  To find
    the primary key for a given subkey, use
    ``pgp_mime.key.lookup_keys()``.

    >>> b = '\n'.join([
    ...     '-----BEGIN PGP MESSAGE-----',
    ...     'Version: GnuPG v2.0.19 (GNU/Linux)',
    ...     '',
    ...     'hQEMAxcQCLovc94uAQf9ErTZnr0lYRlLLZIk1VcpNNTHrMro+BmqpFC0jprA4/2m',
    ...     '92klBF4TIS1A9bU5oxzQquaAIDV42P3sXrbxu/YhHLmPGH+dc2JVSfPLL0XOL5GC',
    ...     'qpQYe5lglRBReFSRktrfhukjHBoXvh3c8T4xYK2r+nIV4gsp+FrSQMIOdhhBoC36',
    ...     'U1MOk+R+I0JDbWdzZzJONs7ZcAcNDVKqxmAXZUqVgkhPpnGBSBuF9ExKRT3S6e5N',
    ...     'Rsorb/DjGIUHSZuH2EaWAUz1jJ3nSta7TnveT/avfJiAV7cRS4oVgyyFyuHO5gkI',
    ...     'o0obeJaut3enVgpq2TUUk0M4L8TX4jjKvDGAYNyuPNLAsQFHLj5eLmJSudGStWuA',
    ...     'WjKLqBHD0M8/OcwnrTMleJl+h50ZsHO1tvvkXelH+w/jD5SMS+ktxq2Te8Vj7BmM',
    ...     '0WQn3Ys7ViA5PgcSpbqNNLdgc1EMcpPI/sfJAORPKVWRPBKDXX/irY2onAMSe5gH',
    ...     'teNX6bZd/gaoLWqD/1ZhsOCnlV7LY1R929TJ9vxnJcfKKAKwBDfAaSbecUUMECVw',
    ...     's4u3ZT1pmNslBmH6XSy3ifLYWu/2xsJuhPradT88BJOBARMGg81gOE6zxGRrMLJa',
    ...     'KojFgqaF2y4nlZAyaJ1Ld4qCaoQogaL9qE1BbmgtBehZ2FNQiIBSLC0fUUl8A4Py',
    ...     '4d9ZxUoSp7nZmgTN5pUH1N9DIC4ntp/Rak2WnpS7+dRPlp9A2SF0RkeLY+JD9gNm',
    ...     'j44zBkI79KlgaE/cMt6xUXAF/1ZR/Hv/6GUazGx0l23CnSGuqzLpex2uKOxfKiJt',
    ...     'jfgyZRhIdFJnRuEXt8dTTDiiYA==',
    ...     '=0o+x',
    ...     '-----END PGP MESSAGE-----',
    ...     '',
    ...     ]).encode('us-ascii')
    >>> output,verified,signatures = verify_bytes(b)
    >>> output
    b'Hello'
    >>> verified
    False
    >>> for s in signatures:
    ...     print(s.dumps())
    ... # doctest: +REPORT_UDIFF
    DECC812C8795ADD60538B0CD171008BA2F73DE2E signature:
      summary:
        CRL missing: False
        CRL too old: False
        bad policy: False
        green: False
        key expired: False
        key missing: False
        key revoked: False
        red: False
        signature expired: False
        system error: False
        valid: False
      status: success
      timestamp: Thu Sep 20 15:29:28 2012
      expiration timestamp: None
      wrong key usage: False
      pka trust: not available
      chain model: False
      validity: unknown
      validity reason: success
      public key algorithm: RSA
      hash algorithm: SHA256
    """
    # -1 marks "not open"; everything is acquired inside the try so the
    # finally clause can release whatever was actually obtained.
    input_read = input_write = -1
    message_read = message_write = -1
    output_read = output_write = -1
    client = None
    try:
        input_read, input_write = _os.pipe()
        if signature:
            # Detached signature: the signed data travels on its own pipe
            # and no output pipe is needed.
            message_read, message_write = _os.pipe()
        else:
            output_read, output_write = _os.pipe()
        client = get_client(**kwargs)
        hello(client)
        client.send_fds([input_read])
        client.make_request(_common.Request('INPUT', 'FD'))
        _os.close(input_read)
        input_read = -1
        if signature:
            client.send_fds([message_read])
            client.make_request(_common.Request('MESSAGE', 'FD'))
            _os.close(message_read)
            message_read = -1
            _write(input_write, signature)
            _os.close(input_write)
            input_write = -1
            _write(message_write, data)
            _os.close(message_write)
            message_write = -1
        else:
            client.send_fds([output_write])
            client.make_request(_common.Request('OUTPUT', 'FD'))
            _os.close(output_write)
            output_write = -1
            _write(input_write, data)
            _os.close(input_write)
            input_write = -1
        client.make_request(_common.Request('VERIFY'))
        if signature:
            plain = data
        else:
            plain = _read(output_read)
        _, result = client.make_request(_common.Request('RESULT'))
        signatures = list(_signature.verify_result_signatures(result))
        # NOTE(review): an empty signature list leaves ``verified`` True,
        # as in the original loop -- confirm that this is intended.
        verified = all(
            sig.status == 'success' and sig.pka_trust == 'good'
            for sig in signatures)
    finally:
        try:
            if client is not None:
                disconnect(client)
        finally:
            # Close the pipe ends even if disconnecting failed.
            for fd in (input_read, input_write, message_read, message_write,
                       output_read, output_write):
                if fd >= 0:
                    _os.close(fd)
    return (plain, verified, signatures)