Allow SConsignFile() to take a dbm module argument; portability fixes. (Ralf W....
[scons.git] / src / engine / SCons / Job.py
1 """SCons.Job
2
3 This module defines the Serial and Parallel classes that execute tasks to
4 complete a build. The Jobs class provides a higher level interface to start,
5 stop, and wait on jobs.
6
7 """
8
9 #
10 # __COPYRIGHT__
11 #
12 # Permission is hereby granted, free of charge, to any person obtaining
13 # a copy of this software and associated documentation files (the
14 # "Software"), to deal in the Software without restriction, including
15 # without limitation the rights to use, copy, modify, merge, publish,
16 # distribute, sublicense, and/or sell copies of the Software, and to
17 # permit persons to whom the Software is furnished to do so, subject to
18 # the following conditions:
19 #
20 # The above copyright notice and this permission notice shall be included
21 # in all copies or substantial portions of the Software.
22 #
23 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
24 # KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
25 # WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
27 # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
28 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
29 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
30 #
31
32 __revision__ = "__FILE__ __REVISION__ __DATE__ __DEVELOPER__"
33
34 class Jobs:
35     """An instance of this class initializes N jobs, and provides
36     methods for starting, stopping, and waiting on all N jobs.
37     """
38
39     def __init__(self, num, taskmaster):
40         """
41         create 'num' jobs using the given taskmaster.
42
43         If 'num' is 1 or less, then a serial job will be used,
44         otherwise a parallel job with 'num' worker threads will
45         be used.
46         """
47
48         if num > 1:
49             self.job = Parallel(taskmaster, num)
50         else:
51             self.job = Serial(taskmaster)
52
53     def run(self):
54         """run the job"""
55         try:
56             self.job.start()
57         except KeyboardInterrupt:
58             # mask any further keyboard interrupts so that scons
59             # can shutdown cleanly:
60             # (this only masks the keyboard interrupt for Python,
61             #  child processes can still get the keyboard interrupt)
62             import signal
63             signal.signal(signal.SIGINT, signal.SIG_IGN)
64             raise
65
66 class Serial:
67     """This class is used to execute tasks in series, and is more efficient
68     than Parallel, but is only appropriate for non-parallel builds. Only
69     one instance of this class should be in existence at a time.
70
71     This class is not thread safe.
72     """
73
74     def __init__(self, taskmaster):
75         """Create a new serial job given a taskmaster. 
76
77         The taskmaster's next_task() method should return the next task
78         that needs to be executed, or None if there are no more tasks. The
79         taskmaster's executed() method will be called for each task when it
80         is successfully executed or failed() will be called if it failed to
81         execute (e.g. execute() raised an exception). The taskmaster's
82         is_blocked() method will not be called.  """
83         
84         self.taskmaster = taskmaster
85
86     def start(self):
87         """Start the job. This will begin pulling tasks from the taskmaster
88         and executing them, and return when there are no more tasks. If a task
89         fails to execute (i.e. execute() raises an exception), then the job will
90         stop."""
91         
92         while 1:
93             task = self.taskmaster.next_task()
94
95             if task is None:
96                 break
97
98             try:
99                 task.prepare()
100                 task.execute()
101             except KeyboardInterrupt:
102                 raise
103             except:
104                 # Let the failed() callback function arrange for the
105                 # build to stop if that's appropriate.
106                 task.failed()
107             else:
108                 task.executed()
109
110
111 # Trap import failure so that everything in the Job module but the
112 # Parallel class (and its dependent classes) will work if the interpreter
113 # doesn't support threads.
114 try:
115     import Queue
116     import threading
117 except ImportError:
118     pass
119
120 class Worker(threading.Thread):
121     """A worker thread waits on a task to be posted to its request queue,
122     dequeues the task, executes it, and posts a tuple including the task
123     and a boolean indicating whether the task executed successfully. """
124
125     def __init__(self, requestQueue, resultsQueue):
126         threading.Thread.__init__(self)
127         self.setDaemon(1)
128         self.requestQueue = requestQueue
129         self.resultsQueue = resultsQueue
130         self.start()
131
132     def run(self):
133         while 1:
134             task = self.requestQueue.get()
135
136             try:
137                 task.execute()
138             except:
139                 ok = 0
140             else:
141                 ok = 1
142
143             self.resultsQueue.put((task, ok))
144
145 class ThreadPool:
146     """This class is responsible for spawning and managing worker threads."""
147
148     def __init__(self, num):
149         """Create the request and reply queues, and 'num' worker threads."""
150         self.requestQueue = Queue.Queue(0)
151         self.resultsQueue = Queue.Queue(0)
152
153         # Create worker threads
154         for i in range(num):
155             worker = Worker(self.requestQueue, self.resultsQueue)
156
157     def put(self, obj):
158         """Put task into request queue."""
159         self.requestQueue.put(obj)
160
161     def get(self, block = 1):
162         """Remove and return a result tuple from the results queue."""
163         return self.resultsQueue.get(block)
164         
165     def get_nowait(self):
166         """Remove and result a result tuple from the results queue 
167         without blocking."""
168         return self.get(0)
169
170 class Parallel:
171     """This class is used to execute tasks in parallel, and is somewhat 
172     less efficient than Serial, but is appropriate for parallel builds.
173
174     This class is thread safe.
175     """
176
177     def __init__(self, taskmaster, num):
178         """Create a new parallel job given a taskmaster.
179
180         The taskmaster's next_task() method should return the next task
181         that needs to be executed, or None if there are no more tasks. The
182         taskmaster's executed() method will be called for each task when it
183         is successfully executed or failed() will be called if the task
184         failed to execute (i.e. execute() raised an exception).  The
185         taskmaster's is_blocked() method should return true iff there are
186         more tasks, but they can't be executed until one or more other
187         tasks have been executed. next_task() will be called iff
188         is_blocked() returned false.
189
190         Note: calls to taskmaster are serialized, but calls to execute() on
191         distinct tasks are not serialized, because that is the whole point
192         of parallel jobs: they can execute multiple tasks
193         simultaneously. """
194
195         self.taskmaster = taskmaster
196         self.tp = ThreadPool(num)
197
198         self.jobs = 0
199         self.maxjobs = num
200
201     def start(self):
202         """Start the job. This will begin pulling tasks from the
203         taskmaster and executing them, and return when there are no
204         more tasks. If a task fails to execute (i.e. execute() raises
205         an exception), then the job will stop."""
206
207         while 1:
208             if self.jobs < self.maxjobs:
209                 task = self.taskmaster.next_task()
210                 if task is None:
211                     break
212
213                 # prepare task for execution
214                 try:
215                     task.prepare()
216                 except KeyboardInterrupt:
217                     raise
218                 except:
219                     # Let the failed() callback function arrange for the
220                     # build to stop if that's appropriate.
221                     task.failed()
222
223                 # dispatch task
224                 self.tp.put(task)
225                 self.jobs = self.jobs + 1
226
227             while 1:
228                 try:
229                     task, ok = self.tp.get_nowait()
230                 except Queue.Empty:
231                     if not (self.jobs is self.maxjobs or self.taskmaster.is_blocked()):
232                         break
233                     task, ok = self.tp.get()
234
235                 self.jobs = self.jobs - 1
236                 if ok:
237                     task.executed()
238                 else:
239                     task.failed()