3 This module defines the Serial and Parallel classes that execute tasks to
4 complete a build. The Jobs class provides a higher-level interface to start,
5 stop, and wait on jobs.
12 # Permission is hereby granted, free of charge, to any person obtaining
13 # a copy of this software and associated documentation files (the
14 # "Software"), to deal in the Software without restriction, including
15 # without limitation the rights to use, copy, modify, merge, publish,
16 # distribute, sublicense, and/or sell copies of the Software, and to
17 # permit persons to whom the Software is furnished to do so, subject to
18 # the following conditions:
20 # The above copyright notice and this permission notice shall be included
21 # in all copies or substantial portions of the Software.
23 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
24 # KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
25 # WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
27 # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
28 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
29 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
32 __revision__ = "__FILE__ __REVISION__ __DATE__ __DEVELOPER__"
35 """An instance of this class initializes N jobs, and provides
36 methods for starting, stopping, and waiting on all N jobs.
39 def __init__(self, num, taskmaster):
41 create 'num' jobs using the given taskmaster.
43 If 'num' is 1 or less, then a serial job will be used,
44 otherwise a parallel job with 'num' worker threads will
49 self.job = Parallel(taskmaster, num)
51 self.job = Serial(taskmaster)
57 except KeyboardInterrupt:
58 # mask any further keyboard interrupts so that scons
59 # can shut down cleanly:
60 # (this only masks the keyboard interrupt for Python;
61 # child processes can still get the keyboard interrupt)
63 signal.signal(signal.SIGINT, signal.SIG_IGN)
67 """This class is used to execute tasks in series, and is more efficient
68 than Parallel, but is only appropriate for non-parallel builds. Only
69 one instance of this class should be in existence at a time.
71 This class is not thread safe.
74 def __init__(self, taskmaster):
75 """Create a new serial job given a taskmaster.
77 The taskmaster's next_task() method should return the next task
78 that needs to be executed, or None if there are no more tasks. The
79 taskmaster's executed() method will be called for each task when it
80 is successfully executed or failed() will be called if it failed to
81 execute (i.e. execute() raised an exception). The taskmaster's
82 is_blocked() method will not be called. """
84 self.taskmaster = taskmaster
87 """Start the job. This will begin pulling tasks from the taskmaster
88 and executing them, and return when there are no more tasks. If a task
89 fails to execute (i.e. execute() raises an exception), then the job will
93 task = self.taskmaster.next_task()
101 except KeyboardInterrupt:
104 # Let the failed() callback function arrange for the
105 # build to stop if that's appropriate.
111 # Trap import failure so that everything in the Job module but the
112 # Parallel class (and its dependent classes) will work if the interpreter
113 # doesn't support threads.
120 class Worker(threading.Thread):
121 """A worker thread waits on a task to be posted to its request queue,
122 dequeues the task, executes it, and posts a tuple including the task
123 and a boolean indicating whether the task executed successfully. """
125 def __init__(self, requestQueue, resultsQueue):
126 threading.Thread.__init__(self)
128 self.requestQueue = requestQueue
129 self.resultsQueue = resultsQueue
134 task = self.requestQueue.get()
143 self.resultsQueue.put((task, ok))
146 """This class is responsible for spawning and managing worker threads."""
148 def __init__(self, num):
149 """Create the request and results queues, and 'num' worker threads."""
150 # Ideally we wouldn't have to artificially limit the number of
151 # tasks that can be posted to the request queue. But an unbounded
152 # queue can result in a large number of pending tasks, which at the time
153 # of this writing causes the taskmaster's next_task method to
154 # take a very long time.
155 self.requestQueue = Queue.Queue(num)
156 self.resultsQueue = Queue.Queue()
158 # Create worker threads
160 worker = Worker(self.requestQueue, self.resultsQueue)
163 """Put task into request queue."""
164 self.requestQueue.put(obj)
166 def get(self, block = 1):
167 """Remove and return a result tuple from the results queue."""
168 return self.resultsQueue.get(block)
170 def get_nowait(self):
171 """Remove and return a result tuple from the results queue
173 return self.get(False)
176 """This class is used to execute tasks in parallel, and is somewhat
177 less efficient than Serial, but is appropriate for parallel builds.
179 This class is thread safe.
182 def __init__(self, taskmaster, num):
183 """Create a new parallel job given a taskmaster.
185 The taskmaster's next_task() method should return the next task
186 that needs to be executed, or None if there are no more tasks. The
187 taskmaster's executed() method will be called for each task when it
188 is successfully executed or failed() will be called if the task
189 failed to execute (i.e. execute() raised an exception). The
190 taskmaster's is_blocked() method should return true iff there are
191 more tasks, but they can't be executed until one or more other
192 tasks have been executed. next_task() will be called iff
193 is_blocked() returned false.
195 Note: calls to taskmaster are serialized, but calls to execute() on
196 distinct tasks are not serialized, because that is the whole point
197 of parallel jobs: they can execute multiple tasks
200 self.taskmaster = taskmaster
201 self.tp = ThreadPool(num)
204 """Start the job. This will begin pulling tasks from the
205 taskmaster and executing them, and return when there are no
206 more tasks. If a task fails to execute (i.e. execute() raises
207 an exception), then the job will stop."""
210 task = self.taskmaster.next_task()
214 # prepare task for execution
217 except KeyboardInterrupt:
220 # Let the failed() callback function arrange for the
221 # build to stop if that's appropriate.
229 task, ok = self.tp.get_nowait()
231 if not self.taskmaster.is_blocked():
233 task, ok = self.tp.get()