from .handler import InvalidSubjectMessage as _InvalidSubjectMessage
from .handler import Response as _Response
from .handler import UnsignedMessage as _UnsignedMessage
+from .handler import InsecureMessage as _InsecureMessage
from .handler.get import InvalidStudent as _InvalidStudent
from .handler.get import run as _handle_get
from .handler.submission import InvalidAssignment as _InvalidAssignment
self.people = people
+class WrongSignatureMessage (_InsecureMessage):
+ def __init__(self, pgp_key=None, fingerprints=None, decrypted=None,
+ **kwargs):
+ if 'error' not in kwargs:
+ kwargs['error'] = 'not signed by the expected key'
+ super(WrongSignatureMessage, self).__init__(**kwargs)
+ self.pgp_key = pgp_key
+ self.fingerprints = fingerprints
+ self.decrypted = decrypted
+
+class UnverifiedSignatureMessage (_InsecureMessage):
+ def __init__(self, signature=None, decrypted=None, **kwargs):
+ if 'error' not in kwargs:
+ kwargs['error'] = 'unverified signature'
+ super(UnverifiedSignatureMessage, self).__init__(**kwargs)
+ self.signature = signature
+ self.decrypted = decrypted
+
+
class SubjectlessMessage (_InvalidSubjectMessage):
def __init__(self, **kwargs):
if 'error' not in kwargs:
course=course, stream=stream, mailbox=mailbox, input_=input_,
output=output, dry_run=dry_run,
continue_after_invalid_message=continue_after_invalid_message,
+ trust_email_infrastructure=trust_email_infrastructure,
respond=respond):
try:
handler = _get_handler(handlers=handlers, target=target)
+ _LOG.debug('handling {}'.format(target))
handler(
basedir=basedir, course=course, message=message,
person=person, subject=subject,
def _load_messages(course, stream, mailbox=None, input_=None, output=None,
- continue_after_invalid_message=False, respond=None,
+ continue_after_invalid_message=False,
+ trust_email_infrastructure=False, respond=None,
dry_run=False):
if mailbox is None:
_LOG.debug('loading message from {}'.format(stream))
messages = sorted(messages, key=_get_message_time)
for key,msg in messages:
try:
- ret = _parse_message(course=course, message=msg)
+ ret = _parse_message(
+ course=course, message=msg,
+ trust_email_infrastructure=trust_email_infrastructure)
except _InvalidMessage as error:
error.message = msg
_LOG.warn('invalid message {}'.format(error.message_id()))
del mbox[key]
yield ret
-def _parse_message(course, message):
+def _parse_message(course, message, trust_email_infrastructure=False):
"""Parse an incoming email and respond if neccessary.
Return ``(msg, person, assignment, time)`` on successful parsing.
try:
person = _get_message_person(course=course, message=message)
if person.pgp_key:
- message = _get_decoded_message(
- course=course, message=message, person=person)
+ _LOG.debug('verify message is from {}'.format(person))
+ try:
+ message = _get_verified_message(message, person.pgp_key)
+ except _UnsignedMessage as error:
+ if trust_email_infrastructure:
+ _LOG.warn('{}'.format(error))
+ else:
+ raise
subject = _get_message_subject(message=message)
target = _get_message_target(subject=subject)
except _InvalidMessage as error:
raise AmbiguousAddress(message=message, address=sender, people=people)
return people[0]
-def _get_decoded_message(course, message, person):
- msg = _get_verified_message(message, person.pgp_key)
- if msg is None:
- raise _UnsignedMessage(message=message)
- return msg
-
def _get_message_subject(message):
"""
>>> from email.header import Header
>>> our_message.authenticated
True
- If it is signed, but not by the right key, we get ``None``.
+ If it is signed, but not by the right key, we get an error.
>>> print(_get_verified_message(signed, pgp_key='01234567'))
- None
+ Traceback (most recent call last):
+ ...
+ pygrader.mailpipe.WrongSignatureMessage: not signed by the expected key
- If it is not signed at all, we get ``None``.
+ If it is not signed at all, we get another error.
>>> print(_get_verified_message(message, pgp_key='4332B6E3'))
- None
+ Traceback (most recent call last):
+ ...
+ pygrader.handler.UnsignedMessage: unsigned message
"""
mid = message['message-id']
try:
decrypted,verified,result = _pgp_mime.verify(message=message)
- except (ValueError, AssertionError):
- _LOG.warning('could not verify {} (not signed?)'.format(mid))
- return None
+ except (ValueError, AssertionError) as error:
+ raise _UnsignedMessage(message=message) from error
_LOG.debug(str(result, 'utf-8'))
tree = _etree.fromstring(result.replace(b'\x00', b''))
match = None
+ fingerprints = []
for signature in tree.findall('.//signature'):
for fingerprint in signature.iterchildren('fpr'):
- if fingerprint.text.endswith(pgp_key):
- match = signature
- break
- if match is None:
- _LOG.warning('{} is not signed by the expected key'.format(mid))
- return None
+ fingerprints.append(fingerprint)
+ matches = [f for f in fingerprints if f.text.endswith(pgp_key)]
+ if len(matches) == 0:
+ raise WrongSignatureMessage(
+ message=message, pgp_key=pgp_key, fingerprints=fingerprints,
+ decrypted=decrypted)
+ match = matches[0]
if not verified:
sumhex = list(signature.iterchildren('summary'))[0].get('value')
summary = int(sumhex, 16)
if summary != 0:
- _LOG.warning('{} has an unverified signature'.format(mid))
- return None
+ raise UnverifiedSignatureMessage(
+ message=message, signature=signature, decrypted=decrypted)
# otherwise, we may have an untrusted key. We'll count that
# as verified here, because the caller is explicity looking
# for signatures by this fingerprint.