import smtplib as _smtplib
import smtplib as _smtplib
import subprocess as _subprocess
+import threading as _threading
from email.encoders import encode_7or8bit as _encode_7or8bit
from email.generator import Generator as _Generator
#ENCODING = 'iso-8859-1'
GPG_ARGS = [
- '/usr/bin/gpg', '--no-verbose', '--quiet', '--batch', '--output', '-',
- '--armor', '--textmode']
-GPG_SIGN_ARGS = ['--detach-sign']
-GPG_ENCRYPT_ARGS = ['--encrypt', '--always-trust']
-GPG_SIGN_AND_ENCRYPT_ARGS = ['--sign', '--encrypt', '--always-trust']
+ '/usr/bin/gpg', '--no-verbose', '--quiet', '--batch', '--output', '-']
+GPG_SIGN_ARGS = ['--armor', '--textmode', '--detach-sign']
+GPG_ENCRYPT_ARGS = ['--armor', '--textmode', '--encrypt', '--always-trust']
+GPG_SIGN_AND_ENCRYPT_ARGS = [
+ '--armor', '--textmode', '--sign', '--encrypt', '--always-trust']
+GPG_DECRYPT_ARGS = []
+GPG_VERIFY_ARGS = []
+GPG_VERIFY_FAILED = [
+ 'This key is not certified with a trusted signature',
+ 'WARNING',
+ ]
SENDMAIL = ['/usr/sbin/sendmail', '-t']
>>> message = encodedMIMEText('howdy!')
>>> message['From'] = 'John Doe <jdoe@a.gov.ru>'
>>> message['To'] = 'Jack <jack@hill.org>, Jill <jill@hill.org>'
- >>> mail(message=message, sendmail=SENDMAIL)
+ >>> mail(message=message, sendmail=SENDMAIL) # doctest: +SKIP
"""
LOG.info('send message {} -> {}'.format(message['from'], message['to']))
if smtp:
smtp.send_message(msg=message)
elif sendmail:
- execute(sendmail, stdin=message.as_string().encode('us-ascii'))
+ execute(
+ sendmail, stdin=message.as_string().encode('us-ascii'),
+ close_fds=True)
else:
smtp = _smtplib.SMTP()
smtp.connect()
root_part[k] = v
return root_part
-def execute(args, stdin=None, expect=(0,), env=_os.environ):
+def execute(args, stdin=None, expect=(0,), env=_os.environ, **kwargs):
"""Execute a command (allows us to drive gpg).
"""
LOG.debug('$ {}'.format(args))
try:
p = _subprocess.Popen(
args, stdin=_subprocess.PIPE, stdout=_subprocess.PIPE,
- stderr=_subprocess.PIPE, shell=False, close_fds=True, env=env)
+ stderr=_subprocess.PIPE, shell=False, env=env, **kwargs)
except OSError as e:
raise Exception('{}\nwhile executing {}'.format(e.args[1], args))
output,error = p.communicate(input=stdin)
return getaddresses(
tos + ccs + bccs + resent_tos + resent_ccs + resent_bccs)
+def _thread_pipe(fd, data):
+ """Write ``data`` to ``fd`` and close ``fd``.
+
+ A helper function for ``thread_pipe``.
+
+ >>>
+ """
+ LOG.debug('starting pipe-write thread')
+ try:
+ _os.write(fd, data)
+ finally:
+ LOG.debug('closing pipe-write file descriptor')
+ _os.close(fd)
+ LOG.debug('closed pipe-write file descriptor')
+
+def thread_pipe(data):
+ """Write data to a pipe.
+
+ Return the associated read file descriptor and running ``Thread``
+ that's doing the writing.
+
+ >>> import os
+ >>> read,thread = thread_pipe(b'Hello world!')
+ >>> try:
+ ... print(os.read(read, 100))
+ ... finally:
+ ... thread.join()
+ b'Hello world!'
+ """
+ read,write = _os.pipe()
+ LOG.debug('opened a pipe {} -> {}'.format(write, read))
+ try:
+ thread = _threading.Thread(
+ name='pipe writer', target=_thread_pipe, args=(write, data))
+ thread.start()
+ except:
+ _os.close(read)
+ _os.close(write)
+ return (read, thread)
+
def sign_bytes(bytes, sign_as=None):
r"""Sign ``bytes`` as ``sign_as``.
args = GPG_ARGS + GPG_SIGN_ARGS
if sign_as:
args.extend(['--local-user', sign_as])
- status,output,error = execute(args, stdin=bytes)
+ status,output,error = execute(args, stdin=bytes, close_fds=True)
return output
def encrypt_bytes(bytes, recipients):
raise ValueError('no recipients specified for encryption')
for recipient in recipients:
args.extend(['--recipient', recipient])
- status,output,error = execute(args, stdin=bytes)
+ status,output,error = execute(args, stdin=bytes, close_fds=True)
return output
def sign_and_encrypt_bytes(bytes, sign_as=None, recipients=None):
raise ValueError('no recipients specified for encryption')
for recipient in recipients:
args.extend(['--recipient', recipient])
- status,output,error = execute(args, stdin=bytes)
+ status,output,error = execute(args, stdin=bytes, close_fds=True)
return output
+def decrypt_bytes(bytes):
+ r"""Decrypt ``bytes``.
+
+ >>> b = '\n'.join([
+ ... '-----BEGIN PGP MESSAGE-----',
+ ... 'Version: GnuPG v2.0.17 (GNU/Linux)',
+ ... '',
+ ... 'hQEMA1Ea7aZDMrbjAQf/TAqLjksZSJxSqkBxYT5gtLQoXY6isvRZg2apjs7CW0y2',
+ ... 'tFK/ptnVYAq2OtWQFhbiJXj8hmwJyyFfb3lghpeu4ihO52JgkkwOpmJb6dxjOi83',
+ ... 'qDwaGOogEPH38BNLuwdrMCW0jmNROwvS796PtqSGUaJTuIiKUB8lETwPwIHrDc11',
+ ... 'N3RWStE5uShNkXXQXplUoeCKf3N4XguXym+GQCqJQzlEMrkkDdr4l7mzvt3Nf8EA',
+ ... 'SgSak086tUoo9x8IN5PJCuOJkcXcjQzFcpqOsA7dyZKO8NeQUZv2JvlZuorckNvN',
+ ... 'xx3PwW0a8VeJgTQrh64ZK/d3F3gNHUTzXkq/UIn25tJFAcmSUwxtsBal7p8zAeCV',
+ ... '8zefsHRQ5Y03IBeYBcVJBhDS9XfvwLQTJiGGstPCxzKTwSUT1MzV5t5twG/STDCc',
+ ... 'uxW3wSdo',
+ ... '=bZI+',
+ ... '-----END PGP MESSAGE-----',
+ ... ''
+ ... ]).encode('us-ascii')
+ >>> decrypt_bytes(b)
+ b'Success!\n'
+ """
+ args = GPG_ARGS + GPG_DECRYPT_ARGS
+ status,output,error = execute(args, stdin=bytes, close_fds=True)
+ return output
+
+def verify_bytes(bytes, signature=None):
+ r"""Verify a signature on ``bytes``, possibly decrypting first.
+
+ These tests assume you didn't trust the distributed test key.
+
+ >>> b = '\n'.join([
+ ... '-----BEGIN PGP MESSAGE-----',
+ ... 'Version: GnuPG v2.0.17 (GNU/Linux)',
+ ... '',
+ ... 'hQEMA1Ea7aZDMrbjAQf/YM1SeFzNGz0DnUynaEyhfGCvcqmjtbN1PtZMpT7VaQLN',
+ ... 'a+c0faskr79Atz0+2IBR7CDOlcETrRtH2EnrWukbRIDtmffNFGuhMRTNfnQ15OIN',
+ ... 'qrmt2P5gXznsgnm2XjzTK7S/Cc3Aq+zjaDrDt7bIedEdz+EyNgaKuL/lB9cAB8xL',
+ ... 'YYp/yn55Myjair2idgzsa7w/QXdE3RhpyRLqR2Jgz4P1I1xOgUYnylbpIZL9FOKN',
+ ... 'NR3RQhkGdANBku8otfthb5ZUGsNMV45ct4V8PE+xChjFb9gcwpaf1hhoIF/sYHD5',
+ ... 'Bkf+v/J8F40KGYY16b0DjQIUlnra9y7q9jj0h2bvc9LAtgHtVUso133LLcVYl7RP',
+ ... 'Vjyz9Ps366BtIdPlAL4CoF5hEcMKS5J3h1vRlyAKN4uHENl5vKvoxn7ID3JhhWQc',
+ ... '6QrPGis64zi3OnYor34HPh/KNJvkgOQkekmtYuTxnkiONA4lhMDJgeaVZ9WZq+GV',
+ ... 'MaCvCFGNYU2TV4V8wMlnUbF8d5bDQ83g8MxIVKdDcnBzzYLZha+qmz4Spry9iB53',
+ ... 'Sg/sM5H8gWWSl7Oj1lxVg7o7IscpQfVt6zL6jD2VjL3L3Hu7WEXIrcGZtvrP4d+C',
+ ... 'TGYWiGlh5B2UCFk2bVctfw8W/QfaVvJYD4Rfqta2V2p14KIJLFRSGa1g26W4ixrH',
+ ... 'XKxgaA3AIfJ+6c5RoisRLuYCxvQi91wkE9hAXR+inXK4Hq4SmiHoeITZFhHP3hh3',
+ ... 'rbpp8mopiMNxWqCbuqgILP6pShn4oPclu9aR8uJ1ziDxISTGYC71mvLUERUjFn2L',
+ ... 'fu6C0+TCC9RmeyL+eNdM6cjs1G7YR6yX',
+ ... '=phHd',
+ ... '-----END PGP MESSAGE-----',
+ ... '',
+ ... ]).encode('us-ascii')
+ >>> output,verified,message = verify_bytes(b)
+ >>> output
+ b'Success!\n'
+ >>> verified
+ False
+ >>> print(message)
+ gpg: Signature made Wed 21 Mar 2012 03:13:57 PM EDT using RSA key ID 4332B6E3
+ gpg: Good signature from "pgp-mime-test (http://blog.tremily.us/posts/pgp-mime/) <pgp-mime@invalid.com>"
+ gpg: WARNING: This key is not certified with a trusted signature!
+ gpg: There is no indication that the signature belongs to the owner.
+ Primary key fingerprint: B2ED BE0E 771A 4B87 08DD 16A7 511A EDA6 4332 B6E3
+ <BLANKLINE>
+
+ >>> b = b'Success!\n'
+ >>> signature = '\n'.join([
+ ... '-----BEGIN PGP SIGNATURE-----',
+ ... 'Version: GnuPG v2.0.17 (GNU/Linux)',
+ ... '',
+ ... 'iQEcBAEBAgAGBQJPaiw/AAoJEFEa7aZDMrbj93gH/1fQPXLjUTpONJUTmvGoMLNA',
+ ... 'W9ZhjpUL5i6rRqYGUvQ4kTEDuPMxkMrCyFCDHEhSDHufMek6Nso5/HeJn3aqxlgs',
+ ... 'hmNlvAq4FI6JQyFL7eCp/XG9cPx1p42dTI7JAih8FuK21sS4m/H5XP3R/6KXC99D',
+ ... '39rrXCvvR+yNgKe2dxuJwmKuLteVlcWxiIQwVrYK70GtJHC5BO79G8yGccWoEy9C',
+ ... '9JkJiyNptqZyFjGBNmMmrCSFZ7ZFA02RB+laRmwuIiozw4TJYEksxPrgZMbbcFzx',
+ ... 'zs3JHyV23+Fz1ftalvwskHE7tJkX9Ub8iBMNZ/KxJXXdPdpuMdEYVjoUehkQBQE=',
+ ... '=rRBP',
+ ... '-----END PGP SIGNATURE-----',
+ ... '',
+ ... ]).encode('us-ascii')
+ >>> output,verified,message = verify_bytes(b, signature=signature)
+ >>> output
+ b'Success!\n'
+ >>> verified
+ False
+ >>> print(message)
+ gpg: Signature made Wed 21 Mar 2012 03:30:07 PM EDT using RSA key ID 4332B6E3
+ gpg: Good signature from "pgp-mime-test (http://blog.tremily.us/posts/pgp-mime/) <pgp-mime@invalid.com>"
+ gpg: WARNING: This key is not certified with a trusted signature!
+ gpg: There is no indication that the signature belongs to the owner.
+ Primary key fingerprint: B2ED BE0E 771A 4B87 08DD 16A7 511A EDA6 4332 B6E3
+ <BLANKLINE>
+ """
+ args = GPG_ARGS + GPG_VERIFY_ARGS
+ kwargs = {}
+ sig_read = sig_thread = None
+ if signature:
+ sig_read,sig_thread = thread_pipe(signature)
+ args.extend(
+ ['--enable-special-filenames', '--verify',
+ '--', '-&{}'.format(sig_read), '-'])
+ kwargs['close_fds'] = False
+ else:
+ kwargs['close_fds'] = True
+ try:
+ status,output,error = execute(args, stdin=bytes, **kwargs)
+ finally:
+ if sig_read:
+ _os.close(sig_read)
+ if sig_thread:
+ sig_thread.join()
+ if signature:
+ assert output == b'', output
+ output = bytes
+ error = str(error, 'us-ascii')
+ verified = True
+ for string in GPG_VERIFY_FAILED:
+ if string in error:
+ verified = False
+ break
+ return (output, verified, error)
+
def sign(message, sign_as=None):
r"""Sign a ``Message``, returning the signed version.