d326d24758a2cd8b2dcfb12984e83bbf82d92472
[pgp-mime.git] / pgp_mime / crypt.py
1 # Copyright
2
3 import codecs as _codecs
4 import logging as _logging
5 import os as _os
6 import os.path as _os_path
7 from _socket import socket as _Socket
8 import socket as _socket
9 import subprocess as _subprocess
10
11 from pyassuan import client as _client
12 from pyassuan import common as _common
13
14 from . import LOG as _LOG
15
16
def connect(client, filename, **kwargs):
    """Hook ``client`` up to the program at ``filename`` and connect it.

    ``filename`` has a leading ``~`` expanded and is then launched with
    ``subprocess.Popen``; any extra keyword arguments are forwarded to
    ``Popen``.  The child's stdout becomes ``client.input`` and its
    stdin becomes ``client.output`` before ``client.connect()`` is
    called.

    Returns the object that owns the transport: the ``Popen`` instance
    (or, on the currently disabled code path, a Unix-domain socket).
    """
    path = _os_path.expanduser(filename)
    # The Unix-socket transport is kept for reference but switched off;
    # only the subprocess transport is ever taken.
    use_unix_socket = False
    if not use_unix_socket:
        transport = _subprocess.Popen(
            path,
            stdin=_subprocess.PIPE,
            stdout=_subprocess.PIPE,
            close_fds=True,
            **kwargs)
        client.input = transport.stdout
        client.output = transport.stdin
    else:
        transport = _socket.socket(_socket.AF_UNIX, _socket.SOCK_STREAM)
        transport.connect(path)
        client.input = transport.makefile('rb')
        client.output = transport.makefile('wb')
    client.connect()
    return transport
33
def get_client(**kwargs):
    """Build an Assuan client and connect it to ``gpgme-tool``.

    Keyword arguments are passed through to :func:`connect`.  The
    client's logger is set to ``DEBUG``.

    Returns a ``(client, socket)`` tuple, where ``socket`` is whatever
    :func:`connect` returned.
    """
    assuan = _client.AssuanClient(name='pgp-mime', close_on_disconnect=True)
    assuan.logger.setLevel(_logging.DEBUG)
    # NOTE(review): hard-coded path to a local gpgme build tree.  A
    # socket endpoint ('~/.assuan/S.gpgme-tool') was used here as an
    # alternative at some point.
    handle = connect(assuan, '~/src/gpgme/build/src/gpgme-tool', **kwargs)
    return (assuan, handle)
40
def disconnect(client, socket):
    """Say ``BYE`` to the server and release the transport.

    ``socket`` is the object returned by :func:`connect`.  If it is a
    real socket it is shut down and closed; otherwise it is treated as
    a child process and waited for.

    Raises ``AssertionError`` (carrying the exit status) if the child
    process exits with a non-zero status.
    """
    client.make_request(_common.Request('BYE'))
    client.disconnect()
    if isinstance(socket, _Socket):
        socket.shutdown(_socket.SHUT_RDWR)
        socket.close()
    else:
        status = socket.wait()
        # Raise explicitly rather than with ``assert`` so the check is
        # not stripped when Python runs with -O.  The exception type is
        # kept for callers that relied on the old assertion.
        if status != 0:
            raise AssertionError(status)
50
def hello(client):
    """Read the server's greeting, then send ``ARMOR true``."""
    # Get the initial 'OK' from the server; its contents are not used.
    _greeting, _payload = client.get_responses()
    armor_request = _common.Request('ARMOR', 'true')
    client.make_request(armor_request)
54
55 def _read(fd, buffersize=512):
56     d = []
57     while True:
58         try:
59             new = _os.read(fd, buffersize)
60         except Exception as e:
61             _LOG.warn('error while reading: {}'.format(e))
62             break
63         if not new:
64             break
65         d.append(new)
66     return b''.join(d)
67
68 def _write(fd, data):
69     i = 0
70     while i < len(data):
71         i += _os.write(fd, data[i:])
72
73
def sign_and_encrypt_bytes(data, signers=None, recipients=None,
                           always_trust=False, mode='detach'):
    r"""Sign ``data`` with ``signers`` and encrypt to ``recipients``.

    ``data`` is written to a pipe registered with the server as
    ``INPUT``; the bytes read back from the ``OUTPUT`` pipe are
    returned.

    The command sent depends on which of ``signers`` and ``recipients``
    is non-empty: ``SIGN_ENCRYPT`` for both, ``SIGN`` (with
    ``--<mode>``) for signers only, ``ENCRYPT`` for recipients only.
    ``--always-trust`` is appended when ``always_trust`` is true.

    Raises ``ValueError`` if neither a signer nor a recipient is given;
    this is checked before any pipe or subprocess is created.

    Just sign:

    >>> print(sign_and_encrypt_bytes(
    ...     bytes(b'Hello'), signers=['pgp-mime@invalid.com']))
    ... # doctest: +ELLIPSIS
    b'-----BEGIN PGP SIGNATURE-----\n...-----END PGP SIGNATURE-----\n'

    Just encrypt:

    >>> sign_and_encrypt_bytes(
    ...     bytes(b'Hello'), recipients=['pgp-mime@invalid.com'],
    ...     always_trust=True)
    ... # doctest: +ELLIPSIS
    b'-----BEGIN PGP MESSAGE-----\n...-----END PGP MESSAGE-----\n'

    Sign and encrypt:

    >>> sign_and_encrypt_bytes(
    ...     bytes(b'Hello'), signers=['pgp-mime@invalid.com'],
    ...     recipients=['pgp-mime@invalid.com'], always_trust=True)
    ... # doctest: +ELLIPSIS
    b'-----BEGIN PGP MESSAGE-----\n...-----END PGP MESSAGE-----\n'
    """
    # Work out the command first so that bad arguments fail before any
    # file descriptor or child process exists.
    parameters = []
    if signers and recipients:
        command = 'SIGN_ENCRYPT'
    elif signers:
        command = 'SIGN'
        parameters.append('--{}'.format(mode))
    elif recipients:
        command = 'ENCRYPT'
    else:
        raise ValueError('must specify at least one signer or recipient')
    if always_trust:
        parameters.append('--always-trust')

    input_read,input_write = _os.pipe()
    output_read,output_write = _os.pipe()
    try:
        try:
            client,socket = get_client(pass_fds=(input_read, output_write))
        finally:
            # The child (if it started) holds its own copies of these
            # ends; ours must go whether or not the spawn succeeded.
            _os.close(input_read)
            _os.close(output_write)
        try:
            hello(client)
            for signer in signers or ():
                client.make_request(_common.Request('SIGNER', signer))
            for recipient in recipients or ():
                client.make_request(_common.Request('RECIPIENT', recipient))
            # pass_fds keeps the descriptor numbers unchanged in the
            # child, so the (locally closed) numbers are still the
            # right names to hand to the server.
            client.make_request(
                _common.Request('INPUT', 'FD={}'.format(input_read)))
            client.make_request(
                _common.Request('OUTPUT', 'FD={}'.format(output_write)))
            # NOTE(review): the whole input is written before the
            # command is issued; a payload larger than the pipe buffer
            # may block here -- confirm against gpgme-tool's behavior.
            _write(input_write, data)
            _os.close(input_write)
            input_write = -1
            client.make_request(
                _common.Request(command, ' '.join(parameters)))
            d = _read(output_read)
        finally:
            disconnect(client, socket)
    finally:
        # Runs even if get_client() or disconnect() raised.
        for fd in (input_write, output_read):
            if fd >= 0:
                _os.close(fd)
    return d
142
def decrypt_bytes(data):
    r"""Decrypt ``data``.

    ``data`` is written to a pipe registered with the server as
    ``INPUT``, a ``DECRYPT`` request is made, and the bytes read back
    from the ``OUTPUT`` pipe are returned.

    >>> b = '\n'.join([
    ...     '-----BEGIN PGP MESSAGE-----',
    ...     'Version: GnuPG v2.0.17 (GNU/Linux)',
    ...     '',
    ...     'hQEMA1Ea7aZDMrbjAQf/TAqLjksZSJxSqkBxYT5gtLQoXY6isvRZg2apjs7CW0y2',
    ...     'tFK/ptnVYAq2OtWQFhbiJXj8hmwJyyFfb3lghpeu4ihO52JgkkwOpmJb6dxjOi83',
    ...     'qDwaGOogEPH38BNLuwdrMCW0jmNROwvS796PtqSGUaJTuIiKUB8lETwPwIHrDc11',
    ...     'N3RWStE5uShNkXXQXplUoeCKf3N4XguXym+GQCqJQzlEMrkkDdr4l7mzvt3Nf8EA',
    ...     'SgSak086tUoo9x8IN5PJCuOJkcXcjQzFcpqOsA7dyZKO8NeQUZv2JvlZuorckNvN',
    ...     'xx3PwW0a8VeJgTQrh64ZK/d3F3gNHUTzXkq/UIn25tJFAcmSUwxtsBal7p8zAeCV',
    ...     '8zefsHRQ5Y03IBeYBcVJBhDS9XfvwLQTJiGGstPCxzKTwSUT1MzV5t5twG/STDCc',
    ...     'uxW3wSdo',
    ...     '=bZI+',
    ...     '-----END PGP MESSAGE-----',
    ...     ''
    ...     ]).encode('us-ascii')
    >>> decrypt_bytes(b)
    b'Success!\n'
    """
    input_read,input_write = _os.pipe()
    output_read,output_write = _os.pipe()
    try:
        try:
            client,socket = get_client(pass_fds=(input_read, output_write))
        finally:
            # The child (if it started) holds its own copies of these
            # ends; ours must go whether or not the spawn succeeded.
            _os.close(input_read)
            _os.close(output_write)
        try:
            hello(client)
            # pass_fds keeps the descriptor numbers unchanged in the
            # child, so the (locally closed) numbers are still the
            # right names to hand to the server.
            client.make_request(
                _common.Request('INPUT', 'FD={}'.format(input_read)))
            client.make_request(
                _common.Request('OUTPUT', 'FD={}'.format(output_write)))
            # NOTE(review): the whole input is written before the
            # command is issued; a payload larger than the pipe buffer
            # may block here -- confirm against gpgme-tool's behavior.
            _write(input_write, data)
            _os.close(input_write)
            input_write = -1
            client.make_request(_common.Request('DECRYPT'))
            d = _read(output_read)
        finally:
            disconnect(client, socket)
    finally:
        # Runs even if get_client() or disconnect() raised.
        for fd in (input_write, output_read):
            if fd >= 0:
                _os.close(fd)
    return d
187
def verify_bytes(data, signature=None, always_trust=False):
    r"""Verify a signature on ``data``, possibly decrypting first.

    With a detached ``signature``, the signature is sent as ``INPUT``
    and ``data`` as ``MESSAGE``; the returned plaintext is ``data``
    itself.  Without one, ``data`` is sent as ``INPUT`` and the
    plaintext is read back from the ``OUTPUT`` pipe.

    Returns a ``(plain, verified, result)`` tuple, where ``result`` is
    the raw payload of the server's ``RESULT`` reply.  ``verified`` is
    true only if ``result`` contains at least one ``<status `` line,
    every such line contains ``Success``, and every ``<pka-trust`` line
    contains ``0x2``.

    ``always_trust`` is accepted but currently ignored.

    These tests assume you didn't trust the distributed test key.

    >>> b = '\n'.join([
    ...     '-----BEGIN PGP MESSAGE-----',
    ...     'Version: GnuPG v2.0.17 (GNU/Linux)',
    ...     '',
    ...     'hQEMA1Ea7aZDMrbjAQf/YM1SeFzNGz0DnUynaEyhfGCvcqmjtbN1PtZMpT7VaQLN',
    ...     'a+c0faskr79Atz0+2IBR7CDOlcETrRtH2EnrWukbRIDtmffNFGuhMRTNfnQ15OIN',
    ...     'qrmt2P5gXznsgnm2XjzTK7S/Cc3Aq+zjaDrDt7bIedEdz+EyNgaKuL/lB9cAB8xL',
    ...     'YYp/yn55Myjair2idgzsa7w/QXdE3RhpyRLqR2Jgz4P1I1xOgUYnylbpIZL9FOKN',
    ...     'NR3RQhkGdANBku8otfthb5ZUGsNMV45ct4V8PE+xChjFb9gcwpaf1hhoIF/sYHD5',
    ...     'Bkf+v/J8F40KGYY16b0DjQIUlnra9y7q9jj0h2bvc9LAtgHtVUso133LLcVYl7RP',
    ...     'Vjyz9Ps366BtIdPlAL4CoF5hEcMKS5J3h1vRlyAKN4uHENl5vKvoxn7ID3JhhWQc',
    ...     '6QrPGis64zi3OnYor34HPh/KNJvkgOQkekmtYuTxnkiONA4lhMDJgeaVZ9WZq+GV',
    ...     'MaCvCFGNYU2TV4V8wMlnUbF8d5bDQ83g8MxIVKdDcnBzzYLZha+qmz4Spry9iB53',
    ...     'Sg/sM5H8gWWSl7Oj1lxVg7o7IscpQfVt6zL6jD2VjL3L3Hu7WEXIrcGZtvrP4d+C',
    ...     'TGYWiGlh5B2UCFk2bVctfw8W/QfaVvJYD4Rfqta2V2p14KIJLFRSGa1g26W4ixrH',
    ...     'XKxgaA3AIfJ+6c5RoisRLuYCxvQi91wkE9hAXR+inXK4Hq4SmiHoeITZFhHP3hh3',
    ...     'rbpp8mopiMNxWqCbuqgILP6pShn4oPclu9aR8uJ1ziDxISTGYC71mvLUERUjFn2L',
    ...     'fu6C0+TCC9RmeyL+eNdM6cjs1G7YR6yX',
    ...     '=phHd',
    ...     '-----END PGP MESSAGE-----',
    ...     '',
    ...     ]).encode('us-ascii')
    >>> output,verified,result = verify_bytes(b)
    >>> output
    b'Success!\n'
    >>> verified
    False
    >>> print(str(result, 'utf-8').replace('\x00', ''))
    ... # doctest: +REPORT_UDIFF
    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <gpgme>
      <verify-result>
        <signatures>
          <signature>
            <summary value="0x0" />
            <fpr>B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3</fpr>
            <status value="0x0">Success &lt;Unspecified source&gt;</status>
            <timestamp unix="1332357237i" />
            <exp-timestamp unix="0i" />
            <wrong-key-usage value="0x0" />
            <pka-trust value="0x0" />
            <chain-model value="0x0" />
            <validity value="0x0" />
            <validity-reason value="0x0">Success &lt;Unspecified source&gt;</validity-reason>
            <pubkey-algo value="0x1">RSA</pubkey-algo>
            <hash-algo value="0x8">SHA256</hash-algo>
          </signature>
        </signatures>
      </verify-result>
    </gpgme>
    <BLANKLINE>
    >>> b = b'Success!\n'
    >>> signature = '\n'.join([
    ...     '-----BEGIN PGP SIGNATURE-----',
    ...     'Version: GnuPG v2.0.17 (GNU/Linux)',
    ...     '',
    ...     'iQEcBAEBAgAGBQJPaiw/AAoJEFEa7aZDMrbj93gH/1fQPXLjUTpONJUTmvGoMLNA',
    ...     'W9ZhjpUL5i6rRqYGUvQ4kTEDuPMxkMrCyFCDHEhSDHufMek6Nso5/HeJn3aqxlgs',
    ...     'hmNlvAq4FI6JQyFL7eCp/XG9cPx1p42dTI7JAih8FuK21sS4m/H5XP3R/6KXC99D',
    ...     '39rrXCvvR+yNgKe2dxuJwmKuLteVlcWxiIQwVrYK70GtJHC5BO79G8yGccWoEy9C',
    ...     '9JkJiyNptqZyFjGBNmMmrCSFZ7ZFA02RB+laRmwuIiozw4TJYEksxPrgZMbbcFzx',
    ...     'zs3JHyV23+Fz1ftalvwskHE7tJkX9Ub8iBMNZ/KxJXXdPdpuMdEYVjoUehkQBQE=',
    ...     '=rRBP',
    ...     '-----END PGP SIGNATURE-----',
    ...     '',
    ...     ]).encode('us-ascii')
    >>> output,verified,result = verify_bytes(b, signature=signature)
    >>> output
    b'Success!\n'
    >>> verified
    False
    >>> print(str(result, 'utf-8').replace('\x00', ''))
    ... # doctest: +REPORT_UDIFF
    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <gpgme>
      <verify-result>
        <signatures>
          <signature>
            <summary value="0x0" />
            <fpr>B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3</fpr>
            <status value="0x0">Success &lt;Unspecified source&gt;</status>
            <timestamp unix="1332358207i" />
            <exp-timestamp unix="0i" />
            <wrong-key-usage value="0x0" />
            <pka-trust value="0x0" />
            <chain-model value="0x0" />
            <validity value="0x0" />
            <validity-reason value="0x0">Success &lt;Unspecified source&gt;</validity-reason>
            <pubkey-algo value="0x1">RSA</pubkey-algo>
            <hash-algo value="0x2">SHA1</hash-algo>
          </signature>
        </signatures>
      </verify-result>
    </gpgme>
    <BLANKLINE>
    """
    input_read,input_write = _os.pipe()
    # Descriptors handed to the child; our copies are closed as soon as
    # the spawn attempt is over.
    pass_fds = [input_read]
    # -1 marks "not open" for the parent-side ends cleaned up at the end.
    message_write = output_read = -1
    try:
        try:
            if signature:
                message_read,message_write = _os.pipe()
                pass_fds.append(message_read)
            else:
                output_read,output_write = _os.pipe()
                pass_fds.append(output_write)
            client,socket = get_client(pass_fds=pass_fds)
        finally:
            for fd in pass_fds:
                _os.close(fd)
        try:
            hello(client)
            # pass_fds keeps the descriptor numbers unchanged in the
            # child, so the (locally closed) numbers are still the
            # right names to hand to the server.
            client.make_request(
                _common.Request('INPUT', 'FD={}'.format(input_read)))
            if signature:
                client.make_request(
                    _common.Request('MESSAGE', 'FD={}'.format(message_read)))
            else:
                client.make_request(
                    _common.Request('OUTPUT', 'FD={}'.format(output_write)))
            # NOTE(review): everything is written before VERIFY is
            # issued; a payload larger than the pipe buffer may block
            # here -- confirm against gpgme-tool's behavior.
            if signature:
                _write(input_write, signature)
                _os.close(input_write)
                input_write = -1
                _write(message_write, data)
                _os.close(message_write)
                message_write = -1
            else:
                _write(input_write, data)
                _os.close(input_write)
                input_write = -1
            client.make_request(_common.Request('VERIFY'))
            if signature:
                plain = data
            else:
                plain = _read(output_read)
            rs,result = client.make_request(_common.Request('RESULT'))
            # A result that reports no signature at all must not count
            # as verified, so start from "was any status reported?".
            verified = b'<status ' in result
            for line in result.splitlines():
                if b'<status ' in line and b'Success' not in line:
                    verified = False
                elif b'<pka-trust' in line and b'0x2' not in line:
                    verified = False
        finally:
            disconnect(client, socket)
    finally:
        # Runs even if get_client() or disconnect() raised.
        for fd in (input_write, message_write, output_read):
            if fd >= 0:
                _os.close(fd)
    return (plain, verified, result)