From: stevenknight Date: Fri, 17 Sep 2004 12:52:09 +0000 (+0000) Subject: Fix problems with Parallel Tasks and Exception handling. (Kevin Quick) X-Git-Url: http://git.tremily.us/?a=commitdiff_plain;h=9e45571957b079adec1e18a59b7e101ebc8b1741;p=scons.git Fix problems with Parallel Tasks and Exception handling. (Kevin Quick) git-svn-id: http://scons.tigris.org/svn/scons/trunk@1079 fdb21ef1-2011-0410-befe-b5e4ea1792b1 --- diff --git a/src/CHANGES.txt b/src/CHANGES.txt index 7a6ef028..6ee412c1 100644 --- a/src/CHANGES.txt +++ b/src/CHANGES.txt @@ -100,6 +100,8 @@ RELEASE 0.97 - XXX - Test enhancements in SourceCode.py and option-n.py. + - Fix problems with Parallel Task Exception handling. + From Christoph Wiedemann: - Add an Environment.SetDefault() method that only sets values if diff --git a/src/engine/SCons/Job.py b/src/engine/SCons/Job.py index bbe0a2da..2d4dbf8f 100644 --- a/src/engine/SCons/Job.py +++ b/src/engine/SCons/Job.py @@ -179,11 +179,9 @@ else: def get(self, block = 1): """Remove and return a result tuple from the results queue.""" return self.resultsQueue.get(block) - - def get_nowait(self): - """Remove and result a result tuple from the results queue - without blocking.""" - return self.get(0) + + def preparation_failed(self, obj): + self.resultsQueue.put((obj, 0)) class Parallel: """This class is used to execute tasks in parallel, and is somewhat @@ -213,7 +211,6 @@ else: self.taskmaster = taskmaster self.tp = ThreadPool(num) - self.jobs = 0 self.maxjobs = num def start(self): @@ -222,8 +219,25 @@ else: more tasks. If a task fails to execute (i.e. execute() raises an exception), then the job will stop.""" + jobs = 0 + while 1: - if self.jobs < self.maxjobs: + + # There's a concern here that the while-loop test below + # might delay reporting status back about failed build + # tasks until the entire build is done if tasks execute + # fast enough, or self.maxjobs is big enough. It looks + # like that's enough of a corner case that we'll wait to + # see if it's an issue in practice. If so, one possible + # fix might be: + # + # while jobs < self.maxjobs and \ + # self.tp.resultsQueue.empty(): + # + # but that's somewhat unattractive because the + # resultsQueue.empty() check might introduce some + # significant overhead involving mutex locking. + while jobs < self.maxjobs: task = self.taskmaster.next_task() if task is None: break @@ -234,26 +248,25 @@ else: except KeyboardInterrupt: raise except: - # Let the failed() callback function arrange for the - # build to stop if that's appropriate. - task.failed() + # Let the failed() callback function arrange + # for the build to stop if that's appropriate. + task.exception_set() + self.tp.preparation_failed(task) + jobs = jobs + 1 + continue # dispatch task self.tp.put(task) - self.jobs = self.jobs + 1 + jobs = jobs + 1 - while 1: - try: - task, ok = self.tp.get_nowait() - except Queue.Empty: - if not (self.jobs is self.maxjobs or self.taskmaster.is_blocked()): - break - task, ok = self.tp.get() - - self.jobs = self.jobs - 1 - if ok: - task.executed() - else: - task.failed() - - task.postprocess() + if not task and not jobs: break + + task, ok = self.tp.get() + + jobs = jobs - 1 + if ok: + task.executed() + else: + task.failed() + + task.postprocess() diff --git a/src/engine/SCons/JobTests.py b/src/engine/SCons/JobTests.py index d216464b..7dc86580 100644 --- a/src/engine/SCons/JobTests.py +++ b/src/engine/SCons/JobTests.py @@ -307,6 +307,145 @@ class ParallelExceptionTestCase(unittest.TestCase): self.failUnless(taskmaster.num_postprocessed >= 1, "one or more tasks should have been postprocessed") +#--------------------------------------------------------------------- +# Above tested Job object with contrived Task and Taskmaster objects. +# Now test Job object with actual Task and Taskmaster objects. + +import SCons.Taskmaster +import SCons.Node +import time + + +class testnode (SCons.Node.Node): + def __init__(self): + SCons.Node.Node.__init__(self) + self.expect_to_be = SCons.Node.executed + +class goodnode (testnode): + pass + +class slowgoodnode (goodnode): + def prepare(self): + # Delay to allow scheduled Jobs to run while the dispatcher + # sleeps. Keep this short because it affects the time taken + # by this test. + time.sleep(0.15) + goodnode.prepare(self) + +class badnode (goodnode): + def __init__(self): + goodnode.__init__(self) + self.expect_to_be = SCons.Node.failed + def build(self, **kw): + raise 'badnode exception' + +class slowbadnode (badnode): + def build(self, **kw): + # Appears to take a while to build, allowing faster builds to + # overlap. Time duration is not especially important, but if + # it is faster than slowgoodnode then these could complete + # while the scheduler is sleeping. + time.sleep(0.05) + raise 'slowbadnode exception' + +class badpreparenode (badnode): + def prepare(self): + raise 'badpreparenode exception' + +class _SConsTaskTest(unittest.TestCase): + + def _test_seq(self, num_jobs): + for node_seq in [ + [goodnode], + [badnode], + [slowbadnode], + [slowgoodnode], + [badpreparenode], + [goodnode, badnode], + [slowgoodnode, badnode], + [goodnode, slowbadnode], + [goodnode, goodnode, goodnode, slowbadnode], + [goodnode, slowbadnode, badpreparenode, slowgoodnode], + [goodnode, slowbadnode, slowgoodnode, badnode] + ]: + + self._do_test(num_jobs, node_seq) + + def _do_test(self, num_jobs, node_seq): + + testnodes = [] + for tnum in range(num_tasks): + testnodes.append(node_seq[tnum % len(node_seq)]()) + + taskmaster = SCons.Taskmaster.Taskmaster(testnodes) + jobs = SCons.Job.Jobs(num_jobs, taskmaster) + + # Exceptions thrown by tasks are not actually propagated to + # this level, but are instead stored in the Taskmaster. + + jobs.run() + + # Now figure out if tests proceeded correctly. The first test + # that fails will shutdown the initiation of subsequent tests, + # but any tests currently queued for execution will still be + # processed, and any tests that completed before the failure + # would have resulted in new tests being queued for execution. + + # Apply the following operational heuristics of Job.py: + # 0) An initial jobset of tasks will be queued before any + # good/bad results are obtained (from "execute" of task in + # thread). + # 1) A goodnode will complete immediately on its thread and + # allow another node to be queued for execution. + # 2) A badnode will complete immediately and suppress any + # subsequent execution queuing, but all currently queued + # tasks will still be processed. + # 3) A slowbadnode will fail later. It will block slots in + # the job queue. Nodes that complete immediately will + # allow other nodes to be queued in their place, and this + # will continue until either (#2) above or until all job + # slots are filled with slowbadnode entries. + + # One approach to validating this test would be to try to + # determine exactly how many nodes executed, how many didn't, + # and the results of each, and then to assert failure on any + # mismatch (including the total number of built nodes). + # However, while this is possible to do for a single-processor + # system, it is nearly impossible to predict correctly for a + # multi-processor system and still test the characteristics of + # delayed execution nodes. Stated another way, multithreading + # is inherently non-deterministic unless you can completely + # characterize the entire system, and since that's not + # possible here, we shouldn't try. + + # Therefore, this test will simply scan the set of nodes to + # see if the node was executed or not and if it was executed + # that it obtained the expected value for that node + # (i.e. verifying we don't get failure crossovers or + # mislabelling of results). + + for N in testnodes: + self.failUnless(N.get_state() in [None, N.expect_to_be], + "node ran but got unexpected result") + + self.failUnless(filter(lambda N: N.get_state(), testnodes), + "no nodes ran at all.") + + +class SerialTaskTest(_SConsTaskTest): + def runTest(self): + "test serial jobs with actual Taskmaster and Task" + self._test_seq(1) + + +class ParallelTaskTest(_SConsTaskTest): + def runTest(self): + "test parallel jobs with actual Taskmaster and Task" + self._test_seq(num_jobs) + + + +#--------------------------------------------------------------------- def suite(): suite = unittest.TestSuite() @@ -315,6 +454,8 @@ def suite(): suite.addTest(NoParallelTestCase()) suite.addTest(SerialExceptionTestCase()) suite.addTest(ParallelExceptionTestCase()) + suite.addTest(SerialTaskTest()) + suite.addTest(ParallelTaskTest()) return suite if __name__ == "__main__": diff --git a/src/engine/SCons/Taskmaster.py b/src/engine/SCons/Taskmaster.py index 8f9839e6..dab50264 100644 --- a/src/engine/SCons/Taskmaster.py +++ b/src/engine/SCons/Taskmaster.py @@ -58,6 +58,7 @@ class Task: self.targets = targets self.top = top self.node = node + self.exc_clear() def display(self, message): """Allow the calling interface to display a message @@ -73,7 +74,7 @@ class Task: # Now that it's the appropriate time, give the TaskMaster a # chance to raise any exceptions it encountered while preparing # this task. - self.tm.exception_raise() + self.exception_raise() if self.tm.message: self.display(self.tm.message) @@ -209,14 +210,25 @@ class Task: t.postprocess() def exc_info(self): - return self.tm.exception + return self.exception def exc_clear(self): - self.tm.exception_clear() + self.exception = (None, None, None) + self.exception_raise = self._no_exception_to_raise - def exception_set(self): - self.tm.exception_set() + def exception_set(self, exception=None): + if not exception: + exception = sys.exc_info() + self.exception = exception + self.exception_raise = self._exception_raise + def _no_exception_to_raise(self): + pass + + def _exception_raise(self): + """Raise a pending exception that was recorded while + getting a Task ready for execution.""" + self.tm.exception_raise(self.exc_info()) def order(dependencies): @@ -240,7 +252,6 @@ class Taskmaster: self.tasker = tasker self.ready = None # the next task that is ready to be executed self.order = order - self.exception_clear() self.message = None def _find_next_ready_node(self): @@ -249,6 +260,8 @@ class Taskmaster: if self.ready: return + self.ready_exc = None + while self.candidates: node = self.candidates[-1] state = node.get_state() @@ -266,7 +279,7 @@ class Taskmaster: except SystemExit: exc_value = sys.exc_info()[1] e = SCons.Errors.ExplicitExit(node, exc_value.code) - self.exception_set((SCons.Errors.ExplicitExit, e)) + self.ready_exc = (SCons.Errors.ExplicitExit, e) self.candidates.pop() self.ready = node break @@ -277,7 +290,7 @@ class Taskmaster: # children (like a child couldn't be linked in to a # BuildDir, or a Scanner threw something). Arrange to # raise the exception when the Task is "executed." - self.exception_set() + self.ready_exc = sys.exc_info() self.candidates.pop() self.ready = node break @@ -313,7 +326,7 @@ class Taskmaster: # the kids are derived (like a child couldn't be linked # from a repository). Arrange to raise the exception # when the Task is "executed." - self.exception_set() + self.ready_exc = sys.exc_info() self.candidates.pop() self.ready = node break @@ -397,8 +410,13 @@ class Taskmaster: # a child couldn't be linked in to a BuildDir when deciding # whether this node is current). Arrange to raise the # exception when the Task is "executed." - self.exception_set() + self.ready_exc = sys.exc_info() + + if self.ready_exc: + task.exception_set(self.ready_exc) + self.ready = None + self.ready_exc = None return task @@ -442,21 +460,6 @@ class Taskmaster: self.candidates.extend(self.pending) self.pending = [] - def exception_set(self, exception=None): - if exception is None: - exception = sys.exc_info() - self.exception = exception - self.exception_raise = self._exception_raise - - def exception_clear(self): - self.exception = (None, None, None) - self.exception_raise = self._no_exception_to_raise - - def _no_exception_to_raise(self): - pass - - def _exception_raise(self): - """Raise a pending exception that was recorded while - getting a Task ready for execution.""" - exc_type, exc_value = self.exception[:2] + def exception_raise(self, exception): + exc_type, exc_value = exception[:2] raise exc_type, exc_value diff --git a/src/engine/SCons/TaskmasterTests.py b/src/engine/SCons/TaskmasterTests.py index ef7f51af..8865e09b 100644 --- a/src/engine/SCons/TaskmasterTests.py +++ b/src/engine/SCons/TaskmasterTests.py @@ -478,7 +478,7 @@ class TaskmasterTestCase(unittest.TestCase): n1 = Node("n1") tm = SCons.Taskmaster.Taskmaster(targets = [n1], tasker = MyTask) t = tm.next_task() - exc_type, exc_value, exc_tb = tm.exception + exc_type, exc_value, exc_tb = t.exception assert exc_type == MyException, repr(exc_type) assert str(exc_value) == "from make_ready()", exc_value @@ -557,14 +557,14 @@ class TaskmasterTestCase(unittest.TestCase): n1 = StopNode("n1") tm = SCons.Taskmaster.Taskmaster([n1]) t = tm.next_task() - exc_type, exc_value, exc_tb = tm.exception + exc_type, exc_value, exc_tb = t.exception assert exc_type == SCons.Errors.StopError, repr(exc_type) assert str(exc_value) == "stop!", exc_value n2 = ExitNode("n2") tm = SCons.Taskmaster.Taskmaster([n2]) t = tm.next_task() - exc_type, exc_value = tm.exception + exc_type, exc_value = t.exception assert exc_type == SCons.Errors.ExplicitExit, repr(exc_type) assert exc_value.node == n2, exc_value.node assert exc_value.status == 77, exc_value.status @@ -741,8 +741,8 @@ class TaskmasterTestCase(unittest.TestCase): built_text = None n5 = Node("n5") tm = SCons.Taskmaster.Taskmaster([n5]) - tm.exception_set((MyException, "exception value")) t = tm.next_task() + t.exception_set((MyException, "exception value")) exc_caught = None try: t.prepare() @@ -880,19 +880,20 @@ class TaskmasterTestCase(unittest.TestCase): """ n1 = Node("n1") tm = SCons.Taskmaster.Taskmaster([n1]) + t = tm.next_task() - tm.exception_set((1, 2)) - exc_type, exc_value = tm.exception + t.exception_set((1, 2)) + exc_type, exc_value = t.exception assert exc_type == 1, exc_type assert exc_value == 2, exc_value - tm.exception_set(3) - assert tm.exception == 3 + t.exception_set(3) + assert t.exception == 3 try: 1/0 except: pass - tm.exception_set(None) - exc_type, exc_value, exc_tb = tm.exception + t.exception_set(None) + exc_type, exc_value, exc_tb = t.exception assert exc_type is ZeroDivisionError, exc_type exception_values = [ "integer division or modulo", @@ -900,9 +901,9 @@ class TaskmasterTestCase(unittest.TestCase): ] assert str(exc_value) in exception_values, exc_value - tm.exception_set(("exception 1", None)) + t.exception_set(("exception 1", None)) try: - tm.exception_raise() + t.exception_raise() except: exc_type, exc_value = sys.exc_info()[:2] assert exc_type == "exception 1", exc_type @@ -910,9 +911,9 @@ class TaskmasterTestCase(unittest.TestCase): else: assert 0, "did not catch expected exception" - tm.exception_set(("exception 2", "xyzzy")) + t.exception_set(("exception 2", "xyzzy")) try: - tm.exception_raise() + t.exception_raise() except: exc_type, exc_value = sys.exc_info()[:2] assert exc_type == "exception 2", exc_type @@ -920,6 +921,20 @@ class TaskmasterTestCase(unittest.TestCase): else: assert 0, "did not catch expected exception" + t.exception_set(("exception 3", "XYZZY")) + def fw_exc(exc): + raise 'exception_forwarded', exc + tm.exception_raise = fw_exc + try: + t.exception_raise() + except: + exc_type, exc_value = sys.exc_info()[:2] + assert exc_type == 'exception_forwarded', exc_type + assert exc_value[0] == "exception 3", exc_value[0] + assert exc_value[1] == "XYZZY", exc_value[1] + else: + assert 0, "did not catch expected exception" + def test_postprocess(self): """Test postprocessing targets to give them a chance to clean up diff --git a/test/exceptions.py b/test/exceptions.py index ef409fab..7b45a4ba 100644 --- a/test/exceptions.py +++ b/test/exceptions.py @@ -25,10 +25,13 @@ __revision__ = "__FILE__ __REVISION__ __DATE__ __DEVELOPER__" import os +import string import sys import TestSCons import TestCmd +python = TestSCons.python + test = TestSCons.TestSCons(match = TestCmd.match_re_dotall) test.write('SConstruct', """ @@ -41,7 +44,7 @@ env.B(target = 'foo.out', source = 'foo.in') test.write('foo.in', "foo.in\n") -test.run(arguments = "foo.out", stderr = """scons: \*\*\* \[foo.out\] Exception +expected_stderr = """scons: \*\*\* \[foo.out\] Exception Traceback \((most recent call|innermost) last\): File ".+", line \d+, in \S+ [^\n]+ @@ -50,6 +53,74 @@ Traceback \((most recent call|innermost) last\): File "SConstruct", line 3, in func raise "func exception" func exception -""", status = 2) +""" + +test.run(arguments = "foo.out", stderr = expected_stderr, status = 2) + +test.run(arguments = "-j2 foo.out", stderr = expected_stderr, status = 2) + + +# Verify that exceptions caused by exit values of builder actions are +# correectly signalled, for both Serial and Parallel jobs. + +test.write('myfail.py', r"""\ +import sys +sys.exit(1) +""") + +test.write('SConstruct', """ +Fail = Builder(action = r'%s myfail.py $TARGETS $SOURCE') +env = Environment(BUILDERS = { 'Fail' : Fail }) +env.Fail(target = 'f1', source = 'f1.in') +""" % (python)) + +test.write('f1.in', "f1.in\n") + +expected_stderr = "scons: \*\*\* \[f1\] Error 1\n" + +test.run(arguments = '.', status = 2, stderr = expected_stderr) +test.run(arguments = '-j2 .', status = 2, stderr = expected_stderr) + + +# Verify that all exceptions from simultaneous tasks are reported, +# even if the exception is raised during the Task.prepare() +# [Node.prepare()] + +test.write('SConstruct', """ +Fail = Builder(action = r'%s myfail.py $TARGETS $SOURCE') +env = Environment(BUILDERS = { 'Fail' : Fail }) +env.Fail(target = 'f1', source = 'f1.in') +env.Fail(target = 'f2', source = 'f2.in') +env.Fail(target = 'f3', source = 'f3.in') +""" % (python)) + +# f2.in is not created to cause a Task.prepare exception +test.write('f3.in', 'f3.in\n') + +# In Serial task mode, get the first exception and stop +test.run(arguments = '.', status = 2, stderr = expected_stderr) + +# In Parallel task mode, we will get all three exceptions. + +expected_stderr_list = [ + expected_stderr, + "scons: \*\*\* Source `f2\.in' not found, needed by target `f2'\. Stop\.\n", + string.replace(expected_stderr, 'f1', 'f3') + ] + +# Unfortunately, we aren't guaranteed what order we will get the +# exceptions in... +orders = [ (1,2,3), (1,3,2), (2,1,3), (2,3,1), (3,1,2), (3,2,1) ] +otexts = [] +for A,B,C in orders: + otexts.append("%s%s%s"%(expected_stderr_list[A-1], + expected_stderr_list[B-1], + expected_stderr_list[C-1])) + + +expected_stderrs = "(" + string.join(otexts, "|") + ")" + +test.run(arguments = '-j3 .', status = 2, stderr = expected_stderrs) + test.pass_test()