http://scons.tigris.org/issues/show_bug.cgi?id=2345
[scons.git] / src / engine / SCons / JobTests.py
1 #
2 # __COPYRIGHT__
3 #
4 # Permission is hereby granted, free of charge, to any person obtaining
5 # a copy of this software and associated documentation files (the
6 # "Software"), to deal in the Software without restriction, including
7 # without limitation the rights to use, copy, modify, merge, publish,
8 # distribute, sublicense, and/or sell copies of the Software, and to
9 # permit persons to whom the Software is furnished to do so, subject to
10 # the following conditions:
11 #
12 # The above copyright notice and this permission notice shall be included
13 # in all copies or substantial portions of the Software.
14 #
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
16 # KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
17 # WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
18 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
19 # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
20 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
21 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
22 #
23 from __future__ import generators  ### KEEP FOR COMPATIBILITY FIXERS
24
25 __revision__ = "__FILE__ __REVISION__ __DATE__ __DEVELOPER__"
26
27 import unittest
28 import random
29 import math
30 import SCons.Job
31 import sys
32 import time
33
34 # a large number
35 num_sines = 10000
36
37 # how many parallel jobs to perform for the test
38 num_jobs = 11
39
40 # how many tasks to perform for the test
41 num_tasks = num_jobs*5
42
43 class DummyLock:
44     "fake lock class to use if threads are not supported"
45     def acquire(self):
46         pass
47
48     def release(self):
49         pass
50
51 class NoThreadsException:
52     "raised by the ParallelTestCase if threads are not supported"
53
54     def __str__(self):
55         return "the interpreter doesn't support threads"
56
57 class Task:
58     """A dummy task class for testing purposes."""
59
60     def __init__(self, i, taskmaster):
61         self.i = i
62         self.taskmaster = taskmaster
63         self.was_executed = 0
64         self.was_prepared = 0
65
66     def prepare(self):
67         self.was_prepared = 1
68
69     def _do_something(self):
70         pass
71
72     def needs_execute(self):
73         return True
74
75     def execute(self):
76         self.taskmaster.test_case.failUnless(self.was_prepared,
77                                   "the task wasn't prepared")
78
79         self.taskmaster.guard.acquire()
80         self.taskmaster.begin_list.append(self.i)
81         self.taskmaster.guard.release()
82
83         self._do_something()
84
85         self.was_executed = 1
86
87         self.taskmaster.guard.acquire()
88         self.taskmaster.end_list.append(self.i)
89         self.taskmaster.guard.release()
90
91     def executed(self):
92         self.taskmaster.num_executed = self.taskmaster.num_executed + 1
93
94         self.taskmaster.test_case.failUnless(self.was_prepared,
95                                   "the task wasn't prepared")
96         self.taskmaster.test_case.failUnless(self.was_executed,
97                                   "the task wasn't really executed")
98         self.taskmaster.test_case.failUnless(isinstance(self, Task),
99                                   "the task wasn't really a Task instance")
100
101     def failed(self):
102         self.taskmaster.num_failed = self.taskmaster.num_failed + 1
103         self.taskmaster.stop = 1
104         self.taskmaster.test_case.failUnless(self.was_prepared,
105                                   "the task wasn't prepared")
106
107     def postprocess(self):
108         self.taskmaster.num_postprocessed = self.taskmaster.num_postprocessed + 1
109
110 class RandomTask(Task):
111     def _do_something(self):
112         # do something that will take some random amount of time:
113         for i in range(random.randrange(0, num_sines, 1)):
114             x = math.sin(i)
115         time.sleep(0.01)
116
117 class ExceptionTask:
118     """A dummy task class for testing purposes."""
119
120     def __init__(self, i, taskmaster):
121         self.taskmaster = taskmaster
122         self.was_prepared = 0
123
124     def prepare(self):
125         self.was_prepared = 1
126
127     def needs_execute(self):
128         return True
129
130     def execute(self):
131         raise Exception
132
133     def executed(self):
134         self.taskmaster.num_executed = self.taskmaster.num_executed + 1
135
136         self.taskmaster.test_case.failUnless(self.was_prepared,
137                                   "the task wasn't prepared")
138         self.taskmaster.test_case.failUnless(self.was_executed,
139                                   "the task wasn't really executed")
140         self.taskmaster.test_case.failUnless(self.__class__ is Task,
141                                   "the task wasn't really a Task instance")
142
143     def failed(self):
144         self.taskmaster.num_failed = self.taskmaster.num_failed + 1
145         self.taskmaster.stop = 1
146         self.taskmaster.test_case.failUnless(self.was_prepared,
147                                   "the task wasn't prepared")
148
149     def postprocess(self):
150         self.taskmaster.num_postprocessed = self.taskmaster.num_postprocessed + 1
151
152     def exception_set(self):
153         self.taskmaster.exception_set()
154
155 class Taskmaster:
156     """A dummy taskmaster class for testing the job classes."""
157
158     def __init__(self, n, test_case, Task):
159         """n is the number of dummy tasks to perform."""
160
161         self.test_case = test_case
162         self.stop = None
163         self.num_tasks = n
164         self.num_iterated = 0
165         self.num_executed = 0
166         self.num_failed = 0
167         self.num_postprocessed = 0
168         self.Task = Task
169         # 'guard' guards 'task_begin_list' and 'task_end_list'
170         try:
171             import threading
172             self.guard = threading.Lock()
173         except:
174             self.guard = DummyLock()
175
176         # keep track of the order tasks are begun in
177         self.begin_list = []
178
179         # keep track of the order tasks are completed in
180         self.end_list = []
181
182     def next_task(self):
183         if self.stop or self.all_tasks_are_iterated():
184             return None
185         else:
186             self.num_iterated = self.num_iterated + 1
187             return self.Task(self.num_iterated, self)
188
189     def all_tasks_are_executed(self):
190         return self.num_executed == self.num_tasks
191
192     def all_tasks_are_iterated(self):
193         return self.num_iterated == self.num_tasks
194
195     def all_tasks_are_postprocessed(self):
196         return self.num_postprocessed == self.num_tasks
197
198     def tasks_were_serial(self):
199         "analyze the task order to see if they were serial"
200         serial = 1 # assume the tasks were serial
201         for i in range(num_tasks):
202             serial = serial and (self.begin_list[i]
203                                  == self.end_list[i]
204                                  == (i + 1))
205         return serial
206
207     def exception_set(self):
208         pass
209
210     def cleanup(self):
211         pass
212
213 SaveThreadPool = None
214 ThreadPoolCallList = []
215
216 class ParallelTestCase(unittest.TestCase):
217     def runTest(self):
218         "test parallel jobs"
219
220         try:
221             import threading
222         except:
223             raise NoThreadsException()
224
225         taskmaster = Taskmaster(num_tasks, self, RandomTask)
226         jobs = SCons.Job.Jobs(num_jobs, taskmaster)
227         jobs.run()
228
229         self.failUnless(not taskmaster.tasks_were_serial(),
230                         "the tasks were not executed in parallel")
231         self.failUnless(taskmaster.all_tasks_are_executed(),
232                         "all the tests were not executed")
233         self.failUnless(taskmaster.all_tasks_are_iterated(),
234                         "all the tests were not iterated over")
235         self.failUnless(taskmaster.all_tasks_are_postprocessed(),
236                         "all the tests were not postprocessed")
237         self.failIf(taskmaster.num_failed,
238                     "some task(s) failed to execute")
239
240         # Verify that parallel jobs will pull all of the completed tasks
241         # out of the queue at once, instead of one by one.  We do this by
242         # replacing the default ThreadPool class with one that records the
243         # order in which tasks are put() and get() to/from the pool, and
244         # which sleeps a little bit before call get() to let the initial
245         # tasks complete and get their notifications on the resultsQueue.
246
247         class SleepTask(Task):
248             def _do_something(self):
249                 time.sleep(0.1)
250
251         global SaveThreadPool
252         SaveThreadPool = SCons.Job.ThreadPool
253
254         class WaitThreadPool(SaveThreadPool):
255             def put(self, task):
256                 ThreadPoolCallList.append('put(%s)' % task.i)
257                 return SaveThreadPool.put(self, task)
258             def get(self):
259                 time.sleep(0.5)
260                 result = SaveThreadPool.get(self)
261                 ThreadPoolCallList.append('get(%s)' % result[0].i)
262                 return result
263
264         SCons.Job.ThreadPool = WaitThreadPool
265
266         try:
267             taskmaster = Taskmaster(3, self, SleepTask)
268             jobs = SCons.Job.Jobs(2, taskmaster)
269             jobs.run()
270
271             # The key here is that we get(1) and get(2) from the
272             # resultsQueue before we put(3), but get(1) and get(2) can
273             # be in either order depending on how the first two parallel
274             # tasks get scheduled by the operating system.
275             expect = [
276                 ['put(1)', 'put(2)', 'get(1)', 'get(2)', 'put(3)', 'get(3)'],
277                 ['put(1)', 'put(2)', 'get(2)', 'get(1)', 'put(3)', 'get(3)'],
278             ]
279             assert ThreadPoolCallList in expect, ThreadPoolCallList
280
281         finally:
282             SCons.Job.ThreadPool = SaveThreadPool
283
284 class SerialTestCase(unittest.TestCase):
285     def runTest(self):
286         "test a serial job"
287
288         taskmaster = Taskmaster(num_tasks, self, RandomTask)
289         jobs = SCons.Job.Jobs(1, taskmaster)
290         jobs.run()
291
292         self.failUnless(taskmaster.tasks_were_serial(),
293                         "the tasks were not executed in series")
294         self.failUnless(taskmaster.all_tasks_are_executed(),
295                         "all the tests were not executed")
296         self.failUnless(taskmaster.all_tasks_are_iterated(),
297                         "all the tests were not iterated over")
298         self.failUnless(taskmaster.all_tasks_are_postprocessed(),
299                         "all the tests were not postprocessed")
300         self.failIf(taskmaster.num_failed,
301                     "some task(s) failed to execute")
302
303 class NoParallelTestCase(unittest.TestCase):
304     def runTest(self):
305         "test handling lack of parallel support"
306         def NoParallel(tm, num, stack_size):
307             raise NameError
308         save_Parallel = SCons.Job.Parallel
309         SCons.Job.Parallel = NoParallel
310         try:
311             taskmaster = Taskmaster(num_tasks, self, RandomTask)
312             jobs = SCons.Job.Jobs(2, taskmaster)
313             self.failUnless(jobs.num_jobs == 1,
314                             "unexpected number of jobs %d" % jobs.num_jobs)
315             jobs.run()
316             self.failUnless(taskmaster.tasks_were_serial(),
317                             "the tasks were not executed in series")
318             self.failUnless(taskmaster.all_tasks_are_executed(),
319                             "all the tests were not executed")
320             self.failUnless(taskmaster.all_tasks_are_iterated(),
321                             "all the tests were not iterated over")
322             self.failUnless(taskmaster.all_tasks_are_postprocessed(),
323                             "all the tests were not postprocessed")
324             self.failIf(taskmaster.num_failed,
325                         "some task(s) failed to execute")
326         finally:
327             SCons.Job.Parallel = save_Parallel
328
329
330 class SerialExceptionTestCase(unittest.TestCase):
331     def runTest(self):
332         "test a serial job with tasks that raise exceptions"
333
334         taskmaster = Taskmaster(num_tasks, self, ExceptionTask)
335         jobs = SCons.Job.Jobs(1, taskmaster)
336         jobs.run()
337
338         self.failIf(taskmaster.num_executed,
339                     "a task was executed")
340         self.failUnless(taskmaster.num_iterated == 1,
341                     "exactly one task should have been iterated")
342         self.failUnless(taskmaster.num_failed == 1,
343                     "exactly one task should have failed")
344         self.failUnless(taskmaster.num_postprocessed == 1,
345                     "exactly one task should have been postprocessed")
346
347 class ParallelExceptionTestCase(unittest.TestCase):
348     def runTest(self):
349         "test parallel jobs with tasks that raise exceptions"
350
351         taskmaster = Taskmaster(num_tasks, self, ExceptionTask)
352         jobs = SCons.Job.Jobs(num_jobs, taskmaster)
353         jobs.run()
354
355         self.failIf(taskmaster.num_executed,
356                     "a task was executed")
357         self.failUnless(taskmaster.num_iterated >= 1,
358                     "one or more task should have been iterated")
359         self.failUnless(taskmaster.num_failed >= 1,
360                     "one or more tasks should have failed")
361         self.failUnless(taskmaster.num_postprocessed >= 1,
362                     "one or more tasks should have been postprocessed")
363
364 #---------------------------------------------------------------------
365 # Above tested Job object with contrived Task and Taskmaster objects.
366 # Now test Job object with actual Task and Taskmaster objects.
367
368 import SCons.Taskmaster
369 import SCons.Node
370 import time
371
372 class DummyNodeInfo:
373     def update(self, obj):
374         pass
375
376 class testnode (SCons.Node.Node):
377     def __init__(self):
378         SCons.Node.Node.__init__(self)
379         self.expect_to_be = SCons.Node.executed
380         self.ninfo = DummyNodeInfo()
381
382 class goodnode (testnode):
383     def __init__(self):
384         SCons.Node.Node.__init__(self)
385         self.expect_to_be = SCons.Node.up_to_date
386         self.ninfo = DummyNodeInfo()
387
388 class slowgoodnode (goodnode):
389     def prepare(self):
390         # Delay to allow scheduled Jobs to run while the dispatcher
391         # sleeps.  Keep this short because it affects the time taken
392         # by this test.
393         time.sleep(0.15)
394         goodnode.prepare(self)
395
396 class badnode (goodnode):
397     def __init__(self):
398         goodnode.__init__(self)
399         self.expect_to_be = SCons.Node.failed
400     def build(self, **kw):
401         raise Exception('badnode exception')
402
403 class slowbadnode (badnode):
404     def build(self, **kw):
405         # Appears to take a while to build, allowing faster builds to
406         # overlap.  Time duration is not especially important, but if
407         # it is faster than slowgoodnode then these could complete
408         # while the scheduler is sleeping.
409         time.sleep(0.05)
410         raise Exception('slowbadnode exception')
411
412 class badpreparenode (badnode):
413     def prepare(self):
414         raise Exception('badpreparenode exception')
415
416 class _SConsTaskTest(unittest.TestCase):
417
418     def _test_seq(self, num_jobs):
419         for node_seq in [
420             [goodnode],
421             [badnode],
422             [slowbadnode],
423             [slowgoodnode],
424             [badpreparenode],
425             [goodnode, badnode],
426             [slowgoodnode, badnode],
427             [goodnode, slowbadnode],
428             [goodnode, goodnode, goodnode, slowbadnode],
429             [goodnode, slowbadnode, badpreparenode, slowgoodnode],
430             [goodnode, slowbadnode, slowgoodnode, badnode]
431             ]:
432
433             self._do_test(num_jobs, node_seq)
434
435     def _do_test(self, num_jobs, node_seq):
436
437         testnodes = []
438         for tnum in range(num_tasks):
439             testnodes.append(node_seq[tnum % len(node_seq)]())
440
441         taskmaster = SCons.Taskmaster.Taskmaster(testnodes,
442                                                  tasker=SCons.Taskmaster.AlwaysTask)
443
444         jobs = SCons.Job.Jobs(num_jobs, taskmaster)
445
446         # Exceptions thrown by tasks are not actually propagated to
447         # this level, but are instead stored in the Taskmaster.
448
449         jobs.run()
450
451         # Now figure out if tests proceeded correctly.  The first test
452         # that fails will shutdown the initiation of subsequent tests,
453         # but any tests currently queued for execution will still be
454         # processed, and any tests that completed before the failure
455         # would have resulted in new tests being queued for execution.
456
457         # Apply the following operational heuristics of Job.py:
458         #  0) An initial jobset of tasks will be queued before any
459         #     good/bad results are obtained (from "execute" of task in
460         #     thread).
461         #  1) A goodnode will complete immediately on its thread and
462         #     allow another node to be queued for execution.
463         #  2) A badnode will complete immediately and suppress any
464         #     subsequent execution queuing, but all currently queued
465         #     tasks will still be processed.
466         #  3) A slowbadnode will fail later.  It will block slots in
467         #     the job queue.  Nodes that complete immediately will
468         #     allow other nodes to be queued in their place, and this
469         #     will continue until either (#2) above or until all job
470         #     slots are filled with slowbadnode entries.
471
472         # One approach to validating this test would be to try to
473         # determine exactly how many nodes executed, how many didn't,
474         # and the results of each, and then to assert failure on any
475         # mismatch (including the total number of built nodes).
476         # However, while this is possible to do for a single-processor
477         # system, it is nearly impossible to predict correctly for a
478         # multi-processor system and still test the characteristics of
479         # delayed execution nodes.  Stated another way, multithreading
480         # is inherently non-deterministic unless you can completely
481         # characterize the entire system, and since that's not
482         # possible here, we shouldn't try.
483
484         # Therefore, this test will simply scan the set of nodes to
485         # see if the node was executed or not and if it was executed
486         # that it obtained the expected value for that node
487         # (i.e. verifying we don't get failure crossovers or
488         # mislabelling of results).
489
490         for N in testnodes:
491             state = N.get_state()
492             self.failUnless(state in [SCons.Node.no_state, N.expect_to_be],
493                             "Node %s got unexpected result: %s" % (N, state))
494
495         self.failUnless([N for N in testnodes if N.get_state()],
496                         "no nodes ran at all.")
497
498
499 class SerialTaskTest(_SConsTaskTest):
500     def runTest(self):
501         "test serial jobs with actual Taskmaster and Task"
502         self._test_seq(1)
503
504
505 class ParallelTaskTest(_SConsTaskTest):
506     def runTest(self):
507         "test parallel jobs with actual Taskmaster and Task"
508         self._test_seq(num_jobs)
509
510
511
512 #---------------------------------------------------------------------
513
514 def suite():
515     suite = unittest.TestSuite()
516     suite.addTest(ParallelTestCase())
517     suite.addTest(SerialTestCase())
518     suite.addTest(NoParallelTestCase())
519     suite.addTest(SerialExceptionTestCase())
520     suite.addTest(ParallelExceptionTestCase())
521     suite.addTest(SerialTaskTest())
522     suite.addTest(ParallelTaskTest())
523     return suite
524
525 if __name__ == "__main__":
526     runner = unittest.TextTestRunner()
527     result = runner.run(suite())
528     if (len(result.failures) == 0
529         and len(result.errors) == 1
530         and isinstance(result.errors[0][0], SerialTestCase)
531         and isinstance(result.errors[0][1][0], NoThreadsException)):
532         sys.exit(2)
533     elif not result.wasSuccessful():
534         sys.exit(1)
535
536 # Local Variables:
537 # tab-width:4
538 # indent-tabs-mode:nil
539 # End:
540 # vim: set expandtab tabstop=4 shiftwidth=4: