Merged revisions 1441-1539 via svnmerge from
[scons.git] / src / engine / SCons / Job.py
1 """SCons.Job
2
3 This module defines the Serial and Parallel classes that execute tasks to
4 complete a build. The Jobs class provides a higher level interface to start,
5 stop, and wait on jobs.
6
7 """
8
9 #
10 # __COPYRIGHT__
11 #
12 # Permission is hereby granted, free of charge, to any person obtaining
13 # a copy of this software and associated documentation files (the
14 # "Software"), to deal in the Software without restriction, including
15 # without limitation the rights to use, copy, modify, merge, publish,
16 # distribute, sublicense, and/or sell copies of the Software, and to
17 # permit persons to whom the Software is furnished to do so, subject to
18 # the following conditions:
19 #
20 # The above copyright notice and this permission notice shall be included
21 # in all copies or substantial portions of the Software.
22 #
23 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
24 # KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
25 # WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
27 # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
28 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
29 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
30 #
31
32 __revision__ = "__FILE__ __REVISION__ __DATE__ __DEVELOPER__"
33
34 class Jobs:
35     """An instance of this class initializes N jobs, and provides
36     methods for starting, stopping, and waiting on all N jobs.
37     """
38
39     def __init__(self, num, taskmaster):
40         """
41         create 'num' jobs using the given taskmaster.
42
43         If 'num' is 1 or less, then a serial job will be used,
44         otherwise a parallel job with 'num' worker threads will
45         be used.
46
47         The 'num_jobs' attribute will be set to the actual number of jobs
48         allocated.  If more than one job is requested but the Parallel
49         class can't do it, it gets reset to 1.  Wrapping interfaces that
50         care should check the value of 'num_jobs' after initialization.
51         """
52
53         self.job = None
54         if num > 1:
55             try:
56                 self.job = Parallel(taskmaster, num)
57                 self.num_jobs = num
58             except NameError:
59                 pass
60         if self.job is None:
61             self.job = Serial(taskmaster)
62             self.num_jobs = 1
63
64     def run(self):
65         """run the job"""
66         try:
67             self.job.start()
68         except KeyboardInterrupt:
69             # mask any further keyboard interrupts so that scons
70             # can shutdown cleanly:
71             # (this only masks the keyboard interrupt for Python,
72             #  child processes can still get the keyboard interrupt)
73             import signal
74             signal.signal(signal.SIGINT, signal.SIG_IGN)
75             raise
76
77 class Serial:
78     """This class is used to execute tasks in series, and is more efficient
79     than Parallel, but is only appropriate for non-parallel builds. Only
80     one instance of this class should be in existence at a time.
81
82     This class is not thread safe.
83     """
84
85     def __init__(self, taskmaster):
86         """Create a new serial job given a taskmaster. 
87
88         The taskmaster's next_task() method should return the next task
89         that needs to be executed, or None if there are no more tasks. The
90         taskmaster's executed() method will be called for each task when it
91         is successfully executed or failed() will be called if it failed to
92         execute (e.g. execute() raised an exception)."""
93         
94         self.taskmaster = taskmaster
95
96     def start(self):
97         """Start the job. This will begin pulling tasks from the taskmaster
98         and executing them, and return when there are no more tasks. If a task
99         fails to execute (i.e. execute() raises an exception), then the job will
100         stop."""
101         
102         while 1:
103             task = self.taskmaster.next_task()
104
105             if task is None:
106                 break
107
108             try:
109                 task.prepare()
110                 task.execute()
111             except KeyboardInterrupt:
112                 raise
113             except:
114                 task.exception_set()
115                 # Let the failed() callback function arrange for the
116                 # build to stop if that's appropriate.
117                 task.failed()
118             else:
119                 task.executed()
120
121             task.postprocess()
122
123
124 # Trap import failure so that everything in the Job module but the
125 # Parallel class (and its dependent classes) will work if the interpreter
126 # doesn't support threads.
127 try:
128     import Queue
129     import threading
130 except ImportError:
131     pass
132 else:
133     class Worker(threading.Thread):
134         """A worker thread waits on a task to be posted to its request queue,
135         dequeues the task, executes it, and posts a tuple including the task
136         and a boolean indicating whether the task executed successfully. """
137
138         def __init__(self, requestQueue, resultsQueue):
139             threading.Thread.__init__(self)
140             self.setDaemon(1)
141             self.requestQueue = requestQueue
142             self.resultsQueue = resultsQueue
143             self.start()
144
145         def run(self):
146             while 1:
147                 task = self.requestQueue.get()
148
149                 try:
150                     task.execute()
151                 except KeyboardInterrupt:
152                     # be explicit here for test/interrupts.py
153                     ok = False
154                 except:
155                     task.exception_set()
156                     ok = 0
157                 else:
158                     ok = 1
159
160                 self.resultsQueue.put((task, ok))
161
162     class ThreadPool:
163         """This class is responsible for spawning and managing worker threads."""
164
165         def __init__(self, num):
166             """Create the request and reply queues, and 'num' worker threads."""
167             self.requestQueue = Queue.Queue(0)
168             self.resultsQueue = Queue.Queue(0)
169
170             # Create worker threads
171             for i in range(num):
172                 Worker(self.requestQueue, self.resultsQueue)
173
174         def put(self, obj):
175             """Put task into request queue."""
176             self.requestQueue.put(obj)
177
178         def get(self, block = 1):
179             """Remove and return a result tuple from the results queue."""
180             return self.resultsQueue.get(block)
181
182         def preparation_failed(self, obj):
183             self.resultsQueue.put((obj, 0))
184
185     class Parallel:
186         """This class is used to execute tasks in parallel, and is somewhat 
187         less efficient than Serial, but is appropriate for parallel builds.
188
189         This class is thread safe.
190         """
191
192         def __init__(self, taskmaster, num):
193             """Create a new parallel job given a taskmaster.
194
195             The taskmaster's next_task() method should return the next
196             task that needs to be executed, or None if there are no more
197             tasks. The taskmaster's executed() method will be called
198             for each task when it is successfully executed or failed()
199             will be called if the task failed to execute (i.e. execute()
200             raised an exception).
201
202             Note: calls to taskmaster are serialized, but calls to
203             execute() on distinct tasks are not serialized, because
204             that is the whole point of parallel jobs: they can execute
205             multiple tasks simultaneously. """
206
207             self.taskmaster = taskmaster
208             self.tp = ThreadPool(num)
209
210             self.maxjobs = num
211
212         def start(self):
213             """Start the job. This will begin pulling tasks from the
214             taskmaster and executing them, and return when there are no
215             more tasks. If a task fails to execute (i.e. execute() raises
216             an exception), then the job will stop."""
217
218             jobs = 0
219             
220             while 1:
221                 # Start up as many available tasks as we're
222                 # allowed to.
223                 while jobs < self.maxjobs:
224                     task = self.taskmaster.next_task()
225                     if task is None:
226                         break
227
228                     # prepare task for execution
229                     try:
230                         task.prepare()
231                     except KeyboardInterrupt:
232                         raise
233                     except:
234                         # Let the failed() callback function arrange
235                         # for the build to stop if that's appropriate.
236                         task.exception_set()
237                         self.tp.preparation_failed(task)
238                         jobs = jobs + 1
239                         continue
240
241                     # dispatch task
242                     self.tp.put(task)
243                     jobs = jobs + 1
244
245                 if not task and not jobs: break
246
247                 # Let any/all completed tasks finish up before we go
248                 # back and put the next batch of tasks on the queue.
249                 while 1:
250                     task, ok = self.tp.get()
251
252                     jobs = jobs - 1
253                     if ok:
254                         task.executed()
255                     else:
256                         task.failed()
257
258                     task.postprocess()
259
260                     if self.tp.resultsQueue.empty():
261                         break