4 # Permission is hereby granted, free of charge, to any person obtaining
5 # a copy of this software and associated documentation files (the
6 # "Software"), to deal in the Software without restriction, including
7 # without limitation the rights to use, copy, modify, merge, publish,
8 # distribute, sublicense, and/or sell copies of the Software, and to
9 # permit persons to whom the Software is furnished to do so, subject to
10 # the following conditions:
12 # The above copyright notice and this permission notice shall be included
13 # in all copies or substantial portions of the Software.
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
16 # KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
17 # WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
18 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
19 # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
20 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
21 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23 from __future__ import generators ### KEEP FOR COMPATIBILITY FIXERS
25 __revision__ = "__FILE__ __REVISION__ __DATE__ __DEVELOPER__"
37 # how many parallel jobs to perform for the test
40 # how many tasks to perform for the test
41 num_tasks = num_jobs*5
44 "fake lock class to use if threads are not supported"
51 class NoThreadsException:
52 "raised by the ParallelTestCase if threads are not supported"
55 return "the interpreter doesn't support threads"
58 """A dummy task class for testing purposes."""
60 def __init__(self, i, taskmaster):
62 self.taskmaster = taskmaster
69 def _do_something(self):
72 def needs_execute(self):
76 self.taskmaster.test_case.failUnless(self.was_prepared,
77 "the task wasn't prepared")
79 self.taskmaster.guard.acquire()
80 self.taskmaster.begin_list.append(self.i)
81 self.taskmaster.guard.release()
87 self.taskmaster.guard.acquire()
88 self.taskmaster.end_list.append(self.i)
89 self.taskmaster.guard.release()
92 self.taskmaster.num_executed = self.taskmaster.num_executed + 1
94 self.taskmaster.test_case.failUnless(self.was_prepared,
95 "the task wasn't prepared")
96 self.taskmaster.test_case.failUnless(self.was_executed,
97 "the task wasn't really executed")
98 self.taskmaster.test_case.failUnless(isinstance(self, Task),
99 "the task wasn't really a Task instance")
102 self.taskmaster.num_failed = self.taskmaster.num_failed + 1
103 self.taskmaster.stop = 1
104 self.taskmaster.test_case.failUnless(self.was_prepared,
105 "the task wasn't prepared")
107 def postprocess(self):
108 self.taskmaster.num_postprocessed = self.taskmaster.num_postprocessed + 1
110 class RandomTask(Task):
111 def _do_something(self):
112 # do something that will take some random amount of time:
113 for i in range(random.randrange(0, num_sines, 1)):
118 """A dummy task class for testing purposes."""
120 def __init__(self, i, taskmaster):
121 self.taskmaster = taskmaster
122 self.was_prepared = 0
125 self.was_prepared = 1
127 def needs_execute(self):
134 self.taskmaster.num_executed = self.taskmaster.num_executed + 1
136 self.taskmaster.test_case.failUnless(self.was_prepared,
137 "the task wasn't prepared")
138 self.taskmaster.test_case.failUnless(self.was_executed,
139 "the task wasn't really executed")
140 self.taskmaster.test_case.failUnless(self.__class__ is Task,
141 "the task wasn't really a Task instance")
144 self.taskmaster.num_failed = self.taskmaster.num_failed + 1
145 self.taskmaster.stop = 1
146 self.taskmaster.test_case.failUnless(self.was_prepared,
147 "the task wasn't prepared")
149 def postprocess(self):
150 self.taskmaster.num_postprocessed = self.taskmaster.num_postprocessed + 1
152 def exception_set(self):
153 self.taskmaster.exception_set()
156 """A dummy taskmaster class for testing the job classes."""
158 def __init__(self, n, test_case, Task):
159 """n is the number of dummy tasks to perform."""
161 self.test_case = test_case
164 self.num_iterated = 0
165 self.num_executed = 0
167 self.num_postprocessed = 0
169 # 'guard' guards 'task_begin_list' and 'task_end_list'
172 self.guard = threading.Lock()
174 self.guard = DummyLock()
176 # keep track of the order tasks are begun in
179 # keep track of the order tasks are completed in
183 if self.stop or self.all_tasks_are_iterated():
186 self.num_iterated = self.num_iterated + 1
187 return self.Task(self.num_iterated, self)
189 def all_tasks_are_executed(self):
190 return self.num_executed == self.num_tasks
192 def all_tasks_are_iterated(self):
193 return self.num_iterated == self.num_tasks
195 def all_tasks_are_postprocessed(self):
196 return self.num_postprocessed == self.num_tasks
198 def tasks_were_serial(self):
199 "analyze the task order to see if they were serial"
200 serial = 1 # assume the tasks were serial
201 for i in range(num_tasks):
202 serial = serial and (self.begin_list[i]
207 def exception_set(self):
213 SaveThreadPool = None
214 ThreadPoolCallList = []
216 class ParallelTestCase(unittest.TestCase):
223 raise NoThreadsException()
225 taskmaster = Taskmaster(num_tasks, self, RandomTask)
226 jobs = SCons.Job.Jobs(num_jobs, taskmaster)
229 self.failUnless(not taskmaster.tasks_were_serial(),
230 "the tasks were not executed in parallel")
231 self.failUnless(taskmaster.all_tasks_are_executed(),
232 "all the tests were not executed")
233 self.failUnless(taskmaster.all_tasks_are_iterated(),
234 "all the tests were not iterated over")
235 self.failUnless(taskmaster.all_tasks_are_postprocessed(),
236 "all the tests were not postprocessed")
237 self.failIf(taskmaster.num_failed,
238 "some task(s) failed to execute")
240 # Verify that parallel jobs will pull all of the completed tasks
241 # out of the queue at once, instead of one by one. We do this by
242 # replacing the default ThreadPool class with one that records the
243 # order in which tasks are put() and get() to/from the pool, and
244 # which sleeps a little bit before call get() to let the initial
245 # tasks complete and get their notifications on the resultsQueue.
247 class SleepTask(Task):
248 def _do_something(self):
251 global SaveThreadPool
252 SaveThreadPool = SCons.Job.ThreadPool
254 class WaitThreadPool(SaveThreadPool):
256 ThreadPoolCallList.append('put(%s)' % task.i)
257 return SaveThreadPool.put(self, task)
260 result = SaveThreadPool.get(self)
261 ThreadPoolCallList.append('get(%s)' % result[0].i)
264 SCons.Job.ThreadPool = WaitThreadPool
267 taskmaster = Taskmaster(3, self, SleepTask)
268 jobs = SCons.Job.Jobs(2, taskmaster)
271 # The key here is that we get(1) and get(2) from the
272 # resultsQueue before we put(3), but get(1) and get(2) can
273 # be in either order depending on how the first two parallel
274 # tasks get scheduled by the operating system.
276 ['put(1)', 'put(2)', 'get(1)', 'get(2)', 'put(3)', 'get(3)'],
277 ['put(1)', 'put(2)', 'get(2)', 'get(1)', 'put(3)', 'get(3)'],
279 assert ThreadPoolCallList in expect, ThreadPoolCallList
282 SCons.Job.ThreadPool = SaveThreadPool
284 class SerialTestCase(unittest.TestCase):
288 taskmaster = Taskmaster(num_tasks, self, RandomTask)
289 jobs = SCons.Job.Jobs(1, taskmaster)
292 self.failUnless(taskmaster.tasks_were_serial(),
293 "the tasks were not executed in series")
294 self.failUnless(taskmaster.all_tasks_are_executed(),
295 "all the tests were not executed")
296 self.failUnless(taskmaster.all_tasks_are_iterated(),
297 "all the tests were not iterated over")
298 self.failUnless(taskmaster.all_tasks_are_postprocessed(),
299 "all the tests were not postprocessed")
300 self.failIf(taskmaster.num_failed,
301 "some task(s) failed to execute")
303 class NoParallelTestCase(unittest.TestCase):
305 "test handling lack of parallel support"
306 def NoParallel(tm, num, stack_size):
308 save_Parallel = SCons.Job.Parallel
309 SCons.Job.Parallel = NoParallel
311 taskmaster = Taskmaster(num_tasks, self, RandomTask)
312 jobs = SCons.Job.Jobs(2, taskmaster)
313 self.failUnless(jobs.num_jobs == 1,
314 "unexpected number of jobs %d" % jobs.num_jobs)
316 self.failUnless(taskmaster.tasks_were_serial(),
317 "the tasks were not executed in series")
318 self.failUnless(taskmaster.all_tasks_are_executed(),
319 "all the tests were not executed")
320 self.failUnless(taskmaster.all_tasks_are_iterated(),
321 "all the tests were not iterated over")
322 self.failUnless(taskmaster.all_tasks_are_postprocessed(),
323 "all the tests were not postprocessed")
324 self.failIf(taskmaster.num_failed,
325 "some task(s) failed to execute")
327 SCons.Job.Parallel = save_Parallel
330 class SerialExceptionTestCase(unittest.TestCase):
332 "test a serial job with tasks that raise exceptions"
334 taskmaster = Taskmaster(num_tasks, self, ExceptionTask)
335 jobs = SCons.Job.Jobs(1, taskmaster)
338 self.failIf(taskmaster.num_executed,
339 "a task was executed")
340 self.failUnless(taskmaster.num_iterated == 1,
341 "exactly one task should have been iterated")
342 self.failUnless(taskmaster.num_failed == 1,
343 "exactly one task should have failed")
344 self.failUnless(taskmaster.num_postprocessed == 1,
345 "exactly one task should have been postprocessed")
347 class ParallelExceptionTestCase(unittest.TestCase):
349 "test parallel jobs with tasks that raise exceptions"
351 taskmaster = Taskmaster(num_tasks, self, ExceptionTask)
352 jobs = SCons.Job.Jobs(num_jobs, taskmaster)
355 self.failIf(taskmaster.num_executed,
356 "a task was executed")
357 self.failUnless(taskmaster.num_iterated >= 1,
358 "one or more task should have been iterated")
359 self.failUnless(taskmaster.num_failed >= 1,
360 "one or more tasks should have failed")
361 self.failUnless(taskmaster.num_postprocessed >= 1,
362 "one or more tasks should have been postprocessed")
364 #---------------------------------------------------------------------
365 # Above tested Job object with contrived Task and Taskmaster objects.
366 # Now test Job object with actual Task and Taskmaster objects.
368 import SCons.Taskmaster
373 def update(self, obj):
376 class testnode (SCons.Node.Node):
378 SCons.Node.Node.__init__(self)
379 self.expect_to_be = SCons.Node.executed
380 self.ninfo = DummyNodeInfo()
382 class goodnode (testnode):
384 SCons.Node.Node.__init__(self)
385 self.expect_to_be = SCons.Node.up_to_date
386 self.ninfo = DummyNodeInfo()
388 class slowgoodnode (goodnode):
390 # Delay to allow scheduled Jobs to run while the dispatcher
391 # sleeps. Keep this short because it affects the time taken
394 goodnode.prepare(self)
396 class badnode (goodnode):
398 goodnode.__init__(self)
399 self.expect_to_be = SCons.Node.failed
400 def build(self, **kw):
401 raise Exception('badnode exception')
403 class slowbadnode (badnode):
404 def build(self, **kw):
405 # Appears to take a while to build, allowing faster builds to
406 # overlap. Time duration is not especially important, but if
407 # it is faster than slowgoodnode then these could complete
408 # while the scheduler is sleeping.
410 raise Exception('slowbadnode exception')
412 class badpreparenode (badnode):
414 raise Exception('badpreparenode exception')
416 class _SConsTaskTest(unittest.TestCase):
418 def _test_seq(self, num_jobs):
426 [slowgoodnode, badnode],
427 [goodnode, slowbadnode],
428 [goodnode, goodnode, goodnode, slowbadnode],
429 [goodnode, slowbadnode, badpreparenode, slowgoodnode],
430 [goodnode, slowbadnode, slowgoodnode, badnode]
433 self._do_test(num_jobs, node_seq)
435 def _do_test(self, num_jobs, node_seq):
438 for tnum in range(num_tasks):
439 testnodes.append(node_seq[tnum % len(node_seq)]())
441 taskmaster = SCons.Taskmaster.Taskmaster(testnodes,
442 tasker=SCons.Taskmaster.AlwaysTask)
444 jobs = SCons.Job.Jobs(num_jobs, taskmaster)
446 # Exceptions thrown by tasks are not actually propagated to
447 # this level, but are instead stored in the Taskmaster.
451 # Now figure out if tests proceeded correctly. The first test
452 # that fails will shutdown the initiation of subsequent tests,
453 # but any tests currently queued for execution will still be
454 # processed, and any tests that completed before the failure
455 # would have resulted in new tests being queued for execution.
457 # Apply the following operational heuristics of Job.py:
458 # 0) An initial jobset of tasks will be queued before any
459 # good/bad results are obtained (from "execute" of task in
461 # 1) A goodnode will complete immediately on its thread and
462 # allow another node to be queued for execution.
463 # 2) A badnode will complete immediately and suppress any
464 # subsequent execution queuing, but all currently queued
465 # tasks will still be processed.
466 # 3) A slowbadnode will fail later. It will block slots in
467 # the job queue. Nodes that complete immediately will
468 # allow other nodes to be queued in their place, and this
469 # will continue until either (#2) above or until all job
470 # slots are filled with slowbadnode entries.
472 # One approach to validating this test would be to try to
473 # determine exactly how many nodes executed, how many didn't,
474 # and the results of each, and then to assert failure on any
475 # mismatch (including the total number of built nodes).
476 # However, while this is possible to do for a single-processor
477 # system, it is nearly impossible to predict correctly for a
478 # multi-processor system and still test the characteristics of
479 # delayed execution nodes. Stated another way, multithreading
480 # is inherently non-deterministic unless you can completely
481 # characterize the entire system, and since that's not
482 # possible here, we shouldn't try.
484 # Therefore, this test will simply scan the set of nodes to
485 # see if the node was executed or not and if it was executed
486 # that it obtained the expected value for that node
487 # (i.e. verifying we don't get failure crossovers or
488 # mislabelling of results).
491 state = N.get_state()
492 self.failUnless(state in [SCons.Node.no_state, N.expect_to_be],
493 "Node %s got unexpected result: %s" % (N, state))
495 self.failUnless([N for N in testnodes if N.get_state()],
496 "no nodes ran at all.")
499 class SerialTaskTest(_SConsTaskTest):
501 "test serial jobs with actual Taskmaster and Task"
505 class ParallelTaskTest(_SConsTaskTest):
507 "test parallel jobs with actual Taskmaster and Task"
508 self._test_seq(num_jobs)
512 #---------------------------------------------------------------------
515 suite = unittest.TestSuite()
516 suite.addTest(ParallelTestCase())
517 suite.addTest(SerialTestCase())
518 suite.addTest(NoParallelTestCase())
519 suite.addTest(SerialExceptionTestCase())
520 suite.addTest(ParallelExceptionTestCase())
521 suite.addTest(SerialTaskTest())
522 suite.addTest(ParallelTaskTest())
525 if __name__ == "__main__":
526 runner = unittest.TextTestRunner()
527 result = runner.run(suite())
528 if (len(result.failures) == 0
529 and len(result.errors) == 1
530 and isinstance(result.errors[0][0], SerialTestCase)
531 and isinstance(result.errors[0][1][0], NoThreadsException)):
533 elif not result.wasSuccessful():
538 # indent-tabs-mode:nil
540 # vim: set expandtab tabstop=4 shiftwidth=4: