Add `allow_default_signer` to `sign_and_encrypt_bytes`.
[pgp-mime.git] / pgp_mime / crypt.py
1 # Copyright
2
3 import codecs as _codecs
4 import logging as _logging
5 import os as _os
6 import os.path as _os_path
7 from _socket import socket as _Socket
8 import socket as _socket
9 import subprocess as _subprocess
10
11 from pyassuan import client as _client
12 from pyassuan import common as _common
13
14 from . import LOG as _LOG
15
16
def connect(client, filename, **kwargs):
    """Attach ``client`` to the Assuan server found at ``filename``.

    ``filename`` is tilde-expanded first.  With the transport toggle in
    its current (off) position the path is launched as a subprocess and
    the child's stdout/stdin become ``client.input``/``client.output``;
    any extra keyword arguments are forwarded to ``subprocess.Popen``.
    The alternative branch connects to ``filename`` as a UNIX stream
    socket instead.

    After wiring the streams, ``client.connect()`` is called.  The
    transport handle (the ``Popen`` instance or the socket) is returned
    so that it can later be handed to ``disconnect``.
    """
    path = _os_path.expanduser(filename)
    # Hard-wired off: flip to talk to an already-listening UNIX socket
    # rather than spawning the tool ourselves.
    use_unix_socket = False
    if not use_unix_socket:
        transport = _subprocess.Popen(
            path, stdin=_subprocess.PIPE, stdout=_subprocess.PIPE,
            close_fds=True, **kwargs)
        client.input = transport.stdout
        client.output = transport.stdin
    else:
        transport = _socket.socket(_socket.AF_UNIX, _socket.SOCK_STREAM)
        transport.connect(path)
        client.input = transport.makefile('rb')
        client.output = transport.makefile('wb')
    client.connect()
    return transport
33
def get_client(**kwargs):
    """Build an Assuan client and connect it to ``gpgme-tool``.

    Keyword arguments are passed straight through to ``connect``.
    Returns a ``(client, socket)`` pair, where ``socket`` is whatever
    transport handle ``connect`` returned.
    """
    assuan = _client.AssuanClient(close_on_disconnect=True, name='pgp-mime')
    # NOTE(review): the client logger is unconditionally set to DEBUG.
    assuan.logger.setLevel(_logging.DEBUG)
    # NOTE(review): this path points into a developer build tree; the
    # socket-based alternative would be '~/.assuan/S.gpgme-tool'.
    tool_path = '~/src/gpgme/build/src/gpgme-tool'
    transport = connect(assuan, tool_path, **kwargs)
    return (assuan, transport)
40
def disconnect(client, socket):
    """Say ``BYE`` to the server and tear down the transport.

    ``socket`` is the handle returned by ``connect``.  A real socket is
    shut down and closed; anything else is treated as a subprocess
    handle, which is waited on and asserted to have exited with
    status 0.
    """
    farewell = _common.Request('BYE')
    client.make_request(farewell)
    client.disconnect()
    if not isinstance(socket, _Socket):
        # Subprocess transport: reap the child and insist on a clean exit.
        status = socket.wait()
        assert status == 0, status
        return
    socket.shutdown(_socket.SHUT_RDWR)
    socket.close()
50
def hello(client):
    """Consume the server greeting and request ASCII-armored output."""
    # The server speaks first; swallow its initial 'OK' before sending.
    _greeting, _payload = client.get_responses()
    armor = _common.Request('ARMOR', 'true')
    client.make_request(armor)
