3 import codecs as _codecs
4 import logging as _logging
6 import os.path as _os_path
7 import socket as _socket
8 import subprocess as _subprocess
10 from pyassuan import client as _client
11 from pyassuan import common as _common
14 #class GPGMEClient(_client.AssuanClient):
16 #CLIENT = _client.AssuanClient(name='pgp-mime', close_on_disconnect=True)
17 #CLIENT.filename = ...
# Point ``client`` at the backend named by ``filename`` ("~" is expanded first).
# NOTE(review): this listing has gaps in its embedded numbering, so whatever
# selects between the two transports below, and the statement that returns a
# value, are not visible here.  The call ``socket = connect(client, ...)``
# further down shows that a value is returned -- confirm what it is against
# the complete file.
19 def connect(client, filename):
20 filename = _os_path.expanduser(filename)
# Transport A: a Unix-domain stream socket connected to ``filename``; the
# client reads and writes through binary file objects made from that socket.
22 socket = _socket.socket(_socket.AF_UNIX, _socket.SOCK_STREAM)
23 socket.connect(filename)
24 client.input = socket.makefile('rb')
25 client.output = socket.makefile('wb')
# Transport B: run ``filename`` as a child process with piped stdin/stdout;
# the child's stdout becomes client.input and its stdin becomes client.output.
27 p = _subprocess.Popen(
28 filename, stdin=_subprocess.PIPE, stdout=_subprocess.PIPE,
30 client.input = p.stdout
31 client.output = p.stdin
# NOTE(review): the ``def`` line that owns these statements is missing from
# this listing.  The functions below call ``get_client()`` and unpack a
# ``(client, socket)`` pair, which matches the return here -- confirm.
# Creates an AssuanClient named 'pgp-mime', sets its logger to DEBUG, and
# passes it to connect() together with a hard-coded gpgme-tool path under the
# user's home directory; returns the client and whatever connect() returned.
# NOTE(review): the path looks like a developer's local build tree; consider
# making it configurable.
37 client = _client.AssuanClient(name='pgp-mime', close_on_disconnect=True)
38 client.logger.setLevel(_logging.DEBUG)
39 socket = connect(client, '~/src/gpgme/build/src/gpgme-tool')
40 return (client, socket)
# Send 'BYE' to the server and tear down the transport object ``socket``
# (the second element of the pair produced alongside ``client``).
42 def disconnect(client, socket):
43 client.make_request(_common.Request('BYE'))
# NOTE(review): the two teardown paths below address different kinds of
# object -- shutdown() on a socket versus wait() on, presumably, the
# subprocess.Popen started in connect().  The branch that selects between
# them is not visible in this listing.
46 socket.shutdown(_socket.SHUT_RDWR)
49 status = socket.wait()
# NOTE(review): ``assert`` is stripped under ``python -O``, so a non-zero
# status would then pass silently; consider raising an exception instead.
50 assert status == 0, status
# NOTE(review): the ``def`` line that owns these two statements is missing
# from this listing, so the function's name and parameters are not visible.
# First reads the responses already waiting on the connection (per the inline
# comment, the server's initial 'OK'); neither returned value is used
# afterwards in the visible code.  Then sends an 'ARMOR' request with the
# parameter 'true' -- presumably asking for ASCII-armored output, consistent
# with the armored blocks in the doctests below; confirm.
53 responses,data = client.get_responses() # get initial 'OK' from server
54 client.make_request(_common.Request('ARMOR', 'true'))
# NOTE(review): this listing is incomplete -- the embedded line numbers skip
# values inside this function, so the end of the docstring, the bodies of the
# ``with`` blocks, several branch headers and the return statement are not
# shown.  Comments below describe only the statements that are visible.
56 def sign_and_encrypt_bytes(data, signers=None, recipients=None,
57 always_trust=False, mode='detach'):
58 r"""Sign ``data`` with ``signers`` and encrypt to ``recipients``.
62 >>> print(sign_and_encrypt_bytes(
63 ... bytes(b'Hello'), signers=['pgp-mime@invalid.com']))
64 ... # doctest: +ELLIPSIS
65 b'-----BEGIN PGP SIGNATURE-----\n...-----END PGP SIGNATURE-----\n'
69 >>> sign_and_encrypt_bytes(
70 ... bytes(b'Hello'), recipients=['pgp-mime@invalid.com'],
71 ... always_trust=True)
72 ... # doctest: +ELLIPSIS
73 b'-----BEGIN PGP MESSAGE-----\n...-----END PGP MESSAGE-----\n'
77 >>> sign_and_encrypt_bytes(
78 ... bytes(b'Hello'), signers=['pgp-mime@invalid.com'],
79 ... recipients=['pgp-mime@invalid.com'], always_trust=True)
80 ... # doctest: +ELLIPSIS
81 b'-----BEGIN PGP MESSAGE-----\n...-----END PGP MESSAGE-----\n'
83 client,socket = get_client()
# One SIGNER request per signer and one RECIPIENT request per recipient.
# NOTE(review): both parameters default to None; any guard that skips these
# loops in that case is not visible here -- confirm one exists.
87 for signer in signers:
88 client.make_request(_common.Request('SIGNER', signer))
90 for recipient in recipients:
91 client.make_request(_common.Request('RECIPIENT', recipient))
# Data is exchanged with the server through files: the INPUT and OUTPUT
# requests name fixed paths under /tmp.
# NOTE(review): the same fixed paths are used by decrypt_bytes and
# verify_bytes, so overlapping calls would overwrite each other's files, and
# fixed names in /tmp are predictable.  Consider tempfile-generated paths.
92 with open('/tmp/input', 'wb') as f:
94 client.make_request(_common.Request('INPUT', 'FILE=/tmp/input'))
95 client.make_request(_common.Request('OUTPUT', 'FILE=/tmp/output'))
# Command selection: 'SIGN_ENCRYPT' when both signers and recipients are
# given.  ``mode`` is formatted as a ``--<mode>`` parameter and
# ``--always-trust`` can be appended; the conditions guarding those appends,
# and the other command names, are not visible here.  Per its message, the
# ValueError covers the case where neither signers nor recipients were given.
97 if signers and recipients:
98 command = 'SIGN_ENCRYPT'
101 parameters.append('--{}'.format(mode))
105 raise ValueError('must specify at least one signer or recipient')
107 parameters.append('--always-trust')
109 _common.Request(command, ' '.join(parameters)))
# The server's output is read back from the OUTPUT file in binary mode.
110 with open('/tmp/output', 'rb') as f:
# Close the connection, then delete the exchange files.
# NOTE(review): ``_os`` is not among the imports visible at the top of this
# listing -- confirm that ``import os as _os`` exists in the full file.
113 disconnect(client, socket)
115 _os.remove('/tmp/input')
116 _os.remove('/tmp/output')
# NOTE(review): this listing is incomplete -- the embedded line numbers skip
# values inside this function, so parts of the doctest, the end of the
# docstring, the bodies of the ``with`` blocks and the return statement are
# not shown.  Comments below describe only the statements that are visible.
121 def decrypt_bytes(data):
122 r"""Decrypt ``data``.
125 ... '-----BEGIN PGP MESSAGE-----',
126 ... 'Version: GnuPG v2.0.17 (GNU/Linux)',
128 ... 'hQEMA1Ea7aZDMrbjAQf/TAqLjksZSJxSqkBxYT5gtLQoXY6isvRZg2apjs7CW0y2',
129 ... 'tFK/ptnVYAq2OtWQFhbiJXj8hmwJyyFfb3lghpeu4ihO52JgkkwOpmJb6dxjOi83',
130 ... 'qDwaGOogEPH38BNLuwdrMCW0jmNROwvS796PtqSGUaJTuIiKUB8lETwPwIHrDc11',
131 ... 'N3RWStE5uShNkXXQXplUoeCKf3N4XguXym+GQCqJQzlEMrkkDdr4l7mzvt3Nf8EA',
132 ... 'SgSak086tUoo9x8IN5PJCuOJkcXcjQzFcpqOsA7dyZKO8NeQUZv2JvlZuorckNvN',
133 ... 'xx3PwW0a8VeJgTQrh64ZK/d3F3gNHUTzXkq/UIn25tJFAcmSUwxtsBal7p8zAeCV',
134 ... '8zefsHRQ5Y03IBeYBcVJBhDS9XfvwLQTJiGGstPCxzKTwSUT1MzV5t5twG/STDCc',
137 ... '-----END PGP MESSAGE-----',
139 ... ]).encode('us-ascii')
143 client,socket = get_client()
# Data is exchanged with the server through files at fixed paths under /tmp:
# INPUT names the file opened for writing here, OUTPUT the file read back
# after the DECRYPT request.
# NOTE(review): the same fixed paths are used by sign_and_encrypt_bytes and
# verify_bytes, so overlapping calls would overwrite each other's files, and
# fixed names in /tmp are predictable.  Consider tempfile-generated paths.
146 with open('/tmp/input', 'wb') as f:
148 client.make_request(_common.Request('INPUT', 'FILE=/tmp/input'))
149 client.make_request(_common.Request('OUTPUT', 'FILE=/tmp/output'))
150 client.make_request(_common.Request('DECRYPT'))
151 with open('/tmp/output', 'rb') as f:
# Close the connection, then delete the exchange files.
# NOTE(review): ``_os`` is not among the imports visible at the top of this
# listing -- confirm that ``import os as _os`` exists in the full file.
154 disconnect(client, socket)
156 _os.remove('/tmp/input')
157 _os.remove('/tmp/output')
# NOTE(review): this listing is incomplete -- the embedded line numbers skip
# values inside this function, so parts of the doctests, the end of the
# docstring, the bodies of the ``with`` blocks and of the if/elif branches,
# and any branching on ``signature`` are not shown.  Comments below describe
# only the statements that are visible.
162 def verify_bytes(data, signature=None, always_trust=False):
163 r"""Verify a signature on ``data``, possibly decrypting first.
165 These tests assume you didn't trust the distributed test key.
168 ... '-----BEGIN PGP MESSAGE-----',
169 ... 'Version: GnuPG v2.0.17 (GNU/Linux)',
171 ... 'hQEMA1Ea7aZDMrbjAQf/YM1SeFzNGz0DnUynaEyhfGCvcqmjtbN1PtZMpT7VaQLN',
172 ... 'a+c0faskr79Atz0+2IBR7CDOlcETrRtH2EnrWukbRIDtmffNFGuhMRTNfnQ15OIN',
173 ... 'qrmt2P5gXznsgnm2XjzTK7S/Cc3Aq+zjaDrDt7bIedEdz+EyNgaKuL/lB9cAB8xL',
174 ... 'YYp/yn55Myjair2idgzsa7w/QXdE3RhpyRLqR2Jgz4P1I1xOgUYnylbpIZL9FOKN',
175 ... 'NR3RQhkGdANBku8otfthb5ZUGsNMV45ct4V8PE+xChjFb9gcwpaf1hhoIF/sYHD5',
176 ... 'Bkf+v/J8F40KGYY16b0DjQIUlnra9y7q9jj0h2bvc9LAtgHtVUso133LLcVYl7RP',
177 ... 'Vjyz9Ps366BtIdPlAL4CoF5hEcMKS5J3h1vRlyAKN4uHENl5vKvoxn7ID3JhhWQc',
178 ... '6QrPGis64zi3OnYor34HPh/KNJvkgOQkekmtYuTxnkiONA4lhMDJgeaVZ9WZq+GV',
179 ... 'MaCvCFGNYU2TV4V8wMlnUbF8d5bDQ83g8MxIVKdDcnBzzYLZha+qmz4Spry9iB53',
180 ... 'Sg/sM5H8gWWSl7Oj1lxVg7o7IscpQfVt6zL6jD2VjL3L3Hu7WEXIrcGZtvrP4d+C',
181 ... 'TGYWiGlh5B2UCFk2bVctfw8W/QfaVvJYD4Rfqta2V2p14KIJLFRSGa1g26W4ixrH',
182 ... 'XKxgaA3AIfJ+6c5RoisRLuYCxvQi91wkE9hAXR+inXK4Hq4SmiHoeITZFhHP3hh3',
183 ... 'rbpp8mopiMNxWqCbuqgILP6pShn4oPclu9aR8uJ1ziDxISTGYC71mvLUERUjFn2L',
184 ... 'fu6C0+TCC9RmeyL+eNdM6cjs1G7YR6yX',
186 ... '-----END PGP MESSAGE-----',
188 ... ]).encode('us-ascii')
189 >>> output,verified,result = verify_bytes(b)
194 >>> print(str(result, 'utf-8').replace('\x00', ''))
195 ... # doctest: +REPORT_UDIFF
196 <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
201 <summary value="0x0" />
202 <fpr>B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3</fpr>
203 <status value="0x0">Success <Unspecified source></status>
204 <timestamp unix="1332357237i" />
205 <exp-timestamp unix="0i" />
206 <wrong-key-usage value="0x0" />
207 <pka-trust value="0x0" />
208 <chain-model value="0x0" />
209 <validity value="0x0" />
210 <validity-reason value="0x0">Success <Unspecified source></validity-reason>
211 <pubkey-algo value="0x1">RSA</pubkey-algo>
212 <hash-algo value="0x8">SHA256</hash-algo>
218 >>> b = b'Success!\n'
219 >>> signature = '\n'.join([
220 ... '-----BEGIN PGP SIGNATURE-----',
221 ... 'Version: GnuPG v2.0.17 (GNU/Linux)',
223 ... 'iQEcBAEBAgAGBQJPaiw/AAoJEFEa7aZDMrbj93gH/1fQPXLjUTpONJUTmvGoMLNA',
224 ... 'W9ZhjpUL5i6rRqYGUvQ4kTEDuPMxkMrCyFCDHEhSDHufMek6Nso5/HeJn3aqxlgs',
225 ... 'hmNlvAq4FI6JQyFL7eCp/XG9cPx1p42dTI7JAih8FuK21sS4m/H5XP3R/6KXC99D',
226 ... '39rrXCvvR+yNgKe2dxuJwmKuLteVlcWxiIQwVrYK70GtJHC5BO79G8yGccWoEy9C',
227 ... '9JkJiyNptqZyFjGBNmMmrCSFZ7ZFA02RB+laRmwuIiozw4TJYEksxPrgZMbbcFzx',
228 ... 'zs3JHyV23+Fz1ftalvwskHE7tJkX9Ub8iBMNZ/KxJXXdPdpuMdEYVjoUehkQBQE=',
230 ... '-----END PGP SIGNATURE-----',
232 ... ]).encode('us-ascii')
233 >>> output,verified,result = verify_bytes(b, signature=signature)
238 >>> print(str(result, 'utf-8').replace('\x00', ''))
239 ... # doctest: +REPORT_UDIFF
240 <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
245 <summary value="0x0" />
246 <fpr>B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3</fpr>
247 <status value="0x0">Success <Unspecified source></status>
248 <timestamp unix="1332358207i" />
249 <exp-timestamp unix="0i" />
250 <wrong-key-usage value="0x0" />
251 <pka-trust value="0x0" />
252 <chain-model value="0x0" />
253 <validity value="0x0" />
254 <validity-reason value="0x0">Success <Unspecified source></validity-reason>
255 <pubkey-algo value="0x1">RSA</pubkey-algo>
256 <hash-algo value="0x2">SHA1</hash-algo>
263 client,socket = get_client()
# Both results start out as None; ``result`` is later rebound to the data
# returned by the RESULT request.
264 verified = result = None
# Data is exchanged with the server through files at fixed paths under /tmp:
# /tmp/input (INPUT request), /tmp/message (MESSAGE request) and /tmp/output
# (OUTPUT request).  Which of ``data`` and ``signature`` goes into which file
# is not visible in this listing.
# NOTE(review): /tmp/input and /tmp/output are also used by
# sign_and_encrypt_bytes and decrypt_bytes, so overlapping calls would
# overwrite each other's files, and fixed names in /tmp are predictable.
# Consider tempfile-generated paths.
273 with open('/tmp/input', 'wb') as f:
275 client.make_request(_common.Request('INPUT', 'FILE=/tmp/input'))
277 with open('/tmp/message', 'wb') as f:
280 _common.Request('MESSAGE', 'FILE=/tmp/message'))
282 client.make_request(_common.Request('OUTPUT', 'FILE=/tmp/output'))
# Run the verification, then fetch its report; the second element of the
# RESULT reply is kept as ``result`` (bytes, judging by the bytes patterns
# used in the scan that follows).
283 client.make_request(_common.Request('VERIFY'))
284 rs,result = client.make_request(_common.Request('RESULT'))
# Scan the report line by line with substring tests: a '<status ' line that
# lacks 'Success', or a '<pka-trust' line that lacks '0x2', selects a branch
# whose body is not visible here -- presumably each marks the signature as
# not verified; confirm.
# NOTE(review): these are plain substring matches on the raw report, not XML
# parsing, so they depend on the exact serialization the tool emits.
286 for line in result.splitlines():
287 if b'<status ' in line and b'Success' not in line:
289 elif b'<pka-trust' in line and b'0x2' not in line:
# The server's output file is opened for binary reading; presumably its
# contents become ``plain`` (the assignment is not visible here).
294 with open('/tmp/output', 'rb') as f:
# Close the connection, then delete the three exchange files.
# NOTE(review): ``_os`` is not among the imports visible at the top of this
# listing -- confirm that ``import os as _os`` exists in the full file.
297 disconnect(client, socket)
300 _os.remove('/tmp/input')
301 _os.remove('/tmp/output')
302 _os.remove('/tmp/message')
# Returns the triple that the doctests unpack as (output, verified, result).
305 return (plain, verified, result)