5 # Permission is hereby granted, free of charge, to any person obtaining
6 # a copy of this software and associated documentation files (the
7 # "Software"), to deal in the Software without restriction, including
8 # without limitation the rights to use, copy, modify, merge, publish,
9 # distribute, sublicense, and/or sell copies of the Software, and to
10 # permit persons to whom the Software is furnished to do so, subject to
11 # the following conditions:
13 # The above copyright notice and this permission notice shall be included
14 # in all copies or substantial portions of the Software.
16 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
17 # KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
18 # WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
19 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
20 # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
21 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
22 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
26 QMTest classes to support SCons' testing and Aegis-inspired workflow.
28 Thanks to Stefan Seefeld for the initial code.
30 from __future__ import generators ### KEEP FOR COMPATIBILITY FIXERS
32 __revision__ = "__FILE__ __REVISION__ __DATE__ __DEVELOPER__"
34 ########################################################################
36 ########################################################################
41 from qm.fields import *
42 from qm.executable import *
43 from qm.test import database
44 from qm.test import test
45 from qm.test import resource
46 from qm.test import suite
47 from qm.test.result import Result
48 from qm.test.file_result_stream import FileResultStream
49 from qm.test.classes.text_result_stream import TextResultStream
50 from qm.test.classes.xml_result_stream import XMLResultStream
51 from qm.test.directory_suite import DirectorySuite
52 from qm.extension import get_extension_class_name, get_class_arguments_as_dictionary
58 if sys.platform == 'win32':
64 open(console, 'w').write(msg)
66 # QMTest 2.3 hard-codes how it captures the beginning and end time by
67 # calling the qm.common.format_time_iso() function, which canonicalizes
68 # the time stamp in one-second granularity ISO format. In order to get
69 # sub-second granularity, as well as to use the more precise time.clock()
70 # function on Windows, we must replace that function with our own.
# Keep a handle on QMTest's own formatter so it stays reachable after
# the replacement below.
orig_format_time_iso = qm.common.format_time_iso

# Pick the clock whose raw reading becomes the recorded time stamp.
if sys.platform == 'win32':
    time_func = time.clock
else:
    time_func = time.time

def my_format_time(time_secs=None):
    """Return the current reading of 'time_func' as a string.

    Installed in place of qm.common.format_time_iso().

    'time_secs' -- Accepted so the signature matches the function being
    replaced; it is ignored and the clock is always read afresh.

    returns -- str() of the float returned by 'time_func', so callers
    can turn two stamps back into floats and subtract them.
    """
    return str(time_func())

qm.common.format_time_iso = my_format_time
84 ########################################################################
86 ########################################################################
def get_explicit_arguments(e):
    """Return the arguments that were set explicitly on extension 'e'.

    This function can be removed once QMTest 2.4 is out.

    'e' -- An extension instance.  Its class supplies the declared
    argument fields; its instance dictionary supplies the values.

    returns -- A dictionary mapping argument names to the values found
    in 'e.__dict__'.  Computed fields, and declared arguments that have
    no entry in the instance dictionary, are left out.
    """

    # Get all of the arguments.
    arguments = get_class_arguments_as_dictionary(e.__class__)
    instance_values = e.__dict__
    # Determine which subset of the 'arguments' have been set
    # explicitly.
    explicit_arguments = {}
    for name, field in arguments.items():
        # Do not record computed fields.
        if field.IsComputed():
            continue
        if name in instance_values:
            explicit_arguments[name] = instance_values[name]

    return explicit_arguments
def check_exit_status(result, prefix, desc, status):
    """Record on 'result' whether a finished child process succeeded.

    This function can be removed once QMTest 2.4 is out.

    'result' -- Object providing Fail(message) and item assignment.

    'prefix' -- String prepended to the annotation keys ("exit_code"
    or "signal") stored in 'result'.

    'desc' -- Description of the program, used in the failure message.

    'status' -- The exit code itself on win32; elsewhere a wait status
    that is decoded with the os.W*() helpers.

    returns -- True if the process exited with code zero.  False if it
    exited with a non-zero code or was killed by a signal; in both
    cases 'result' has been marked as failed and annotated.

    raises -- AssertionError if the status is neither a normal exit nor
    a termination by signal.
    """

    on_windows = sys.platform == "win32"

    if on_windows or os.WIFEXITED(status):
        # Obtain the exit code.
        if on_windows:
            exit_code = status
        else:
            exit_code = os.WEXITSTATUS(status)
        # If the exit code is non-zero, the test fails.
        if exit_code != 0:
            result.Fail("%s failed with exit code %d." % (desc, exit_code))
            # Record the exit code in the result.
            result[prefix + "exit_code"] = str(exit_code)
            return False
        return True

    if os.WIFSIGNALED(status):
        # Obtain the signal number.
        signal_number = os.WTERMSIG(status)
        # If the program gets a fatal signal, the test fails.
        result.Fail("%s received fatal signal %d." % (desc, signal_number))
        result[prefix + "signal"] = str(signal_number)
        return False

    # A process should only be able to stop by exiting, or
    # by being terminated with a signal.  Raise explicitly so the check
    # is not compiled away when Python runs with -O.
    raise AssertionError("unexpected process status %r" % (status,))
def get_sys_values():
    """Describe the running interpreter as 'name=repr(value)' lines.

    Every name listed in 'sys_attributes' is looked up on the 'sys'
    module; names the module does not define are left out.  The list is
    sorted in place first, so the report is ordered by attribute name.

    returns -- The entries joined with a newline followed by a space.
    """
    sys_attributes.sort()
    entries = []
    for attribute in sys_attributes:
        value = getattr(sys, attribute, _null)
        # '_null' marks attributes this interpreter does not provide.
        if value is _null:
            continue
        entries.append(attribute + '=' + repr(value))
    return '\n '.join(entries)
