From: stevenknight Date: Wed, 4 May 2005 04:31:58 +0000 (+0000) Subject: Reap as many thread responses as possible at once. (J.T. Conklin) X-Git-Url: http://git.tremily.us/?a=commitdiff_plain;h=8466b5354dc0c920f659f6c62738cf781a81adeb;p=scons.git Reap as many thread responses as possible at once. (J.T. Conklin) git-svn-id: http://scons.tigris.org/svn/scons/trunk@1286 fdb21ef1-2011-0410-befe-b5e4ea1792b1 --- diff --git a/src/engine/SCons/Job.py b/src/engine/SCons/Job.py index 2d4dbf8f..60eb8615 100644 --- a/src/engine/SCons/Job.py +++ b/src/engine/SCons/Job.py @@ -222,21 +222,8 @@ else: jobs = 0 while 1: - - # There's a concern here that the while-loop test below - # might delay reporting status back about failed build - # tasks until the entire build is done if tasks execute - # fast enough, or self.maxjobs is big enough. It looks - # like that's enough of a corner case that we'll wait to - # see if it's an issue in practice. If so, one possible - # fix might be: - # - # while jobs < self.maxjobs and \ - # self.tp.resultsQueue.empty(): - # - # but that's somewhat unattractive because the - # resultsQueue.empty() check might introduce some - # significant overhead involving mutex locking. + # Start up as many available tasks as we're + # allowed to. while jobs < self.maxjobs: task = self.taskmaster.next_task() if task is None: @@ -261,12 +248,18 @@ else: if not task and not jobs: break - task, ok = self.tp.get() + # Let any/all completed tasks finish up before we go + # back and put the next batch of tasks on the queue. + while 1: + task, ok = self.tp.get() - jobs = jobs - 1 - if ok: - task.executed() - else: - task.failed() + jobs = jobs - 1 + if ok: + task.executed() + else: + task.failed() - task.postprocess() + task.postprocess() + + if self.tp.resultsQueue.empty(): + break diff --git a/src/engine/SCons/JobTests.py b/src/engine/SCons/JobTests.py index 3326f33c..2ace9c31 100644 --- a/src/engine/SCons/JobTests.py +++ b/src/engine/SCons/JobTests.py @@ -64,7 +64,10 @@ class Task: def prepare(self): self.was_prepared = 1 - + + def _do_something(self): + pass + def execute(self): self.taskmaster.test_case.failUnless(self.was_prepared, "the task wasn't prepared") @@ -73,10 +76,7 @@ class Task: self.taskmaster.begin_list.append(self.i) self.taskmaster.guard.release() - # do something that will take some random amount of time: - for i in range(random.randrange(0, num_sines, 1)): - x = math.sin(i) - time.sleep(0.01) + self._do_something() self.was_executed = 1 @@ -91,7 +91,7 @@ class Task: "the task wasn't prepared") self.taskmaster.test_case.failUnless(self.was_executed, "the task wasn't really executed") - self.taskmaster.test_case.failUnless(self.__class__ is Task, + self.taskmaster.test_case.failUnless(isinstance(self, Task), "the task wasn't really a Task instance") def failed(self): @@ -103,6 +103,12 @@ class Task: def postprocess(self): self.taskmaster.num_postprocessed = self.taskmaster.num_postprocessed + 1 +class RandomTask(Task): + def _do_something(self): + # do something that will take some random amount of time: + for i in range(random.randrange(0, num_sines, 1)): + x = math.sin(i) + time.sleep(0.01) class ExceptionTask: """A dummy task class for testing purposes.""" @@ -166,7 +172,6 @@ class Taskmaster: # keep track of the order tasks are completed in self.end_list = [] - def next_task(self): if self.stop or self.all_tasks_are_iterated(): return None @@ -203,6 +208,9 @@ class Taskmaster: def exception_set(self): pass +SaveThreadPool = None +ThreadPoolCallList = [] + class ParallelTestCase(unittest.TestCase): def runTest(self): "test parallel jobs" @@ -212,7 +220,7 @@ class ParallelTestCase(unittest.TestCase): except: raise NoThreadsException() - taskmaster = Taskmaster(num_tasks, self, Task) + taskmaster = Taskmaster(num_tasks, self, RandomTask) jobs = SCons.Job.Jobs(num_jobs, taskmaster) jobs.run() @@ -227,11 +235,57 @@ class ParallelTestCase(unittest.TestCase): self.failIf(taskmaster.num_failed, "some task(s) failed to execute") + # Verify that parallel jobs will pull all of the completed tasks + # out of the queue at once, instead of one by one. We do this by + # replacing the default ThreadPool class with one that records the + # order in which tasks are put() and get() to/from the pool, and + # which sleeps a little bit before call get() to let the initial + # tasks complete and get their notifications on the resultsQueue. + + class SleepTask(Task): + def _do_something(self): + time.sleep(0.1) + + global SaveThreadPool + SaveThreadPool = SCons.Job.ThreadPool + + class WaitThreadPool(SaveThreadPool): + def put(self, task): + ThreadPoolCallList.append('put(%s)' % task.i) + return SaveThreadPool.put(self, task) + def get(self): + time.sleep(0.5) + result = SaveThreadPool.get(self) + ThreadPoolCallList.append('get(%s)' % result[0].i) + return result + + SCons.Job.ThreadPool = WaitThreadPool + + try: + taskmaster = Taskmaster(3, self, SleepTask) + jobs = SCons.Job.Jobs(2, taskmaster) + jobs.run() + + # The key here is that we get(1) and get(2) from the + # resultsQueue before we put(3). + expect = [ + 'put(1)', + 'put(2)', + 'get(1)', + 'get(2)', + 'put(3)', + 'get(3)', + ] + assert ThreadPoolCallList == expect, ThreadPoolCallList + + finally: + SCons.Job.ThreadPool = SaveThreadPool + class SerialTestCase(unittest.TestCase): def runTest(self): "test a serial job" - taskmaster = Taskmaster(num_tasks, self, Task) + taskmaster = Taskmaster(num_tasks, self, RandomTask) jobs = SCons.Job.Jobs(1, taskmaster) jobs.run() @@ -254,7 +308,7 @@ class NoParallelTestCase(unittest.TestCase): save_Parallel = SCons.Job.Parallel SCons.Job.Parallel = NoParallel try: - taskmaster = Taskmaster(num_tasks, self, Task) + taskmaster = Taskmaster(num_tasks, self, RandomTask) jobs = SCons.Job.Jobs(2, taskmaster) self.failUnless(jobs.num_jobs == 1, "unexpected number of jobs %d" % jobs.num_jobs)