import time
import traceback
-SUBJECT_TAG = "[be-bug]"
-HANDLER_ADDRESS = "BE Bugs <wking@thor.physics.drexel.edu>"
+HANDLER_ADDRESS = u"BE Bugs <wking@thor.physics.drexel.edu>"
_THIS_DIR = os.path.abspath(os.path.dirname(__file__))
BE_DIR = _THIS_DIR
-LOGPATH = os.path.join(_THIS_DIR, "be-handle-mail.log")
+LOGPATH = os.path.join(_THIS_DIR, u"be-handle-mail.log")
LOGFILE = None
-libbe.encoding.ENCODING = "utf-8" # force default encoding
-ENCODING = libbe.encoding.get_encoding()
+SUBJECT_TAG_BASE = u"[be-bug"
+SUBJECT_TAG_NEW = u"%s:submit]" % SUBJECT_TAG_BASE
+SUBJECT_TAG_COMMENT = re.compile(u"%s:([\-0-9a-z]*)]"
+ % SUBJECT_TAG_BASE.replace("[","\["))
+SUBJECT_TAG_CONTROL = u"%s]" % SUBJECT_TAG_BASE
+
+NEW_REQUIRED_PSEUDOHEADERS = [u"Version"]
+NEW_OPTIONAL_PSEUDOHEADERS = [u"Reporter"]
+
+CONTROL_COMMENT = u"#"
+CONTROL_BREAK = u"--"
-ALLOWED_COMMANDS = ["new", "comment", "list", "show", "help"]
+ALLOWED_COMMANDS = [u"new", u"comment", u"list", u"show", u"help"]
+
+
+libbe.encoding.ENCODING = u"utf-8" # force default encoding
+ENCODING = libbe.encoding.get_encoding()
class InvalidEmail (ValueError):
def __init__(self, msg, message):
class InvalidSubject (InvalidEmail):
def __init__(self, msg, message=None):
if message == None:
- message = "Invalid subject"
+ message = u"Invalid subject"
InvalidEmail.__init__(self, msg, message)
def response_body(self):
err_text = u"\n".join([unicode(self), u"",
self.msg.subject()])
return err_text
-class InvalidEmailCommand (InvalidSubject):
- def __init__(self, msg, message=None):
- if message == None:
- message = "Invalid command '%s'" % msg.subject_command()
- InvalidSubject.__init__(self, msg, message)
+class InvalidPseudoHeader (InvalidEmail):
+ def response_body(self):
+ err_text = [u"Invalid pseudo-header:\n",
+ unicode(self)]
+ return u"\n".join(err_text)
class InvalidExecutionCommand (InvalidEmail):
def __init__(self, msg, command, message=None):
if message == None:
- message = "Invalid execution command '%s'" % command
+ message = u"Invalid execution command '%s'" % command
InvalidEmail.__init__(self, msg, message)
self.command = command
class InvalidOption (InvalidExecutionCommand):
def __init__(self, msg, option, message=None):
if message == None:
- message = "Invalid option '%s' to command '%s'" % (option, command)
+ message = u"Invalid option '%s' to command '%s'" % (option, command)
InvalidCommand.__init__(self, msg, info, command, message)
self.option = option
self.command = command
def extract_id(self):
assert self.command.ret == 0, self.command.ret
- if self.command.command == "new":
- regexp = re.compile("Created bug with ID (.*)")
+ if self.command.command == u"new":
+ regexp = re.compile(u"Created bug with ID (.*)")
else:
raise NotImplementedError, self.command.command
match = regexp.match(self.command.stdout)
info. Returns the exit code, stdout, and stderr produced by the
command.
"""
- assert self.ret == None, "running %s twice!" % str(self)
+ if self.command in [None, u""]: # don't accept blank commands
+ raise InvalidCommand(self.msg, self)
+ elif self.command not in ALLOWED_COMMANDS:
+ raise InvalidCommand(self.msg, self)
+ assert self.ret == None, u"running %s twice!" % unicode(self)
self.normalize_args()
# set stdin and catch stdout and stderr
if self.stdin != None:
except libbe.cmdutil.GetHelp:
print libbe.cmdutil.help(command)
except libbe.cmdutil.GetCompletions:
- self.err = InvalidOption(self.msg, self.command, "--complete")
+ self.err = InvalidOption(self.msg, self.command, u"--complete")
except libbe.cmdutil.UsageError, e:
self.err = InvalidCommand(self.msg, self.command, e)
except libbe.cmdutil.UserError, e:
p=email.Parser.Parser()
self.msg=p.parsestr(self.text)
if LOGFILE != None:
- LOGFILE.write("handling %s\n" % self.author_addr())
- LOGFILE.write("\n%s\n\n" % self.text)
+ LOGFILE.write(u"handling %s\n" % self.author_addr())
+ LOGFILE.write(u"\n%s\n\n" % self.text)
def author_tuple(self):
"""
Extract and normalize the sender's email address. Returns a
return self.default_msg_attribute_access("message-id", default=default)
def subject(self):
if "subject" not in self.msg:
- raise InvalidSubject(self, "Email must contain a subject")
+ raise InvalidSubject(self, u"Email must contain a subject")
return self.msg["subject"]
def _split_subject(self):
"""
- Returns (tag, command, arg), with missing values replaced by
- None.
+ Returns (tag, subject), with missing values replaced by None.
"""
if hasattr(self, "_split_subject_cache"):
return self._split_subject_cache
- args = self.subject().split()
+ args = self.subject().split(u"]",1)
if len(args) < 1:
- self._split_subject_cache = (None, None, None)
+ self._split_subject_cache = (None, None)
elif len(args) < 2:
- self._split_subject_cache = (args[0], None, None)
- elif len(args) > 2:
- self._split_subject_cache = (args[0], args[1], tuple(args[2:]))
+ self._split_subject_cache = (args[0]+u"]", None)
else:
- self._split_subject_cache = (args[0], args[1], tuple())
+ self._split_subject_cache = (args[0]+u"]", args[1].strip())
return self._split_subject_cache
+ def _subject_tag_type(self):
+ """
+ Parse subject tag, return (type, value), where type is one of
+ None, "new", "comment", or "control"; and value is None except
+ in the case of "comment", in which case it's the bug
+ ID/shortname.
+ """
+ tag,subject = self._split_subject()
+ type = None
+ value = None
+ if tag == SUBJECT_TAG_NEW:
+ type = u"new"
+ elif tag == SUBJECT_TAG_CONTROL:
+ type = u"control"
+ else:
+ match = SUBJECT_TAG_COMMENT.match(tag)
+ if len(match.groups()) == 1:
+ type = u"comment"
+ value = match.group(1)
+ return (type, value)
def validate_subject(self):
"""
- Validate the subject line as best we can without attempting
- command execution.
+ Validate the subject line.
"""
- tag,command,args = self._split_subject()
- if tag != SUBJECT_TAG:
+ tag,subject = self._split_subject()
+ if not tag.startswith(SUBJECT_TAG_BASE):
raise InvalidSubject(
- self, "Subject must start with '%s '" % SUBJECT_TAG)
- elif command == None: # don't accept blank commands
- raise InvalidEmailCommand(self)
- if command not in ALLOWED_COMMANDS:
- raise InvalidEmailCommand(self)
- def subject_command(self):
- tag,command,args = self._split_subject()
- return command
- def subject_args(self):
- tag,command,args = self._split_subject()
- return args
+ self, u"Subject must start with '%s'" % SUBJECT_TAG_BASE)
+ tag_type,value = self._subject_tag_type()
+ if tag_type == None:
+ raise InvalidSubject(self, u"Invalid tag '%s'" % tag)
+ elif tag_type == u"new" and len(subject) == 0:
+ raise InvalidSubject(self, u"Cannot create a bug with blank title")
+ elif tag_type == u"comment" and len(value) == 0:
+ raise InvalidSubject(self, u"Must specify a bug ID to comment")
def _get_bodies_and_mime_types(self):
"""
Traverse the email message returning (body, mime_type) for
for part in self.msg.walk():
if part.is_multipart():
continue
- body,mime_type = (part.get_payload(decode=1), part.get_content_type())
+ body,mime_type=(part.get_payload(decode=1),part.get_content_type())
yield (body, mime_type)
+ def _parse_body_pseudoheaders(self, body, required, optional,
+ dictionary=None):
+ """
+ Grab any pseudo-headers from the beginning of body. Raise
+ InvalidPseudoHeader on errors. Returns the body text after
+ the pseudo-header and a dictionary of set options. If you
+ like, you can initialize the dictionary with some defaults
+ and pass your initialized dict in as dictionary.
+ """
+ if dictionary == None:
+ dictionary = {}
+ body_lines = body.splitlines()
+ all = required+optional
+ for i,line in enumerate(body_lines):
+ line = line.strip()
+ if len(line) == 0:
+ break
+ key,value = line.split(":", 1)
+ value = value.strip()
+ if key not in all:
+ raise InvalidPseudoHeader(self, key)
+ if len(value) == 0:
+ raise InvalidEmail(
+ self, u"Blank value for: %s" % key)
+ dictionary[key] = value
+ missing = []
+ for key in required:
+ if key not in dictionary:
+ missing.append(key)
+ if len(missing) > 0:
+ raise InvalidPseudoHeader(self,
+ u"Missing required pseudo-headers:\n%s"
+ % u", ".join(missing))
+ remaining_body = u"\n".join(body_lines[i:]).strip()
+ return (remaining_body, dictionary)
def parse(self):
"""
Parse the commands given in the email. Raises assorted
otherwise returns a list of suggested commands to run.
"""
self.validate_subject()
- tag,command,arg_tuple = self._split_subject()
- args = list(arg_tuple)
+ tag_type,value = self._subject_tag_type()
commands = []
- if command == "new":
+ if tag_type == u"new":
+ command = u"new"
+ tag,subject = self._split_subject()
+ summary = subject
+ options = {u"Reporter": self.author_addr()}
body,mime_type = list(self._get_bodies_and_mime_types())[0]
- lines = body.strip().split("\n", 1)
- summary = lines[0]
- if "--reporter" not in args and "-r" not in args:
- args = ["--reporter", self.author_addr()]+args
+ comment_body,options = \
+ self._parse_body_pseudoheaders(body,
+ NEW_REQUIRED_PSEUDOHEADERS,
+ NEW_OPTIONAL_PSEUDOHEADERS,
+ options)
+ args = [u"--reporter", options[u"Reporter"]]
args.append(summary)
commands.append(Command(self, command, args))
- if len(lines) == 2:
- comment = lines[1]
- args = ["--author", self.author_addr(),
- "--alt-id", self.message_id(),
- "--content-type", mime_type]
+ if len(comment_body) > 0:
+ command = u"comment"
+ comment = u"Version: %s\n\n"%options[u"Version"] + comment_body
+ args = [u"--author", self.author_addr(),
+ u"--alt-id", self.message_id(),
+ u"--content-type", mime_type]
args.append(ID(commands[0]))
- args.append("-")
- commands.append(Command(self, "comment", args, stdin=comment))
- elif command == "comment":
- if "--author" not in args and "-a" not in args:
- args = ["--author", self.author_addr()] + args
- if "--alt-id" not in args:
- args = ["--alt-id", self.message_id()] + args
+ args.append(u"-")
+ commands.append(Command(self, u"comment", args, stdin=comment))
+ elif tag_type == u"comment":
+ command = u"comment"
+ bug_id = value
+ author = self.author_addr()
+ alt_id = self.message_id()
body,mime_type = list(self._get_bodies_and_mime_types())[0]
- if "--content-type" not in args and "-c" not in args:
- args = ["--content-type", mime_type] + args
- args.append("-")
+ content_type = mime_type
+ args = [u"--author", author, u"--alt-id", alt_id,
+ u"--content-type", content_type, bug_id, u"-"]
commands.append(Command(self, command, args, stdin=body))
+ elif tag_type == u"control":
+ body,mime_type = list(self._get_bodies_and_mime_types())[0]
+ for line in body.splitlines():
+ line = line.strip()
+ if line.startswith(CONTROL_COMMENT) or len(line) == 0:
+ continue
+ if line.startswith(CONTROL_BREAK):
+ break
+ command,args = line.split(u" ",1)
+ commands.append(Command(self, command, args))
+ if len(commands) == 0:
+ raise InvalidEmail(self, u"No commands in control email.")
else:
- commands.append(Command(self, command, args))
+ raise Exception, u"Unrecognized tag type '%s'" % tag_type
return commands
def run(self):
self._begin_response()
"""
global LOGPATH, LOGFILE
if logpath != None:
- if logpath == "-":
- LOGPATH = "stderr"
+ if logpath == u"-":
+ LOGPATH = u"stderr"
LOGFILE = sys.stderr
- elif logpath == "none":
- LOGPATH = "none"
+ elif logpath == u"none":
+ LOGPATH = u"none"
LOGFILE = None
elif os.path.isabs(logpath):
LOGPATH = logpath
else:
LOGPATH = os.path.join(_THIS_DIR, logpath)
- if LOGFILE == None and LOGPATH != "none":
- LOGFILE = codecs.open(LOGPATH, "a+", ENCODING)
- LOGFILE.write("Default encoding: %s\n" % ENCODING)
+ if LOGFILE == None and LOGPATH != u"none":
+ LOGFILE = codecs.open(LOGPATH, u"a+", ENCODING)
+ LOGFILE.write(u"Default encoding: %s\n" % ENCODING)
def close_logfile():
- if LOGFILE != None and LOGPATH not in ["stderr", "none"]:
+ if LOGFILE != None and LOGPATH not in [u"stderr", u"none"]:
LOGFILE.close()
open_logfile(options.logfile)
if len(msg_text.strip()) == 0: # blank email!?
if LOGFILE != None:
- LOGFILE.write("Blank email!")
+ LOGFILE.write(u"Blank email!\n")
close_logfile()
sys.exit(1)
try:
response = e.response()
except Exception, e:
if LOGFILE != None:
- LOGFILE.write("Uncaught exception:\n%s\n" % (e,))
+ LOGFILE.write(u"Uncaught exception:\n%s\n" % (e,))
traceback.print_tb(sys.exc_traceback, file=LOGFILE)
close_logfile()
sys.exit(1)