New parallel job execution. (J.T. Conklin)
[scons.git] / src / engine / SCons / Job.py
1 """SCons.Job
2
3 This module defines the Serial and Parallel classes that execute tasks to
4 complete a build. The Jobs class provides a higher level interface to start,
5 stop, and wait on jobs.
6
7 """
8
9 #
10 # __COPYRIGHT__
11 #
12 # Permission is hereby granted, free of charge, to any person obtaining
13 # a copy of this software and associated documentation files (the
14 # "Software"), to deal in the Software without restriction, including
15 # without limitation the rights to use, copy, modify, merge, publish,
16 # distribute, sublicense, and/or sell copies of the Software, and to
17 # permit persons to whom the Software is furnished to do so, subject to
18 # the following conditions:
19 #
20 # The above copyright notice and this permission notice shall be included
21 # in all copies or substantial portions of the Software.
22 #
23 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
24 # KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
25 # WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
27 # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
28 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
29 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
30 #
31
32 __revision__ = "__FILE__ __REVISION__ __DATE__ __DEVELOPER__"
33
34 class Jobs:
35     """An instance of this class initializes N jobs, and provides
36     methods for starting, stopping, and waiting on all N jobs.
37     """
38
39     def __init__(self, num, taskmaster):
40         """
41         create 'num' jobs using the given taskmaster.
42
43         If 'num' is 1 or less, then a serial job will be used,
44         otherwise a parallel job with 'num' worker threads will
45         be used.
46         """
47
48         if num > 1:
49             self.job = Parallel(taskmaster, num)
50         else:
51             self.job = Serial(taskmaster)
52
53     def run(self):
54         """run the job"""
55         try:
56             self.job.start()
57         except KeyboardInterrupt:
58             # mask any further keyboard interrupts so that scons
59             # can shutdown cleanly:
60             # (this only masks the keyboard interrupt for Python,
61             #  child processes can still get the keyboard interrupt)
62             import signal
63             signal.signal(signal.SIGINT, signal.SIG_IGN)
64             raise
65
66 class Serial:
67     """This class is used to execute tasks in series, and is more efficient
68     than Parallel, but is only appropriate for non-parallel builds. Only
69     one instance of this class should be in existence at a time.
70
71     This class is not thread safe.
72     """
73
74     def __init__(self, taskmaster):
75         """Create a new serial job given a taskmaster. 
76
77         The taskmaster's next_task() method should return the next task
78         that needs to be executed, or None if there are no more tasks. The
79         taskmaster's executed() method will be called for each task when it
80         is successfully executed or failed() will be called if it failed to
81         execute (e.g. execute() raised an exception). The taskmaster's
82         is_blocked() method will not be called.  """
83         
84         self.taskmaster = taskmaster
85
86     def start(self):
87         """Start the job. This will begin pulling tasks from the taskmaster
88         and executing them, and return when there are no more tasks. If a task
89         fails to execute (i.e. execute() raises an exception), then the job will
90         stop."""
91         
92         while 1:
93             task = self.taskmaster.next_task()
94
95             if task is None:
96                 break
97
98             try:
99                 task.prepare()
100                 task.execute()
101             except KeyboardInterrupt:
102                 raise
103             except:
104                 # Let the failed() callback function arrange for the
105                 # build to stop if that's appropriate.
106                 task.failed()
107             else:
108                 task.executed()
109
110
111 # Trap import failure so that everything in the Job module but the
112 # Parallel class (and its dependent classes) will work if the interpreter
113 # doesn't support threads.
114 try:
115     import Queue
116     import threading
117 except ImportError:
118     pass
119
120 class Worker(threading.Thread):
121     """A worker thread waits on a task to be posted to its request queue,
122     dequeues the task, executes it, and posts a tuple including the task
123     and a boolean indicating whether the task executed successfully. """
124
125     def __init__(self, requestQueue, resultsQueue):
126         threading.Thread.__init__(self)
127         self.setDaemon(1)
128         self.requestQueue = requestQueue
129         self.resultsQueue = resultsQueue
130         self.start()
131
132     def run(self):
133         while 1:
134             task = self.requestQueue.get()
135
136             try:
137                 task.execute()
138             except:
139                 ok = False
140             else:
141                 ok = True
142
143             self.resultsQueue.put((task, ok))
144
145 class ThreadPool:
146     """This class is responsible for spawning and managing worker threads."""
147
148     def __init__(self, num):
149         """Create the request and reply queues, and 'num' worker threads."""
150         # Ideally we wouldn't have to artificially limit the number of
151         # tasks that can be posted to the request queue.  But this can
152         # result in a large number of pending tasks, which at the time
153         # of this writing causes the taskmaster's next_task method to
154         # take a very long time.
155         self.requestQueue = Queue.Queue(num)
156         self.resultsQueue = Queue.Queue()
157
158         # Create worker threads
159         for i in range(num):
160             worker = Worker(self.requestQueue, self.resultsQueue)
161
162     def put(self, obj):
163         """Put task into request queue."""
164         self.requestQueue.put(obj)
165
166     def get(self, block = 1):
167         """Remove and return a result tuple from the results queue."""
168         return self.resultsQueue.get(block)
169         
170     def get_nowait(self):
171         """Remove and result a result tuple from the results queue 
172         without blocking."""
173         return self.get(False)
174
175 class Parallel:
176     """This class is used to execute tasks in parallel, and is somewhat 
177     less efficient than Serial, but is appropriate for parallel builds.
178
179     This class is thread safe.
180     """
181
182     def __init__(self, taskmaster, num):
183         """Create a new parallel job given a taskmaster.
184
185         The taskmaster's next_task() method should return the next task
186         that needs to be executed, or None if there are no more tasks. The
187         taskmaster's executed() method will be called for each task when it
188         is successfully executed or failed() will be called if the task
189         failed to execute (i.e. execute() raised an exception).  The
190         taskmaster's is_blocked() method should return true iff there are
191         more tasks, but they can't be executed until one or more other
192         tasks have been executed. next_task() will be called iff
193         is_blocked() returned false.
194
195         Note: calls to taskmaster are serialized, but calls to execute() on
196         distinct tasks are not serialized, because that is the whole point
197         of parallel jobs: they can execute multiple tasks
198         simultaneously. """
199
200         self.taskmaster = taskmaster
201         self.tp = ThreadPool(num)
202
203     def start(self):
204         """Start the job. This will begin pulling tasks from the
205         taskmaster and executing them, and return when there are no
206         more tasks. If a task fails to execute (i.e. execute() raises
207         an exception), then the job will stop."""
208
209         while 1:
210             task = self.taskmaster.next_task()
211             if task is None:
212                 break
213
214             # prepare task for execution
215             try:
216                 task.prepare()
217             except KeyboardInterrupt:
218                 raise
219             except:
220                 # Let the failed() callback function arrange for the
221                 # build to stop if that's appropriate.
222                 task.failed()
223
224             # dispatch task
225             self.tp.put(task)
226
227             while 1:
228                 try:
229                     task, ok = self.tp.get_nowait()
230                 except Queue.Empty:
231                     if not self.taskmaster.is_blocked():
232                         break
233                     task, ok = self.tp.get()
234
235                 if ok:
236                     task.executed()
237                 else:
238                     task.failed()