Add decrypt() and verify() functions for decrypting and verifying Messages.
authorW. Trevor King <wking@drexel.edu>
Wed, 21 Mar 2012 21:55:30 +0000 (17:55 -0400)
committerW. Trevor King <wking@drexel.edu>
Wed, 21 Mar 2012 22:37:32 +0000 (18:37 -0400)
I've made _thread_pipe() a bit more robust (now it will try several
times to write), but sometimes things still block.  Print statements
show thread_pipe() wrapping up, but the main thread hangs in
communicate()'s poll(), and gpg hangs after reading the full signature.

pgp_mime.py

index b663437cbe81c885ba10e9cbd57db7908188242a..0efb7fe0890bad71e04daa547ed062cd6f6f2348 100644 (file)
@@ -32,8 +32,8 @@ import smtplib as _smtplib
 import subprocess as _subprocess
 import threading as _threading
 
+from email import message_from_bytes as _message_from_bytes
 from email.encoders import encode_7or8bit as _encode_7or8bit
-from email.generator import Generator as _Generator
 from email.header import decode_header as _decode_header
 from email.message import Message as _Message
 from email.mime.application import MIMEApplication as _MIMEApplication
@@ -395,7 +395,9 @@ def _thread_pipe(fd, data):
     """
     LOG.debug('starting pipe-write thread')
     try:
-        _os.write(fd, data)
+        remaining = len(data)
+        while remaining:
+            remaining -= _os.write(fd, data[-remaining:])
     finally:
         LOG.debug('closing pipe-write file descriptor')
         _os.close(fd)
@@ -870,3 +872,184 @@ def sign_and_encrypt(message, sign_as=None, recipients=None):
     msg.attach(enc)
     msg['Content-Disposition'] = 'inline'
     return msg
+
+def _get_encrypted_parts(message):
+    ct = message.get_content_type()
+    assert ct == 'multipart/encrypted', ct
+    params = dict(message.get_params())
+    assert params.get('protocol', None) == 'application/pgp-encrypted', params
+    assert message.is_multipart(), message
+    control = body = None
+    for part in message.get_payload():
+        if part == message:
+            continue
+        assert part.is_multipart() == False, part
+        ct = part.get_content_type()
+        if ct == 'application/pgp-encrypted':
+            if control:
+                raise ValueError('multiple application/pgp-encrypted parts')
+            control = part
+        elif ct == 'application/octet-stream':
+            if body:
+                raise ValueError('multiple application/octet-stream parts')
+            body = part
+        else:
+            raise ValueError('unnecessary {} part'.format(ct))
+    if not control:
+        raise ValueError('missing application/pgp-encrypted part')
+    if not body:
+        raise ValueError('missing application/octet-stream part')
+    return (control, body)
+
+def _get_signed_parts(message):
+    ct = message.get_content_type()
+    assert ct == 'multipart/signed', ct
+    params = dict(message.get_params())
+    assert params.get('protocol', None) == 'application/pgp-signature', params
+    assert message.is_multipart(), message
+    body = signature = None
+    for part in message.get_payload():
+        if part == message:
+            continue
+        ct = part.get_content_type()
+        if ct == 'application/pgp-signature':
+            if signature:
+                raise ValueError('multiple application/pgp-signature parts')
+            signature = part
+        else:
+            if body:
+                raise ValueError('multiple non-signature parts')
+            body = part
+    if not body:
+        raise ValueError('missing body part')
+    if not signature:
+        raise ValueError('missing application/pgp-signature part')
+    return (body, signature)
+
+def decrypt(message):
+    r"""Decrypt a multipart/encrypted message.
+
+    >>> message = encodedMIMEText('Hi\nBye')
+    >>> encrypted = encrypt(message, recipients=['<pgp-mime@invalid.com>'])
+    >>> decrypted = decrypt(encrypted)
+    >>> print(decrypted.as_string())  # doctest: +ELLIPSIS, +REPORT_UDIFF
+    Content-Type: text/plain; charset="us-ascii"
+    MIME-Version: 1.0
+    Content-Transfer-Encoding: 7bit
+    Content-Disposition: inline
+    <BLANKLINE>
+    Hi
+    Bye
+
+    >>> from email.mime.multipart import MIMEMultipart
+    >>> message = MIMEMultipart()
+    >>> message.attach(encodedMIMEText('Part A'))
+    >>> message.attach(encodedMIMEText('Part B'))
+    >>> encrypted = encrypt(message, recipients=['pgp-mime@invalid.com'])
+    >>> decrypted = decrypt(encrypted)
+    >>> decrypted.set_boundary('boundsep')
+    >>> print(decrypted.as_string()) # doctest: +ELLIPSIS, +REPORT_UDIFF
+    Content-Type: multipart/mixed; boundary="boundsep"
+    MIME-Version: 1.0
+    <BLANKLINE>
+    --boundsep
+    Content-Type: text/plain; charset="us-ascii"
+    MIME-Version: 1.0
+    Content-Transfer-Encoding: 7bit
+    Content-Disposition: inline
+    <BLANKLINE>
+    Part A
+    --boundsep
+    Content-Type: text/plain; charset="us-ascii"
+    MIME-Version: 1.0
+    Content-Transfer-Encoding: 7bit
+    Content-Disposition: inline
+    <BLANKLINE>
+    Part B
+    --boundsep--
+    <BLANKLINE>
+    """
+    control,body = _get_encrypted_parts(message)
+    encrypted = body.get_payload(decode=True)
+    if not isinstance(encrypted, bytes):
+        encrypted = encrypted.encode('us-ascii')
+    decrypted = decrypt_bytes(encrypted)
+    return _message_from_bytes(decrypted)
+
+def verify(message):
+    r"""Verify a signature on ``message``, possibly decrypting first.
+
+    >>> message = encodedMIMEText('Hi\nBye')
+    >>> message['To'] = 'pgp-mime-test <pgp-mime@invalid.com>'
+    >>> encrypted = sign_and_encrypt(message, sign_as='pgp-mime@invalid.com')
+    >>> decrypted,verified,message = verify(encrypted)
+    >>> print(decrypted.as_string())  # doctest: +ELLIPSIS, +REPORT_UDIFF
+    Content-Type: text/plain; charset="us-ascii"
+    MIME-Version: 1.0
+    Content-Transfer-Encoding: 7bit
+    Content-Disposition: inline
+    To: pgp-mime-test <pgp-mime@invalid.com>
+    <BLANKLINE>
+    Hi
+    Bye
+    >>> verified
+    False
+    >>> print(message)  # doctest: +ELLIPSIS, +REPORT_UDIFF
+    gpg: Signature made ... using RSA key ID 4332B6E3
+    gpg: Good signature from "pgp-mime-test (http://blog.tremily.us/posts/pgp-mime/) <pgp-mime@invalid.com>"
+    gpg: WARNING: This key is not certified with a trusted signature!
+    gpg:          There is no indication that the signature belongs to the owner.
+    Primary key fingerprint: B2ED BE0E 771A 4B87 08DD  16A7 511A EDA6 4332 B6E3
+    <BLANKLINE>
+
+    >>> from email.mime.multipart import MIMEMultipart
+    >>> message = MIMEMultipart()
+    >>> message.attach(encodedMIMEText('Part A'))
+    >>> message.attach(encodedMIMEText('Part B'))
+    >>> signed = sign(message, sign_as='pgp-mime@invalid.com')
+    >>> decrypted,verified,message = verify(signed)
+    >>> decrypted.set_boundary('boundsep')
+    >>> print(decrypted.as_string())  # doctest: +ELLIPSIS, +REPORT_UDIFF
+    Content-Type: multipart/mixed; boundary="boundsep"
+    MIME-Version: 1.0
+    <BLANKLINE>
+    --boundsep
+    Content-Type: text/plain; charset="us-ascii"
+    MIME-Version: 1.0
+    Content-Transfer-Encoding: 7bit
+    Content-Disposition: inline
+    <BLANKLINE>
+    Part A
+    --boundsep
+    Content-Type: text/plain; charset="us-ascii"
+    MIME-Version: 1.0
+    Content-Transfer-Encoding: 7bit
+    Content-Disposition: inline
+    <BLANKLINE>
+    Part B
+    --boundsep--
+    >>> verified
+    False
+    >>> print(message)  # doctest: +ELLIPSIS, +REPORT_UDIFF
+    gpg: Signature made ... using RSA key ID 4332B6E3
+    gpg: Good signature from "pgp-mime-test (http://blog.tremily.us/posts/pgp-mime/) <pgp-mime@invalid.com>"
+    gpg: WARNING: This key is not certified with a trusted signature!
+    gpg:          There is no indication that the signature belongs to the owner.
+    Primary key fingerprint: B2ED BE0E 771A 4B87 08DD  16A7 511A EDA6 4332 B6E3
+    <BLANKLINE>
+    """
+    ct = message.get_content_type()
+    if ct == 'multipart/encrypted':
+        control,body = _get_encrypted_parts(message)
+        encrypted = body.get_payload(decode=True)
+        if not isinstance(encrypted, bytes):
+            encrypted = encrypted.encode('us-ascii')
+        decrypted,verified,message = verify_bytes(encrypted)
+        return (_message_from_bytes(decrypted), verified, message)
+    body,signature = _get_signed_parts(message)
+    sig_data = signature.get_payload(decode=True)
+    if not isinstance(sig_data, bytes):
+        sig_data = sig_data.encode('us-ascii')
+    decrypted,verified,message = verify_bytes(
+        body.as_string().encode('us-ascii'), signature=sig_data)
+    return (body, verified, message)