from lxml import etree as _etree
from . import LOG as _LOG
-from .color import color_string as _color_string
-from .color import standard_colors as _standard_colors
from .email import construct_email as _construct_email
from .email import construct_response as _construct_response
from .model.person import Person as _Person
class InvalidHandlerMessage (_InvalidSubjectMessage):
def __init__(self, target=None, handlers=None, **kwargs):
if 'error' not in kwargs:
- kwargs['error'] = 'no handler for {!r}'.format(
- kwargs.get('target', None))
+ kwargs['error'] = 'no handler for {!r}'.format(target)
super(InvalidHandlerMessage, self).__init__(**kwargs)
self.target = target
self.handlers = handlers
handlers={
'get': _handle_get,
'submit': _handle_submission,
- }, respond=None, use_color=None, dry_run=False, **kwargs):
+ }, respond=None, dry_run=False, **kwargs):
"""Run from procmail to sort incomming submissions
For example, you can setup your ``.procmailrc`` like this::
>>> course.cleanup()
"""
- highlight,lowlight,good,bad = _standard_colors(use_color=use_color)
if stream is None:
stream = _sys.stdin
for original,message,person,subject,target in _load_messages(
course=course, stream=stream, mailbox=mailbox, input_=input_,
- output=output, use_color=use_color, dry_run=dry_run,
+ output=output, dry_run=dry_run,
continue_after_invalid_message=continue_after_invalid_message,
respond=respond):
try:
handler(
basedir=basedir, course=course, message=message,
person=person, subject=subject,
- max_late=max_late, use_color=use_color, dry_run=dry_run)
+ max_late=max_late, dry_run=dry_run)
except _InvalidMessage as error:
+ error.course = course
+ error.message = original
+ if person is not None and not hasattr(error, 'person'):
+ error.person = person
+ if subject is not None and not hasattr(error, 'subject'):
+ error.subject = subject
+ if target is not None and not hasattr(error, 'target'):
+ error.target = target
+ _LOG.warn('invalid message {}'.format(error.message_id()))
if not continue_after_invalid_message:
raise
if respond:
- error.course = course
- error.message = original
- if person is not None and not hasattr(error, 'person'):
- error.person = person
- if subject is not None and not hasattr(error, 'subject'):
- error.subject = subject
- if target is not None and not hasattr(error, 'target'):
- error.target = target
response = _get_error_response(error)
respond(response)
except _Response as response:
def _load_messages(course, stream, mailbox=None, input_=None, output=None,
continue_after_invalid_message=False, respond=None,
- use_color=None, dry_run=False):
+ dry_run=False):
if mailbox is None:
mbox = None
messages = [(None,_message_from_file(stream))]
raise ValueError(mailbox)
for key,msg in messages:
try:
- ret = _parse_message(
- course=course, message=msg, use_color=use_color)
+ ret = _parse_message(course=course, message=msg)
except _InvalidMessage as error:
+ error.message = msg
+ _LOG.warn('invalid message {}'.format(error.message_id()))
if not continue_after_invalid_message:
raise
if respond:
del mbox[key]
yield ret
-def _parse_message(course, message, use_color=None):
+def _parse_message(course, message):
"""Parse an incoming email and respond if neccessary.
Return ``(msg, person, assignment, time)`` on successful parsing.
original = message
person = subject = target = None
try:
- person = _get_message_person(
- course=course, message=message, use_color=use_color)
+ person = _get_message_person(course=course, message=message)
if person.pgp_key:
message = _get_decoded_message(
- course=course, message=message, person=person,
- use_color=use_color)
- subject = _get_message_subject(message=message, use_color=use_color)
- target = _get_message_target(subject=subject, use_color=use_color)
+ course=course, message=message, person=person)
+ subject = _get_message_subject(message=message)
+ target = _get_message_target(subject=subject)
except _InvalidMessage as error:
error.course = course
error.message = original
raise
return (original, message, person, subject, target)
-def _get_message_person(course, message, use_color=None):
+def _get_message_person(course, message):
sender = message['Return-Path'] # RFC 822
if sender is None:
raise NoReturnPath(message)
raise AmbiguousAddress(message=message, address=sender, people=people)
return people[0]
-def _get_decoded_message(course, message, person, use_color=None):
- msg = _get_verified_message(
- message, person.pgp_key, use_color=use_color)
+def _get_decoded_message(course, message, person):
+ msg = _get_verified_message(message, person.pgp_key)
if msg is None:
raise _UnsignedMessage(message=message)
return msg
-def _get_message_subject(message, use_color=None):
+def _get_message_subject(message):
"""
>>> from email.header import Header
>>> from pgp_mime.email import encodedMIMEText
_LOG.debug('decoded header {} -> {}'.format(parts[0], subject))
return subject.lower().replace('#', '')
-def _get_message_target(subject, use_color=None):
+def _get_message_target(subject):
"""
>>> _get_message_target(subject='no tag')
Traceback (most recent call last):
_LOG.debug('extracted target {} -> {}'.format(subject, target))
return target
-def _get_handler(handlers, target, use_color=None):
+def _get_handler(handlers, target):
try:
handler = handlers[target]
- except KeyError:
- response_subject = 'no handler for {}'.format(target)
- highlight,lowlight,good,bad = _standard_colors(use_color=use_color)
- _LOG.debug(_color_string(string=response_subject, color=bad))
- raise InvalidHandlerMessage(target=target, handlers=handlers)
+ except KeyError as error:
+ raise InvalidHandlerMessage(
+ target=target, handlers=handlers) from error
return handler
-def _get_verified_message(message, pgp_key, use_color=None):
+def _get_verified_message(message, pgp_key):
"""
>>> from pgp_mime import sign, encodedMIMEText
>>> print(_get_verified_message(message, pgp_key='4332B6E3'))
None
"""
- highlight,lowlight,good,bad = _standard_colors(use_color=use_color)
mid = message['message-id']
try:
decrypted,verified,result = _pgp_mime.verify(message=message)
except (ValueError, AssertionError):
- _LOG.warn(_color_string(
- string='could not verify {} (not signed?)'.format(mid),
- color=bad))
+ _LOG.warning('could not verify {} (not signed?)'.format(mid))
return None
- _LOG.info(_color_string(str(result, 'utf-8'), color=lowlight))
+ _LOG.debug(str(result, 'utf-8'))
tree = _etree.fromstring(result.replace(b'\x00', b''))
match = None
for signature in tree.findall('.//signature'):
match = signature
break
if match is None:
- _LOG.warn(_color_string(
- string='{} is not signed by the expected key'.format(mid),
- color=bad))
+ _LOG.warning('{} is not signed by the expected key'.format(mid))
return None
if not verified:
sumhex = list(signature.iterchildren('summary'))[0].get('value')
summary = int(sumhex, 16)
if summary != 0:
- _LOG.warn(_color_string(
- string='{} has an unverified signature'.format(mid),
- color=bad))
+ _LOG.warning('{} has an unverified signature'.format(mid))
return None
# otherwise, we may have an untrusted key. We'll count that
# as verified here, because the caller is explicity looking