From 1b2c53092f6157fa493bad00d1b64c2f3b833d35 Mon Sep 17 00:00:00 2001 From: "W. Trevor King" Date: Wed, 21 Mar 2012 17:55:30 -0400 Subject: [PATCH] Add decrypt() and verify() functions for decrypting and verifying Messages. I've made _thread_pipe() a bit more robust (now it will try several times to write), but sometimes things still block. Print statements show thread_pipe() wrapping up, but the main thread hangs in communicate()'s poll(), and gpg hangs after reading the full signature. --- pgp_mime.py | 187 +++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 185 insertions(+), 2 deletions(-) diff --git a/pgp_mime.py b/pgp_mime.py index b663437..0efb7fe 100644 --- a/pgp_mime.py +++ b/pgp_mime.py @@ -32,8 +32,8 @@ import smtplib as _smtplib import subprocess as _subprocess import threading as _threading +from email import message_from_bytes as _message_from_bytes from email.encoders import encode_7or8bit as _encode_7or8bit -from email.generator import Generator as _Generator from email.header import decode_header as _decode_header from email.message import Message as _Message from email.mime.application import MIMEApplication as _MIMEApplication @@ -395,7 +395,9 @@ def _thread_pipe(fd, data): """ LOG.debug('starting pipe-write thread') try: - _os.write(fd, data) + remaining = len(data) + while remaining: + remaining -= _os.write(fd, data[-remaining:]) finally: LOG.debug('closing pipe-write file descriptor') _os.close(fd) @@ -870,3 +872,184 @@ def sign_and_encrypt(message, sign_as=None, recipients=None): msg.attach(enc) msg['Content-Disposition'] = 'inline' return msg + +def _get_encrypted_parts(message): + ct = message.get_content_type() + assert ct == 'multipart/encrypted', ct + params = dict(message.get_params()) + assert params.get('protocol', None) == 'application/pgp-encrypted', params + assert message.is_multipart(), message + control = body = None + for part in message.get_payload(): + if part == message: + continue + assert part.is_multipart() == False, part + ct = part.get_content_type() + if ct == 'application/pgp-encrypted': + if control: + raise ValueError('multiple application/pgp-encrypted parts') + control = part + elif ct == 'application/octet-stream': + if body: + raise ValueError('multiple application/octet-stream parts') + body = part + else: + raise ValueError('unnecessary {} part'.format(ct)) + if not control: + raise ValueError('missing application/pgp-encrypted part') + if not body: + raise ValueError('missing application/octet-stream part') + return (control, body) + +def _get_signed_parts(message): + ct = message.get_content_type() + assert ct == 'multipart/signed', ct + params = dict(message.get_params()) + assert params.get('protocol', None) == 'application/pgp-signature', params + assert message.is_multipart(), message + body = signature = None + for part in message.get_payload(): + if part == message: + continue + ct = part.get_content_type() + if ct == 'application/pgp-signature': + if signature: + raise ValueError('multiple application/pgp-signature parts') + signature = part + else: + if body: + raise ValueError('multiple non-signature parts') + body = part + if not body: + raise ValueError('missing body part') + if not signature: + raise ValueError('missing application/pgp-signature part') + return (body, signature) + +def decrypt(message): + r"""Decrypt a multipart/encrypted message. + + >>> message = encodedMIMEText('Hi\nBye') + >>> encrypted = encrypt(message, recipients=['']) + >>> decrypted = decrypt(encrypted) + >>> print(decrypted.as_string()) # doctest: +ELLIPSIS, +REPORT_UDIFF + Content-Type: text/plain; charset="us-ascii" + MIME-Version: 1.0 + Content-Transfer-Encoding: 7bit + Content-Disposition: inline + + Hi + Bye + + >>> from email.mime.multipart import MIMEMultipart + >>> message = MIMEMultipart() + >>> message.attach(encodedMIMEText('Part A')) + >>> message.attach(encodedMIMEText('Part B')) + >>> encrypted = encrypt(message, recipients=['pgp-mime@invalid.com']) + >>> decrypted = decrypt(encrypted) + >>> decrypted.set_boundary('boundsep') + >>> print(decrypted.as_string()) # doctest: +ELLIPSIS, +REPORT_UDIFF + Content-Type: multipart/mixed; boundary="boundsep" + MIME-Version: 1.0 + + --boundsep + Content-Type: text/plain; charset="us-ascii" + MIME-Version: 1.0 + Content-Transfer-Encoding: 7bit + Content-Disposition: inline + + Part A + --boundsep + Content-Type: text/plain; charset="us-ascii" + MIME-Version: 1.0 + Content-Transfer-Encoding: 7bit + Content-Disposition: inline + + Part B + --boundsep-- + + """ + control,body = _get_encrypted_parts(message) + encrypted = body.get_payload(decode=True) + if not isinstance(encrypted, bytes): + encrypted = encrypted.encode('us-ascii') + decrypted = decrypt_bytes(encrypted) + return _message_from_bytes(decrypted) + +def verify(message): + r"""Verify a signature on ``message``, possibly decrypting first. + + >>> message = encodedMIMEText('Hi\nBye') + >>> message['To'] = 'pgp-mime-test ' + >>> encrypted = sign_and_encrypt(message, sign_as='pgp-mime@invalid.com') + >>> decrypted,verified,message = verify(encrypted) + >>> print(decrypted.as_string()) # doctest: +ELLIPSIS, +REPORT_UDIFF + Content-Type: text/plain; charset="us-ascii" + MIME-Version: 1.0 + Content-Transfer-Encoding: 7bit + Content-Disposition: inline + To: pgp-mime-test + + Hi + Bye + >>> verified + False + >>> print(message) # doctest: +ELLIPSIS, +REPORT_UDIFF + gpg: Signature made ... using RSA key ID 4332B6E3 + gpg: Good signature from "pgp-mime-test (http://blog.tremily.us/posts/pgp-mime/) " + gpg: WARNING: This key is not certified with a trusted signature! + gpg: There is no indication that the signature belongs to the owner. + Primary key fingerprint: B2ED BE0E 771A 4B87 08DD 16A7 511A EDA6 4332 B6E3 + + + >>> from email.mime.multipart import MIMEMultipart + >>> message = MIMEMultipart() + >>> message.attach(encodedMIMEText('Part A')) + >>> message.attach(encodedMIMEText('Part B')) + >>> signed = sign(message, sign_as='pgp-mime@invalid.com') + >>> decrypted,verified,message = verify(signed) + >>> decrypted.set_boundary('boundsep') + >>> print(decrypted.as_string()) # doctest: +ELLIPSIS, +REPORT_UDIFF + Content-Type: multipart/mixed; boundary="boundsep" + MIME-Version: 1.0 + + --boundsep + Content-Type: text/plain; charset="us-ascii" + MIME-Version: 1.0 + Content-Transfer-Encoding: 7bit + Content-Disposition: inline + + Part A + --boundsep + Content-Type: text/plain; charset="us-ascii" + MIME-Version: 1.0 + Content-Transfer-Encoding: 7bit + Content-Disposition: inline + + Part B + --boundsep-- + >>> verified + False + >>> print(message) # doctest: +ELLIPSIS, +REPORT_UDIFF + gpg: Signature made ... using RSA key ID 4332B6E3 + gpg: Good signature from "pgp-mime-test (http://blog.tremily.us/posts/pgp-mime/) " + gpg: WARNING: This key is not certified with a trusted signature! + gpg: There is no indication that the signature belongs to the owner. + Primary key fingerprint: B2ED BE0E 771A 4B87 08DD 16A7 511A EDA6 4332 B6E3 + + """ + ct = message.get_content_type() + if ct == 'multipart/encrypted': + control,body = _get_encrypted_parts(message) + encrypted = body.get_payload(decode=True) + if not isinstance(encrypted, bytes): + encrypted = encrypted.encode('us-ascii') + decrypted,verified,message = verify_bytes(encrypted) + return (_message_from_bytes(decrypted), verified, message) + body,signature = _get_signed_parts(message) + sig_data = signature.get_payload(decode=True) + if not isinstance(sig_data, bytes): + sig_data = sig_data.encode('us-ascii') + decrypted,verified,message = verify_bytes( + body.as_string().encode('us-ascii'), signature=sig_data) + return (body, verified, message) -- 2.26.2