From 3296946ce0383442f3a8ff8e96005828036c48b9 Mon Sep 17 00:00:00 2001 From: stevenknight Date: Mon, 6 Oct 2003 21:41:15 +0000 Subject: [PATCH] New parallel job execution. (J.T. Conklin) git-svn-id: http://scons.tigris.org/svn/scons/trunk@812 fdb21ef1-2011-0410-befe-b5e4ea1792b1 --- src/CHANGES.txt | 4 + src/engine/SCons/Job.py | 248 ++++++++++++---------------- src/engine/SCons/JobTests.py | 6 + src/engine/SCons/Taskmaster.py | 2 +- src/engine/SCons/TaskmasterTests.py | 33 +++- test/SideEffect.py | 24 +-- 6 files changed, 159 insertions(+), 158 deletions(-) diff --git a/src/CHANGES.txt b/src/CHANGES.txt index 440099f9..5a7f01cf 100644 --- a/src/CHANGES.txt +++ b/src/CHANGES.txt @@ -17,6 +17,10 @@ RELEASE X.XX - XXX - Scan .S, .spp and .SPP files for C preprocessor dependencies. + - Refactor the Job.Parallel() class to use a thread pool without a + condition variable. This improves parallel build performance and + handles keyboard interrupts properly when -j is used. + From Charles Crain: - Add support for a JARCHDIR variable to control changing to a diff --git a/src/engine/SCons/Job.py b/src/engine/SCons/Job.py index 87a1bc23..202f86f7 100644 --- a/src/engine/SCons/Job.py +++ b/src/engine/SCons/Job.py @@ -31,43 +31,29 @@ stop, and wait on jobs. __revision__ = "__FILE__ __REVISION__ __DATE__ __DEVELOPER__" -import time - class Jobs: """An instance of this class initializes N jobs, and provides methods for starting, stopping, and waiting on all N jobs. """ - + def __init__(self, num, taskmaster): """ create 'num' jobs using the given taskmaster. If 'num' is 1 or less, then a serial job will be used, - otherwise 'num' parallel jobs will be used. + otherwise a parallel job with 'num' worker threads will + be used. """ - # Keeps track of keyboard interrupts: - self.keyboard_interrupt = 0 - if num > 1: - self.jobs = [] - for i in range(num): - self.jobs.append(Parallel(taskmaster, self)) + self.job = Parallel(taskmaster, num) else: - self.jobs = [Serial(taskmaster, self)] - - self.running = [] + self.job = Serial(taskmaster) def run(self): - """run the jobs, and wait for them to finish""" - + """run the job""" try: - for job in self.jobs: - job.start() - self.running.append(job) - while self.running: - self.running[-1].wait() - self.running.pop() + self.job.start() except KeyboardInterrupt: # mask any further keyboard interrupts so that scons # can shutdown cleanly: @@ -75,19 +61,8 @@ class Jobs: # child processes can still get the keyboard interrupt) import signal signal.signal(signal.SIGINT, signal.SIG_IGN) + raise - for job in self.running: - job.keyboard_interrupt() - else: - self.keyboard_interrupt = 1 - - # wait on any remaining jobs to finish: - for job in self.running: - job.wait() - - if self.keyboard_interrupt: - raise KeyboardInterrupt - class Serial: """This class is used to execute tasks in series, and is more efficient than Parallel, but is only appropriate for non-parallel builds. Only @@ -96,7 +71,7 @@ class Serial: This class is not thread safe. """ - def __init__(self, taskmaster, jobs): + def __init__(self, taskmaster): """Create a new serial job given a taskmaster. The taskmaster's next_task() method should return the next task @@ -107,16 +82,14 @@ class Serial: is_blocked() method will not be called. """ self.taskmaster = taskmaster - self.jobs = jobs def start(self): - """Start the job. This will begin pulling tasks from the taskmaster and executing them, and return when there are no more tasks. If a task fails to execute (i.e. execute() raises an exception), then the job will stop.""" - while not self.jobs.keyboard_interrupt: + while 1: task = self.taskmaster.next_task() if task is None: @@ -134,33 +107,80 @@ class Serial: else: task.executed() - def wait(self): - """Serial jobs are always finished when start() returns, so there - is nothing to do here""" - pass - - def keyboard_interrupt(self): - self.jobs.keyboard_interrupt = 1 +# Trap import failure so that everything in the Job module but the +# Parallel class (and its dependent classes) will work if the interpreter +# doesn't support threads. +try: + import Queue + import threading +except ImportError: + pass + +class Worker(threading.Thread): + """A worker thread waits on a task to be posted to its request queue, + dequeues the task, executes it, and posts a tuple including the task + and a boolean indicating whether the task executed successfully. """ -# The will hold a condition variable once the first parallel task -# is created. -cv = None + def __init__(self, requestQueue, resultsQueue): + threading.Thread.__init__(self) + self.setDaemon(1) + self.requestQueue = requestQueue + self.resultsQueue = resultsQueue + self.start() + + def run(self): + while 1: + task = self.requestQueue.get() + + try: + task.execute() + except: + ok = False + else: + ok = True + + self.resultsQueue.put((task, ok)) + +class ThreadPool: + """This class is responsible for spawning and managing worker threads.""" + + def __init__(self, num): + """Create the request and reply queues, and 'num' worker threads.""" + # Ideally we wouldn't have to artificially limit the number of + # tasks that can be posted to the request queue. But this can + # result in a large number of pending tasks, which at the time + # of this writing causes the taskmaster's next_task method to + # take a very long time. + self.requestQueue = Queue.Queue(num) + self.resultsQueue = Queue.Queue() + + # Create worker threads + for i in range(num): + worker = Worker(self.requestQueue, self.resultsQueue) + + def put(self, obj): + """Put task into request queue.""" + self.requestQueue.put(obj) + + def get(self, block = 1): + """Remove and return a result tuple from the results queue.""" + return self.resultsQueue.get(block) + + def get_nowait(self): + """Remove and result a result tuple from the results queue + without blocking.""" + return self.get(False) class Parallel: - """This class is used to execute tasks in parallel, and is less - efficient than Serial, but is appropriate for parallel builds. Create - an instance of this class for each job or thread you want. + """This class is used to execute tasks in parallel, and is somewhat + less efficient than Serial, but is appropriate for parallel builds. This class is thread safe. """ - - def __init__(self, taskmaster, jobs): - """Create a new parallel job given a taskmaster, and a Jobs instance. - Multiple jobs will be using the taskmaster in parallel, but all - method calls to taskmaster methods are serialized by the jobs - themselves. + def __init__(self, taskmaster, num): + """Create a new parallel job given a taskmaster. The taskmaster's next_task() method should return the next task that needs to be executed, or None if there are no more tasks. The @@ -177,100 +197,42 @@ class Parallel: of parallel jobs: they can execute multiple tasks simultaneously. """ - global cv - - # import threading here so that everything in the Job module - # but the Parallel class will work if the interpreter doesn't - # support threads - import threading - self.taskmaster = taskmaster - self.jobs = jobs - self.thread = threading.Thread(None, self.__run) - - if cv is None: - cv = threading.Condition() + self.tp = ThreadPool(num) def start(self): - """Start the job. This will spawn a thread that will begin pulling - tasks from the task master and executing them. This method returns - immediately and doesn't wait for the jobs to be executed. - - To wait for the job to finish, call wait(). - """ - self.thread.start() - - def wait(self): - """Wait for the job to finish. A job is finished when there - are no more tasks. - - This method should only be called after start() has been called. - """ - - # Sleeping in a loop like this is lame. Calling - # self.thread.join() would be much nicer, but - # on Linux self.thread.join() doesn't always - # return when a KeyboardInterrupt happens, and when - # it does return, it causes Python to hang on shutdown. - # In other words this is just - # a work-around for some bugs/limitations in the - # self.thread.join() method. - while self.thread.isAlive(): - time.sleep(0.5) + """Start the job. This will begin pulling tasks from the + taskmaster and executing them, and return when there are no + more tasks. If a task fails to execute (i.e. execute() raises + an exception), then the job will stop.""" - def keyboard_interrupt(self): - cv.acquire() - self.jobs.keyboard_interrupt = 1 - cv.notifyAll() - cv.release() - - def __run(self): - """private method that actually executes the tasks""" + while 1: + task = self.taskmaster.next_task() + if task is None: + break - cv.acquire() + # prepare task for execution + try: + task.prepare() + except KeyboardInterrupt: + raise + except: + # Let the failed() callback function arrange for the + # build to stop if that's appropriate. + task.failed() - try: + # dispatch task + self.tp.put(task) while 1: - while (self.taskmaster.is_blocked() and - not self.jobs.keyboard_interrupt): - cv.wait(None) - - if self.jobs.keyboard_interrupt: - break - - task = self.taskmaster.next_task() - - if task == None: - break - try: - task.prepare() - cv.release() - try: - task.execute() - finally: - cv.acquire() - except KeyboardInterrupt: - self.jobs.keyboard_interrupt = 1 - except: - # Let the failed() callback function arrange for - # calling self.jobs.stop() to to stop the build - # if that's appropriate. - task.failed() - else: - task.executed() - - # signal the cv whether the task failed or not, - # or otherwise the other Jobs might - # remain blocked: - if (not self.taskmaster.is_blocked() or - self.jobs.keyboard_interrupt): - cv.notifyAll() - - finally: - cv.release() - - - + task, ok = self.tp.get_nowait() + except Queue.Empty: + if not self.taskmaster.is_blocked(): + break + task, ok = self.tp.get() + if ok: + task.executed() + else: + task.failed() diff --git a/src/engine/SCons/JobTests.py b/src/engine/SCons/JobTests.py index 58b98cfc..2c840285 100644 --- a/src/engine/SCons/JobTests.py +++ b/src/engine/SCons/JobTests.py @@ -28,6 +28,7 @@ import random import math import SCons.Job import sys +import time # a large number num_sines = 10000 @@ -75,6 +76,7 @@ class Task: # do something that will take some random amount of time: for i in range(random.randrange(0, num_sines, 1)): x = math.sin(i) + time.sleep(0.01) self.was_executed = 1 @@ -169,6 +171,10 @@ class Taskmaster: return self.num_iterated == self.num_tasks def is_blocked(self): + if self.stop or self.all_tasks_are_executed(): + return False + if self.all_tasks_are_iterated(): + return True # simulate blocking tasks return self.num_iterated - self.num_executed >= max(num_jobs/2, 2) diff --git a/src/engine/SCons/Taskmaster.py b/src/engine/SCons/Taskmaster.py index 9b13b60d..7760cfe1 100644 --- a/src/engine/SCons/Taskmaster.py +++ b/src/engine/SCons/Taskmaster.py @@ -358,7 +358,7 @@ class Taskmaster: def is_blocked(self): self._find_next_ready_node() - return not self.ready and self.pending + return not self.ready and (self.pending or self.executing) def stop(self): """Stop the current build completely.""" diff --git a/src/engine/SCons/TaskmasterTests.py b/src/engine/SCons/TaskmasterTests.py index 4dbf8b3b..a3941512 100644 --- a/src/engine/SCons/TaskmasterTests.py +++ b/src/engine/SCons/TaskmasterTests.py @@ -258,6 +258,8 @@ class TaskmasterTestCase(unittest.TestCase): assert not tm.is_blocked() t5 = tm.next_task() assert t5.get_target() == n5, t5.get_target() + assert tm.is_blocked() # still executing t5 + t5.executed() assert not tm.is_blocked() assert tm.next_task() == None @@ -355,9 +357,10 @@ class TaskmasterTestCase(unittest.TestCase): t.executed() t = tm.next_task() assert t.get_target() == n5 - assert not tm.is_blocked() + assert tm.is_blocked() # still executing n5 assert not tm.next_task() t.executed() + assert not tm.is_blocked() n1 = Node("n1") n2 = Node("n2") @@ -464,10 +467,32 @@ class TaskmasterTestCase(unittest.TestCase): assert not tm.is_blocked() class MyTM(SCons.Taskmaster.Taskmaster): - def is_blocked(self): - return 1 + def _find_next_ready_node(self): + self.ready = 1 + tm = MyTM() + assert not tm.is_blocked() + + class MyTM(SCons.Taskmaster.Taskmaster): + def _find_next_ready_node(self): + self.ready = None + self.pending = [] + self.executing = [] + tm = MyTM() + assert not tm.is_blocked() + + class MyTM(SCons.Taskmaster.Taskmaster): + def _find_next_ready_node(self): + self.ready = None + self.pending = [1] + tm = MyTM() + assert tm.is_blocked() + + class MyTM(SCons.Taskmaster.Taskmaster): + def _find_next_ready_node(self): + self.ready = None + self.executing = [1] tm = MyTM() - assert tm.is_blocked() == 1 + assert tm.is_blocked() def test_stop(self): """Test the stop() method diff --git a/test/SideEffect.py b/test/SideEffect.py index 33a553ab..a4803269 100644 --- a/test/SideEffect.py +++ b/test/SideEffect.py @@ -25,6 +25,7 @@ __revision__ = "__FILE__ __REVISION__ __DATE__ __DEVELOPER__" import os.path +import string import TestSCons @@ -111,16 +112,19 @@ test.fail_test(os.path.exists(test.workpath('bar.out'))) test.fail_test(os.path.exists(test.workpath('blat.out'))) test.fail_test(os.path.exists(test.workpath('log.txt'))) -test.run(arguments = "-j 4 .", stdout=test.wrap_stdout("""\ -build("bar.out", "bar.in") -build("blat.out", "blat.in") -build("foo.out", "foo.in") -build("log.out", "log.txt") -build("%s", "baz.in") -build("%s", "%s") -""" % (os.path.join('subdir', 'baz.out'), - os.path.join('subdir', 'out.out'), - os.path.join('subdir', 'out.txt')))) +build_lines = [ + 'build("bar.out", "bar.in")', + 'build("blat.out", "blat.in")', + 'build("foo.out", "foo.in")', + 'build("log.out", "log.txt")', + 'build("%s", "baz.in")' % os.path.join('subdir', 'baz.out'), + 'build("%s", "%s")' % (os.path.join('subdir', 'out.out'), + os.path.join('subdir', 'out.txt')), +] +test.run(arguments = "-j 4 .") +output = test.stdout() +for line in build_lines: + test.fail_test(string.find(output, line) == -1) expect = """\ bar.in -> bar.out -- 2.26.2