self.cached = 0
self.scanned = 0
self.scanner = None
- self.builder = Node.build
+ self.targets = [self]
+ self.prerequisites = []
+ class Builder:
+ def targets(self, node):
+ return node.targets
+ self.builder = Builder()
self.bsig = None
self.csig = None
- self.state = None
+ self.state = SCons.Node.no_state
self.prepared = None
- self.waiting_parents = []
+ self.ref_count = 0
+ self.waiting_parents = set()
+ self.waiting_s_e = set()
self.side_effect = 0
self.side_effects = []
self.alttargets = []
self.postprocessed = None
+ self._bsig_val = None
+ self._current_val = 0
+ self.always_build = None
+
+ def disambiguate(self):
+ return self
+
+ def push_to_cache(self):
+ pass
def retrieve_from_cache(self):
global cache_text
cache_text.append(self.name + " retrieved")
return self.cached
+ def make_ready(self):
+ pass
+
+ def prepare(self):
+ self.prepared = 1
+
def build(self):
global built_text
built_text = self.name + " built"
+ def built(self):
+ global built_text
+ if not self.cached:
+ built_text = built_text + " really"
+
def has_builder(self):
return not self.builder is None
def alter_targets(self):
return self.alttargets, None
- def built(self):
- global built_text
- built_text = built_text + " really"
-
def visited(self):
global visited_nodes
visited_nodes.append(self.name)
- def prepare(self):
- self.prepared = 1
-
def children(self):
if not self.scanned:
self.scan()
def scanner_key(self):
return self.name
-
+
def add_to_waiting_parents(self, node):
- self.waiting_parents.append(node)
-
- def call_for_all_waiting_parents(self, func):
- func(self)
- for parent in self.waiting_parents:
- parent.call_for_all_waiting_parents(func)
+ wp = self.waiting_parents
+ if node in wp:
+ return 0
+ wp.add(node)
+ return 1
def get_state(self):
return self.state
def store_bsig(self):
pass
- def current(self, calc):
- return calc.current(self, calc.bsig(self))
+ def is_pseudo_derived(self):
+ pass
+
+ def is_up_to_date(self):
+ return self._current_val
def depends_on(self, nodes):
for node in nodes:
def postprocess(self):
self.postprocessed = 1
-
+ self.waiting_parents = set()
+
+ def get_executor(self):
+ if not hasattr(self, 'executor'):
+ class Executor:
+ def prepare(self):
+ pass
+ def get_action_targets(self):
+ return self.targets
+ def get_all_targets(self):
+ return self.targets
+ def get_all_children(self):
+ result = []
+ for node in self.targets:
+ result.extend(node.children())
+ return result
+ def get_all_prerequisites(self):
+ return []
+ def get_action_side_effects(self):
+ return []
+ self.executor = Executor()
+ self.executor.targets = self.targets
+ return self.executor
class OtherError(Exception):
pass
"""Test fetching the next task
"""
global built_text
-
+
n1 = Node("n1")
tm = SCons.Taskmaster.Taskmaster([n1, n1])
t = tm.next_task()
t.prepare()
t.execute()
t = tm.next_task()
- assert t == None
+ assert t is None
n1 = Node("n1")
n2 = Node("n2")
n3 = Node("n3", [n1, n2])
-
+
tm = SCons.Taskmaster.Taskmaster([n3])
t = tm.next_task()
t.execute()
assert built_text == "n1 built", built_text
t.executed()
+ t.postprocess()
t = tm.next_task()
t.prepare()
t.execute()
assert built_text == "n2 built", built_text
t.executed()
+ t.postprocess()
t = tm.next_task()
t.prepare()
t.execute()
assert built_text == "n3 built", built_text
t.executed()
+ t.postprocess()
- assert tm.next_task() == None
+ assert tm.next_task() is None
built_text = "up to date: "
top_node = n3
- class MyCalc(SCons.Taskmaster.Calc):
- def current(self, node, sig):
- return 1
-
class MyTask(SCons.Taskmaster.Task):
def execute(self):
global built_text
else:
self.targets[0].build()
- n1.set_state(None)
- n2.set_state(None)
- n3.set_state(None)
- tm = SCons.Taskmaster.Taskmaster(targets = [n3],
- tasker = MyTask, calc = MyCalc())
+ n1.set_state(SCons.Node.no_state)
+ n1._current_val = 1
+ n2.set_state(SCons.Node.no_state)
+ n2._current_val = 1
+ n3.set_state(SCons.Node.no_state)
+ n3._current_val = 1
+ tm = SCons.Taskmaster.Taskmaster(targets = [n3], tasker = MyTask)
t = tm.next_task()
t.prepare()
t.execute()
assert built_text == "n1 up-to-date", built_text
t.executed()
+ t.postprocess()
t = tm.next_task()
t.prepare()
t.execute()
assert built_text == "n2 up-to-date", built_text
t.executed()
+ t.postprocess()
t = tm.next_task()
t.prepare()
t.execute()
assert built_text == "n3 up-to-date top", built_text
t.executed()
+ t.postprocess()
- assert tm.next_task() == None
+ assert tm.next_task() is None
n1 = Node("n1")
n5 = Node("n5", [n3, n4])
tm = SCons.Taskmaster.Taskmaster([n5])
- assert not tm.is_blocked()
-
t1 = tm.next_task()
assert t1.get_target() == n1
- assert not tm.is_blocked()
-
+
t2 = tm.next_task()
assert t2.get_target() == n2
- assert not tm.is_blocked()
t4 = tm.next_task()
assert t4.get_target() == n4
- assert tm.is_blocked()
t4.executed()
- assert tm.is_blocked()
-
+ t4.postprocess()
+
t1.executed()
- assert tm.is_blocked()
+ t1.postprocess()
t2.executed()
- assert not tm.is_blocked()
+ t2.postprocess()
t3 = tm.next_task()
assert t3.get_target() == n3
- assert tm.is_blocked()
t3.executed()
- assert not tm.is_blocked()
+ t3.postprocess()
t5 = tm.next_task()
assert t5.get_target() == n5, t5.get_target()
- assert tm.is_blocked() # still executing t5
t5.executed()
- assert not tm.is_blocked()
+ t5.postprocess()
+
+ assert tm.next_task() is None
- assert tm.next_task() == None
-
n4 = Node("n4")
n4.set_state(SCons.Node.executed)
tm = SCons.Taskmaster.Taskmaster([n4])
- assert tm.next_task() == None
+ assert tm.next_task() is None
n1 = Node("n1")
n2 = Node("n2", [n1])
tm = SCons.Taskmaster.Taskmaster([n2,n2])
t = tm.next_task()
- assert tm.is_blocked()
t.executed()
- assert not tm.is_blocked()
+ t.postprocess()
t = tm.next_task()
- assert tm.next_task() == None
+ assert tm.next_task() is None
n1 = Node("n1")
target = t.get_target()
assert target == n1, target
t.executed()
+ t.postprocess()
t = tm.next_task()
target = t.get_target()
assert target == n2, target
t.executed()
+ t.postprocess()
t = tm.next_task()
target = t.get_target()
assert target == n3, target
t.executed()
- assert tm.next_task() == None
+ t.postprocess()
+ assert tm.next_task() is None
n1 = Node("n1")
n2 = Node("n2")
t = tm.next_task()
assert t.get_target() == n1
t.executed()
+ t.postprocess()
t = tm.next_task()
assert t.get_target() == n2
t.executed()
+ t.postprocess()
t = tm.next_task()
assert t.get_target() == n3
t.executed()
+ t.postprocess()
t = tm.next_task()
assert t.get_target() == n4
t.executed()
- assert tm.next_task() == None
+ t.postprocess()
+ assert tm.next_task() is None
assert scan_called == 4, scan_called
tm = SCons.Taskmaster.Taskmaster([n5])
t = tm.next_task()
assert t.get_target() == n5, t.get_target()
t.executed()
- assert tm.next_task() == None
+ assert tm.next_task() is None
assert scan_called == 5, scan_called
n1 = Node("n1")
tm = SCons.Taskmaster.Taskmaster([n1,n2,n3,n4,n5])
t = tm.next_task()
assert t.get_target() == n1
- assert n4.state == SCons.Node.executing
- assert tm.is_blocked()
+ assert n4.state == SCons.Node.executing, n4.state
t.executed()
- assert not tm.is_blocked()
+ t.postprocess()
t = tm.next_task()
assert t.get_target() == n2
- assert tm.is_blocked()
t.executed()
+ t.postprocess()
t = tm.next_task()
assert t.get_target() == n3
- assert tm.is_blocked()
t.executed()
+ t.postprocess()
t = tm.next_task()
assert t.get_target() == n4
- assert tm.is_blocked()
t.executed()
+ t.postprocess()
t = tm.next_task()
assert t.get_target() == n5
- assert tm.is_blocked() # still executing n5
assert not tm.next_task()
t.executed()
- assert not tm.is_blocked()
+ t.postprocess()
n1 = Node("n1")
n2 = Node("n2")
t = tm.next_task()
assert t.get_target() == n3, t.get_target()
t.executed()
+ t.postprocess()
t = tm.next_task()
assert t.get_target() == n2, t.get_target()
t.executed()
+ t.postprocess()
t = tm.next_task()
assert t.get_target() == n1, t.get_target()
t.executed()
+ t.postprocess()
t = tm.next_task()
assert t.get_target() == n4, t.get_target()
t.executed()
+ t.postprocess()
n5 = Node("n5")
n6 = Node("n6")
n7 = Node("n7")
n6.alttargets = [n7]
+
tm = SCons.Taskmaster.Taskmaster([n5])
t = tm.next_task()
assert t.get_target() == n5
t.executed()
+ t.postprocess()
+
tm = SCons.Taskmaster.Taskmaster([n6])
t = tm.next_task()
assert t.get_target() == n7
t.executed()
+ t.postprocess()
+ t = tm.next_task()
+ assert t.get_target() == n6
+ t.executed()
+ t.postprocess()
+
+ n1 = Node("n1")
+ n2 = Node("n2", [n1])
+ n1.set_state(SCons.Node.failed)
+ tm = SCons.Taskmaster.Taskmaster([n2])
+ assert tm.next_task() is None
+
+ n1 = Node("n1")
+ n2 = Node("n2")
+ n1.targets = [n1, n2]
+ n1._current_val = 1
+ tm = SCons.Taskmaster.Taskmaster([n1])
+ t = tm.next_task()
+ t.executed()
+ t.postprocess()
+
+ s = n1.get_state()
+ assert s == SCons.Node.executed, s
+ s = n2.get_state()
+ assert s == SCons.Node.executed, s
def test_make_ready_out_of_date(self):
"""Test the Task.make_ready() method's list of out-of-date Nodes
"""
- class MyCalc(SCons.Taskmaster.Calc):
- def current(self, node, sig):
- n = str(node)
- return n[0] == 'c'
-
ood = []
def TaskGen(tm, targets, top, node, ood=ood):
class MyTask(SCons.Taskmaster.Task):
n1 = Node("n1")
c2 = Node("c2")
+ c2._current_val = 1
n3 = Node("n3")
c4 = Node("c4")
- tm = SCons.Taskmaster.Taskmaster(targets = [n1, c2, n3, c4],
- tasker = TaskGen,
- calc = MyCalc())
+ c4._current_val = 1
+ a5 = Node("a5")
+ a5._current_val = 1
+ a5.always_build = 1
+ tm = SCons.Taskmaster.Taskmaster(targets = [n1, c2, n3, c4, a5],
+ tasker = TaskGen)
del ood[:]
t = tm.next_task()
t = tm.next_task()
assert ood == [], ood
+ del ood[:]
+ t = tm.next_task()
+ assert ood == [a5], ood
def test_make_ready_exception(self):
"""Test handling exceptions from Task.make_ready()
n1 = Node("n1")
tm = SCons.Taskmaster.Taskmaster(targets = [n1], tasker = MyTask)
t = tm.next_task()
- assert isinstance(tm.exc_type, SCons.Errors.TaskmasterException), repr(tm.exc_type)
- assert tm.exc_value is None, tm.exc_value
- e = tm.exc_type
- assert e.type == MyException, e.type
- assert str(e.value) == "from make_ready()", str(e.value)
+ exc_type, exc_value, exc_tb = t.exception
+ assert exc_type == MyException, repr(exc_type)
+ assert str(exc_value) == "from make_ready()", exc_value
def test_make_ready_all(self):
- class MyCalc(SCons.Taskmaster.Calc):
- def current(self, node, sig):
- n = str(node)
- return n[0] == 'c'
-
+ """Test the make_ready_all() method"""
class MyTask(SCons.Taskmaster.Task):
make_ready = SCons.Taskmaster.Task.make_ready_all
n1 = Node("n1")
c2 = Node("c2")
+ c2._current_val = 1
n3 = Node("n3")
c4 = Node("c4")
+ c4._current_val = 1
- tm = SCons.Taskmaster.Taskmaster(targets = [n1, c2, n3, c4],
- calc = MyCalc())
+ tm = SCons.Taskmaster.Taskmaster(targets = [n1, c2, n3, c4])
t = tm.next_task()
target = t.get_target()
c4 = Node("c4")
tm = SCons.Taskmaster.Taskmaster(targets = [n1, c2, n3, c4],
- tasker = MyTask,
- calc = MyCalc())
+ tasker = MyTask)
t = tm.next_task()
target = t.get_target()
n1 = StopNode("n1")
tm = SCons.Taskmaster.Taskmaster([n1])
t = tm.next_task()
- assert isinstance(tm.exc_type, SCons.Errors.TaskmasterException), repr(tm.exc_type)
- assert tm.exc_value is None, tm.exc_value
- e = tm.exc_type
- assert e.type == SCons.Errors.StopError, e.type
- assert str(e.value) == "stop!", "Unexpected exc_value `%s'" % e.value
+ exc_type, exc_value, exc_tb = t.exception
+ assert exc_type == SCons.Errors.StopError, repr(exc_type)
+ assert str(exc_value) == "stop!", exc_value
n2 = ExitNode("n2")
tm = SCons.Taskmaster.Taskmaster([n2])
t = tm.next_task()
- assert tm.exc_type == SCons.Errors.ExplicitExit, "Did not record ExplicitExit on node"
- assert tm.exc_value.node == n2, tm.exc_value.node
- assert tm.exc_value.status == 77, tm.exc_value.status
+ exc_type, exc_value = t.exception
+ assert exc_type == SCons.Errors.ExplicitExit, repr(exc_type)
+ assert exc_value.node == n2, exc_value.node
+ assert exc_value.status == 77, exc_value.status
def test_cycle_detection(self):
"""Test detecting dependency cycles
-
"""
n1 = Node("n1")
n2 = Node("n2", [n1])
n3 = Node("n3", [n2])
n1.kids = [n3]
+ tm = SCons.Taskmaster.Taskmaster([n3])
try:
- tm = SCons.Taskmaster.Taskmaster([n3])
t = tm.next_task()
except SCons.Errors.UserError, e:
assert str(e) == "Dependency cycle: n3 -> n1 -> n2 -> n3", str(e)
else:
- assert 0
-
- def test_is_blocked(self):
- """Test whether a task is blocked
+ assert 'Did not catch expected UserError'
- Both default and overridden in a subclass.
+ def test_next_top_level_candidate(self):
+ """Test the next_top_level_candidate() method
"""
- tm = SCons.Taskmaster.Taskmaster()
- assert not tm.is_blocked()
-
- class MyTM(SCons.Taskmaster.Taskmaster):
- def _find_next_ready_node(self):
- self.ready = 1
- tm = MyTM()
- assert not tm.is_blocked()
-
- class MyTM(SCons.Taskmaster.Taskmaster):
- def _find_next_ready_node(self):
- self.ready = None
- self.pending = []
- self.executing = []
- tm = MyTM()
- assert not tm.is_blocked()
-
- class MyTM(SCons.Taskmaster.Taskmaster):
- def _find_next_ready_node(self):
- self.ready = None
- self.pending = [1]
- tm = MyTM()
- assert tm.is_blocked()
+ n1 = Node("n1")
+ n2 = Node("n2", [n1])
+ n3 = Node("n3", [n2])
- class MyTM(SCons.Taskmaster.Taskmaster):
- def _find_next_ready_node(self):
- self.ready = None
- self.executing = [1]
- tm = MyTM()
- assert tm.is_blocked()
+ tm = SCons.Taskmaster.Taskmaster([n3])
+ t = tm.next_task()
+ assert t.targets == [n1], t.targets
+ t.fail_stop()
+ assert t.targets == [n3], list(map(str, t.targets))
+ assert t.top == 1, t.top
def test_stop(self):
"""Test the stop() method
n1 = Node("n1")
n2 = Node("n2")
n3 = Node("n3", [n1, n2])
-
+
tm = SCons.Taskmaster.Taskmaster([n3])
t = tm.next_task()
t.prepare()
t.execute()
assert built_text == "n1 built", built_text
t.executed()
+ t.postprocess()
assert built_text == "n1 built really", built_text
tm.stop()
assert built_text == "MyTM.stop()"
assert tm.next_task() is None
- def test_failed(self):
- """Test when a task has failed
- """
- n1 = Node("n1")
- tm = SCons.Taskmaster.Taskmaster([n1])
- t = tm.next_task()
- assert tm.executing == [n1], tm.executing
- tm.failed(n1)
- assert tm.executing == [], tm.executing
-
def test_executed(self):
"""Test when a task has been executed
"""
built_text = "xxx"
visited_nodes = []
n1.set_state(SCons.Node.executing)
+
t.executed()
+
s = n1.get_state()
assert s == SCons.Node.executed, s
assert built_text == "xxx really", built_text
- assert visited_nodes == [], visited_nodes
+ assert visited_nodes == ['n1'], visited_nodes
n2 = Node("n2")
tm = SCons.Taskmaster.Taskmaster([n2])
built_text = "should_not_change"
visited_nodes = []
n2.set_state(None)
+
t.executed()
- s = n1.get_state()
- assert s == SCons.Node.executed, s
+
+ s = n2.get_state()
+ assert s is None, s
assert built_text == "should_not_change", built_text
- assert visited_nodes == ["n2"], visited_nodes
+ assert visited_nodes == ['n2'], visited_nodes
+
+ n3 = Node("n3")
+ n4 = Node("n4")
+ n3.targets = [n3, n4]
+ tm = SCons.Taskmaster.Taskmaster([n3])
+ t = tm.next_task()
+ visited_nodes = []
+ n3.set_state(SCons.Node.up_to_date)
+ n4.set_state(SCons.Node.executing)
+
+ t.executed()
+
+ s = n3.get_state()
+ assert s == SCons.Node.up_to_date, s
+ s = n4.get_state()
+ assert s == SCons.Node.executed, s
+ assert visited_nodes == ['n3', 'n4'], visited_nodes
def test_prepare(self):
"""Test preparation of multiple Nodes for a task
-
"""
n1 = Node("n1")
n2 = Node("n2")
# set it up by having something that approximates a real Builder
# return this list--but that's more work than is probably
# warranted right now.
- t.targets = [n1, n2]
+ n1.get_executor().targets = [n1, n2]
t.prepare()
assert n1.prepared
assert n2.prepared
t = tm.next_task()
# More bogus reaching in and setting the targets.
n3.set_state(SCons.Node.up_to_date)
- t.targets = [n3, n4]
+ n3.get_executor().targets = [n3, n4]
t.prepare()
assert n3.prepared
assert n4.prepared
built_text = None
n5 = Node("n5")
tm = SCons.Taskmaster.Taskmaster([n5])
- tm.exc_type = MyException
- tm.exc_value = "exception value"
t = tm.next_task()
+ t.exception_set((MyException, "exception value"))
exc_caught = None
try:
t.prepare()
n6.side_effects = [ n8 ]
n7.side_effects = [ n9, n10 ]
-
+
tm = SCons.Taskmaster.Taskmaster([n6, n7])
t = tm.next_task()
# More bogus reaching in and setting the targets.
- t.targets = [n6, n7]
+ n6.get_executor().targets = [n6, n7]
t.prepare()
assert n6.prepared
assert n7.prepared
assert n9.prepared
assert n10.prepared
+ # Make sure we call an Executor's prepare() method.
+ class ExceptionExecutor:
+ def prepare(self):
+ raise Exception, "Executor.prepare() exception"
+ def get_all_targets(self):
+ return self.nodes
+ def get_all_children(self):
+ result = []
+ for node in self.nodes:
+ result.extend(node.children())
+ return result
+ def get_all_prerequisites(self):
+ return []
+ def get_action_side_effects(self):
+ return []
+
+ n11 = Node("n11")
+ n11.executor = ExceptionExecutor()
+ n11.executor.nodes = [n11]
+ tm = SCons.Taskmaster.Taskmaster([n11])
+ t = tm.next_task()
+ try:
+ t.prepare()
+ except Exception, e:
+ assert str(e) == "Executor.prepare() exception", e
+ else:
+ raise AssertionError, "did not catch expected exception"
+
def test_execute(self):
"""Test executing a task
-
"""
global built_text
global cache_text
t.execute()
except SCons.Errors.BuildError, e:
assert e.node == n4, e.node
- assert e.errstr == "Exception", e.errstr
- assert len(e.args) == 3, `e.args`
- assert e.args[0] == OtherError, e.args[0]
- assert isinstance(e.args[1], OtherError), type(e.args[1])
- assert type(e.args[2]) == type(sys.exc_traceback), e.args[2]
+ assert e.errstr == "OtherError : ", e.errstr
+ assert len(e.exc_info) == 3, e.exc_info
+ exc_traceback = sys.exc_info()[2]
+ assert isinstance(e.exc_info[2], type(exc_traceback)), e.exc_info[2]
else:
raise TestFailed, "did not catch expected BuildError"
"""
n1 = Node("n1")
tm = SCons.Taskmaster.Taskmaster([n1])
+ t = tm.next_task()
+
+ t.exception_set((1, 2))
+ exc_type, exc_value = t.exception
+ assert exc_type == 1, exc_type
+ assert exc_value == 2, exc_value
+
+ t.exception_set(3)
+ assert t.exception == 3
+
+ try: 1/0
+ except: pass
+ t.exception_set(None)
+ exc_type, exc_value, exc_tb = t.exception
+ assert exc_type is ZeroDivisionError, exc_type
+ exception_values = [
+ "integer division or modulo",
+ "integer division or modulo by zero",
+ ]
+ assert str(exc_value) in exception_values, exc_value
+
+ class Exception1(Exception):
+ pass
- tm.exception_set(1, 2)
- assert tm.exc_type == 1, tm.exc_type
- assert tm.exc_value == 2, tm.exc_value
-
- tm.exception_set(3)
- assert tm.exc_type == 3, tm.exc_type
- assert tm.exc_value is None, tm.exc_value
+ t.exception_set((Exception1, None))
+ try:
+ t.exception_raise()
+ except:
+ exc_type, exc_value = sys.exc_info()[:2]
+ assert exc_type == Exception1, exc_type
+ assert str(exc_value) == '', exc_value
+ else:
+ assert 0, "did not catch expected exception"
- tm.exception_set(None, None)
- assert tm.exc_type is None, tm.exc_type
- assert tm.exc_value is None, tm.exc_value
+ class Exception2(Exception):
+ pass
- tm.exception_set("exception 1", None)
+ t.exception_set((Exception2, "xyzzy"))
try:
- tm.exception_raise()
+ t.exception_raise()
except:
- assert sys.exc_type == "exception 1", sys.exc_type
- assert sys.exc_value is None, sys.exc_value
+ exc_type, exc_value = sys.exc_info()[:2]
+ assert exc_type == Exception2, exc_type
+ assert str(exc_value) == "xyzzy", exc_value
else:
assert 0, "did not catch expected exception"
- tm.exception_set("exception 2", "xyzzy")
+ class Exception3(Exception):
+ pass
+
+ try:
+ 1/0
+ except:
+ tb = sys.exc_info()[2]
+ t.exception_set((Exception3, "arg", tb))
try:
- tm.exception_raise()
+ t.exception_raise()
except:
- assert sys.exc_type == "exception 2", sys.exc_type
- assert sys.exc_value == "xyzzy", sys.exc_value
+ exc_type, exc_value, exc_tb = sys.exc_info()
+ assert exc_type == Exception3, exc_type
+ assert str(exc_value) == "arg", exc_value
+ import traceback
+ x = traceback.extract_tb(tb)[-1]
+ y = traceback.extract_tb(exc_tb)[-1]
+ assert x == y, "x = %s, y = %s" % (x, y)
else:
assert 0, "did not catch expected exception"
def test_postprocess(self):
"""Test postprocessing targets to give them a chance to clean up
-
"""
n1 = Node("n1")
tm = SCons.Taskmaster.Taskmaster([n1])
assert n2.postprocessed
assert n3.postprocessed
+ def test_trace(self):
+ """Test Taskmaster tracing
+ """
+ import StringIO
+
+ trace = StringIO.StringIO()
+ n1 = Node("n1")
+ n2 = Node("n2")
+ n3 = Node("n3", [n1, n2])
+ tm = SCons.Taskmaster.Taskmaster([n1, n1, n3], trace=trace)
+ t = tm.next_task()
+ t.prepare()
+ t.execute()
+ t.postprocess()
+ n1.set_state(SCons.Node.executed)
+ t = tm.next_task()
+ t.prepare()
+ t.execute()
+ t.postprocess()
+ n2.set_state(SCons.Node.executed)
+ t = tm.next_task()
+ t.prepare()
+ t.execute()
+ t.postprocess()
+ t = tm.next_task()
+ assert t is None
+
+ value = trace.getvalue()
+ expect = """\
+
+Taskmaster: Looking for a node to evaluate
+Taskmaster: Considering node <no_state 0 'n1'> and its children:
+Taskmaster: Evaluating <pending 0 'n1'>
+
+Task.make_ready_current(): node <pending 0 'n1'>
+Task.prepare(): node <executing 0 'n1'>
+Task.execute(): node <executing 0 'n1'>
+Task.postprocess(): node <executing 0 'n1'>
+
+Taskmaster: Looking for a node to evaluate
+Taskmaster: Considering node <executed 0 'n1'> and its children:
+Taskmaster: already handled (executed)
+Taskmaster: Considering node <no_state 0 'n3'> and its children:
+Taskmaster: <executed 0 'n1'>
+Taskmaster: <no_state 0 'n2'>
+Taskmaster: adjusted ref count: <pending 1 'n3'>, child 'n2'
+Taskmaster: Considering node <no_state 0 'n2'> and its children:
+Taskmaster: Evaluating <pending 0 'n2'>
+
+Task.make_ready_current(): node <pending 0 'n2'>
+Task.prepare(): node <executing 0 'n2'>
+Task.execute(): node <executing 0 'n2'>
+Task.postprocess(): node <executing 0 'n2'>
+Task.postprocess(): removing <executing 0 'n2'>
+Task.postprocess(): adjusted parent ref count <pending 0 'n3'>
+
+Taskmaster: Looking for a node to evaluate
+Taskmaster: Considering node <pending 0 'n3'> and its children:
+Taskmaster: <executed 0 'n1'>
+Taskmaster: <executed 0 'n2'>
+Taskmaster: Evaluating <pending 0 'n3'>
+
+Task.make_ready_current(): node <pending 0 'n3'>
+Task.prepare(): node <executing 0 'n3'>
+Task.execute(): node <executing 0 'n3'>
+Task.postprocess(): node <executing 0 'n3'>
+
+Taskmaster: Looking for a node to evaluate
+Taskmaster: No candidate anymore.
+
+"""
+ assert value == expect, value
+
if __name__ == "__main__":
suite = unittest.makeSuite(TaskmasterTestCase, 'test_')
if not unittest.TextTestRunner().run(suite).wasSuccessful():
sys.exit(1)
+
+# Local Variables:
+# tab-width:4
+# indent-tabs-mode:nil
+# End:
+# vim: set expandtab tabstop=4 shiftwidth=4: