count = count + 1
return count
+ def unlink (self, file):
+ """Unlinks the specified file name.
+ The file name may be a list, in which case the elements are
+ concatenated with the os.path.join() method. The file is
+ assumed to be under the temporary working directory unless it
+ is an absolute path name.
+ """
+ if type(file) is ListType:
+ file = apply(os.path.join, tuple(file))
+ if not os.path.isabs(file):
+ file = os.path.join(self.workdir, file)
+ os.unlink(file)
+
+
def verbose_set(self, verbose):
"""Set the verbose level.
"""
kids.sort()
assert kids == ['five', 'four', 'one', 'six', 'three', 'two']
+ def test_state(self):
+ """Test setting and getting the state of a node
+ """
+ node = SCons.Node.Node()
+ assert node.get_state() == None
+ node.set_state(SCons.Node.executing)
+ assert node.get_state() == SCons.Node.executing
+
def test_walker(self):
"""Test walking a Node tree.
"""
n1 = MyNode("n1")
nw = SCons.Node.Walker(n1)
+ assert not nw.is_done()
assert nw.next().name == "n1"
+ assert nw.is_done()
assert nw.next() == None
n2 = MyNode("n2")
from SCons.Errors import BuildError
import string
import types
+import copy
+
+# Node states:
+executing = 1
+executed = 2
+up_to_date = 3
+failed = 4
class Node:
self.depends = []
self.derived = 0
self.env = None
+ self.state = None
def build(self):
if not hasattr(self, "builder"):
def children(self):
return self.sources + self.depends
+ def set_state(self, state):
+ self.state = state
-
+ def get_state(self):
+ return self.state
class Wrapper:
def __init__(self, node):
self.node = node
- self.kids = node.children()
+ self.kids = copy.copy(node.children())
# XXX randomize kids here, if requested
class Walker:
"""An iterator for walking a Node tree.
-
+
This is depth-first, children are visited before the parent.
- The Walker object can be initialized with any node, and
+ The Walker object can be initialized with any node, and
returns the next node on the descent with each next() call.
"""
def __init__(self, node):
- self.current = Wrapper(node)
- self.stack = []
- self.top = self.current
+ self.stack = [Wrapper(node)]
def next(self):
"""Return the next node for this walk of the tree.
This function is intentionally iterative, not recursive,
to sidestep any issues of stack size limitations.
"""
- if not self.current:
- return None
- while 1:
- if self.current.kids:
- k = Wrapper(self.current.kids[0])
- self.current.kids = self.current.kids[1:]
- if k.kids:
- self.stack.append(self.current)
- self.current = k
- else:
- return k.node
- else:
- n = self.current.node
- if self.stack:
- self.current = self.stack[-1]
- self.stack = self.stack[0:-1]
- else:
- self.current = None
- return n
+ while self.stack:
+ if self.stack[-1].kids:
+ self.stack.append(Wrapper(self.stack[-1].kids.pop(0)))
+ else:
+ return self.stack.pop().node
+
+ def is_done(self):
+ return not self.stack
def execute(self):
self.target.build()
-
-
+ def set_state(self, state):
+ return self.target.set_state(state)
+
def current(node):
"""Default SCons build engine is-it-current function.
class Taskmaster:
"""A generic Taskmaster for handling a bunch of targets.
+
+ Classes that override methods of this class should call
+ the base class method, so this class can do it's thing.
"""
def __init__(self, targets=[], tasker=Task, current=current):
- self.targets = targets
+ self.walkers = map(SCons.Node.Walker, targets)
self.tasker = tasker
self.current = current
- self.num_iterated = 0
- self.walker = None
-
- def next_node(self):
- t = None
- if self.walker:
- t = self.walker.next()
- if t == None and self.num_iterated < len(self.targets):
- t = self.targets[self.num_iterated]
- self.num_iterated = self.num_iterated + 1
- t.top_target = 1
- self.walker = SCons.Node.Walker(t)
- t = self.walker.next()
- top = None
- if hasattr(t, "top_target"):
- top = 1
- return t, top
+ self.targets = targets
def next_task(self):
- n, top = self.next_node()
- while n != None:
- if self.current(n):
- self.up_to_date(n)
+ while self.walkers:
+ n = self.walkers[0].next()
+ if n == None:
+ self.walkers.pop(0)
+ elif n.get_state() == SCons.Node.up_to_date:
+ self.up_to_date(n, self.walkers[0].is_done())
+ elif n.get_state() == SCons.Node.failed:
+ # XXX do the right thing here
+ pass
+ elif n.get_state() == SCons.Node.executing:
+ # XXX do the right thing here
+ pass
+ elif n.get_state() == SCons.Node.executed:
+ # skip this node because it has already been executed
+ pass
+ elif self.current(n):
+ n.set_state(SCons.Node.up_to_date)
+ self.up_to_date(n, self.walkers[0].is_done())
else:
+ n.set_state(SCons.Node.executing)
return self.tasker(n)
- n, top = self.next_node()
- return None
-
+ return None
+
def is_blocked(self):
return 0
pass
def executed(self, task):
- pass
+ task.set_state(SCons.Node.executed)
def failed(self, task):
- self.walker = None
- self.num_iterated = len(self.targets)
+ self.walkers = []
+ task.set_state(SCons.Node.failed)
def children(self):
return self.kids
+ def get_state(self):
+ pass
+ def set_state(self, state):
+ pass
-class TaskmasterTestCase(unittest.TestCase):
+class Task:
+ def __init__(self, target):
+ self.target = target
- def test_next_node(self):
- """Test fetching the next node
- """
- n1 = Node("n1")
- n2 = Node("n2")
- n3 = Node("n3", [n1, n2])
+ def set_state(self, state):
+ pass
- tm = SCons.Taskmaster.Taskmaster([n3])
- n, top = tm.next_node()
- assert n.name == "n1"
- assert top == None
- n, top = tm.next_node()
- assert n.name == "n2"
- assert top == None
- n, top = tm.next_node()
- assert n.name == "n3"
- assert top == 1
- n, top = tm.next_node()
- assert n == None
- assert top == None
+
+class TaskmasterTestCase(unittest.TestCase):
def test_next_task(self):
"""Test fetching the next task
n1 = Node("n1")
n2 = Node("n2")
- n3 = Node("n3", [n1, n2])
-
+ n3 = Node("n3", [n1, n2])
+
tm = SCons.Taskmaster.Taskmaster([n3])
tm.next_task().execute()
assert built == "n1 built"
built = "up to date: "
+ global top_node
+ top_node = n3
class MyTM(SCons.Taskmaster.Taskmaster):
- def up_to_date(self, node):
- global built
- built = built + " " + node.name
+ def up_to_date(self, node, top):
+ if node == top_node:
+ assert top
+ global built
+ built = built + " " + node.name
tm = MyTM(targets = [n3], current = current)
assert tm.next_task() == None
+ print built
assert built == "up to date: n1 n2 n3"
+ n4 = Node("n4")
+ n4.get_state = lambda: SCons.Node.executed
+ tm = SCons.Taskmaster.Taskmaster([n4])
+ assert tm.next_task() == None
+
def test_is_blocked(self):
"""Test whether a task is blocked
Both default and overridden in a subclass.
"""
tm = SCons.Taskmaster.Taskmaster()
- tm.executed('foo')
+ tm.executed(Task('foo'))
class MyTM(SCons.Taskmaster.Taskmaster):
def executed(self, task):
"""
tm = SCons.Taskmaster.Taskmaster()
#XXX
- tm.failed('foo')
+ tm.failed(Task('foo'))
class MyTM(SCons.Taskmaster.Taskmaster):
def failed(self, task):
except BuildError, e:
sys.stderr.write("scons: *** [%s] Error %d\n" % (e.node, e.stat))
raise
+
+ def set_state(self, state):
+ return self.target.set_state(state)
class CleanTask(SCons.Taskmaster.Task):
"""An SCons clean task."""
"""Controlling logic for tasks.
This is the stock Taskmaster from the build engine, except
- that we override the next_task() method to provide our
- script-specific up-to-date message for command-line targets.
+ that we override the up_to_date() method to provide our
+ script-specific up-to-date message for command-line targets,
+ and failed to provide the ignore-errors feature.
"""
- def next_task(self):
- t, top = SCons.Taskmaster.Taskmaster.next_node(self)
- while t != None:
- if not self.current(t):
- return self.tasker(t)
- elif top:
- print 'scons: "%s" is up to date.' % t
- t, top = SCons.Taskmaster.Taskmaster.next_node(self)
- return None
-
-
-
+ def up_to_date(self, node, top):
+ if top:
+ print 'scons: "%s" is up to date.' % node
+ SCons.Taskmaster.Taskmaster.up_to_date(self, node)
+
+ def failed(self, task):
+ if self.ignore_errors:
+ SCons.Taskmaster.Taskmaster.executed(self, task)
+ else:
+ SCons.Taskmaster.Taskmaster.failed(self, task)
+
+ ignore_errors = 0
+
# Global variables
default_targets = []
short = 'H', long = ['help-options'],
help = "Print this message and exit.")
- Option(func = opt_not_yet,
+ def opt_i(opt, arg):
+ ScriptTaskmaster.ignore_errors = 1
+
+ Option(func = opt_i,
short = 'i', long = ['ignore-errors'],
help = "Ignore errors from build actions.")
test = TestSCons.TestSCons()
-test.pass_test() #XXX Short-circuit until this is supported.
-
test.write('succeed.py', r"""
import sys
file = open(sys.argv[1], 'w')
env.Succeed(target = 'bbb.out', source = 'bbb.1')
""")
-test.run(arguments = '.')
+test.run(arguments = 'aaa.1 aaa.out bbb.1 bbb.out',
+ stderr = 'scons: *** [aaa.1] Error 1\n')
test.fail_test(os.path.exists(test.workpath('aaa.1')))
test.fail_test(os.path.exists(test.workpath('aaa.out')))
test.fail_test(os.path.exists(test.workpath('bbb.1')))
test.fail_test(os.path.exists(test.workpath('bbb.out')))
-test.run(arguments = '-i .')
-
+test.run(arguments = '-i aaa.1 aaa.out bbb.1 bbb.out',
+ stderr =
+ 'scons: *** [aaa.1] Error 1\n'
+ 'scons: *** [bbb.1] Error 1\n')
+
test.fail_test(os.path.exists(test.workpath('aaa.1')))
-test.fail_test(test.read('aaa.out') != "aaa.out\n")
+test.fail_test(test.read('aaa.out') != "succeed.py: aaa.out\n")
test.fail_test(os.path.exists(test.workpath('bbb.1')))
-test.fail_test(test.read('bbb.out') != "bbb.out\n")
+test.fail_test(test.read('bbb.out') != "succeed.py: bbb.out\n")
test.unlink("aaa.out")
test.unlink("bbb.out")
-test.run(arguments = '--ignore-errors .')
+test.run(arguments = '--ignore-errors aaa.1 aaa.out bbb.1 bbb.out',
+ stderr =
+ 'scons: *** [aaa.1] Error 1\n'
+ 'scons: *** [bbb.1] Error 1\n')
test.fail_test(os.path.exists(test.workpath('aaa.1')))
-test.fail_test(test.read('aaa.out') != "aaa.out\n")
+test.fail_test(test.read('aaa.out') != "succeed.py: aaa.out\n")
test.fail_test(os.path.exists(test.workpath('bbb.1')))
-test.fail_test(test.read('bbb.out') != "bbb.out\n")
+test.fail_test(test.read('bbb.out') != "succeed.py: bbb.out\n")
test.pass_test()