3 This module defines the Serial and Parallel classes that execute tasks to
4 complete a build. The Jobs class provides a higher level interface to start,
5 stop, and wait on jobs.
12 # Permission is hereby granted, free of charge, to any person obtaining
13 # a copy of this software and associated documentation files (the
14 # "Software"), to deal in the Software without restriction, including
15 # without limitation the rights to use, copy, modify, merge, publish,
16 # distribute, sublicense, and/or sell copies of the Software, and to
17 # permit persons to whom the Software is furnished to do so, subject to
18 # the following conditions:
20 # The above copyright notice and this permission notice shall be included
21 # in all copies or substantial portions of the Software.
23 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
24 # KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
25 # WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
27 # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
28 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
29 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
32 __revision__ = "__FILE__ __REVISION__ __DATE__ __DEVELOPER__"
35 """An instance of this class initializes N jobs, and provides
36 methods for starting, stopping, and waiting on all N jobs.
39 def __init__(self, num, taskmaster):
41 create 'num' jobs using the given taskmaster.
43 If 'num' is 1 or less, then a serial job will be used,
44 otherwise a parallel job with 'num' worker threads will
49 self.job = Parallel(taskmaster, num)
51 self.job = Serial(taskmaster)
57 except KeyboardInterrupt:
58 # mask any further keyboard interrupts so that scons
59 # can shut down cleanly:
60 # (this only masks the keyboard interrupt for Python,
61 # child processes can still get the keyboard interrupt)
63 signal.signal(signal.SIGINT, signal.SIG_IGN)
67 """This class is used to execute tasks in series, and is more efficient
68 than Parallel, but is only appropriate for non-parallel builds. Only
69 one instance of this class should be in existence at a time.
71 This class is not thread safe.
74 def __init__(self, taskmaster):
75 """Create a new serial job given a taskmaster.
77 The taskmaster's next_task() method should return the next task
78 that needs to be executed, or None if there are no more tasks. The
79 taskmaster's executed() method will be called for each task when it
80 is successfully executed or failed() will be called if it failed to
81 execute (e.g. execute() raised an exception). The taskmaster's
82 is_blocked() method will not be called. """
84 self.taskmaster = taskmaster
87 """Start the job. This will begin pulling tasks from the taskmaster
88 and executing them, and return when there are no more tasks. If a task
89 fails to execute (i.e. execute() raises an exception), then the job will
93 task = self.taskmaster.next_task()
101 except KeyboardInterrupt:
104 # Let the failed() callback function arrange for the
105 # build to stop if that's appropriate.
111 # Trap import failure so that everything in the Job module but the
112 # Parallel class (and its dependent classes) will work if the interpreter
113 # doesn't support threads.
120 class Worker(threading.Thread):
121 """A worker thread waits on a task to be posted to its request queue,
122 dequeues the task, executes it, and posts a tuple including the task
123 and a boolean indicating whether the task executed successfully. """
125 def __init__(self, requestQueue, resultsQueue):
126 threading.Thread.__init__(self)
128 self.requestQueue = requestQueue
129 self.resultsQueue = resultsQueue
134 task = self.requestQueue.get()
143 self.resultsQueue.put((task, ok))
146 """This class is responsible for spawning and managing worker threads."""
148 def __init__(self, num):
149 """Create the request and reply queues, and 'num' worker threads."""
150 self.requestQueue = Queue.Queue(0)
151 self.resultsQueue = Queue.Queue(0)
153 # Create worker threads
155 worker = Worker(self.requestQueue, self.resultsQueue)
158 """Put task into request queue."""
159 self.requestQueue.put(obj)
161 def get(self, block = 1):
162 """Remove and return a result tuple from the results queue."""
163 return self.resultsQueue.get(block)
165 def get_nowait(self):
166 """Remove and return a result tuple from the results queue
171 """This class is used to execute tasks in parallel, and is somewhat
172 less efficient than Serial, but is appropriate for parallel builds.
174 This class is thread safe.
177 def __init__(self, taskmaster, num):
178 """Create a new parallel job given a taskmaster.
180 The taskmaster's next_task() method should return the next task
181 that needs to be executed, or None if there are no more tasks. The
182 taskmaster's executed() method will be called for each task when it
183 is successfully executed or failed() will be called if the task
184 failed to execute (i.e. execute() raised an exception). The
185 taskmaster's is_blocked() method should return true iff there are
186 more tasks, but they can't be executed until one or more other
187 tasks have been executed. next_task() will be called iff
188 is_blocked() returned false.
190 Note: calls to taskmaster are serialized, but calls to execute() on
191 distinct tasks are not serialized, because that is the whole point
192 of parallel jobs: they can execute multiple tasks
195 self.taskmaster = taskmaster
196 self.tp = ThreadPool(num)
202 """Start the job. This will begin pulling tasks from the
203 taskmaster and executing them, and return when there are no
204 more tasks. If a task fails to execute (i.e. execute() raises
205 an exception), then the job will stop."""
208 if self.jobs < self.maxjobs:
209 task = self.taskmaster.next_task()
213 # prepare task for execution
216 except KeyboardInterrupt:
219 # Let the failed() callback function arrange for the
220 # build to stop if that's appropriate.
225 self.jobs = self.jobs + 1
229 task, ok = self.tp.get_nowait()
231 if not (self.jobs is self.maxjobs or self.taskmaster.is_blocked()):
233 task, ok = self.tp.get()
235 self.jobs = self.jobs - 1