if num > 1:
self.jobs = []
for i in range(num):
- self.jobs.append(Parallel(taskmaster))
+ self.jobs.append(Parallel(taskmaster, self))
else:
self.jobs = [Serial(taskmaster)]
def __init__(self, taskmaster):
"""Create a new serial job given a taskmaster.
- The taskmaster's next_task() method should return the next task that
- needs to be executed, or None if there are no more tasks. The
- taskmaster's executed() method will be called for each task when it is
- finished being executed. The taskmaster's is_blocked() method will not
- be called.
- """
+ The taskmaster's next_task() method should return the next task
+ that needs to be executed, or None if there are no more tasks. The
+ taskmaster's executed() method will be called for each task when it
+ is successfully executed or failed() will be called if it failed to
+ execute (e.g. execute() raised an exception). The taskmaster's
+ is_blocked() method will not be called. """
self.taskmaster = taskmaster
def start(self):
"""Start the job. This will begin pulling tasks from the taskmaster
- and executing them, and return when there are no more tasks. """
+ and executing them, and return when there are no more tasks. If a task
+ fails to execute (i.e. execute() raises an exception), then the job will
+ stop."""
while 1:
task = self.taskmaster.next_task()
if task is None:
break
-
- task.execute()
- self.taskmaster.executed(task)
+
+ try:
+ task.execute()
+ except:
+ self.taskmaster.failed(task)
+ return
+ else:
+ self.taskmaster.executed(task)
def stop(self):
"""Serial jobs are always finished when start() returns, so there
"""
- def __init__(self, taskmaster):
-
- """Create a new parallel job given a taskmaster. Multiple jobs will
- be using the taskmaster in parallel, but all method calls to taskmaster
- methods are serialized by the jobs themselves.
+ def __init__(self, taskmaster, jobs):
+ """Create a new parallel job given a taskmaster, and a Jobs instance.
+ Multiple jobs will be using the taskmaster in parallel, but all
+ method calls to taskmaster methods are serialized by the jobs
+ themselves.
The taskmaster's next_task() method should return the next task
that needs to be executed, or None if there are no more tasks. The
taskmaster's executed() method will be called for each task when it
- is finished being executed. The taskmaster's is_blocked() method
- should return true iff there are more tasks, but they can't be
- executed until one or more other tasks have been
- executed. next_task() will be called iff is_blocked() returned
- false.
+ is successfully executed or failed() will be called if the task
+ failed to execute (i.e. execute() raised an exception). The
+ taskmaster's is_blocked() method should return true iff there are
+ more tasks, but they can't be executed until one or more other
+ tasks have been executed. next_task() will be called iff
+ is_blocked() returned false.
Note: calls to taskmaster are serialized, but calls to execute() on
distinct tasks are not serialized, because that is the whole point
import threading
self.taskmaster = taskmaster
+ self.jobs = jobs
self.thread = threading.Thread(None, self.__run)
self.stop_running = 0
tasks from the task master and executing them. This method returns
immediately and doesn't wait for the jobs to be executed.
+ If a task fails to execute (i.e. execute() raises an exception),
+ all jobs will be stopped.
+
To stop the job, call stop().
To wait for the job to finish, call wait().
"""
To wait for the job to finish, call wait().
"""
- self.stop_running = 1
+ cv.acquire()
+ self.stop_running = 1
+ # wake up the sleeping jobs so this job will end as soon as possible:
+ cv.notifyAll()
+ cv.release()
+
def wait(self):
"""Wait for the job to finish. A job is finished when either there
are no more tasks or the job has been stopped and it is no longer
try:
while 1:
- while self.taskmaster.is_blocked():
+ while self.taskmaster.is_blocked() and not self.stop_running:
cv.wait(None)
+ # check this before calling next_task(), because
+ # this job may have been stopped because of a build
+ # failure:
+ if self.stop_running:
+ break
+
task = self.taskmaster.next_task()
- if task == None or self.stop_running:
+ if task == None:
break
cv.release()
- task.execute()
- cv.acquire()
-
- self.taskmaster.executed(task)
-
- if not self.taskmaster.is_blocked():
- cv.notifyAll()
-
+ try:
+ try:
+ task.execute()
+ finally:
+ cv.acquire()
+ except:
+ self.taskmaster.failed(task)
+ # stop all jobs since there was a failure:
+ # (this will wake up any waiting jobs, so
+ # it isn't necessary to explicitly wake them
+ # here)
+ self.jobs.stop()
+ else:
+ self.taskmaster.executed(task)
+
+ if not self.taskmaster.is_blocked():
+ cv.notifyAll()
+
finally:
cv.release()
self.taskmaster.end_list.append(self.i)
self.taskmaster.guard.release()
+class ExceptionTask:
+ """A dummy task class for testing purposes."""
+
+ def __init__(self, i, taskmaster):
+ pass
+
+ def execute(self):
+ raise "exception"
+
class Taskmaster:
"""A dummy taskmaster class for testing the job classes."""
- def __init__(self, n, test_case):
+ def __init__(self, n, test_case, Task):
"""n is the number of dummy tasks to perform."""
self.test_case = test_case
self.num_tasks = n
self.num_iterated = 0
self.num_executed = 0
+ self.num_failed = 0
+ self.Task = Task
# 'guard' guards 'task_begin_list' and 'task_end_list'
try:
import threading
return None
else:
self.num_iterated = self.num_iterated + 1
- return Task(self.num_iterated, self)
+ return self.Task(self.num_iterated, self)
def all_tasks_are_executed(self):
return self.num_executed == self.num_tasks
"the task wasn't really executed")
self.test_case.failUnless(task.__class__ is Task,
"the task wasn't really a Task instance")
-
-
+
+ def failed(self, task):
+ self.num_failed = self.num_failed + 1
+
def is_blocked(self):
# simulate blocking tasks
return self.num_iterated - self.num_executed >= max(num_jobs/2, 2)
except:
raise NoThreadsException()
- taskmaster = Taskmaster(num_tasks, self)
+ taskmaster = Taskmaster(num_tasks, self, Task)
jobs = scons.Job.Jobs(num_jobs, taskmaster)
jobs.start()
jobs.wait()
"all the tests were not executed")
self.failUnless(taskmaster.all_tasks_are_iterated(),
"all the tests were not iterated over")
+ self.failIf(taskmaster.num_failed,
+ "some task(s) failed to execute")
class SerialTestCase(unittest.TestCase):
def runTest(self):
"test a serial job"
- taskmaster = Taskmaster(num_tasks, self)
+ taskmaster = Taskmaster(num_tasks, self, Task)
jobs = scons.Job.Jobs(1, taskmaster)
jobs.start()
jobs.wait()
"all the tests were not executed")
self.failUnless(taskmaster.all_tasks_are_iterated(),
"all the tests were not iterated over")
+ self.failIf(taskmaster.num_failed,
+ "some task(s) failed to execute")
+
+class SerialExceptionTestCase(unittest.TestCase):
+ def runTest(self):
+ "test a serial job with tasks that raise exceptions"
+
+ taskmaster = Taskmaster(num_tasks, self, ExceptionTask)
+ jobs = scons.Job.Jobs(1, taskmaster)
+ jobs.start()
+ jobs.wait()
+
+ self.failIf(taskmaster.num_executed,
+ "a task was executed")
+ self.failUnless(taskmaster.num_iterated == 1,
+ "exactly one task should have been iterated")
+ self.failUnless(taskmaster.num_failed == 1,
+ "exactly one task should have failed")
+
+class ParallelExceptionTestCase(unittest.TestCase):
+ def runTest(self):
+ "test parallel jobs with tasks that raise exceptions"
+
+ taskmaster = Taskmaster(num_tasks, self, ExceptionTask)
+ jobs = scons.Job.Jobs(num_jobs, taskmaster)
+ jobs.start()
+ jobs.wait()
+
+ self.failIf(taskmaster.num_executed,
+ "a task was executed")
+ self.failUnless(taskmaster.num_iterated >= 1,
+ "exactly one task should have been iterated")
+ self.failUnless(taskmaster.num_failed == 1,
+ "exactly one task should have failed")
+
def suite():
suite = unittest.TestSuite()
suite.addTest(ParallelTestCase())
suite.addTest(SerialTestCase())
+ suite.addTest(SerialExceptionTestCase())
+ suite.addTest(ParallelExceptionTestCase())
return suite
if __name__ == "__main__":