__revision__ = "__FILE__ __REVISION__ __DATE__ __DEVELOPER__"
-import time
-
class Jobs:
"""An instance of this class initializes N jobs, and provides
methods for starting, stopping, and waiting on all N jobs.
"""
-
+
def __init__(self, num, taskmaster):
"""
create 'num' jobs using the given taskmaster.
If 'num' is 1 or less, then a serial job will be used,
- otherwise 'num' parallel jobs will be used.
+ otherwise a parallel job with 'num' worker threads will
+ be used.
"""
- # Keeps track of keyboard interrupts:
- self.keyboard_interrupt = 0
-
if num > 1:
- self.jobs = []
- for i in range(num):
- self.jobs.append(Parallel(taskmaster, self))
+ self.job = Parallel(taskmaster, num)
else:
- self.jobs = [Serial(taskmaster, self)]
-
- self.running = []
+ self.job = Serial(taskmaster)
def run(self):
- """run the jobs, and wait for them to finish"""
-
+ """run the job"""
try:
- for job in self.jobs:
- job.start()
- self.running.append(job)
- while self.running:
- self.running[-1].wait()
- self.running.pop()
+ self.job.start()
except KeyboardInterrupt:
# mask any further keyboard interrupts so that scons
# can shutdown cleanly:
# child processes can still get the keyboard interrupt)
import signal
signal.signal(signal.SIGINT, signal.SIG_IGN)
+ raise
- for job in self.running:
- job.keyboard_interrupt()
- else:
- self.keyboard_interrupt = 1
-
- # wait on any remaining jobs to finish:
- for job in self.running:
- job.wait()
-
- if self.keyboard_interrupt:
- raise KeyboardInterrupt
-
class Serial:
"""This class is used to execute tasks in series, and is more efficient
than Parallel, but is only appropriate for non-parallel builds. Only
This class is not thread safe.
"""
- def __init__(self, taskmaster, jobs):
+ def __init__(self, taskmaster):
"""Create a new serial job given a taskmaster.
The taskmaster's next_task() method should return the next task
is_blocked() method will not be called. """
self.taskmaster = taskmaster
- self.jobs = jobs
def start(self):
-
"""Start the job. This will begin pulling tasks from the taskmaster
and executing them, and return when there are no more tasks. If a task
fails to execute (i.e. execute() raises an exception), then the job will
stop."""
- while not self.jobs.keyboard_interrupt:
+ while 1:
task = self.taskmaster.next_task()
if task is None:
else:
task.executed()
- def wait(self):
- """Serial jobs are always finished when start() returns, so there
- is nothing to do here"""
- pass
-
- def keyboard_interrupt(self):
- self.jobs.keyboard_interrupt = 1
+# Trap import failure so that everything in the Job module but the
+# Parallel class (and its dependent classes) will work if the interpreter
+# doesn't support threads.
+try:
+ import Queue
+ import threading
+except ImportError:
+ pass
+
+class Worker(threading.Thread):
+ """A worker thread waits on a task to be posted to its request queue,
+ dequeues the task, executes it, and posts a tuple including the task
+ and a boolean indicating whether the task executed successfully. """
-# The will hold a condition variable once the first parallel task
-# is created.
-cv = None
+ def __init__(self, requestQueue, resultsQueue):
+ threading.Thread.__init__(self)
+ self.setDaemon(1)
+ self.requestQueue = requestQueue
+ self.resultsQueue = resultsQueue
+ self.start()
+
+ def run(self):
+ while 1:
+ task = self.requestQueue.get()
+
+ try:
+ task.execute()
+ except:
+ ok = False
+ else:
+ ok = True
+
+ self.resultsQueue.put((task, ok))
+
+class ThreadPool:
+ """This class is responsible for spawning and managing worker threads."""
+
+ def __init__(self, num):
+ """Create the request and reply queues, and 'num' worker threads."""
+ # Ideally we wouldn't have to artificially limit the number of
+ # tasks that can be posted to the request queue. But this can
+ # result in a large number of pending tasks, which at the time
+ # of this writing causes the taskmaster's next_task method to
+ # take a very long time.
+ self.requestQueue = Queue.Queue(num)
+ self.resultsQueue = Queue.Queue()
+
+ # Create worker threads
+ for i in range(num):
+ worker = Worker(self.requestQueue, self.resultsQueue)
+
+ def put(self, obj):
+ """Put task into request queue."""
+ self.requestQueue.put(obj)
+
+ def get(self, block = 1):
+ """Remove and return a result tuple from the results queue."""
+ return self.resultsQueue.get(block)
+
+ def get_nowait(self):
+ """Remove and result a result tuple from the results queue
+ without blocking."""
+ return self.get(False)
class Parallel:
- """This class is used to execute tasks in parallel, and is less
- efficient than Serial, but is appropriate for parallel builds. Create
- an instance of this class for each job or thread you want.
+ """This class is used to execute tasks in parallel, and is somewhat
+ less efficient than Serial, but is appropriate for parallel builds.
This class is thread safe.
"""
-
- def __init__(self, taskmaster, jobs):
- """Create a new parallel job given a taskmaster, and a Jobs instance.
- Multiple jobs will be using the taskmaster in parallel, but all
- method calls to taskmaster methods are serialized by the jobs
- themselves.
+ def __init__(self, taskmaster, num):
+ """Create a new parallel job given a taskmaster.
The taskmaster's next_task() method should return the next task
that needs to be executed, or None if there are no more tasks. The
of parallel jobs: they can execute multiple tasks
simultaneously. """
- global cv
-
- # import threading here so that everything in the Job module
- # but the Parallel class will work if the interpreter doesn't
- # support threads
- import threading
-
self.taskmaster = taskmaster
- self.jobs = jobs
- self.thread = threading.Thread(None, self.__run)
-
- if cv is None:
- cv = threading.Condition()
+ self.tp = ThreadPool(num)
def start(self):
- """Start the job. This will spawn a thread that will begin pulling
- tasks from the task master and executing them. This method returns
- immediately and doesn't wait for the jobs to be executed.
-
- To wait for the job to finish, call wait().
- """
- self.thread.start()
-
- def wait(self):
- """Wait for the job to finish. A job is finished when there
- are no more tasks.
-
- This method should only be called after start() has been called.
- """
-
- # Sleeping in a loop like this is lame. Calling
- # self.thread.join() would be much nicer, but
- # on Linux self.thread.join() doesn't always
- # return when a KeyboardInterrupt happens, and when
- # it does return, it causes Python to hang on shutdown.
- # In other words this is just
- # a work-around for some bugs/limitations in the
- # self.thread.join() method.
- while self.thread.isAlive():
- time.sleep(0.5)
+ """Start the job. This will begin pulling tasks from the
+ taskmaster and executing them, and return when there are no
+ more tasks. If a task fails to execute (i.e. execute() raises
+ an exception), then the job will stop."""
- def keyboard_interrupt(self):
- cv.acquire()
- self.jobs.keyboard_interrupt = 1
- cv.notifyAll()
- cv.release()
-
- def __run(self):
- """private method that actually executes the tasks"""
+ while 1:
+ task = self.taskmaster.next_task()
+ if task is None:
+ break
- cv.acquire()
+ # prepare task for execution
+ try:
+ task.prepare()
+ except KeyboardInterrupt:
+ raise
+ except:
+ # Let the failed() callback function arrange for the
+ # build to stop if that's appropriate.
+ task.failed()
- try:
+ # dispatch task
+ self.tp.put(task)
while 1:
- while (self.taskmaster.is_blocked() and
- not self.jobs.keyboard_interrupt):
- cv.wait(None)
-
- if self.jobs.keyboard_interrupt:
- break
-
- task = self.taskmaster.next_task()
-
- if task == None:
- break
-
try:
- task.prepare()
- cv.release()
- try:
- task.execute()
- finally:
- cv.acquire()
- except KeyboardInterrupt:
- self.jobs.keyboard_interrupt = 1
- except:
- # Let the failed() callback function arrange for
- # calling self.jobs.stop() to to stop the build
- # if that's appropriate.
- task.failed()
- else:
- task.executed()
-
- # signal the cv whether the task failed or not,
- # or otherwise the other Jobs might
- # remain blocked:
- if (not self.taskmaster.is_blocked() or
- self.jobs.keyboard_interrupt):
- cv.notifyAll()
-
- finally:
- cv.release()
-
-
-
+ task, ok = self.tp.get_nowait()
+ except Queue.Empty:
+ if not self.taskmaster.is_blocked():
+ break
+ task, ok = self.tp.get()
+ if ok:
+ task.executed()
+ else:
+ task.failed()