Fixed import_xml.py to live up to its longhelp description.
authorW. Trevor King <wking@drexel.edu>
Mon, 30 Nov 2009 11:35:29 +0000 (06:35 -0500)
committerW. Trevor King <wking@drexel.edu>
Mon, 30 Nov 2009 11:35:29 +0000 (06:35 -0500)
Also added LonghelpTestCase to prove it.

becommands/import_xml.py

index a74d329b1ab325a2e20d7a0725ccdbb4ed0117c2..892a09e47a7c6310f59ee602da5048ebfafe6067 100644 (file)
 """Import comments and bugs from XML"""
 from libbe import cmdutil, bugdir, bug, comment, utility
 from becommands.comment import complete
+import copy
 import os
 import sys
 try: # import core module, Python >= 2.5
     from xml.etree import ElementTree
 except ImportError: # look for non-core module
     from elementtree import ElementTree
+import doctest
+import unittest
 __desc__ = __doc__
 
 def execute(args, manipulate_encodings=True, restrict_file_access=False):
@@ -63,6 +66,22 @@ def execute(args, manipulate_encodings=True, restrict_file_access=False):
     if options.comment_root != None:
         croot_bug,croot_comment = \
             cmdutil.bug_comment_from_id(bd, options.comment_root)
+        croot_bug.load_comments(load_full=True)
+        croot_bug.set_sync_with_disk(False)
+        if croot_comment.uuid == comment.INVALID_UUID:
+            croot_comment = croot_bug.comment_root
+        else:
+            croot_comment = croot_bug.comment_from_uuid(croot_comment.uuid)
+        new_croot_bug = bug.Bug(bugdir=bd, uuid=croot_bug.uuid)
+        new_croot_bug.explicit_attrs = []
+        new_croot_bug.comment_root = copy.deepcopy(croot_bug.comment_root)
+        if croot_comment.uuid == comment.INVALID_UUID:
+            new_croot_comment = new_croot_bug.comment_root
+        else:
+            new_croot_comment = \
+                new_croot_bug.comment_from_uuid(croot_comment.uuid)
+        for new in new_croot_bug.comments():
+            new.explicit_attrs = []
     else:
         croot_bug,croot_comment = (None, None)
 
@@ -87,7 +106,7 @@ def execute(args, manipulate_encodings=True, restrict_file_access=False):
     for child in be_xml.getchildren():
         if child.tag == 'bug':
             new = bug.Bug(bugdir=bd)
-            new.from_xml(unicode(ElementTree.tostring(child)).decode('unicode_escape'))
+            new.from_xml(unicode(ElementTree.tostring(child)).decode("unicode_escape"))
             root_bugs.append(new)
         elif child.tag == 'comment':
             new = comment.Comment(croot_bug)
@@ -107,35 +126,47 @@ def execute(args, manipulate_encodings=True, restrict_file_access=False):
                 % (child.tag, comment_list.tag)
 
     # merge the new root_comments
+    if options.add_only == True:
+        accept_changes = False
+        accept_extra_strings = False
+    else:
+        accept_changes = True
+        accept_extra_strings = True
+    accept_comments = True
     if len(root_comments) > 0:
         if croot_bug == None:
             raise UserError(
                 '--comment-root option is required for your root comments:\n%s'
                 % '\n\n'.join([c.string() for c in root_comments]))
-        croot_cids = []
-        for c in croot_bug.comment_root.traverse():
-            croot_cids.append(c.uuid)
-            if c.alt_id != None:
-                croot_cids.append(c.alt_id)
-        for new in root_comments:
-            if new.alt_id in croot_cids:
-                raise cmdutil.UserError(
-                    'clashing comment alt_id: %s' % new.alt_id)
-            croot_cids.append(new.uuid)
-            if new.alt_id != None:
-                croot_cids.append(new.alt_id)
-            if new.in_reply_to == None:
-                new.in_reply_to = croot_comment.uuid
         try:
             # link new comments
-            comment.list_to_root(root_comments,croot_bug,root=croot_comment,
-                                 ignore_missing_references= \
-                                     options.ignore_missing_references)
+            new_croot_bug.add_comments(root_comments,
+                                       default_parent=new_croot_comment,
+                                       ignore_missing_references= \
+                                           options.ignore_missing_references)
         except comment.MissingReference, e:
             raise cmdutil.UserError(e)
+        croot_bug.merge(new_croot_bug, accept_changes=accept_changes,
+                        accept_extra_strings=accept_extra_strings,
+                        accept_comments=accept_comments)
+
     # merge the new croot_bugs
+    merged_bugs = []
+    old_bugs = []
     for new in root_bugs:
-        bd.append(new)
+        try:
+            old = bd.bug_from_uuid(new.alt_id)
+        except KeyError:
+            old = None
+        if old == None:
+            bd.append(new)
+        else:
+            old.load_comments(load_full=True)
+            old.merge(new, accept_changes=accept_changes,
+                      accept_extra_strings=accept_extra_strings,
+                      accept_comments=accept_comments)
+            merged_bugs.append(new)
+            old_bugs.append(old)
 
     # protect against programmer error causing data loss:
     if croot_bug != None:
@@ -144,14 +175,18 @@ def execute(args, manipulate_encodings=True, restrict_file_access=False):
             assert new.uuid in comms, \
                 "comment %s wasn't added to %s" % (new.uuid, croot_comment.uuid)
     for new in root_bugs:
-        assert bd.has_bug(new.uuid), \
-            "bug %s wasn't added" % (new.uuid)
+        if not new in merged_bugs:
+            assert bd.has_bug(new.uuid), \
+                "bug %s wasn't added" % (new.uuid)
 
     # save new information
-    for new in root_comments:
-        new.save()
+    if croot_bug != None:
+        croot_bug.save()
     for new in root_bugs:
-        new.save()
+        if not new in merged_bugs:
+            new.save()
+    for old in old_bugs:
+        old.save()
 
 def get_parser():
     parser = cmdutil.CmdOptionParser("be import-xml XMLFILE")
@@ -213,7 +248,7 @@ repeats.
 
 Here's an example of import activity:
   Repository
-   bug (uuid=B, author=John, status=open)
+   bug (uuid=B, creator=John, status=open)
      estr (don't forget your towel)
      estr (helps with space travel)
      com (uuid=C1, author=Jane, body=Hello)
@@ -225,7 +260,7 @@ Here's an example of import activity:
      com (uuid=C1, body=So long)
      com (uuid=C3, author=Jed, body=And thanks)
   Result
-   bug (uuid=B, author=John, status=fixed)
+   bug (uuid=B, creator=John, status=fixed)
      estr (don't forget your towel)
      estr (helps with space travel)
      estr (watch out for flying dolphins)
@@ -233,7 +268,7 @@ Here's an example of import activity:
      com (uuid=C2, author=Jess, body=World)
      com (uuid=C4, alt-id=C3, author=Jed, body=And thanks)
   Result, with --add-only
-   bug (uuid=B, author=John, status=open)
+   bug (uuid=B, creator=John, status=open)
      estr (don't forget your towel)
      estr (helps with space travel)
      com (uuid=C1, author=Jane, body=Hello)
@@ -265,3 +300,129 @@ Devs recieve email, and save it's contents as demux-bug.xml
 
 def help():
     return get_parser().help_str() + longhelp
+
+
+class LonghelpTestCase (unittest.TestCase):
+    """
+    Test import scenarios given in longhelp.
+    """
+    def setUp(self):
+        self.bugdir = bugdir.SimpleBugDir()
+        self.original_working_dir = os.getcwd()
+        os.chdir(self.bugdir.root)
+        bugA = self.bugdir.bug_from_uuid('a')
+        self.bugdir.remove_bug(bugA)
+        self.bugdir.set_sync_with_disk(False)
+        bugB = self.bugdir.bug_from_uuid('b')
+        bugB.creator = 'John'
+        bugB.status = 'open'
+        bugB.extra_strings += ["don't forget your towel"]
+        bugB.extra_strings += ['helps with space travel']
+        comm1 = bugB.comment_root.new_reply(body='Hello\n')
+        comm1.uuid = 'c1'
+        comm1.author = 'Jane'
+        comm2 = bugB.comment_root.new_reply(body='World\n')
+        comm2.uuid = 'c2'
+        comm2.author = 'Jess'
+        bugB.save()
+        self.bugdir.set_sync_with_disk(True)
+        self.xml = """
+        <be-xml>
+          <bug>
+            <uuid>b</uuid>
+            <status>fixed</status>
+            <summary>a test bug</summary>
+            <extra-string>don't forget your towel</extra-string>
+            <extra-string>watch out for flying dolphins</extra-string>
+            <comment>
+              <uuid>c1</uuid>
+              <body>So long</body>
+            </comment>
+            <comment>
+              <uuid>c3</uuid>
+              <author>Jed</author>
+              <body>And thanks</body>
+            </comment>
+          </bug>
+        </be-xml>
+        """
+    def tearDown(self):
+        os.chdir(self.original_working_dir)
+        self.bugdir.cleanup()
+    def _execute(self, *args):
+        import StringIO
+        orig_stdin = sys.stdin
+        sys.stdin = StringIO.StringIO(self.xml)
+        execute(list(args)+["-"], manipulate_encodings=False,
+                restrict_file_access=True)
+        sys.stdin = orig_stdin
+        self.bugdir._clear_bugs()
+    def testCleanBugdir(self):
+        uuids = list(self.bugdir.uuids())
+        self.failUnless(uuids == ['b'], uuids)
+    def testNotAddOnly(self):
+        self._execute()
+        uuids = list(self.bugdir.uuids())
+        self.failUnless(uuids == ['b'], uuids)
+        bugB = self.bugdir.bug_from_uuid('b')
+        self.failUnless(bugB.uuid == 'b', bugB.uuid)
+        self.failUnless(bugB.creator == 'John', bugB.creator)
+        self.failUnless(bugB.status == 'fixed', bugB.status)
+        estrs = ["don't forget your towel",
+                 'helps with space travel',
+                 'watch out for flying dolphins']
+        self.failUnless(bugB.extra_strings == estrs, bugB.extra_strings)
+        comments = list(bugB.comments())
+        self.failUnless(len(comments) == 3,
+                        ['%s (%s, %s)' % (c.uuid, c.alt_id, c.body) for c in comments])
+        c1 = bugB.comment_from_uuid('c1')
+        comments.remove(c1)
+        self.failUnless(c1.uuid == 'c1', c1.uuid)
+        self.failUnless(c1.alt_id == None, c1.alt_id)
+        self.failUnless(c1.author == 'Jane', c1.author)
+        self.failUnless(c1.body == 'So long\n', c1.body)
+        c2 = bugB.comment_from_uuid('c2')
+        comments.remove(c2)
+        self.failUnless(c2.uuid == 'c2', c2.uuid)
+        self.failUnless(c2.alt_id == None, c2.alt_id)
+        self.failUnless(c2.author == 'Jess', c2.author)
+        self.failUnless(c2.body == 'World\n', c2.body)
+        c4 = comments[0]
+        self.failUnless(len(c4.uuid) == 36, c4.uuid)
+        self.failUnless(c4.alt_id == 'c3', c4.alt_id)
+        self.failUnless(c4.author == 'Jed', c4.author)
+        self.failUnless(c4.body == 'And thanks\n', c4.body)
+    def testAddOnly(self): 
+        self._execute('--add-only')
+        uuids = list(self.bugdir.uuids())
+        self.failUnless(uuids == ['b'], uuids)
+        bugB = self.bugdir.bug_from_uuid('b')
+        self.failUnless(bugB.uuid == 'b', bugB.uuid)
+        self.failUnless(bugB.creator == 'John', bugB.creator)
+        self.failUnless(bugB.status == 'open', bugB.status)
+        estrs = ["don't forget your towel",
+                 'helps with space travel']
+        self.failUnless(bugB.extra_strings == estrs, bugB.extra_strings)
+        comments = list(bugB.comments())
+        self.failUnless(len(comments) == 3,
+                        ['%s (%s)' % (c.uuid, c.alt_id) for c in comments])
+        c1 = bugB.comment_from_uuid('c1')
+        comments.remove(c1)
+        self.failUnless(c1.uuid == 'c1', c1.uuid)
+        self.failUnless(c1.alt_id == None, c1.alt_id)
+        self.failUnless(c1.author == 'Jane', c1.author)
+        self.failUnless(c1.body == 'Hello\n', c1.body)
+        c2 = bugB.comment_from_uuid('c2')
+        comments.remove(c2)
+        self.failUnless(c2.uuid == 'c2', c2.uuid)
+        self.failUnless(c2.alt_id == None, c2.alt_id)
+        self.failUnless(c2.author == 'Jess', c2.author)
+        self.failUnless(c2.body == 'World\n', c2.body)
+        c4 = comments[0]
+        self.failUnless(len(c4.uuid) == 36, c4.uuid)
+        self.failUnless(c4.alt_id == 'c3', c4.alt_id)
+        self.failUnless(c4.author == 'Jed', c4.author)
+        self.failUnless(c4.body == 'And thanks\n', c4.body)
+
+unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
+suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])