54
55 def _read(fd, buffersize=512):
56     d = []
57     while True:
58         try:
59             new = _os.read(fd, buffersize)
60         except Exception as e:
61             _LOG.warn('error while reading: {}'.format(e))
62             break
63         if not new:
64             break
65         d.append(new)
66     return b''.join(d)
67
68 def _write(fd, data):
69     i = 0
70     while i < len(data):
71         i += _os.write(fd, data[i:])
72
73
def sign_and_encrypt_bytes(data, signers=None, recipients=None,
                           always_trust=False, mode='detach',
                           allow_default_signer=False):
    r"""Sign ``data`` with ``signers`` and encrypt to ``recipients``.

    ``data`` is the bytes payload handed to the tool through a pipe.
    Each entry of ``signers`` is sent as a ``SIGNER`` request and each
    entry of ``recipients`` as a ``RECIPIENT`` request.  The command
    issued is ``SIGN_ENCRYPT`` when both signing and recipients are
    requested, ``SIGN`` when only signing is requested (``mode`` is
    passed as ``--<mode>`` in that case only) and ``ENCRYPT`` when only
    recipients are given.  ``always_trust`` appends ``--always-trust``
    to the command.

    ``allow_default_signer`` requests signing even when ``signers`` is
    empty; no ``SIGNER`` request is sent in that case (presumably the
    tool then falls back to its default key -- confirm against
    gpgme-tool).

    Returns the bytes the tool wrote to its output.  Raises
    ``ValueError``, before any helper process is started, when neither
    signing nor recipients were requested.

    Just sign:

    >>> print(sign_and_encrypt_bytes(
    ...     bytes(b'Hello'), signers=['pgp-mime@invalid.com']))
    ... # doctest: +ELLIPSIS
    b'-----BEGIN PGP SIGNATURE-----\n...-----END PGP SIGNATURE-----\n'

    Just encrypt:

    >>> sign_and_encrypt_bytes(
    ...     bytes(b'Hello'), recipients=['pgp-mime@invalid.com'],
    ...     always_trust=True)
    ... # doctest: +ELLIPSIS
    b'-----BEGIN PGP MESSAGE-----\n...-----END PGP MESSAGE-----\n'

    Sign and encrypt:

    >>> sign_and_encrypt_bytes(
    ...     bytes(b'Hello'), signers=['pgp-mime@invalid.com'],
    ...     recipients=['pgp-mime@invalid.com'], always_trust=True)
    ... # doctest: +ELLIPSIS
    b'-----BEGIN PGP MESSAGE-----\n...-----END PGP MESSAGE-----\n'
    """
    # Work out the command first so that bad arguments are rejected
    # before any pipe or subprocess has been created.
    parameters = []
    if signers or allow_default_signer:
        if recipients:
            command = 'SIGN_ENCRYPT'
        else:
            command = 'SIGN'
            parameters.append('--{}'.format(mode))
    elif recipients:
        command = 'ENCRYPT'
    else:
        raise ValueError('must specify at least one signer or recipient')
    if always_trust:
        parameters.append('--always-trust')

    input_read, input_write = _os.pipe()
    output_read, output_write = _os.pipe()
    try:
        try:
            client, socket = get_client(
                pass_fds=(input_read, output_write))
        finally:
            # The child (if any) has inherited these ends; our copies
            # must go whether or not the client came up.  Only the
            # descriptor *numbers* are used from here on.
            _os.close(input_read)
            _os.close(output_write)
        try:
            hello(client)
            if signers:
                for signer in signers:
                    client.make_request(_common.Request('SIGNER', signer))
            if recipients:
                for recipient in recipients:
                    client.make_request(
                        _common.Request('RECIPIENT', recipient))
            client.make_request(
                _common.Request('INPUT', 'FD={}'.format(input_read)))
            client.make_request(
                _common.Request('OUTPUT', 'FD={}'.format(output_write)))
            _write(input_write, data)
            # Closing the write end signals end-of-input to the tool.
            _os.close(input_write)
            input_write = -1
            client.make_request(
                _common.Request(command, ' '.join(parameters)))
            d = _read(output_read)
        finally:
            disconnect(client, socket)
    finally:
        # Runs even if get_client() or disconnect() raised, so our pipe
        # ends are never leaked.
        for fd in [input_write, output_read]:
            if fd >= 0:
                _os.close(fd)
    return d
