1 # Copyright (C) 2012 W. Trevor King <wking@tremily.us>
3 # This file is part of pgp-mime.
5 # pgp-mime is free software: you can redistribute it and/or modify it under the
6 # terms of the GNU General Public License as published by the Free Software
7 # Foundation, either version 3 of the License, or (at your option) any later
10 # pgp-mime is distributed in the hope that it will be useful, but WITHOUT ANY
11 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
12 # A PARTICULAR PURPOSE. See the GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License along with
15 # pgp-mime. If not, see <http://www.gnu.org/licenses/>.
17 import codecs as _codecs
18 import logging as _logging
20 import os.path as _os_path
21 from _socket import socket as _Socket
22 import socket as _socket
23 import subprocess as _subprocess
25 from pyassuan import client as _client
26 from pyassuan import common as _common
28 from . import LOG as _LOG
31 def connect(client, filename, **kwargs):
32 filename = _os_path.expanduser(filename)
34 socket = _socket.socket(_socket.AF_UNIX, _socket.SOCK_STREAM)
35 socket.connect(filename)
36 client.input = socket.makefile('rb')
37 client.output = socket.makefile('wb')
39 p = _subprocess.Popen(
40 filename, stdin=_subprocess.PIPE, stdout=_subprocess.PIPE,
41 close_fds=True, **kwargs)
42 client.input = p.stdout
43 client.output = p.stdin
48 def get_client(**kwargs):
49 logger = _logging.getLogger('{}.{}'.format(_LOG.name, 'pyassuan'))
50 client = _client.AssuanClient(
51 name='pgp-mime', logger=logger, use_sublogger=False,
52 close_on_disconnect=True)
53 socket = connect(client, '~/src/gpgme/build/src/gpgme-tool', **kwargs)
54 #socket = connect(client, '~/.assuan/S.gpgme-tool', **kwargs)
55 return (client, socket)
57 def disconnect(client, socket):
58 client.make_request(_common.Request('BYE'))
60 if isinstance(socket, _Socket):
61 socket.shutdown(_socket.SHUT_RDWR)
64 status = socket.wait()
65 assert status == 0, status
68 responses,data = client.get_responses() # get initial 'OK' from server
69 client.make_request(_common.Request('ARMOR', 'true'))
71 def _read(fd, buffersize=512):
75 new = _os.read(fd, buffersize)
76 except Exception as e:
77 _LOG.warn('error while reading: {}'.format(e))
87 i += _os.write(fd, data[i:])
90 def sign_and_encrypt_bytes(data, signers=None, recipients=None,
91 always_trust=False, mode='detach',
92 allow_default_signer=False):
93 r"""Sign ``data`` with ``signers`` and encrypt to ``recipients``.
97 >>> print(sign_and_encrypt_bytes(
98 ... bytes(b'Hello'), signers=['pgp-mime@invalid.com']))
99 ... # doctest: +ELLIPSIS
100 b'-----BEGIN PGP SIGNATURE-----\n...-----END PGP SIGNATURE-----\n'
104 >>> sign_and_encrypt_bytes(
105 ... bytes(b'Hello'), recipients=['pgp-mime@invalid.com'],
106 ... always_trust=True)
107 ... # doctest: +ELLIPSIS
108 b'-----BEGIN PGP MESSAGE-----\n...-----END PGP MESSAGE-----\n'
112 >>> sign_and_encrypt_bytes(
113 ... bytes(b'Hello'), signers=['pgp-mime@invalid.com'],
114 ... recipients=['pgp-mime@invalid.com'], always_trust=True)
115 ... # doctest: +ELLIPSIS
116 b'-----BEGIN PGP MESSAGE-----\n...-----END PGP MESSAGE-----\n'
118 input_read,input_write = _os.pipe()
119 output_read,output_write = _os.pipe()
120 client,socket = get_client(pass_fds=(input_read, output_write))
121 _os.close(input_read)
122 _os.close(output_write)
126 for signer in signers:
127 client.make_request(_common.Request('SIGNER', signer))
129 for recipient in recipients:
130 client.make_request(_common.Request('RECIPIENT', recipient))
132 _common.Request('INPUT', 'FD={}'.format(input_read)))
134 _common.Request('OUTPUT', 'FD={}'.format(output_write)))
136 if signers or allow_default_signer:
138 command = 'SIGN_ENCRYPT'
141 parameters.append('--{}'.format(mode))
145 raise ValueError('must specify at least one signer or recipient')
147 parameters.append('--always-trust')
148 _write(input_write, data)
149 _os.close(input_write)
152 _common.Request(command, ' '.join(parameters)))
153 d = _read(output_read)
155 disconnect(client, socket)
156 for fd in [input_write, output_read]:
161 def decrypt_bytes(data):
162 r"""Decrypt ``data``.
165 ... '-----BEGIN PGP MESSAGE-----',
166 ... 'Version: GnuPG v2.0.17 (GNU/Linux)',
168 ... 'hQEMA1Ea7aZDMrbjAQf/TAqLjksZSJxSqkBxYT5gtLQoXY6isvRZg2apjs7CW0y2',
169 ... 'tFK/ptnVYAq2OtWQFhbiJXj8hmwJyyFfb3lghpeu4ihO52JgkkwOpmJb6dxjOi83',
170 ... 'qDwaGOogEPH38BNLuwdrMCW0jmNROwvS796PtqSGUaJTuIiKUB8lETwPwIHrDc11',
171 ... 'N3RWStE5uShNkXXQXplUoeCKf3N4XguXym+GQCqJQzlEMrkkDdr4l7mzvt3Nf8EA',
172 ... 'SgSak086tUoo9x8IN5PJCuOJkcXcjQzFcpqOsA7dyZKO8NeQUZv2JvlZuorckNvN',
173 ... 'xx3PwW0a8VeJgTQrh64ZK/d3F3gNHUTzXkq/UIn25tJFAcmSUwxtsBal7p8zAeCV',
174 ... '8zefsHRQ5Y03IBeYBcVJBhDS9XfvwLQTJiGGstPCxzKTwSUT1MzV5t5twG/STDCc',
177 ... '-----END PGP MESSAGE-----',
179 ... ]).encode('us-ascii')
183 input_read,input_write = _os.pipe()
184 output_read,output_write = _os.pipe()
185 client,socket = get_client(pass_fds=(input_read, output_write))
186 _os.close(input_read)
187 _os.close(output_write)
191 _common.Request('INPUT', 'FD={}'.format(input_read)))
193 _common.Request('OUTPUT', 'FD={}'.format(output_write)))
194 _write(input_write, data)
195 _os.close(input_write)
197 client.make_request(_common.Request('DECRYPT'))
198 d = _read(output_read)
200 disconnect(client, socket)
201 for fd in [input_write, output_read]:
206 def verify_bytes(data, signature=None, always_trust=False):
207 r"""Verify a signature on ``data``, possibly decrypting first.
209 These tests assume you didn't trust the distributed test key.
212 ... '-----BEGIN PGP MESSAGE-----',
213 ... 'Version: GnuPG v2.0.17 (GNU/Linux)',
215 ... 'hQEMA1Ea7aZDMrbjAQf/YM1SeFzNGz0DnUynaEyhfGCvcqmjtbN1PtZMpT7VaQLN',
216 ... 'a+c0faskr79Atz0+2IBR7CDOlcETrRtH2EnrWukbRIDtmffNFGuhMRTNfnQ15OIN',
217 ... 'qrmt2P5gXznsgnm2XjzTK7S/Cc3Aq+zjaDrDt7bIedEdz+EyNgaKuL/lB9cAB8xL',
218 ... 'YYp/yn55Myjair2idgzsa7w/QXdE3RhpyRLqR2Jgz4P1I1xOgUYnylbpIZL9FOKN',
219 ... 'NR3RQhkGdANBku8otfthb5ZUGsNMV45ct4V8PE+xChjFb9gcwpaf1hhoIF/sYHD5',
220 ... 'Bkf+v/J8F40KGYY16b0DjQIUlnra9y7q9jj0h2bvc9LAtgHtVUso133LLcVYl7RP',
221 ... 'Vjyz9Ps366BtIdPlAL4CoF5hEcMKS5J3h1vRlyAKN4uHENl5vKvoxn7ID3JhhWQc',
222 ... '6QrPGis64zi3OnYor34HPh/KNJvkgOQkekmtYuTxnkiONA4lhMDJgeaVZ9WZq+GV',
223 ... 'MaCvCFGNYU2TV4V8wMlnUbF8d5bDQ83g8MxIVKdDcnBzzYLZha+qmz4Spry9iB53',
224 ... 'Sg/sM5H8gWWSl7Oj1lxVg7o7IscpQfVt6zL6jD2VjL3L3Hu7WEXIrcGZtvrP4d+C',
225 ... 'TGYWiGlh5B2UCFk2bVctfw8W/QfaVvJYD4Rfqta2V2p14KIJLFRSGa1g26W4ixrH',
226 ... 'XKxgaA3AIfJ+6c5RoisRLuYCxvQi91wkE9hAXR+inXK4Hq4SmiHoeITZFhHP3hh3',
227 ... 'rbpp8mopiMNxWqCbuqgILP6pShn4oPclu9aR8uJ1ziDxISTGYC71mvLUERUjFn2L',
228 ... 'fu6C0+TCC9RmeyL+eNdM6cjs1G7YR6yX',
230 ... '-----END PGP MESSAGE-----',
232 ... ]).encode('us-ascii')
233 >>> output,verified,result = verify_bytes(b)
238 >>> print(str(result, 'utf-8').replace('\x00', ''))
239 ... # doctest: +REPORT_UDIFF
240 <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
245 <summary value="0x0" />
246 <fpr>B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3</fpr>
247 <status value="0x0">Success <Unspecified source></status>
248 <timestamp unix="1332357237i" />
249 <exp-timestamp unix="0i" />
250 <wrong-key-usage value="0x0" />
251 <pka-trust value="0x0" />
252 <chain-model value="0x0" />
253 <validity value="0x0" />
254 <validity-reason value="0x0">Success <Unspecified source></validity-reason>
255 <pubkey-algo value="0x1">RSA</pubkey-algo>
256 <hash-algo value="0x8">SHA256</hash-algo>
262 >>> b = b'Success!\n'
263 >>> signature = '\n'.join([
264 ... '-----BEGIN PGP SIGNATURE-----',
265 ... 'Version: GnuPG v2.0.17 (GNU/Linux)',
267 ... 'iQEcBAEBAgAGBQJPaiw/AAoJEFEa7aZDMrbj93gH/1fQPXLjUTpONJUTmvGoMLNA',
268 ... 'W9ZhjpUL5i6rRqYGUvQ4kTEDuPMxkMrCyFCDHEhSDHufMek6Nso5/HeJn3aqxlgs',
269 ... 'hmNlvAq4FI6JQyFL7eCp/XG9cPx1p42dTI7JAih8FuK21sS4m/H5XP3R/6KXC99D',
270 ... '39rrXCvvR+yNgKe2dxuJwmKuLteVlcWxiIQwVrYK70GtJHC5BO79G8yGccWoEy9C',
271 ... '9JkJiyNptqZyFjGBNmMmrCSFZ7ZFA02RB+laRmwuIiozw4TJYEksxPrgZMbbcFzx',
272 ... 'zs3JHyV23+Fz1ftalvwskHE7tJkX9Ub8iBMNZ/KxJXXdPdpuMdEYVjoUehkQBQE=',
274 ... '-----END PGP SIGNATURE-----',
276 ... ]).encode('us-ascii')
277 >>> output,verified,result = verify_bytes(b, signature=signature)
282 >>> print(str(result, 'utf-8').replace('\x00', ''))
283 ... # doctest: +REPORT_UDIFF
284 <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
289 <summary value="0x0" />
290 <fpr>B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3</fpr>
291 <status value="0x0">Success <Unspecified source></status>
292 <timestamp unix="1332358207i" />
293 <exp-timestamp unix="0i" />
294 <wrong-key-usage value="0x0" />
295 <pka-trust value="0x0" />
296 <chain-model value="0x0" />
297 <validity value="0x0" />
298 <validity-reason value="0x0">Success <Unspecified source></validity-reason>
299 <pubkey-algo value="0x1">RSA</pubkey-algo>
300 <hash-algo value="0x2">SHA1</hash-algo>
307 input_read,input_write = _os.pipe()
308 pass_fds = [input_read]
310 message_read,message_write = _os.pipe()
312 pass_fds.append(message_read)
315 output_read,output_write = _os.pipe()
316 pass_fds.append(output_write)
317 client,socket = get_client(pass_fds=pass_fds)
318 _os.close(input_read)
320 _os.close(message_read)
322 _os.close(output_write)
323 verified = result = None
327 _common.Request('INPUT', 'FD={}'.format(input_read)))
330 _common.Request('MESSAGE', 'FD={}'.format(message_read)))
333 _common.Request('OUTPUT', 'FD={}'.format(output_write)))
335 _write(input_write, signature)
336 _os.close(input_write)
338 _write(message_write, data)
339 _os.close(message_write)
342 _write(input_write, data)
343 _os.close(input_write)
345 client.make_request(_common.Request('VERIFY'))
349 plain = _read(output_read)
350 rs,result = client.make_request(_common.Request('RESULT'))
352 for line in result.splitlines():
353 if b'<status ' in line and b'Success' not in line:
355 elif b'<pka-trust' in line and b'0x2' not in line:
358 disconnect(client, socket)
359 for fd in [input_write, message_write, output_read]:
362 return (plain, verified, result)