Fix problems with Parallel Tasks and Exception handling. (Kevin Quick)
authorstevenknight <stevenknight@fdb21ef1-2011-0410-befe-b5e4ea1792b1>
Fri, 17 Sep 2004 12:52:09 +0000 (12:52 +0000)
committerstevenknight <stevenknight@fdb21ef1-2011-0410-befe-b5e4ea1792b1>
Fri, 17 Sep 2004 12:52:09 +0000 (12:52 +0000)
git-svn-id: http://scons.tigris.org/svn/scons/trunk@1079 fdb21ef1-2011-0410-befe-b5e4ea1792b1

src/CHANGES.txt
src/engine/SCons/Job.py
src/engine/SCons/JobTests.py
src/engine/SCons/Taskmaster.py
src/engine/SCons/TaskmasterTests.py
test/exceptions.py

index 7a6ef02879b39258b6fac4bbaefe02b4ac9c6bb6..6ee412c176c714b3ad47b2d387691690bc338253 100644 (file)
@@ -100,6 +100,8 @@ RELEASE 0.97 - XXX
 
   - Test enhancements in SourceCode.py and option-n.py.
 
+  - Fix problems with Parallel Task Exception handling.
+
   From Christoph Wiedemann:
 
   - Add an Environment.SetDefault() method that only sets values if
index bbe0a2da1eddef5b3fb2469582a7004fe8dd1529..2d4dbf8f4c10fabcc14c1622222c5382deecffd2 100644 (file)
@@ -179,11 +179,9 @@ else:
         def get(self, block = 1):
             """Remove and return a result tuple from the results queue."""
             return self.resultsQueue.get(block)
-            
-        def get_nowait(self):
-            """Remove and result a result tuple from the results queue 
-            without blocking."""
-            return self.get(0)
+
+        def preparation_failed(self, obj):
+            self.resultsQueue.put((obj, 0))
 
     class Parallel:
         """This class is used to execute tasks in parallel, and is somewhat 
@@ -213,7 +211,6 @@ else:
             self.taskmaster = taskmaster
             self.tp = ThreadPool(num)
 
-            self.jobs = 0
             self.maxjobs = num
 
         def start(self):
@@ -222,8 +219,25 @@ else:
             more tasks. If a task fails to execute (i.e. execute() raises
             an exception), then the job will stop."""
 
+            jobs = 0
+            
             while 1:
-                if self.jobs < self.maxjobs:
+
+                # There's a concern here that the while-loop test below
+                # might delay reporting status back about failed build
+                # tasks until the entire build is done if tasks execute
+                # fast enough, or self.maxjobs is big enough.  It looks
+                # like that's enough of a corner case that we'll wait to
+                # see if it's an issue in practice.  If so, one possible
+                # fix might be:
+                #
+                #       while jobs < self.maxjobs and \
+                #             self.tp.resultsQueue.empty():
+                #
+                # but that's somewhat unattractive because the
+                # resultsQueue.empty() check might introduce some
+                # significant overhead involving mutex locking.
+                while jobs < self.maxjobs:
                     task = self.taskmaster.next_task()
                     if task is None:
                         break
@@ -234,26 +248,25 @@ else:
                     except KeyboardInterrupt:
                         raise
                     except:
-                        # Let the failed() callback function arrange for the
-                        # build to stop if that's appropriate.
-                        task.failed()
+                        # Let the failed() callback function arrange
+                        # for the build to stop if that's appropriate.
+                        task.exception_set()
+                        self.tp.preparation_failed(task)
+                        jobs = jobs + 1
+                        continue
 
                     # dispatch task
                     self.tp.put(task)
-                    self.jobs = self.jobs + 1
+                    jobs = jobs + 1
 
-                while 1:
-                    try:
-                        task, ok = self.tp.get_nowait()
-                    except Queue.Empty:
-                        if not (self.jobs is self.maxjobs or self.taskmaster.is_blocked()):
-                            break
-                        task, ok = self.tp.get()
-
-                    self.jobs = self.jobs - 1
-                    if ok:
-                        task.executed()
-                    else:
-                        task.failed()
-
-                    task.postprocess()
+                if not task and not jobs: break
+
+                task, ok = self.tp.get()
+
+                jobs = jobs - 1
+                if ok:
+                    task.executed()
+                else:
+                    task.failed()
+
+                task.postprocess()
index d216464bd81d501ee21ffff724fc98bb72506402..7dc865808c832df646f2f3aeec9dba98c5dbe274 100644 (file)
@@ -307,6 +307,145 @@ class ParallelExceptionTestCase(unittest.TestCase):
         self.failUnless(taskmaster.num_postprocessed >= 1,
                     "one or more tasks should have been postprocessed")
 
+#---------------------------------------------------------------------
+# Above tested Job object with contrived Task and Taskmaster objects.
+# Now test Job object with actual Task and Taskmaster objects.
+
+import SCons.Taskmaster
+import SCons.Node
+import time
+
+
+class testnode (SCons.Node.Node):
+    def __init__(self):
+        SCons.Node.Node.__init__(self)
+        self.expect_to_be = SCons.Node.executed
+
+class goodnode (testnode):
+    pass
+
+class slowgoodnode (goodnode):
+    def prepare(self):
+        # Delay to allow scheduled Jobs to run while the dispatcher
+        # sleeps.  Keep this short because it affects the time taken
+        # by this test.
+        time.sleep(0.15)
+        goodnode.prepare(self)
+        
+class badnode (goodnode):
+    def __init__(self):
+        goodnode.__init__(self)
+        self.expect_to_be = SCons.Node.failed
+    def build(self, **kw):
+        raise 'badnode exception'
+
+class slowbadnode (badnode):
+    def build(self, **kw):
+        # Appears to take a while to build, allowing faster builds to
+        # overlap.  Time duration is not especially important, but if
+        # it is faster than slowgoodnode then these could complete
+        # while the scheduler is sleeping.
+        time.sleep(0.05)
+        raise 'slowbadnode exception'
+
+class badpreparenode (badnode):
+    def prepare(self):
+        raise 'badpreparenode exception'
+
+class _SConsTaskTest(unittest.TestCase):
+
+    def _test_seq(self, num_jobs):
+        for node_seq in [
+            [goodnode],
+            [badnode],
+            [slowbadnode],
+            [slowgoodnode],
+            [badpreparenode],
+            [goodnode, badnode],
+            [slowgoodnode, badnode],
+            [goodnode, slowbadnode],
+            [goodnode, goodnode, goodnode, slowbadnode],
+            [goodnode, slowbadnode, badpreparenode, slowgoodnode],
+            [goodnode, slowbadnode, slowgoodnode, badnode]
+            ]:
+
+            self._do_test(num_jobs, node_seq)
+
+    def _do_test(self, num_jobs, node_seq):
+
+        testnodes = []
+        for tnum in range(num_tasks):
+            testnodes.append(node_seq[tnum % len(node_seq)]())
+
+        taskmaster = SCons.Taskmaster.Taskmaster(testnodes)
+        jobs = SCons.Job.Jobs(num_jobs, taskmaster)
+
+        # Exceptions thrown by tasks are not actually propagated to
+        # this level, but are instead stored in the Taskmaster.
+        
+        jobs.run()
+
+        # Now figure out if tests proceeded correctly.  The first test
+        # that fails will shutdown the initiation of subsequent tests,
+        # but any tests currently queued for execution will still be
+        # processed, and any tests that completed before the failure
+        # would have resulted in new tests being queued for execution.
+
+        # Apply the following operational heuristics of Job.py:
+        #  0) An initial jobset of tasks will be queued before any
+        #     good/bad results are obtained (from "execute" of task in
+        #     thread).
+        #  1) A goodnode will complete immediately on its thread and
+        #     allow another node to be queued for execution.
+        #  2) A badnode will complete immediately and suppress any
+        #     subsequent execution queuing, but all currently queued
+        #     tasks will still be processed.
+        #  3) A slowbadnode will fail later.  It will block slots in
+        #     the job queue.  Nodes that complete immediately will
+        #     allow other nodes to be queued in their place, and this
+        #     will continue until either (#2) above or until all job
+        #     slots are filled with slowbadnode entries.
+
+        # One approach to validating this test would be to try to
+        # determine exactly how many nodes executed, how many didn't,
+        # and the results of each, and then to assert failure on any
+        # mismatch (including the total number of built nodes).
+        # However, while this is possible to do for a single-processor
+        # system, it is nearly impossible to predict correctly for a
+        # multi-processor system and still test the characteristics of
+        # delayed execution nodes.  Stated another way, multithreading
+        # is inherently non-deterministic unless you can completely
+        # characterize the entire system, and since that's not
+        # possible here, we shouldn't try.
+
+        # Therefore, this test will simply scan the set of nodes to
+        # see if the node was executed or not and if it was executed
+        # that it obtained the expected value for that node
+        # (i.e. verifying we don't get failure crossovers or
+        # mislabelling of results).
+
+        for N in testnodes:
+            self.failUnless(N.get_state() in [None, N.expect_to_be],
+                            "node ran but got unexpected result")
+
+        self.failUnless(filter(lambda N: N.get_state(), testnodes),
+                        "no nodes ran at all.")
+
+
+class SerialTaskTest(_SConsTaskTest):
+    def runTest(self):
+        "test serial jobs with actual Taskmaster and Task"
+        self._test_seq(1)
+
+
+class ParallelTaskTest(_SConsTaskTest):
+    def runTest(self):
+        "test parallel jobs with actual Taskmaster and Task"
+        self._test_seq(num_jobs)
+
+
+
+#---------------------------------------------------------------------
 
 def suite():
     suite = unittest.TestSuite()
@@ -315,6 +454,8 @@ def suite():
     suite.addTest(NoParallelTestCase())
     suite.addTest(SerialExceptionTestCase())
     suite.addTest(ParallelExceptionTestCase())
+    suite.addTest(SerialTaskTest())
+    suite.addTest(ParallelTaskTest())
     return suite
 
 if __name__ == "__main__":
index 8f9839e6143d57b43b24091909bb31ed961e89c9..dab50264ccdc96b6bef9799151cb19a27246d1eb 100644 (file)
@@ -58,6 +58,7 @@ class Task:
         self.targets = targets
         self.top = top
         self.node = node
+        self.exc_clear()
 
     def display(self, message):
         """Allow the calling interface to display a message
@@ -73,7 +74,7 @@ class Task:
         # Now that it's the appropriate time, give the TaskMaster a
         # chance to raise any exceptions it encountered while preparing
         # this task.
-        self.tm.exception_raise()
+        self.exception_raise()
 
         if self.tm.message:
             self.display(self.tm.message)
@@ -209,14 +210,25 @@ class Task:
             t.postprocess()
 
     def exc_info(self):
-        return self.tm.exception
+        return self.exception
 
     def exc_clear(self):
-        self.tm.exception_clear()
+        self.exception = (None, None, None)
+        self.exception_raise = self._no_exception_to_raise
 
-    def exception_set(self):
-        self.tm.exception_set()
+    def exception_set(self, exception=None):
+        if not exception:
+            exception = sys.exc_info()
+        self.exception = exception
+        self.exception_raise = self._exception_raise
 
+    def _no_exception_to_raise(self):
+        pass
+
+    def _exception_raise(self):
+        """Raise a pending exception that was recorded while
+        getting a Task ready for execution."""
+        self.tm.exception_raise(self.exc_info())
 
 
 def order(dependencies):
@@ -240,7 +252,6 @@ class Taskmaster:
         self.tasker = tasker
         self.ready = None # the next task that is ready to be executed
         self.order = order
-        self.exception_clear()
         self.message = None
 
     def _find_next_ready_node(self):
@@ -249,6 +260,8 @@ class Taskmaster:
         if self.ready:
             return
 
+        self.ready_exc = None
+        
         while self.candidates:
             node = self.candidates[-1]
             state = node.get_state()
@@ -266,7 +279,7 @@ class Taskmaster:
             except SystemExit:
                 exc_value = sys.exc_info()[1]
                 e = SCons.Errors.ExplicitExit(node, exc_value.code)
-                self.exception_set((SCons.Errors.ExplicitExit, e))
+                self.ready_exc = (SCons.Errors.ExplicitExit, e)
                 self.candidates.pop()
                 self.ready = node
                 break
@@ -277,7 +290,7 @@ class Taskmaster:
                 # children (like a child couldn't be linked in to a
                 # BuildDir, or a Scanner threw something).  Arrange to
                 # raise the exception when the Task is "executed."
-                self.exception_set()
+                self.ready_exc = sys.exc_info()
                 self.candidates.pop()
                 self.ready = node
                 break
@@ -313,7 +326,7 @@ class Taskmaster:
                 # the kids are derived (like a child couldn't be linked
                 # from a repository).  Arrange to raise the exception
                 # when the Task is "executed."
-                self.exception_set()
+                self.ready_exc = sys.exc_info()
                 self.candidates.pop()
                 self.ready = node
                 break
@@ -397,8 +410,13 @@ class Taskmaster:
             # a child couldn't be linked in to a BuildDir when deciding
             # whether this node is current).  Arrange to raise the
             # exception when the Task is "executed."
-            self.exception_set()
+            self.ready_exc = sys.exc_info()
+
+        if self.ready_exc:
+            task.exception_set(self.ready_exc)
+
         self.ready = None
+        self.ready_exc = None
 
         return task
 
@@ -442,21 +460,6 @@ class Taskmaster:
         self.candidates.extend(self.pending)
         self.pending = []
 
-    def exception_set(self, exception=None):
-        if exception is None:
-            exception = sys.exc_info()
-        self.exception = exception
-        self.exception_raise = self._exception_raise
-
-    def exception_clear(self):
-        self.exception = (None, None, None)
-        self.exception_raise = self._no_exception_to_raise
-
-    def _no_exception_to_raise(self):
-        pass
-
-    def _exception_raise(self):
-        """Raise a pending exception that was recorded while
-        getting a Task ready for execution."""
-        exc_type, exc_value = self.exception[:2]
+    def exception_raise(self, exception):
+        exc_type, exc_value = exception[:2]
         raise exc_type, exc_value
index ef7f51afcf8c5d68f7dc8da83c63521c5188dd35..8865e09b9ce11ed81fa60df1e2fdadf1ca14829e 100644 (file)
@@ -478,7 +478,7 @@ class TaskmasterTestCase(unittest.TestCase):
         n1 = Node("n1")
         tm = SCons.Taskmaster.Taskmaster(targets = [n1], tasker = MyTask)
         t = tm.next_task()
-        exc_type, exc_value, exc_tb = tm.exception
+        exc_type, exc_value, exc_tb = t.exception
         assert exc_type == MyException, repr(exc_type)
         assert str(exc_value) == "from make_ready()", exc_value
 
@@ -557,14 +557,14 @@ class TaskmasterTestCase(unittest.TestCase):
         n1 = StopNode("n1")
         tm = SCons.Taskmaster.Taskmaster([n1])
         t = tm.next_task()
-        exc_type, exc_value, exc_tb = tm.exception
+        exc_type, exc_value, exc_tb = t.exception
         assert exc_type == SCons.Errors.StopError, repr(exc_type)
         assert str(exc_value) == "stop!", exc_value
 
         n2 = ExitNode("n2")
         tm = SCons.Taskmaster.Taskmaster([n2])
         t = tm.next_task()
-        exc_type, exc_value = tm.exception
+        exc_type, exc_value = t.exception
         assert exc_type == SCons.Errors.ExplicitExit, repr(exc_type)
         assert exc_value.node == n2, exc_value.node
         assert exc_value.status == 77, exc_value.status
@@ -741,8 +741,8 @@ class TaskmasterTestCase(unittest.TestCase):
         built_text = None
         n5 = Node("n5")
         tm = SCons.Taskmaster.Taskmaster([n5])
-        tm.exception_set((MyException, "exception value"))
         t = tm.next_task()
+        t.exception_set((MyException, "exception value"))
         exc_caught = None
         try:
             t.prepare()
@@ -880,19 +880,20 @@ class TaskmasterTestCase(unittest.TestCase):
         """
         n1 = Node("n1")
         tm = SCons.Taskmaster.Taskmaster([n1])
+        t  = tm.next_task()
 
-        tm.exception_set((1, 2))
-        exc_type, exc_value = tm.exception
+        t.exception_set((1, 2))
+        exc_type, exc_value = t.exception
         assert exc_type == 1, exc_type
         assert exc_value == 2, exc_value
 
-        tm.exception_set(3)
-        assert tm.exception == 3
+        t.exception_set(3)
+        assert t.exception == 3
 
         try: 1/0
         except: pass
-        tm.exception_set(None)
-        exc_type, exc_value, exc_tb = tm.exception
+        t.exception_set(None)
+        exc_type, exc_value, exc_tb = t.exception
         assert exc_type is ZeroDivisionError, exc_type
         exception_values = [
             "integer division or modulo",
@@ -900,9 +901,9 @@ class TaskmasterTestCase(unittest.TestCase):
         ]
         assert str(exc_value) in exception_values, exc_value
 
-        tm.exception_set(("exception 1", None))
+        t.exception_set(("exception 1", None))
         try:
-            tm.exception_raise()
+            t.exception_raise()
         except:
             exc_type, exc_value = sys.exc_info()[:2]
             assert exc_type == "exception 1", exc_type
@@ -910,9 +911,9 @@ class TaskmasterTestCase(unittest.TestCase):
         else:
             assert 0, "did not catch expected exception"
 
-        tm.exception_set(("exception 2", "xyzzy"))
+        t.exception_set(("exception 2", "xyzzy"))
         try:
-            tm.exception_raise()
+            t.exception_raise()
         except:
             exc_type, exc_value = sys.exc_info()[:2]
             assert exc_type == "exception 2", exc_type
@@ -920,6 +921,20 @@ class TaskmasterTestCase(unittest.TestCase):
         else:
             assert 0, "did not catch expected exception"
 
+        t.exception_set(("exception 3", "XYZZY"))
+        def fw_exc(exc):
+            raise 'exception_forwarded', exc
+        tm.exception_raise = fw_exc 
+        try:
+            t.exception_raise()
+        except:
+            exc_type, exc_value = sys.exc_info()[:2]
+            assert exc_type == 'exception_forwarded', exc_type
+            assert exc_value[0] == "exception 3", exc_value[0]
+            assert exc_value[1] == "XYZZY", exc_value[1]
+        else:
+            assert 0, "did not catch expected exception"
+
     def test_postprocess(self):
         """Test postprocessing targets to give them a chance to clean up
         
index ef409fab80f04c98ff97ef2c08ad21b495006cf2..7b45a4ba752f294a45ccaf31e35d439f016ab24a 100644 (file)
 __revision__ = "__FILE__ __REVISION__ __DATE__ __DEVELOPER__"
 
 import os
+import string
 import sys
 import TestSCons
 import TestCmd
 
+python = TestSCons.python
+
 test = TestSCons.TestSCons(match = TestCmd.match_re_dotall)
 
 test.write('SConstruct', """
@@ -41,7 +44,7 @@ env.B(target = 'foo.out', source = 'foo.in')
 
 test.write('foo.in', "foo.in\n")
 
-test.run(arguments = "foo.out", stderr = """scons: \*\*\* \[foo.out\] Exception
+expected_stderr = """scons: \*\*\* \[foo.out\] Exception
 Traceback \((most recent call|innermost) last\):
   File ".+", line \d+, in \S+
     [^\n]+
@@ -50,6 +53,74 @@ Traceback \((most recent call|innermost) last\):
   File "SConstruct", line 3, in func
     raise "func exception"
 func exception
-""", status = 2)
+"""
+
+test.run(arguments = "foo.out", stderr = expected_stderr, status = 2)
+
+test.run(arguments = "-j2 foo.out", stderr = expected_stderr, status = 2)
+
+
+# Verify that exceptions caused by exit values of builder actions are
+# correectly signalled, for both Serial and Parallel jobs.
+
+test.write('myfail.py', r"""\
+import sys
+sys.exit(1)
+""")
+
+test.write('SConstruct', """
+Fail = Builder(action = r'%s myfail.py $TARGETS $SOURCE')
+env = Environment(BUILDERS = { 'Fail' : Fail })
+env.Fail(target = 'f1', source = 'f1.in')
+""" % (python))
+
+test.write('f1.in', "f1.in\n")
+
+expected_stderr = "scons: \*\*\* \[f1\] Error 1\n"
+
+test.run(arguments = '.', status = 2, stderr = expected_stderr)
+test.run(arguments = '-j2 .', status = 2, stderr = expected_stderr)
+
+
+# Verify that all exceptions from simultaneous tasks are reported,
+# even if the exception is raised during the Task.prepare()
+# [Node.prepare()]
+
+test.write('SConstruct', """
+Fail = Builder(action = r'%s myfail.py $TARGETS $SOURCE')
+env = Environment(BUILDERS = { 'Fail' : Fail })
+env.Fail(target = 'f1', source = 'f1.in')
+env.Fail(target = 'f2', source = 'f2.in')
+env.Fail(target = 'f3', source = 'f3.in')
+""" % (python))
+
+# f2.in is not created to cause a Task.prepare exception
+test.write('f3.in', 'f3.in\n')
+
+# In Serial task mode, get the first exception and stop
+test.run(arguments = '.', status = 2, stderr = expected_stderr)
+
+# In Parallel task mode, we will get all three exceptions.
+
+expected_stderr_list = [
+  expected_stderr,
+  "scons: \*\*\* Source `f2\.in' not found, needed by target `f2'\.  Stop\.\n",
+  string.replace(expected_stderr, 'f1', 'f3')
+  ]
+  
+# Unfortunately, we aren't guaranteed what order we will get the
+# exceptions in...
+orders = [ (1,2,3), (1,3,2), (2,1,3), (2,3,1), (3,1,2), (3,2,1) ]
+otexts = []
+for A,B,C in orders:
+    otexts.append("%s%s%s"%(expected_stderr_list[A-1],
+                            expected_stderr_list[B-1],
+                            expected_stderr_list[C-1]))
+                           
+                      
+expected_stderrs = "(" + string.join(otexts, "|") + ")"
+
+test.run(arguments = '-j3 .', status = 2, stderr = expected_stderrs)
+
 
 test.pass_test()