3 This module defines the Serial and Parallel classes that execute tasks to
4 complete a build. The Jobs class provides a higher level interface to start,
5 stop, and wait on jobs.
12 # Permission is hereby granted, free of charge, to any person obtaining
13 # a copy of this software and associated documentation files (the
14 # "Software"), to deal in the Software without restriction, including
15 # without limitation the rights to use, copy, modify, merge, publish,
16 # distribute, sublicense, and/or sell copies of the Software, and to
17 # permit persons to whom the Software is furnished to do so, subject to
18 # the following conditions:
20 # The above copyright notice and this permission notice shall be included
21 # in all copies or substantial portions of the Software.
23 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
24 # KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
25 # WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
27 # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
28 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
29 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
32 __revision__ = "__FILE__ __REVISION__ __DATE__ __DEVELOPER__"
# The default stack size (in kilobytes) of the threads used to execute
# jobs in parallel.
#
# We use a stack size of 256 kilobytes. The default on some platforms
# is too large and prevents us from creating enough threads to fully
# parallelize the build. For example, the default stack size on Linux
# is typically 8 megabytes.

# Stack size override, in kilobytes. While this is None, Jobs falls back
# to default_stack_size when it creates a parallel job.
explicit_stack_size = None
default_stack_size = 256

# Error string attached to the BuildError raised for a task that is
# abandoned because the build was interrupted.
interrupt_msg = 'Build interrupted.'
class InterruptState(object):
    """A callable flag recording whether the build has been interrupted.

    The signal handler installed by Jobs calls set() on the instance;
    the code that runs tasks calls the instance itself to ask whether
    an interrupt has been seen.
    """

    def __init__(self):
        # No interrupt has been received yet.
        self.interrupted = False

    def set(self):
        """Record that an interrupt has been received."""
        self.interrupted = True

    def __call__(self):
        """Return True once set() has been called, False before that."""
        return self.interrupted
67 """An instance of this class initializes N jobs, and provides
68 methods for starting, stopping, and waiting on all N jobs.
71 def __init__(self, num, taskmaster):
73 Create 'num' jobs using the given taskmaster.
75 If 'num' is 1 or less, then a serial job will be used,
76 otherwise a parallel job with 'num' worker threads will
79 The 'num_jobs' attribute will be set to the actual number of jobs
80 allocated. If more than one job is requested but the Parallel
81 class can't do it, it gets reset to 1. Wrapping interfaces that
82 care should check the value of 'num_jobs' after initialization.
87 stack_size = explicit_stack_size
88 if stack_size is None:
89 stack_size = default_stack_size
92 self.job = Parallel(taskmaster, num, stack_size)
97 self.job = Serial(taskmaster)
100 def run(self, postfunc=lambda: None):
103 postfunc() will be invoked after the jobs have run. It will be
104 invoked even if the jobs are interrupted by a keyboard
105 interrupt (well, in fact by a signal such as either SIGINT,
106 SIGTERM or SIGHUP). The execution of postfunc() is protected
107 against keyboard interrupts and is guaranteed to run to
109 self._setup_sig_handler()
114 self._reset_sig_handler()
def were_interrupted(self):
    """Returns whether the jobs were interrupted by a signal.

    This simply asks the interrupt flag of the underlying job
    (self.job.interrupted), which the signal handler installed by
    _setup_sig_handler() sets when a signal arrives.
    """
    return self.job.interrupted()
120 def _setup_sig_handler(self):
121 """Set up an interrupt handler so that SCons can shut down cleanly in
124 a) SIGINT: Keyboard interrupt
125 b) SIGTERM: kill or system shutdown
126 c) SIGHUP: Controlling shell exiting
128 We handle all of these cases by stopping the taskmaster. It
129 turns out that it is very difficult to stop the build process
130 by asynchronously raising an exception such as
131 KeyboardInterrupt. For example, the Python Condition
132 variables (threading.Condition) and queues do not seem to be
133 asynchronous-exception-safe. It would require adding a whole
134 bunch of try/finally blocks and except KeyboardInterrupt clauses all
137 Note also that we have to be careful to handle the case when
138 SCons forks before executing another process. In that case, we
139 want the child to exit immediately.
141 def handler(signum, stack, self=self, parentpid=os.getpid()):
142 if os.getpid() == parentpid:
143 self.job.taskmaster.stop()
144 self.job.interrupted.set()
148 self.old_sigint = signal.signal(signal.SIGINT, handler)
149 self.old_sigterm = signal.signal(signal.SIGTERM, handler)
151 self.old_sighup = signal.signal(signal.SIGHUP, handler)
152 except AttributeError:
def _reset_sig_handler(self):
    """Restore the signal handlers to their previous state (before the
    call to _setup_sig_handler()).

    Reinstalls the handlers that _setup_sig_handler() saved in
    self.old_sigint, self.old_sigterm and, where available,
    self.old_sighup.
    """
    signal.signal(signal.SIGINT, self.old_sigint)
    signal.signal(signal.SIGTERM, self.old_sigterm)
    try:
        signal.signal(signal.SIGHUP, self.old_sighup)
    except AttributeError:
        # The signal module has no SIGHUP on some platforms; in that
        # case _setup_sig_handler() installed nothing for it, so there
        # is nothing to restore.
        pass
167 """This class is used to execute tasks in series, and is more efficient
168 than Parallel, but is only appropriate for non-parallel builds. Only
169 one instance of this class should be in existence at a time.
171 This class is not thread safe.
174 def __init__(self, taskmaster):
175 """Create a new serial job given a taskmaster.
177 The taskmaster's next_task() method should return the next task
178 that needs to be executed, or None if there are no more tasks. The
179 taskmaster's executed() method will be called for each task when it
180 is successfully executed or failed() will be called if it failed to
181 execute (e.g. execute() raised an exception)."""
183 self.taskmaster = taskmaster
184 self.interrupted = InterruptState()
187 """Start the job. This will begin pulling tasks from the taskmaster
188 and executing them, and return when there are no more tasks. If a task
189 fails to execute (i.e. execute() raises an exception), then the job will
193 task = self.taskmaster.next_task()
200 if task.needs_execute():
203 if self.interrupted():
205 raise SCons.Errors.BuildError(
206 task.targets[0], errstr=interrupt_msg)
212 # Let the failed() callback function arrange for the
213 # build to stop if that's appropriate.
219 self.taskmaster.cleanup()
222 # Trap import failure so that everything in the Job module but the
223 # Parallel class (and its dependent classes) will work if the interpreter
224 # doesn't support threads.
231 class Worker(threading.Thread):
232 """A worker thread waits on a task to be posted to its request queue,
233 dequeues the task, executes it, and posts a tuple including the task
234 and a boolean indicating whether the task executed successfully. """
236 def __init__(self, requestQueue, resultsQueue, interrupted):
237 threading.Thread.__init__(self)
239 self.requestQueue = requestQueue
240 self.resultsQueue = resultsQueue
241 self.interrupted = interrupted
246 task = self.requestQueue.get()
249 # The "None" value is used as a sentinel by
250 # ThreadPool.cleanup(). This indicates that there
251 # are no more tasks, so we should quit.
255 if self.interrupted():
256 raise SCons.Errors.BuildError(
257 task.targets[0], errstr=interrupt_msg)
265 self.resultsQueue.put((task, ok))
268 """This class is responsible for spawning and managing worker threads."""
270 def __init__(self, num, stack_size, interrupted):
271 """Create the request and reply queues, and 'num' worker threads.
273 One must specify the stack size of the worker threads. The
274 stack size is specified in kilobytes.
276 self.requestQueue = queue.Queue(0)
277 self.resultsQueue = queue.Queue(0)
280 prev_size = threading.stack_size(stack_size*1024)
281 except AttributeError, e:
282 # Only print a warning if the stack size has been
284 if not explicit_stack_size is None:
285 msg = "Setting stack size is unsupported by this version of Python:\n " + \
287 SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
288 except ValueError, e:
289 msg = "Setting stack size failed:\n " + str(e)
290 SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
292 # Create worker threads
295 worker = Worker(self.requestQueue, self.resultsQueue, interrupted)
296 self.workers.append(worker)
298 # Once we drop Python 1.5 we can change the following to:
299 #if 'prev_size' in locals():
300 if 'prev_size' in locals().keys():
301 threading.stack_size(prev_size)
def put(self, task):
    """Put task into request queue.

    The task is appended to self.requestQueue, from which the worker
    threads take their work.
    """
    self.requestQueue.put(task)
def get(self):
    """Remove and return a result tuple from the results queue.

    Blocks until a result is available. Results are (task, ok) tuples,
    where 'ok' is a boolean telling whether the task succeeded.
    """
    return self.resultsQueue.get()
def preparation_failed(self, task):
    """Post a failure result for 'task' directly on the results queue.

    The (task, False) tuple has the same shape as the results posted by
    the worker threads, so whoever reads the results queue sees the
    task as having failed even though no worker ever ran it.
    """
    self.resultsQueue.put((task, False))
316 Shuts down the thread pool, giving each worker thread a
317 chance to shut down gracefully.
319 # For each worker thread, put a sentinel "None" value
320 # on the requestQueue (indicating that there's no work
321 # to be done) so that each worker thread will get one and
322 # terminate gracefully.
323 for _ in self.workers:
324 self.requestQueue.put(None)
326 # Wait for all of the workers to terminate.
328 # If we don't do this, later Python versions (2.4, 2.5) often
329 # seem to raise exceptions during shutdown. This happens
330 # in requestQueue.get(), as an assertion failure that
331 # requestQueue.not_full is notified while not acquired,
332 # seemingly because the main thread has shut down (or is
333 # in the process of doing so) while the workers are still
334 # trying to pull sentinels off the requestQueue.
336 # Normally these terminations should happen fairly quickly,
337 # but we'll stick a one-second timeout on here just in case
339 for worker in self.workers:
344 """This class is used to execute tasks in parallel, and is somewhat
345 less efficient than Serial, but is appropriate for parallel builds.
347 This class is thread safe.
350 def __init__(self, taskmaster, num, stack_size):
351 """Create a new parallel job given a taskmaster.
353 The taskmaster's next_task() method should return the next
354 task that needs to be executed, or None if there are no more
355 tasks. The taskmaster's executed() method will be called
356 for each task when it is successfully executed or failed()
357 will be called if the task failed to execute (i.e. execute()
358 raised an exception).
360 Note: calls to taskmaster are serialized, but calls to
361 execute() on distinct tasks are not serialized, because
362 that is the whole point of parallel jobs: they can execute
363 multiple tasks simultaneously. """
365 self.taskmaster = taskmaster
366 self.interrupted = InterruptState()
367 self.tp = ThreadPool(num, stack_size, self.interrupted)
372 """Start the job. This will begin pulling tasks from the
373 taskmaster and executing them, and return when there are no
374 more tasks. If a task fails to execute (i.e. execute() raises
375 an exception), then the job will stop."""
380 # Start up as many available tasks as we're
382 while jobs < self.maxjobs:
383 task = self.taskmaster.next_task()
388 # prepare task for execution
395 if task.needs_execute():
403 if not task and not jobs: break
405 # Let any/all completed tasks finish up before we go
406 # back and put the next batch of tasks on the queue.
408 task, ok = self.tp.get()
414 if self.interrupted():
416 raise SCons.Errors.BuildError(
417 task.targets[0], errstr=interrupt_msg)
421 # Let the failed() callback function arrange
422 # for the build to stop if that's appropriate.
427 if self.tp.resultsQueue.empty():
431 self.taskmaster.cleanup()
435 # indent-tabs-mode:nil
437 # vim: set expandtab tabstop=4 shiftwidth=4: