Add decrypt_bytes() and verify_bytes().
authorW. Trevor King <wking@drexel.edu>
Wed, 21 Mar 2012 19:11:43 +0000 (15:11 -0400)
committerW. Trevor King <wking@drexel.edu>
Wed, 21 Mar 2012 21:25:13 +0000 (17:25 -0400)
Create the encrypted test input with:

  $ echo 'Success!' | gpg --no-verbose --quiet --batch --output -
      --armor --textmode --encrypt --always-trust
      --recipient pgp-mime@invalid.com

Create the signed and encrypted test input with:

  $ echo 'Success!' | gpg --no-verbose --quiet --batch --output -
      --armor --textmode --sign --encrypt --always-trust
      --local-user pgp-mime@invalid.com --recipient pgp-mime@invalid.com

Created the detached signature test input with:

  $ echo 'Success!' | gpg --no-verbose --quiet --batch --output -
      --armor --textmode --detach-sign --always-trust
      --local-user pgp-mime@invalid.com

Verification with a detached signature is the tricky bit.  We are
piping the signed data in via stdout.  To avoid opening a temporary
file, we need to pipe the signature in through another pipe.  The new
`thread_pipe()` function opens that pipe, and spawns a thread writing
the signature data, and the `--enable-special-filenames` option lets
us specify the read-descriptor with the `-&n` syntax.

The threading avoids deadlocking with `execute()`'s `communicate()`
call, and makes cleanup of the write-descriptor easier.

pgp_mime.py

index 3a6f15dba9d9e99492d8e43220b598315d9a0ec5..b663437cbe81c885ba10e9cbd57db7908188242a 100644 (file)
@@ -30,6 +30,7 @@ import re as _re
 import smtplib as _smtplib
 import smtplib as _smtplib
 import subprocess as _subprocess
+import threading as _threading
 
 from email.encoders import encode_7or8bit as _encode_7or8bit
 from email.generator import Generator as _Generator
@@ -54,11 +55,17 @@ ENCODING = 'utf-8'
 #ENCODING = 'iso-8859-1'
 
 GPG_ARGS = [
-    '/usr/bin/gpg', '--no-verbose', '--quiet', '--batch', '--output', '-',
-    '--armor', '--textmode']
-GPG_SIGN_ARGS = ['--detach-sign']
-GPG_ENCRYPT_ARGS = ['--encrypt', '--always-trust']
-GPG_SIGN_AND_ENCRYPT_ARGS = ['--sign', '--encrypt', '--always-trust']
+    '/usr/bin/gpg', '--no-verbose', '--quiet', '--batch', '--output', '-']
+GPG_SIGN_ARGS = ['--armor', '--textmode', '--detach-sign']
+GPG_ENCRYPT_ARGS = ['--armor', '--textmode', '--encrypt', '--always-trust']
+GPG_SIGN_AND_ENCRYPT_ARGS = [
+    '--armor', '--textmode', '--sign', '--encrypt', '--always-trust']
+GPG_DECRYPT_ARGS = []
+GPG_VERIFY_ARGS = []
+GPG_VERIFY_FAILED = [
+    'This key is not certified with a trusted signature',
+    'WARNING',
+    ]
 SENDMAIL = ['/usr/sbin/sendmail', '-t']
 
 
@@ -146,13 +153,15 @@ def mail(message, smtp=None, sendmail=None):
     >>> message = encodedMIMEText('howdy!')
     >>> message['From'] = 'John Doe <jdoe@a.gov.ru>'
     >>> message['To'] = 'Jack <jack@hill.org>, Jill <jill@hill.org>'
-    >>> mail(message=message, sendmail=SENDMAIL)
+    >>> mail(message=message, sendmail=SENDMAIL)  # doctest: +SKIP
     """
     LOG.info('send message {} -> {}'.format(message['from'], message['to']))
     if smtp:
         smtp.send_message(msg=message)
     elif sendmail:
-        execute(sendmail, stdin=message.as_string().encode('us-ascii'))
+        execute(
+            sendmail, stdin=message.as_string().encode('us-ascii'),
+            close_fds=True)
     else:
         smtp = _smtplib.SMTP()
         smtp.connect()
@@ -305,14 +314,14 @@ def attach_root(header, root_part):
         root_part[k] = v
     return root_part    
 
-def execute(args, stdin=None, expect=(0,), env=_os.environ):
+def execute(args, stdin=None, expect=(0,), env=_os.environ, **kwargs):
     """Execute a command (allows us to drive gpg).
     """
     LOG.debug('$ {}'.format(args))
     try:
         p = _subprocess.Popen(
             args, stdin=_subprocess.PIPE, stdout=_subprocess.PIPE,
-            stderr=_subprocess.PIPE, shell=False, close_fds=True, env=env)
+            stderr=_subprocess.PIPE, shell=False, env=env, **kwargs)
     except OSError as e:
         raise Exception('{}\nwhile executing {}'.format(e.args[1], args))
     output,error = p.communicate(input=stdin)
@@ -377,6 +386,46 @@ def email_targets(message):
     return getaddresses(
         tos + ccs + bccs + resent_tos + resent_ccs + resent_bccs)
 
+def _thread_pipe(fd, data):
+    """Write ``data`` to ``fd`` and close ``fd``.
+
+    A helper function for ``thread_pipe``.
+
+    >>> 
+    """
+    LOG.debug('starting pipe-write thread')
+    try:
+        _os.write(fd, data)
+    finally:
+        LOG.debug('closing pipe-write file descriptor')
+        _os.close(fd)
+        LOG.debug('closed pipe-write file descriptor')
+
+def thread_pipe(data):
+    """Write data to a pipe.
+
+    Return the associated read file descriptor and running ``Thread``
+    that's doing the writing.
+
+    >>> import os
+    >>> read,thread = thread_pipe(b'Hello world!')
+    >>> try:
+    ...     print(os.read(read, 100))
+    ... finally:
+    ...     thread.join()
+    b'Hello world!'
+    """
+    read,write = _os.pipe()
+    LOG.debug('opened a pipe {} -> {}'.format(write, read))
+    try:
+        thread = _threading.Thread(
+            name='pipe writer', target=_thread_pipe, args=(write, data))
+        thread.start()
+    except:
+        _os.close(read)
+        _os.close(write)
+    return (read, thread)
+
 def sign_bytes(bytes, sign_as=None):
     r"""Sign ``bytes`` as ``sign_as``.
 
@@ -387,7 +436,7 @@ def sign_bytes(bytes, sign_as=None):
     args = GPG_ARGS + GPG_SIGN_ARGS
     if sign_as:
         args.extend(['--local-user', sign_as])
-    status,output,error = execute(args, stdin=bytes)
+    status,output,error = execute(args, stdin=bytes, close_fds=True)
     return output
 
 def encrypt_bytes(bytes, recipients):
@@ -402,7 +451,7 @@ def encrypt_bytes(bytes, recipients):
         raise ValueError('no recipients specified for encryption')
     for recipient in recipients:
         args.extend(['--recipient', recipient])
-    status,output,error = execute(args, stdin=bytes)
+    status,output,error = execute(args, stdin=bytes, close_fds=True)
     return output
 
 def sign_and_encrypt_bytes(bytes, sign_as=None, recipients=None):
@@ -420,9 +469,132 @@ def sign_and_encrypt_bytes(bytes, sign_as=None, recipients=None):
         raise ValueError('no recipients specified for encryption')
     for recipient in recipients:
         args.extend(['--recipient', recipient])
-    status,output,error = execute(args, stdin=bytes)
+    status,output,error = execute(args, stdin=bytes, close_fds=True)
     return output
 
+def decrypt_bytes(bytes):
+    r"""Decrypt ``bytes``.
+
+    >>> b = '\n'.join([
+    ...     '-----BEGIN PGP MESSAGE-----',
+    ...     'Version: GnuPG v2.0.17 (GNU/Linux)',
+    ...     '',
+    ...     'hQEMA1Ea7aZDMrbjAQf/TAqLjksZSJxSqkBxYT5gtLQoXY6isvRZg2apjs7CW0y2',
+    ...     'tFK/ptnVYAq2OtWQFhbiJXj8hmwJyyFfb3lghpeu4ihO52JgkkwOpmJb6dxjOi83',
+    ...     'qDwaGOogEPH38BNLuwdrMCW0jmNROwvS796PtqSGUaJTuIiKUB8lETwPwIHrDc11',
+    ...     'N3RWStE5uShNkXXQXplUoeCKf3N4XguXym+GQCqJQzlEMrkkDdr4l7mzvt3Nf8EA',
+    ...     'SgSak086tUoo9x8IN5PJCuOJkcXcjQzFcpqOsA7dyZKO8NeQUZv2JvlZuorckNvN',
+    ...     'xx3PwW0a8VeJgTQrh64ZK/d3F3gNHUTzXkq/UIn25tJFAcmSUwxtsBal7p8zAeCV',
+    ...     '8zefsHRQ5Y03IBeYBcVJBhDS9XfvwLQTJiGGstPCxzKTwSUT1MzV5t5twG/STDCc',
+    ...     'uxW3wSdo',
+    ...     '=bZI+',
+    ...     '-----END PGP MESSAGE-----',
+    ...     ''
+    ...     ]).encode('us-ascii')
+    >>> decrypt_bytes(b)
+    b'Success!\n'
+    """
+    args = GPG_ARGS + GPG_DECRYPT_ARGS
+    status,output,error = execute(args, stdin=bytes, close_fds=True)
+    return output
+
+def verify_bytes(bytes, signature=None):
+    r"""Verify a signature on ``bytes``, possibly decrypting first.
+
+    These tests assume you didn't trust the distributed test key.
+
+    >>> b = '\n'.join([
+    ...     '-----BEGIN PGP MESSAGE-----',
+    ...     'Version: GnuPG v2.0.17 (GNU/Linux)',
+    ...     '',
+    ...     'hQEMA1Ea7aZDMrbjAQf/YM1SeFzNGz0DnUynaEyhfGCvcqmjtbN1PtZMpT7VaQLN',
+    ...     'a+c0faskr79Atz0+2IBR7CDOlcETrRtH2EnrWukbRIDtmffNFGuhMRTNfnQ15OIN',
+    ...     'qrmt2P5gXznsgnm2XjzTK7S/Cc3Aq+zjaDrDt7bIedEdz+EyNgaKuL/lB9cAB8xL',
+    ...     'YYp/yn55Myjair2idgzsa7w/QXdE3RhpyRLqR2Jgz4P1I1xOgUYnylbpIZL9FOKN',
+    ...     'NR3RQhkGdANBku8otfthb5ZUGsNMV45ct4V8PE+xChjFb9gcwpaf1hhoIF/sYHD5',
+    ...     'Bkf+v/J8F40KGYY16b0DjQIUlnra9y7q9jj0h2bvc9LAtgHtVUso133LLcVYl7RP',
+    ...     'Vjyz9Ps366BtIdPlAL4CoF5hEcMKS5J3h1vRlyAKN4uHENl5vKvoxn7ID3JhhWQc',
+    ...     '6QrPGis64zi3OnYor34HPh/KNJvkgOQkekmtYuTxnkiONA4lhMDJgeaVZ9WZq+GV',
+    ...     'MaCvCFGNYU2TV4V8wMlnUbF8d5bDQ83g8MxIVKdDcnBzzYLZha+qmz4Spry9iB53',
+    ...     'Sg/sM5H8gWWSl7Oj1lxVg7o7IscpQfVt6zL6jD2VjL3L3Hu7WEXIrcGZtvrP4d+C',
+    ...     'TGYWiGlh5B2UCFk2bVctfw8W/QfaVvJYD4Rfqta2V2p14KIJLFRSGa1g26W4ixrH',
+    ...     'XKxgaA3AIfJ+6c5RoisRLuYCxvQi91wkE9hAXR+inXK4Hq4SmiHoeITZFhHP3hh3',
+    ...     'rbpp8mopiMNxWqCbuqgILP6pShn4oPclu9aR8uJ1ziDxISTGYC71mvLUERUjFn2L',
+    ...     'fu6C0+TCC9RmeyL+eNdM6cjs1G7YR6yX',
+    ...     '=phHd',
+    ...     '-----END PGP MESSAGE-----',
+    ...     '',
+    ...     ]).encode('us-ascii')
+    >>> output,verified,message = verify_bytes(b)
+    >>> output
+    b'Success!\n'
+    >>> verified
+    False
+    >>> print(message)
+    gpg: Signature made Wed 21 Mar 2012 03:13:57 PM EDT using RSA key ID 4332B6E3
+    gpg: Good signature from "pgp-mime-test (http://blog.tremily.us/posts/pgp-mime/) <pgp-mime@invalid.com>"
+    gpg: WARNING: This key is not certified with a trusted signature!
+    gpg:          There is no indication that the signature belongs to the owner.
+    Primary key fingerprint: B2ED BE0E 771A 4B87 08DD  16A7 511A EDA6 4332 B6E3
+    <BLANKLINE>
+
+    >>> b = b'Success!\n'
+    >>> signature = '\n'.join([
+    ...     '-----BEGIN PGP SIGNATURE-----',
+    ...     'Version: GnuPG v2.0.17 (GNU/Linux)',
+    ...     '',
+    ...     'iQEcBAEBAgAGBQJPaiw/AAoJEFEa7aZDMrbj93gH/1fQPXLjUTpONJUTmvGoMLNA',
+    ...     'W9ZhjpUL5i6rRqYGUvQ4kTEDuPMxkMrCyFCDHEhSDHufMek6Nso5/HeJn3aqxlgs',
+    ...     'hmNlvAq4FI6JQyFL7eCp/XG9cPx1p42dTI7JAih8FuK21sS4m/H5XP3R/6KXC99D',
+    ...     '39rrXCvvR+yNgKe2dxuJwmKuLteVlcWxiIQwVrYK70GtJHC5BO79G8yGccWoEy9C',
+    ...     '9JkJiyNptqZyFjGBNmMmrCSFZ7ZFA02RB+laRmwuIiozw4TJYEksxPrgZMbbcFzx',
+    ...     'zs3JHyV23+Fz1ftalvwskHE7tJkX9Ub8iBMNZ/KxJXXdPdpuMdEYVjoUehkQBQE=',
+    ...     '=rRBP',
+    ...     '-----END PGP SIGNATURE-----',
+    ...     '',
+    ...     ]).encode('us-ascii')
+    >>> output,verified,message = verify_bytes(b, signature=signature)
+    >>> output
+    b'Success!\n'
+    >>> verified
+    False
+    >>> print(message)
+    gpg: Signature made Wed 21 Mar 2012 03:30:07 PM EDT using RSA key ID 4332B6E3
+    gpg: Good signature from "pgp-mime-test (http://blog.tremily.us/posts/pgp-mime/) <pgp-mime@invalid.com>"
+    gpg: WARNING: This key is not certified with a trusted signature!
+    gpg:          There is no indication that the signature belongs to the owner.
+    Primary key fingerprint: B2ED BE0E 771A 4B87 08DD  16A7 511A EDA6 4332 B6E3
+    <BLANKLINE>
+    """
+    args = GPG_ARGS + GPG_VERIFY_ARGS
+    kwargs = {}
+    sig_read = sig_thread = None
+    if signature:
+        sig_read,sig_thread = thread_pipe(signature)
+        args.extend(
+            ['--enable-special-filenames', '--verify',
+             '--', '-&{}'.format(sig_read), '-'])
+        kwargs['close_fds'] = False
+    else:
+        kwargs['close_fds'] = True
+    try:
+        status,output,error = execute(args, stdin=bytes, **kwargs)
+    finally:
+        if sig_read:
+            _os.close(sig_read)
+        if sig_thread:
+            sig_thread.join()
+    if signature:
+        assert output == b'', output
+        output = bytes
+    error = str(error, 'us-ascii')
+    verified = True
+    for string in GPG_VERIFY_FAILED:
+        if string in error:
+            verified = False
+            break
+    return (output, verified, error)
+
 def sign(message, sign_as=None):
     r"""Sign a ``Message``, returning the signed version.