New parallel job execution. (J.T. Conklin)
author stevenknight <stevenknight@fdb21ef1-2011-0410-befe-b5e4ea1792b1>
Mon, 6 Oct 2003 21:41:15 +0000 (21:41 +0000)
committer stevenknight <stevenknight@fdb21ef1-2011-0410-befe-b5e4ea1792b1>
Mon, 6 Oct 2003 21:41:15 +0000 (21:41 +0000)
git-svn-id: http://scons.tigris.org/svn/scons/trunk@812 fdb21ef1-2011-0410-befe-b5e4ea1792b1

src/CHANGES.txt
src/engine/SCons/Job.py
src/engine/SCons/JobTests.py
src/engine/SCons/Taskmaster.py
src/engine/SCons/TaskmasterTests.py
test/SideEffect.py

index 440099f9bff4bc9b3984f04d7a25a2bf9adc0be2..5a7f01cf9e56b10f991fc5239d75bc17b3fe47c3 100644 (file)
@@ -17,6 +17,10 @@ RELEASE X.XX - XXX
 
   - Scan .S, .spp and .SPP files for C preprocessor dependencies.
 
+  - Refactor the Job.Parallel() class to use a thread pool without a
+    condition variable.  This improves parallel build performance and
+    handles keyboard interrupts properly when -j is used.
+
   From Charles Crain:
 
   - Add support for a JARCHDIR variable to control changing to a
index 87a1bc233521f3d224d12a7ff1c649d38f493c00..202f86f7edfa980d7154f319db75e2965b95d57d 100644 (file)
@@ -31,43 +31,29 @@ stop, and wait on jobs.
 
 __revision__ = "__FILE__ __REVISION__ __DATE__ __DEVELOPER__"
 
-import time
-
 class Jobs:
     """An instance of this class initializes N jobs, and provides
     methods for starting, stopping, and waiting on all N jobs.
     """
-    
+
     def __init__(self, num, taskmaster):
         """
         create 'num' jobs using the given taskmaster.
 
         If 'num' is 1 or less, then a serial job will be used,
-        otherwise 'num' parallel jobs will be used.
+        otherwise a parallel job with 'num' worker threads will
+        be used.
         """
 
-        # Keeps track of keyboard interrupts:
-        self.keyboard_interrupt = 0
-
         if num > 1:
-            self.jobs = []
-            for i in range(num):
-                self.jobs.append(Parallel(taskmaster, self))
+            self.job = Parallel(taskmaster, num)
         else:
-            self.jobs = [Serial(taskmaster, self)]
-   
-        self.running = []
+            self.job = Serial(taskmaster)
 
     def run(self):
-        """run the jobs, and wait for them to finish"""
-
+        """run the job"""
         try:
-            for job in self.jobs:
-                job.start()
-                self.running.append(job)
-            while self.running:
-                self.running[-1].wait()
-                self.running.pop()
+            self.job.start()
         except KeyboardInterrupt:
             # mask any further keyboard interrupts so that scons
             # can shutdown cleanly:
@@ -75,19 +61,8 @@ class Jobs:
             #  child processes can still get the keyboard interrupt)
             import signal
             signal.signal(signal.SIGINT, signal.SIG_IGN)
+            raise
 
-            for job in self.running:
-                job.keyboard_interrupt()
-            else:
-                self.keyboard_interrupt = 1
-
-            # wait on any remaining jobs to finish:
-            for job in self.running:
-                job.wait()
-
-        if self.keyboard_interrupt:
-            raise KeyboardInterrupt
-    
 class Serial:
     """This class is used to execute tasks in series, and is more efficient
     than Parallel, but is only appropriate for non-parallel builds. Only
@@ -96,7 +71,7 @@ class Serial:
     This class is not thread safe.
     """
 
-    def __init__(self, taskmaster, jobs):
+    def __init__(self, taskmaster):
         """Create a new serial job given a taskmaster. 
 
         The taskmaster's next_task() method should return the next task
@@ -107,16 +82,14 @@ class Serial:
         is_blocked() method will not be called.  """
         
         self.taskmaster = taskmaster
-        self.jobs = jobs
 
     def start(self):
-        
         """Start the job. This will begin pulling tasks from the taskmaster
         and executing them, and return when there are no more tasks. If a task
         fails to execute (i.e. execute() raises an exception), then the job will
         stop."""
         
-        while not self.jobs.keyboard_interrupt:
+        while 1:
             task = self.taskmaster.next_task()
 
             if task is None:
@@ -134,33 +107,80 @@ class Serial:
             else:
                 task.executed()
 
-    def wait(self):
-        """Serial jobs are always finished when start() returns, so there
-        is nothing to do here"""
-        pass
-    
-    def keyboard_interrupt(self):
-        self.jobs.keyboard_interrupt = 1
 
+# Trap import failure so that everything in the Job module but the
+# Parallel class (and its dependent classes) will work if the interpreter
+# doesn't support threads.
+try:
+    import Queue
+    import threading
+except ImportError:
+    pass
+
+class Worker(threading.Thread):
+    """A worker thread waits on a task to be posted to its request queue,
+    dequeues the task, executes it, and posts a tuple including the task
+    and a boolean indicating whether the task executed successfully. """
 
-# The will hold a condition variable once the first parallel task
-# is created.
-cv = None
+    def __init__(self, requestQueue, resultsQueue):
+        threading.Thread.__init__(self)
+        self.setDaemon(1)
+        self.requestQueue = requestQueue
+        self.resultsQueue = resultsQueue
+        self.start()
+
+    def run(self):
+        while 1:
+            task = self.requestQueue.get()
+
+            try:
+                task.execute()
+            except:
+                ok = False
+            else:
+                ok = True
+
+            self.resultsQueue.put((task, ok))
+
+class ThreadPool:
+    """This class is responsible for spawning and managing worker threads."""
+
+    def __init__(self, num):
+        """Create the request and results queues, and 'num' worker threads."""
+        # Ideally we wouldn't have to artificially limit the number of
+        # tasks that can be posted to the request queue.  But an
+        # unbounded queue can accumulate a large number of pending
+        # tasks, which at the time of this writing causes the
+        # taskmaster's next_task method to take a very long time.
+        self.requestQueue = Queue.Queue(num)
+        self.resultsQueue = Queue.Queue()
+
+        # Create worker threads
+        for i in range(num):
+            worker = Worker(self.requestQueue, self.resultsQueue)
+
+    def put(self, obj):
+        """Put task into request queue."""
+        self.requestQueue.put(obj)
+
+    def get(self, block = 1):
+        """Remove and return a result tuple from the results queue."""
+        return self.resultsQueue.get(block)
+        
+    def get_nowait(self):
+        """Remove and return a result tuple from the results queue
+        without blocking."""
+        return self.get(False)
 
 class Parallel:
-    """This class is used to execute tasks in parallel, and is less
-    efficient than Serial, but is appropriate for parallel builds. Create
-    an instance of this class for each job or thread you want.
+    """This class is used to execute tasks in parallel, and is somewhat 
+    less efficient than Serial, but is appropriate for parallel builds.
 
     This class is thread safe.
     """
 
-
-    def __init__(self, taskmaster, jobs):
-        """Create a new parallel job given a taskmaster, and a Jobs instance.
-        Multiple jobs will be using the taskmaster in parallel, but all
-        method calls to taskmaster methods are serialized by the jobs
-        themselves.
+    def __init__(self, taskmaster, num):
+        """Create a new parallel job given a taskmaster.
 
         The taskmaster's next_task() method should return the next task
         that needs to be executed, or None if there are no more tasks. The
@@ -177,100 +197,42 @@ class Parallel:
         of parallel jobs: they can execute multiple tasks
         simultaneously. """
 
-        global cv
-        
-        # import threading here so that everything in the Job module
-        # but the Parallel class will work if the interpreter doesn't
-        # support threads
-        import threading
-        
         self.taskmaster = taskmaster
-        self.jobs = jobs
-        self.thread = threading.Thread(None, self.__run)
-
-        if cv is None:
-            cv = threading.Condition()
+        self.tp = ThreadPool(num)
 
     def start(self):
-        """Start the job. This will spawn a thread that will begin pulling
-        tasks from the task master and executing them. This method returns
-        immediately and doesn't wait for the jobs to be executed.
-
-        To wait for the job to finish, call wait().
-        """
-        self.thread.start()
-
-    def wait(self):
-        """Wait for the job to finish. A job is finished when there
-        are no more tasks.
-
-        This method should only be called after start() has been called.
-        """
-
-        # Sleeping in a loop like this is lame. Calling 
-        # self.thread.join() would be much nicer, but
-        # on Linux self.thread.join() doesn't always
-        # return when a KeyboardInterrupt happens, and when
-        # it does return, it causes Python to hang on shutdown.
-        # In other words this is just
-        # a work-around for some bugs/limitations in the
-        # self.thread.join() method.
-        while self.thread.isAlive():
-            time.sleep(0.5)
+        """Start the job. This will begin pulling tasks from the
+        taskmaster and executing them, and return when there are no
+        more tasks. If a task fails to execute (i.e. execute() raises
+        an exception), then the job will stop."""
 
-    def keyboard_interrupt(self):
-        cv.acquire()
-        self.jobs.keyboard_interrupt = 1
-        cv.notifyAll()
-        cv.release()
-
-    def __run(self):
-        """private method that actually executes the tasks"""
+        while 1:
+            task = self.taskmaster.next_task()
+            if task is None:
+                break
 
-        cv.acquire()
+            # prepare task for execution
+            try:
+                task.prepare()
+            except KeyboardInterrupt:
+                raise
+            except:
+                # Let the failed() callback function arrange for the
+                # build to stop if that's appropriate.
+                task.failed()
 
-        try:
+            # dispatch task
+            self.tp.put(task)
 
             while 1:
-                while (self.taskmaster.is_blocked() and 
-                       not self.jobs.keyboard_interrupt):
-                    cv.wait(None)
-
-                if self.jobs.keyboard_interrupt:
-                    break
-                    
-                task = self.taskmaster.next_task()
-
-                if task == None:
-                    break
-
                 try:
-                    task.prepare()
-                    cv.release()
-                    try:
-                        task.execute()
-                    finally:
-                        cv.acquire()
-                except KeyboardInterrupt:
-                    self.jobs.keyboard_interrupt = 1
-                except:
-                    # Let the failed() callback function arrange for
-                    # calling self.jobs.stop() to to stop the build
-                    # if that's appropriate.
-                    task.failed()
-                else:
-                    task.executed()
-
-                # signal the cv whether the task failed or not,
-                # or otherwise the other Jobs might
-                # remain blocked:
-                if (not self.taskmaster.is_blocked() or
-                    self.jobs.keyboard_interrupt):
-                    cv.notifyAll()
-                    
-        finally:
-            cv.release()
-
-
-
+                    task, ok = self.tp.get_nowait()
+                except Queue.Empty:
+                    if not self.taskmaster.is_blocked():
+                        break
+                    task, ok = self.tp.get()
 
+                if ok:
+                    task.executed()
+                else:
+                    task.failed()
index 58b98cfc63f00edc7589ae47a1bb1909469b4d75..2c8402852429a2717766fe938732fe5241734071 100644 (file)
@@ -28,6 +28,7 @@ import random
 import math
 import SCons.Job
 import sys
+import time
 
 # a large number
 num_sines = 10000
@@ -75,6 +76,7 @@ class Task:
         # do something that will take some random amount of time:
         for i in range(random.randrange(0, num_sines, 1)):
             x = math.sin(i)
+        time.sleep(0.01)
 
         self.was_executed = 1
 
@@ -169,6 +171,10 @@ class Taskmaster:
         return self.num_iterated == self.num_tasks
 
     def is_blocked(self):
+        if self.stop or self.all_tasks_are_executed():
+            return False
+        if self.all_tasks_are_iterated():
+            return True
         # simulate blocking tasks
         return self.num_iterated - self.num_executed >= max(num_jobs/2, 2)
 
index 9b13b60d624d25c87d6bbb2311d2bf0b121444ca..7760cfe1e41791672e6c2c33c6f04701c224e8f7 100644 (file)
@@ -358,7 +358,7 @@ class Taskmaster:
     def is_blocked(self):
         self._find_next_ready_node()
 
