import subprocess as _subprocess
import threading as _threading
+from email import message_from_bytes as _message_from_bytes
from email.encoders import encode_7or8bit as _encode_7or8bit
-from email.generator import Generator as _Generator
from email.header import decode_header as _decode_header
from email.message import Message as _Message
from email.mime.application import MIMEApplication as _MIMEApplication
"""
LOG.debug('starting pipe-write thread')
try:
- _os.write(fd, data)
+ remaining = len(data)
+ while remaining:
+ remaining -= _os.write(fd, data[-remaining:])
finally:
LOG.debug('closing pipe-write file descriptor')
_os.close(fd)
msg.attach(enc)
msg['Content-Disposition'] = 'inline'
return msg
+
+def _get_encrypted_parts(message):
+ ct = message.get_content_type()
+ assert ct == 'multipart/encrypted', ct
+ params = dict(message.get_params())
+ assert params.get('protocol', None) == 'application/pgp-encrypted', params
+ assert message.is_multipart(), message
+ control = body = None
+ for part in message.get_payload():
+ if part == message:
+ continue
+ assert part.is_multipart() == False, part
+ ct = part.get_content_type()
+ if ct == 'application/pgp-encrypted':
+ if control:
+ raise ValueError('multiple application/pgp-encrypted parts')
+ control = part
+ elif ct == 'application/octet-stream':
+ if body:
+ raise ValueError('multiple application/octet-stream parts')
+ body = part
+ else:
+ raise ValueError('unnecessary {} part'.format(ct))
+ if not control:
+ raise ValueError('missing application/pgp-encrypted part')
+ if not body:
+ raise ValueError('missing application/octet-stream part')
+ return (control, body)
+
+def _get_signed_parts(message):
+ ct = message.get_content_type()
+ assert ct == 'multipart/signed', ct
+ params = dict(message.get_params())
+ assert params.get('protocol', None) == 'application/pgp-signature', params
+ assert message.is_multipart(), message
+ body = signature = None
+ for part in message.get_payload():
+ if part == message:
+ continue
+ ct = part.get_content_type()
+ if ct == 'application/pgp-signature':
+ if signature:
+ raise ValueError('multiple application/pgp-signature parts')
+ signature = part
+ else:
+ if body:
+ raise ValueError('multiple non-signature parts')
+ body = part
+ if not body:
+ raise ValueError('missing body part')
+ if not signature:
+ raise ValueError('missing application/pgp-signature part')
+ return (body, signature)
+
+def decrypt(message):
+ r"""Decrypt a multipart/encrypted message.
+
+ >>> message = encodedMIMEText('Hi\nBye')
+ >>> encrypted = encrypt(message, recipients=['<pgp-mime@invalid.com>'])
+ >>> decrypted = decrypt(encrypted)
+ >>> print(decrypted.as_string()) # doctest: +ELLIPSIS, +REPORT_UDIFF
+ Content-Type: text/plain; charset="us-ascii"
+ MIME-Version: 1.0
+ Content-Transfer-Encoding: 7bit
+ Content-Disposition: inline
+ <BLANKLINE>
+ Hi
+ Bye
+
+ >>> from email.mime.multipart import MIMEMultipart
+ >>> message = MIMEMultipart()
+ >>> message.attach(encodedMIMEText('Part A'))
+ >>> message.attach(encodedMIMEText('Part B'))
+ >>> encrypted = encrypt(message, recipients=['pgp-mime@invalid.com'])
+ >>> decrypted = decrypt(encrypted)
+ >>> decrypted.set_boundary('boundsep')
+ >>> print(decrypted.as_string()) # doctest: +ELLIPSIS, +REPORT_UDIFF
+ Content-Type: multipart/mixed; boundary="boundsep"
+ MIME-Version: 1.0
+ <BLANKLINE>
+ --boundsep
+ Content-Type: text/plain; charset="us-ascii"
+ MIME-Version: 1.0
+ Content-Transfer-Encoding: 7bit
+ Content-Disposition: inline
+ <BLANKLINE>
+ Part A
+ --boundsep
+ Content-Type: text/plain; charset="us-ascii"
+ MIME-Version: 1.0
+ Content-Transfer-Encoding: 7bit
+ Content-Disposition: inline
+ <BLANKLINE>
+ Part B
+ --boundsep--
+ <BLANKLINE>
+ """
+ control,body = _get_encrypted_parts(message)
+ encrypted = body.get_payload(decode=True)
+ if not isinstance(encrypted, bytes):
+ encrypted = encrypted.encode('us-ascii')
+ decrypted = decrypt_bytes(encrypted)
+ return _message_from_bytes(decrypted)
+
+def verify(message):
+ r"""Verify a signature on ``message``, possibly decrypting first.
+
+ >>> message = encodedMIMEText('Hi\nBye')
+ >>> message['To'] = 'pgp-mime-test <pgp-mime@invalid.com>'
+ >>> encrypted = sign_and_encrypt(message, sign_as='pgp-mime@invalid.com')
+ >>> decrypted,verified,message = verify(encrypted)
+ >>> print(decrypted.as_string()) # doctest: +ELLIPSIS, +REPORT_UDIFF
+ Content-Type: text/plain; charset="us-ascii"
+ MIME-Version: 1.0
+ Content-Transfer-Encoding: 7bit
+ Content-Disposition: inline
+ To: pgp-mime-test <pgp-mime@invalid.com>
+ <BLANKLINE>
+ Hi
+ Bye
+ >>> verified
+ False
+ >>> print(message) # doctest: +ELLIPSIS, +REPORT_UDIFF
+ gpg: Signature made ... using RSA key ID 4332B6E3
+ gpg: Good signature from "pgp-mime-test (http://blog.tremily.us/posts/pgp-mime/) <pgp-mime@invalid.com>"
+ gpg: WARNING: This key is not certified with a trusted signature!
+ gpg: There is no indication that the signature belongs to the owner.
+ Primary key fingerprint: B2ED BE0E 771A 4B87 08DD 16A7 511A EDA6 4332 B6E3
+ <BLANKLINE>
+
+ >>> from email.mime.multipart import MIMEMultipart
+ >>> message = MIMEMultipart()
+ >>> message.attach(encodedMIMEText('Part A'))
+ >>> message.attach(encodedMIMEText('Part B'))
+ >>> signed = sign(message, sign_as='pgp-mime@invalid.com')
+ >>> decrypted,verified,message = verify(signed)
+ >>> decrypted.set_boundary('boundsep')
+ >>> print(decrypted.as_string()) # doctest: +ELLIPSIS, +REPORT_UDIFF
+ Content-Type: multipart/mixed; boundary="boundsep"
+ MIME-Version: 1.0
+ <BLANKLINE>
+ --boundsep
+ Content-Type: text/plain; charset="us-ascii"
+ MIME-Version: 1.0
+ Content-Transfer-Encoding: 7bit
+ Content-Disposition: inline
+ <BLANKLINE>
+ Part A
+ --boundsep
+ Content-Type: text/plain; charset="us-ascii"
+ MIME-Version: 1.0
+ Content-Transfer-Encoding: 7bit
+ Content-Disposition: inline
+ <BLANKLINE>
+ Part B
+ --boundsep--
+ >>> verified
+ False
+ >>> print(message) # doctest: +ELLIPSIS, +REPORT_UDIFF
+ gpg: Signature made ... using RSA key ID 4332B6E3
+ gpg: Good signature from "pgp-mime-test (http://blog.tremily.us/posts/pgp-mime/) <pgp-mime@invalid.com>"
+ gpg: WARNING: This key is not certified with a trusted signature!
+ gpg: There is no indication that the signature belongs to the owner.
+ Primary key fingerprint: B2ED BE0E 771A 4B87 08DD 16A7 511A EDA6 4332 B6E3
+ <BLANKLINE>
+ """
+ ct = message.get_content_type()
+ if ct == 'multipart/encrypted':
+ control,body = _get_encrypted_parts(message)
+ encrypted = body.get_payload(decode=True)
+ if not isinstance(encrypted, bytes):
+ encrypted = encrypted.encode('us-ascii')
+ decrypted,verified,message = verify_bytes(encrypted)
+ return (_message_from_bytes(decrypted), verified, message)
+ body,signature = _get_signed_parts(message)
+ sig_data = signature.get_payload(decode=True)
+ if not isinstance(sig_data, bytes):
+ sig_data = sig_data.encode('us-ascii')
+ decrypted,verified,message = verify_bytes(
+ body.as_string().encode('us-ascii'), signature=sig_data)
+ return (body, verified, message)