http://scons.tigris.org/issues/show_bug.cgi?id=2329
[scons.git] / QMTest / scons_tdb.py
1 #!/usr/bin/env python
2 #
3 # __COPYRIGHT__
4 #
5 # Permission is hereby granted, free of charge, to any person obtaining
6 # a copy of this software and associated documentation files (the
7 # "Software"), to deal in the Software without restriction, including
8 # without limitation the rights to use, copy, modify, merge, publish,
9 # distribute, sublicense, and/or sell copies of the Software, and to
10 # permit persons to whom the Software is furnished to do so, subject to
11 # the following conditions:
12 #
13 # The above copyright notice and this permission notice shall be included
14 # in all copies or substantial portions of the Software.
15 #
16 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
17 # KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
18 # WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
19 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
20 # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
21 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
22 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23 #
24
25 """
26 QMTest classes to support SCons' testing and Aegis-inspired workflow.
27
28 Thanks to Stefan Seefeld for the initial code.
29 """
30 from __future__ import generators  ### KEEP FOR COMPATIBILITY FIXERS
31
32 __revision__ = "__FILE__ __REVISION__ __DATE__ __DEVELOPER__"
33
34 ########################################################################
35 # Imports
36 ########################################################################
37
38 import qm
39 import qm.common
40 import qm.test.base
41 from   qm.fields import *
42 from   qm.executable import *
43 from   qm.test import database
44 from   qm.test import test
45 from   qm.test import resource
46 from   qm.test import suite
47 from   qm.test.result import Result
48 from   qm.test.file_result_stream import FileResultStream
49 from   qm.test.classes.text_result_stream import TextResultStream
50 from   qm.test.classes.xml_result_stream import XMLResultStream
51 from   qm.test.directory_suite import DirectorySuite
52 from   qm.extension import get_extension_class_name, get_class_arguments_as_dictionary
53
54 import dircache
55 import os
56 import imp
57
58 if sys.platform == 'win32':
59     console = 'con'
60 else:
61     console = '/dev/tty'
62
63 def Trace(msg):
64     open(console, 'w').write(msg)
65
66 # QMTest 2.3 hard-codes how it captures the beginning and end time by
67 # calling the qm.common.format_time_iso() function, which canonicalizes
68 # the time stamp in one-second granularity ISO format.  In order to get
69 # sub-second granularity, as well as to use the more precise time.clock()
70 # function on Windows, we must replace that function with our own.
71
72 orig_format_time_iso = qm.common.format_time_iso
73
74 if sys.platform == 'win32':
75     time_func = time.clock
76 else:
77     time_func = time.time
78
79 def my_format_time(time_secs=None):
80     return str(time_func())
81
82 qm.common.format_time_iso = my_format_time
83
84 ########################################################################
85 # Classes
86 ########################################################################
87
88 def get_explicit_arguments(e):
89     """This function can be removed once QMTest 2.4 is out."""
90
91     # Get all of the arguments.
92     arguments = get_class_arguments_as_dictionary(e.__class__)
93     # Determine which subset of the 'arguments' have been set
94     # explicitly.
95     explicit_arguments = {}
96     for name, field in arguments.items():
97         # Do not record computed fields.
98         if field.IsComputed():
99             continue
100         if name in e.__dict__:
101             explicit_arguments[name] = e.__dict__[name]
102
103     return explicit_arguments
104
105
106 def check_exit_status(result, prefix, desc, status):
107     """This function can be removed once QMTest 2.4 is out."""
108
109     if sys.platform == "win32" or os.WIFEXITED(status):
110         # Obtain the exit code.
111         if sys.platform == "win32":
112             exit_code = status
113         else:
114             exit_code = os.WEXITSTATUS(status)
115             # If the exit code is non-zero, the test fails.
116         if exit_code != 0:
117             result.Fail("%s failed with exit code %d." % (desc, exit_code))
118             # Record the exit code in the result.
119             result[prefix + "exit_code"] = str(exit_code)
120             return False
121
122     elif os.WIFSIGNALED(status):
123         # Obtain the signal number.
124         signal = os.WTERMSIG(status)
125         # If the program gets a fatal signal, the test fails .
126         result.Fail("%s received fatal signal %d." % (desc, signal))
127         result[prefix + "signal"] = str(signal)
128         return False
129     else:
130         # A process should only be able to stop by exiting, or
131         # by being terminated with a signal.
132         assert None
133
134     return True
135
136
137
138 class Null:
139     pass
140
141 _null = Null()
142
143 sys_attributes = [
144     'byteorder',
145     'exec_prefix',
146     'executable',
147     'maxint',
148     'maxunicode',
149     'platform',
150     'prefix',
151     'version',
152     'version_info',
153 ]
154
155 def get_sys_values():
156     sys_attributes.sort()
157     result = [(k, getattr(sys, k, _null)) for k in sys_attributes]
158     result = [t for t in result if not t[1] is _null]
159     result = [t[0] + '=' + repr(t[1]) for t in result]
160     return '\n '.join(result)
161
162 module_attributes = [
163     '__version__',
164     '__build__',
165     '__buildsys__',
166     '__date__',
167     '__developer__',
168 ]
169
170 def get_module_info(module):
171     module_attributes.sort()
172     result = [(k, getattr(module, k, _null)) for k in module_attributes]
173     result = [t for t in result if not t[1] is _null]
174     result = [t[0] + '=' + repr(t[1]) for t in result]
175     return '\n '.join(result)
176
177 environ_keys = [
178    'PATH',
179    'SCONS',
180    'SCONSFLAGS',
181    'SCONS_LIB_DIR',
182    'PYTHON_ROOT',
183    'QTDIR',
184
185    'COMSPEC',
186    'INTEL_LICENSE_FILE',
187    'INCLUDE',
188    'LIB',
189    'MSDEVDIR',
190    'OS',
191    'PATHEXT',
192    'SystemRoot',
193    'TEMP',
194    'TMP',
195    'USERNAME',
196    'VXDOMNTOOLS',
197    'WINDIR',
198    'XYZZY'
199
200    'ENV',
201    'HOME',
202    'LANG',
203    'LANGUAGE',
204    'LC_ALL',
205    'LC_MESSAGES',
206    'LOGNAME',
207    'MACHINE',
208    'OLDPWD',
209    'PWD',
210    'OPSYS',
211    'SHELL',
212    'TMPDIR',
213    'USER',
214 ]
215
216 def get_environment():
217     environ_keys.sort()
218     result = [(k, os.environ.get(k, _null)) for k in environ_keys]
219     result = [t for t in result if not t[1] is _null]
220     result = [t[0] + '-' + t[1] for t in result]
221     return '\n '.join(result)
222
223 class SConsXMLResultStream(XMLResultStream):
224     def __init__(self, *args, **kw):
225         super(SConsXMLResultStream, self).__init__(*args, **kw)
226     def WriteAllAnnotations(self, context):
227         # Load (by hand) the SCons modules we just unwrapped so we can
228         # extract their version information.  Note that we have to override
229         # SCons.Script.main() with a do_nothing() function, because loading up
230         # the 'scons' script will actually try to execute SCons...
231
232         src_engine = os.environ.get('SCONS_LIB_DIR')
233         if not src_engine:
234             src_engine = os.path.join('src', 'engine')
235         fp, pname, desc = imp.find_module('SCons', [src_engine])
236         SCons = imp.load_module('SCons', fp, pname, desc)
237
238         # Override SCons.Script.main() with a do-nothing function, because
239         # loading the 'scons' script will actually try to execute SCons...
240
241         src_engine_SCons = os.path.join(src_engine, 'SCons')
242         fp, pname, desc = imp.find_module('Script', [src_engine_SCons])
243         SCons.Script = imp.load_module('Script', fp, pname, desc)
244         def do_nothing():
245             pass
246         SCons.Script.main = do_nothing
247
248         scons_file = os.environ.get('SCONS')
249         if scons_file:
250             src_script, scons_py = os.path.split(scons_file)
251             scons = os.path.splitext(scons_py)[0]
252         else:
253             src_script = os.path.join('src', 'script')
254             scons = 'scons'
255         fp, pname, desc = imp.find_module(scons, [src_script])
256         scons = imp.load_module('scons', fp, pname, desc)
257         fp.close()
258
259         self.WriteAnnotation("scons_test.engine", get_module_info(SCons))
260         self.WriteAnnotation("scons_test.script", get_module_info(scons))
261
262         self.WriteAnnotation("scons_test.sys", get_sys_values())
263         self.WriteAnnotation("scons_test.os.environ", get_environment())
264
265 class AegisStream(TextResultStream):
266     arguments = [
267         qm.fields.IntegerField(
268             name = "print_time",
269             title = "print individual test times",
270             description = """
271             """,
272             default_value = 0,
273         ),
274     ]
275     def __init__(self, *args, **kw):
276         super(AegisStream, self).__init__(*args, **kw)
277         self._num_tests = 0
278         self._outcomes = {}
279         self._outcome_counts = {}
280         for outcome in AegisTest.aegis_outcomes:
281             self._outcome_counts[outcome] = 0
282         self.format = "full"
283     def _percent(self, outcome):
284         return 100. * self._outcome_counts[outcome] / self._num_tests
285     def _aegis_no_result(self, result):
286         outcome = result.GetOutcome()
287         return (outcome == Result.FAIL and result.get('Test.exit_code') == '2')
288     def _DisplayText(self, text):
289         # qm.common.html_to_text() uses htmllib, which sticks an extra
290         # '\n' on the front of the text.  Strip it and only display
291         # the text if there's anything to display.
292         text = qm.common.html_to_text(text)
293         if text[0] == '\n':
294             text = text[1:]
295         if text:
296             lines = text.splitlines()
297             if lines[-1] == '':
298                 lines = lines[:-1]
299             self.file.write('    ' + '\n    '.join(lines) + '\n\n')
300     def _DisplayResult(self, result, format):
301         test_id = result.GetId()
302         kind = result.GetKind()
303         if self._aegis_no_result(result):
304             outcome = "NO_RESULT"
305         else:
306             outcome = result.GetOutcome()
307         self._WriteOutcome(test_id, kind, outcome)
308         self.file.write('\n')
309     def _DisplayAnnotations(self, result):
310         try:
311             self._DisplayText(result["Test.stdout"])
312         except KeyError:
313             pass
314         try:
315             self._DisplayText(result["Test.stderr"])
316         except KeyError:
317             pass
318         if self.print_time:
319             start = float(result['qmtest.start_time'])
320             end = float(result['qmtest.end_time'])
321             fmt = "    Total execution time: %.1f seconds\n\n"
322             self.file.write(fmt % (end - start))
323
324 class AegisChangeStream(AegisStream):
325     def WriteResult(self, result):
326         test_id = result.GetId()
327         if self._aegis_no_result(result):
328             outcome = AegisTest.NO_RESULT
329         else:
330             outcome = result.GetOutcome()
331         self._num_tests += 1
332         self._outcome_counts[outcome] += 1
333         super(AegisStream, self).WriteResult(result)
334     def _SummarizeTestStats(self):
335         self.file.write("\n")
336         self._DisplayHeading("STATISTICS")
337         if self._num_tests != 0:
338             # We'd like to use the _FormatStatistics() method to do
339             # this, but it's wrapped around the list in Result.outcomes,
340             # so it's simpler to just do it ourselves.
341             print "  %6d        tests total\n" % self._num_tests
342             for outcome in AegisTest.aegis_outcomes:
343                 if self._outcome_counts[outcome] != 0:
344                     print "  %6d (%3.0f%%) tests %s" % (
345                         self._outcome_counts[outcome],
346                         self._percent(outcome),
347                         outcome
348                     )
349
350 class AegisBaselineStream(AegisStream):
351     def WriteResult(self, result):
352         test_id = result.GetId()
353         if self._aegis_no_result(result):
354             outcome = AegisTest.NO_RESULT
355             self.expected_outcomes[test_id] = Result.PASS
356             self._outcome_counts[outcome] += 1
357         else:
358             self.expected_outcomes[test_id] = Result.FAIL
359             outcome = result.GetOutcome()
360             if outcome != Result.Fail:
361                 self._outcome_counts[outcome] += 1
362         self._num_tests += 1
363         super(AegisStream, self).WriteResult(result)
364     def _SummarizeRelativeTestStats(self):
365         self.file.write("\n")
366         self._DisplayHeading("STATISTICS")
367         if self._num_tests != 0:
368             # We'd like to use the _FormatStatistics() method to do
369             # this, but it's wrapped around the list in Result.outcomes,
370             # so it's simpler to just do it ourselves.
371             if self._outcome_counts[AegisTest.FAIL]:
372                 print "  %6d (%3.0f%%) tests as expected" % (
373                     self._outcome_counts[AegisTest.FAIL],
374                     self._percent(AegisTest.FAIL),
375                 )
376             non_fail_outcomes = list(AegisTest.aegis_outcomes[:])
377             non_fail_outcomes.remove(AegisTest.FAIL)
378             for outcome in non_fail_outcomes:
379                 if self._outcome_counts[outcome] != 0:
380                     print "  %6d (%3.0f%%) tests unexpected %s" % (
381                         self._outcome_counts[outcome],
382                         self._percent(outcome),
383                         outcome,
384                     )
385
386 class AegisBatchStream(FileResultStream):
387     def __init__(self, arguments):
388         super(AegisBatchStream, self).__init__(arguments)
389         self._outcomes = {}
390     def WriteResult(self, result):
391         test_id = result.GetId()
392         kind = result.GetKind()
393         outcome = result.GetOutcome()
394         exit_status = '0'
395         if outcome == Result.FAIL:
396             exit_status = result.get('Test.exit_code')
397         self._outcomes[test_id] = exit_status
398     def Summarize(self):
399         self.file.write('test_result = [\n')
400         for file_name in sorted(self._outcomes.keys()):
401             exit_status = self._outcomes[file_name]
402             file_name = file_name.replace('\\', '/')
403             self.file.write('    { file_name = "%s";\n' % file_name)
404             self.file.write('      exit_status = %s; },\n' % exit_status)
405         self.file.write('];\n')
406
407 class AegisTest(test.Test):
408     PASS = "PASS"
409     FAIL = "FAIL"
410     NO_RESULT = "NO_RESULT"
411     ERROR = "ERROR"
412     UNTESTED = "UNTESTED"
413
414     aegis_outcomes = (
415         PASS, FAIL, NO_RESULT, ERROR, UNTESTED,
416     )
417     """Aegis test outcomes."""
418
419 class Test(AegisTest):
420     """Simple test that runs a python script and checks the status
421     to determine whether the test passes."""
422
423     script = TextField(title="Script to test")
424     topdir = TextField(title="Top source directory")
425
426     def Run(self, context, result):
427         """Run the test. The test passes if the command exits with status=0,
428         and fails otherwise. The program output is logged, but not validated."""
429
430         command = RedirectedExecutable()
431         args = [context.get('python', sys.executable), '-tt', self.script]
432         status = command.Run(args, os.environ)
433         if not check_exit_status(result, 'Test.', self.script, status):
434             # In case of failure record exit code, stdout, and stderr.
435             result.Fail("Non-zero exit_code.")
436             result["Test.stdout"] = result.Quote(command.stdout)
437             result["Test.stderr"] = result.Quote(command.stderr)
438
439
440 class Database(database.Database):
441     """Scons test database.
442     * The 'src' and 'test' directories are explicit suites.
443     * Their subdirectories are implicit suites.
444     * All files under 'src/' ending with 'Tests.py' contain tests.
445     * All files under 'test/' with extension '.py' contain tests.
446     * Right now there is only a single test class, which simply runs
447       the specified python interpreter on the given script. To be refined..."""
448
449     srcdir = TextField(title = "Source Directory",
450                        description = "The root of the test suite's source tree.")
451     _is_generic_database = True
452
453     def is_a_test_under_test(path, t):
454         return os.path.splitext(t)[1] == '.py' \
455                and os.path.isfile(os.path.join(path, t))
456
457     def is_a_test_under_src(path, t):
458         return t[-8:] == 'Tests.py' \
459                and os.path.isfile(os.path.join(path, t))
460
461     is_a_test = {
462         'src' : is_a_test_under_src,
463         'test' : is_a_test_under_test,
464     }
465
466     exclude_subdirs = {
467         '.svn' : 1,
468         'CVS' : 1,
469     }
470
471     def is_a_test_subdir(path, subdir):
472         if exclude_subdirs.get(subdir):
473             return None
474         return os.path.isdir(os.path.join(path, subdir))
475
476     def __init__(self, path, arguments):
477
478         self.label_class = "file_label.FileLabel"
479         self.modifiable = "false"
480         # Initialize the base class.
481         super(Database, self).__init__(path, arguments)
482
483
484     def GetRoot(self):
485
486         return self.srcdir
487
488
489     def GetSubdirectories(self, directory):
490
491         components = self.GetLabelComponents(directory)
492         path = os.path.join(self.GetRoot(), *components)
493         if directory:
494             dirs = [d for d in dircache.listdir(path)
495                     if os.path.isdir(os.path.join(path, d))]
496         else:
497             dirs = self.is_a_test.keys()
498
499         dirs.sort()
500         return dirs
501
502
503     def GetIds(self, kind, directory = "", scan_subdirs = 1):
504
505         components = self.GetLabelComponents(directory)
506         path = os.path.join(self.GetRoot(), *components)
507
508         if kind == database.Database.TEST:
509
510             if not components:
511                 return []
512
513             ids = [self.JoinLabels(directory, t)
514                    for t in dircache.listdir(path)
515                    if self.is_a_test[components[0]](path, t)]
516
517         elif kind == Database.RESOURCE:
518             return [] # no resources yet
519
520         else: # SUITE
521
522             if directory:
523                 ids = [self.JoinLabels(directory, d)
524                        for d in dircache.listdir(path)
525                        if os.path.isdir(os.path.join(path, d))]
526             else:
527                 ids = self.is_a_test.keys()
528
529         if scan_subdirs:
530             for d in dircache.listdir(path):
531                 if (os.path.isdir(d)):
532                     ids.extend(self.GetIds(kind,
533                                            self.JoinLabels(directory, d),
534                                            True))
535
536         return ids
537
538
539     def GetExtension(self, id):
540
541         if not id:
542             return DirectorySuite(self, id)
543
544         components = self.GetLabelComponents(id)
545         path = os.path.join(self.GetRoot(), *components)
546
547         if os.path.isdir(path): # a directory
548             return DirectorySuite(self, id)
549
550         elif os.path.isfile(path): # a test
551
552             arguments = {}
553             arguments['script'] = path
554             arguments['topdir'] = self.GetRoot()
555
556             return Test(arguments, qmtest_id = id, qmtest_database = self)
557
558         else: # nothing else to offer
559
560             return None
561
562
563     def GetTest(self, test_id):
564         """This method can be removed once QMTest 2.4 is out."""
565
566         t = self.GetExtension(test_id)
567         if isinstance(t, test.Test):
568             return database.TestDescriptor(self,
569                                            test_id,
570                                            get_extension_class_name(t.__class__),
571                                            get_explicit_arguments(t))
572
573         raise database.NoSuchTestError(test_id)
574
575     def GetSuite(self, suite_id):
576         """This method can be removed once QMTest 2.4 is out."""
577
578         if suite_id == "":
579             return DirectorySuite(self, "")
580
581         s = self.GetExtension(suite_id)
582         if isinstance(s, suite.Suite):
583             return s
584
585         raise database.NoSuchSuiteError(suite_id)
586
587
588     def GetResource(self, resource_id):
589         """This method can be removed once QMTest 2.4 is out."""
590
591         r = self.GetExtension(resource_id)
592         if isinstance(r, resource.Resource):
593             return ResourceDescriptor(self,
594                                       resource_id,
595                                       get_extension_class_name(r.__class__),
596                                       get_explicit_arguments(r))
597
598         raise database.NoSuchResourceError(resource_id)
599
600 # Local Variables:
601 # tab-width:4
602 # indent-tabs-mode:nil
603 # End:
604 # vim: set expandtab tabstop=4 shiftwidth=4: