crypt: add encryption/verification doctests using the new subkey.
[pgp-mime.git] / pgp_mime / crypt.py
1 # Copyright (C) 2012 W. Trevor King <wking@tremily.us>
2 #
3 # This file is part of pgp-mime.
4 #
5 # pgp-mime is free software: you can redistribute it and/or modify it under the
6 # terms of the GNU General Public License as published by the Free Software
7 # Foundation, either version 3 of the License, or (at your option) any later
8 # version.
9 #
10 # pgp-mime is distributed in the hope that it will be useful, but WITHOUT ANY
11 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
12 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License along with
15 # pgp-mime.  If not, see <http://www.gnu.org/licenses/>.
16
17 import codecs as _codecs
18 import logging as _logging
19 import os as _os
20 import os.path as _os_path
21 from _socket import socket as _Socket
22 import socket as _socket
23 import subprocess as _subprocess
24
25 from pyassuan import client as _client
26 from pyassuan import common as _common
27
28 from . import LOG as _LOG
29 from . import signature as _signature
30
31
def connect(client, filename, **kwargs):
    """Hook ``client`` up to the Assuan server found at ``filename``.

    ``filename`` has a leading ``~`` expanded and is then launched as a
    subprocess with piped stdin/stdout; any extra ``kwargs`` are handed
    straight to ``subprocess.Popen``.  The child's stdout becomes
    ``client.input`` and its stdin becomes ``client.output`` before
    ``client.connect()`` is called.

    Returns the transport handle (currently always the ``Popen``
    object) so the caller can tear it down later.
    """
    path = _os_path.expanduser(filename)
    # The Unix-socket transport is kept for reference but is switched
    # off; only the subprocess transport below ever runs.
    use_unix_socket = False
    if not use_unix_socket:
        server = _subprocess.Popen(
            path, stdin=_subprocess.PIPE, stdout=_subprocess.PIPE,
            close_fds=True, **kwargs)
        client.input, client.output = server.stdout, server.stdin
        handle = server
    else:
        handle = _socket.socket(_socket.AF_UNIX, _socket.SOCK_STREAM)
        handle.connect(path)
        client.input = handle.makefile('rb')
        client.output = handle.makefile('wb')
    client.connect()
    return handle
48
def get_client(**kwargs):
    """Create an Assuan client and connect it to ``gpgme-tool``.

    The client is named ``pgp-mime`` and logs to the ``pyassuan`` child
    of this package's logger.  ``kwargs`` are forwarded unchanged to
    ``connect`` (and from there to the server launch).

    Returns a ``(client, socket)`` tuple, where ``socket`` is whatever
    transport handle ``connect`` returned.
    """
    logger_name = '{}.{}'.format(_LOG.name, 'pyassuan')
    assuan_client = _client.AssuanClient(
        name='pgp-mime',
        logger=_logging.getLogger(logger_name),
        use_sublogger=False,
        close_on_disconnect=True)
    # Alternative endpoint, currently unused: '~/.assuan/S.gpgme-tool'
    handle = connect(
        assuan_client, '~/src/gpgme/build/src/gpgme-tool', **kwargs)
    return (assuan_client, handle)
57
def disconnect(client, socket):
    """Say goodbye to the server and release the transport.

    Sends a ``BYE`` request, disconnects ``client``, and then cleans up
    ``socket``: a real socket is shut down and closed, anything else is
    treated as a process handle that is waited on and asserted to have
    exited with status zero.
    """
    client.make_request(_common.Request('BYE'))
    client.disconnect()
    if not isinstance(socket, _Socket):
        # Subprocess transport: reap the child and check its exit code.
        exit_status = socket.wait()
        assert exit_status == 0, exit_status
        return
    socket.shutdown(_socket.SHUT_RDWR)
    socket.close()
67
def hello(client):
    """Finish the opening handshake and switch the server to armor mode.

    Consumes the responses the server sends on connection (the initial
    ``OK``) and then issues an ``ARMOR true`` request.
    """
    _responses, _data = client.get_responses()  # initial 'OK' from server
    armor_request = _common.Request('ARMOR', 'true')
    client.make_request(armor_request)
71
def _read(fd, buffersize=512):
    """Read from file descriptor ``fd`` until end-of-file.

    Data is pulled in chunks of at most ``buffersize`` bytes.  Reading
    is best-effort: if ``os.read`` raises, the error is logged as a
    warning and whatever was collected so far is returned.

    Returns the concatenated chunks as ``bytes`` (``b''`` if nothing
    could be read).
    """
    chunks = []
    while True:
        try:
            chunk = _os.read(fd, buffersize)
        except Exception as e:
            # ``Logger.warn`` is a deprecated alias that newer Pythons
            # no longer provide; ``warning`` is the supported spelling.
            _LOG.warning('error while reading: {}'.format(e))
            break
        if not chunk:  # empty read means EOF
            break
        chunks.append(chunk)
    return b''.join(chunks)
84
def _write(fd, data):
    """Write all of ``data`` to file descriptor ``fd``.

    ``os.write`` may accept only part of the buffer, so it is called
    repeatedly until everything has been written.  The unsent tail is
    sliced from a ``memoryview`` so that a short write does not copy the
    rest of the payload.
    """
    view = memoryview(data)
    offset = 0
    while offset < len(view):
        offset += _os.write(fd, view[offset:])
89
90
def sign_and_encrypt_bytes(data, signers=None, recipients=None,
                           always_trust=False, mode='detach',
                           allow_default_signer=False):
    r"""Sign ``data`` with ``signers`` and encrypt to ``recipients``.

    ``signers`` and ``recipients`` are iterables of key identifiers;
    each entry is sent to the server in its own ``SIGNER`` or
    ``RECIPIENT`` request.  The command issued depends on what was
    given: ``SIGN_ENCRYPT`` when there are both signers (or
    ``allow_default_signer`` is true) and recipients, ``SIGN`` with a
    ``--<mode>`` flag when there are no recipients, and ``ENCRYPT`` when
    there are only recipients.  ``always_trust`` appends
    ``--always-trust`` to whichever command is used.

    Returns the bytes the server wrote to its output descriptor.
    Raises ``ValueError``, before any server is started, when there is
    nothing to sign with and nobody to encrypt to.

    Just sign:

    >>> print(sign_and_encrypt_bytes(
    ...     bytes(b'Hello'), signers=['pgp-mime@invalid.com']))
    ... # doctest: +ELLIPSIS
    b'-----BEGIN PGP SIGNATURE-----\n...-----END PGP SIGNATURE-----\n'

    Just encrypt:

    >>> sign_and_encrypt_bytes(
    ...     bytes(b'Hello'), recipients=['pgp-mime@invalid.com'],
    ...     always_trust=True)
    ... # doctest: +ELLIPSIS
    b'-----BEGIN PGP MESSAGE-----\n...-----END PGP MESSAGE-----\n'

    Sign and encrypt:

    >>> sign_and_encrypt_bytes(
    ...     bytes(b'Hello'), signers=['pgp-mime@invalid.com'],
    ...     recipients=['pgp-mime@invalid.com'], always_trust=True)
    ... # doctest: +ELLIPSIS
    b'-----BEGIN PGP MESSAGE-----\n...-----END PGP MESSAGE-----\n'

    Sign and encrypt with a specific subkey:

    >>> sign_and_encrypt_bytes(
    ...     bytes(b'Hello'), signers=['0x2F73DE2E'],
    ...     recipients=['pgp-mime@invalid.com'], always_trust=True)
    ... # doctest: +ELLIPSIS
    b'-----BEGIN PGP MESSAGE-----\n...-----END PGP MESSAGE-----\n'
    """
    # Work out the command first: it needs no server, so bad arguments
    # are rejected before any pipe or process is created.
    parameters = []
    if signers or allow_default_signer:
        if recipients:
            command = 'SIGN_ENCRYPT'
        else:
            command = 'SIGN'
            parameters.append('--{}'.format(mode))
    elif recipients:
        command = 'ENCRYPT'
    else:
        raise ValueError('must specify at least one signer or recipient')
    if always_trust:
        parameters.append('--always-trust')

    input_read, input_write = _os.pipe()
    output_read, output_write = _os.pipe()
    try:
        client, socket = get_client(pass_fds=(input_read, output_write))
    except Exception:
        # No server was started, so nobody else will close these.
        for fd in (input_read, input_write, output_read, output_write):
            _os.close(fd)
        raise
    # The server inherited its ends of the pipes; drop our copies.  The
    # descriptor numbers are still quoted to the server below.
    _os.close(input_read)
    _os.close(output_write)
    try:
        hello(client)
        for signer in signers or []:
            client.make_request(_common.Request('SIGNER', signer))
        for recipient in recipients or []:
            client.make_request(_common.Request('RECIPIENT', recipient))
        client.make_request(
            _common.Request('INPUT', 'FD={}'.format(input_read)))
        client.make_request(
            _common.Request('OUTPUT', 'FD={}'.format(output_write)))
        # NOTE(review): the whole payload is written before the command
        # is issued; if the server only drains the pipe once the command
        # arrives, a payload larger than the pipe buffer would block
        # here -- confirm.
        _write(input_write, data)
        _os.close(input_write)
        input_write = -1  # mark as closed for the cleanup below
        client.make_request(
            _common.Request(command, ' '.join(parameters)))
        d = _read(output_read)
    finally:
        try:
            disconnect(client, socket)
        finally:
            # Close our pipe ends even if the disconnect itself fails.
            for fd in [input_write, output_read]:
                if fd >= 0:
                    _os.close(fd)
    return d
169
def decrypt_bytes(data):
    r"""Decrypt ``data``.

    ``data`` is written to the server's input descriptor, a ``DECRYPT``
    command is issued, and whatever the server writes to its output
    descriptor is returned as ``bytes``.

    >>> b = '\n'.join([
    ...     '-----BEGIN PGP MESSAGE-----',
    ...     'Version: GnuPG v2.0.17 (GNU/Linux)',
    ...     '',
    ...     'hQEMA1Ea7aZDMrbjAQf/TAqLjksZSJxSqkBxYT5gtLQoXY6isvRZg2apjs7CW0y2',
    ...     'tFK/ptnVYAq2OtWQFhbiJXj8hmwJyyFfb3lghpeu4ihO52JgkkwOpmJb6dxjOi83',
    ...     'qDwaGOogEPH38BNLuwdrMCW0jmNROwvS796PtqSGUaJTuIiKUB8lETwPwIHrDc11',
    ...     'N3RWStE5uShNkXXQXplUoeCKf3N4XguXym+GQCqJQzlEMrkkDdr4l7mzvt3Nf8EA',
    ...     'SgSak086tUoo9x8IN5PJCuOJkcXcjQzFcpqOsA7dyZKO8NeQUZv2JvlZuorckNvN',
    ...     'xx3PwW0a8VeJgTQrh64ZK/d3F3gNHUTzXkq/UIn25tJFAcmSUwxtsBal7p8zAeCV',
    ...     '8zefsHRQ5Y03IBeYBcVJBhDS9XfvwLQTJiGGstPCxzKTwSUT1MzV5t5twG/STDCc',
    ...     'uxW3wSdo',
    ...     '=bZI+',
    ...     '-----END PGP MESSAGE-----',
    ...     ''
    ...     ]).encode('us-ascii')
    >>> decrypt_bytes(b)
    b'Success!\n'
    """
    input_read, input_write = _os.pipe()
    output_read, output_write = _os.pipe()
    try:
        client, socket = get_client(pass_fds=(input_read, output_write))
    except Exception:
        # No server was started, so nobody else will close these.
        for fd in (input_read, input_write, output_read, output_write):
            _os.close(fd)
        raise
    # The server inherited its ends of the pipes; drop our copies.  The
    # descriptor numbers are still quoted to the server below.
    _os.close(input_read)
    _os.close(output_write)
    try:
        hello(client)
        client.make_request(
            _common.Request('INPUT', 'FD={}'.format(input_read)))
        client.make_request(
            _common.Request('OUTPUT', 'FD={}'.format(output_write)))
        _write(input_write, data)
        _os.close(input_write)
        input_write = -1  # mark as closed for the cleanup below
        client.make_request(_common.Request('DECRYPT'))
        d = _read(output_read)
    finally:
        try:
            disconnect(client, socket)
        finally:
            # Close our pipe ends even if the disconnect itself fails.
            for fd in [input_write, output_read]:
                if fd >= 0:
                    _os.close(fd)
    return d
214
def verify_bytes(data, signature=None, always_trust=False):
    r"""Verify a signature on ``data``, possibly decrypting first.

    With a detached ``signature`` the signature is sent as the server's
    input and ``data`` as the signed message; without one, ``data`` is
    the input and the server's output is read back as the plaintext.

    Returns ``(plain, verified, signatures)``.  ``plain`` is ``data``
    itself for a detached signature, otherwise the server's output.
    ``verified`` is true only when at least one signature was reported
    and every reported signature has status ``success`` and a ``good``
    PKA trust.  ``always_trust`` is accepted but not currently used.

    These tests assume you didn't trust the distributed test key.

    >>> b = '\n'.join([
    ...     '-----BEGIN PGP MESSAGE-----',
    ...     'Version: GnuPG v2.0.17 (GNU/Linux)',
    ...     '',
    ...     'hQEMA1Ea7aZDMrbjAQf/YM1SeFzNGz0DnUynaEyhfGCvcqmjtbN1PtZMpT7VaQLN',
    ...     'a+c0faskr79Atz0+2IBR7CDOlcETrRtH2EnrWukbRIDtmffNFGuhMRTNfnQ15OIN',
    ...     'qrmt2P5gXznsgnm2XjzTK7S/Cc3Aq+zjaDrDt7bIedEdz+EyNgaKuL/lB9cAB8xL',
    ...     'YYp/yn55Myjair2idgzsa7w/QXdE3RhpyRLqR2Jgz4P1I1xOgUYnylbpIZL9FOKN',
    ...     'NR3RQhkGdANBku8otfthb5ZUGsNMV45ct4V8PE+xChjFb9gcwpaf1hhoIF/sYHD5',
    ...     'Bkf+v/J8F40KGYY16b0DjQIUlnra9y7q9jj0h2bvc9LAtgHtVUso133LLcVYl7RP',
    ...     'Vjyz9Ps366BtIdPlAL4CoF5hEcMKS5J3h1vRlyAKN4uHENl5vKvoxn7ID3JhhWQc',
    ...     '6QrPGis64zi3OnYor34HPh/KNJvkgOQkekmtYuTxnkiONA4lhMDJgeaVZ9WZq+GV',
    ...     'MaCvCFGNYU2TV4V8wMlnUbF8d5bDQ83g8MxIVKdDcnBzzYLZha+qmz4Spry9iB53',
    ...     'Sg/sM5H8gWWSl7Oj1lxVg7o7IscpQfVt6zL6jD2VjL3L3Hu7WEXIrcGZtvrP4d+C',
    ...     'TGYWiGlh5B2UCFk2bVctfw8W/QfaVvJYD4Rfqta2V2p14KIJLFRSGa1g26W4ixrH',
    ...     'XKxgaA3AIfJ+6c5RoisRLuYCxvQi91wkE9hAXR+inXK4Hq4SmiHoeITZFhHP3hh3',
    ...     'rbpp8mopiMNxWqCbuqgILP6pShn4oPclu9aR8uJ1ziDxISTGYC71mvLUERUjFn2L',
    ...     'fu6C0+TCC9RmeyL+eNdM6cjs1G7YR6yX',
    ...     '=phHd',
    ...     '-----END PGP MESSAGE-----',
    ...     '',
    ...     ]).encode('us-ascii')
    >>> output,verified,signatures = verify_bytes(b)
    >>> output
    b'Success!\n'
    >>> verified
    False
    >>> for s in signatures:
    ...     print(s.dumps())
    ... # doctest: +REPORT_UDIFF
    B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3 signature:
      summary:
        CRL missing: False
        CRL too old: False
        bad policy: False
        green: False
        key expired: False
        key missing: False
        key revoked: False
        red: False
        signature expired: False
        system error: False
        valid: False
      status: success
      timestamp: Wed Mar 21 19:13:57 2012
      expiration timestamp: None
      wrong key usage: False
      pka trust: not available
      chain model: False
      validity: unknown
      validity reason: success
      public key algorithm: RSA
      hash algorithm: SHA256
    >>> b = b'Success!\n'
    >>> signature = '\n'.join([
    ...     '-----BEGIN PGP SIGNATURE-----',
    ...     'Version: GnuPG v2.0.17 (GNU/Linux)',
    ...     '',
    ...     'iQEcBAEBAgAGBQJPaiw/AAoJEFEa7aZDMrbj93gH/1fQPXLjUTpONJUTmvGoMLNA',
    ...     'W9ZhjpUL5i6rRqYGUvQ4kTEDuPMxkMrCyFCDHEhSDHufMek6Nso5/HeJn3aqxlgs',
    ...     'hmNlvAq4FI6JQyFL7eCp/XG9cPx1p42dTI7JAih8FuK21sS4m/H5XP3R/6KXC99D',
    ...     '39rrXCvvR+yNgKe2dxuJwmKuLteVlcWxiIQwVrYK70GtJHC5BO79G8yGccWoEy9C',
    ...     '9JkJiyNptqZyFjGBNmMmrCSFZ7ZFA02RB+laRmwuIiozw4TJYEksxPrgZMbbcFzx',
    ...     'zs3JHyV23+Fz1ftalvwskHE7tJkX9Ub8iBMNZ/KxJXXdPdpuMdEYVjoUehkQBQE=',
    ...     '=rRBP',
    ...     '-----END PGP SIGNATURE-----',
    ...     '',
    ...     ]).encode('us-ascii')
    >>> output,verified,signatures = verify_bytes(b, signature=signature)
    >>> output
    b'Success!\n'
    >>> verified
    False
    >>> for s in signatures:
    ...     print(s.dumps())
    ... # doctest: +REPORT_UDIFF
    B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3 signature:
      summary:
        CRL missing: False
        CRL too old: False
        bad policy: False
        green: False
        key expired: False
        key missing: False
        key revoked: False
        red: False
        signature expired: False
        system error: False
        valid: False
      status: success
      timestamp: Wed Mar 21 19:30:07 2012
      expiration timestamp: None
      wrong key usage: False
      pka trust: not available
      chain model: False
      validity: unknown
      validity reason: success
      public key algorithm: RSA
      hash algorithm: SHA1

    Data signed by a subkey returns the subkey fingerprint.

    >>> b = '\n'.join([
    ...     '-----BEGIN PGP MESSAGE-----',
    ...     'Version: GnuPG v2.0.19 (GNU/Linux)',
    ...     '',
    ...     'hQEMAxcQCLovc94uAQf9ErTZnr0lYRlLLZIk1VcpNNTHrMro+BmqpFC0jprA4/2m',
    ...     '92klBF4TIS1A9bU5oxzQquaAIDV42P3sXrbxu/YhHLmPGH+dc2JVSfPLL0XOL5GC',
    ...     'qpQYe5lglRBReFSRktrfhukjHBoXvh3c8T4xYK2r+nIV4gsp+FrSQMIOdhhBoC36',
    ...     'U1MOk+R+I0JDbWdzZzJONs7ZcAcNDVKqxmAXZUqVgkhPpnGBSBuF9ExKRT3S6e5N',
    ...     'Rsorb/DjGIUHSZuH2EaWAUz1jJ3nSta7TnveT/avfJiAV7cRS4oVgyyFyuHO5gkI',
    ...     'o0obeJaut3enVgpq2TUUk0M4L8TX4jjKvDGAYNyuPNLAsQFHLj5eLmJSudGStWuA',
    ...     'WjKLqBHD0M8/OcwnrTMleJl+h50ZsHO1tvvkXelH+w/jD5SMS+ktxq2Te8Vj7BmM',
    ...     '0WQn3Ys7ViA5PgcSpbqNNLdgc1EMcpPI/sfJAORPKVWRPBKDXX/irY2onAMSe5gH',
    ...     'teNX6bZd/gaoLWqD/1ZhsOCnlV7LY1R929TJ9vxnJcfKKAKwBDfAaSbecUUMECVw',
    ...     's4u3ZT1pmNslBmH6XSy3ifLYWu/2xsJuhPradT88BJOBARMGg81gOE6zxGRrMLJa',
    ...     'KojFgqaF2y4nlZAyaJ1Ld4qCaoQogaL9qE1BbmgtBehZ2FNQiIBSLC0fUUl8A4Py',
    ...     '4d9ZxUoSp7nZmgTN5pUH1N9DIC4ntp/Rak2WnpS7+dRPlp9A2SF0RkeLY+JD9gNm',
    ...     'j44zBkI79KlgaE/cMt6xUXAF/1ZR/Hv/6GUazGx0l23CnSGuqzLpex2uKOxfKiJt',
    ...     'jfgyZRhIdFJnRuEXt8dTTDiiYA==',
    ...     '=0o+x',
    ...     '-----END PGP MESSAGE-----',
    ...     '',
    ...     ]).encode('us-ascii')
    >>> output,verified,signatures = verify_bytes(b)
    >>> output
    b'Hello'
    >>> verified
    False
    >>> for s in signatures:
    ...     print(s.dumps())
    ... # doctest: +REPORT_UDIFF
    DECC812C8795ADD60538B0CD171008BA2F73DE2E signature:
      summary:
        CRL missing: False
        CRL too old: False
        bad policy: False
        green: False
        key expired: False
        key missing: False
        key revoked: False
        red: False
        signature expired: False
        system error: False
        valid: False
      status: success
      timestamp: Thu Sep 20 15:29:28 2012
      expiration timestamp: None
      wrong key usage: False
      pka trust: not available
      chain model: False
      validity: unknown
      validity reason: success
      public key algorithm: RSA
      hash algorithm: SHA256
    """
    # A detached signature needs an extra "message" pipe but no output
    # pipe; an inline one needs an output pipe for the recovered text.
    # Descriptors that are not created stay at -1.
    input_read, input_write = _os.pipe()
    pass_fds = [input_read]
    if signature:
        message_read, message_write = _os.pipe()
        output_read = output_write = -1
        pass_fds.append(message_read)
    else:
        message_read = message_write = -1
        output_read, output_write = _os.pipe()
        pass_fds.append(output_write)
    try:
        client, socket = get_client(pass_fds=pass_fds)
    except Exception:
        # No server was started, so nobody else will close these.
        for fd in (input_read, input_write, message_read, message_write,
                   output_read, output_write):
            if fd >= 0:
                _os.close(fd)
        raise
    # The server inherited its ends of the pipes; drop our copies.  The
    # descriptor numbers are still quoted to the server below.
    _os.close(input_read)
    if signature:
        _os.close(message_read)
    else:
        _os.close(output_write)
    try:
        hello(client)
        client.make_request(
            _common.Request('INPUT', 'FD={}'.format(input_read)))
        if signature:
            client.make_request(
                _common.Request('MESSAGE', 'FD={}'.format(message_read)))
            _write(input_write, signature)
            _os.close(input_write)
            input_write = -1  # mark as closed for the cleanup below
            _write(message_write, data)
            _os.close(message_write)
            message_write = -1
        else:
            client.make_request(
                _common.Request('OUTPUT', 'FD={}'.format(output_write)))
            _write(input_write, data)
            _os.close(input_write)
            input_write = -1
        client.make_request(_common.Request('VERIFY'))
        if signature:
            plain = data
        else:
            plain = _read(output_read)
        rs, result = client.make_request(_common.Request('RESULT'))
        signatures = list(_signature.verify_result_signatures(result))
        # An empty signature list must not count as verified: nothing
        # was actually checked in that case.
        verified = bool(signatures) and all(
            s.status == 'success' and s.pka_trust == 'good'
            for s in signatures)
    finally:
        try:
            disconnect(client, socket)
        finally:
            # Close our pipe ends even if the disconnect itself fails.
            for fd in [input_write, message_write, output_read]:
                if fd >= 0:
                    _os.close(fd)
    return (plain, verified, signatures)