Major send_pgp_mime.py reorganization to better integrate with email.Message.
authorW. Trevor King <wking@drexel.edu>
Sat, 18 Jul 2009 19:17:11 +0000 (15:17 -0400)
committerW. Trevor King <wking@drexel.edu>
Sat, 18 Jul 2009 19:17:11 +0000 (15:17 -0400)
Now send_pgp_mime.py passes it's unittests again, and it should be
easier to use from be-handle-mail :).

Renamed Mail -> EncryptedMessageFactory, since its role is to generate
message bodies of various types (plain, signed, encrypted, ...)

Separated the header processing from Mail, now you need to
  header_from_text()
your header text to create an email.Message which you can use in
EncrypedMessageFactory.sign(), .encrypt(), ...  Once you've created
the body message you want, you can attach it to the header with
  attach_root(header, root_part)
where both header and root_part are email.Message instances.

Made EncryptedMessageFactory doctests more robust, through the use of
 # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
which removed the need for the .strip*() methods.

Also added the configurable from_addr and to_addr, which allows you
to run the doctests with successful gpg calls.  Just set them to
some address from your private keyring, and pass the passphrase for
that key in to your test via a file (or gpg-agent...)
  python send_pgp_mime.py -tP path/to/pasphrase/file

interfaces/email/interactive/send_pgp_mime.py

index 64aedd3aee9f185a9fe6c13efe75f0d0e3a6c4f2..029441379bfb9995aed1f772330de3e51d50a972 100644 (file)
@@ -35,6 +35,7 @@ import tempfile
 import types
 
 try:
+    from email import Message
     from email.mime.text import MIMEText
     from email.mime.multipart import MIMEMultipart
     from email.mime.application import MIMEApplication
@@ -44,6 +45,7 @@ try:
     from email.utils import getaddress
 except ImportError:
     # adjust to old python 2.4
+    from email import Message
     from email.MIMEText import MIMEText
     from email.MIMEMultipart import MIMEMultipart
     from email.MIMENonMultipart import MIMENonMultipart
@@ -115,6 +117,51 @@ pgp_encrypt_only_command='/usr/bin/gpg --no-verbose --quiet --batch --output - -
 pgp_encrypt_sign_command='/usr/bin/gpg --no-verbose --quiet --batch %p --output - --encrypt --sign %?a?-u "%a"? --armor --textmode --always-trust --encrypt-to "%a" %R -- %f'
 sendmail='/usr/sbin/sendmail -t'
 
+def mail(msg, sendmail=None):
+    """
+    Send an email Message instance on its merry way.
+
+    We can shell out to the user specified sendmail in case
+    the local host doesn't have an SMTP server set up
+    for easy smtplib usage.
+    """
+    if sendmail != None:
+        execute(sendmail, stdin=flatten(msg))
+        return None
+    s = smtplib.SMTP()
+    s.connect()
+    s.sendmail(from_addr=source_email(msg),
+               to_addrs=target_emails(msg),
+               msg=flatten(msg))
+    s.close()
+
+def header_from_text(text, encoding="us-ascii"):
+    """
+    Simple wrapper for instantiating an email.Message from text.
+    >>> header = header_from_text('\\n'.join(['From: me@big.edu','To: you@big.edu','Subject: testing']))
+    >>> print flatten(header)
+    From: me@big.edu
+    To: you@big.edu
+    Subject: testing
+    <BLANKLINE>
+    <BLANKLINE>
+    """
+    text = text.strip()
+    if type(text) == types.UnicodeType:
+        text = text.encode(encoding)
+    # assume StringType arguments are already encoded
+    p = Parser()
+    return p.parsestr(text, headersonly=True)
+
+def attach_root(header, root_part):
+    """
+    Attach the email.Message root_part to the email.Message header
+    without generating a multi-part message.
+    """
+    for k,v in self.header.items():
+        root_part[k] = v
+    return root_part    
+
 def execute(args, stdin=None, expect=(0,)):
     """
     Execute a command (allows us to drive gpg).
@@ -192,35 +239,21 @@ def target_emails(msg):
     resent_tos = msg.get_all('resent-to', [])
     resent_ccs = msg.get_all('resent-cc', [])
     resent_bccs = msg.get_all('resent-bcc', [])
-    all_recipients = getaddresses(tos + ccs + bccs + resent_tos + resent_ccs + resent_bccs)
+    all_recipients = getaddresses(tos + ccs + bccs + resent_tos
+                                  + resent_ccs + resent_bccs)
     return [addr[1] for addr in all_recipients]
 
-def mail(msg, sendmail=None):
-    """
-    Send an email Message instance on its merry way.
-
-    We can shell out to the user specified sendmail in case
-    the local host doesn't have an SMTP server set up
-    for easy smtplib usage.
-    """
-    if sendmail != None:
-        execute(sendmail, stdin=flatten(msg))
-        return None
-    s = smtplib.SMTP()
-    s.connect()
-    s.sendmail(from_addr=source_email(msg),
-               to_addrs=target_emails(msg),
-               msg=flatten(msg))
-    s.close()
-
-class Mail (object):
+class EncryptedMessageFactory (object):
     """
     See http://www.ietf.org/rfc/rfc3156.txt for specification details.
