Merged revisions 1784-1824 via svnmerge from
[scons.git] / src / engine / SCons / Job.py
1 """SCons.Job
2
3 This module defines the Serial and Parallel classes that execute tasks to
4 complete a build. The Jobs class provides a higher level interface to start,
5 stop, and wait on jobs.
6
7 """
8
9 #
10 # __COPYRIGHT__
11 #
12 # Permission is hereby granted, free of charge, to any person obtaining
13 # a copy of this software and associated documentation files (the
14 # "Software"), to deal in the Software without restriction, including
15 # without limitation the rights to use, copy, modify, merge, publish,
16 # distribute, sublicense, and/or sell copies of the Software, and to
17 # permit persons to whom the Software is furnished to do so, subject to
18 # the following conditions:
19 #
20 # The above copyright notice and this permission notice shall be included
21 # in all copies or substantial portions of the Software.
22 #
23 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
24 # KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
25 # WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
27 # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
28 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
29 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
30 #
31
32 __revision__ = "__FILE__ __REVISION__ __DATE__ __DEVELOPER__"
33
34 import SCons.compat
35
36 class Jobs:
37     """An instance of this class initializes N jobs, and provides
38     methods for starting, stopping, and waiting on all N jobs.
39     """
40
41     def __init__(self, num, taskmaster):
42         """
43         create 'num' jobs using the given taskmaster.
44
45         If 'num' is 1 or less, then a serial job will be used,
46         otherwise a parallel job with 'num' worker threads will
47         be used.
48
49         The 'num_jobs' attribute will be set to the actual number of jobs
50         allocated.  If more than one job is requested but the Parallel
51         class can't do it, it gets reset to 1.  Wrapping interfaces that
52         care should check the value of 'num_jobs' after initialization.
53         """
54
55         self.job = None
56         if num > 1:
57             try:
58                 self.job = Parallel(taskmaster, num)
59                 self.num_jobs = num
60             except NameError:
61                 pass
62         if self.job is None:
63             self.job = Serial(taskmaster)
64             self.num_jobs = 1
65
66     def run(self):
67         """run the job"""
68         try:
69             self.job.start()
70         except KeyboardInterrupt:
71             # mask any further keyboard interrupts so that scons
72             # can shutdown cleanly:
73             # (this only masks the keyboard interrupt for Python,
74             #  child processes can still get the keyboard interrupt)
75             import signal
76             signal.signal(signal.SIGINT, signal.SIG_IGN)
77             raise
78
79 class Serial:
80     """This class is used to execute tasks in series, and is more efficient
81     than Parallel, but is only appropriate for non-parallel builds. Only
82     one instance of this class should be in existence at a time.
83
84     This class is not thread safe.
85     """
86
87     def __init__(self, taskmaster):
88         """Create a new serial job given a taskmaster. 
89
90         The taskmaster's next_task() method should return the next task
91         that needs to be executed, or None if there are no more tasks. The
92         taskmaster's executed() method will be called for each task when it
93         is successfully executed or failed() will be called if it failed to
94         execute (e.g. execute() raised an exception)."""
95         
96         self.taskmaster = taskmaster
97
98     def start(self):
99         """Start the job. This will begin pulling tasks from the taskmaster
100         and executing them, and return when there are no more tasks. If a task
101         fails to execute (i.e. execute() raises an exception), then the job will
102         stop."""
103         
104         while 1:
105             task = self.taskmaster.next_task()
106
107             if task is None:
108                 break
109
110             try:
111                 task.prepare()
112                 task.execute()
113             except KeyboardInterrupt:
114                 raise
115             except:
116                 task.exception_set()
117                 # Let the failed() callback function arrange for the
118                 # build to stop if that's appropriate.
119                 task.failed()
120             else:
121                 task.executed()
122
123             task.postprocess()
124
125
126 # Trap import failure so that everything in the Job module but the
127 # Parallel class (and its dependent classes) will work if the interpreter
128 # doesn't support threads.
129 try:
130     import Queue
131     import threading
132 except ImportError:
133     pass
134 else:
135     class Worker(threading.Thread):
136         """A worker thread waits on a task to be posted to its request queue,
137         dequeues the task, executes it, and posts a tuple including the task
138         and a boolean indicating whether the task executed successfully. """
139
140         def __init__(self, requestQueue, resultsQueue):
141             threading.Thread.__init__(self)
142             self.setDaemon(1)
143             self.requestQueue = requestQueue
144             self.resultsQueue = resultsQueue
145             self.start()
146
147         def run(self):
148             while 1:
149                 task = self.requestQueue.get()
150
151                 try:
152                     task.execute()
153                 except KeyboardInterrupt:
154                     # be explicit here for test/interrupts.py
155                     ok = False
156                 except:
157                     task.exception_set()
158                     ok = False
159                 else:
160                     ok = True
161
162                 self.resultsQueue.put((task, ok))
163
164     class ThreadPool:
165         """This class is responsible for spawning and managing worker threads."""
166
167         def __init__(self, num):
168             """Create the request and reply queues, and 'num' worker threads."""
169             self.requestQueue = Queue.Queue(0)
170             self.resultsQueue = Queue.Queue(0)
171
172             # Create worker threads
173             for _ in range(num):
174                 Worker(self.requestQueue, self.resultsQueue)
175
176         def put(self, obj):
177             """Put task into request queue."""
178             self.requestQueue.put(obj)
179
180         def get(self, block = True):
181             """Remove and return a result tuple from the results queue."""
182             return self.resultsQueue.get(block)
183
184         def preparation_failed(self, obj):
185             self.resultsQueue.put((obj, 0))
186
187     class Parallel:
188         """This class is used to execute tasks in parallel, and is somewhat 
189         less efficient than Serial, but is appropriate for parallel builds.
190
191         This class is thread safe.
192         """
193
194         def __init__(self, taskmaster, num):
195             """Create a new parallel job given a taskmaster.
196
197             The taskmaster's next_task() method should return the next
198             task that needs to be executed, or None if there are no more
199             tasks. The taskmaster's executed() method will be called
200             for each task when it is successfully executed or failed()
201             will be called if the task failed to execute (i.e. execute()
202             raised an exception).
203
204             Note: calls to taskmaster are serialized, but calls to
205             execute() on distinct tasks are not serialized, because
206             that is the whole point of parallel jobs: they can execute
207             multiple tasks simultaneously. """
208
209             self.taskmaster = taskmaster
210             self.tp = ThreadPool(num)
211
212             self.maxjobs = num
213
214         def start(self):
215             """Start the job. This will begin pulling tasks from the
216             taskmaster and executing them, and return when there are no
217             more tasks. If a task fails to execute (i.e. execute() raises
218             an exception), then the job will stop."""
219
220             jobs = 0
221             
222             while 1:
223                 # Start up as many available tasks as we're
224                 # allowed to.
225                 while jobs < self.maxjobs:
226                     task = self.taskmaster.next_task()
227                     if task is None:
228                         break
229
230                     # prepare task for execution
231                     try:
232                         task.prepare()
233                     except KeyboardInterrupt:
234                         raise
235                     except:
236                         # Let the failed() callback function arrange
237                         # for the build to stop if that's appropriate.
238                         task.exception_set()
239                         self.tp.preparation_failed(task)
240                         jobs = jobs + 1
241                         continue
242
243                     # dispatch task
244                     self.tp.put(task)
245                     jobs = jobs + 1
246
247                 if not task and not jobs: break
248
249                 # Let any/all completed tasks finish up before we go
250                 # back and put the next batch of tasks on the queue.
251                 while 1:
252                     task, ok = self.tp.get()
253
254                     jobs = jobs - 1
255                     if ok:
256                         task.executed()
257                     else:
258                         task.failed()
259
260                     task.postprocess()
261
262                     if self.tp.resultsQueue.empty():
263                         break