3 This module defines the Serial and Parallel classes that execute tasks to
4 complete a build. The Jobs class provides a higher-level interface to start,
5 stop, and wait on jobs.
12 # Permission is hereby granted, free of charge, to any person obtaining
13 # a copy of this software and associated documentation files (the
14 # "Software"), to deal in the Software without restriction, including
15 # without limitation the rights to use, copy, modify, merge, publish,
16 # distribute, sublicense, and/or sell copies of the Software, and to
17 # permit persons to whom the Software is furnished to do so, subject to
18 # the following conditions:
20 # The above copyright notice and this permission notice shall be included
21 # in all copies or substantial portions of the Software.
23 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
24 # KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
25 # WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
27 # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
28 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
29 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
32 __revision__ = "__FILE__ __REVISION__ __DATE__ __DEVELOPER__"
35 """An instance of this class initializes N jobs, and provides
36 methods for starting, stopping, and waiting on all N jobs.
39 def __init__(self, num, taskmaster):
41 create 'num' jobs using the given taskmaster.
43 If 'num' is 1 or less, then a serial job will be used,
44 otherwise a parallel job with 'num' worker threads will
47 The 'num_jobs' attribute will be set to the actual number of jobs
48 allocated. If more than one job is requested but the Parallel
49 class can't do it, it gets reset to 1. Wrapping interfaces that
50 care should check the value of 'num_jobs' after initialization.
56 self.job = Parallel(taskmaster, num)
61 self.job = Serial(taskmaster)
68 except KeyboardInterrupt:
69 # mask any further keyboard interrupts so that scons
70 # can shut down cleanly:
71 # (this only masks the keyboard interrupt for Python,
72 # child processes can still get the keyboard interrupt)
74 signal.signal(signal.SIGINT, signal.SIG_IGN)
78 """This class is used to execute tasks in series, and is more efficient
79 than Parallel, but is only appropriate for non-parallel builds. Only
80 one instance of this class should be in existence at a time.
82 This class is not thread safe.
def __init__(self, taskmaster):
    """Create a new serial job given a taskmaster.

    The taskmaster's next_task() method should return the next task
    that needs to be executed, or None if there are no more tasks. The
    taskmaster's executed() method will be called for each task when it
    is successfully executed, or failed() will be called if it failed to
    execute (i.e. execute() raised an exception).
    """
    # Keep the taskmaster on the instance; the job pulls its tasks
    # from it via next_task().
    self.taskmaster = taskmaster
97 """Start the job. This will begin pulling tasks from the taskmaster
98 and executing them, and return when there are no more tasks. If a task
99 fails to execute (i.e. execute() raises an exception), then the job will
103 task = self.taskmaster.next_task()
111 except KeyboardInterrupt:
115 # Let the failed() callback function arrange for the
116 # build to stop if that's appropriate.
124 # Trap import failure so that everything in the Job module but the
125 # Parallel class (and its dependent classes) will work if the interpreter
126 # doesn't support threads.
133 class Worker(threading.Thread):
134 """A worker thread waits on a task to be posted to its request queue,
135 dequeues the task, executes it, and posts to its results queue a tuple of
136 the task and a boolean indicating whether the task executed successfully. """
138 def __init__(self, requestQueue, resultsQueue):
139 threading.Thread.__init__(self)
141 self.requestQueue = requestQueue
142 self.resultsQueue = resultsQueue
147 task = self.requestQueue.get()
151 except KeyboardInterrupt:
152 # be explicit here for test/interrupts.py
160 self.resultsQueue.put((task, ok))
163 """This class is responsible for spawning and managing worker threads."""
165 def __init__(self, num):
166 """Create the request and results queues, and 'num' worker threads."""
167 self.requestQueue = Queue.Queue(0)
168 self.resultsQueue = Queue.Queue(0)
170 # Create worker threads
172 Worker(self.requestQueue, self.resultsQueue)
175 """Put task into request queue."""
176 self.requestQueue.put(obj)
def get(self, block = 1):
    """Take the next result tuple off the results queue and hand it back.

    'block' is forwarded unchanged to the underlying queue's get().
    """
    result = self.resultsQueue.get(block)
    return result
def preparation_failed(self, obj):
    """Post 'obj' to the results queue, paired with 0 in a tuple."""
    # The 0 occupies the slot that consumers of the results queue
    # unpack as the success flag, so 'obj' is reported as failed.
    failed_result = (obj, 0)
    self.resultsQueue.put(failed_result)
186 """This class is used to execute tasks in parallel, and is somewhat
187 less efficient than Serial, but is appropriate for parallel builds.
189 This class is thread safe.
192 def __init__(self, taskmaster, num):
193 """Create a new parallel job given a taskmaster.
195 The taskmaster's next_task() method should return the next
196 task that needs to be executed, or None if there are no more
197 tasks. The taskmaster's executed() method will be called
198 for each task when it is successfully executed or failed()
199 will be called if the task failed to execute (i.e. execute()
200 raised an exception).
202 Note: calls to taskmaster are serialized, but calls to
203 execute() on distinct tasks are not serialized, because
204 that is the whole point of parallel jobs: they can execute
205 multiple tasks simultaneously. """
207 self.taskmaster = taskmaster
208 self.tp = ThreadPool(num)
213 """Start the job. This will begin pulling tasks from the
214 taskmaster and executing them, and return when there are no
215 more tasks. If a task fails to execute (i.e. execute() raises
216 an exception), then the job will stop."""
221 # Start up as many available tasks as we're
223 while jobs < self.maxjobs:
224 task = self.taskmaster.next_task()
228 # prepare task for execution
231 except KeyboardInterrupt:
234 # Let the failed() callback function arrange
235 # for the build to stop if that's appropriate.
237 self.tp.preparation_failed(task)
245 if not task and not jobs: break
247 # Let any/all completed tasks finish up before we go
248 # back and put the next batch of tasks on the queue.
250 task, ok = self.tp.get()
260 if self.tp.resultsQueue.empty():