-    >>> m = Mail('\\n'.join(['From: me@big.edu','To: you@big.edu','Subject: testing']), 'check 1 2\\ncheck 1 2\\n')
-    >>> print m.sourceEmail()
-    me@big.edu
-    >>> print m.targetEmails()
-    ['you@big.edu']
+    >>> from_addr = "wking@drexel.edu"
+    >>> to_addr = "wking@drexel.edu"
+    >>> header = header_from_text('\\n'.join(['From: %s'%from_addr,'To: %s'%to_addr,'Subject: testing']))
+    >>> source_email(header) == from_addr
+    True
+    >>> target_emails(header) == [to_addr]
+    True
+    >>> m = EncryptedMessageFactory('check 1 2\\ncheck 1 2\\n')
     >>> print flatten(m.clearBodyPart())
     Content-Type: text/plain; charset="us-ascii"
     MIME-Version: 1.0
@@ -234,30 +267,22 @@ class Mail (object):
     Content-Type: text/plain; charset="us-ascii"
     MIME-Version: 1.0
     Content-Transfer-Encoding: 7bit
-    From: me@big.edu
-    To: you@big.edu
-    Subject: testing
     <BLANKLINE>
     check 1 2
     check 1 2
     <BLANKLINE>
-    >>> m.sign()
+    >>> signed = m.sign(header)
     >>> signed.set_boundary('boundsep')
-    >>> print m.stripSig(flatten(signed)).replace('\\t', ' '*4)
-    Content-Type: multipart/signed;
-        protocol="application/pgp-signature";
+    >>> print flatten(signed).replace('\\t', ' '*4) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
+    Content-Type: multipart/signed; protocol="application/pgp-signature";
         micalg="pgp-sha1"; boundary="boundsep"
     MIME-Version: 1.0
-    From: me@big.edu
-    To: you@big.edu
-    Subject: testing
     Content-Disposition: inline
     <BLANKLINE>
     --boundsep
     Content-Type: text/plain; charset="us-ascii"
     MIME-Version: 1.0
     Content-Transfer-Encoding: 7bit
-    Content-Type: text/plain
     Content-Disposition: inline
     <BLANKLINE>
     check 1 2
@@ -271,20 +296,17 @@ class Mail (object):
         charset="us-ascii"
     <BLANKLINE>
     -----BEGIN PGP SIGNATURE-----
-    SIGNATURE STRIPPED (depends on current time)
+    ...
     -----END PGP SIGNATURE-----
     <BLANKLINE>
     --boundsep--
-    >>> encrypted = m.encrypt()
+    >>> encrypted = m.encrypt(header)
     >>> encrypted.set_boundary('boundsep')
-    >>> print m.stripPGP(flatten(encrypted)).replace('\\t', ' '*4)
+    >>> print flatten(encrypted).replace('\\t', ' '*4) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
     Content-Type: multipart/encrypted;
         protocol="application/pgp-encrypted";
         micalg="pgp-sha1"; boundary="boundsep"
     MIME-Version: 1.0
-    From: me@big.edu
-    To: you@big.edu
-    Subject: testing
     Content-Disposition: inline
     <BLANKLINE>
     --boundsep
@@ -300,20 +322,17 @@ class Mail (object):
     Content-Type: application/octet-stream; charset="us-ascii"
     <BLANKLINE>
     -----BEGIN PGP MESSAGE-----
-    MESSAGE STRIPPED (depends on current time)
+    ...
     -----END PGP MESSAGE-----
     <BLANKLINE>
     --boundsep--
-    >>> signedAndEncrypted = m.signAndEncrypt()
+    >>> signedAndEncrypted = m.signAndEncrypt(header)
     >>> signedAndEncrypted.set_boundary('boundsep')
-    >>> print m.stripPGP(flatten(signedAndEncrypted)).replace('\\t', ' '*4)
+    >>> print flatten(signedAndEncrypted).replace('\\t', ' '*4) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
     Content-Type: multipart/encrypted;
         protocol="application/pgp-encrypted";
         micalg="pgp-sha1"; boundary="boundsep"
     MIME-Version: 1.0
-    From: me@big.edu
-    To: you@big.edu
-    Subject: testing
     Content-Disposition: inline
     <BLANKLINE>
     --boundsep
@@ -329,28 +348,19 @@ class Mail (object):
     Content-Type: application/octet-stream; charset="us-ascii"
     <BLANKLINE>
     -----BEGIN PGP MESSAGE-----
-    MESSAGE STRIPPED (depends on current time)
+    ...
     -----END PGP MESSAGE-----
     <BLANKLINE>
     --boundsep--
     """
-    def __init__(self, header, body):
-        self.header = header.strip()
+    def __init__(self, body):
         self.body = body
-        if type(self.header) == types.UnicodeType:
-            self.header = self.header.encode("ascii")
-        p = Parser()
-        self.headermsg = p.parsestr(self.header, headersonly=True)
-    def sourceEmail(self):
-        return source_email(self.headermsg)
-    def targetEmails(self):
-        return target_emails(self.headermsg)
     def encodedMIMEText(self, body, encoding=None):
         if encoding == None:
             if type(body) == types.StringType:
-                encoding = "US-ASCII"
+                encoding = "us-ascii"
             elif type(body) == types.UnicodeType:
-                for encoding in ["US-ASCII", "ISO-8859-1", "UTF-8"]:
+                for encoding in ["us-ascii", "iso-8859-1", "utf-8"]:
                     try:
                         body.encode(encoding)
                     except UnicodeError:
@@ -359,7 +369,7 @@ class Mail (object):
                         break
                 assert encoding != None
         # Create the message ('plain' stands for Content-Type: text/plain)
-        if encoding == "US-ASCII":
+        if encoding == "us-ascii":
             return MIMEText(body)
         else:
             return MIMEText(body.encode(encoding), 'plain', encoding)
@@ -377,11 +387,8 @@ class Mail (object):
         """
         text/plain
         """
-        msg = self.encodedMIMEText(self.body)
-        for k,v in self.headermsg.items():
-            msg[k] = v
-        return msg
-    def sign(self, passphrase=None):
+        return self.encodedMIMEText(self.body)
+    def sign(self, header, passphrase=None):
         """
         multipart/signed
           +-> text/plain                 (body)
@@ -395,7 +402,7 @@ class Mail (object):
 
         args = replace(pgp_sign_command, 'f', bfile.name)
         if PGP_SIGN_AS == None:
