4987abf6e3897809bc61eed47174c7195bac0ce3
[pgp-mime.git] / pgp_mime / crypt.py
1 # Copyright (C) 2012 W. Trevor King <wking@tremily.us>
2 #
3 # This file is part of pgp-mime.
4 #
5 # pgp-mime is free software: you can redistribute it and/or modify it under the
6 # terms of the GNU General Public License as published by the Free Software
7 # Foundation, either version 3 of the License, or (at your option) any later
8 # version.
9 #
10 # pgp-mime is distributed in the hope that it will be useful, but WITHOUT ANY
11 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
12 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License along with
15 # pgp-mime.  If not, see <http://www.gnu.org/licenses/>.
16
17 import codecs as _codecs
18 import configparser as _configparser
19 import logging as _logging
20 import os as _os
21 import os.path as _os_path
22
23 from pyassuan import client as _client
24 from pyassuan import common as _common
25
26 from . import LOG as _LOG
27 from . import signature as _signature
28
29
# Default path of the gpgme-tool socket, under the user's ``~/.gnupg``
# directory.  ``get_client()`` falls back to this when it is called
# without an explicit ``socket_path``.
SOCKET_PATH = _os_path.expanduser(_os_path.join('~', '.gnupg', 'S.gpgme-tool'))
31
32
def get_client_params(config):
    r"""Retrieve Assuan client parameters from a config file.

    Looks up the ``socket-path`` option in the ``[gpgme-tool]``
    section of ``config`` (a ``configparser``-style object) and
    returns it in a dictionary under the ``socket_path`` key.  When
    either the section or the option is missing, ``socket_path`` is
    ``None``.

    >>> from configparser import ConfigParser
    >>> config = ConfigParser()
    >>> config.read_string('\n'.join([
    ...             '[gpgme-tool]',
    ...             'socket-path: /tmp/S.gpgme-tool',
    ...             ]))
    >>> get_client_params(config)
    {'socket_path': '/tmp/S.gpgme-tool'}
    >>> get_client_params(ConfigParser())
    {'socket_path': None}
    """
    try:
        socket_path = config.get('gpgme-tool', 'socket-path')
    except (_configparser.NoSectionError, _configparser.NoOptionError):
        # Nothing configured: report "no explicit path".
        socket_path = None
    return {'socket_path': socket_path}
56
def get_client(socket_path=None):
    """Create an Assuan client and connect it to a socket.

    ``socket_path`` is the socket to connect to; when it is ``None``
    the module-level ``SOCKET_PATH`` is used instead.  The client logs
    through a ``pyassuan`` child of the package logger.  Returns the
    connected client.
    """
    path = SOCKET_PATH if socket_path is None else socket_path
    logger_name = '{}.{}'.format(_LOG.name, 'pyassuan')
    assuan = _client.AssuanClient(
        name='pgp-mime',
        logger=_logging.getLogger(logger_name),
        use_sublogger=False,
        close_on_disconnect=True,
    )
    assuan.connect(socket_path=path)
    return assuan
66
def disconnect(client):
    """Send ``BYE`` to the server and close the connection.

    ``client.disconnect()`` is called even when the ``BYE`` request
    raises, so a failed or already-broken session does not leave the
    connection open.  Any exception from the request still propagates
    to the caller.
    """
    try:
        client.make_request(_common.Request('BYE'))
    finally:
        client.disconnect()
70
def hello(client):
    """Perform the opening exchange on a freshly connected client.

    Reads the responses already waiting on the connection (the
    server's initial ``OK``) and then sends an ``ARMOR true`` request.
    """
    # The greeting itself is not needed; it only has to be consumed.
    _greeting, _payload = client.get_responses()
    armor_request = _common.Request('ARMOR', 'true')
    client.make_request(armor_request)
74
def _read(fd, buffersize=512):
    """Read from file descriptor ``fd`` until end-of-file.

    Data is read in chunks of at most ``buffersize`` bytes and
    returned as a single ``bytes`` object.  Reading is best-effort: if
    a read fails with an ``OSError``, a warning is logged and whatever
    was read before the failure is returned.
    """
    chunks = []
    while True:
        try:
            chunk = _os.read(fd, buffersize)
        except OSError as e:
            _LOG.warning('error while reading: %s', e)
            break
        if not chunk:  # an empty read means end-of-file
            break
        chunks.append(chunk)
    return b''.join(chunks)
87
def _write(fd, data):
    """Write all of ``data`` (a bytes-like object) to ``fd``.

    ``os.write`` may accept fewer bytes than it was given, so the
    unwritten remainder is retried until everything has been written.
    """
    # Slice a memoryview instead of ``data`` itself so that a retry
    # after a partial write does not copy the remaining bytes.
    with memoryview(data) as view:
        written = 0
        while written < len(view):
            written += _os.write(fd, view[written:])
92
93
def sign_and_encrypt_bytes(data, signers=None, recipients=None,
                           always_trust=False, mode='detach',
                           allow_default_signer=False, **kwargs):
    r"""Sign ``data`` with ``signers`` and encrypt to ``recipients``.

    Parameters:

    * ``data``: the bytes to process.
    * ``signers``: optional iterable of signer identifiers, each sent
      as a ``SIGNER`` request.
    * ``recipients``: optional iterable of recipient identifiers, each
      sent as a ``RECIPIENT`` request.
    * ``always_trust``: when true, ``--always-trust`` is added to the
      command.
    * ``mode``: signature mode, passed as ``--<mode>``; only used for
      a plain ``SIGN`` (no recipients).
    * ``allow_default_signer``: when true, sign even if ``signers`` is
      empty.
    * ``**kwargs``: forwarded to ``get_client()``.

    Returns the bytes read back from the server's output.  Raises
    ``ValueError`` when there is nothing to do (no signers, no
    recipients, and ``allow_default_signer`` is false); this is
    checked before any pipe or connection is opened.

    Just sign (with a detached signature):

    >>> print(sign_and_encrypt_bytes(
    ...     bytes(b'Hello'), signers=['pgp-mime@invalid.com']))
    ... # doctest: +ELLIPSIS
    b'-----BEGIN PGP SIGNATURE-----\n...-----END PGP SIGNATURE-----\n'

    Just encrypt:

    >>> sign_and_encrypt_bytes(
    ...     bytes(b'Hello'), recipients=['pgp-mime@invalid.com'],
    ...     always_trust=True)
    ... # doctest: +ELLIPSIS
    b'-----BEGIN PGP MESSAGE-----\n...-----END PGP MESSAGE-----\n'

    Sign and encrypt:

    >>> sign_and_encrypt_bytes(
    ...     bytes(b'Hello'), signers=['pgp-mime@invalid.com'],
    ...     recipients=['pgp-mime@invalid.com'], always_trust=True)
    ... # doctest: +ELLIPSIS
    b'-----BEGIN PGP MESSAGE-----\n...-----END PGP MESSAGE-----\n'

    Sign and encrypt with a specific subkey:

    >>> sign_and_encrypt_bytes(
    ...     bytes(b'Hello'), signers=['0x2F73DE2E'],
    ...     recipients=['pgp-mime@invalid.com'], always_trust=True)
    ... # doctest: +ELLIPSIS
    b'-----BEGIN PGP MESSAGE-----\n...-----END PGP MESSAGE-----\n'
    """
    # Work out the command first so that invalid arguments are
    # rejected before any pipe or connection has been opened.
    parameters = []
    if signers or allow_default_signer:
        if recipients:
            command = 'SIGN_ENCRYPT'
        else:
            command = 'SIGN'
            parameters.append('--{}'.format(mode))
    elif recipients:
        command = 'ENCRYPT'
    else:
        raise ValueError('must specify at least one signer or recipient')
    if always_trust:
        parameters.append('--always-trust')

    input_read, input_write = _os.pipe()
    output_read = output_write = -1
    try:
        output_read, output_write = _os.pipe()
        # Connecting happens inside the ``try`` so that the pipes are
        # closed again if the connection cannot be established.
        client = get_client(**kwargs)
        try:
            hello(client)
            for signer in signers or ():
                client.make_request(_common.Request('SIGNER', signer))
            for recipient in recipients or ():
                client.make_request(_common.Request('RECIPIENT', recipient))
            # Hand the server its end of each pipe, then drop our copy.
            client.send_fds([input_read])
            client.make_request(_common.Request('INPUT', 'FD'))
            _os.close(input_read)
            input_read = -1
            client.send_fds([output_write])
            client.make_request(_common.Request('OUTPUT', 'FD'))
            _os.close(output_write)
            output_write = -1
            # NOTE(review): all input is written before the command is
            # issued, and output is only read after it returns.  If the
            # server only services the pipes while running the command,
            # payloads larger than the pipe buffer could block here --
            # confirm against gpgme-tool.
            _write(input_write, data)
            _os.close(input_write)
            input_write = -1
            client.make_request(
                _common.Request(command, ' '.join(parameters)))
            d = _read(output_read)
        finally:
            # Nested so the descriptors below are closed even when
            # disconnecting raises.
            disconnect(client)
    finally:
        for fd in [input_read, input_write, output_read, output_write]:
            if fd >= 0:
                _os.close(fd)
    return d
174
def decrypt_bytes(data, **kwargs):
    r"""Decrypt ``data``.

    ``data`` is written to the server's input, a ``DECRYPT`` request
    is issued, and the bytes read back from the server's output are
    returned.  ``**kwargs`` are forwarded to ``get_client()``.

    All pipe descriptors are closed before returning, including when
    connecting, any request, or disconnecting raises.

    >>> b = '\n'.join([
    ...     '-----BEGIN PGP MESSAGE-----',
    ...     'Version: GnuPG v2.0.17 (GNU/Linux)',
    ...     '',
    ...     'hQEMA1Ea7aZDMrbjAQf/TAqLjksZSJxSqkBxYT5gtLQoXY6isvRZg2apjs7CW0y2',
    ...     'tFK/ptnVYAq2OtWQFhbiJXj8hmwJyyFfb3lghpeu4ihO52JgkkwOpmJb6dxjOi83',
    ...     'qDwaGOogEPH38BNLuwdrMCW0jmNROwvS796PtqSGUaJTuIiKUB8lETwPwIHrDc11',
    ...     'N3RWStE5uShNkXXQXplUoeCKf3N4XguXym+GQCqJQzlEMrkkDdr4l7mzvt3Nf8EA',
    ...     'SgSak086tUoo9x8IN5PJCuOJkcXcjQzFcpqOsA7dyZKO8NeQUZv2JvlZuorckNvN',
    ...     'xx3PwW0a8VeJgTQrh64ZK/d3F3gNHUTzXkq/UIn25tJFAcmSUwxtsBal7p8zAeCV',
    ...     '8zefsHRQ5Y03IBeYBcVJBhDS9XfvwLQTJiGGstPCxzKTwSUT1MzV5t5twG/STDCc',
    ...     'uxW3wSdo',
    ...     '=bZI+',
    ...     '-----END PGP MESSAGE-----',
    ...     ''
    ...     ]).encode('us-ascii')
    >>> decrypt_bytes(b)
    b'Success!\n'
    """
    input_read, input_write = _os.pipe()
    output_read = output_write = -1
    try:
        output_read, output_write = _os.pipe()
        # Connecting happens inside the ``try`` so that the pipes are
        # closed again if the connection cannot be established.
        client = get_client(**kwargs)
        try:
            hello(client)
            # Hand the server its end of each pipe, then drop our copy.
            client.send_fds([input_read])
            client.make_request(_common.Request('INPUT', 'FD'))
            _os.close(input_read)
            input_read = -1
            client.send_fds([output_write])
            client.make_request(_common.Request('OUTPUT', 'FD'))
            _os.close(output_write)
            output_write = -1
            # Closing our write end lets the reader see end-of-file.
            _write(input_write, data)
            _os.close(input_write)
            input_write = -1
            client.make_request(_common.Request('DECRYPT'))
            d = _read(output_read)
        finally:
            # Nested so the descriptors below are closed even when
            # disconnecting raises.
            disconnect(client)
    finally:
        for fd in [input_read, input_write, output_read, output_write]:
            if fd >= 0:
                _os.close(fd)
    return d
221
def verify_bytes(data, signature=None, always_trust=False, **kwargs):
    r"""Verify a signature on ``data``, possibly decrypting first.

    Parameters:

    * ``data``: the signed bytes.  When ``signature`` is not given,
      ``data`` itself is sent as the server's input and the plain text
      is read back from the server's output.
    * ``signature``: optional detached signature.  When given, it is
      sent as the server's input and ``data`` as the message.
    * ``always_trust``: accepted but not used by this function.
    * ``**kwargs``: forwarded to ``get_client()``.

    Returns a ``(plain, verified, signatures)`` tuple.  ``plain`` is
    ``data`` itself for a detached signature, otherwise the server's
    output.  ``signatures`` is the list parsed from the server's
    ``RESULT`` response.  ``verified`` is ``True`` only when there is
    at least one signature and every signature has status
    ``'success'`` and PKA trust ``'good'``; data with no signatures
    at all is reported as not verified.

    All pipe descriptors are closed before returning, including when
    connecting, any request, or disconnecting raises.

    These tests assume you didn't trust the distributed test key.

    >>> b = '\n'.join([
    ...     '-----BEGIN PGP MESSAGE-----',
    ...     'Version: GnuPG v2.0.17 (GNU/Linux)',
    ...     '',
    ...     'hQEMA1Ea7aZDMrbjAQf/YM1SeFzNGz0DnUynaEyhfGCvcqmjtbN1PtZMpT7VaQLN',
    ...     'a+c0faskr79Atz0+2IBR7CDOlcETrRtH2EnrWukbRIDtmffNFGuhMRTNfnQ15OIN',
    ...     'qrmt2P5gXznsgnm2XjzTK7S/Cc3Aq+zjaDrDt7bIedEdz+EyNgaKuL/lB9cAB8xL',
    ...     'YYp/yn55Myjair2idgzsa7w/QXdE3RhpyRLqR2Jgz4P1I1xOgUYnylbpIZL9FOKN',
    ...     'NR3RQhkGdANBku8otfthb5ZUGsNMV45ct4V8PE+xChjFb9gcwpaf1hhoIF/sYHD5',
    ...     'Bkf+v/J8F40KGYY16b0DjQIUlnra9y7q9jj0h2bvc9LAtgHtVUso133LLcVYl7RP',
    ...     'Vjyz9Ps366BtIdPlAL4CoF5hEcMKS5J3h1vRlyAKN4uHENl5vKvoxn7ID3JhhWQc',
    ...     '6QrPGis64zi3OnYor34HPh/KNJvkgOQkekmtYuTxnkiONA4lhMDJgeaVZ9WZq+GV',
    ...     'MaCvCFGNYU2TV4V8wMlnUbF8d5bDQ83g8MxIVKdDcnBzzYLZha+qmz4Spry9iB53',
    ...     'Sg/sM5H8gWWSl7Oj1lxVg7o7IscpQfVt6zL6jD2VjL3L3Hu7WEXIrcGZtvrP4d+C',
    ...     'TGYWiGlh5B2UCFk2bVctfw8W/QfaVvJYD4Rfqta2V2p14KIJLFRSGa1g26W4ixrH',
    ...     'XKxgaA3AIfJ+6c5RoisRLuYCxvQi91wkE9hAXR+inXK4Hq4SmiHoeITZFhHP3hh3',
    ...     'rbpp8mopiMNxWqCbuqgILP6pShn4oPclu9aR8uJ1ziDxISTGYC71mvLUERUjFn2L',
    ...     'fu6C0+TCC9RmeyL+eNdM6cjs1G7YR6yX',
    ...     '=phHd',
    ...     '-----END PGP MESSAGE-----',
    ...     '',
    ...     ]).encode('us-ascii')
    >>> output,verified,signatures = verify_bytes(b)
    >>> output
    b'Success!\n'
    >>> verified
    False
    >>> for s in signatures:
    ...     print(s.dumps())
    ... # doctest: +REPORT_UDIFF
    B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3 signature:
      summary:
        CRL missing: False
        CRL too old: False
        bad policy: False
        green: False
        key expired: False
        key missing: False
        key revoked: False
        red: False
        signature expired: False
        system error: False
        valid: False
      status: success
      timestamp: Wed Mar 21 19:13:57 2012
      expiration timestamp: None
      wrong key usage: False
      pka trust: not available
      chain model: False
      validity: unknown
      validity reason: success
      public key algorithm: RSA
      hash algorithm: SHA256
    >>> b = b'Success!\n'
    >>> signature = '\n'.join([
    ...     '-----BEGIN PGP SIGNATURE-----',
    ...     'Version: GnuPG v2.0.17 (GNU/Linux)',
    ...     '',
    ...     'iQEcBAEBAgAGBQJPaiw/AAoJEFEa7aZDMrbj93gH/1fQPXLjUTpONJUTmvGoMLNA',
    ...     'W9ZhjpUL5i6rRqYGUvQ4kTEDuPMxkMrCyFCDHEhSDHufMek6Nso5/HeJn3aqxlgs',
    ...     'hmNlvAq4FI6JQyFL7eCp/XG9cPx1p42dTI7JAih8FuK21sS4m/H5XP3R/6KXC99D',
    ...     '39rrXCvvR+yNgKe2dxuJwmKuLteVlcWxiIQwVrYK70GtJHC5BO79G8yGccWoEy9C',
    ...     '9JkJiyNptqZyFjGBNmMmrCSFZ7ZFA02RB+laRmwuIiozw4TJYEksxPrgZMbbcFzx',
    ...     'zs3JHyV23+Fz1ftalvwskHE7tJkX9Ub8iBMNZ/KxJXXdPdpuMdEYVjoUehkQBQE=',
    ...     '=rRBP',
    ...     '-----END PGP SIGNATURE-----',
    ...     '',
    ...     ]).encode('us-ascii')
    >>> output,verified,signatures = verify_bytes(b, signature=signature)
    >>> output
    b'Success!\n'
    >>> verified
    False
    >>> for s in signatures:
    ...     print(s.dumps())
    ... # doctest: +REPORT_UDIFF
    B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3 signature:
      summary:
        CRL missing: False
        CRL too old: False
        bad policy: False
        green: False
        key expired: False
        key missing: False
        key revoked: False
        red: False
        signature expired: False
        system error: False
        valid: False
      status: success
      timestamp: Wed Mar 21 19:30:07 2012
      expiration timestamp: None
      wrong key usage: False
      pka trust: not available
      chain model: False
      validity: unknown
      validity reason: success
      public key algorithm: RSA
      hash algorithm: SHA1

    Data signed by a subkey returns the subkey fingerprint.  To find
    the primary key for a given subkey, use
    ``pgp_mime.key.lookup_keys()``.

    >>> b = '\n'.join([
    ...     '-----BEGIN PGP MESSAGE-----',
    ...     'Version: GnuPG v2.0.19 (GNU/Linux)',
    ...     '',
    ...     'hQEMAxcQCLovc94uAQf9ErTZnr0lYRlLLZIk1VcpNNTHrMro+BmqpFC0jprA4/2m',
    ...     '92klBF4TIS1A9bU5oxzQquaAIDV42P3sXrbxu/YhHLmPGH+dc2JVSfPLL0XOL5GC',
    ...     'qpQYe5lglRBReFSRktrfhukjHBoXvh3c8T4xYK2r+nIV4gsp+FrSQMIOdhhBoC36',
    ...     'U1MOk+R+I0JDbWdzZzJONs7ZcAcNDVKqxmAXZUqVgkhPpnGBSBuF9ExKRT3S6e5N',
    ...     'Rsorb/DjGIUHSZuH2EaWAUz1jJ3nSta7TnveT/avfJiAV7cRS4oVgyyFyuHO5gkI',
    ...     'o0obeJaut3enVgpq2TUUk0M4L8TX4jjKvDGAYNyuPNLAsQFHLj5eLmJSudGStWuA',
    ...     'WjKLqBHD0M8/OcwnrTMleJl+h50ZsHO1tvvkXelH+w/jD5SMS+ktxq2Te8Vj7BmM',
    ...     '0WQn3Ys7ViA5PgcSpbqNNLdgc1EMcpPI/sfJAORPKVWRPBKDXX/irY2onAMSe5gH',
    ...     'teNX6bZd/gaoLWqD/1ZhsOCnlV7LY1R929TJ9vxnJcfKKAKwBDfAaSbecUUMECVw',
    ...     's4u3ZT1pmNslBmH6XSy3ifLYWu/2xsJuhPradT88BJOBARMGg81gOE6zxGRrMLJa',
    ...     'KojFgqaF2y4nlZAyaJ1Ld4qCaoQogaL9qE1BbmgtBehZ2FNQiIBSLC0fUUl8A4Py',
    ...     '4d9ZxUoSp7nZmgTN5pUH1N9DIC4ntp/Rak2WnpS7+dRPlp9A2SF0RkeLY+JD9gNm',
    ...     'j44zBkI79KlgaE/cMt6xUXAF/1ZR/Hv/6GUazGx0l23CnSGuqzLpex2uKOxfKiJt',
    ...     'jfgyZRhIdFJnRuEXt8dTTDiiYA==',
    ...     '=0o+x',
    ...     '-----END PGP MESSAGE-----',
    ...     '',
    ...     ]).encode('us-ascii')
    >>> output,verified,signatures = verify_bytes(b)
    >>> output
    b'Hello'
    >>> verified
    False
    >>> for s in signatures:
    ...     print(s.dumps())
    ... # doctest: +REPORT_UDIFF
    DECC812C8795ADD60538B0CD171008BA2F73DE2E signature:
      summary:
        CRL missing: False
        CRL too old: False
        bad policy: False
        green: False
        key expired: False
        key missing: False
        key revoked: False
        red: False
        signature expired: False
        system error: False
        valid: False
      status: success
      timestamp: Thu Sep 20 15:29:28 2012
      expiration timestamp: None
      wrong key usage: False
      pka trust: not available
      chain model: False
      validity: unknown
      validity reason: success
      public key algorithm: RSA
      hash algorithm: SHA256
    """
    # -1 marks "not open"; only non-negative descriptors get closed.
    input_read = input_write = -1
    message_read = message_write = -1
    output_read = output_write = -1
    try:
        input_read, input_write = _os.pipe()
        if signature:
            # Detached signature: the signed data travels separately
            # as the message, and no output pipe is needed.
            message_read, message_write = _os.pipe()
        else:
            output_read, output_write = _os.pipe()
        # Connecting happens inside the ``try`` so that the pipes are
        # closed again if the connection cannot be established.
        client = get_client(**kwargs)
        try:
            hello(client)
            # Hand the server its end of each pipe, then drop our copy.
            client.send_fds([input_read])
            client.make_request(_common.Request('INPUT', 'FD'))
            _os.close(input_read)
            input_read = -1
            if signature:
                client.send_fds([message_read])
                client.make_request(_common.Request('MESSAGE', 'FD'))
                _os.close(message_read)
                message_read = -1
            else:
                client.send_fds([output_write])
                client.make_request(_common.Request('OUTPUT', 'FD'))
                _os.close(output_write)
                output_write = -1
            if signature:
                _write(input_write, signature)
                _os.close(input_write)
                input_write = -1
                _write(message_write, data)
                _os.close(message_write)
                message_write = -1
            else:
                _write(input_write, data)
                _os.close(input_write)
                input_write = -1
            client.make_request(_common.Request('VERIFY'))
            if signature:
                plain = data
            else:
                plain = _read(output_read)
            rs, result = client.make_request(_common.Request('RESULT'))
            signatures = list(_signature.verify_result_signatures(result))
            # An empty signature list must not count as verified:
            # ``all()`` alone would be vacuously true for it.
            verified = bool(signatures) and all(
                sig.status == 'success' and sig.pka_trust == 'good'
                for sig in signatures)
        finally:
            # Nested so the descriptors below are closed even when
            # disconnecting raises.
            disconnect(client)
    finally:
        for fd in [input_read, input_write, message_read, message_write,
                   output_read, output_write]:
            if fd >= 0:
                _os.close(fd)
    return (plain, verified, signatures)