- Test enhancements in SourceCode.py and option-n.py.
+ - Fix problems with Parallel Task Exception handling.
+
From Christoph Wiedemann:
- Add an Environment.SetDefault() method that only sets values if
def get(self, block = 1):
"""Remove and return a result tuple from the results queue."""
return self.resultsQueue.get(block)
-
- def get_nowait(self):
- """Remove and result a result tuple from the results queue
- without blocking."""
- return self.get(0)
+
+ def preparation_failed(self, obj):
+ self.resultsQueue.put((obj, 0))
class Parallel:
"""This class is used to execute tasks in parallel, and is somewhat
self.taskmaster = taskmaster
self.tp = ThreadPool(num)
- self.jobs = 0
self.maxjobs = num
def start(self):
more tasks. If a task fails to execute (i.e. execute() raises
an exception), then the job will stop."""
+ jobs = 0
+
while 1:
- if self.jobs < self.maxjobs:
+
+ # There's a concern here that the while-loop test below
+ # might delay reporting status back about failed build
+ # tasks until the entire build is done if tasks execute
+ # fast enough, or self.maxjobs is big enough. It looks
+ # like that's enough of a corner case that we'll wait to
+ # see if it's an issue in practice. If so, one possible
+ # fix might be:
+ #
+ # while jobs < self.maxjobs and \
+ # self.tp.resultsQueue.empty():
+ #
+ # but that's somewhat unattractive because the
+ # resultsQueue.empty() check might introduce some
+ # significant overhead involving mutex locking.
+ while jobs < self.maxjobs:
task = self.taskmaster.next_task()
if task is None:
break
except KeyboardInterrupt:
raise
except:
- # Let the failed() callback function arrange for the
- # build to stop if that's appropriate.
- task.failed()
+ # Let the failed() callback function arrange
+ # for the build to stop if that's appropriate.
+ task.exception_set()
+ self.tp.preparation_failed(task)
+ jobs = jobs + 1
+ continue
# dispatch task
self.tp.put(task)
- self.jobs = self.jobs + 1
+ jobs = jobs + 1
- while 1:
- try:
- task, ok = self.tp.get_nowait()
- except Queue.Empty:
- if not (self.jobs is self.maxjobs or self.taskmaster.is_blocked()):
- break
- task, ok = self.tp.get()
-
- self.jobs = self.jobs - 1
- if ok:
- task.executed()
- else:
- task.failed()
-
- task.postprocess()
+ if not task and not jobs: break
+
+ task, ok = self.tp.get()
+
+ jobs = jobs - 1
+ if ok:
+ task.executed()
+ else:
+ task.failed()
+
+ task.postprocess()
self.failUnless(taskmaster.num_postprocessed >= 1,
"one or more tasks should have been postprocessed")
+#---------------------------------------------------------------------
+# Above tested Job object with contrived Task and Taskmaster objects.
+# Now test Job object with actual Task and Taskmaster objects.
+
+import SCons.Taskmaster
+import SCons.Node
+import time
+
+
+class testnode (SCons.Node.Node):
+ def __init__(self):
+ SCons.Node.Node.__init__(self)
+ self.expect_to_be = SCons.Node.executed
+
+class goodnode (testnode):
+ pass
+
+class slowgoodnode (goodnode):
+ def prepare(self):
+ # Delay to allow scheduled Jobs to run while the dispatcher
+ # sleeps. Keep this short because it affects the time taken
+ # by this test.
+ time.sleep(0.15)
+ goodnode.prepare(self)
+
+class badnode (goodnode):
+ def __init__(self):
+ goodnode.__init__(self)
+ self.expect_to_be = SCons.Node.failed
+ def build(self, **kw):
+ raise 'badnode exception'
+
+class slowbadnode (badnode):
+ def build(self, **kw):
+ # Appears to take a while to build, allowing faster builds to
+ # overlap. Time duration is not especially important, but if
+ # it is faster than slowgoodnode then these could complete
+ # while the scheduler is sleeping.
+ time.sleep(0.05)
+ raise 'slowbadnode exception'
+
+class badpreparenode (badnode):
+ def prepare(self):
+ raise 'badpreparenode exception'
+
+class _SConsTaskTest(unittest.TestCase):
+
+ def _test_seq(self, num_jobs):
+ for node_seq in [
+ [goodnode],
+ [badnode],
+ [slowbadnode],
+ [slowgoodnode],
+ [badpreparenode],
+ [goodnode, badnode],
+ [slowgoodnode, badnode],
+ [goodnode, slowbadnode],
+ [goodnode, goodnode, goodnode, slowbadnode],
+ [goodnode, slowbadnode, badpreparenode, slowgoodnode],
+ [goodnode, slowbadnode, slowgoodnode, badnode]
+ ]:
+
+ self._do_test(num_jobs, node_seq)
+
+ def _do_test(self, num_jobs, node_seq):
+
+ testnodes = []
+ for tnum in range(num_tasks):
+ testnodes.append(node_seq[tnum % len(node_seq)]())
+
+ taskmaster = SCons.Taskmaster.Taskmaster(testnodes)
+ jobs = SCons.Job.Jobs(num_jobs, taskmaster)
+
+ # Exceptions thrown by tasks are not actually propagated to
+ # this level, but are instead stored in the Taskmaster.
+
+ jobs.run()
+
+ # Now figure out if tests proceeded correctly. The first test
+ # that fails will shutdown the initiation of subsequent tests,
+ # but any tests currently queued for execution will still be
+ # processed, and any tests that completed before the failure
+ # would have resulted in new tests being queued for execution.
+
+ # Apply the following operational heuristics of Job.py:
+ # 0) An initial jobset of tasks will be queued before any
+ # good/bad results are obtained (from "execute" of task in
+ # thread).
+ # 1) A goodnode will complete immediately on its thread and
+ # allow another node to be queued for execution.
+ # 2) A badnode will complete immediately and suppress any
+ # subsequent execution queuing, but all currently queued
+ # tasks will still be processed.
+ # 3) A slowbadnode will fail later. It will block slots in
+ # the job queue. Nodes that complete immediately will
+ # allow other nodes to be queued in their place, and this
+ # will continue until either (#2) above or until all job
+ # slots are filled with slowbadnode entries.
+
+ # One approach to validating this test would be to try to
+ # determine exactly how many nodes executed, how many didn't,
+ # and the results of each, and then to assert failure on any
+ # mismatch (including the total number of built nodes).
+ # However, while this is possible to do for a single-processor
+ # system, it is nearly impossible to predict correctly for a
+ # multi-processor system and still test the characteristics of
+ # delayed execution nodes. Stated another way, multithreading
+ # is inherently non-deterministic unless you can completely
+ # characterize the entire system, and since that's not
+ # possible here, we shouldn't try.
+
+ # Therefore, this test will simply scan the set of nodes to
+ # see if the node was executed or not and if it was executed
+ # that it obtained the expected value for that node
+ # (i.e. verifying we don't get failure crossovers or
+ # mislabelling of results).
+
+ for N in testnodes:
+ self.failUnless(N.get_state() in [None, N.expect_to_be],
+ "node ran but got unexpected result")
+
+ self.failUnless(filter(lambda N: N.get_state(), testnodes),
+ "no nodes ran at all.")
+
+
+class SerialTaskTest(_SConsTaskTest):
+ def runTest(self):
+ "test serial jobs with actual Taskmaster and Task"
+ self._test_seq(1)
+
+
+class ParallelTaskTest(_SConsTaskTest):
+ def runTest(self):
+ "test parallel jobs with actual Taskmaster and Task"
+ self._test_seq(num_jobs)
+
+
+
+#---------------------------------------------------------------------
def suite():
suite = unittest.TestSuite()
suite.addTest(NoParallelTestCase())
suite.addTest(SerialExceptionTestCase())
suite.addTest(ParallelExceptionTestCase())
+ suite.addTest(SerialTaskTest())
+ suite.addTest(ParallelTaskTest())
return suite
if __name__ == "__main__":
self.targets = targets
self.top = top
self.node = node
+ self.exc_clear()
def display(self, message):
"""Allow the calling interface to display a message
# Now that it's the appropriate time, give the TaskMaster a
# chance to raise any exceptions it encountered while preparing
# this task.
- self.tm.exception_raise()
+ self.exception_raise()
if self.tm.message:
self.display(self.tm.message)
t.postprocess()
def exc_info(self):
- return self.tm.exception
+ return self.exception
def exc_clear(self):
- self.tm.exception_clear()
+ self.exception = (None, None, None)
+ self.exception_raise = self._no_exception_to_raise
- def exception_set(self):
- self.tm.exception_set()
+ def exception_set(self, exception=None):
+ if not exception:
+ exception = sys.exc_info()
+ self.exception = exception
+ self.exception_raise = self._exception_raise
+ def _no_exception_to_raise(self):
+ pass
+
+ def _exception_raise(self):
+ """Raise a pending exception that was recorded while
+ getting a Task ready for execution."""
+ self.tm.exception_raise(self.exc_info())
def order(dependencies):
self.tasker = tasker
self.ready = None # the next task that is ready to be executed
self.order = order
- self.exception_clear()
self.message = None
def _find_next_ready_node(self):
if self.ready:
return
+ self.ready_exc = None
+
while self.candidates:
node = self.candidates[-1]
state = node.get_state()
except SystemExit:
exc_value = sys.exc_info()[1]
e = SCons.Errors.ExplicitExit(node, exc_value.code)
- self.exception_set((SCons.Errors.ExplicitExit, e))
+ self.ready_exc = (SCons.Errors.ExplicitExit, e)
self.candidates.pop()
self.ready = node
break
# children (like a child couldn't be linked in to a
# BuildDir, or a Scanner threw something). Arrange to
# raise the exception when the Task is "executed."
- self.exception_set()
+ self.ready_exc = sys.exc_info()
self.candidates.pop()
self.ready = node
break
# the kids are derived (like a child couldn't be linked
# from a repository). Arrange to raise the exception
# when the Task is "executed."
- self.exception_set()
+ self.ready_exc = sys.exc_info()
self.candidates.pop()
self.ready = node
break
# a child couldn't be linked in to a BuildDir when deciding
# whether this node is current). Arrange to raise the
# exception when the Task is "executed."
- self.exception_set()
+ self.ready_exc = sys.exc_info()
+
+ if self.ready_exc:
+ task.exception_set(self.ready_exc)
+
self.ready = None
+ self.ready_exc = None
return task
self.candidates.extend(self.pending)
self.pending = []
- def exception_set(self, exception=None):
- if exception is None:
- exception = sys.exc_info()
- self.exception = exception
- self.exception_raise = self._exception_raise
-
- def exception_clear(self):
- self.exception = (None, None, None)
- self.exception_raise = self._no_exception_to_raise
-
- def _no_exception_to_raise(self):
- pass
-
- def _exception_raise(self):
- """Raise a pending exception that was recorded while
- getting a Task ready for execution."""
- exc_type, exc_value = self.exception[:2]
+ def exception_raise(self, exception):
+ exc_type, exc_value = exception[:2]
raise exc_type, exc_value
n1 = Node("n1")
tm = SCons.Taskmaster.Taskmaster(targets = [n1], tasker = MyTask)
t = tm.next_task()
- exc_type, exc_value, exc_tb = tm.exception
+ exc_type, exc_value, exc_tb = t.exception
assert exc_type == MyException, repr(exc_type)
assert str(exc_value) == "from make_ready()", exc_value
n1 = StopNode("n1")
tm = SCons.Taskmaster.Taskmaster([n1])
t = tm.next_task()
- exc_type, exc_value, exc_tb = tm.exception
+ exc_type, exc_value, exc_tb = t.exception
assert exc_type == SCons.Errors.StopError, repr(exc_type)
assert str(exc_value) == "stop!", exc_value
n2 = ExitNode("n2")
tm = SCons.Taskmaster.Taskmaster([n2])
t = tm.next_task()
- exc_type, exc_value = tm.exception
+ exc_type, exc_value = t.exception
assert exc_type == SCons.Errors.ExplicitExit, repr(exc_type)
assert exc_value.node == n2, exc_value.node
assert exc_value.status == 77, exc_value.status
built_text = None
n5 = Node("n5")
tm = SCons.Taskmaster.Taskmaster([n5])
- tm.exception_set((MyException, "exception value"))
t = tm.next_task()
+ t.exception_set((MyException, "exception value"))
exc_caught = None
try:
t.prepare()
"""
n1 = Node("n1")
tm = SCons.Taskmaster.Taskmaster([n1])
+ t = tm.next_task()
- tm.exception_set((1, 2))
- exc_type, exc_value = tm.exception
+ t.exception_set((1, 2))
+ exc_type, exc_value = t.exception
assert exc_type == 1, exc_type
assert exc_value == 2, exc_value
- tm.exception_set(3)
- assert tm.exception == 3
+ t.exception_set(3)
+ assert t.exception == 3
try: 1/0
except: pass
- tm.exception_set(None)
- exc_type, exc_value, exc_tb = tm.exception
+ t.exception_set(None)
+ exc_type, exc_value, exc_tb = t.exception
assert exc_type is ZeroDivisionError, exc_type
exception_values = [
"integer division or modulo",
]
assert str(exc_value) in exception_values, exc_value
- tm.exception_set(("exception 1", None))
+ t.exception_set(("exception 1", None))
try:
- tm.exception_raise()
+ t.exception_raise()
except:
exc_type, exc_value = sys.exc_info()[:2]
assert exc_type == "exception 1", exc_type
else:
assert 0, "did not catch expected exception"
- tm.exception_set(("exception 2", "xyzzy"))
+ t.exception_set(("exception 2", "xyzzy"))
try:
- tm.exception_raise()
+ t.exception_raise()
except:
exc_type, exc_value = sys.exc_info()[:2]
assert exc_type == "exception 2", exc_type
else:
assert 0, "did not catch expected exception"
+ t.exception_set(("exception 3", "XYZZY"))
+ def fw_exc(exc):
+ raise 'exception_forwarded', exc
+ tm.exception_raise = fw_exc
+ try:
+ t.exception_raise()
+ except:
+ exc_type, exc_value = sys.exc_info()[:2]
+ assert exc_type == 'exception_forwarded', exc_type
+ assert exc_value[0] == "exception 3", exc_value[0]
+ assert exc_value[1] == "XYZZY", exc_value[1]
+ else:
+ assert 0, "did not catch expected exception"
+
def test_postprocess(self):
"""Test postprocessing targets to give them a chance to clean up
__revision__ = "__FILE__ __REVISION__ __DATE__ __DEVELOPER__"
import os
+import string
import sys
import TestSCons
import TestCmd
+python = TestSCons.python
+
test = TestSCons.TestSCons(match = TestCmd.match_re_dotall)
test.write('SConstruct', """
test.write('foo.in', "foo.in\n")
-test.run(arguments = "foo.out", stderr = """scons: \*\*\* \[foo.out\] Exception
+expected_stderr = """scons: \*\*\* \[foo.out\] Exception
Traceback \((most recent call|innermost) last\):
File ".+", line \d+, in \S+
[^\n]+
File "SConstruct", line 3, in func
raise "func exception"
func exception
-""", status = 2)
+"""
+
+test.run(arguments = "foo.out", stderr = expected_stderr, status = 2)
+
+test.run(arguments = "-j2 foo.out", stderr = expected_stderr, status = 2)
+
+
+# Verify that exceptions caused by exit values of builder actions are
+# correctly signalled, for both Serial and Parallel jobs.
+
+test.write('myfail.py', r"""\
+import sys
+sys.exit(1)
+""")
+
+test.write('SConstruct', """
+Fail = Builder(action = r'%s myfail.py $TARGETS $SOURCE')
+env = Environment(BUILDERS = { 'Fail' : Fail })
+env.Fail(target = 'f1', source = 'f1.in')
+""" % (python))
+
+test.write('f1.in', "f1.in\n")
+
+expected_stderr = "scons: \*\*\* \[f1\] Error 1\n"
+
+test.run(arguments = '.', status = 2, stderr = expected_stderr)
+test.run(arguments = '-j2 .', status = 2, stderr = expected_stderr)
+
+
+# Verify that all exceptions from simultaneous tasks are reported,
+# even if the exception is raised during the Task.prepare()
+# [Node.prepare()]
+
+test.write('SConstruct', """
+Fail = Builder(action = r'%s myfail.py $TARGETS $SOURCE')
+env = Environment(BUILDERS = { 'Fail' : Fail })
+env.Fail(target = 'f1', source = 'f1.in')
+env.Fail(target = 'f2', source = 'f2.in')
+env.Fail(target = 'f3', source = 'f3.in')
+""" % (python))
+
+# f2.in is not created to cause a Task.prepare exception
+test.write('f3.in', 'f3.in\n')
+
+# In Serial task mode, get the first exception and stop
+test.run(arguments = '.', status = 2, stderr = expected_stderr)
+
+# In Parallel task mode, we will get all three exceptions.
+
+expected_stderr_list = [
+ expected_stderr,
+ "scons: \*\*\* Source `f2\.in' not found, needed by target `f2'\. Stop\.\n",
+ string.replace(expected_stderr, 'f1', 'f3')
+ ]
+
+# Unfortunately, we aren't guaranteed what order we will get the
+# exceptions in...
+orders = [ (1,2,3), (1,3,2), (2,1,3), (2,3,1), (3,1,2), (3,2,1) ]
+otexts = []
+for A,B,C in orders:
+ otexts.append("%s%s%s"%(expected_stderr_list[A-1],
+ expected_stderr_list[B-1],
+ expected_stderr_list[C-1]))
+
+
+expected_stderrs = "(" + string.join(otexts, "|") + ")"
+
+test.run(arguments = '-j3 .', status = 2, stderr = expected_stderrs)
+
test.pass_test()