-        return not self.ready and self.pending
+        return not self.ready and (self.pending or self.executing)
 
     def stop(self):
         """Stop the current build completely."""
index 4dbf8b3b5d427a87cb73b8b5784f527ae5da07a0..a39415129e4a6aaeee06aa691b96433ed192b69a 100644 (file)
@@ -258,6 +258,8 @@ class TaskmasterTestCase(unittest.TestCase):
         assert not tm.is_blocked()
         t5 = tm.next_task()
         assert t5.get_target() == n5, t5.get_target()
+        assert tm.is_blocked()  # still executing t5
+        t5.executed()
         assert not tm.is_blocked()
 
         assert tm.next_task() == None
@@ -355,9 +357,10 @@ class TaskmasterTestCase(unittest.TestCase):
         t.executed()
         t = tm.next_task()
         assert t.get_target() == n5
-        assert not tm.is_blocked()
+        assert tm.is_blocked()  # still executing n5
         assert not tm.next_task()
         t.executed()
+        assert not tm.is_blocked()
 
         n1 = Node("n1")
         n2 = Node("n2")
@@ -464,10 +467,32 @@ class TaskmasterTestCase(unittest.TestCase):
         assert not tm.is_blocked()
 
         class MyTM(SCons.Taskmaster.Taskmaster):
-            def is_blocked(self):
-                return 1
+            def _find_next_ready_node(self):
+                self.ready = 1
+        tm = MyTM()
+        assert not tm.is_blocked()
+
+        class MyTM(SCons.Taskmaster.Taskmaster):
+            def _find_next_ready_node(self):
+                self.ready = None
+                self.pending = []
+                self.executing = []
+        tm = MyTM()
+        assert not tm.is_blocked()
+
+        class MyTM(SCons.Taskmaster.Taskmaster):
+            def _find_next_ready_node(self):
+                self.ready = None
+                self.pending = [1]
+        tm = MyTM()
+        assert tm.is_blocked()
+
+        class MyTM(SCons.Taskmaster.Taskmaster):
+            def _find_next_ready_node(self):
+                self.ready = None
+                self.executing = [1]
         tm = MyTM()
-        assert tm.is_blocked() == 1
+        assert tm.is_blocked()
 
     def test_stop(self):
         """Test the stop() method
index 33a553abcb5536a7a1097a9efa394d26d9aae2d8..a48032695be85dfd69173afcb596066969f5e16d 100644 (file)
@@ -25,6 +25,7 @@
 __revision__ = "__FILE__ __REVISION__ __DATE__ __DEVELOPER__"
 
 import os.path
+import string
 
 import TestSCons
 
@@ -111,16 +112,19 @@ test.fail_test(os.path.exists(test.workpath('bar.out')))
 test.fail_test(os.path.exists(test.workpath('blat.out')))
 test.fail_test(os.path.exists(test.workpath('log.txt')))
 
-test.run(arguments = "-j 4 .", stdout=test.wrap_stdout("""\
-build("bar.out", "bar.in")
-build("blat.out", "blat.in")
-build("foo.out", "foo.in")
-build("log.out", "log.txt")
-build("%s", "baz.in")
-build("%s", "%s")
-""" % (os.path.join('subdir', 'baz.out'),
-       os.path.join('subdir', 'out.out'),
-       os.path.join('subdir', 'out.txt'))))
+build_lines =  [
+    'build("bar.out", "bar.in")', 
+    'build("blat.out", "blat.in")', 
+    'build("foo.out", "foo.in")', 
+    'build("log.out", "log.txt")', 
+    'build("%s", "baz.in")' % os.path.join('subdir', 'baz.out'),
+    'build("%s", "%s")' % (os.path.join('subdir', 'out.out'),
+                           os.path.join('subdir', 'out.txt')),
+]
+test.run(arguments = "-j 4 .")
+output = test.stdout()
+for line in build_lines:
+    test.fail_test(string.find(output, line) == -1)
 
 expect = """\
 bar.in -> bar.out