3 This module defines the Serial and Parallel classes that execute tasks to
4 complete a build. The Jobs class provides a higher-level interface to start,
5 stop, and wait on jobs.
12 # Permission is hereby granted, free of charge, to any person obtaining
13 # a copy of this software and associated documentation files (the
14 # "Software"), to deal in the Software without restriction, including
15 # without limitation the rights to use, copy, modify, merge, publish,
16 # distribute, sublicense, and/or sell copies of the Software, and to
17 # permit persons to whom the Software is furnished to do so, subject to
18 # the following conditions:
20 # The above copyright notice and this permission notice shall be included
21 # in all copies or substantial portions of the Software.
23 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
24 # KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
25 # WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
27 # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
28 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
29 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
32 __revision__ = "__FILE__ __REVISION__ __DATE__ __DEVELOPER__"
37 """An instance of this class initializes N jobs, and provides
38 methods for starting, stopping, and waiting on all N jobs.
41 def __init__(self, num, taskmaster):
43 Create 'num' jobs using the given taskmaster.
45 If 'num' is 1 or less, then a serial job will be used,
46 otherwise a parallel job with 'num' worker threads will
49 The 'num_jobs' attribute will be set to the actual number of jobs
50 allocated. If more than one job is requested but the Parallel
51 class can't do it, it gets reset to 1. Wrapping interfaces that
52 care should check the value of 'num_jobs' after initialization.
58 self.job = Parallel(taskmaster, num)
63 self.job = Serial(taskmaster)
70 except KeyboardInterrupt:
71 # mask any further keyboard interrupts so that scons
72 # can shut down cleanly:
73 # (this only masks the keyboard interrupt for Python;
74 # child processes can still get the keyboard interrupt)
76 signal.signal(signal.SIGINT, signal.SIG_IGN)
83 """This class is used to execute tasks in series, and is more efficient
84 than Parallel, but is only appropriate for non-parallel builds. Only
85 one instance of this class should be in existence at a time.
87 This class is not thread safe.
90 def __init__(self, taskmaster):
91 """Create a new serial job given a taskmaster.
93 The taskmaster's next_task() method should return the next task
94 that needs to be executed, or None if there are no more tasks. The
95 taskmaster's executed() method will be called for each task when it
96 is successfully executed or failed() will be called if it failed to
97 execute (e.g. execute() raised an exception)."""
99 self.taskmaster = taskmaster
102 """Start the job. This will begin pulling tasks from the taskmaster
103 and executing them, and return when there are no more tasks. If a task
104 fails to execute (i.e. execute() raises an exception), then the job will
108 task = self.taskmaster.next_task()
116 except KeyboardInterrupt:
120 # Let the failed() callback function arrange for the
121 # build to stop if that's appropriate.
131 # Trap import failure so that everything in the Job module but the
132 # Parallel class (and its dependent classes) will work if the interpreter
133 # doesn't support threads.
140 class Worker(threading.Thread):
141 """A worker thread waits on a task to be posted to its request queue,
142 dequeues the task, executes it, and posts a tuple including the task
143 and a boolean indicating whether the task executed successfully. """
145 def __init__(self, requestQueue, resultsQueue):
146 threading.Thread.__init__(self)
148 self.requestQueue = requestQueue
149 self.resultsQueue = resultsQueue
154 task = self.requestQueue.get()
157 # The "None" value is used as a sentinel by
158 # ThreadPool.cleanup(). This indicates that there
159 # are no more tasks, so we should quit.
164 except KeyboardInterrupt:
165 # be explicit here for test/interrupts.py
173 self.resultsQueue.put((task, ok))
176 """This class is responsible for spawning and managing worker threads."""
178 def __init__(self, num):
179 """Create the request and reply queues, and 'num' worker threads."""
180 self.requestQueue = Queue.Queue(0)
181 self.resultsQueue = Queue.Queue(0)
183 # Create worker threads
186 worker = Worker(self.requestQueue, self.resultsQueue)
187 self.workers.append(worker)
190 """Put task into request queue."""
191 self.requestQueue.put(obj)
193 def get(self, block = True):
194 """Remove and return a result tuple from the results queue."""
195 return self.resultsQueue.get(block)
197 def preparation_failed(self, obj):
198 self.resultsQueue.put((obj, False))
202 Shuts down the thread pool, giving each worker thread a
203 chance to shut down gracefully.
205 # For each worker thread, put a sentinel "None" value
206 # on the requestQueue (indicating that there's no work
207 # to be done) so that each worker thread will get one and
208 # terminate gracefully.
209 for _ in self.workers:
210 self.requestQueue.put(None)
212 # Wait for all of the workers to terminate.
214 # If we don't do this, later Python versions (2.4, 2.5) often
215 # seem to raise exceptions during shutdown. This happens
216 # in requestQueue.get(), as an assertion failure that
217 # requestQueue.not_full is notified while not acquired,
218 # seemingly because the main thread has shut down (or is
219 # in the process of doing so) while the workers are still
220 # trying to pull sentinels off the requestQueue.
222 # Normally these terminations should happen fairly quickly,
223 # but we'll stick a one-second timeout on here just in case
225 for worker in self.workers:
230 """This class is used to execute tasks in parallel, and is somewhat
231 less efficient than Serial, but is appropriate for parallel builds.
233 This class is thread safe.
236 def __init__(self, taskmaster, num):
237 """Create a new parallel job given a taskmaster.
239 The taskmaster's next_task() method should return the next
240 task that needs to be executed, or None if there are no more
241 tasks. The taskmaster's executed() method will be called
242 for each task when it is successfully executed or failed()
243 will be called if the task failed to execute (i.e. execute()
244 raised an exception).
246 Note: calls to taskmaster are serialized, but calls to
247 execute() on distinct tasks are not serialized, because
248 that is the whole point of parallel jobs: they can execute
249 multiple tasks simultaneously. """
251 self.taskmaster = taskmaster
252 self.tp = ThreadPool(num)
257 """Start the job. This will begin pulling tasks from the
258 taskmaster and executing them, and return when there are no
259 more tasks. If a task fails to execute (i.e. execute() raises
260 an exception), then the job will stop."""
265 # Start up as many available tasks as we're
267 while jobs < self.maxjobs:
268 task = self.taskmaster.next_task()
272 # prepare task for execution
275 except KeyboardInterrupt:
278 # Let the failed() callback function arrange
279 # for the build to stop if that's appropriate.
281 self.tp.preparation_failed(task)
289 if not task and not jobs: break
291 # Let any/all completed tasks finish up before we go
292 # back and put the next batch of tasks on the queue.
294 task, ok = self.tp.get()
304 if self.tp.resultsQueue.empty():