144
def decrypt_bytes(data):
    r"""Decrypt ``data``.

    ``data`` is the encrypted bytes payload; it is handed to the tool
    through a pipe and the bytes the tool writes to its output are
    returned.

    >>> b = '\n'.join([
    ...     '-----BEGIN PGP MESSAGE-----',
    ...     'Version: GnuPG v2.0.17 (GNU/Linux)',
    ...     '',
    ...     'hQEMA1Ea7aZDMrbjAQf/TAqLjksZSJxSqkBxYT5gtLQoXY6isvRZg2apjs7CW0y2',
    ...     'tFK/ptnVYAq2OtWQFhbiJXj8hmwJyyFfb3lghpeu4ihO52JgkkwOpmJb6dxjOi83',
    ...     'qDwaGOogEPH38BNLuwdrMCW0jmNROwvS796PtqSGUaJTuIiKUB8lETwPwIHrDc11',
    ...     'N3RWStE5uShNkXXQXplUoeCKf3N4XguXym+GQCqJQzlEMrkkDdr4l7mzvt3Nf8EA',
    ...     'SgSak086tUoo9x8IN5PJCuOJkcXcjQzFcpqOsA7dyZKO8NeQUZv2JvlZuorckNvN',
    ...     'xx3PwW0a8VeJgTQrh64ZK/d3F3gNHUTzXkq/UIn25tJFAcmSUwxtsBal7p8zAeCV',
    ...     '8zefsHRQ5Y03IBeYBcVJBhDS9XfvwLQTJiGGstPCxzKTwSUT1MzV5t5twG/STDCc',
    ...     'uxW3wSdo',
    ...     '=bZI+',
    ...     '-----END PGP MESSAGE-----',
    ...     ''
    ...     ]).encode('us-ascii')
    >>> decrypt_bytes(b)
    b'Success!\n'
    """
    input_read, input_write = _os.pipe()
    output_read, output_write = _os.pipe()
    try:
        try:
            client, socket = get_client(
                pass_fds=(input_read, output_write))
        finally:
            # The child (if any) has inherited these ends; our copies
            # must go whether or not the client came up.  Only the
            # descriptor *numbers* are used from here on.
            _os.close(input_read)
            _os.close(output_write)
        try:
            hello(client)
            client.make_request(
                _common.Request('INPUT', 'FD={}'.format(input_read)))
            client.make_request(
                _common.Request('OUTPUT', 'FD={}'.format(output_write)))
            _write(input_write, data)
            # Closing the write end signals end-of-input to the tool.
            _os.close(input_write)
            input_write = -1
            client.make_request(_common.Request('DECRYPT'))
            d = _read(output_read)
        finally:
            disconnect(client, socket)
    finally:
        # Runs even if get_client() or disconnect() raised, so our pipe
        # ends are never leaked.
        for fd in [input_write, output_read]:
            if fd >= 0:
                _os.close(fd)
    return d
189
def verify_bytes(data, signature=None, always_trust=False):
    r"""Verify a signature on ``data``, possibly decrypting first.

    When ``signature`` is given it is treated as a detached signature:
    the signature is fed to the tool as its input and ``data`` as the
    signed message.  Otherwise ``data`` itself is fed as the input and
    the tool's output is collected.

    Returns a ``(plain, verified, result)`` tuple.  ``plain`` is
    ``data`` in the detached case and the tool's output otherwise;
    ``result`` is the raw reply to the ``RESULT`` request; ``verified``
    is ``True`` only if the result contains at least one ``<status``
    line, every such line contains ``Success``, and every
    ``<pka-trust`` line contains ``0x2``.

    NOTE(review): ``always_trust`` is accepted but not used by this
    function.

    These tests assume you didn't trust the distributed test key.

    >>> b = '\n'.join([
    ...     '-----BEGIN PGP MESSAGE-----',
    ...     'Version: GnuPG v2.0.17 (GNU/Linux)',
    ...     '',
    ...     'hQEMA1Ea7aZDMrbjAQf/YM1SeFzNGz0DnUynaEyhfGCvcqmjtbN1PtZMpT7VaQLN',
    ...     'a+c0faskr79Atz0+2IBR7CDOlcETrRtH2EnrWukbRIDtmffNFGuhMRTNfnQ15OIN',
    ...     'qrmt2P5gXznsgnm2XjzTK7S/Cc3Aq+zjaDrDt7bIedEdz+EyNgaKuL/lB9cAB8xL',
    ...     'YYp/yn55Myjair2idgzsa7w/QXdE3RhpyRLqR2Jgz4P1I1xOgUYnylbpIZL9FOKN',
    ...     'NR3RQhkGdANBku8otfthb5ZUGsNMV45ct4V8PE+xChjFb9gcwpaf1hhoIF/sYHD5',
    ...     'Bkf+v/J8F40KGYY16b0DjQIUlnra9y7q9jj0h2bvc9LAtgHtVUso133LLcVYl7RP',
    ...     'Vjyz9Ps366BtIdPlAL4CoF5hEcMKS5J3h1vRlyAKN4uHENl5vKvoxn7ID3JhhWQc',
    ...     '6QrPGis64zi3OnYor34HPh/KNJvkgOQkekmtYuTxnkiONA4lhMDJgeaVZ9WZq+GV',
    ...     'MaCvCFGNYU2TV4V8wMlnUbF8d5bDQ83g8MxIVKdDcnBzzYLZha+qmz4Spry9iB53',
    ...     'Sg/sM5H8gWWSl7Oj1lxVg7o7IscpQfVt6zL6jD2VjL3L3Hu7WEXIrcGZtvrP4d+C',
    ...     'TGYWiGlh5B2UCFk2bVctfw8W/QfaVvJYD4Rfqta2V2p14KIJLFRSGa1g26W4ixrH',
    ...     'XKxgaA3AIfJ+6c5RoisRLuYCxvQi91wkE9hAXR+inXK4Hq4SmiHoeITZFhHP3hh3',
    ...     'rbpp8mopiMNxWqCbuqgILP6pShn4oPclu9aR8uJ1ziDxISTGYC71mvLUERUjFn2L',
    ...     'fu6C0+TCC9RmeyL+eNdM6cjs1G7YR6yX',
    ...     '=phHd',
    ...     '-----END PGP MESSAGE-----',
    ...     '',
    ...     ]).encode('us-ascii')
    >>> output,verified,result = verify_bytes(b)
    >>> output
    b'Success!\n'
    >>> verified
    False
    >>> print(str(result, 'utf-8').replace('\x00', ''))
    ... # doctest: +REPORT_UDIFF
    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <gpgme>
      <verify-result>
        <signatures>
          <signature>
            <summary value="0x0" />
            <fpr>B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3</fpr>
            <status value="0x0">Success &lt;Unspecified source&gt;</status>
            <timestamp unix="1332357237i" />
            <exp-timestamp unix="0i" />
            <wrong-key-usage value="0x0" />
            <pka-trust value="0x0" />
            <chain-model value="0x0" />
            <validity value="0x0" />
            <validity-reason value="0x0">Success &lt;Unspecified source&gt;</validity-reason>
            <pubkey-algo value="0x1">RSA</pubkey-algo>
            <hash-algo value="0x8">SHA256</hash-algo>
          </signature>
        </signatures>
      </verify-result>
    </gpgme>
    <BLANKLINE>
    >>> b = b'Success!\n'
    >>> signature = '\n'.join([
    ...     '-----BEGIN PGP SIGNATURE-----',
    ...     'Version: GnuPG v2.0.17 (GNU/Linux)',
    ...     '',
    ...     'iQEcBAEBAgAGBQJPaiw/AAoJEFEa7aZDMrbj93gH/1fQPXLjUTpONJUTmvGoMLNA',
    ...     'W9ZhjpUL5i6rRqYGUvQ4kTEDuPMxkMrCyFCDHEhSDHufMek6Nso5/HeJn3aqxlgs',
    ...     'hmNlvAq4FI6JQyFL7eCp/XG9cPx1p42dTI7JAih8FuK21sS4m/H5XP3R/6KXC99D',
    ...     '39rrXCvvR+yNgKe2dxuJwmKuLteVlcWxiIQwVrYK70GtJHC5BO79G8yGccWoEy9C',
    ...     '9JkJiyNptqZyFjGBNmMmrCSFZ7ZFA02RB+laRmwuIiozw4TJYEksxPrgZMbbcFzx',
    ...     'zs3JHyV23+Fz1ftalvwskHE7tJkX9Ub8iBMNZ/KxJXXdPdpuMdEYVjoUehkQBQE=',
    ...     '=rRBP',
    ...     '-----END PGP SIGNATURE-----',
    ...     '',
    ...     ]).encode('us-ascii')
    >>> output,verified,result = verify_bytes(b, signature=signature)
    >>> output
    b'Success!\n'
    >>> verified
    False
    >>> print(str(result, 'utf-8').replace('\x00', ''))
    ... # doctest: +REPORT_UDIFF
    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <gpgme>
      <verify-result>
        <signatures>
          <signature>
            <summary value="0x0" />
            <fpr>B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3</fpr>
            <status value="0x0">Success &lt;Unspecified source&gt;</status>
            <timestamp unix="1332358207i" />
            <exp-timestamp unix="0i" />
            <wrong-key-usage value="0x0" />
            <pka-trust value="0x0" />
            <chain-model value="0x0" />
            <validity value="0x0" />
            <validity-reason value="0x0">Success &lt;Unspecified source&gt;</validity-reason>
            <pubkey-algo value="0x1">RSA</pubkey-algo>
            <hash-algo value="0x2">SHA1</hash-algo>
          </signature>
        </signatures>
      </verify-result>
    </gpgme>
    <BLANKLINE>
    """
    input_read, input_write = _os.pipe()
    if signature:
        # Detached signature: the signed message travels over a second
        # pipe and the tool produces no plaintext output.
        message_read, message_write = _os.pipe()
        output_read = output_write = -1
        child_fds = [input_read, message_read]
    else:
        output_read, output_write = _os.pipe()
        message_read = message_write = -1
        child_fds = [input_read, output_write]
    plain = verified = result = None
    try:
        try:
            client, socket = get_client(pass_fds=child_fds)
        finally:
            # The child (if any) has inherited these ends; our copies
            # must go whether or not the client came up.  Only the
            # descriptor *numbers* are used from here on.
            for fd in child_fds:
                _os.close(fd)
        try:
            hello(client)
            client.make_request(
                _common.Request('INPUT', 'FD={}'.format(input_read)))
            if signature:
                client.make_request(
                    _common.Request(
                        'MESSAGE', 'FD={}'.format(message_read)))
                _write(input_write, signature)
                _os.close(input_write)
                input_write = -1
                _write(message_write, data)
                _os.close(message_write)
                message_write = -1
            else:
                client.make_request(
                    _common.Request(
                        'OUTPUT', 'FD={}'.format(output_write)))
                _write(input_write, data)
                _os.close(input_write)
                input_write = -1
            client.make_request(_common.Request('VERIFY'))
            if signature:
                plain = data
            else:
                plain = _read(output_read)
            rs, result = client.make_request(_common.Request('RESULT'))
            verified = True
            saw_status = False
            for line in result.splitlines():
                has_status = b'<status ' in line
                saw_status = saw_status or has_status
                if has_status and b'Success' not in line:
                    verified = False
                elif b'<pka-trust' in line and b'0x2' not in line:
                    verified = False
            if not saw_status:
                # Fail closed: a result that reports no signature
                # status at all must not count as verified.
                verified = False
        finally:
            disconnect(client, socket)
    finally:
        # Runs even if get_client() or disconnect() raised, so our pipe
        # ends are never leaked.
        for fd in [input_write, message_write, output_read]:
            if fd >= 0:
                _os.close(fd)
    return (plain, verified, result)
345                 _os.close(fd)
346     return (plain, verified, result)