jobs = 0
while 1:
-
- # There's a concern here that the while-loop test below
- # might delay reporting status back about failed build
- # tasks until the entire build is done if tasks execute
- # fast enough, or self.maxjobs is big enough. It looks
- # like that's enough of a corner case that we'll wait to
- # see if it's an issue in practice. If so, one possible
- # fix might be:
- #
- # while jobs < self.maxjobs and \
- # self.tp.resultsQueue.empty():
- #
- # but that's somewhat unattractive because the
- # resultsQueue.empty() check might introduce some
- # significant overhead involving mutex locking.
+ # Start up as many available tasks as we're
+ # allowed to.
while jobs < self.maxjobs:
task = self.taskmaster.next_task()
if task is None:
if not task and not jobs: break
- task, ok = self.tp.get()
+ # Let any/all completed tasks finish up before we go
+ # back and put the next batch of tasks on the queue.
+ while 1:
+ task, ok = self.tp.get()
- jobs = jobs - 1
- if ok:
- task.executed()
- else:
- task.failed()
+ jobs = jobs - 1
+ if ok:
+ task.executed()
+ else:
+ task.failed()
- task.postprocess()
+ task.postprocess()
+
+ if self.tp.resultsQueue.empty():
+ break
def prepare(self):
self.was_prepared = 1
-
+
+ def _do_something(self):
+ pass
+
def execute(self):
self.taskmaster.test_case.failUnless(self.was_prepared,
"the task wasn't prepared")
self.taskmaster.begin_list.append(self.i)
self.taskmaster.guard.release()
- # do something that will take some random amount of time:
- for i in range(random.randrange(0, num_sines, 1)):
- x = math.sin(i)
- time.sleep(0.01)
+ self._do_something()
self.was_executed = 1
"the task wasn't prepared")
self.taskmaster.test_case.failUnless(self.was_executed,
"the task wasn't really executed")
- self.taskmaster.test_case.failUnless(self.__class__ is Task,
+ self.taskmaster.test_case.failUnless(isinstance(self, Task),
"the task wasn't really a Task instance")
def failed(self):
def postprocess(self):
self.taskmaster.num_postprocessed = self.taskmaster.num_postprocessed + 1
+class RandomTask(Task):
+ def _do_something(self):
+ # do something that will take some random amount of time:
+ for i in range(random.randrange(0, num_sines, 1)):
+ x = math.sin(i)
+ time.sleep(0.01)
class ExceptionTask:
"""A dummy task class for testing purposes."""
# keep track of the order tasks are completed in
self.end_list = []
-
def next_task(self):
if self.stop or self.all_tasks_are_iterated():
return None
def exception_set(self):
pass
+SaveThreadPool = None
+ThreadPoolCallList = []
+
class ParallelTestCase(unittest.TestCase):
def runTest(self):
"test parallel jobs"
except:
raise NoThreadsException()
- taskmaster = Taskmaster(num_tasks, self, Task)
+ taskmaster = Taskmaster(num_tasks, self, RandomTask)
jobs = SCons.Job.Jobs(num_jobs, taskmaster)
jobs.run()
self.failIf(taskmaster.num_failed,
"some task(s) failed to execute")
+ # Verify that parallel jobs will pull all of the completed tasks
+ # out of the queue at once, instead of one by one. We do this by
+ # replacing the default ThreadPool class with one that records the
+ # order in which tasks are put() and get() to/from the pool, and
+ # which sleeps a little bit before call get() to let the initial
+ # tasks complete and get their notifications on the resultsQueue.
+
+ class SleepTask(Task):
+ def _do_something(self):
+ time.sleep(0.1)
+
+ global SaveThreadPool
+ SaveThreadPool = SCons.Job.ThreadPool
+
+ class WaitThreadPool(SaveThreadPool):
+ def put(self, task):
+ ThreadPoolCallList.append('put(%s)' % task.i)
+ return SaveThreadPool.put(self, task)
+ def get(self):
+ time.sleep(0.5)
+ result = SaveThreadPool.get(self)
+ ThreadPoolCallList.append('get(%s)' % result[0].i)
+ return result
+
+ SCons.Job.ThreadPool = WaitThreadPool
+
+ try:
+ taskmaster = Taskmaster(3, self, SleepTask)
+ jobs = SCons.Job.Jobs(2, taskmaster)
+ jobs.run()
+
+ # The key here is that we get(1) and get(2) from the
+ # resultsQueue before we put(3).
+ expect = [
+ 'put(1)',
+ 'put(2)',
+ 'get(1)',
+ 'get(2)',
+ 'put(3)',
+ 'get(3)',
+ ]
+ assert ThreadPoolCallList == expect, ThreadPoolCallList
+
+ finally:
+ SCons.Job.ThreadPool = SaveThreadPool
+
class SerialTestCase(unittest.TestCase):
def runTest(self):
"test a serial job"
- taskmaster = Taskmaster(num_tasks, self, Task)
+ taskmaster = Taskmaster(num_tasks, self, RandomTask)
jobs = SCons.Job.Jobs(1, taskmaster)
jobs.run()
save_Parallel = SCons.Job.Parallel
SCons.Job.Parallel = NoParallel
try:
- taskmaster = Taskmaster(num_tasks, self, Task)
+ taskmaster = Taskmaster(num_tasks, self, RandomTask)
jobs = SCons.Job.Jobs(2, taskmaster)
self.failUnless(jobs.num_jobs == 1,
"unexpected number of jobs %d" % jobs.num_jobs)