From 0d4827ca5a04187d2d587c36bfc557caf1f5eacd Mon Sep 17 00:00:00 2001 From: "W. Trevor King" Date: Wed, 21 Mar 2012 15:11:43 -0400 Subject: [PATCH] Add decrypt_bytes() and verify_bytes(). Create the encrypted test input with: $ echo 'Success!' | gpg --no-verbose --quiet --batch --output - --armor --textmode --encrypt --always-trust --recipient pgp-mime@invalid.com Create the signed and encrypted test input with: $ echo 'Success!' | gpg --no-verbose --quiet --batch --output - --armor --textmode --sign --encrypt --always-trust --local-user pgp-mime@invalid.com --recipient pgp-mime@invalid.com Created the detached signature test input with: $ echo 'Success!' | gpg --no-verbose --quiet --batch --output - --armor --textmode --detach-sign --always-trust --local-user pgp-mime@invalid.com Verification with a detached signature is the tricky bit. We are piping the signed data in via stdout. To avoid opening a temporary file, we need to pipe the signature in through another pipe. The new `thread_pipe()` function opens that pipe, and spawns a thread writing the signature data, and the `--enable-special-filenames` option lets us specify the read-descriptor with the `-&n` syntax. The threading avoids deadlocking with `execute()`'s `communicate()` call, and makes cleanup of the write-descriptor easier. --- pgp_mime.py | 196 ++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 184 insertions(+), 12 deletions(-) diff --git a/pgp_mime.py b/pgp_mime.py index 3a6f15d..b663437 100644 --- a/pgp_mime.py +++ b/pgp_mime.py @@ -30,6 +30,7 @@ import re as _re import smtplib as _smtplib import smtplib as _smtplib import subprocess as _subprocess +import threading as _threading from email.encoders import encode_7or8bit as _encode_7or8bit from email.generator import Generator as _Generator @@ -54,11 +55,17 @@ ENCODING = 'utf-8' #ENCODING = 'iso-8859-1' GPG_ARGS = [ - '/usr/bin/gpg', '--no-verbose', '--quiet', '--batch', '--output', '-', - '--armor', '--textmode'] -GPG_SIGN_ARGS = ['--detach-sign'] -GPG_ENCRYPT_ARGS = ['--encrypt', '--always-trust'] -GPG_SIGN_AND_ENCRYPT_ARGS = ['--sign', '--encrypt', '--always-trust'] + '/usr/bin/gpg', '--no-verbose', '--quiet', '--batch', '--output', '-'] +GPG_SIGN_ARGS = ['--armor', '--textmode', '--detach-sign'] +GPG_ENCRYPT_ARGS = ['--armor', '--textmode', '--encrypt', '--always-trust'] +GPG_SIGN_AND_ENCRYPT_ARGS = [ + '--armor', '--textmode', '--sign', '--encrypt', '--always-trust'] +GPG_DECRYPT_ARGS = [] +GPG_VERIFY_ARGS = [] +GPG_VERIFY_FAILED = [ + 'This key is not certified with a trusted signature', + 'WARNING', + ] SENDMAIL = ['/usr/sbin/sendmail', '-t'] @@ -146,13 +153,15 @@ def mail(message, smtp=None, sendmail=None): >>> message = encodedMIMEText('howdy!') >>> message['From'] = 'John Doe ' >>> message['To'] = 'Jack , Jill ' - >>> mail(message=message, sendmail=SENDMAIL) + >>> mail(message=message, sendmail=SENDMAIL) # doctest: +SKIP """ LOG.info('send message {} -> {}'.format(message['from'], message['to'])) if smtp: smtp.send_message(msg=message) elif sendmail: - execute(sendmail, stdin=message.as_string().encode('us-ascii')) + execute( + sendmail, stdin=message.as_string().encode('us-ascii'), + close_fds=True) else: smtp = _smtplib.SMTP() smtp.connect() @@ -305,14 +314,14 @@ def attach_root(header, root_part): root_part[k] = v return root_part -def execute(args, stdin=None, expect=(0,), env=_os.environ): +def execute(args, stdin=None, expect=(0,), env=_os.environ, **kwargs): """Execute a command (allows us to drive gpg). """ LOG.debug('$ {}'.format(args)) try: p = _subprocess.Popen( args, stdin=_subprocess.PIPE, stdout=_subprocess.PIPE, - stderr=_subprocess.PIPE, shell=False, close_fds=True, env=env) + stderr=_subprocess.PIPE, shell=False, env=env, **kwargs) except OSError as e: raise Exception('{}\nwhile executing {}'.format(e.args[1], args)) output,error = p.communicate(input=stdin) @@ -377,6 +386,46 @@ def email_targets(message): return getaddresses( tos + ccs + bccs + resent_tos + resent_ccs + resent_bccs) +def _thread_pipe(fd, data): + """Write ``data`` to ``fd`` and close ``fd``. + + A helper function for ``thread_pipe``. + + >>> + """ + LOG.debug('starting pipe-write thread') + try: + _os.write(fd, data) + finally: + LOG.debug('closing pipe-write file descriptor') + _os.close(fd) + LOG.debug('closed pipe-write file descriptor') + +def thread_pipe(data): + """Write data to a pipe. + + Return the associated read file descriptor and running ``Thread`` + that's doing the writing. + + >>> import os + >>> read,thread = thread_pipe(b'Hello world!') + >>> try: + ... print(os.read(read, 100)) + ... finally: + ... thread.join() + b'Hello world!' + """ + read,write = _os.pipe() + LOG.debug('opened a pipe {} -> {}'.format(write, read)) + try: + thread = _threading.Thread( + name='pipe writer', target=_thread_pipe, args=(write, data)) + thread.start() + except: + _os.close(read) + _os.close(write) + return (read, thread) + def sign_bytes(bytes, sign_as=None): r"""Sign ``bytes`` as ``sign_as``. @@ -387,7 +436,7 @@ def sign_bytes(bytes, sign_as=None): args = GPG_ARGS + GPG_SIGN_ARGS if sign_as: args.extend(['--local-user', sign_as]) - status,output,error = execute(args, stdin=bytes) + status,output,error = execute(args, stdin=bytes, close_fds=True) return output def encrypt_bytes(bytes, recipients): @@ -402,7 +451,7 @@ def encrypt_bytes(bytes, recipients): raise ValueError('no recipients specified for encryption') for recipient in recipients: args.extend(['--recipient', recipient]) - status,output,error = execute(args, stdin=bytes) + status,output,error = execute(args, stdin=bytes, close_fds=True) return output def sign_and_encrypt_bytes(bytes, sign_as=None, recipients=None): @@ -420,9 +469,132 @@ def sign_and_encrypt_bytes(bytes, sign_as=None, recipients=None): raise ValueError('no recipients specified for encryption') for recipient in recipients: args.extend(['--recipient', recipient]) - status,output,error = execute(args, stdin=bytes) + status,output,error = execute(args, stdin=bytes, close_fds=True) return output +def decrypt_bytes(bytes): + r"""Decrypt ``bytes``. + + >>> b = '\n'.join([ + ... '-----BEGIN PGP MESSAGE-----', + ... 'Version: GnuPG v2.0.17 (GNU/Linux)', + ... '', + ... 'hQEMA1Ea7aZDMrbjAQf/TAqLjksZSJxSqkBxYT5gtLQoXY6isvRZg2apjs7CW0y2', + ... 'tFK/ptnVYAq2OtWQFhbiJXj8hmwJyyFfb3lghpeu4ihO52JgkkwOpmJb6dxjOi83', + ... 'qDwaGOogEPH38BNLuwdrMCW0jmNROwvS796PtqSGUaJTuIiKUB8lETwPwIHrDc11', + ... 'N3RWStE5uShNkXXQXplUoeCKf3N4XguXym+GQCqJQzlEMrkkDdr4l7mzvt3Nf8EA', + ... 'SgSak086tUoo9x8IN5PJCuOJkcXcjQzFcpqOsA7dyZKO8NeQUZv2JvlZuorckNvN', + ... 'xx3PwW0a8VeJgTQrh64ZK/d3F3gNHUTzXkq/UIn25tJFAcmSUwxtsBal7p8zAeCV', + ... '8zefsHRQ5Y03IBeYBcVJBhDS9XfvwLQTJiGGstPCxzKTwSUT1MzV5t5twG/STDCc', + ... 'uxW3wSdo', + ... '=bZI+', + ... '-----END PGP MESSAGE-----', + ... '' + ... ]).encode('us-ascii') + >>> decrypt_bytes(b) + b'Success!\n' + """ + args = GPG_ARGS + GPG_DECRYPT_ARGS + status,output,error = execute(args, stdin=bytes, close_fds=True) + return output + +def verify_bytes(bytes, signature=None): + r"""Verify a signature on ``bytes``, possibly decrypting first. + + These tests assume you didn't trust the distributed test key. + + >>> b = '\n'.join([ + ... '-----BEGIN PGP MESSAGE-----', + ... 'Version: GnuPG v2.0.17 (GNU/Linux)', + ... '', + ... 'hQEMA1Ea7aZDMrbjAQf/YM1SeFzNGz0DnUynaEyhfGCvcqmjtbN1PtZMpT7VaQLN', + ... 'a+c0faskr79Atz0+2IBR7CDOlcETrRtH2EnrWukbRIDtmffNFGuhMRTNfnQ15OIN', + ... 'qrmt2P5gXznsgnm2XjzTK7S/Cc3Aq+zjaDrDt7bIedEdz+EyNgaKuL/lB9cAB8xL', + ... 'YYp/yn55Myjair2idgzsa7w/QXdE3RhpyRLqR2Jgz4P1I1xOgUYnylbpIZL9FOKN', + ... 'NR3RQhkGdANBku8otfthb5ZUGsNMV45ct4V8PE+xChjFb9gcwpaf1hhoIF/sYHD5', + ... 'Bkf+v/J8F40KGYY16b0DjQIUlnra9y7q9jj0h2bvc9LAtgHtVUso133LLcVYl7RP', + ... 'Vjyz9Ps366BtIdPlAL4CoF5hEcMKS5J3h1vRlyAKN4uHENl5vKvoxn7ID3JhhWQc', + ... '6QrPGis64zi3OnYor34HPh/KNJvkgOQkekmtYuTxnkiONA4lhMDJgeaVZ9WZq+GV', + ... 'MaCvCFGNYU2TV4V8wMlnUbF8d5bDQ83g8MxIVKdDcnBzzYLZha+qmz4Spry9iB53', + ... 'Sg/sM5H8gWWSl7Oj1lxVg7o7IscpQfVt6zL6jD2VjL3L3Hu7WEXIrcGZtvrP4d+C', + ... 'TGYWiGlh5B2UCFk2bVctfw8W/QfaVvJYD4Rfqta2V2p14KIJLFRSGa1g26W4ixrH', + ... 'XKxgaA3AIfJ+6c5RoisRLuYCxvQi91wkE9hAXR+inXK4Hq4SmiHoeITZFhHP3hh3', + ... 'rbpp8mopiMNxWqCbuqgILP6pShn4oPclu9aR8uJ1ziDxISTGYC71mvLUERUjFn2L', + ... 'fu6C0+TCC9RmeyL+eNdM6cjs1G7YR6yX', + ... '=phHd', + ... '-----END PGP MESSAGE-----', + ... '', + ... ]).encode('us-ascii') + >>> output,verified,message = verify_bytes(b) + >>> output + b'Success!\n' + >>> verified + False + >>> print(message) + gpg: Signature made Wed 21 Mar 2012 03:13:57 PM EDT using RSA key ID 4332B6E3 + gpg: Good signature from "pgp-mime-test (http://blog.tremily.us/posts/pgp-mime/) " + gpg: WARNING: This key is not certified with a trusted signature! + gpg: There is no indication that the signature belongs to the owner. + Primary key fingerprint: B2ED BE0E 771A 4B87 08DD 16A7 511A EDA6 4332 B6E3 + + + >>> b = b'Success!\n' + >>> signature = '\n'.join([ + ... '-----BEGIN PGP SIGNATURE-----', + ... 'Version: GnuPG v2.0.17 (GNU/Linux)', + ... '', + ... 'iQEcBAEBAgAGBQJPaiw/AAoJEFEa7aZDMrbj93gH/1fQPXLjUTpONJUTmvGoMLNA', + ... 'W9ZhjpUL5i6rRqYGUvQ4kTEDuPMxkMrCyFCDHEhSDHufMek6Nso5/HeJn3aqxlgs', + ... 'hmNlvAq4FI6JQyFL7eCp/XG9cPx1p42dTI7JAih8FuK21sS4m/H5XP3R/6KXC99D', + ... '39rrXCvvR+yNgKe2dxuJwmKuLteVlcWxiIQwVrYK70GtJHC5BO79G8yGccWoEy9C', + ... '9JkJiyNptqZyFjGBNmMmrCSFZ7ZFA02RB+laRmwuIiozw4TJYEksxPrgZMbbcFzx', + ... 'zs3JHyV23+Fz1ftalvwskHE7tJkX9Ub8iBMNZ/KxJXXdPdpuMdEYVjoUehkQBQE=', + ... '=rRBP', + ... '-----END PGP SIGNATURE-----', + ... '', + ... ]).encode('us-ascii') + >>> output,verified,message = verify_bytes(b, signature=signature) + >>> output + b'Success!\n' + >>> verified + False + >>> print(message) + gpg: Signature made Wed 21 Mar 2012 03:30:07 PM EDT using RSA key ID 4332B6E3 + gpg: Good signature from "pgp-mime-test (http://blog.tremily.us/posts/pgp-mime/) " + gpg: WARNING: This key is not certified with a trusted signature! + gpg: There is no indication that the signature belongs to the owner. + Primary key fingerprint: B2ED BE0E 771A 4B87 08DD 16A7 511A EDA6 4332 B6E3 + + """ + args = GPG_ARGS + GPG_VERIFY_ARGS + kwargs = {} + sig_read = sig_thread = None + if signature: + sig_read,sig_thread = thread_pipe(signature) + args.extend( + ['--enable-special-filenames', '--verify', + '--', '-&{}'.format(sig_read), '-']) + kwargs['close_fds'] = False + else: + kwargs['close_fds'] = True + try: + status,output,error = execute(args, stdin=bytes, **kwargs) + finally: + if sig_read: + _os.close(sig_read) + if sig_thread: + sig_thread.join() + if signature: + assert output == b'', output + output = bytes + error = str(error, 'us-ascii') + verified = True + for string in GPG_VERIFY_FAILED: + if string in error: + verified = False + break + return (output, verified, error) + def sign(message, sign_as=None): r"""Sign a ``Message``, returning the signed version. -- 2.26.2