162 module_attributes = [
def get_module_info(module):
    """Describe 'module' as 'name=repr(value)' lines.

    'module' -- Any object; each name in 'module_attributes' is looked
    up on it and names it does not define are left out.  The list is
    sorted in place first, so the report is ordered by attribute name.

    returns -- The entries joined with a newline followed by a space.
    """
    module_attributes.sort()
    entries = []
    for attribute in module_attributes:
        value = getattr(module, attribute, _null)
        # '_null' marks attributes the module does not provide.
        if value is _null:
            continue
        entries.append(attribute + '=' + repr(value))
    return '\n '.join(entries)
186 'INTEL_LICENSE_FILE',
def get_environment():
    """Describe selected environment variables as 'NAME=value' lines.

    Every name listed in 'environ_keys' is looked up in os.environ;
    variables that are not set are left out.  The list is sorted in
    place first, as get_sys_values() and get_module_info() do with
    their own lists.

    returns -- The entries joined with a newline followed by a space.
    """
    environ_keys.sort()
    entries = []
    for key in environ_keys:
        value = os.environ.get(key, _null)
        # '_null' marks variables that are not set at all.
        if value is _null:
            continue
        # Use '=' between name and value, the same separator the other
        # two report helpers in this module emit.
        entries.append(key + '=' + value)
    return '\n '.join(entries)
class SConsXMLResultStream(XMLResultStream):
    """XML result stream that also annotates the SCons build under test.

    Besides the results themselves, the stream records attributes of
    the SCons engine package, of the 'scons' script module, of the
    'sys' module and of selected environment variables.
    """

    def __init__(self, *args, **kw):
        super(SConsXMLResultStream, self).__init__(*args, **kw)

    def _load_module(self, name, search_dir, load_as=None):
        """Load module 'name' found in 'search_dir' and return it.

        'load_as' -- Name to register the module under; defaults to
        'name'.

        The file object handed back by imp.find_module() is closed
        here once loading is done, whether or not loading succeeded.
        """
        fp, pname, desc = imp.find_module(name, [search_dir])
        try:
            return imp.load_module(load_as or name, fp, pname, desc)
        finally:
            # find_module() returns no file object for a package.
            if fp is not None:
                fp.close()

    def WriteAllAnnotations(self, context):
        """Write the SCons, interpreter and environment annotations.

        'context' -- Not used by this implementation.

        The engine is taken from $SCONS_LIB_DIR (default 'src/engine')
        and the script from $SCONS (default 'src/script/scons').
        """
        # Load (by hand) the SCons modules we just unwrapped so we can
        # extract their version information.
        src_engine = os.environ.get('SCONS_LIB_DIR')
        if not src_engine:
            src_engine = os.path.join('src', 'engine')
        SCons = self._load_module('SCons', src_engine)

        # Override SCons.Script.main() with a do-nothing function, because
        # loading the 'scons' script will actually try to execute SCons...
        src_engine_SCons = os.path.join(src_engine, 'SCons')
        SCons.Script = self._load_module('Script', src_engine_SCons)

        def do_nothing(*args, **kw):
            pass
        SCons.Script.main = do_nothing

        scons_file = os.environ.get('SCONS')
        if scons_file:
            src_script, scons_py = os.path.split(scons_file)
            script_name = os.path.splitext(scons_py)[0]
        else:
            src_script = os.path.join('src', 'script')
            script_name = 'scons'
        # Whatever the file is called, register it as module 'scons'.
        scons = self._load_module(script_name, src_script, load_as='scons')

        self.WriteAnnotation("scons_test.engine", get_module_info(SCons))
        self.WriteAnnotation("scons_test.script", get_module_info(scons))

        self.WriteAnnotation("scons_test.sys", get_sys_values())
        self.WriteAnnotation("scons_test.os.environ", get_environment())
class AegisStream(TextResultStream):
    """Text result stream that reports results with Aegis outcomes.

    A failed test whose recorded exit code is '2' is reported as
    NO_RESULT.  Subclasses keep per-outcome counts in
    'self._outcome_counts' and the number of tests seen in
    'self._num_tests'.
    """

    arguments = [
        qm.fields.IntegerField(
            name = "print_time",
            title = "print individual test times",
            description = """If non-zero, write each test's total
            execution time after its output.""",
            default_value = 0,
        ),
    ]

    def __init__(self, *args, **kw):
        super(AegisStream, self).__init__(*args, **kw)
        self._num_tests = 0
        self._outcomes = {}
        # Start every Aegis outcome at zero so the summaries can index
        # the table without checking for missing keys.
        self._outcome_counts = {}
        for outcome in AegisTest.aegis_outcomes:
            self._outcome_counts[outcome] = 0
        self.format = "full"

    def _percent(self, outcome):
        """Return the share of tests with 'outcome', as a percentage.

        Callers must make sure at least one test has been counted.
        """
        return 100. * self._outcome_counts[outcome] / self._num_tests

    def _aegis_no_result(self, result):
        """Return true if 'result' failed with recorded exit code '2'."""
        outcome = result.GetOutcome()
        return (outcome == Result.FAIL and result.get('Test.exit_code') == '2')

    def _DisplayText(self, text):
        """Write 'text', converted from HTML, as an indented paragraph."""
        # qm.common.html_to_text() uses htmllib, which sticks an extra
        # '\n' on the front of the text.  Strip it and only display
        # the text if there's anything to display.
        text = qm.common.html_to_text(text)
        # startswith() is safe when the conversion yields an empty string.
        if text.startswith('\n'):
            text = text[1:]
        if not text:
            return
        lines = text.splitlines()
        if lines[-1] == '':
            lines = lines[:-1]
        self.file.write(' ' + '\n '.join(lines) + '\n\n')

    def _DisplayResult(self, result, format):
        """Write the outcome line for 'result'; 'format' is not used."""
        test_id = result.GetId()
        kind = result.GetKind()
        if self._aegis_no_result(result):
            outcome = "NO_RESULT"
        else:
            outcome = result.GetOutcome()
        self._WriteOutcome(test_id, kind, outcome)
        self.file.write('\n')

    def _DisplayAnnotations(self, result):
        """Write captured stdout/stderr and, if enabled, the run time."""
        # Output is only recorded for tests that failed, so either
        # annotation may be absent.
        for key in ("Test.stdout", "Test.stderr"):
            try:
                text = result[key]
            except KeyError:
                continue
            self._DisplayText(text)
        if self.print_time:
            start = float(result['qmtest.start_time'])
            end = float(result['qmtest.end_time'])
            fmt = " Total execution time: %.1f seconds\n\n"
            self.file.write(fmt % (end - start))
class AegisChangeStream(AegisStream):
    """Result stream for a change run: tallies each Aegis outcome."""

    def WriteResult(self, result):
        """Count 'result' under its Aegis outcome, then write it."""
        if self._aegis_no_result(result):
            outcome = AegisTest.NO_RESULT
        else:
            outcome = result.GetOutcome()
        self._num_tests += 1
        self._outcome_counts[outcome] += 1
        super(AegisChangeStream, self).WriteResult(result)

    def _SummarizeTestStats(self):
        """Write the STATISTICS section: total and per-outcome counts."""
        self.file.write("\n")
        self._DisplayHeading("STATISTICS")
        if self._num_tests == 0:
            return
        # We'd like to use the _FormatStatistics() method to do
        # this, but it's wrapped around the list in Result.outcomes,
        # so it's simpler to just do it ourselves.  Write to the
        # stream's own file so the numbers follow the heading.
        self.file.write(" %6d tests total\n\n" % self._num_tests)
        for outcome in AegisTest.aegis_outcomes:
            count = self._outcome_counts[outcome]
            if count != 0:
                self.file.write(" %6d (%3.0f%%) tests %s\n" % (
                    count,
                    self._percent(outcome),
                    outcome,
                ))
class AegisBaselineStream(AegisStream):
    """Result stream for a baseline run, where failing is expected."""

    def WriteResult(self, result):
        """Record the expected outcome for 'result', count it, write it."""
        test_id = result.GetId()
        if self._aegis_no_result(result):
            outcome = AegisTest.NO_RESULT
            self.expected_outcomes[test_id] = Result.PASS
        else:
            self.expected_outcomes[test_id] = Result.FAIL
            outcome = result.GetOutcome()
        # Every outcome is counted, FAIL included: the summary reads
        # the FAIL count to report the tests that behaved as expected.
        self._outcome_counts[outcome] += 1
        self._num_tests += 1
        super(AegisBaselineStream, self).WriteResult(result)

    def _SummarizeRelativeTestStats(self):
        """Write the STATISTICS section relative to expected failure."""
        self.file.write("\n")
        self._DisplayHeading("STATISTICS")
        if self._num_tests == 0:
            return
        # We'd like to use the _FormatStatistics() method to do
        # this, but it's wrapped around the list in Result.outcomes,
        # so it's simpler to just do it ourselves.  Write to the
        # stream's own file so the numbers follow the heading.
        expected = self._outcome_counts[AegisTest.FAIL]
        if expected:
            self.file.write(" %6d (%3.0f%%) tests as expected\n" % (
                expected,
                self._percent(AegisTest.FAIL),
            ))
        non_fail_outcomes = list(AegisTest.aegis_outcomes[:])
        non_fail_outcomes.remove(AegisTest.FAIL)
        for outcome in non_fail_outcomes:
            count = self._outcome_counts[outcome]
            if count != 0:
                self.file.write(" %6d (%3.0f%%) tests unexpected %s\n" % (
                    count,
                    self._percent(outcome),
                    outcome,
                ))
class AegisBatchStream(FileResultStream):
    """Result stream that writes one 'test_result' list for all tests.

    Results are collected as they arrive and written in a single pass,
    ordered by test id, when the stream is summarized.
    """

    def __init__(self, arguments):
        super(AegisBatchStream, self).__init__(arguments)
        # Maps test id to the exit status text written in the summary.
        self._outcomes = {}

    def WriteResult(self, result):
        """Remember the exit status to report for 'result'."""
        test_id = result.GetId()
        outcome = result.GetOutcome()
        # Anything that did not fail is reported as exit status 0.
        exit_status = '0'
        if outcome == Result.FAIL:
            exit_status = result.get('Test.exit_code')
        self._outcomes[test_id] = exit_status

    def Summarize(self):
        """Write the collected results as a 'test_result' list."""
        self.file.write('test_result = [\n')
        file_names = list(self._outcomes.keys())
        file_names.sort()
        for file_name in file_names:
            exit_status = self._outcomes[file_name]
            # Always use forward slashes in the reported file name.
            file_name = file_name.replace('\\', '/')
            self.file.write(' { file_name = "%s";\n' % file_name)
            self.file.write(' exit_status = %s; },\n' % exit_status)
        self.file.write('];\n')
class AegisTest(test.Test):
    """Base test class that names the outcomes used for Aegis reports."""

    PASS = "PASS"
    FAIL = "FAIL"
    NO_RESULT = "NO_RESULT"
    ERROR = "ERROR"
    UNTESTED = "UNTESTED"

    # Aegis test outcomes, in the order the summaries report them.
    aegis_outcomes = (
        PASS, FAIL, NO_RESULT, ERROR, UNTESTED,
    )
class Test(AegisTest):
    """A test that is a single python script.

    The script is run with a python interpreter and the test passes or
    fails according to the status the interpreter process ends with.
    """

    script = TextField(title="Script to test")
    topdir = TextField(title="Top source directory")

    def Run(self, context, result):
        """Run the script and record the verdict on 'result'.

        'context' -- Supplies the interpreter under the key 'python';
        the interpreter running QMTest is used when the key is absent.

        'result' -- Left untouched when the script succeeds.  Otherwise
        it is marked as failed and the script's captured stdout and
        stderr are stored on it.  The output is logged, not validated.
        """

        runner = RedirectedExecutable()
        interpreter = context.get('python', sys.executable)
        command_line = [interpreter, '-tt', self.script]
        exit_status = runner.Run(command_line, os.environ)
        if check_exit_status(result, 'Test.', self.script, exit_status):
            return
        # In case of failure record exit code, stdout, and stderr.
        result.Fail("Non-zero exit_code.")
        result["Test.stdout"] = result.Quote(runner.stdout)
        result["Test.stderr"] = result.Quote(runner.stderr)
class Database(database.Database):
    """Scons test database.
    * The 'src' and 'test' directories are explicit suites.
    * Their subdirectories are implicit suites.
    * All files under 'src/' ending with 'Tests.py' contain tests.
    * All files under 'test/' with extension '.py' contain tests.
    * Right now there is only a single test class, which simply runs
      the specified python interpreter on the given script. To be refined..."""

    srcdir = TextField(title = "Source Directory",
                       description = "The root of the test suite's source tree.")
    _is_generic_database = True

    # The two predicates below stay plain functions: they are called
    # through the 'is_a_test' table, never as methods.
    def is_a_test_under_test(path, t):
        """Return true if file 't' in 'path' is a test below 'test/'."""
        return os.path.splitext(t)[1] == '.py' \
               and os.path.isfile(os.path.join(path, t))

    def is_a_test_under_src(path, t):
        """Return true if file 't' in 'path' is a test below 'src/'."""
        return t[-8:] == 'Tests.py' \
               and os.path.isfile(os.path.join(path, t))

    # Maps each top-level suite to the predicate recognizing its tests.
    is_a_test = {
        'src' : is_a_test_under_src,
        'test' : is_a_test_under_test,
    }

    # Directory names that never count as suites.
    exclude_subdirs = {
        '.svn' : 1,
        'CVS' : 1,
    }

    def is_a_test_subdir(path, subdir):
        """Return true if 'subdir' of 'path' is a non-excluded directory."""
        # The table is a class attribute, so it must be reached through
        # the class; a bare name would be looked up as a module global.
        if Database.exclude_subdirs.get(subdir):
            return False
        return os.path.isdir(os.path.join(path, subdir))
    is_a_test_subdir = staticmethod(is_a_test_subdir)

    def __init__(self, path, arguments):
        """Set the label class and read-only flag, then init the base."""

        self.label_class = "file_label.FileLabel"
        self.modifiable = "false"
        # Initialize the base class.
        super(Database, self).__init__(path, arguments)

    def GetRoot(self):
        """Return the root of the source tree ('srcdir')."""

        return self.srcdir

    def GetSubdirectories(self, directory):
        """Return the sorted names of the suites directly in 'directory'.

        For the root (empty label) these are the explicit top-level
        suites; elsewhere, every subdirectory on disk.
        """

        components = self.GetLabelComponents(directory)
        path = os.path.join(self.GetRoot(), *components)
        if directory:
            dirs = [d for d in dircache.listdir(path)
                    if os.path.isdir(os.path.join(path, d))]
        else:
            dirs = list(self.is_a_test.keys())

        dirs.sort()
        return dirs

    def GetIds(self, kind, directory = "", scan_subdirs = 1):
        """Return the ids of the items of 'kind' found in 'directory'.

        'kind' -- Database.TEST, Database.RESOURCE, or anything else
        for suites.  There are no resources, so that list is empty.

        'directory' -- Label of the directory to look in; "" is the root.

        'scan_subdirs' -- If true, subdirectories are searched too.
        """

        components = self.GetLabelComponents(directory)
        path = os.path.join(self.GetRoot(), *components)

        if kind == database.Database.TEST:
            # The root itself holds no tests, and has no top-level
            # suite from which to pick a predicate.
            if not components:
                return []
            ids = [self.JoinLabels(directory, t)
                   for t in dircache.listdir(path)
                   if self.is_a_test[components[0]](path, t)]

        elif kind == Database.RESOURCE:
            return [] # no resources yet

        else: # SUITE
            if directory:
                ids = [self.JoinLabels(directory, d)
                       for d in dircache.listdir(path)
                       if os.path.isdir(os.path.join(path, d))]
            else:
                ids = list(self.is_a_test.keys())

        if scan_subdirs:
            for d in dircache.listdir(path):
                # Test the entry where it lives, not relative to the
                # current working directory.
                if os.path.isdir(os.path.join(path, d)):
                    ids.extend(self.GetIds(kind,
                                           self.JoinLabels(directory, d),
                                           True))

        return ids

    def GetExtension(self, id):
        """Return the suite or test named 'id', or None if there is none.

        The empty id and every directory yield a DirectorySuite; a
        regular file yields a Test that runs that file.
        """

        if not id:
            return DirectorySuite(self, id)

        components = self.GetLabelComponents(id)
        path = os.path.join(self.GetRoot(), *components)

        if os.path.isdir(path): # a directory
            return DirectorySuite(self, id)

        elif os.path.isfile(path): # a test
            arguments = {}
            arguments['script'] = path
            arguments['topdir'] = self.GetRoot()

            return Test(arguments, qmtest_id = id, qmtest_database = self)

        else: # nothing else to offer
            return None

    def GetTest(self, test_id):
        """Return a TestDescriptor for 'test_id'.

        This method can be removed once QMTest 2.4 is out.

        raises -- database.NoSuchTestError if 'test_id' is not a test.
        """

        t = self.GetExtension(test_id)
        if isinstance(t, test.Test):
            return database.TestDescriptor(self,
                                           test_id,
                                           get_extension_class_name(t.__class__),
                                           get_explicit_arguments(t))

        raise database.NoSuchTestError(test_id)

    def GetSuite(self, suite_id):
        """Return the suite named 'suite_id'.

        This method can be removed once QMTest 2.4 is out.

        raises -- database.NoSuchSuiteError if 'suite_id' is not a suite.
        """

        if suite_id == "":
            return DirectorySuite(self, "")

        s = self.GetExtension(suite_id)
        if isinstance(s, suite.Suite):
            return s

        raise database.NoSuchSuiteError(suite_id)

    def GetResource(self, resource_id):
        """Return a ResourceDescriptor for 'resource_id'.

        This method can be removed once QMTest 2.4 is out.

        raises -- database.NoSuchResourceError if 'resource_id' is not
        a resource.
        """

        r = self.GetExtension(resource_id)
        if isinstance(r, resource.Resource):
            # Qualified like TestDescriptor in GetTest(); this module
            # does not import the bare name.
            return database.ResourceDescriptor(self,
                                               resource_id,
                                               get_extension_class_name(r.__class__),
                                               get_explicit_arguments(r))

        raise database.NoSuchResourceError(resource_id)
604 # indent-tabs-mode:nil
606 # vim: set expandtab tabstop=4 shiftwidth=4: