crypt: Use a sublogger of pgp_mime.LOG not pyassuan.LOG for clients.
[pgp-mime.git] / pgp_mime / crypt.py
1 # Copyright (C) 2012 W. Trevor King <wking@tremily.us>
2 #
3 # This file is part of pgp-mime.
4 #
5 # pgp-mime is free software: you can redistribute it and/or modify it under the
6 # terms of the GNU General Public License as published by the Free Software
7 # Foundation, either version 3 of the License, or (at your option) any later
8 # version.
9 #
10 # pgp-mime is distributed in the hope that it will be useful, but WITHOUT ANY
11 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
12 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License along with
15 # pgp-mime.  If not, see <http://www.gnu.org/licenses/>.
16
17 import codecs as _codecs
18 import logging as _logging
19 import os as _os
20 import os.path as _os_path
21 from _socket import socket as _Socket
22 import socket as _socket
23 import subprocess as _subprocess
24
25 from pyassuan import client as _client
26 from pyassuan import common as _common
27
28 from . import LOG as _LOG
29
30
def connect(client, filename, **kwargs):
    """Hook ``client`` up to the server program at ``filename``.

    ``filename`` is ``~``-expanded and started as a child process via
    ``subprocess.Popen`` (extra ``kwargs`` are forwarded to it).  The
    child's stdout becomes ``client.input`` and its stdin becomes
    ``client.output``, after which ``client.connect()`` is called.

    Returns the transport handle (currently the ``Popen`` instance)
    that should later be handed to ``disconnect``.
    """
    path = _os_path.expanduser(filename)
    # Hard-wired off: the Unix-socket transport is kept only as an
    # alternative code path and is never taken.
    use_unix_socket = False
    if not use_unix_socket:
        handle = _subprocess.Popen(
            path,
            stdin=_subprocess.PIPE,
            stdout=_subprocess.PIPE,
            close_fds=True,
            **kwargs)
        client.input = handle.stdout
        client.output = handle.stdin
    else:
        handle = _socket.socket(_socket.AF_UNIX, _socket.SOCK_STREAM)
        handle.connect(path)
        client.input = handle.makefile('rb')
        client.output = handle.makefile('wb')
    client.connect()
    return handle
47
def get_client(**kwargs):
    """Build an Assuan client and connect it to ``gpgme-tool``.

    The client is named ``pgp-mime`` and logs to the ``pyassuan`` child
    of this package's logger.  ``kwargs`` are passed through to
    ``connect``.

    Returns a ``(client, socket)`` tuple, where ``socket`` is whatever
    handle ``connect`` returned.
    """
    logger_name = '{}.{}'.format(_LOG.name, 'pyassuan')
    assuan_client = _client.AssuanClient(
        name='pgp-mime',
        logger=_logging.getLogger(logger_name),
        use_sublogger=False,
        close_on_disconnect=True)
    # Alternative (disabled) endpoint: '~/.assuan/S.gpgme-tool'
    handle = connect(
        assuan_client, '~/src/gpgme/build/src/gpgme-tool', **kwargs)
    return (assuan_client, handle)
56
def disconnect(client, socket):
    """Send ``BYE`` to the server and tear down the transport.

    ``socket`` is the handle returned by ``connect``: a real socket is
    shut down and closed; anything else is treated as a child process
    and waited for.

    Raises ``AssertionError`` (carrying the exit status) if the child
    process exits with a non-zero status.
    """
    client.make_request(_common.Request('BYE'))
    client.disconnect()
    if isinstance(socket, _Socket):
        socket.shutdown(_socket.SHUT_RDWR)
        socket.close()
        return
    status = socket.wait()
    if status != 0:
        # Raise explicitly instead of using ``assert`` so the check is
        # not stripped when Python runs with -O; the exception type and
        # payload stay the same for existing callers.
        raise AssertionError(status)
66
def hello(client):
    """Consume the server greeting, then send ``ARMOR true``."""
    # The server speaks first: drain its initial 'OK' before sending
    # any request of our own.  The greeting itself is not inspected.
    _greeting, _payload = client.get_responses()
    armor_request = _common.Request('ARMOR', 'true')
    client.make_request(armor_request)
70
def _read(fd, buffersize=512):
    """Read from file descriptor ``fd`` until EOF and return the bytes.

    Data is pulled ``buffersize`` bytes at a time.  Reading is best
    effort: if a read raises, the error is logged as a warning and
    whatever was collected so far is returned.
    """
    chunks = []
    while True:
        try:
            chunk = _os.read(fd, buffersize)
        except Exception as e:
            # ``Logger.warn`` is deprecated; ``warning`` is the
            # supported spelling.  The message text is unchanged.
            _LOG.warning('error while reading: {}'.format(e))
            break
        if not chunk:
            # An empty read means end-of-file.
            break
        chunks.append(chunk)
    return b''.join(chunks)
83
def _write(fd, data):
    """Write all of the bytes-like ``data`` to file descriptor ``fd``.

    ``os.write`` may accept fewer bytes than it was offered, so the
    unsent remainder is retried until everything has been written.
    """
    # Slicing a memoryview hands os.write the unsent tail without
    # copying it again after every partial write.
    view = memoryview(data)
    sent = 0
    while sent < len(view):
        sent += _os.write(fd, view[sent:])
88
89
def sign_and_encrypt_bytes(data, signers=None, recipients=None,
                           always_trust=False, mode='detach',
                           allow_default_signer=False):
    r"""Sign ``data`` with ``signers`` and encrypt to ``recipients``.

    ``data`` is the payload as bytes.  Each entry of ``signers`` and
    ``recipients`` is sent to the server in its own ``SIGNER`` or
    ``RECIPIENT`` request.  The command issued depends on what was
    given:

    * signers (or ``allow_default_signer``) and recipients:
      ``SIGN_ENCRYPT``
    * signers (or ``allow_default_signer``) only: ``SIGN --<mode>``
    * recipients only: ``ENCRYPT``

    ``always_trust`` appends ``--always-trust`` to the command.

    Returns the bytes the server wrote to its output descriptor.

    Raises ``ValueError`` when there is neither a signer (nor
    ``allow_default_signer``) nor a recipient; this is checked before
    any server is started.

    NOTE(review): all of ``data`` is written to the input pipe before
    the command is issued, and the output is only read after the
    command returns.  Payloads larger than the OS pipe buffer would
    presumably block -- confirm, and consider moving the pipe I/O to
    threads.

    Just sign:

    >>> print(sign_and_encrypt_bytes(
    ...     bytes(b'Hello'), signers=['pgp-mime@invalid.com']))
    ... # doctest: +ELLIPSIS
    b'-----BEGIN PGP SIGNATURE-----\n...-----END PGP SIGNATURE-----\n'

    Just encrypt:

    >>> sign_and_encrypt_bytes(
    ...     bytes(b'Hello'), recipients=['pgp-mime@invalid.com'],
    ...     always_trust=True)
    ... # doctest: +ELLIPSIS
    b'-----BEGIN PGP MESSAGE-----\n...-----END PGP MESSAGE-----\n'

    Sign and encrypt:

    >>> sign_and_encrypt_bytes(
    ...     bytes(b'Hello'), signers=['pgp-mime@invalid.com'],
    ...     recipients=['pgp-mime@invalid.com'], always_trust=True)
    ... # doctest: +ELLIPSIS
    b'-----BEGIN PGP MESSAGE-----\n...-----END PGP MESSAGE-----\n'
    """
    # Pick the command up front so that bad arguments fail before any
    # pipe is created or server process is spawned.
    parameters = []
    if signers or allow_default_signer:
        if recipients:
            command = 'SIGN_ENCRYPT'
        else:
            command = 'SIGN'
            parameters.append('--{}'.format(mode))
    elif recipients:
        command = 'ENCRYPT'
    else:
        raise ValueError('must specify at least one signer or recipient')
    if always_trust:
        parameters.append('--always-trust')

    input_read, input_write = _os.pipe()
    output_read, output_write = _os.pipe()
    try:
        try:
            client, socket = get_client(pass_fds=(input_read, output_write))
        finally:
            # These ends are for the child only.  Close our copies
            # whether or not the child was started; the descriptor
            # numbers are still quoted to the server below.
            _os.close(input_read)
            _os.close(output_write)
        try:
            hello(client)
            if signers:
                for signer in signers:
                    client.make_request(_common.Request('SIGNER', signer))
            if recipients:
                for recipient in recipients:
                    client.make_request(
                        _common.Request('RECIPIENT', recipient))
            client.make_request(
                _common.Request('INPUT', 'FD={}'.format(input_read)))
            client.make_request(
                _common.Request('OUTPUT', 'FD={}'.format(output_write)))
            _write(input_write, data)
            # Closing our write end marks end-of-input for the reader.
            _os.close(input_write)
            input_write = -1
            client.make_request(
                _common.Request(command, ' '.join(parameters)))
            d = _read(output_read)
        finally:
            disconnect(client, socket)
    finally:
        # Runs even if get_client() or disconnect() raised, so the
        # parent-side pipe ends are never leaked.
        for fd in (input_write, output_read):
            if fd >= 0:
                _os.close(fd)
    return d
160
def decrypt_bytes(data):
    r"""Decrypt ``data``.

    ``data`` is the encrypted message as bytes.  It is fed to the
    server through a pipe named in an ``INPUT`` request, a ``DECRYPT``
    command is issued, and the bytes the server wrote to the pipe named
    in the ``OUTPUT`` request are returned.

    NOTE(review): all of ``data`` is written to the input pipe before
    ``DECRYPT`` is issued, and the output is only read after the
    command returns.  Payloads larger than the OS pipe buffer would
    presumably block -- confirm.

    >>> b = '\n'.join([
    ...     '-----BEGIN PGP MESSAGE-----',
    ...     'Version: GnuPG v2.0.17 (GNU/Linux)',
    ...     '',
    ...     'hQEMA1Ea7aZDMrbjAQf/TAqLjksZSJxSqkBxYT5gtLQoXY6isvRZg2apjs7CW0y2',
    ...     'tFK/ptnVYAq2OtWQFhbiJXj8hmwJyyFfb3lghpeu4ihO52JgkkwOpmJb6dxjOi83',
    ...     'qDwaGOogEPH38BNLuwdrMCW0jmNROwvS796PtqSGUaJTuIiKUB8lETwPwIHrDc11',
    ...     'N3RWStE5uShNkXXQXplUoeCKf3N4XguXym+GQCqJQzlEMrkkDdr4l7mzvt3Nf8EA',
    ...     'SgSak086tUoo9x8IN5PJCuOJkcXcjQzFcpqOsA7dyZKO8NeQUZv2JvlZuorckNvN',
    ...     'xx3PwW0a8VeJgTQrh64ZK/d3F3gNHUTzXkq/UIn25tJFAcmSUwxtsBal7p8zAeCV',
    ...     '8zefsHRQ5Y03IBeYBcVJBhDS9XfvwLQTJiGGstPCxzKTwSUT1MzV5t5twG/STDCc',
    ...     'uxW3wSdo',
    ...     '=bZI+',
    ...     '-----END PGP MESSAGE-----',
    ...     ''
    ...     ]).encode('us-ascii')
    >>> decrypt_bytes(b)
    b'Success!\n'
    """
    input_read, input_write = _os.pipe()
    output_read, output_write = _os.pipe()
    try:
        try:
            client, socket = get_client(pass_fds=(input_read, output_write))
        finally:
            # These ends are for the child only.  Close our copies
            # whether or not the child was started; the descriptor
            # numbers are still quoted to the server below.
            _os.close(input_read)
            _os.close(output_write)
        try:
            hello(client)
            client.make_request(
                _common.Request('INPUT', 'FD={}'.format(input_read)))
            client.make_request(
                _common.Request('OUTPUT', 'FD={}'.format(output_write)))
            _write(input_write, data)
            # Closing our write end marks end-of-input for the reader.
            _os.close(input_write)
            input_write = -1
            client.make_request(_common.Request('DECRYPT'))
            d = _read(output_read)
        finally:
            disconnect(client, socket)
    finally:
        # Runs even if get_client() or disconnect() raised, so the
        # parent-side pipe ends are never leaked.
        for fd in (input_write, output_read):
            if fd >= 0:
                _os.close(fd)
    return d
205
def verify_bytes(data, signature=None, always_trust=False):
    r"""Verify a signature on ``data``, possibly decrypting first.

    With a detached ``signature``, the signature is sent on the
    ``INPUT`` descriptor and ``data`` on the ``MESSAGE`` descriptor;
    the returned plaintext is ``data`` itself.  Without one, ``data``
    is sent on the ``INPUT`` descriptor and the plaintext is read back
    from the ``OUTPUT`` descriptor.

    Returns a ``(plain, verified, result)`` tuple.  ``result`` is the
    payload of the server's ``RESULT`` response.  ``verified`` starts
    out ``True`` and is set to ``False`` if any ``<status `` line of
    ``result`` lacks ``Success`` or any ``<pka-trust`` line lacks
    ``0x2``.

    ``always_trust`` is accepted but is not currently used by this
    function.

    These tests assume you didn't trust the distributed test key.

    >>> b = '\n'.join([
    ...     '-----BEGIN PGP MESSAGE-----',
    ...     'Version: GnuPG v2.0.17 (GNU/Linux)',
    ...     '',
    ...     'hQEMA1Ea7aZDMrbjAQf/YM1SeFzNGz0DnUynaEyhfGCvcqmjtbN1PtZMpT7VaQLN',
    ...     'a+c0faskr79Atz0+2IBR7CDOlcETrRtH2EnrWukbRIDtmffNFGuhMRTNfnQ15OIN',
    ...     'qrmt2P5gXznsgnm2XjzTK7S/Cc3Aq+zjaDrDt7bIedEdz+EyNgaKuL/lB9cAB8xL',
    ...     'YYp/yn55Myjair2idgzsa7w/QXdE3RhpyRLqR2Jgz4P1I1xOgUYnylbpIZL9FOKN',
    ...     'NR3RQhkGdANBku8otfthb5ZUGsNMV45ct4V8PE+xChjFb9gcwpaf1hhoIF/sYHD5',
    ...     'Bkf+v/J8F40KGYY16b0DjQIUlnra9y7q9jj0h2bvc9LAtgHtVUso133LLcVYl7RP',
    ...     'Vjyz9Ps366BtIdPlAL4CoF5hEcMKS5J3h1vRlyAKN4uHENl5vKvoxn7ID3JhhWQc',
    ...     '6QrPGis64zi3OnYor34HPh/KNJvkgOQkekmtYuTxnkiONA4lhMDJgeaVZ9WZq+GV',
    ...     'MaCvCFGNYU2TV4V8wMlnUbF8d5bDQ83g8MxIVKdDcnBzzYLZha+qmz4Spry9iB53',
    ...     'Sg/sM5H8gWWSl7Oj1lxVg7o7IscpQfVt6zL6jD2VjL3L3Hu7WEXIrcGZtvrP4d+C',
    ...     'TGYWiGlh5B2UCFk2bVctfw8W/QfaVvJYD4Rfqta2V2p14KIJLFRSGa1g26W4ixrH',
    ...     'XKxgaA3AIfJ+6c5RoisRLuYCxvQi91wkE9hAXR+inXK4Hq4SmiHoeITZFhHP3hh3',
    ...     'rbpp8mopiMNxWqCbuqgILP6pShn4oPclu9aR8uJ1ziDxISTGYC71mvLUERUjFn2L',
    ...     'fu6C0+TCC9RmeyL+eNdM6cjs1G7YR6yX',
    ...     '=phHd',
    ...     '-----END PGP MESSAGE-----',
    ...     '',
    ...     ]).encode('us-ascii')
    >>> output,verified,result = verify_bytes(b)
    >>> output
    b'Success!\n'
    >>> verified
    False
    >>> print(str(result, 'utf-8').replace('\x00', ''))
    ... # doctest: +REPORT_UDIFF
    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <gpgme>
      <verify-result>
        <signatures>
          <signature>
            <summary value="0x0" />
            <fpr>B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3</fpr>
            <status value="0x0">Success &lt;Unspecified source&gt;</status>
            <timestamp unix="1332357237i" />
            <exp-timestamp unix="0i" />
            <wrong-key-usage value="0x0" />
            <pka-trust value="0x0" />
            <chain-model value="0x0" />
            <validity value="0x0" />
            <validity-reason value="0x0">Success &lt;Unspecified source&gt;</validity-reason>
            <pubkey-algo value="0x1">RSA</pubkey-algo>
            <hash-algo value="0x8">SHA256</hash-algo>
          </signature>
        </signatures>
      </verify-result>
    </gpgme>
    <BLANKLINE>
    >>> b = b'Success!\n'
    >>> signature = '\n'.join([
    ...     '-----BEGIN PGP SIGNATURE-----',
    ...     'Version: GnuPG v2.0.17 (GNU/Linux)',
    ...     '',
    ...     'iQEcBAEBAgAGBQJPaiw/AAoJEFEa7aZDMrbj93gH/1fQPXLjUTpONJUTmvGoMLNA',
    ...     'W9ZhjpUL5i6rRqYGUvQ4kTEDuPMxkMrCyFCDHEhSDHufMek6Nso5/HeJn3aqxlgs',
    ...     'hmNlvAq4FI6JQyFL7eCp/XG9cPx1p42dTI7JAih8FuK21sS4m/H5XP3R/6KXC99D',
    ...     '39rrXCvvR+yNgKe2dxuJwmKuLteVlcWxiIQwVrYK70GtJHC5BO79G8yGccWoEy9C',
    ...     '9JkJiyNptqZyFjGBNmMmrCSFZ7ZFA02RB+laRmwuIiozw4TJYEksxPrgZMbbcFzx',
    ...     'zs3JHyV23+Fz1ftalvwskHE7tJkX9Ub8iBMNZ/KxJXXdPdpuMdEYVjoUehkQBQE=',
    ...     '=rRBP',
    ...     '-----END PGP SIGNATURE-----',
    ...     '',
    ...     ]).encode('us-ascii')
    >>> output,verified,result = verify_bytes(b, signature=signature)
    >>> output
    b'Success!\n'
    >>> verified
    False
    >>> print(str(result, 'utf-8').replace('\x00', ''))
    ... # doctest: +REPORT_UDIFF
    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <gpgme>
      <verify-result>
        <signatures>
          <signature>
            <summary value="0x0" />
            <fpr>B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3</fpr>
            <status value="0x0">Success &lt;Unspecified source&gt;</status>
            <timestamp unix="1332358207i" />
            <exp-timestamp unix="0i" />
            <wrong-key-usage value="0x0" />
            <pka-trust value="0x0" />
            <chain-model value="0x0" />
            <validity value="0x0" />
            <validity-reason value="0x0">Success &lt;Unspecified source&gt;</validity-reason>
            <pubkey-algo value="0x1">RSA</pubkey-algo>
            <hash-algo value="0x2">SHA1</hash-algo>
          </signature>
        </signatures>
      </verify-result>
    </gpgme>
    <BLANKLINE>
    """
    input_read, input_write = _os.pipe()
    pass_fds = [input_read]
    if signature:
        # Detached signature: the signed message travels on its own
        # pipe and no plaintext comes back from the server.
        message_read, message_write = _os.pipe()
        output_read = -1
        pass_fds.append(message_read)
    else:
        message_write = -1
        output_read, output_write = _os.pipe()
        pass_fds.append(output_write)
    verified = result = None
    try:
        try:
            client, socket = get_client(pass_fds=pass_fds)
        finally:
            # These ends are for the child only.  Close our copies
            # whether or not the child was started; the descriptor
            # numbers are still quoted to the server below.
            for fd in pass_fds:
                _os.close(fd)
        try:
            hello(client)
            client.make_request(
                _common.Request('INPUT', 'FD={}'.format(input_read)))
            if signature:
                client.make_request(
                    _common.Request('MESSAGE', 'FD={}'.format(message_read)))
            else:
                client.make_request(
                    _common.Request('OUTPUT', 'FD={}'.format(output_write)))
            if signature:
                _write(input_write, signature)
                _os.close(input_write)
                input_write = -1
                _write(message_write, data)
                _os.close(message_write)
                message_write = -1
            else:
                _write(input_write, data)
                _os.close(input_write)
                input_write = -1
            client.make_request(_common.Request('VERIFY'))
            if signature:
                plain = data
            else:
                plain = _read(output_read)
            rs, result = client.make_request(_common.Request('RESULT'))
            # Assume success until a line of the result says otherwise.
            verified = True
            for line in result.splitlines():
                if b'<status ' in line and b'Success' not in line:
                    verified = False
                elif b'<pka-trust' in line and b'0x2' not in line:
                    verified = False
        finally:
            disconnect(client, socket)
    finally:
        # Runs even if get_client() or disconnect() raised, so the
        # parent-side pipe ends are never leaked.
        for fd in (input_write, message_write, output_read):
            if fd >= 0:
                _os.close(fd)
    return (plain, verified, result)