Merged revisions 1968-2115 via svnmerge from
[scons.git] / src / engine / SCons / Job.py
1 """SCons.Job
2
3 This module defines the Serial and Parallel classes that execute tasks to
4 complete a build. The Jobs class provides a higher level interface to start,
5 stop, and wait on jobs.
6
7 """
8
9 #
10 # __COPYRIGHT__
11 #
12 # Permission is hereby granted, free of charge, to any person obtaining
13 # a copy of this software and associated documentation files (the
14 # "Software"), to deal in the Software without restriction, including
15 # without limitation the rights to use, copy, modify, merge, publish,
16 # distribute, sublicense, and/or sell copies of the Software, and to
17 # permit persons to whom the Software is furnished to do so, subject to
18 # the following conditions:
19 #
20 # The above copyright notice and this permission notice shall be included
21 # in all copies or substantial portions of the Software.
22 #
23 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
24 # KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
25 # WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
27 # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
28 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
29 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
30 #
31
32 __revision__ = "__FILE__ __REVISION__ __DATE__ __DEVELOPER__"
33
34 import SCons.compat
35
36 class Jobs:
37     """An instance of this class initializes N jobs, and provides
38     methods for starting, stopping, and waiting on all N jobs.
39     """
40
41     def __init__(self, num, taskmaster):
42         """
43         create 'num' jobs using the given taskmaster.
44
45         If 'num' is 1 or less, then a serial job will be used,
46         otherwise a parallel job with 'num' worker threads will
47         be used.
48
49         The 'num_jobs' attribute will be set to the actual number of jobs
50         allocated.  If more than one job is requested but the Parallel
51         class can't do it, it gets reset to 1.  Wrapping interfaces that
52         care should check the value of 'num_jobs' after initialization.
53         """
54
55         self.job = None
56         if num > 1:
57             try:
58                 self.job = Parallel(taskmaster, num)
59                 self.num_jobs = num
60             except NameError:
61                 pass
62         if self.job is None:
63             self.job = Serial(taskmaster)
64             self.num_jobs = 1
65
66     def run(self):
67         """run the job"""
68         try:
69             self.job.start()
70         except KeyboardInterrupt:
71             # mask any further keyboard interrupts so that scons
72             # can shutdown cleanly:
73             # (this only masks the keyboard interrupt for Python,
74             #  child processes can still get the keyboard interrupt)
75             import signal
76             signal.signal(signal.SIGINT, signal.SIG_IGN)
77             raise
78
79     def cleanup(self):
80         self.job.cleanup()
81
82 class Serial:
83     """This class is used to execute tasks in series, and is more efficient
84     than Parallel, but is only appropriate for non-parallel builds. Only
85     one instance of this class should be in existence at a time.
86
87     This class is not thread safe.
88     """
89
90     def __init__(self, taskmaster):
91         """Create a new serial job given a taskmaster. 
92
93         The taskmaster's next_task() method should return the next task
94         that needs to be executed, or None if there are no more tasks. The
95         taskmaster's executed() method will be called for each task when it
96         is successfully executed or failed() will be called if it failed to
97         execute (e.g. execute() raised an exception)."""
98         
99         self.taskmaster = taskmaster
100
101     def start(self):
102         """Start the job. This will begin pulling tasks from the taskmaster
103         and executing them, and return when there are no more tasks. If a task
104         fails to execute (i.e. execute() raises an exception), then the job will
105         stop."""
106         
107         while 1:
108             task = self.taskmaster.next_task()
109
110             if task is None:
111                 break
112
113             try:
114                 task.prepare()
115                 task.execute()
116             except KeyboardInterrupt:
117                 raise
118             except:
119                 task.exception_set()
120                 # Let the failed() callback function arrange for the
121                 # build to stop if that's appropriate.
122                 task.failed()
123             else:
124                 task.executed()
125
126             task.postprocess()
127
128     def cleanup(self):
129         pass
130
131 # Trap import failure so that everything in the Job module but the
132 # Parallel class (and its dependent classes) will work if the interpreter
133 # doesn't support threads.
134 try:
135     import Queue
136     import threading
137 except ImportError:
138     pass
139 else:
140     class Worker(threading.Thread):
141         """A worker thread waits on a task to be posted to its request queue,
142         dequeues the task, executes it, and posts a tuple including the task
143         and a boolean indicating whether the task executed successfully. """
144
145         def __init__(self, requestQueue, resultsQueue):
146             threading.Thread.__init__(self)
147             self.setDaemon(1)
148             self.requestQueue = requestQueue
149             self.resultsQueue = resultsQueue
150             self.start()
151
152         def run(self):
153             while 1:
154                 task = self.requestQueue.get()
155
156                 if not task:
157                     # The "None" value is used as a sentinel by
158                     # ThreadPool.cleanup().  This indicates that there
159                     # are no more tasks, so we should quit.
160                     break
161
162                 try:
163                     task.execute()
164                 except KeyboardInterrupt:
165                     # be explicit here for test/interrupts.py
166                     ok = False
167                 except:
168                     task.exception_set()
169                     ok = False
170                 else:
171                     ok = True
172
173                 self.resultsQueue.put((task, ok))
174
175     class ThreadPool:
176         """This class is responsible for spawning and managing worker threads."""
177
178         def __init__(self, num):
179             """Create the request and reply queues, and 'num' worker threads."""
180             self.requestQueue = Queue.Queue(0)
181             self.resultsQueue = Queue.Queue(0)
182
183             # Create worker threads
184             self.workers = []
185             for _ in range(num):
186                 worker = Worker(self.requestQueue, self.resultsQueue)
187                 self.workers.append(worker)
188
189         def put(self, obj):
190             """Put task into request queue."""
191             self.requestQueue.put(obj)
192
193         def get(self, block = True):
194             """Remove and return a result tuple from the results queue."""
195             return self.resultsQueue.get(block)
196
197         def preparation_failed(self, obj):
198             self.resultsQueue.put((obj, False))
199
200         def cleanup(self):
201             """
202             Shuts down the thread pool, giving each worker thread a
203             chance to shut down gracefully.
204             """
205             # For each worker thread, put a sentinel "None" value
206             # on the requestQueue (indicating that there's no work
207             # to be done) so that each worker thread will get one and
208             # terminate gracefully.
209             for _ in self.workers:
210                 self.requestQueue.put(None)
211
212             # Wait for all of the workers to terminate.
213             # 
214             # If we don't do this, later Python versions (2.4, 2.5) often
215             # seem to raise exceptions during shutdown.  This happens
216             # in requestQueue.get(), as an assertion failure that
217             # requestQueue.not_full is notified while not acquired,
218             # seemingly because the main thread has shut down (or is
219             # in the process of doing so) while the workers are still
220             # trying to pull sentinels off the requestQueue.
221             #
222             # Normally these terminations should happen fairly quickly,
223             # but we'll stick a one-second timeout on here just in case
224             # someone gets hung.
225             for worker in self.workers:
226                 worker.join(1.0)
227             self.workers = []
228
229     class Parallel:
230         """This class is used to execute tasks in parallel, and is somewhat 
231         less efficient than Serial, but is appropriate for parallel builds.
232
233         This class is thread safe.
234         """
235
236         def __init__(self, taskmaster, num):
237             """Create a new parallel job given a taskmaster.
238
239             The taskmaster's next_task() method should return the next
240             task that needs to be executed, or None if there are no more
241             tasks. The taskmaster's executed() method will be called
242             for each task when it is successfully executed or failed()
243             will be called if the task failed to execute (i.e. execute()
244             raised an exception).
245
246             Note: calls to taskmaster are serialized, but calls to
247             execute() on distinct tasks are not serialized, because
248             that is the whole point of parallel jobs: they can execute
249             multiple tasks simultaneously. """
250
251             self.taskmaster = taskmaster
252             self.tp = ThreadPool(num)
253
254             self.maxjobs = num
255
256         def start(self):
257             """Start the job. This will begin pulling tasks from the
258             taskmaster and executing them, and return when there are no
259             more tasks. If a task fails to execute (i.e. execute() raises
260             an exception), then the job will stop."""
261
262             jobs = 0
263             
264             while 1:
265                 # Start up as many available tasks as we're
266                 # allowed to.
267                 while jobs < self.maxjobs:
268                     task = self.taskmaster.next_task()
269                     if task is None:
270                         break
271
272                     # prepare task for execution
273                     try:
274                         task.prepare()
275                     except KeyboardInterrupt:
276                         raise
277                     except:
278                         # Let the failed() callback function arrange
279                         # for the build to stop if that's appropriate.
280                         task.exception_set()
281                         self.tp.preparation_failed(task)
282                         jobs = jobs + 1
283                         continue
284
285                     # dispatch task
286                     self.tp.put(task)
287                     jobs = jobs + 1
288
289                 if not task and not jobs: break
290
291                 # Let any/all completed tasks finish up before we go
292                 # back and put the next batch of tasks on the queue.
293                 while 1:
294                     task, ok = self.tp.get()
295
296                     jobs = jobs - 1
297                     if ok:
298                         task.executed()
299                     else:
300                         task.failed()
301
302                     task.postprocess()
303
304                     if self.tp.resultsQueue.empty():
305                         break
306
307         def cleanup(self):
308             self.tp.cleanup()