-# Copyright (C) 2012 W. Trevor King <wking@drexel.edu>
+# Copyright (C) 2012 W. Trevor King <wking@tremily.us>
#
# This file is part of pygrader.
#
from email import message_from_file as _message_from_file
from email.header import decode_header as _decode_header
from email.mime.text import MIMEText as _MIMEText
+from email.utils import parseaddr as _parseaddr
import mailbox as _mailbox
import re as _re
import sys as _sys
import pgp_mime as _pgp_mime
-from lxml import etree as _etree
+import pgp_mime.key as _pgp_mime_key
from . import LOG as _LOG
from .email import construct_email as _construct_email
from .extract_mime import message_time as _message_time
from .model.person import Person as _Person
+from .handler import InsecureMessage as _InsecureMessage
+from .handler import InvalidAssignmentSubject as _InvalidAssignmentSubject
from .handler import InvalidMessage as _InvalidMessage
+from .handler import InvalidStudentSubject as _InvalidStudentSubject
from .handler import InvalidSubjectMessage as _InvalidSubjectMessage
+from .handler import PermissionViolationMessage as _PermissionViolationMessage
from .handler import Response as _Response
from .handler import UnsignedMessage as _UnsignedMessage
-from .handler import InsecureMessage as _InsecureMessage
-from .handler.get import InvalidStudent as _InvalidStudent
from .handler.get import run as _handle_get
-from .handler.submission import InvalidAssignment as _InvalidAssignment
+from .handler.grade import run as _handle_grade
+from .handler.grade import MissingGradeMessage as _MissingGradeMessage
from .handler.submission import run as _handle_submission
+from .handler.submission import InvalidSubmission as _InvalidSubmission
_TAG_REGEXP = _re.compile('^.*\[([^]]*)\].*$')
class WrongSignatureMessage (_InsecureMessage):
- def __init__(self, pgp_key=None, fingerprints=None, decrypted=None,
- **kwargs):
+ def __init__(self, pgp_key=None, signatures=None, fingerprints=None,
+ decrypted=None, **kwargs):
if 'error' not in kwargs:
kwargs['error'] = 'not signed by the expected key'
super(WrongSignatureMessage, self).__init__(**kwargs)
self.pgp_key = pgp_key
+ self.signatures = signatures
self.fingerprints = fingerprints
self.decrypted = decrypted
+
class UnverifiedSignatureMessage (_InsecureMessage):
def __init__(self, signature=None, decrypted=None, **kwargs):
if 'error' not in kwargs:
trust_email_infrastructure=False,
handlers={
'get': _handle_get,
+ 'grade': _handle_grade,
'submit': _handle_submission,
}, respond=None, dry_run=False, **kwargs):
"""Run from procmail to sort incomming submissions
>>> message['Return-Path'] = '<invalid.return.path@home.net>'
>>> process(message) # doctest: +REPORT_UDIFF, +ELLIPSIS
respond with:
- Content-Type: multipart/signed; protocol="application/pgp-signature"; micalg="pgp-sha1"; boundary="===============...=="
+ Content-Type: multipart/signed; ...protocol="application/pgp-signature"; ...boundary="===============...=="
MIME-Version: 1.0
Content-Disposition: inline
Date: ...
>>> message['Return-Path'] = '<bb@greyhavens.net>'
>>> process(message) # doctest: +REPORT_UDIFF, +ELLIPSIS
respond with:
+ Content-Type: multipart/signed; ...protocol="application/pgp-signature"; ...boundary="===============...=="
+ MIME-Version: 1.0
+ Content-Disposition: inline
+ Date: ...
+ From: Robot101 <phys101@tower.edu>
+ Reply-to: Robot101 <phys101@tower.edu>
+ To: Bilbo Baggins <bb@shire.org>
+ Subject: Received Assignment 1 submission
+ <BLANKLINE>
+ --===============...==
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Disposition: inline
Yours,
phys-101 robot
<BLANKLINE>
+ --===============...==
+ MIME-Version: 1.0
+ Content-Transfer-Encoding: 7bit
+ Content-Description: OpenPGP digital signature
+ Content-Type: application/pgp-signature; name="signature.asc"; charset="us-ascii"
+ <BLANKLINE>
+ -----BEGIN PGP SIGNATURE-----
+ Version: GnuPG v2.0.19 (GNU/Linux)
+ <BLANKLINE>
+ ...
+ -----END PGP SIGNATURE-----
+ <BLANKLINE>
+ --===============...==--
>>> course.print_tree() # doctest: +REPORT_UDIFF, +ELLIPSIS
Bilbo_Baggins
... 'for <wking@tremily.us>; Mon, 09 Oct 2011 11:50:46 -0400 (EDT)')
>>> process(message) # doctest: +REPORT_UDIFF, +ELLIPSIS
respond with:
+ Content-Type: multipart/signed; ...protocol="application/pgp-signature"; ...boundary="===============...=="
+ MIME-Version: 1.0
+ Content-Disposition: inline
+ Date: ...
+ From: Robot101 <phys101@tower.edu>
+ Reply-to: Robot101 <phys101@tower.edu>
+ To: Bilbo Baggins <bb@shire.org>
+ Subject: Received Assignment 1 submission
+ <BLANKLINE>
+ --===============...==
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Disposition: inline
Yours,
phys-101 robot
<BLANKLINE>
+ --===============...==
+ MIME-Version: 1.0
+ Content-Transfer-Encoding: 7bit
+ Content-Description: OpenPGP digital signature
+ Content-Type: application/pgp-signature; name="signature.asc"; charset="us-ascii"
+ <BLANKLINE>
+ -----BEGIN PGP SIGNATURE-----
+ Version: GnuPG v2.0.19 (GNU/Linux)
+ <BLANKLINE>
+ ...
+ -----END PGP SIGNATURE-----
+ <BLANKLINE>
+ --===============...==--
+
>>> course.print_tree() # doctest: +REPORT_UDIFF, +ELLIPSIS
Bilbo_Baggins
Bilbo_Baggins/Assignment_1
>>> message['Message-ID'] = '<hgi.jlk@home.net>'
>>> process(message) # doctest: +REPORT_UDIFF, +ELLIPSIS
respond with:
+ Content-Type: multipart/signed; ...protocol="application/pgp-signature"; ...boundary="===============...=="
+ MIME-Version: 1.0
+ Content-Disposition: inline
+ Date: ...
+ From: Robot101 <phys101@tower.edu>
+ Reply-to: Robot101 <phys101@tower.edu>
+ To: Bilbo Baggins <bb@shire.org>
+ Subject: Received Assignment 1 submission
+ <BLANKLINE>
+ --===============...==
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Disposition: inline
Yours,
phys-101 robot
<BLANKLINE>
+ --===============...==
+ MIME-Version: 1.0
+ Content-Transfer-Encoding: 7bit
+ Content-Description: OpenPGP digital signature
+ Content-Type: application/pgp-signature; name="signature.asc"; charset="us-ascii"
+ <BLANKLINE>
+ -----BEGIN PGP SIGNATURE-----
+ Version: GnuPG v2.0.19 (GNU/Linux)
+ <BLANKLINE>
+ ...
+ -----END PGP SIGNATURE-----
+ <BLANKLINE>
+ --===============...==--
Response to a submission on an unsubmittable assignment:
>>> message['Subject'] = '[submit] attendance 1'
>>> process(message) # doctest: +REPORT_UDIFF, +ELLIPSIS
respond with:
- Content-Type: multipart/signed; protocol="application/pgp-signature"; micalg="pgp-sha1"; boundary="===============...=="
+ Content-Type: multipart/signed; ...protocol="application/pgp-signature"; ...boundary="===============...=="
MIME-Version: 1.0
Content-Disposition: inline
Date: ...
>>> message['Subject'] = 'need help for the first homework'
>>> process(message) # doctest: +REPORT_UDIFF, +ELLIPSIS
respond with:
- Content-Type: multipart/signed; protocol="application/pgp-signature"; micalg="pgp-sha1"; boundary="===============...=="
+ Content-Type: multipart/signed; ...protocol="application/pgp-signature"; ...boundary="===============...=="
MIME-Version: 1.0
Content-Disposition: inline
Date: ...
>>> del message['Subject']
>>> process(message) # doctest: +REPORT_UDIFF, +ELLIPSIS
respond with:
- Content-Type: multipart/signed; protocol="application/pgp-signature"; micalg="pgp-sha1"; boundary="===============...=="
+ Content-Type: multipart/signed; ...protocol="application/pgp-signature"; ...boundary="===============...=="
MIME-Version: 1.0
Content-Disposition: inline
Date: ...
>>> del message['Subject']
>>> process(message) # doctest: +REPORT_UDIFF, +ELLIPSIS
respond with:
- Content-Type: multipart/encrypted; protocol="application/pgp-encrypted"; micalg="pgp-sha1"; boundary="===============...=="
+ Content-Type: multipart/encrypted; ...protocol="application/pgp-encrypted"; ...boundary="===============...=="
MIME-Version: 1.0
Content-Disposition: inline
Date: ...
except _InvalidMessage as error:
error.course = course
error.message = original
- if person is not None and not hasattr(error, 'person'):
- error.person = person
- if subject is not None and not hasattr(error, 'subject'):
- error.subject = subject
- if target is not None and not hasattr(error, 'target'):
- error.target = target
+ for attribute,value in [('person', person),
+ ('subject', subject),
+ ('target', target)]:
+ if (value is not None and
+ getattr(error, attribute, None) is None):
+ setattr(error, attribute, value)
_LOG.warn('invalid message {}'.format(error.message_id()))
if not continue_after_invalid_message:
raise
except _InvalidMessage as error:
error.course = course
error.message = original
- if person is not None and not hasattr(error, 'person'):
- error.person = person
- if subject is not None and not hasattr(error, 'subject'):
- error.subject = subject
- if target is not None and not hasattr(error, 'target'):
- error.target = target
+ for attribute,value in [('person', person),
+ ('subject', subject),
+ ('target', target)]:
+ if (value is not None and
+ getattr(error, attribute, None) is None):
+ setattr(error, attribute, value)
raise
return (original, message, person, subject, target)
-def _get_message_person(course, message):
- sender = message['Return-Path'] # RFC 822
+def _get_message_person(course, message, trust_admin_from=True):
+ """Get the `Person` that sent the message.
+
+ We use 'Return-Path' (envelope from) instead of the message's From
+ header, because it's more consistent and harder to fake. However,
+ there may be times when you *want* to send a message in somebody
+ elses name.
+
+ For example, if a student submitted an assignment from an
+ unexpected address, you might add that address to their entry in
+ your course config, and then bounce the message back into
+ pygrader. In this case, the From header will still be the
+ student, but the 'Return-Path' will be you. With
+ `trust_admin_from` (on by default), messages who's 'Return-Path'
+ matches a professor or TA will have their 'From' line used to find
+ the final person responsible for the message.
+
+ >>> from pygrader.model.course import Course
+ >>> from pygrader.model.person import Person
+ >>> from pgp_mime import encodedMIMEText
+
+ >>> course = Course(people=[
+ ... Person(
+ ... name='Gandalf', emails=['g@grey.edu'], groups=['professors']),
+ ... Person(name='Bilbo', emails=['bb@shire.org']),
+ ... ])
+ >>> message = encodedMIMEText('testing')
+ >>> message['Return-Path'] = '<g@grey.edu>'
+ >>> message['From'] = 'Bill <bb@shire.org>'
+ >>> message['Message-ID'] = '<123.456@home.net>'
+
+ >>> person = _get_message_person(course=course, message=message)
+ >>> print(person)
+ <Person Bilbo>
+
+ >>> person = _get_message_person(
+ ... course=course, message=message, trust_admin_from=False)
+ >>> print(person)
+ <Person Gandalf>
+ """
+ sender = message['return-path'] # RFC 822
if sender is None:
raise NoReturnPath(message)
sender = sender[1:-1] # strip wrapping '<' and '>'
raise UnregisteredAddress(message=message, address=sender)
if len(people) > 1:
raise AmbiguousAddress(message=message, address=sender, people=people)
- return people[0]
+ person = people[0]
+ if trust_admin_from and person.is_admin():
+ mid = message['message-id']
+ from_headers = message.get_all('from')
+ if len(from_headers) == 0:
+ _LOG.debug("no 'From' headers in {}".format(mid))
+ elif len(from_headers) > 1:
+ _LOG.debug("multiple 'From' headers in {}".format(mid))
+ else:
+ name,address = _parseaddr(from_headers[0])
+ people = list(course.find_people(email=address))
+ if len(people) == 0:
+ _LOG.debug("'From' address {} is unregistered".format(address))
+ if len(people) > 1:
+ _LOG.debug("'From' address {} is ambiguous".format(address))
+ _LOG.debug('message from {} treated as being from {}'.format(
+ person, people[0]))
+ person = people[0]
+ _LOG.debug('message from {}'.format(person))
+ return person
def _get_message_subject(message):
"""
>>> subject.append('-ascii part', 'ascii')
>>> message['Subject'] = subject.encode()
>>> _get_message_subject(message=message)
- 'unicode part-ascii part'
+ 'unicode part -ascii part'
>>> del message['Subject']
>>> message['Subject'] = 'clean subject'
>>> _get_message_subject(message=message)
"""
mid = message['message-id']
try:
- decrypted,verified,result = _pgp_mime.verify(message=message)
+ decrypted,verified,signatures = _pgp_mime.verify(message=message)
except (ValueError, AssertionError) as error:
raise _UnsignedMessage(message=message) from error
- _LOG.debug(str(result, 'utf-8'))
- tree = _etree.fromstring(result.replace(b'\x00', b''))
+ for signature in signatures:
+ _LOG.debug(signature.dumps())
match = None
- fingerprints = []
- for signature in tree.findall('.//signature'):
- for fingerprint in signature.iterchildren('fpr'):
- fingerprints.append(fingerprint)
- matches = [f for f in fingerprints if f.text.endswith(pgp_key)]
+ fingerprints = dict((s.fingerprint, s) for s in signatures)
+ for s in signatures:
+ for key in _pgp_mime_key.lookup_keys([s.fingerprint]):
+ if key.subkeys[0].fingerprint != s.fingerprint:
+ # the signature was made with a subkey. Add the primary.
+ fingerprints[key.subkeys[0].fingerprint] = s
+ if pgp_key.startswith('0x'):
+ key_tail = pgp_key[len('0x'):]
+ else:
+ key_tail = pgp_key
+ matches = [fingerprints[f] for f in fingerprints.keys()
+ if f.endswith(key_tail)]
if len(matches) == 0:
raise WrongSignatureMessage(
- message=message, pgp_key=pgp_key, fingerprints=fingerprints,
- decrypted=decrypted)
- match = matches[0]
+ message=message, pgp_key=pgp_key, signatures=signatures,
+ fingerprints=fingerprints, decrypted=decrypted)
+ signature = matches[0]
if not verified:
- sumhex = list(signature.iterchildren('summary'))[0].get('value')
- summary = int(sumhex, 16)
- if summary != 0:
+ problems = [k for k,v in signature.summary.items() if v]
+ for good in ['green', 'valid']:
+ if good in problems:
+ problems.remove(good)
+ if problems:
raise UnverifiedSignatureMessage(
message=message, signature=signature, decrypted=decrypted)
# otherwise, we may have an untrusted key. We'll count that
author = error.course.robot
target = getattr(error, 'person', None)
subject = str(error)
- if isinstance(error, InvalidHandlerMessage):
+ if isinstance(error, _InvalidSubmission):
+ subject = 'Received invalid {} submission'.format(
+ error.assignment.name)
+ text = (
+ 'We received your submission for {}, but you are not\n'
+ 'allowed to submit that assignment via email.'
+ ).format(error.assignment.name)
+ elif isinstance(error, _MissingGradeMessage):
+ subject = 'No grade in {!r}'.format(error.subject)
+ text = (
+ 'Your grade submission did not include a text/plain\n'
+ 'part containing the new grade and comment.'
+ )
+ elif isinstance(error, InvalidHandlerMessage):
targets = sorted(error.handlers.keys())
if not targets:
hint = (
'Perhaps you meant to use one of the following:\n'
' {}').format('\n '.join(targets))
text = (
- 'We got an email from you with the following subject:\n'
+ 'We received an email from you with the following subject:\n'
' {!r}\n'
'which does not match any submittable handler name for\n'
'{}.\n'
- '{}').format(repr(error.subject), error.course.name, hint)
+ '{}').format(error.subject, error.course.name, hint)
elif isinstance(error, SubjectlessMessage):
subject = 'no subject in {}'.format(error.message['Message-ID'])
text = 'We received an email message from you without a subject.'
'or TA.').format(error.course.name)
elif isinstance(error, NoReturnPath):
return
+ elif isinstance(error, _InvalidAssignmentSubject):
+ if error.assignments:
+ hint = (
+ 'but it matches several assignments:\n'
+ ' * {}').format('\n * '.join(
+ a.name for a in error.assignments))
+ else:
+ # prefer a submittable example assignment
+ assignments = [
+ a for a in error.course.assignments if a.submittable]
+ assignments += error.course.assignments # but fall back to any one
+ hint = (
+ 'Remember to use the full name for the assignment in the\n'
+ 'subject. For example:\n'
+ ' {} submission').format(assignments[0].name)
+ text = (
+ 'We received an email from you with the following subject:\n'
+ ' {!r}\n{}').format(error.subject, hint)
+ elif isinstance(error, _InvalidStudentSubject):
+ text = (
+ 'We received an email from you with the following subject:\n'
+ ' {!r}\n'
+ 'but it matches several students:\n'
+ ' * {}').format(
+ error.subject, '\n * '.join(s.name for s in error.students))
elif isinstance(error, _InvalidSubjectMessage):
text = (
'We received an email message from you with an invalid\n'
elif isinstance(error, _UnsignedMessage):
subject = 'unsigned message {}'.format(error.message['Message-ID'])
text = (
- 'We received an email message from you without a valid\n'
- 'PGP signature.')
- elif isinstance(error, _InvalidAssignment):
+ 'We received an email message from you without a PGP\n'
+ 'signature.'
+ )
+ elif isinstance(error, WrongSignatureMessage):
+ lines = [
+ 'We received an email message from you without a valid',
+ 'PGP signature. We were expecting a signature by',
+ '{}, but got signatures by:'.format(error.person.pgp_key),
+ ]
+ lines.extend([' {}'.format(s.fingerprint) for s in error.signatures])
+ text = '\n'.join(lines)
+ elif isinstance(error, UnverifiedSignatureMessage):
text = (
- 'We received your submission for {}, but you are not\n'
- 'allowed to submit that assignment via email.'
- ).format(error.assignment.name)
- elif isinstance(error, _InvalidStudent):
+ 'We received an email message from you with an unverified\n'
+ 'signature:\n\n'
+ '{}\n\n'
+ 'If this is the key you intended to use, contact your\n'
+ 'professor or TA.'
+ ).format(error.signature.dumps(prefix=' '))
+ elif isinstance(error, _PermissionViolationMessage):
text = (
- 'We got an email from you with the following subject:\n'
+ 'We received an email from you with the following subject:\n'
' {!r}\n'
- 'but it matches several students:\n'
+ "but you can't do that unless you belong to one of the\n"
+ 'following groups:\n'
' * {}').format(
- error.subject, '\n * '.join(s.name for s in error.students))
+ error.subject, '\n * '.join(error.allowed_groups))
elif isinstance(error, _InvalidMessage):
- text = subject
+ text = (
+ 'We received an email from you with the following subject:\n'
+ ' {!r}\n'
+ 'but the message was invalid:\n'
+ ' {}').format(error.subject, error)
else:
raise NotImplementedError((type(error), error))
if target is None: