Adjust to use gpgme-tool (from the gpgme package).
[pgp-mime.git] / pgp_mime / crypt.py
1 # Copyright
2
3 import codecs as _codecs
4 import logging as _logging
5 import os as _os
6 import os.path as _os_path
7 import socket as _socket
8 import subprocess as _subprocess
9
10 from pyassuan import client as _client
11 from pyassuan import common as _common
12
13
14 #class GPGMEClient(_client.AssuanClient):
15 #    pass
16 #CLIENT = _client.AssuanClient(name='pgp-mime', close_on_disconnect=True)
17 #CLIENT.filename = ...
18
19 def connect(client, filename):
20     filename = _os_path.expanduser(filename)
21     if False:
22         socket = _socket.socket(_socket.AF_UNIX, _socket.SOCK_STREAM)
23         socket.connect(filename)
24         client.input = socket.makefile('rb')
25         client.output = socket.makefile('wb')
26     else:
27         p = _subprocess.Popen(
28             filename, stdin=_subprocess.PIPE, stdout=_subprocess.PIPE,
29             close_fds=True)
30         client.input = p.stdout
31         client.output = p.stdin
32         socket = p
33     client.connect()
34     return socket
35
36 def get_client():
37     client = _client.AssuanClient(name='pgp-mime', close_on_disconnect=True)
38     client.logger.setLevel(_logging.DEBUG)
39     socket = connect(client, '~/src/gpgme/build/src/gpgme-tool')
40     return (client, socket)
41
42 def disconnect(client, socket):
43     client.make_request(_common.Request('BYE'))
44     client.disconnect()
45     if False:
46         socket.shutdown(_socket.SHUT_RDWR)
47         socket.close()
48     else:
49         status = socket.wait()
50         assert status == 0, status
51
52 def hello(client):
53     responses,data = client.get_responses()  # get initial 'OK' from server
54     client.make_request(_common.Request('ARMOR', 'true'))
55
56 def sign_and_encrypt_bytes(data, signers=None, recipients=None,
57                            always_trust=False, mode='detach'):
58     r"""Sign ``data`` with ``signers`` and encrypt to ``recipients``.
59
60     Just sign:
61
62     >>> print(sign_and_encrypt_bytes(
63     ...     bytes(b'Hello'), signers=['pgp-mime@invalid.com']))
64     ... # doctest: +ELLIPSIS
65     b'-----BEGIN PGP SIGNATURE-----\n...-----END PGP SIGNATURE-----\n'
66
67     Just encrypt:
68
69     >>> sign_and_encrypt_bytes(
70     ...     bytes(b'Hello'), recipients=['pgp-mime@invalid.com'],
71     ...     always_trust=True)
72     ... # doctest: +ELLIPSIS
73     b'-----BEGIN PGP MESSAGE-----\n...-----END PGP MESSAGE-----\n'
74
75     Sign and encrypt:
76
77     >>> sign_and_encrypt_bytes(
78     ...     bytes(b'Hello'), signers=['pgp-mime@invalid.com'],
79     ...     recipients=['pgp-mime@invalid.com'], always_trust=True)
80     ... # doctest: +ELLIPSIS
81     b'-----BEGIN PGP MESSAGE-----\n...-----END PGP MESSAGE-----\n'
82     """
83     client,socket = get_client()
84     try:
85         hello(client)
86         if signers:
87             for signer in signers:
88                 client.make_request(_common.Request('SIGNER', signer))
89         if recipients:
90             for recipient in recipients:
91                 client.make_request(_common.Request('RECIPIENT', recipient))
92         with open('/tmp/input', 'wb') as f:
93             f.write(data)
94         client.make_request(_common.Request('INPUT', 'FILE=/tmp/input'))
95         client.make_request(_common.Request('OUTPUT', 'FILE=/tmp/output'))
96         parameters = []
97         if signers and recipients:
98             command = 'SIGN_ENCRYPT'
99         elif signers:
100             command = 'SIGN'
101             parameters.append('--{}'.format(mode))
102         elif recipients:
103             command = 'ENCRYPT'
104         else:
105             raise ValueError('must specify at least one signer or recipient')
106         if always_trust:
107             parameters.append('--always-trust')
108         client.make_request(
109             _common.Request(command, ' '.join(parameters)))
110         with open('/tmp/output', 'rb') as f:
111             d = f.read()
112     finally:
113         disconnect(client, socket)
114         try:
115             _os.remove('/tmp/input')
116             _os.remove('/tmp/output')
117         except OSError:
118             pass
119     return d
120
121 def decrypt_bytes(data):
122     r"""Decrypt ``data``.
123
124     >>> b = '\n'.join([
125     ...     '-----BEGIN PGP MESSAGE-----',
126     ...     'Version: GnuPG v2.0.17 (GNU/Linux)',
127     ...     '',
128     ...     'hQEMA1Ea7aZDMrbjAQf/TAqLjksZSJxSqkBxYT5gtLQoXY6isvRZg2apjs7CW0y2',
129     ...     'tFK/ptnVYAq2OtWQFhbiJXj8hmwJyyFfb3lghpeu4ihO52JgkkwOpmJb6dxjOi83',
130     ...     'qDwaGOogEPH38BNLuwdrMCW0jmNROwvS796PtqSGUaJTuIiKUB8lETwPwIHrDc11',
131     ...     'N3RWStE5uShNkXXQXplUoeCKf3N4XguXym+GQCqJQzlEMrkkDdr4l7mzvt3Nf8EA',
132     ...     'SgSak086tUoo9x8IN5PJCuOJkcXcjQzFcpqOsA7dyZKO8NeQUZv2JvlZuorckNvN',
133     ...     'xx3PwW0a8VeJgTQrh64ZK/d3F3gNHUTzXkq/UIn25tJFAcmSUwxtsBal7p8zAeCV',
134     ...     '8zefsHRQ5Y03IBeYBcVJBhDS9XfvwLQTJiGGstPCxzKTwSUT1MzV5t5twG/STDCc',
135     ...     'uxW3wSdo',
136     ...     '=bZI+',
137     ...     '-----END PGP MESSAGE-----',
138     ...     ''
139     ...     ]).encode('us-ascii')
140     >>> decrypt_bytes(b)
141     b'Success!\n'
142     """
143     client,socket = get_client()
144     try:
145         hello(client)
146         with open('/tmp/input', 'wb') as f:
147             f.write(data)
148         client.make_request(_common.Request('INPUT', 'FILE=/tmp/input'))
149         client.make_request(_common.Request('OUTPUT', 'FILE=/tmp/output'))
150         client.make_request(_common.Request('DECRYPT'))
151         with open('/tmp/output', 'rb') as f:
152             d = f.read()
153     finally:
154         disconnect(client, socket)
155         try:
156             _os.remove('/tmp/input')
157             _os.remove('/tmp/output')
158         except OSError:
159             pass
160     return d
161
162 def verify_bytes(data, signature=None, always_trust=False):
163     r"""Verify a signature on ``data``, possibly decrypting first.
164
165     These tests assume you didn't trust the distributed test key.
166
167     >>> b = '\n'.join([
168     ...     '-----BEGIN PGP MESSAGE-----',
169     ...     'Version: GnuPG v2.0.17 (GNU/Linux)',
170     ...     '',
171     ...     'hQEMA1Ea7aZDMrbjAQf/YM1SeFzNGz0DnUynaEyhfGCvcqmjtbN1PtZMpT7VaQLN',
172     ...     'a+c0faskr79Atz0+2IBR7CDOlcETrRtH2EnrWukbRIDtmffNFGuhMRTNfnQ15OIN',
173     ...     'qrmt2P5gXznsgnm2XjzTK7S/Cc3Aq+zjaDrDt7bIedEdz+EyNgaKuL/lB9cAB8xL',
174     ...     'YYp/yn55Myjair2idgzsa7w/QXdE3RhpyRLqR2Jgz4P1I1xOgUYnylbpIZL9FOKN',
175     ...     'NR3RQhkGdANBku8otfthb5ZUGsNMV45ct4V8PE+xChjFb9gcwpaf1hhoIF/sYHD5',
176     ...     'Bkf+v/J8F40KGYY16b0DjQIUlnra9y7q9jj0h2bvc9LAtgHtVUso133LLcVYl7RP',
177     ...     'Vjyz9Ps366BtIdPlAL4CoF5hEcMKS5J3h1vRlyAKN4uHENl5vKvoxn7ID3JhhWQc',
178     ...     '6QrPGis64zi3OnYor34HPh/KNJvkgOQkekmtYuTxnkiONA4lhMDJgeaVZ9WZq+GV',
179     ...     'MaCvCFGNYU2TV4V8wMlnUbF8d5bDQ83g8MxIVKdDcnBzzYLZha+qmz4Spry9iB53',
180     ...     'Sg/sM5H8gWWSl7Oj1lxVg7o7IscpQfVt6zL6jD2VjL3L3Hu7WEXIrcGZtvrP4d+C',
181     ...     'TGYWiGlh5B2UCFk2bVctfw8W/QfaVvJYD4Rfqta2V2p14KIJLFRSGa1g26W4ixrH',
182     ...     'XKxgaA3AIfJ+6c5RoisRLuYCxvQi91wkE9hAXR+inXK4Hq4SmiHoeITZFhHP3hh3',
183     ...     'rbpp8mopiMNxWqCbuqgILP6pShn4oPclu9aR8uJ1ziDxISTGYC71mvLUERUjFn2L',
184     ...     'fu6C0+TCC9RmeyL+eNdM6cjs1G7YR6yX',
185     ...     '=phHd',
186     ...     '-----END PGP MESSAGE-----',
187     ...     '',
188     ...     ]).encode('us-ascii')
189     >>> output,verified,result = verify_bytes(b)
190     >>> output
191     b'Success!\n'
192     >>> verified
193     False
194     >>> print(str(result, 'utf-8').replace('\x00', ''))
195     ... # doctest: +REPORT_UDIFF
196     <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
197     <gpgme>
198       <verify-result>
199         <signatures>
200           <signature>
201             <summary value="0x0" />
202             <fpr>B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3</fpr>
203             <status value="0x0">Success &lt;Unspecified source&gt;</status>
204             <timestamp unix="1332357237i" />
205             <exp-timestamp unix="0i" />
206             <wrong-key-usage value="0x0" />
207             <pka-trust value="0x0" />
208             <chain-model value="0x0" />
209             <validity value="0x0" />
210             <validity-reason value="0x0">Success &lt;Unspecified source&gt;</validity-reason>
211             <pubkey-algo value="0x1">RSA</pubkey-algo>
212             <hash-algo value="0x8">SHA256</hash-algo>
213           </signature>
214         </signatures>
215       </verify-result>
216     </gpgme>
217     <BLANKLINE>
218     >>> b = b'Success!\n'
219     >>> signature = '\n'.join([
220     ...     '-----BEGIN PGP SIGNATURE-----',
221     ...     'Version: GnuPG v2.0.17 (GNU/Linux)',
222     ...     '',
223     ...     'iQEcBAEBAgAGBQJPaiw/AAoJEFEa7aZDMrbj93gH/1fQPXLjUTpONJUTmvGoMLNA',
224     ...     'W9ZhjpUL5i6rRqYGUvQ4kTEDuPMxkMrCyFCDHEhSDHufMek6Nso5/HeJn3aqxlgs',
225     ...     'hmNlvAq4FI6JQyFL7eCp/XG9cPx1p42dTI7JAih8FuK21sS4m/H5XP3R/6KXC99D',
226     ...     '39rrXCvvR+yNgKe2dxuJwmKuLteVlcWxiIQwVrYK70GtJHC5BO79G8yGccWoEy9C',
227     ...     '9JkJiyNptqZyFjGBNmMmrCSFZ7ZFA02RB+laRmwuIiozw4TJYEksxPrgZMbbcFzx',
228     ...     'zs3JHyV23+Fz1ftalvwskHE7tJkX9Ub8iBMNZ/KxJXXdPdpuMdEYVjoUehkQBQE=',
229     ...     '=rRBP',
230     ...     '-----END PGP SIGNATURE-----',
231     ...     '',
232     ...     ]).encode('us-ascii')
233     >>> output,verified,result = verify_bytes(b, signature=signature)
234     >>> output
235     b'Success!\n'
236     >>> verified
237     False
238     >>> print(str(result, 'utf-8').replace('\x00', ''))
239     ... # doctest: +REPORT_UDIFF
240     <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
241     <gpgme>
242       <verify-result>
243         <signatures>
244           <signature>
245             <summary value="0x0" />
246             <fpr>B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3</fpr>
247             <status value="0x0">Success &lt;Unspecified source&gt;</status>
248             <timestamp unix="1332358207i" />
249             <exp-timestamp unix="0i" />
250             <wrong-key-usage value="0x0" />
251             <pka-trust value="0x0" />
252             <chain-model value="0x0" />
253             <validity value="0x0" />
254             <validity-reason value="0x0">Success &lt;Unspecified source&gt;</validity-reason>
255             <pubkey-algo value="0x1">RSA</pubkey-algo>
256             <hash-algo value="0x2">SHA1</hash-algo>
257           </signature>
258         </signatures>
259       </verify-result>
260     </gpgme>
261     <BLANKLINE>
262     """
263     client,socket = get_client()
264     verified = result = None
265     try:
266         hello(client)
267         if signature:
268             input_ = signature
269             message = data
270         else:
271             input_ = data
272             message = None
273         with open('/tmp/input', 'wb') as f:
274             f.write(input_)
275         client.make_request(_common.Request('INPUT', 'FILE=/tmp/input'))
276         if message:
277             with open('/tmp/message', 'wb') as f:
278                 f.write(message)
279             client.make_request(
280                 _common.Request('MESSAGE', 'FILE=/tmp/message'))
281         if not signature:
282             client.make_request(_common.Request('OUTPUT', 'FILE=/tmp/output'))
283         client.make_request(_common.Request('VERIFY'))
284         rs,result = client.make_request(_common.Request('RESULT'))
285         verified = True
286         for line in result.splitlines():
287             if b'<status ' in line and b'Success' not in line:
288                 verified = False
289             elif b'<pka-trust' in line and b'0x2' not in line:
290                 verified = False
291         if signature:
292             plain = data
293         else:
294             with open('/tmp/output', 'rb') as f:
295                 plain = f.read()
296     finally:
297         disconnect(client, socket)
298         try:
299             pass
300             _os.remove('/tmp/input')
301             _os.remove('/tmp/output')
302             _os.remove('/tmp/message')
303         except OSError:
304             pass
305     return (plain, verified, result)