5d6befe6592fe469b616e23086f34e18cecbf423
[scons.git] / src / engine / SCons / Job.py
1 """SCons.Job
2
3 This module defines the Serial and Parallel classes that execute tasks to
4 complete a build. The Jobs class provides a higher level interface to start,
5 stop, and wait on jobs.
6
7 """
8
9 #
10 # __COPYRIGHT__
11 #
12 # Permission is hereby granted, free of charge, to any person obtaining
13 # a copy of this software and associated documentation files (the
14 # "Software"), to deal in the Software without restriction, including
15 # without limitation the rights to use, copy, modify, merge, publish,
16 # distribute, sublicense, and/or sell copies of the Software, and to
17 # permit persons to whom the Software is furnished to do so, subject to
18 # the following conditions:
19 #
20 # The above copyright notice and this permission notice shall be included
21 # in all copies or substantial portions of the Software.
22 #
23 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
24 # KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
25 # WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
27 # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
28 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
29 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
30 #
31
32 __revision__ = "__FILE__ __REVISION__ __DATE__ __DEVELOPER__"
33
34 import SCons.compat
35
36 import os
37 import signal
38
39 import SCons.Errors
40
41 # The default stack size (in kilobytes) of the threads used to execute
42 # jobs in parallel.
43 #
44 # We use a stack size of 256 kilobytes. The default on some platforms
45 # is too large and prevents us from creating enough threads to fully
46 # parallelized the build. For example, the default stack size on linux
47 # is 8 MBytes.
48
49 explicit_stack_size = None
50 default_stack_size = 256
51
52 interrupt_msg = 'Build interrupted.'
53
54
55 class InterruptState:
56    def __init__(self):
57        self.interrupted = False
58
59    def set(self):
60        self.interrupted = True
61
62    def __call__(self):
63        return self.interrupted
64
65
66 class Jobs:
67     """An instance of this class initializes N jobs, and provides
68     methods for starting, stopping, and waiting on all N jobs.
69     """
70
71     def __init__(self, num, taskmaster):
72         """
73         create 'num' jobs using the given taskmaster.
74
75         If 'num' is 1 or less, then a serial job will be used,
76         otherwise a parallel job with 'num' worker threads will
77         be used.
78
79         The 'num_jobs' attribute will be set to the actual number of jobs
80         allocated.  If more than one job is requested but the Parallel
81         class can't do it, it gets reset to 1.  Wrapping interfaces that
82         care should check the value of 'num_jobs' after initialization.
83         """
84
85         self.job = None
86         if num > 1:
87             stack_size = explicit_stack_size
88             if stack_size is None:
89                 stack_size = default_stack_size
90                 
91             try:
92                 self.job = Parallel(taskmaster, num, stack_size)
93                 self.num_jobs = num
94             except NameError:
95                 pass
96         if self.job is None:
97             self.job = Serial(taskmaster)
98             self.num_jobs = 1
99
100     def run(self, postfunc=lambda: None):
101         """Run the jobs.
102
103         postfunc() will be invoked after the jobs has run. It will be
104         invoked even if the jobs are interrupted by a keyboard
105         interrupt (well, in fact by a signal such as either SIGINT,
106         SIGTERM or SIGHUP). The execution of postfunc() is protected
107         against keyboard interrupts and is guaranteed to run to
108         completion."""
109         self._setup_sig_handler()
110         try:
111             self.job.start()
112         finally:
113             postfunc()
114             self._reset_sig_handler()
115
116     def were_interrupted(self):
117         """Returns whether the jobs were interrupted by a signal."""
118         return self.job.interrupted()
119
120     def _setup_sig_handler(self):
121         """Setup an interrupt handler so that SCons can shutdown cleanly in
122         various conditions:
123
124           a) SIGINT: Keyboard interrupt
125           b) SIGTERM: kill or system shutdown
126           c) SIGHUP: Controlling shell exiting
127
128         We handle all of these cases by stopping the taskmaster. It
129         turns out that it very difficult to stop the build process
130         by throwing asynchronously an exception such as
131         KeyboardInterrupt. For example, the python Condition
132         variables (threading.Condition) and queue's do not seem to
133         asynchronous-exception-safe. It would require adding a whole
134         bunch of try/finally block and except KeyboardInterrupt all
135         over the place.
136
137         Note also that we have to be careful to handle the case when
138         SCons forks before executing another process. In that case, we
139         want the child to exit immediately.
140         """
141         def handler(signum, stack, self=self, parentpid=os.getpid()):
142             if os.getpid() == parentpid:
143                 self.job.taskmaster.stop()
144                 self.job.interrupted.set()
145             else:
146                 os._exit(2)
147
148         self.old_sigint  = signal.signal(signal.SIGINT, handler)
149         self.old_sigterm = signal.signal(signal.SIGTERM, handler)
150         try:
151             self.old_sighup = signal.signal(signal.SIGHUP, handler)
152         except AttributeError:
153             pass
154
155     def _reset_sig_handler(self):
156         """Restore the signal handlers to their previous state (before the
157          call to _setup_sig_handler()."""
158
159         signal.signal(signal.SIGINT, self.old_sigint)
160         signal.signal(signal.SIGTERM, self.old_sigterm)
161         try:
162             signal.signal(signal.SIGHUP, self.old_sighup)
163         except AttributeError:
164             pass
165
166 class Serial:
167     """This class is used to execute tasks in series, and is more efficient
168     than Parallel, but is only appropriate for non-parallel builds. Only
169     one instance of this class should be in existence at a time.
170
171     This class is not thread safe.
172     """
173
174     def __init__(self, taskmaster):
175         """Create a new serial job given a taskmaster. 
176
177         The taskmaster's next_task() method should return the next task
178         that needs to be executed, or None if there are no more tasks. The
179         taskmaster's executed() method will be called for each task when it
180         is successfully executed or failed() will be called if it failed to
181         execute (e.g. execute() raised an exception)."""
182         
183         self.taskmaster = taskmaster
184         self.interrupted = InterruptState()
185
186     def start(self):
187         """Start the job. This will begin pulling tasks from the taskmaster
188         and executing them, and return when there are no more tasks. If a task
189         fails to execute (i.e. execute() raises an exception), then the job will
190         stop."""
191         
192         while 1:
193             task = self.taskmaster.next_task()
194
195             if task is None:
196                 break
197
198             try:
199                 task.prepare()
200                 if task.needs_execute():
201                     task.execute()
202             except:
203                 if self.interrupted():
204                     try:
205                         raise SCons.Errors.BuildError(
206                             task.targets[0], errstr=interrupt_msg)
207                     except:
208                         task.exception_set()
209                 else:
210                     task.exception_set()
211
212                 # Let the failed() callback function arrange for the
213                 # build to stop if that's appropriate.
214                 task.failed()
215             else:
216                 task.executed()
217
218             task.postprocess()
219         self.taskmaster.cleanup()
220
221
222 # Trap import failure so that everything in the Job module but the
223 # Parallel class (and its dependent classes) will work if the interpreter
224 # doesn't support threads.
225 try:
226     import queue
227     import threading
228 except ImportError:
229     pass
230 else:
231     class Worker(threading.Thread):
232         """A worker thread waits on a task to be posted to its request queue,
233         dequeues the task, executes it, and posts a tuple including the task
234         and a boolean indicating whether the task executed successfully. """
235
236         def __init__(self, requestQueue, resultsQueue, interrupted):
237             threading.Thread.__init__(self)
238             self.setDaemon(1)
239             self.requestQueue = requestQueue
240             self.resultsQueue = resultsQueue
241             self.interrupted = interrupted
242             self.start()
243
244         def run(self):
245             while 1:
246                 task = self.requestQueue.get()
247
248                 if task is None:
249                     # The "None" value is used as a sentinel by
250                     # ThreadPool.cleanup().  This indicates that there
251                     # are no more tasks, so we should quit.
252                     break
253
254                 try:
255                     if self.interrupted():
256                         raise SCons.Errors.BuildError(
257                             task.targets[0], errstr=interrupt_msg)
258                     task.execute()
259                 except:
260                     task.exception_set()
261                     ok = False
262                 else:
263                     ok = True
264
265                 self.resultsQueue.put((task, ok))
266
267     class ThreadPool:
268         """This class is responsible for spawning and managing worker threads."""
269
270         def __init__(self, num, stack_size, interrupted):
271             """Create the request and reply queues, and 'num' worker threads.
272             
273             One must specify the stack size of the worker threads. The
274             stack size is specified in kilobytes.
275             """
276             self.requestQueue = queue.Queue(0)
277             self.resultsQueue = queue.Queue(0)
278
279             try:
280                 prev_size = threading.stack_size(stack_size*1024) 
281             except AttributeError, e:
282                 # Only print a warning if the stack size has been
283                 # explicitly set.
284                 if not explicit_stack_size is None:
285                     msg = "Setting stack size is unsupported by this version of Python:\n    " + \
286                         e.args[0]
287                     SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
288             except ValueError, e:
289                 msg = "Setting stack size failed:\n    " + str(e)
290                 SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
291
292             # Create worker threads
293             self.workers = []
294             for _ in range(num):
295                 worker = Worker(self.requestQueue, self.resultsQueue, interrupted)
296                 self.workers.append(worker)
297
298             # Once we drop Python 1.5 we can change the following to:
299             #if 'prev_size' in locals():
300             if 'prev_size' in locals().keys():
301                 threading.stack_size(prev_size)
302
303         def put(self, task):
304             """Put task into request queue."""
305             self.requestQueue.put(task)
306
307         def get(self):
308             """Remove and return a result tuple from the results queue."""
309             return self.resultsQueue.get()
310
311         def preparation_failed(self, task):
312             self.resultsQueue.put((task, False))
313
314         def cleanup(self):
315             """
316             Shuts down the thread pool, giving each worker thread a
317             chance to shut down gracefully.
318             """
319             # For each worker thread, put a sentinel "None" value
320             # on the requestQueue (indicating that there's no work
321             # to be done) so that each worker thread will get one and
322             # terminate gracefully.
323             for _ in self.workers:
324                 self.requestQueue.put(None)
325
326             # Wait for all of the workers to terminate.
327             # 
328             # If we don't do this, later Python versions (2.4, 2.5) often
329             # seem to raise exceptions during shutdown.  This happens
330             # in requestQueue.get(), as an assertion failure that
331             # requestQueue.not_full is notified while not acquired,
332             # seemingly because the main thread has shut down (or is
333             # in the process of doing so) while the workers are still
334             # trying to pull sentinels off the requestQueue.
335             #
336             # Normally these terminations should happen fairly quickly,
337             # but we'll stick a one-second timeout on here just in case
338             # someone gets hung.
339             for worker in self.workers:
340                 worker.join(1.0)
341             self.workers = []
342
343     class Parallel:
344         """This class is used to execute tasks in parallel, and is somewhat 
345         less efficient than Serial, but is appropriate for parallel builds.
346
347         This class is thread safe.
348         """
349
350         def __init__(self, taskmaster, num, stack_size):
351             """Create a new parallel job given a taskmaster.
352
353             The taskmaster's next_task() method should return the next
354             task that needs to be executed, or None if there are no more
355             tasks. The taskmaster's executed() method will be called
356             for each task when it is successfully executed or failed()
357             will be called if the task failed to execute (i.e. execute()
358             raised an exception).
359
360             Note: calls to taskmaster are serialized, but calls to
361             execute() on distinct tasks are not serialized, because
362             that is the whole point of parallel jobs: they can execute
363             multiple tasks simultaneously. """
364
365             self.taskmaster = taskmaster
366             self.interrupted = InterruptState()
367             self.tp = ThreadPool(num, stack_size, self.interrupted)
368
369             self.maxjobs = num
370
371         def start(self):
372             """Start the job. This will begin pulling tasks from the
373             taskmaster and executing them, and return when there are no
374             more tasks. If a task fails to execute (i.e. execute() raises
375             an exception), then the job will stop."""
376
377             jobs = 0
378             
379             while 1:
380                 # Start up as many available tasks as we're
381                 # allowed to.
382                 while jobs < self.maxjobs:
383                     task = self.taskmaster.next_task()
384                     if task is None:
385                         break
386
387                     try:
388                         # prepare task for execution
389                         task.prepare()
390                     except:
391                         task.exception_set()
392                         task.failed()
393                         task.postprocess()
394                     else:
395                         if task.needs_execute():
396                             # dispatch task
397                             self.tp.put(task)
398                             jobs = jobs + 1
399                         else:
400                             task.executed()
401                             task.postprocess()
402
403                 if not task and not jobs: break
404
405                 # Let any/all completed tasks finish up before we go
406                 # back and put the next batch of tasks on the queue.
407                 while 1:
408                     task, ok = self.tp.get()
409                     jobs = jobs - 1
410
411                     if ok:
412                         task.executed()
413                     else:
414                         if self.interrupted():
415                             try:
416                                 raise SCons.Errors.BuildError(
417                                     task.targets[0], errstr=interrupt_msg)
418                             except:
419                                 task.exception_set()
420
421                         # Let the failed() callback function arrange
422                         # for the build to stop if that's appropriate.
423                         task.failed()
424
425                     task.postprocess()
426
427                     if self.tp.resultsQueue.empty():
428                         break
429
430             self.tp.cleanup()
431             self.taskmaster.cleanup()
432
433 # Local Variables:
434 # tab-width:4
435 # indent-tabs-mode:nil
436 # End:
437 # vim: set expandtab tabstop=4 shiftwidth=4: