3 This module defines the Serial and Parallel classes that execute tasks to
4 complete a build. The Jobs class provides a higher level interface to start,
5 stop, and wait on jobs.
12 # Permission is hereby granted, free of charge, to any person obtaining
13 # a copy of this software and associated documentation files (the
14 # "Software"), to deal in the Software without restriction, including
15 # without limitation the rights to use, copy, modify, merge, publish,
16 # distribute, sublicense, and/or sell copies of the Software, and to
17 # permit persons to whom the Software is furnished to do so, subject to
18 # the following conditions:
20 # The above copyright notice and this permission notice shall be included
21 # in all copies or substantial portions of the Software.
23 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
24 # KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
25 # WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
27 # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
28 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
29 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
# Placeholder revision string; the __FILE__/__REVISION__/__DATE__/__DEVELOPER__
# tokens are presumably substituted by the release/packaging machinery --
# TODO confirm against the project's build scripts (not visible in this chunk).
32 __revision__ = "__FILE__ __REVISION__ __DATE__ __DEVELOPER__"
# --- Jobs class (FRAGMENT) ---------------------------------------------------
# NOTE(review): this chunk is a sampled extraction; the embedded original line
# numbers (37, 38, 41, 43, ...) jump, so statements are missing between the
# visible lines.  All code lines below are kept byte-identical; only comments
# are added.  The `class Jobs:` header itself is not visible in this chunk.
37 """An instance of this class initializes N jobs, and provides
38 methods for starting, stopping, and waiting on all N jobs.
# Constructor: selects a Serial or Parallel strategy based on `num`.
# The docstring lines below describe the contract; the if/try logic that
# chooses between the two branches is elided from this view.
41 def __init__(self, num, taskmaster):
43 create 'num' jobs using the given taskmaster.
45 If 'num' is 1 or less, then a serial job will be used,
46 otherwise a parallel job with 'num' worker threads will
49 The 'num_jobs' attribute will be set to the actual number of jobs
50 allocated. If more than one job is requested but the Parallel
51 class can't do it, it gets reset to 1. Wrapping interfaces that
52 care should check the value of 'num_jobs' after initialization.
# Parallel branch -- presumably guarded by `num > 1` and a check that the
# Parallel class is usable (per the docstring above); the guard and the
# fallback assignment of num_jobs are not visible here -- TODO confirm.
58 self.job = Parallel(taskmaster, num)
# Serial fallback branch (num <= 1, or Parallel unavailable -- guard elided).
63 self.job = Serial(taskmaster)
# The except-clause below belongs to a later method of this class (its
# matching `try:` at original lines ~65-69 is not visible in this chunk).
70 except KeyboardInterrupt:
71 # mask any further keyboard interrupts so that scons
72 # can shutdown cleanly:
73 # (this only masks the keyboard interrupt for Python,
74 # child processes can still get the keyboard interrupt)
# Ignore SIGINT from here on so shutdown can proceed without a second ^C
# aborting the cleanup path; assumes `signal` is imported at file top --
# the import block is not visible in this chunk.
76 signal.signal(signal.SIGINT, signal.SIG_IGN)
# --- Serial class (FRAGMENT) -------------------------------------------------
# NOTE(review): lines are missing between the visible statements (embedded
# original numbering jumps 94->96, 101->105, 105->113); code is kept
# byte-identical, comments only.  The `class Serial:` header is not visible.
80 """This class is used to execute tasks in series, and is more efficient
81 than Parallel, but is only appropriate for non-parallel builds. Only
82 one instance of this class should be in existence at a time.
84 This class is not thread safe.
# Constructor: just records the taskmaster; no other state is visible here.
87 def __init__(self, taskmaster):
88 """Create a new serial job given a taskmaster.
90 The taskmaster's next_task() method should return the next task
91 that needs to be executed, or None if there are no more tasks. The
92 taskmaster's executed() method will be called for each task when it
93 is successfully executed or failed() will be called if it failed to
94 execute (e.g. execute() raised an exception)."""
96 self.taskmaster = taskmaster
# start(): the `def start(self):` line (original ~98) is elided from this
# chunk; the docstring below is its opening.  The surrounding loop,
# task.execute() call, and executed()/failed() callbacks are also elided.
99 """Start the job. This will begin pulling tasks from the taskmaster
100 and executing them, and return when there are no more tasks. If a task
101 fails to execute (i.e. execute() raises an exception), then the job will
# Pull the next task; presumably inside a loop that ends when next_task()
# returns None (per the docstring) -- loop header not visible, TODO confirm.
105 task = self.taskmaster.next_task()
# KeyboardInterrupt is presumably re-raised or handled specially so ^C stops
# the build; the matching `try:` and handler body are not visible here.
113 except KeyboardInterrupt:
117 # Let the failed() callback function arrange for the
118 # build to stop if that's appropriate.
126 # Trap import failure so that everything in the Job module but the
127 # Parallel class (and its dependent classes) will work if the interpreter
128 # doesn't support threads.
# --- Worker class (FRAGMENT) -------------------------------------------------
# NOTE(review): internal lines elided (embedded numbering jumps 144->149,
# 154->162); code kept byte-identical, comments only.  This class is
# presumably defined inside the try/except that guards `import threading`
# (see the comment block above) -- confirm against the full file.
135 class Worker(threading.Thread):
136 """A worker thread waits on a task to be posted to its request queue,
137 dequeues the task, executes it, and posts a tuple including the task
138 and a boolean indicating whether the task executed successfully. """
# Constructor: stores both queues; the original also presumably calls
# setDaemon/start (elided lines 145-147) -- TODO confirm.
140 def __init__(self, requestQueue, resultsQueue):
141 threading.Thread.__init__(self)
# requestQueue: tasks posted by ThreadPool.put(); resultsQueue: (task, ok)
# tuples consumed by Parallel.start() via ThreadPool.get().
143 self.requestQueue = requestQueue
144 self.resultsQueue = resultsQueue
# run() body fragment: blocking dequeue of the next task.  The `def run`
# line, the loop, and the task.execute() try-block are elided from view.
149 task = self.requestQueue.get()
153 except KeyboardInterrupt:
154 # be explicit here for test/interrupts.py
# Report the outcome; `ok` is the boolean success flag mentioned in the
# class docstring (its assignment lines are elided from this chunk).
162 self.resultsQueue.put((task, ok))
# --- ThreadPool class (FRAGMENT) ---------------------------------------------
# NOTE(review): the `class ThreadPool:` header, the worker-spawning `for`
# loop header (~line 173), and the `def put` line (~176) are elided; code
# kept byte-identical, comments only.
165 """This class is responsible for spawning and managing worker threads."""
167 def __init__(self, num):
168 """Create the request and reply queues, and 'num' worker threads."""
# Queue.Queue(0) == unbounded queues (maxsize 0 means no limit).  The
# `Queue` module name dates this file to Python 2 -- renamed `queue` in
# Python 3.
169 self.requestQueue = Queue.Queue(0)
170 self.resultsQueue = Queue.Queue(0)
172 # Create worker threads
# Presumably executed `num` times inside the elided loop; each Worker
# shares this pool's two queues.  The Worker instances are apparently not
# retained in a list here -- confirm against the full file.
174 Worker(self.requestQueue, self.resultsQueue)
# put(obj): docstring below belongs to the elided `def put(self, obj):`.
177 """Put task into request queue."""
178 self.requestQueue.put(obj)
# get(): blocking by default; returns one (task, ok) tuple from a Worker.
180 def get(self, block = True):
181 """Remove and return a result tuple from the results queue."""
182 return self.resultsQueue.get(block)
# preparation_failed(): short-circuits a task that failed before execution
# by posting it directly to the results queue with ok=0 (failure), so the
# consumer in Parallel.start() sees it like any other failed task.
184 def preparation_failed(self, obj):
185 self.resultsQueue.put((obj, 0))
# --- Parallel class (FRAGMENT) -----------------------------------------------
# NOTE(review): heavy internal elision (embedded numbering jumps 210->215,
# 218->223, 226->230, 239->247, 252->262); code kept byte-identical,
# comments only.  The `class Parallel:` header and the assignment of
# `self.maxjobs` (read below) are not visible in this chunk.
188 """This class is used to execute tasks in parallel, and is somewhat
189 less efficient than Serial, but is appropriate for parallel builds.
191 This class is thread safe.
194 def __init__(self, taskmaster, num):
195 """Create a new parallel job given a taskmaster.
197 The taskmaster's next_task() method should return the next
198 task that needs to be executed, or None if there are no more
199 tasks. The taskmaster's executed() method will be called
200 for each task when it is successfully executed or failed()
201 will be called if the task failed to execute (i.e. execute()
202 raised an exception).
204 Note: calls to taskmaster are serialized, but calls to
205 execute() on distinct tasks are not serialized, because
206 that is the whole point of parallel jobs: they can execute
207 multiple tasks simultaneously. """
209 self.taskmaster = taskmaster
# One pool with `num` worker threads; `self.maxjobs` is presumably also
# set here or nearby to `num` (line elided) -- TODO confirm.
210 self.tp = ThreadPool(num)
# start(): docstring below belongs to the elided `def start(self):` line.
215 """Start the job. This will begin pulling tasks from the
216 taskmaster and executing them, and return when there are no
217 more tasks. If a task fails to execute (i.e. execute() raises
218 an exception), then the job will stop."""
223 # Start up as many available tasks as we're
# Fill the pool up to maxjobs outstanding tasks; `jobs` is the in-flight
# counter (its initialization and increment lines are elided).
225 while jobs < self.maxjobs:
226 task = self.taskmaster.next_task()
230 # prepare task for execution
# Matching `try:` around task.prepare() (per the comment above) is elided.
233 except KeyboardInterrupt:
236 # Let the failed() callback function arrange
237 # for the build to stop if that's appropriate.
# Failed preparation is reported through the pool so the result-collection
# loop below still sees exactly one result per dispatched task.
239 self.tp.preparation_failed(task)
# No new task available and nothing in flight: the build is done.
247 if not task and not jobs: break
249 # Let any/all completed tasks finish up before we go
250 # back and put the next batch of tasks on the queue.
# Blocking get of one (task, ok) result; `ok` is falsy (0) on failure,
# per Worker.run()/ThreadPool.preparation_failed().
252 task, ok = self.tp.get()
# Drain-check: presumably loops collecting further results while more are
# immediately available (the loop structure around this test is elided).
262 if self.tp.resultsQueue.empty():