import doctest
import unittest
-import libbe.cmdutil, libbe.encoding, libbe.utility
+from becommands import subscribe
+import libbe.cmdutil, libbe.encoding, libbe.utility, libbe.bugdir
+import libbe.diff
import send_pgp_mime
-HANDLER_ADDRESS = u"BE Bugs <wking@thor.physics.drexel.edu>"
+THIS_SERVER = u"thor.physics.drexel.edu"
+THIS_ADDRESS = u"BE Bugs <wking@thor.physics.drexel.edu>"
+
_THIS_DIR = os.path.abspath(os.path.dirname(__file__))
BE_DIR = _THIS_DIR
LOGPATH = os.path.join(_THIS_DIR, u"be-handle-mail.log")
InvalidCommand.__init__(self, msg, info, command, bigmessage)
self.option = option
+class NotificationFailed (Exception):
+ def __init__(self, msg):
+ bigmessage = "Notification failed: %s" % msg
+ Exception.__init__(self, bigmessage)
+ self.short_msg = msg
+
class ID (object):
"""
Sometimes you want to reference the output of a command that
finally:
if AUTOCOMMIT == True:
tag,subject = self._split_subject()
- command = Command(self, "commit", [subject])
- command.run()
+ self.commit_command = Command(self, "commit", [subject])
+ self.commit_command.run()
if LOGFILE != None:
LOGFILE.write("Autocommit:\n%s\n\n" %
- send_pgp_mime.flatten(command.response_msg(),
- to_unicode=True))
+ send_pgp_mime.flatten(self.commit_command.response_msg(),
+ to_unicode=True))
def _begin_response(self):
tag,subject = self._split_subject()
- response_header = [u"From: %s" % HANDLER_ADDRESS,
+ response_header = [u"From: %s" % THIS_ADDRESS,
u"To: %s" % self.author_addr(),
u"Date: %s" % libbe.utility.time_to_str(time.time()),
u"Subject: %s Re: %s"%(SUBJECT_TAG_RESPONSE,subject)
for message in self._response_messages:
response_body.attach(message)
return send_pgp_mime.attach_root(self.response_header, response_body)
+ def subscriber_emails(self):
+ if AUTOCOMMIT != True: # no way to tell what's changed
+ raise NotificationFailed("Autocommit dissabled")
+ assert len(self._response_messages) > 0
+ if self.commit_command.ret != 0:
+ # commit failed. Error already logged.
+ raise NotificationFailed("Commit failed")
+
+ # read only bugdir.
+ bd = libbe.bugdir.BugDir(from_disk=True,
+ manipulate_encodings=False)
+ if bd.rcs.versioned == False: # no way to tell what's changed
+ raise NotificationFailed("Not versioned")
+
+ subscribers = subscribe.get_bugdir_subscribers(bd, THIS_SERVER)
+ if len(subscribers) == 0:
+ return []
+
+ before_bd, after_bd = self._get_before_and_after_bugdirs(bd)
+ rem,mod,add = libbe.diff.bug_diffs(before_bd, after_bd)
+ bug_index = self._subscriber_bug_change_index(rem,mod,add)
+ header = self._subscriber_header(bd)
+
+ parts = {}
+ emails = []
+ for subscriber,subscriptions in subscribers.items():
+ header["to"] = subscriber
+ root = MIMEMultipart()
+ for id,types in subscriptions.items():
+ if id == "DIR":
+ if subscribe.BUGDIR_TYPE_ALL in subscriptions["DIR"]:
+ if ("DIR", "all") not in parts:
+ parts[("DIR", "all")] = \
+ self._subscriber_bugdir_all_part( \
+ rem,mod,add,before_bd,after_bd)
+ root.attach(parts[("DIR", "all")])
+ if subscribe.BUGDIR_TYPE_NEW in subscriptions["DIR"]:
+ if ("DIR", "new") not in parts:
+ parts[("DIR", "new")] = \
+ self._subscriber_bugdir_new_part(add)
+ root.attach(parts[("DIR", "new")])
+ continue
+ assert subscriptions[id] == [subscribe.BUG_TYPE_ALL], \
+ subscriptions[id]
+ type,bug = bug_index[id]
+ if type == "added":
+ pass # no-one other than self.author should be subscribed.
+ elif type == "modified":
+ old,new = bug
+ if (new.uuid, "mod") not in parts:
+ parts[(new.uuid, "mod")] = \
+ self._subscriber_bug_mod_part(old, new)
+ root.attach(parts[(new.uuid, "mod")])
+ elif type == "removed":
+ if (bug.uuid, "rem") not in parts:
+ parts[(bug.uuid, "rem")] = \
+ self._subscriber_bug_rem_part(old, bug)
+ root.attach(parts[(bug.uuid, "rem")])
+ if len(root.get_payload()) > 0:
+ emails.append(send_pgp_mime.attach_root(header, root))
+ if LOGFILE != None:
+ LOGFILE.write("Notfying %s of changes\n" % subscriber)
+ return emails
+ def _get_before_and_after_bugdirs(self, bd):
+ commit_msg = self.commit_command.stdout
+ assert commit_msg.startswith("Committed "), commit_msg
+ after_revision = commit_msg[len("Committed "):]
+ before_revision = bd.rcs.revision_id(-2)
+ if before_revision == None:
+ # this commit was the initial commit
+ before_bd = libbe.bugdir.BugDir(from_disk=False,
+ manipulate_encodings=False)
+ else:
+ before_bd = bd.duplicate_bugdir(before_revision)
+ #after_bd = bd.duplicate_bugdir(after_revision)
+ after_bd = bd # assume no changes since commit a few cycles ago
+ return (before_bd, after_bd)
+ def _subscriber_header(self, bd):
+ root_dir = os.path.basename(bd.root)
+ subject = "Changes to %s on %s by %s" \
+ % (root_dir, THIS_SERVER, self.author_addr())
+ header = [u"From: %s" % THIS_ADDRESS,
+ u"To: %s" % u"DUMMY-AUTHOR",
+ u"Date: %s" % libbe.utility.time_to_str(time.time()),
+ u"Subject: %s Re: %s" % (SUBJECT_TAG_RESPONSE, subject)
+ ]
+ return send_pgp_mime.header_from_text(text=u"\n".join(header))
+ def _subscriber_bug_change_index(self, removed, modified, added):
+ bug_index = {}
+ for bug in removed:
+ bug_index[bug.uuid] = ("removed", bug)
+ for bug in modified:
+ old,new = bug
+ bug_index[new.uuid] = ("modified", bug)
+ for bug in added:
+ bug_index[bug.uuid] = ("added", bug)
+ return bug_index
+ def _subscriber_bugdir_all_part(self, rem,mod,add,before_bd,after_bd):
+ root = MIMEMultipart()
+ self._att(root,self._subscriber_bugdir_bugdir_part(before_bd,after_bd))
+ self._att(root, self._subscriber_bugdir_mod_part(mod))
+ self._att(root, self._subscriber_bugdir_rem_part(rem))
+ self._att(root, self._subscriber_bugdir_new_part(add))
+ if len(root.get_payload()) == 0:
+ return None
+ return root
+ def _subscriber_bugdir_bugdir_part(self, before_bd, after_bd):
+ if before_bd.settings != after_bd.settings:
+ return send_pgp_mime.encodedMIMEText(u"BD changed!")
+ return None
+ def _subscriber_bugdir_rem_part(self, rem):
+ lines = [u"The following bugs were removed by %s." % self.author_addr(), u""]
+ for bug in rem:
+ bug_text = bug.string(show_comments=False)
+ lines.extend(bug_text.splitlines())
+ return send_pgp_mime.encodedMIMEText(u"\n".join(lines)+u"\n")
+ def _subscriber_bugdir_new_part(self, add):
+ lines = [u"The following bugs were added by %s." % self.author_addr(), u""]
+ for bug in add:
+ bug_text = bug.string(show_comments=True)
+ lines.extend(bug_text.splitlines())
+ return send_pgp_mime.encodedMIMEText(u"\n".join(lines)+u"\n")
+ def _subscriber_bugdir_mod_part(self, mod):
+ lines = [u"The following bugs were modified by %s." \
+ % self.author_addr(), u""]
+ for old,new in mod:
+ change_text = libbe.diff.bug_changes(old, new)
+ if change_text == None:
+ return None
+ lines.extend(change_text.splitlines())
+ return send_pgp_mime.encodedMIMEText(u"\n".join(lines)+u"\n")
+ def _subscriber_bug_rem_part(self, bug):
+ lines = [u"The following bug was removed by %s"%self.author_addr(),u""]
+ bug_text = bug.string(show_comments=False)
+ lines.extend(bug_text.splitlines())
+ return send_pgp_mime.encodedMIMEText(u"\n".join(lines)+u"\n")
+ def _subscriber_bug_mod_part(self, old, new):
+ lines = [u"Bug %s was modified by %s" % \
+ (new.uuid, self.author_addr()), u""]
+ change_text = libbe.diff.bug_changes(old, new)
+ lines.extend(change_text.splitlines())
+ return send_pgp_mime.encodedMIMEText(u"\n".join(lines)+u"\n")
+ def _att(self, root, attachment):
+ if attachment != None:
+ root.attach(attachment)
+ return root
def generate_global_tags(tag_base=u"be-bug"):
"""
parser.add_option('-a', '--disable-autocommit', dest='autocommit',
default=True, action='store_false',
help='Disable the autocommit after parsing the email.')
+ parser.add_option('-s', '--disable-subscribers', dest='subscribers',
+ default=True, action='store_false',
+ help='Disable subscriber notification emails.')
parser.add_option('--test', dest='test', action='store_true',
help='Run internal unit-tests and exit.')
LOGFILE.write(u"\n%s\n\n" % send_pgp_mime.flatten(response,
to_unicode=True))
send_pgp_mime.mail(response, send_pgp_mime.sendmail)
+ if options.subscribers == True:
+ LOGFILE.write(u"Checking for subscribers\n")
+ try:
+ emails = m.subscriber_emails()
+ except NotificationFailed, e:
+ LOGFILE.write(unicode(e) + u"\n")
+ else:
+ for msg in emails:
+ if options.output == True:
+ print send_pgp_mime.flatten(msg, to_unicode=True)
+ else:
+ send_pgp_mime.mail(msg, send_pgp_mime.sendmail)
+
close_logfile()
+
class GenerateGlobalTagsTestCase (unittest.TestCase):
def setUp(self):
super(GenerateGlobalTagsTestCase, self).setUp()