If 'num' is 1 or less, then a serial job will be used,
otherwise a parallel job with 'num' worker threads will
be used.
+
+ The 'num_jobs' attribute will be set to the actual number of jobs
+ allocated. If more than one job is requested but the Parallel
+ class can't do it, it gets reset to 1. Wrapping interfaces that
+ care should check the value of 'num_jobs' after initialization.
"""
+ self.job = None
if num > 1:
- self.job = Parallel(taskmaster, num)
- else:
+ try:
+ self.job = Parallel(taskmaster, num)
+ self.num_jobs = num
+ except NameError:
+ pass
+ if self.job is None:
self.job = Serial(taskmaster)
+ self.num_jobs = 1
def run(self):
"""run the job"""
import threading
except ImportError:
pass
+else:
+ class Worker(threading.Thread):
+ """A worker thread waits on a task to be posted to its request queue,
+ dequeues the task, executes it, and posts a tuple including the task
+ and a boolean indicating whether the task executed successfully. """
+
+ def __init__(self, requestQueue, resultsQueue):
+ threading.Thread.__init__(self)
+ self.setDaemon(1)
+ self.requestQueue = requestQueue
+ self.resultsQueue = resultsQueue
+ self.start()
+
+ def run(self):
+ while 1:
+ task = self.requestQueue.get()
-class Worker(threading.Thread):
- """A worker thread waits on a task to be posted to its request queue,
- dequeues the task, executes it, and posts a tuple including the task
- and a boolean indicating whether the task executed successfully. """
-
- def __init__(self, requestQueue, resultsQueue):
- threading.Thread.__init__(self)
- self.setDaemon(1)
- self.requestQueue = requestQueue
- self.resultsQueue = resultsQueue
- self.start()
-
- def run(self):
- while 1:
- task = self.requestQueue.get()
-
- try:
- task.execute()
- except KeyboardInterrupt:
- # be explicit here for test/interrupts.py
- ok = False
- except:
- ok = 0
- else:
- ok = 1
-
- self.resultsQueue.put((task, ok))
-
-class ThreadPool:
- """This class is responsible for spawning and managing worker threads."""
+ try:
+ task.execute()
+ except KeyboardInterrupt:
+ # be explicit here for test/interrupts.py
+ ok = False
+ except:
+ ok = 0
+ else:
+ ok = 1
- def __init__(self, num):
- """Create the request and reply queues, and 'num' worker threads."""
- self.requestQueue = Queue.Queue(0)
- self.resultsQueue = Queue.Queue(0)
+ self.resultsQueue.put((task, ok))
- # Create worker threads
- for i in range(num):
- worker = Worker(self.requestQueue, self.resultsQueue)
+ class ThreadPool:
+ """This class is responsible for spawning and managing worker threads."""
- def put(self, obj):
- """Put task into request queue."""
- self.requestQueue.put(obj)
+ def __init__(self, num):
+ """Create the request and reply queues, and 'num' worker threads."""
+ self.requestQueue = Queue.Queue(0)
+ self.resultsQueue = Queue.Queue(0)
- def get(self, block = 1):
- """Remove and return a result tuple from the results queue."""
- return self.resultsQueue.get(block)
-
- def get_nowait(self):
- """Remove and result a result tuple from the results queue
- without blocking."""
- return self.get(0)
+ # Create worker threads
+ for i in range(num):
+ worker = Worker(self.requestQueue, self.resultsQueue)
-class Parallel:
- """This class is used to execute tasks in parallel, and is somewhat
- less efficient than Serial, but is appropriate for parallel builds.
+ def put(self, obj):
+ """Put task into request queue."""
+ self.requestQueue.put(obj)
- This class is thread safe.
- """
+ def get(self, block = 1):
+ """Remove and return a result tuple from the results queue."""
+ return self.resultsQueue.get(block)
+
+ def get_nowait(self):
+ """Remove and result a result tuple from the results queue
+ without blocking."""
+ return self.get(0)
- def __init__(self, taskmaster, num):
- """Create a new parallel job given a taskmaster.
+ class Parallel:
+ """This class is used to execute tasks in parallel, and is somewhat
+ less efficient than Serial, but is appropriate for parallel builds.
- The taskmaster's next_task() method should return the next task
- that needs to be executed, or None if there are no more tasks. The
- taskmaster's executed() method will be called for each task when it
- is successfully executed or failed() will be called if the task
- failed to execute (i.e. execute() raised an exception). The
- taskmaster's is_blocked() method should return true iff there are
- more tasks, but they can't be executed until one or more other
- tasks have been executed. next_task() will be called iff
- is_blocked() returned false.
-
- Note: calls to taskmaster are serialized, but calls to execute() on
- distinct tasks are not serialized, because that is the whole point
- of parallel jobs: they can execute multiple tasks
- simultaneously. """
+ This class is thread safe.
+ """
- self.taskmaster = taskmaster
- self.tp = ThreadPool(num)
+ def __init__(self, taskmaster, num):
+ """Create a new parallel job given a taskmaster.
- self.jobs = 0
- self.maxjobs = num
+ The taskmaster's next_task() method should return the next task
+ that needs to be executed, or None if there are no more tasks. The
+ taskmaster's executed() method will be called for each task when it
+ is successfully executed or failed() will be called if the task
+ failed to execute (i.e. execute() raised an exception). The
+ taskmaster's is_blocked() method should return true iff there are
+ more tasks, but they can't be executed until one or more other
+ tasks have been executed. next_task() will be called iff
+ is_blocked() returned false.
- def start(self):
- """Start the job. This will begin pulling tasks from the
- taskmaster and executing them, and return when there are no
- more tasks. If a task fails to execute (i.e. execute() raises
- an exception), then the job will stop."""
+ Note: calls to taskmaster are serialized, but calls to execute() on
+ distinct tasks are not serialized, because that is the whole point
+ of parallel jobs: they can execute multiple tasks
+ simultaneously. """
- while 1:
- if self.jobs < self.maxjobs:
- task = self.taskmaster.next_task()
- if task is None:
- break
+ self.taskmaster = taskmaster
+ self.tp = ThreadPool(num)
- # prepare task for execution
- try:
- task.prepare()
- except KeyboardInterrupt:
- raise
- except:
- # Let the failed() callback function arrange for the
- # build to stop if that's appropriate.
- task.failed()
+ self.jobs = 0
+ self.maxjobs = num
- # dispatch task
- self.tp.put(task)
- self.jobs = self.jobs + 1
+ def start(self):
+ """Start the job. This will begin pulling tasks from the
+ taskmaster and executing them, and return when there are no
+ more tasks. If a task fails to execute (i.e. execute() raises
+ an exception), then the job will stop."""
while 1:
- try:
- task, ok = self.tp.get_nowait()
- except Queue.Empty:
- if not (self.jobs is self.maxjobs or self.taskmaster.is_blocked()):
+ if self.jobs < self.maxjobs:
+ task = self.taskmaster.next_task()
+ if task is None:
break
- task, ok = self.tp.get()
- self.jobs = self.jobs - 1
- if ok:
- task.executed()
- else:
- task.failed()
+ # prepare task for execution
+ try:
+ task.prepare()
+ except KeyboardInterrupt:
+ raise
+ except:
+ # Let the failed() callback function arrange for the
+ # build to stop if that's appropriate.
+ task.failed()
+
+ # dispatch task
+ self.tp.put(task)
+ self.jobs = self.jobs + 1
+
+ while 1:
+ try:
+ task, ok = self.tp.get_nowait()
+ except Queue.Empty:
+ if not (self.jobs is self.maxjobs or self.taskmaster.is_blocked()):
+ break
+ task, ok = self.tp.get()
+
+ self.jobs = self.jobs - 1
+ if ok:
+ task.executed()
+ else:
+ task.failed()
class ParallelTestCase(unittest.TestCase):
def runTest(self):
"test parallel jobs"
-
+
try:
import threading
except:
self.failIf(taskmaster.num_failed,
"some task(s) failed to execute")
+class NoParallelTestCase(unittest.TestCase):
+ def runTest(self):
+ "test handling lack of parallel support"
+ def NoParallel(tm, num):
+ raise NameError
+ save_Parallel = SCons.Job.Parallel
+ SCons.Job.Parallel = NoParallel
+ try:
+ taskmaster = Taskmaster(num_tasks, self, Task)
+ jobs = SCons.Job.Jobs(2, taskmaster)
+ self.failUnless(jobs.num_jobs == 1,
+ "unexpected number of jobs %d" % jobs.num_jobs)
+ jobs.run()
+ self.failUnless(taskmaster.tasks_were_serial(),
+ "the tasks were not executed in series")
+ self.failUnless(taskmaster.all_tasks_are_executed(),
+ "all the tests were not executed")
+ self.failUnless(taskmaster.all_tasks_are_iterated(),
+ "all the tests were not iterated over")
+ self.failIf(taskmaster.num_failed,
+ "some task(s) failed to execute")
+ finally:
+ SCons.Job.Parallel = save_Parallel
+
+
class SerialExceptionTestCase(unittest.TestCase):
def runTest(self):
"test a serial job with tasks that raise exceptions"
suite = unittest.TestSuite()
suite.addTest(ParallelTestCase())
suite.addTest(SerialTestCase())
+ suite.addTest(NoParallelTestCase())
suite.addTest(SerialExceptionTestCase())
suite.addTest(ParallelExceptionTestCase())
return suite