-            pgp_sign_as = '<%s>' % self.sourceEmail()
+            pgp_sign_as = '<%s>' % source_email(header)
         else:
             pgp_sign_as = PGP_SIGN_AS
         args = replace(args, 'a', pgp_sign_as)
@@ -403,19 +410,20 @@ class Mail (object):
         status,output,error = execute(args, stdin=passphrase)
         signature = output
 
-        sig = MIMEApplication(_data=signature, _subtype='pgp-signature; name="signature.asc"', _encoder=encode_7or8bit)
+        sig = MIMEApplication(_data=signature,
+                              _subtype='pgp-signature; name="signature.asc"',
+                              _encoder=encode_7or8bit)
         sig['Content-Description'] = 'signature'
         sig.set_charset('us-ascii')
 
-        msg = MIMEMultipart('signed', micalg='pgp-sha1', protocol='application/pgp-signature')
+        msg = MIMEMultipart('signed', micalg='pgp-sha1',
+                            protocol='application/pgp-signature')
         msg.attach(body)
         msg.attach(sig)
 
-        for k,v in self.headermsg.items():
-            msg[k] = v
         msg['Content-Disposition'] = 'inline'
         return msg
-    def encrypt(self, passphrase=None):
+    def encrypt(self, header, passphrase=None):
         """
         multipart/encrypted
          +-> application/pgp-encrypted  (control information)
@@ -426,48 +434,53 @@ class Mail (object):
         bfile.write(flatten(body))
         bfile.flush()
 
-        recipient_string = ' '.join([replace(pgp_recipient_arg, 'r', recipient) for recipient in self.targetEmails()])
+        recipients = [replace(pgp_recipient_arg, 'r', recipient)
+                      for recipient in target_emails(header)]
+        recipient_string = ' '.join(recipients)
         args = replace(pgp_encrypt_only_command, 'R', recipient_string)
         args = replace(args, 'f', bfile.name)
         if PGP_SIGN_AS == None:
-            pgp_sign_as = '<%s>' % self.sourceEmail()
+            pgp_sign_as = '<%s>' % source_email(header)
         else:
             pgp_sign_as = PGP_SIGN_AS
         args = replace(args, 'a', pgp_sign_as)
         status,output,error = execute(args)
         encrypted = output
 
-        enc = MIMEApplication(_data=encrypted, _subtype='octet-stream', _encoder=encode_7or8bit)
+        enc = MIMEApplication(_data=encrypted, _subtype='octet-stream',
+                              _encoder=encode_7or8bit)
         enc.set_charset('us-ascii')
 
-        control = MIMEApplication(_data='Version: 1\n', _subtype='pgp-encrypted', _encoder=encode_7or8bit)
+        control = MIMEApplication(_data='Version: 1\n', _subtype='pgp-encrypted',
+                                  _encoder=encode_7or8bit)
 
-        msg = MIMEMultipart('encrypted', micalg='pgp-sha1', protocol='application/pgp-encrypted')
+        msg = MIMEMultipart('encrypted', micalg='pgp-sha1',
+                            protocol='application/pgp-encrypted')
         msg.attach(control)
         msg.attach(enc)
 
-        for k,v in self.headermsg.items():
-            msg[k] = v
         msg['Content-Disposition'] = 'inline'
         return msg
-    def signAndEncrypt(self, passphrase=None):
+    def signAndEncrypt(self, header, passphrase=None):
         """
         multipart/encrypted
          +-> application/pgp-encrypted  (control information)
          +-> application/octet-stream   (body)
         """
         passphrase,pass_arg = self.passphrase_arg(passphrase)
-        body = self.sign()
+        body = self.sign(header, passphrase)
         body.__delitem__('Bcc')
         bfile = tempfile.NamedTemporaryFile()
         bfile.write(flatten(body))
         bfile.flush()
 
-        recipient_string = ' '.join([replace(pgp_recipient_arg, 'r', recipient) for recipient in self.targetEmails()])
+        recipients = [replace(pgp_recipient_arg, 'r', recipient)
+                      for recipient in target_emails(header)]
+        recipient_string = ' '.join(recipients)
         args = replace(pgp_encrypt_only_command, 'R', recipient_string)
         args = replace(args, 'f', bfile.name)
         if PGP_SIGN_AS == None:
-            pgp_sign_as = '<%s>' % self.sourceEmail()
+            pgp_sign_as = '<%s>' % source_email(header)
         else:
             pgp_sign_as = PGP_SIGN_AS
         args = replace(args, 'a', pgp_sign_as)
@@ -475,44 +488,21 @@ class Mail (object):
         status,output,error = execute(args, stdin=passphrase)
         encrypted = output
 
-        enc = MIMEApplication(_data=encrypted, _subtype='octet-stream', _encoder=encode_7or8bit)
+        enc = MIMEApplication(_data=encrypted, _subtype='octet-stream',
+                              _encoder=encode_7or8bit)
         enc.set_charset('us-ascii')
 
-        control = MIMEApplication(_data='Version: 1\n', _subtype='pgp-encrypted', _encoder=encode_7or8bit)
+        control = MIMEApplication(_data='Version: 1\n',
+                                  _subtype='pgp-encrypted',
+                                  _encoder=encode_7or8bit)
 
-        msg = MIMEMultipart('encrypted', micalg='pgp-sha1', protocol='application/pgp-encrypted')
+        msg = MIMEMultipart('encrypted', micalg='pgp-sha1',
+                            protocol='application/pgp-encrypted')
         msg.attach(control)
         msg.attach(enc)
 
-        for k,v in self.headermsg.items():
-            msg[k] = v
         msg['Content-Disposition'] = 'inline'
         return msg
-    def stripChanging(self, text, start, stop, replacement):
-        stripping = False
-        lines = []
-        for line in text.splitlines():
-            line.strip()
-            if stripping == False:
-                lines.append(line)
-                if line == start:
-                    stripping = True
-                    lines.append(replacement)
-            else:
-                if line == stop:
-                    stripping = False
-                    lines.append(line)
-        return '\n'.join(lines)
-    def stripSig(self, text):
-        return self.stripChanging(text,
-                                  '-----BEGIN PGP SIGNATURE-----',
-                                  '-----END PGP SIGNATURE-----',
-                                  'SIGNATURE STRIPPED (depends on current time)')
-    def stripPGP(self, text):
-        return self.stripChanging(text,
-                                  '-----BEGIN PGP MESSAGE-----',
-                                  '-----END PGP MESSAGE-----',
-                                  'MESSAGE STRIPPED (depends on current time)')
 
 def test():
     import doctest
@@ -578,6 +568,7 @@ if __name__ == '__main__':
             header = file(options.header_filename, 'r').read()
     if header == None:
         raise Exception, "missing header"
+    headermsg = header_from_text(header)
     body = None
     if options.body_filename != None:
         if options.body_filename == '-':
@@ -589,18 +580,19 @@ if __name__ == '__main__':
     if body == None:
         raise Exception, "missing body"
 
-    m = Mail(header, body)
+    m = EncryptedMessageFactory(body)
     if options.mode == "sign":
-        message = m.sign()
+        bodymsg = m.sign(header)
     elif options.mode == "encrypt":
-        message = m.encrypt()
+        bodymsg = m.encrypt(header)
     elif options.mode == "sign-encrypt":
-        message = m.signAndEncrypt()
+        bodymsg = m.signAndEncrypt(header)
     elif options.mode == "plain":
-        message = m.plain()
+        bodymsg = m.plain()
     else:
         print "Unrecognized mode '%s'" % options.mode
 
+    message = attach_root(headermsg, bodymsg)
     if options.output == True:
         message = flatten(message)
         print message