Reap as many thread responses as possible at once. (J.T. Conklin)
authorstevenknight <stevenknight@fdb21ef1-2011-0410-befe-b5e4ea1792b1>
Wed, 4 May 2005 04:31:58 +0000 (04:31 +0000)
committerstevenknight <stevenknight@fdb21ef1-2011-0410-befe-b5e4ea1792b1>
Wed, 4 May 2005 04:31:58 +0000 (04:31 +0000)
git-svn-id: http://scons.tigris.org/svn/scons/trunk@1286 fdb21ef1-2011-0410-befe-b5e4ea1792b1

src/engine/SCons/Job.py
src/engine/SCons/JobTests.py

index 2d4dbf8f4c10fabcc14c1622222c5382deecffd2..60eb86150fe9a187c22c7824f9982423c5568f56 100644 (file)
@@ -222,21 +222,8 @@ else:
             jobs = 0
             
             while 1:
-
-                # There's a concern here that the while-loop test below
-                # might delay reporting status back about failed build
-                # tasks until the entire build is done if tasks execute
-                # fast enough, or self.maxjobs is big enough.  It looks
-                # like that's enough of a corner case that we'll wait to
-                # see if it's an issue in practice.  If so, one possible
-                # fix might be:
-                #
-                #       while jobs < self.maxjobs and \
-                #             self.tp.resultsQueue.empty():
-                #
-                # but that's somewhat unattractive because the
-                # resultsQueue.empty() check might introduce some
-                # significant overhead involving mutex locking.
+                # Start up as many available tasks as we're
+                # allowed to.
                 while jobs < self.maxjobs:
                     task = self.taskmaster.next_task()
                     if task is None:
@@ -261,12 +248,18 @@ else:
 
                 if not task and not jobs: break
 
-                task, ok = self.tp.get()
+                # Let any/all completed tasks finish up before we go
+                # back and put the next batch of tasks on the queue.
+                while 1:
+                    task, ok = self.tp.get()
 
-                jobs = jobs - 1
-                if ok:
-                    task.executed()
-                else:
-                    task.failed()
+                    jobs = jobs - 1
+                    if ok:
+                        task.executed()
+                    else:
+                        task.failed()
 
-                task.postprocess()
+                    task.postprocess()
+
+                    if self.tp.resultsQueue.empty():
+                        break
index 3326f33c8c1bc79d466890667269f2354dc43909..2ace9c31c12e83f5b71d8b4ae3b4875c4975710a 100644 (file)
@@ -64,7 +64,10 @@ class Task:
         
     def prepare(self):
         self.was_prepared = 1
-        
+
+    def _do_something(self):
+        pass
+
     def execute(self):
         self.taskmaster.test_case.failUnless(self.was_prepared,
                                   "the task wasn't prepared")
@@ -73,10 +76,7 @@ class Task:
         self.taskmaster.begin_list.append(self.i)
         self.taskmaster.guard.release()
 
-        # do something that will take some random amount of time:
-        for i in range(random.randrange(0, num_sines, 1)):
-            x = math.sin(i)
-        time.sleep(0.01)
+        self._do_something()
 
         self.was_executed = 1
 
@@ -91,7 +91,7 @@ class Task:
                                   "the task wasn't prepared")
         self.taskmaster.test_case.failUnless(self.was_executed,
                                   "the task wasn't really executed")
-        self.taskmaster.test_case.failUnless(self.__class__ is Task,
+        self.taskmaster.test_case.failUnless(isinstance(self, Task),
                                   "the task wasn't really a Task instance")
 
     def failed(self):
@@ -103,6 +103,12 @@ class Task:
     def postprocess(self):
         self.taskmaster.num_postprocessed = self.taskmaster.num_postprocessed + 1
 
+class RandomTask(Task):
+    def _do_something(self):
+        # do something that will take some random amount of time:
+        for i in range(random.randrange(0, num_sines, 1)):
+            x = math.sin(i)
+        time.sleep(0.01)
 
 class ExceptionTask:
     """A dummy task class for testing purposes."""
@@ -166,7 +172,6 @@ class Taskmaster:
         # keep track of the order tasks are completed in
         self.end_list = []
 
-
     def next_task(self):
         if self.stop or self.all_tasks_are_iterated():
             return None
@@ -203,6 +208,9 @@ class Taskmaster:
     def exception_set(self):
         pass
 
+SaveThreadPool = None
+ThreadPoolCallList = []
+
 class ParallelTestCase(unittest.TestCase):
     def runTest(self):
         "test parallel jobs"
@@ -212,7 +220,7 @@ class ParallelTestCase(unittest.TestCase):
         except:
             raise NoThreadsException()
 
-        taskmaster = Taskmaster(num_tasks, self, Task)
+        taskmaster = Taskmaster(num_tasks, self, RandomTask)
         jobs = SCons.Job.Jobs(num_jobs, taskmaster)
         jobs.run()
 
@@ -227,11 +235,57 @@ class ParallelTestCase(unittest.TestCase):
         self.failIf(taskmaster.num_failed,
                     "some task(s) failed to execute") 
 
+        # Verify that parallel jobs will pull all of the completed tasks
+        # out of the queue at once, instead of one by one.  We do this by
+        # replacing the default ThreadPool class with one that records the
+        # order in which tasks are put() and get() to/from the pool, and
+        # which sleeps a little bit before call get() to let the initial
+        # tasks complete and get their notifications on the resultsQueue.
+
+        class SleepTask(Task):
+            def _do_something(self):
+                time.sleep(0.1)
+
+        global SaveThreadPool
+        SaveThreadPool = SCons.Job.ThreadPool
+
+        class WaitThreadPool(SaveThreadPool):
+            def put(self, task):
+                ThreadPoolCallList.append('put(%s)' % task.i)
+                return SaveThreadPool.put(self, task)
+            def get(self):
+                time.sleep(0.5)
+                result = SaveThreadPool.get(self)
+                ThreadPoolCallList.append('get(%s)' % result[0].i)
+                return result
+
+        SCons.Job.ThreadPool = WaitThreadPool
+
+        try:
+            taskmaster = Taskmaster(3, self, SleepTask)
+            jobs = SCons.Job.Jobs(2, taskmaster)
+            jobs.run()
+
+            # The key here is that we get(1) and get(2) from the
+            # resultsQueue before we put(3).
+            expect = [
+                'put(1)',
+                'put(2)',
+                'get(1)',
+                'get(2)',
+                'put(3)',
+                'get(3)',
+            ]
+            assert ThreadPoolCallList == expect, ThreadPoolCallList
+
+        finally:
+            SCons.Job.ThreadPool = SaveThreadPool
+
 class SerialTestCase(unittest.TestCase):
     def runTest(self):
         "test a serial job"
 
-        taskmaster = Taskmaster(num_tasks, self, Task)
+        taskmaster = Taskmaster(num_tasks, self, RandomTask)
         jobs = SCons.Job.Jobs(1, taskmaster)
         jobs.run()
 
@@ -254,7 +308,7 @@ class NoParallelTestCase(unittest.TestCase):
         save_Parallel = SCons.Job.Parallel
         SCons.Job.Parallel = NoParallel
         try:
-            taskmaster = Taskmaster(num_tasks, self, Task)
+            taskmaster = Taskmaster(num_tasks, self, RandomTask)
             jobs = SCons.Job.Jobs(2, taskmaster)
             self.failUnless(jobs.num_jobs == 1, 
                             "unexpected number of jobs %d" % jobs.num_jobs)