This way we don't have to pass use_color around all over the place.
import logging as _logging
+from .color import ColoredFormatter as _ColoredFormatter
+
+
__version__ = '0.2'
ENCODING = 'utf-8'
LOG = _logging.getLogger('pygrade')
LOG.setLevel(_logging.ERROR)
LOG.addHandler(_logging.StreamHandler())
+LOG_FORMATTER = _ColoredFormatter()
+LOG.handlers[0].setFormatter(LOG_FORMATTER)
# You should have received a copy of the GNU General Public License along with
# pygrader. If not, see <http://www.gnu.org/licenses/>.
+import logging as _logging
import sys as _sys
'black', 'red', 'green', 'yellow', 'blue', 'magenta', 'cyan', 'white']
USE_COLOR = True
+GOOD_DEBUG = _logging.DEBUG + 2
+BAD_DEBUG = _logging.DEBUG + 4
def standard_colors(use_color=None):
stream = _sys.stdout
stream.write(color_string(string=string, color=color))
stream.flush()
+
+
+class ColoredFormatter (_logging.Formatter):
+ def __init__(self, *args, **kwargs):
+ super(ColoredFormatter, self).__init__(*args, **kwargs)
+ self.colored = None # `None` to use USE_COLOR; True/False to override
+
+ def format(self, record):
+ s = super(ColoredFormatter, self).format(record)
+ if self.colored or (self.colored is None and USE_COLOR):
+ highlight,lowlight,good,bad = standard_colors()
+ if record.levelno <= _logging.DEBUG:
+ color = lowlight
+ elif record.levelno <= GOOD_DEBUG:
+ color = good
+ elif record.levelno <= BAD_DEBUG:
+ color = bad
+ elif record.levelno <= _logging.INFO:
+ color = highlight
+ else:
+ color = bad
+ return color_string(string=s, color=color)
+ return s
from . import ENCODING as _ENCODING
from . import LOG as _LOG
-from .color import standard_colors as _standard_colors
-from .color import color_string as _color_string
-from .color import write_color as _write_color
from .model.person import Person as _Person
smtp.send_message(msg=msg)
test_smtp.__test__ = False # not a test for nose
-def send_emails(emails, smtp=None, use_color=None, debug_target=None,
- dry_run=False):
+def send_emails(emails, smtp=None, debug_target=None, dry_run=False):
"""Iterate through `emails` and mail them off one-by-one
>>> from email.mime.text import MIMEText
... emails.append(
... (msg,
... lambda status: stdout.write('SUCCESS: {}\\n'.format(status))))
- >>> send_emails(emails, use_color=False, dry_run=True)
+ >>> send_emails(emails, dry_run=True)
... # doctest: +REPORT_UDIFF, +NORMALIZE_WHITESPACE
- sending message to ['Moneypenny <mp@sis.gov.uk>', 'James Bond <007@sis.gov.uk>']...\tDRY-RUN
SUCCESS: None
- sending message to ['M <m@sis.gov.uk>', 'James Bond <007@sis.gov.uk>']...\tDRY-RUN
SUCCESS: None
"""
local_smtp = smtp is None
- highlight,lowlight,good,bad = _standard_colors(use_color=use_color)
for msg,callback in emails:
sources = [
_email_utils.formataddr(a) for a in _pgp_mime.email_sources(msg)]
# TODO: remove convert_content_transfer_encoding?
#if msg.get('content-transfer-encoding', None) == 'base64':
# convert_content_transfer_encoding(msg, '8bit')
- _LOG.debug(_color_string(
- '\n{}\n'.format(msg.as_string()), color=lowlight))
- _write_color('sending message to {}...'.format(targets),
- color=highlight)
+ _LOG.debug('\n{}\n'.format(msg.as_string()))
+ _LOG.info('sending message to {}...'.format(targets))
if not dry_run:
try:
if local_smtp:
if local_smtp:
smtp.quit()
except:
- _write_color('\tFAILED\n', bad)
+ _LOG.warning('failed to send message to {}'.format(targets))
if callback:
callback(False)
raise
else:
- _write_color('\tOK\n', good)
+ _LOG.info('sent message to {}'.format(targets))
if callback:
callback(True)
else:
- _write_color('\tDRY-RUN\n', good)
+ _LOG.info('dry run, so no message sent to {}'.format(targets))
if callback:
callback(None)
import time as _time
from . import LOG as _LOG
-from .color import color_string as _color_string
-from .color import standard_colors as _standard_colors
-def message_time(message, use_color=None):
+def message_time(message):
"""Get the Unix time when ``message`` was received.
>>> from email.utils import formatdate
>>> formatdate(time)
'Sun, 09 Oct 2011 15:50:46 -0000'
"""
- highlight,lowlight,good,bad = _standard_colors(use_color=use_color)
received = message['Received'] # RFC 822
if received is None:
mid = message['Message-ID']
- _LOG.debug(_color_string(
- string='no Received in {}'.format(mid), color=lowlight))
+ _LOG.debug('no Received in {}'.format(mid))
return None
date = received.split(';', 1)[1]
return _time.mktime(_email_utils.parsedate(date))
import pgp_mime as _pgp_mime
from .. import LOG as _LOG
-from ..color import color_string as _color_string
-from ..color import standard_colors as _standard_colors
from ..email import construct_text_email as _construct_text_email
from ..email import construct_email as _construct_email
from ..storage import assignment_path as _assignment_path
def run(basedir, course, message, person, subject,
- trust_email_infrastructure=False,
- use_color=None, dry_run=False, **kwargs):
+ trust_email_infrastructure=False, dry_run=False, **kwargs):
"""
>>> from pgp_mime.email import encodedMIMEText
>>> from ..model.grade import Grade
raise _UnsignedMessage()
if 'assistants' in person.groups or 'professors' in person.groups:
email = _get_admin_email(
- basedir=basedir, course=course, person=person, subject=subject,
- use_color=use_color)
+ basedir=basedir, course=course, person=person, subject=subject)
elif 'students' in person.groups:
email = _get_student_email(
- basedir=basedir, course=course, person=person,
- use_color=use_color)
+ basedir=basedir, course=course, person=person)
else:
raise NotImplementedError(
'strange groups {} for {}'.format(person.groups, person))
raise _Response(message=email)
-def _get_student_email(basedir, course, person, student=None, use_color=None):
+def _get_student_email(basedir, course, person, student=None):
if student is None:
student = person
targets = None
return email
def _get_student_submission_email(
- basedir, course, person, assignments, student, use_color=None):
+ basedir, course, person, assignments, student):
subject = '{} assignment submissions for {}'.format(
course.name, student.name)
text = '{}:\n * {}\n'.format(
author=course.robot, targets=[person], subject=subject,
message=message)
-def _get_admin_email(basedir, course, person, subject, use_color=True):
+def _get_admin_email(basedir, course, person, subject):
lsubject = subject.lower()
students = [p for p in course.find_people()
if p.name.lower() in lsubject]
if len(students) == 0:
stream = _io.StringIO()
- _tabulate(course=course, statistics=True, stream=stream)
+ _tabulate(
+ course=course, statistics=True, stream=stream, use_color=False)
text = stream.getvalue()
email = _construct_text_email(
author=course.robot, targets=[person],
else:
email = _get_student_submission_email(
basedir=basedir, course=course, person=person, student=student,
- assignments=assignments, use_color=use_color)
+ assignments=assignments)
else:
raise InvalidStudent(students=students)
return email
import pgp_mime as _pgp_mime
from .. import LOG as _LOG
-from ..color import color_string as _color_string
-from ..color import standard_colors as _standard_colors
+from ..color import GOOD_DEBUG as _GOOD_DEBUG
from ..extract_mime import extract_mime as _extract_mime
from ..extract_mime import message_time as _message_time
from ..storage import assignment_path as _assignment_path
self.assignment = assignment
-def run(basedir, course, message, person, subject,
- max_late=0, use_color=None, dry_run=None, **kwargs):
+def run(basedir, course, message, person, subject, max_late=0, dry_run=None,
+ **kwargs):
"""
>>> from pgp_mime.email import encodedMIMEText
>>> from ..test.course import StubCourse
>>> course.cleanup()
"""
- time = _message_time(message=message, use_color=use_color)
- assignment = _get_assignment(
- course=course, subject=subject, use_color=use_color)
+ time = _message_time(message=message)
+ assignment = _get_assignment(course=course, subject=subject)
assignment_path = _assignment_path(basedir, assignment, person)
_save_local_message_copy(
msg=message, person=person, assignment_path=assignment_path,
- use_color=use_color, dry_run=dry_run)
+ dry_run=dry_run)
_extract_mime(message=message, output=assignment_path, dry_run=dry_run)
_check_late(
basedir=basedir, assignment=assignment, person=person, time=time,
- max_late=max_late, use_color=use_color, dry_run=dry_run)
+ max_late=max_late, dry_run=dry_run)
if time:
time_str = 'on {}'.format(_formatdate(time))
else:
def _match_assignment(assignment, subject):
return assignment.name.lower() in subject.lower()
-def _get_assignment(course, subject, use_color):
+def _get_assignment(course, subject):
assignments = [a for a in course.assignments
if _match_assignment(a, subject)]
if len(assignments) != 1:
raise InvalidAssignment(assignment)
return assignments[0]
-def _save_local_message_copy(msg, person, assignment_path, use_color=None,
- dry_run=False):
- highlight,lowlight,good,bad = _standard_colors(use_color=use_color)
+def _save_local_message_copy(msg, person, assignment_path, dry_run=False):
try:
_os.makedirs(assignment_path)
except OSError:
try:
mbox = _mailbox.Maildir(mpath, factory=None, create=not dry_run)
except _mailbox.NoSuchMailboxError as e:
- _LOG.debug(_color_string(
- string='could not open mailbox at {}'.format(mpath),
- color=bad))
+ _LOG.warn('could not open mailbox at {}'.format(mpath))
mbox = None
new_msg = True
else:
new_msg = False
break
if new_msg:
- _LOG.debug(_color_string(
- string='saving email from {} to {}'.format(
- person, assignment_path), color=good))
+ _LOG.log(_GOOD_DEBUG, 'saving email from {} to {}'.format(
+ person, assignment_path))
if mbox is not None and not dry_run:
mdmsg = _mailbox.MaildirMessage(msg)
mdmsg.add_flag('S')
mbox.add(mdmsg)
mbox.close()
else:
- _LOG.debug(_color_string(
- string='already found {} in {}'.format(
- msg['Message-ID'], mpath), color=good))
+ _LOG.log(_GOOD_DEBUG, 'already found {} in {}'.format(
+ msg['Message-ID'], mpath))
-def _check_late(basedir, assignment, person, time, max_late=0, use_color=None,
- dry_run=False):
- highlight,lowlight,good,bad = _standard_colors(use_color=use_color)
+def _check_late(basedir, assignment, person, time, max_late=0, dry_run=False):
if time > assignment.due + max_late:
dt = time - assignment.due
- _LOG.warn(_color_string(
- string='{} {} late by {} seconds ({} hours)'.format(
- person.name, assignment.name, dt, dt/3600.),
- color=bad))
+ _LOG.warning('{} {} late by {} seconds ({} hours)'.format(
+ person.name, assignment.name, dt, dt/3600.))
if not dry_run:
_set_late(basedir=basedir, assignment=assignment, person=person)
from lxml import etree as _etree
from . import LOG as _LOG
-from .color import color_string as _color_string
-from .color import standard_colors as _standard_colors
from .email import construct_email as _construct_email
from .email import construct_response as _construct_response
from .model.person import Person as _Person
handlers={
'get': _handle_get,
'submit': _handle_submission,
- }, respond=None, use_color=None, dry_run=False, **kwargs):
+ }, respond=None, dry_run=False, **kwargs):
"""Run from procmail to sort incomming submissions
For example, you can setup your ``.procmailrc`` like this::
>>> course.cleanup()
"""
- highlight,lowlight,good,bad = _standard_colors(use_color=use_color)
if stream is None:
stream = _sys.stdin
for original,message,person,subject,target in _load_messages(
course=course, stream=stream, mailbox=mailbox, input_=input_,
- output=output, use_color=use_color, dry_run=dry_run,
+ output=output, dry_run=dry_run,
continue_after_invalid_message=continue_after_invalid_message,
respond=respond):
try:
handler(
basedir=basedir, course=course, message=message,
person=person, subject=subject,
- max_late=max_late, use_color=use_color, dry_run=dry_run)
+ max_late=max_late, dry_run=dry_run)
except _InvalidMessage as error:
if not continue_after_invalid_message:
raise
def _load_messages(course, stream, mailbox=None, input_=None, output=None,
continue_after_invalid_message=False, respond=None,
- use_color=None, dry_run=False):
+ dry_run=False):
if mailbox is None:
mbox = None
messages = [(None,_message_from_file(stream))]
raise ValueError(mailbox)
for key,msg in messages:
try:
- ret = _parse_message(
- course=course, message=msg, use_color=use_color)
+ ret = _parse_message(course=course, message=msg)
except _InvalidMessage as error:
if not continue_after_invalid_message:
raise
del mbox[key]
yield ret
-def _parse_message(course, message, use_color=None):
+def _parse_message(course, message):
"""Parse an incoming email and respond if neccessary.
Return ``(msg, person, assignment, time)`` on successful parsing.
original = message
person = subject = target = None
try:
- person = _get_message_person(
- course=course, message=message, use_color=use_color)
+ person = _get_message_person(course=course, message=message)
if person.pgp_key:
message = _get_decoded_message(
- course=course, message=message, person=person,
- use_color=use_color)
- subject = _get_message_subject(message=message, use_color=use_color)
- target = _get_message_target(subject=subject, use_color=use_color)
+ course=course, message=message, person=person)
+ subject = _get_message_subject(message=message)
+ target = _get_message_target(subject=subject)
except _InvalidMessage as error:
error.course = course
error.message = original
raise
return (original, message, person, subject, target)
-def _get_message_person(course, message, use_color=None):
+def _get_message_person(course, message):
sender = message['Return-Path'] # RFC 822
if sender is None:
raise NoReturnPath(message)
raise AmbiguousAddress(message=message, address=sender, people=people)
return people[0]
-def _get_decoded_message(course, message, person, use_color=None):
- msg = _get_verified_message(
- message, person.pgp_key, use_color=use_color)
+def _get_decoded_message(course, message, person):
+ msg = _get_verified_message(message, person.pgp_key)
if msg is None:
raise _UnsignedMessage(message=message)
return msg
-def _get_message_subject(message, use_color=None):
+def _get_message_subject(message):
"""
>>> from email.header import Header
>>> from pgp_mime.email import encodedMIMEText
_LOG.debug('decoded header {} -> {}'.format(parts[0], subject))
return subject.lower().replace('#', '')
-def _get_message_target(subject, use_color=None):
+def _get_message_target(subject):
"""
>>> _get_message_target(subject='no tag')
Traceback (most recent call last):
_LOG.debug('extracted target {} -> {}'.format(subject, target))
return target
-def _get_handler(handlers, target, use_color=None):
+def _get_handler(handlers, target):
try:
handler = handlers[target]
except KeyError:
response_subject = 'no handler for {}'.format(target)
- highlight,lowlight,good,bad = _standard_colors(use_color=use_color)
- _LOG.debug(_color_string(string=response_subject, color=bad))
+ _LOG.warning(_color_string(string=response_subject))
raise InvalidHandlerMessage(target=target, handlers=handlers)
return handler
-def _get_verified_message(message, pgp_key, use_color=None):
+def _get_verified_message(message, pgp_key):
"""
>>> from pgp_mime import sign, encodedMIMEText
>>> print(_get_verified_message(message, pgp_key='4332B6E3'))
None
"""
- highlight,lowlight,good,bad = _standard_colors(use_color=use_color)
mid = message['message-id']
try:
decrypted,verified,result = _pgp_mime.verify(message=message)
except (ValueError, AssertionError):
- _LOG.warn(_color_string(
- string='could not verify {} (not signed?)'.format(mid),
- color=bad))
+ _LOG.warning('could not verify {} (not signed?)'.format(mid))
return None
- _LOG.info(_color_string(str(result, 'utf-8'), color=lowlight))
+ _LOG.debug(str(result, 'utf-8'))
tree = _etree.fromstring(result.replace(b'\x00', b''))
match = None
for signature in tree.findall('.//signature'):
match = signature
break
if match is None:
- _LOG.warn(_color_string(
- string='{} is not signed by the expected key'.format(mid),
- color=bad))
+ _LOG.warning('{} is not signed by the expected key'.format(mid))
return None
if not verified:
sumhex = list(signature.iterchildren('summary'))[0].get('value')
summary = int(sumhex, 16)
if summary != 0:
- _LOG.warn(_color_string(
- string='{} has an unverified signature'.format(mid),
- color=bad))
+ _LOG.warning('{} has an unverified signature'.format(mid))
return None
# otherwise, we may have an untrusted key. We'll count that
# as verified here, because the caller is explicity looking
from .color import write_color as _write_color
-def tabulate(course, statistics=False, stream=None, use_color=False, **kwargs):
+def tabulate(course, statistics=False, stream=None, use_color=None, **kwargs):
"""Return a table of student's grades to date
"""
if stream is None:
"""
target = join_with_and([t.alias() for t in targets])
table = _io.StringIO()
- _tabulate(course=course, statistics=True, stream=table)
+ _tabulate(course=course, statistics=True, stream=table, use_color=False)
return _construct_text_email(
author=author, targets=targets, cc=cc,
subject='Course grades',