Add 'tools' to first line in README.
[pgp-mime.git] / pgp_mime.py
1 # -*- coding: utf-8 -*-
2 # Copyright (C) 2012 W. Trevor King <wking@drexel.edu>
3 #
4 # This file is part of pgp-mime.
5 #
6 # pgp-mime is free software: you can redistribute it and/or modify it under the
7 # terms of the GNU General Public License as published by the Free Software
8 # Foundation, either version 3 of the License, or (at your option) any later
9 # version.
10 #
11 # pgp-mime is distributed in the hope that it will be useful, but WITHOUT ANY
12 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
13 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License along with
16 # pgp-mime.  If not, see <http://www.gnu.org/licenses/>.
17 """Python module and for constructing and sending pgp/mime email.
18
19 Mostly uses subprocess to call ``gpg`` and sends mail using either
20 SMTP or a sendmail-compatible mailer.  If you lack ``gpg``, either
21 don't use the encryption functions, adjust the ``GPG_*`` constants, or
22 adjust the ``*_bytes`` commands.
23 """
24
25 import configparser as _configparser
26 import io as _io
27 import logging as _logging
28 import os as _os
29 import re as _re
30 import smtplib as _smtplib
31 import smtplib as _smtplib
32 import subprocess as _subprocess
33 import threading as _threading
34
35 from email import message_from_bytes as _message_from_bytes
36 from email.encoders import encode_7or8bit as _encode_7or8bit
37 from email.header import decode_header as _decode_header
38 from email.message import Message as _Message
39 from email.mime.application import MIMEApplication as _MIMEApplication
40 from email.mime.multipart import MIMEMultipart as _MIMEMultipart
41 from email.mime.text import MIMEText as _MIMEText
42 from email.parser import Parser as _Parser
43 from email.utils import formataddr as _formataddr
44 from email.utils import getaddresses as _getaddresses
45
46
47 __version__ = '0.2'
48
49
50 LOG = _logging.getLogger('pgp-mime')
51 LOG.setLevel(_logging.ERROR)
52 LOG.addHandler(_logging.StreamHandler())
53
54 ENCODING = 'utf-8'
55 #ENCODING = 'iso-8859-1'
56
57 GPG_ARGS = [
58     '/usr/bin/gpg', '--no-verbose', '--quiet', '--batch', '--output', '-']
59 GPG_SIGN_ARGS = ['--armor', '--textmode', '--detach-sign']
60 GPG_ENCRYPT_ARGS = ['--armor', '--textmode', '--encrypt', '--always-trust']
61 GPG_SIGN_AND_ENCRYPT_ARGS = [
62     '--armor', '--textmode', '--sign', '--encrypt', '--always-trust']
63 GPG_DECRYPT_ARGS = []
64 GPG_VERIFY_ARGS = []
65 GPG_VERIFY_FAILED = [
66     'This key is not certified with a trusted signature',
67     'WARNING',
68     ]
69 SENDMAIL = ['/usr/sbin/sendmail', '-t']
70
71
72 def get_smtp_params(config):
73     r"""Retrieve SMTP paramters from a config file.
74
75     >>> from configparser import ConfigParser
76     >>> config = ConfigParser()
77     >>> config.read_string('\n'.join([
78     ...             '[smtp]',
79     ...             'host: smtp.mail.uu.edu',
80     ...             'port: 587',
81     ...             'starttls: yes',
82     ...             'username: rincewind',
83     ...             'password: 7ugg@g3',
84     ...             ]))
85     >>> get_smtp_params(config)
86     ('smtp.mail.uu.edu', 587, True, 'rincewind', '7ugg@g3')
87     >>> config = ConfigParser()
88     >>> get_smtp_params(ConfigParser())
89     (None, None, None, None, None)
90     """
91     try:
92         host = config.get('smtp', 'host')
93     except _configparser.NoSectionError:
94         return (None, None, None, None, None)
95     except _configparser.NoOptionError:
96         host = None
97     try:
98         port = config.getint('smtp', 'port')
99     except _configparser.NoOptionError:
100         port = None
101     try:
102         starttls = config.getboolean('smtp', 'starttls')
103     except _configparser.NoOptionError:
104         starttls = None
105     try:
106         username = config.get('smtp', 'username')
107     except _configparser.NoOptionError:
108         username = None
109     try:
110         password = config.get('smtp', 'password')
111     except _configparser.NoOptionError:
112         password = None
113     return (host, port, starttls, username, password)
114
115 def get_smtp(host=None, port=None, starttls=None, username=None,
116              password=None):
117     """Connect to an SMTP host using the given parameters.
118
119     >>> import smtplib
120     >>> try:  # doctest: +SKIP
121     ...     smtp = get_smtp(host='smtp.gmail.com', port=587, starttls=True,
122     ...         username='rincewind@uu.edu', password='7ugg@g3')
123     ... except smtplib.SMTPAuthenticationError as error:
124     ...     print('that was not a real account')
125     that was not a real account
126     >>> smtp = get_smtp()  # doctest: +SKIP
127     >>> smtp.quit()  # doctest: +SKIP
128     """
129     if host is None:
130         host = 'localhost'
131     if port is None:
132         port = _smtplib.SMTP_PORT
133     if username and not starttls:
134         raise ValueError(
135             'sending passwords in the clear is unsafe!  Use STARTTLS.')
136     LOG.info('connect to SMTP server at {}:{}'.format(host, port))
137     smtp = _smtplib.SMTP(host=host, port=port)
138     smtp.ehlo()
139     if starttls:
140         smtp.starttls()
141     if username:
142         smtp.login(username, password)
143     #smtp.set_debuglevel(1)
144     return smtp
145
146 def mail(message, smtp=None, sendmail=None):
147     """Send an email ``Message`` instance on its merry way.
148
149     We can shell out to the user specified sendmail in case
150     the local host doesn't have an SMTP server set up
151     for easy ``smtplib`` usage.
152
153     >>> message = encodedMIMEText('howdy!')
154     >>> message['From'] = 'John Doe <jdoe@a.gov.ru>'
155     >>> message['To'] = 'Jack <jack@hill.org>, Jill <jill@hill.org>'
156     >>> mail(message=message, sendmail=SENDMAIL)  # doctest: +SKIP
157     """
158     LOG.info('send message {} -> {}'.format(message['from'], message['to']))
159     if smtp:
160         smtp.send_message(msg=message)
161     elif sendmail:
162         execute(
163             sendmail, stdin=message.as_string().encode('us-ascii'),
164             close_fds=True)
165     else:
166         smtp = _smtplib.SMTP()
167         smtp.connect()
168         smtp.send_message(msg=message)
169         smtp.close()
170
171 def header_from_text(text):
172     r"""Simple wrapper for instantiating a ``Message`` from text.
173
174     >>> text = '\n'.join(
175     ...     ['From: me@big.edu','To: you@big.edu','Subject: testing'])
176     >>> header = header_from_text(text=text)
177     >>> print(header.as_string())  # doctest: +REPORT_UDIFF
178     From: me@big.edu
179     To: you@big.edu
180     Subject: testing
181     <BLANKLINE>
182     <BLANKLINE>
183     """
184     text = text.strip()
185     p = _Parser()
186     return p.parsestr(text, headersonly=True)
187
188 def guess_encoding(text):
189     r"""
190     >>> guess_encoding('hi there')
191     'us-ascii'
192     >>> guess_encoding('✉')
193     'utf-8'
194     """
195     for encoding in ['us-ascii', ENCODING, 'utf-8']:
196         try:
197             text.encode(encoding)
198         except UnicodeEncodeError:
199             pass
200         else:
201             return encoding
202     raise ValueError(text)
203
204 def encodedMIMEText(body, encoding=None):
205     """Wrap ``MIMEText`` with ``guess_encoding`` detection.
206
207     >>> message = encodedMIMEText('Hello')
208     >>> print(message.as_string())  # doctest: +REPORT_UDIFF
209     Content-Type: text/plain; charset="us-ascii"
210     MIME-Version: 1.0
211     Content-Transfer-Encoding: 7bit
212     Content-Disposition: inline
213     <BLANKLINE>
214     Hello
215     >>> message = encodedMIMEText('Джон Доу')
216     >>> print(message.as_string())  # doctest: +REPORT_UDIFF
217     Content-Type: text/plain; charset="utf-8"
218     MIME-Version: 1.0
219     Content-Transfer-Encoding: base64
220     Content-Disposition: inline
221     <BLANKLINE>
222     0JTQttC+0L0g0JTQvtGD
223     <BLANKLINE>
224     """
225     if encoding == None:
226         encoding = guess_encoding(body)
227     if encoding == 'us-ascii':
228         message = _MIMEText(body)
229     else:
230         # Create the message ('plain' stands for Content-Type: text/plain)
231         message = _MIMEText(body, 'plain', encoding)
232     message.add_header('Content-Disposition', 'inline')
233     return message
234
235 def strip_bcc(message):
236     """Remove the Bcc field from a ``Message`` in preparation for mailing
237
238     >>> message = encodedMIMEText('howdy!')
239     >>> message['To'] = 'John Doe <jdoe@a.gov.ru>'
240     >>> message['Bcc'] = 'Jack <jack@hill.org>, Jill <jill@hill.org>'
241     >>> message = strip_bcc(message)
242     >>> print(message.as_string())  # doctest: +REPORT_UDIFF
243     Content-Type: text/plain; charset="us-ascii"
244     MIME-Version: 1.0
245     Content-Transfer-Encoding: 7bit
246     Content-Disposition: inline
247     To: John Doe <jdoe@a.gov.ru>
248     <BLANKLINE>
249     howdy!
250     """
251     del message['bcc']
252     del message['resent-bcc']
253     return message
254
255 def append_text(text_part, new_text):
256     r"""Append text to the body of a ``plain/text`` part.
257
258     Updates encoding as necessary.
259
260     >>> message = encodedMIMEText('Hello')
261     >>> append_text(message, ' John Doe')
262     >>> print(message.as_string())  # doctest: +REPORT_UDIFF
263     Content-Type: text/plain; charset="us-ascii"
264     MIME-Version: 1.0
265     Content-Disposition: inline
266     Content-Transfer-Encoding: 7bit
267     <BLANKLINE>
268     Hello John Doe
269     >>> append_text(message, ', Джон Доу')
270     >>> print(message.as_string())  # doctest: +REPORT_UDIFF
271     MIME-Version: 1.0
272     Content-Disposition: inline
273     Content-Type: text/plain; charset="utf-8"
274     Content-Transfer-Encoding: base64
275     <BLANKLINE>
276     SGVsbG8gSm9obiBEb2UsINCU0LbQvtC9INCU0L7Rgw==
277     <BLANKLINE>
278     >>> append_text(message, ', and Jane Sixpack.')
279     >>> print(message.as_string())  # doctest: +REPORT_UDIFF
280     MIME-Version: 1.0
281     Content-Disposition: inline
282     Content-Type: text/plain; charset="utf-8"
283     Content-Transfer-Encoding: base64
284     <BLANKLINE>
285     SGVsbG8gSm9obiBEb2UsINCU0LbQvtC9INCU0L7RgywgYW5kIEphbmUgU2l4cGFjay4=
286     <BLANKLINE>
287     """
288     original_encoding = text_part.get_charset().input_charset
289     original_payload = str(
290         text_part.get_payload(decode=True), original_encoding)
291     new_payload = '{}{}'.format(original_payload, new_text)
292     new_encoding = guess_encoding(new_payload)
293     if text_part.get('content-transfer-encoding', None):
294         # clear CTE so set_payload will set it properly for the new encoding
295         del text_part['content-transfer-encoding']
296     text_part.set_payload(new_payload, new_encoding)
297
298 def attach_root(header, root_part):
299     r"""Copy headers from ``header`` onto ``root_part``.
300
301     >>> header = header_from_text('From: me@big.edu\n')
302     >>> body = encodedMIMEText('Hello')
303     >>> message = attach_root(header, body)
304     >>> print(message.as_string())  # doctest: +REPORT_UDIFF
305     Content-Type: text/plain; charset="us-ascii"
306     MIME-Version: 1.0
307     Content-Transfer-Encoding: 7bit
308     Content-Disposition: inline
309     From: me@big.edu
310     <BLANKLINE>
311     Hello
312     """
313     for k,v in header.items():
314         root_part[k] = v
315     return root_part    
316
317 def execute(args, stdin=None, expect=(0,), env=_os.environ, **kwargs):
318     """Execute a command (allows us to drive gpg).
319     """
320     LOG.debug('$ {}'.format(args))
321     try:
322         p = _subprocess.Popen(
323             args, stdin=_subprocess.PIPE, stdout=_subprocess.PIPE,
324             stderr=_subprocess.PIPE, shell=False, env=env, **kwargs)
325     except OSError as e:
326         raise Exception('{}\nwhile executing {}'.format(e.args[1], args))
327     output,error = p.communicate(input=stdin)
328     status = p.wait()
329     LOG.debug('(status: {})\n{}{}'.format(status, output, error))
330     if status not in expect:
331         raise Exception('unexpected status while executing {}\n{}\n{}'.format(
332                 args, error, status))
333     return (status, output, error)
334
335 def getaddresses(addresses):
336     """A decoding version of ``email.utils.getaddresses``.
337
338     >>> text = ('To: =?utf-8?b?0JTQttC+0L0g0JTQvtGD?= <jdoe@a.gov.ru>, '
339     ...     'Jack <jack@hill.org>')
340     >>> header = header_from_text(text=text)
341     >>> list(getaddresses(header.get_all('to', [])))
342     [('Джон Доу', 'jdoe@a.gov.ru'), ('Jack', 'jack@hill.org')]
343     """
344     for (name,address) in _getaddresses(addresses):
345         n = []
346         for b,encoding in _decode_header(name):
347             if encoding is None:
348                 n.append(b)
349             else:
350                 n.append(str(b, encoding))
351         yield (' '.join(n), address)
352
353 def email_sources(message):
354     """Extract author address from an email ``Message``
355
356     Search the header of an email Message instance to find the
357     senders' email addresses (or sender's address).
358
359     >>> text = ('From: =?utf-8?b?0JTQttC+0L0g0JTQvtGD?= <jdoe@a.gov.ru>, '
360     ...     'Jack <jack@hill.org>')
361     >>> header = header_from_text(text=text)
362     >>> list(email_sources(header))
363     [('Джон Доу', 'jdoe@a.gov.ru'), ('Jack', 'jack@hill.org')]
364     """
365     froms = message.get_all('from', [])
366     return getaddresses(froms) # [(name, address), ...]
367
368 def email_targets(message):
369     """Extract recipient addresses from an email ``Message``
370
371     Search the header of an email Message instance to find a
372     list of recipient's email addresses.
373
374     >>> text = ('To: =?utf-8?b?0JTQttC+0L0g0JTQvtGD?= <jdoe@a.gov.ru>, '
375     ...     'Jack <jack@hill.org>')
376     >>> header = header_from_text(text=text)
377     >>> list(email_targets(header))
378     [('Джон Доу', 'jdoe@a.gov.ru'), ('Jack', 'jack@hill.org')]
379     """
380     tos = message.get_all('to', [])
381     ccs = message.get_all('cc', [])
382     bccs = message.get_all('bcc', [])
383     resent_tos = message.get_all('resent-to', [])
384     resent_ccs = message.get_all('resent-cc', [])
385     resent_bccs = message.get_all('resent-bcc', [])
386     return getaddresses(
387         tos + ccs + bccs + resent_tos + resent_ccs + resent_bccs)
388
389 def _thread_pipe(fd, data):
390     """Write ``data`` to ``fd`` and close ``fd``.
391
392     A helper function for ``thread_pipe``.
393
394     >>> 
395     """
396     LOG.debug('starting pipe-write thread')
397     try:
398         remaining = len(data)
399         while remaining:
400             remaining -= _os.write(fd, data[-remaining:])
401     finally:
402         LOG.debug('closing pipe-write file descriptor')
403         _os.close(fd)
404         LOG.debug('closed pipe-write file descriptor')
405
406 def thread_pipe(data):
407     """Write data to a pipe.
408
409     Return the associated read file descriptor and running ``Thread``
410     that's doing the writing.
411
412     >>> import os
413     >>> read,thread = thread_pipe(b'Hello world!')
414     >>> try:
415     ...     print(os.read(read, 100))
416     ... finally:
417     ...     thread.join()
418     b'Hello world!'
419     """
420     read,write = _os.pipe()
421     LOG.debug('opened a pipe {} -> {}'.format(write, read))
422     try:
423         thread = _threading.Thread(
424             name='pipe writer', target=_thread_pipe, args=(write, data))
425         thread.start()
426     except:
427         _os.close(read)
428         _os.close(write)
429     return (read, thread)
430
431 def sign_bytes(bytes, sign_as=None):
432     r"""Sign ``bytes`` as ``sign_as``.
433
434     >>> print(sign_bytes(bytes(b'Hello'), 'pgp-mime@invalid.com'))
435     ... # doctest: +ELLIPSIS
436     b'-----BEGIN PGP SIGNATURE-----\n...-----END PGP SIGNATURE-----\n'
437     """
438     args = GPG_ARGS + GPG_SIGN_ARGS
439     if sign_as:
440         args.extend(['--local-user', sign_as])
441     status,output,error = execute(args, stdin=bytes, close_fds=True)
442     return output
443
444 def encrypt_bytes(bytes, recipients):
445     r"""Encrypt ``bytes`` to ``recipients``.
446
447     >>> encrypt_bytes(bytes(b'Hello'), ['pgp-mime@invalid.com'])
448     ... # doctest: +ELLIPSIS
449     b'-----BEGIN PGP MESSAGE-----\n...-----END PGP MESSAGE-----\n'
450     """
451     args = GPG_ARGS + GPG_ENCRYPT_ARGS
452     if not recipients:
453         raise ValueError('no recipients specified for encryption')
454     for recipient in recipients:
455         args.extend(['--recipient', recipient])
456     status,output,error = execute(args, stdin=bytes, close_fds=True)
457     return output
458
459 def sign_and_encrypt_bytes(bytes, sign_as=None, recipients=None):
460     r"""Sign ``bytes`` as ``sign_as`` and encrypt to ``recipients``.
461
462     >>> sign_and_encrypt_bytes(
463     ...     bytes(b'Hello'), 'pgp-mime@invalid.com', ['pgp-mime@invalid.com'])
464     ... # doctest: +ELLIPSIS
465     b'-----BEGIN PGP MESSAGE-----\n...-----END PGP MESSAGE-----\n'
466     """
467     args = GPG_ARGS + GPG_SIGN_AND_ENCRYPT_ARGS
468     if sign_as:
469         args.extend(['--local-user', sign_as])
470     if not recipients:
471         raise ValueError('no recipients specified for encryption')
472     for recipient in recipients:
473         args.extend(['--recipient', recipient])
474     status,output,error = execute(args, stdin=bytes, close_fds=True)
475     return output
476
477 def decrypt_bytes(bytes):
478     r"""Decrypt ``bytes``.
479
480     >>> b = '\n'.join([
481     ...     '-----BEGIN PGP MESSAGE-----',
482     ...     'Version: GnuPG v2.0.17 (GNU/Linux)',
483     ...     '',
484     ...     'hQEMA1Ea7aZDMrbjAQf/TAqLjksZSJxSqkBxYT5gtLQoXY6isvRZg2apjs7CW0y2',
485     ...     'tFK/ptnVYAq2OtWQFhbiJXj8hmwJyyFfb3lghpeu4ihO52JgkkwOpmJb6dxjOi83',
486     ...     'qDwaGOogEPH38BNLuwdrMCW0jmNROwvS796PtqSGUaJTuIiKUB8lETwPwIHrDc11',
487     ...     'N3RWStE5uShNkXXQXplUoeCKf3N4XguXym+GQCqJQzlEMrkkDdr4l7mzvt3Nf8EA',
488     ...     'SgSak086tUoo9x8IN5PJCuOJkcXcjQzFcpqOsA7dyZKO8NeQUZv2JvlZuorckNvN',
489     ...     'xx3PwW0a8VeJgTQrh64ZK/d3F3gNHUTzXkq/UIn25tJFAcmSUwxtsBal7p8zAeCV',
490     ...     '8zefsHRQ5Y03IBeYBcVJBhDS9XfvwLQTJiGGstPCxzKTwSUT1MzV5t5twG/STDCc',
491     ...     'uxW3wSdo',
492     ...     '=bZI+',
493     ...     '-----END PGP MESSAGE-----',
494     ...     ''
495     ...     ]).encode('us-ascii')
496     >>> decrypt_bytes(b)
497     b'Success!\n'
498     """
499     args = GPG_ARGS + GPG_DECRYPT_ARGS
500     status,output,error = execute(args, stdin=bytes, close_fds=True)
501     return output
502
503 def verify_bytes(bytes, signature=None):
504     r"""Verify a signature on ``bytes``, possibly decrypting first.
505
506     These tests assume you didn't trust the distributed test key.
507
508     >>> b = '\n'.join([
509     ...     '-----BEGIN PGP MESSAGE-----',
510     ...     'Version: GnuPG v2.0.17 (GNU/Linux)',
511     ...     '',
512     ...     'hQEMA1Ea7aZDMrbjAQf/YM1SeFzNGz0DnUynaEyhfGCvcqmjtbN1PtZMpT7VaQLN',
513     ...     'a+c0faskr79Atz0+2IBR7CDOlcETrRtH2EnrWukbRIDtmffNFGuhMRTNfnQ15OIN',
514     ...     'qrmt2P5gXznsgnm2XjzTK7S/Cc3Aq+zjaDrDt7bIedEdz+EyNgaKuL/lB9cAB8xL',
515     ...     'YYp/yn55Myjair2idgzsa7w/QXdE3RhpyRLqR2Jgz4P1I1xOgUYnylbpIZL9FOKN',
516     ...     'NR3RQhkGdANBku8otfthb5ZUGsNMV45ct4V8PE+xChjFb9gcwpaf1hhoIF/sYHD5',
517     ...     'Bkf+v/J8F40KGYY16b0DjQIUlnra9y7q9jj0h2bvc9LAtgHtVUso133LLcVYl7RP',
518     ...     'Vjyz9Ps366BtIdPlAL4CoF5hEcMKS5J3h1vRlyAKN4uHENl5vKvoxn7ID3JhhWQc',
519     ...     '6QrPGis64zi3OnYor34HPh/KNJvkgOQkekmtYuTxnkiONA4lhMDJgeaVZ9WZq+GV',
520     ...     'MaCvCFGNYU2TV4V8wMlnUbF8d5bDQ83g8MxIVKdDcnBzzYLZha+qmz4Spry9iB53',
521     ...     'Sg/sM5H8gWWSl7Oj1lxVg7o7IscpQfVt6zL6jD2VjL3L3Hu7WEXIrcGZtvrP4d+C',
522     ...     'TGYWiGlh5B2UCFk2bVctfw8W/QfaVvJYD4Rfqta2V2p14KIJLFRSGa1g26W4ixrH',
523     ...     'XKxgaA3AIfJ+6c5RoisRLuYCxvQi91wkE9hAXR+inXK4Hq4SmiHoeITZFhHP3hh3',
524     ...     'rbpp8mopiMNxWqCbuqgILP6pShn4oPclu9aR8uJ1ziDxISTGYC71mvLUERUjFn2L',
525     ...     'fu6C0+TCC9RmeyL+eNdM6cjs1G7YR6yX',
526     ...     '=phHd',
527     ...     '-----END PGP MESSAGE-----',
528     ...     '',
529     ...     ]).encode('us-ascii')
530     >>> output,verified,message = verify_bytes(b)
531     >>> output
532     b'Success!\n'
533     >>> verified
534     False
535     >>> print(message)
536     gpg: Signature made Wed 21 Mar 2012 03:13:57 PM EDT using RSA key ID 4332B6E3
537     gpg: Good signature from "pgp-mime-test (http://blog.tremily.us/posts/pgp-mime/) <pgp-mime@invalid.com>"
538     gpg: WARNING: This key is not certified with a trusted signature!
539     gpg:          There is no indication that the signature belongs to the owner.
540     Primary key fingerprint: B2ED BE0E 771A 4B87 08DD  16A7 511A EDA6 4332 B6E3
541     <BLANKLINE>
542
543     >>> b = b'Success!\n'
544     >>> signature = '\n'.join([
545     ...     '-----BEGIN PGP SIGNATURE-----',
546     ...     'Version: GnuPG v2.0.17 (GNU/Linux)',
547     ...     '',
548     ...     'iQEcBAEBAgAGBQJPaiw/AAoJEFEa7aZDMrbj93gH/1fQPXLjUTpONJUTmvGoMLNA',
549     ...     'W9ZhjpUL5i6rRqYGUvQ4kTEDuPMxkMrCyFCDHEhSDHufMek6Nso5/HeJn3aqxlgs',
550     ...     'hmNlvAq4FI6JQyFL7eCp/XG9cPx1p42dTI7JAih8FuK21sS4m/H5XP3R/6KXC99D',
551     ...     '39rrXCvvR+yNgKe2dxuJwmKuLteVlcWxiIQwVrYK70GtJHC5BO79G8yGccWoEy9C',
552     ...     '9JkJiyNptqZyFjGBNmMmrCSFZ7ZFA02RB+laRmwuIiozw4TJYEksxPrgZMbbcFzx',
553     ...     'zs3JHyV23+Fz1ftalvwskHE7tJkX9Ub8iBMNZ/KxJXXdPdpuMdEYVjoUehkQBQE=',
554     ...     '=rRBP',
555     ...     '-----END PGP SIGNATURE-----',
556     ...     '',
557     ...     ]).encode('us-ascii')
558     >>> output,verified,message = verify_bytes(b, signature=signature)
559     >>> output
560     b'Success!\n'
561     >>> verified
562     False
563     >>> print(message)
564     gpg: Signature made Wed 21 Mar 2012 03:30:07 PM EDT using RSA key ID 4332B6E3
565     gpg: Good signature from "pgp-mime-test (http://blog.tremily.us/posts/pgp-mime/) <pgp-mime@invalid.com>"
566     gpg: WARNING: This key is not certified with a trusted signature!
567     gpg:          There is no indication that the signature belongs to the owner.
568     Primary key fingerprint: B2ED BE0E 771A 4B87 08DD  16A7 511A EDA6 4332 B6E3
569     <BLANKLINE>
570     """
571     args = GPG_ARGS + GPG_VERIFY_ARGS
572     kwargs = {}
573     sig_read = sig_thread = None
574     if signature:
575         sig_read,sig_thread = thread_pipe(signature)
576         args.extend(
577             ['--enable-special-filenames', '--verify',
578              '--', '-&{}'.format(sig_read), '-'])
579         kwargs['close_fds'] = False
580     else:
581         kwargs['close_fds'] = True
582     try:
583         status,output,error = execute(args, stdin=bytes, **kwargs)
584     finally:
585         if sig_read:
586             _os.close(sig_read)
587         if sig_thread:
588             sig_thread.join()
589     if signature:
590         assert output == b'', output
591         output = bytes
592     error = str(error, 'us-ascii')
593     verified = True
594     for string in GPG_VERIFY_FAILED:
595         if string in error:
596             verified = False
597             break
598     return (output, verified, error)
599
600 def sign(message, sign_as=None):
601     r"""Sign a ``Message``, returning the signed version.
602
603     multipart/signed
604     +-> text/plain                 (body)
605     +-> application/pgp-signature  (signature)
606
607     >>> message = encodedMIMEText('Hi\nBye')
608     >>> signed = sign(message, sign_as='pgp-mime@invalid.com')
609     >>> signed.set_boundary('boundsep')
610     >>> print(signed.as_string())  # doctest: +ELLIPSIS, +REPORT_UDIFF
611     Content-Type: multipart/signed; protocol="application/pgp-signature"; micalg="pgp-sha1"; boundary="boundsep"
612     MIME-Version: 1.0
613     Content-Disposition: inline
614     <BLANKLINE>
615     --boundsep
616     Content-Type: text/plain; charset="us-ascii"
617     MIME-Version: 1.0
618     Content-Transfer-Encoding: 7bit
619     Content-Disposition: inline
620     <BLANKLINE>
621     Hi
622     Bye
623     --boundsep
624     MIME-Version: 1.0
625     Content-Transfer-Encoding: 7bit
626     Content-Description: OpenPGP digital signature
627     Content-Type: application/pgp-signature; name="signature.asc"; charset="us-ascii"
628     <BLANKLINE>
629     -----BEGIN PGP SIGNATURE-----
630     Version: GnuPG...
631     -----END PGP SIGNATURE-----
632     <BLANKLINE>
633     --boundsep--
634
635     >>> from email.mime.multipart import MIMEMultipart
636     >>> message = MIMEMultipart()
637     >>> message.attach(encodedMIMEText('Part A'))
638     >>> message.attach(encodedMIMEText('Part B'))
639     >>> signed = sign(message, sign_as='pgp-mime@invalid.com')
640     >>> signed.set_boundary('boundsep')
641     >>> print(signed.as_string())  # doctest: +ELLIPSIS, +REPORT_UDIFF
642     Content-Type: multipart/signed; protocol="application/pgp-signature"; micalg="pgp-sha1"; boundary="boundsep"
643     MIME-Version: 1.0
644     Content-Disposition: inline
645     <BLANKLINE>
646     --boundsep
647     Content-Type: multipart/mixed; boundary="===============...=="
648     MIME-Version: 1.0
649     <BLANKLINE>
650     --===============...==
651     Content-Type: text/plain; charset="us-ascii"
652     MIME-Version: 1.0
653     Content-Transfer-Encoding: 7bit
654     Content-Disposition: inline
655     <BLANKLINE>
656     Part A
657     --===============...==
658     Content-Type: text/plain; charset="us-ascii"
659     MIME-Version: 1.0
660     Content-Transfer-Encoding: 7bit
661     Content-Disposition: inline
662     <BLANKLINE>
663     Part B
664     --===============...==--
665     --boundsep
666     MIME-Version: 1.0
667     Content-Transfer-Encoding: 7bit
668     Content-Description: OpenPGP digital signature
669     Content-Type: application/pgp-signature; name="signature.asc"; charset="us-ascii"
670     <BLANKLINE>
671     -----BEGIN PGP SIGNATURE-----
672     Version: GnuPG...
673     -----END PGP SIGNATURE-----
674     <BLANKLINE>
675     --boundsep--
676     """
677     body = message.as_string().encode('us-ascii')
678     signature = str(sign_bytes(body, sign_as), 'us-ascii')
679     sig = _MIMEApplication(
680         _data=signature,
681         _subtype='pgp-signature; name="signature.asc"',
682         _encoder=_encode_7or8bit)
683     sig['Content-Description'] = 'OpenPGP digital signature'
684     sig.set_charset('us-ascii')
685
686     msg = _MIMEMultipart(
687         'signed', micalg='pgp-sha1', protocol='application/pgp-signature')
688     msg.attach(message)
689     msg.attach(sig)
690     msg['Content-Disposition'] = 'inline'
691     return msg
692
693 def encrypt(message, recipients=None):
694     r"""Encrypt a ``Message``, returning the encrypted version.
695
696     multipart/encrypted
697     +-> application/pgp-encrypted  (control information)
698     +-> application/octet-stream   (body)
699
700     >>> message = encodedMIMEText('Hi\nBye')
701     >>> message['To'] = 'pgp-mime-test <pgp-mime@invalid.com>'
702     >>> encrypted = encrypt(message)
703     >>> encrypted.set_boundary('boundsep')
704     >>> print(encrypted.as_string())  # doctest: +ELLIPSIS, +REPORT_UDIFF
705     Content-Type: multipart/encrypted; protocol="application/pgp-encrypted"; micalg="pgp-sha1"; boundary="boundsep"
706     MIME-Version: 1.0
707     Content-Disposition: inline
708     <BLANKLINE>
709     --boundsep
710     MIME-Version: 1.0
711     Content-Transfer-Encoding: 7bit
712     Content-Type: application/pgp-encrypted; charset="us-ascii"
713     <BLANKLINE>
714     Version: 1
715     <BLANKLINE>
716     --boundsep
717     MIME-Version: 1.0
718     Content-Transfer-Encoding: 7bit
719     Content-Description: OpenPGP encrypted message
720     Content-Type: application/octet-stream; name="encrypted.asc"; charset="us-ascii"
721     <BLANKLINE>
722     -----BEGIN PGP MESSAGE-----
723     Version: GnuPG...
724     -----END PGP MESSAGE-----
725     <BLANKLINE>
726     --boundsep--
727
728     >>> from email.mime.multipart import MIMEMultipart
729     >>> message = MIMEMultipart()
730     >>> message.attach(encodedMIMEText('Part A'))
731     >>> message.attach(encodedMIMEText('Part B'))
732     >>> encrypted = encrypt(message, recipients=['pgp-mime@invalid.com'])
733     >>> encrypted.set_boundary('boundsep')
734     >>> print(encrypted.as_string()) # doctest: +ELLIPSIS, +REPORT_UDIFF
735     Content-Type: multipart/encrypted; protocol="application/pgp-encrypted"; micalg="pgp-sha1"; boundary="boundsep"
736     MIME-Version: 1.0
737     Content-Disposition: inline
738     <BLANKLINE>
739     --boundsep
740     MIME-Version: 1.0
741     Content-Transfer-Encoding: 7bit
742     Content-Type: application/pgp-encrypted; charset="us-ascii"
743     <BLANKLINE>
744     Version: 1
745     <BLANKLINE>
746     --boundsep
747     MIME-Version: 1.0
748     Content-Transfer-Encoding: 7bit
749     Content-Description: OpenPGP encrypted message
750     Content-Type: application/octet-stream; name="encrypted.asc"; charset="us-ascii"
751     <BLANKLINE>
752     -----BEGIN PGP MESSAGE-----
753     Version: GnuPG...
754     -----END PGP MESSAGE-----
755     <BLANKLINE>
756     --boundsep--
757     """
758     body = message.as_string().encode('us-ascii')
759     if recipients is None:
760         recipients = [email for name,email in email_targets(message)]
761         LOG.debug('extracted encryption recipients: {}'.format(recipients))
762     encrypted = str(encrypt_bytes(body, recipients), 'us-ascii')
763     enc = _MIMEApplication(
764         _data=encrypted,
765         _subtype='octet-stream; name="encrypted.asc"',
766         _encoder=_encode_7or8bit)
767     enc['Content-Description'] = 'OpenPGP encrypted message'
768     enc.set_charset('us-ascii')
769     control = _MIMEApplication(
770         _data='Version: 1\n',
771         _subtype='pgp-encrypted',
772         _encoder=_encode_7or8bit)
773     control.set_charset('us-ascii')
774     msg = _MIMEMultipart(
775         'encrypted',
776         micalg='pgp-sha1',
777         protocol='application/pgp-encrypted')
778     msg.attach(control)
779     msg.attach(enc)
780     msg['Content-Disposition'] = 'inline'
781     return msg
782
783 def sign_and_encrypt(message, sign_as=None, recipients=None):
784     r"""Sign and encrypt a ``Message``, returning the encrypted version.
785
786     multipart/encrypted
787      +-> application/pgp-encrypted  (control information)
788      +-> application/octet-stream   (body)
789
790     >>> message = encodedMIMEText('Hi\nBye')
791     >>> message['To'] = 'pgp-mime-test <pgp-mime@invalid.com>'
792     >>> encrypted = sign_and_encrypt(message, sign_as='pgp-mime@invalid.com')
793     >>> encrypted.set_boundary('boundsep')
794     >>> print(encrypted.as_string())  # doctest: +ELLIPSIS, +REPORT_UDIFF
795     Content-Type: multipart/encrypted; protocol="application/pgp-encrypted"; micalg="pgp-sha1"; boundary="boundsep"
796     MIME-Version: 1.0
797     Content-Disposition: inline
798     <BLANKLINE>
799     --boundsep
800     MIME-Version: 1.0
801     Content-Transfer-Encoding: 7bit
802     Content-Type: application/pgp-encrypted; charset="us-ascii"
803     <BLANKLINE>
804     Version: 1
805     <BLANKLINE>
806     --boundsep
807     MIME-Version: 1.0
808     Content-Transfer-Encoding: 7bit
809     Content-Description: OpenPGP encrypted message
810     Content-Type: application/octet-stream; name="encrypted.asc"; charset="us-ascii"
811     <BLANKLINE>
812     -----BEGIN PGP MESSAGE-----
813     Version: GnuPG...
814     -----END PGP MESSAGE-----
815     <BLANKLINE>
816     --boundsep--
817
818     >>> from email.mime.multipart import MIMEMultipart
819     >>> message = MIMEMultipart()
820     >>> message.attach(encodedMIMEText('Part A'))
821     >>> message.attach(encodedMIMEText('Part B'))
822     >>> encrypted = sign_and_encrypt(
823     ...     message, sign_as='pgp-mime@invalid.com', recipients=['pgp-mime@invalid.com'])
824     >>> encrypted.set_boundary('boundsep')
825     >>> print(encrypted.as_string()) # doctest: +ELLIPSIS, +REPORT_UDIFF
826     Content-Type: multipart/encrypted; protocol="application/pgp-encrypted"; micalg="pgp-sha1"; boundary="boundsep"
827     MIME-Version: 1.0
828     Content-Disposition: inline
829     <BLANKLINE>
830     --boundsep
831     MIME-Version: 1.0
832     Content-Transfer-Encoding: 7bit
833     Content-Type: application/pgp-encrypted; charset="us-ascii"
834     <BLANKLINE>
835     Version: 1
836     <BLANKLINE>
837     --boundsep
838     MIME-Version: 1.0
839     Content-Transfer-Encoding: 7bit
840     Content-Description: OpenPGP encrypted message
841     Content-Type: application/octet-stream; name="encrypted.asc"; charset="us-ascii"
842     <BLANKLINE>
843     -----BEGIN PGP MESSAGE-----
844     Version: GnuPG...
845     -----END PGP MESSAGE-----
846     <BLANKLINE>
847     --boundsep--
848     """
849     strip_bcc(message=message)
850     body = message.as_string().encode('us-ascii')
851     if recipients is None:
852         recipients = [email for name,email in email_targets(message)]
853         LOG.debug('extracted encryption recipients: {}'.format(recipients))
854     encrypted = str(sign_and_encrypt_bytes(
855             body, sign_as=sign_as, recipients=recipients), 'us-ascii')
856     enc = _MIMEApplication(
857         _data=encrypted,
858         _subtype='octet-stream; name="encrypted.asc"',
859         _encoder=_encode_7or8bit)
860     enc['Content-Description'] = 'OpenPGP encrypted message'
861     enc.set_charset('us-ascii')
862     control = _MIMEApplication(
863         _data='Version: 1\n',
864         _subtype='pgp-encrypted',
865         _encoder=_encode_7or8bit)
866     control.set_charset('us-ascii')
867     msg = _MIMEMultipart(
868         'encrypted',
869         micalg='pgp-sha1',
870         protocol='application/pgp-encrypted')
871     msg.attach(control)
872     msg.attach(enc)
873     msg['Content-Disposition'] = 'inline'
874     return msg
875
876 def _get_encrypted_parts(message):
877     ct = message.get_content_type()
878     assert ct == 'multipart/encrypted', ct
879     params = dict(message.get_params())
880     assert params.get('protocol', None) == 'application/pgp-encrypted', params
881     assert message.is_multipart(), message
882     control = body = None
883     for part in message.get_payload():
884         if part == message:
885             continue
886         assert part.is_multipart() == False, part
887         ct = part.get_content_type()
888         if ct == 'application/pgp-encrypted':
889             if control:
890                 raise ValueError('multiple application/pgp-encrypted parts')
891             control = part
892         elif ct == 'application/octet-stream':
893             if body:
894                 raise ValueError('multiple application/octet-stream parts')
895             body = part
896         else:
897             raise ValueError('unnecessary {} part'.format(ct))
898     if not control:
899         raise ValueError('missing application/pgp-encrypted part')
900     if not body:
901         raise ValueError('missing application/octet-stream part')
902     return (control, body)
903
904 def _get_signed_parts(message):
905     ct = message.get_content_type()
906     assert ct == 'multipart/signed', ct
907     params = dict(message.get_params())
908     assert params.get('protocol', None) == 'application/pgp-signature', params
909     assert message.is_multipart(), message
910     body = signature = None
911     for part in message.get_payload():
912         if part == message:
913             continue
914         ct = part.get_content_type()
915         if ct == 'application/pgp-signature':
916             if signature:
917                 raise ValueError('multiple application/pgp-signature parts')
918             signature = part
919         else:
920             if body:
921                 raise ValueError('multiple non-signature parts')
922             body = part
923     if not body:
924         raise ValueError('missing body part')
925     if not signature:
926         raise ValueError('missing application/pgp-signature part')
927     return (body, signature)
928
929 def decrypt(message):
930     r"""Decrypt a multipart/encrypted message.
931
932     >>> message = encodedMIMEText('Hi\nBye')
933     >>> encrypted = encrypt(message, recipients=['<pgp-mime@invalid.com>'])
934     >>> decrypted = decrypt(encrypted)
935     >>> print(decrypted.as_string())  # doctest: +ELLIPSIS, +REPORT_UDIFF
936     Content-Type: text/plain; charset="us-ascii"
937     MIME-Version: 1.0
938     Content-Transfer-Encoding: 7bit
939     Content-Disposition: inline
940     <BLANKLINE>
941     Hi
942     Bye
943
944     >>> from email.mime.multipart import MIMEMultipart
945     >>> message = MIMEMultipart()
946     >>> message.attach(encodedMIMEText('Part A'))
947     >>> message.attach(encodedMIMEText('Part B'))
948     >>> encrypted = encrypt(message, recipients=['pgp-mime@invalid.com'])
949     >>> decrypted = decrypt(encrypted)
950     >>> decrypted.set_boundary('boundsep')
951     >>> print(decrypted.as_string()) # doctest: +ELLIPSIS, +REPORT_UDIFF
952     Content-Type: multipart/mixed; boundary="boundsep"
953     MIME-Version: 1.0
954     <BLANKLINE>
955     --boundsep
956     Content-Type: text/plain; charset="us-ascii"
957     MIME-Version: 1.0
958     Content-Transfer-Encoding: 7bit
959     Content-Disposition: inline
960     <BLANKLINE>
961     Part A
962     --boundsep
963     Content-Type: text/plain; charset="us-ascii"
964     MIME-Version: 1.0
965     Content-Transfer-Encoding: 7bit
966     Content-Disposition: inline
967     <BLANKLINE>
968     Part B
969     --boundsep--
970     <BLANKLINE>
971     """
972     control,body = _get_encrypted_parts(message)
973     encrypted = body.get_payload(decode=True)
974     if not isinstance(encrypted, bytes):
975         encrypted = encrypted.encode('us-ascii')
976     decrypted = decrypt_bytes(encrypted)
977     return _message_from_bytes(decrypted)
978
979 def verify(message):
980     r"""Verify a signature on ``message``, possibly decrypting first.
981
982     >>> message = encodedMIMEText('Hi\nBye')
983     >>> message['To'] = 'pgp-mime-test <pgp-mime@invalid.com>'
984     >>> encrypted = sign_and_encrypt(message, sign_as='pgp-mime@invalid.com')
985     >>> decrypted,verified,message = verify(encrypted)
986     >>> print(decrypted.as_string())  # doctest: +ELLIPSIS, +REPORT_UDIFF
987     Content-Type: text/plain; charset="us-ascii"
988     MIME-Version: 1.0
989     Content-Transfer-Encoding: 7bit
990     Content-Disposition: inline
991     To: pgp-mime-test <pgp-mime@invalid.com>
992     <BLANKLINE>
993     Hi
994     Bye
995     >>> verified
996     False
997     >>> print(message)  # doctest: +ELLIPSIS, +REPORT_UDIFF
998     gpg: Signature made ... using RSA key ID 4332B6E3
999     gpg: Good signature from "pgp-mime-test (http://blog.tremily.us/posts/pgp-mime/) <pgp-mime@invalid.com>"
1000     gpg: WARNING: This key is not certified with a trusted signature!
1001     gpg:          There is no indication that the signature belongs to the owner.
1002     Primary key fingerprint: B2ED BE0E 771A 4B87 08DD  16A7 511A EDA6 4332 B6E3
1003     <BLANKLINE>
1004
1005     >>> from email.mime.multipart import MIMEMultipart
1006     >>> message = MIMEMultipart()
1007     >>> message.attach(encodedMIMEText('Part A'))
1008     >>> message.attach(encodedMIMEText('Part B'))
1009     >>> signed = sign(message, sign_as='pgp-mime@invalid.com')
1010     >>> decrypted,verified,message = verify(signed)
1011     >>> decrypted.set_boundary('boundsep')
1012     >>> print(decrypted.as_string())  # doctest: +ELLIPSIS, +REPORT_UDIFF
1013     Content-Type: multipart/mixed; boundary="boundsep"
1014     MIME-Version: 1.0
1015     <BLANKLINE>
1016     --boundsep
1017     Content-Type: text/plain; charset="us-ascii"
1018     MIME-Version: 1.0
1019     Content-Transfer-Encoding: 7bit
1020     Content-Disposition: inline
1021     <BLANKLINE>
1022     Part A
1023     --boundsep
1024     Content-Type: text/plain; charset="us-ascii"
1025     MIME-Version: 1.0
1026     Content-Transfer-Encoding: 7bit
1027     Content-Disposition: inline
1028     <BLANKLINE>
1029     Part B
1030     --boundsep--
1031     >>> verified
1032     False
1033     >>> print(message)  # doctest: +ELLIPSIS, +REPORT_UDIFF
1034     gpg: Signature made ... using RSA key ID 4332B6E3
1035     gpg: Good signature from "pgp-mime-test (http://blog.tremily.us/posts/pgp-mime/) <pgp-mime@invalid.com>"
1036     gpg: WARNING: This key is not certified with a trusted signature!
1037     gpg:          There is no indication that the signature belongs to the owner.
1038     Primary key fingerprint: B2ED BE0E 771A 4B87 08DD  16A7 511A EDA6 4332 B6E3
1039     <BLANKLINE>
1040     """
1041     ct = message.get_content_type()
1042     if ct == 'multipart/encrypted':
1043         control,body = _get_encrypted_parts(message)
1044         encrypted = body.get_payload(decode=True)
1045         if not isinstance(encrypted, bytes):
1046             encrypted = encrypted.encode('us-ascii')
1047         decrypted,verified,message = verify_bytes(encrypted)
1048         return (_message_from_bytes(decrypted), verified, message)
1049     body,signature = _get_signed_parts(message)
1050     sig_data = signature.get_payload(decode=True)
1051     if not isinstance(sig_data, bytes):
1052         sig_data = sig_data.encode('us-ascii')
1053     decrypted,verified,message = verify_bytes(
1054         body.as_string().encode('us-ascii'), signature=sig_data)
1055     return (body, verified, message)