--- /dev/null
+TestCmd.py: a testing framework for commands and scripts.
+The TestCmd module provides a framework for portable automated testing
+of executable commands and scripts (in any language, not just Python),
+especially commands and scripts that require file system interaction.
+In addition to running tests and evaluating conditions, the TestCmd module
+manages and cleans up one or more temporary workspace directories, and
+provides methods for creating files and directories in those workspace
+directories from in-line data, here-documents), allowing tests to be
+completely self-contained.
+A TestCmd environment object is created via the usual invocation:
+ test = TestCmd()
+The TestCmd module provides pass_test(), fail_test(), and no_result()
+unbound methods that report test results for use with the Aegis change
+management system. These methods terminate the test immediately,
+reporting PASSED, FAILED, or NO RESULT respectively, and exiting with
+status 0 (success), 1 or 2 respectively. This allows for a distinction
+between an actual failed test and a test that could not be properly
+evaluated because of an external condition (such as a full file system
+or incorrect permissions).
+# Copyright 2000 Steven Knight
+# This module is free software, and you may redistribute it and/or modify
+# it under the same terms as Python itself, so long as this copyright message
+# and disclaimer are retained in their original form.
+from string import join, split
+__author__ = "Steven Knight <knight@baldmt.com>"
+__revision__ = "TestCmd.py 0.D001 2001/01/14 00:43:41 software"
+__version__ = "0.01"
+from types import *
+import FCNTL
+import os
+import os.path
+import popen2
+import re
+import shutil
+import stat
+import sys
+import tempfile
+import traceback
+tempfile.template = 'testcmd.'
+_Cleanup = []
+def _clean():
+ global _Cleanup
+ list = _Cleanup[:]
+ _Cleanup = []
+ list.reverse()
+ for test in list:
+ test.cleanup()
+sys.exitfunc = _clean
+def _caller(tblist, skip):
+ string = ""
+ arr = []
+ for file, line, name, text in tblist:
+ if file[-10:] == "TestCmd.py":
+ break
+ arr = [(file, line, name, text)] + arr
+ atfrom = "at"
+ for file, line, name, text in arr[skip:]:
+ if name == "?":
+ name = ""
+ else:
+ name = " (" + name + ")"
+ string = string + ("%s line %d of %s%s\n" % (atfrom, line, file, name))
+ atfrom = "\tfrom"
+ return string
+def fail_test(self = None, condition = 1, function = None, skip = 0):
+ """Cause the test to fail.
+ By default, the fail_test() method reports that the test FAILED
+ and exits with a status of 1. If a condition argument is supplied,
+ the test fails only if the condition is true.
+ """
+ if not condition:
+ return
+ if not function is None:
+ function()
+ of = ""
+ desc = ""
+ sep = " "
+ if not self is None:
+ if self.program:
+ of = " of " + self.program
+ sep = "\n\t"
+ if self.description:
+ desc = " [" + self.description + "]"
+ sep = "\n\t"
+ at = _caller(traceback.extract_stack(), skip)
+ sys.stderr.write("FAILED test" + of + desc + sep + at)
+ sys.exit(1)
+def no_result(self = None, condition = 1, function = None, skip = 0):
+ """Causes a test to exit with no valid result.
+ By default, the no_result() method reports NO RESULT for the test
+ and exits with a status of 2. If a condition argument is supplied,
+ the test fails only if the condition is true.
+ """
+ if not condition:
+ return
+ if not function is None:
+ function()
+ of = ""
+ desc = ""
+ sep = " "
+ if not self is None:
+ if self.program:
+ of = " of " + self.program
+ sep = "\n\t"
+ if self.description:
+ desc = " [" + self.description + "]"
+ sep = "\n\t"
+ at = _caller(traceback.extract_stack(), skip)
+ sys.stderr.write("NO RESULT for test" + of + desc + sep + at)
+ sys.exit(2)
+def pass_test(self = None, condition = 1, function = None):
+ """Causes a test to pass.
+ By default, the pass_test() method reports PASSED for the test
+ and exits with a status of 0. If a condition argument is supplied,
+ the test passes only if the condition is true.
+ """
+ if not condition:
+ return
+ if not function is None:
+ function()
+ sys.stderr.write("PASSED\n")
+ sys.exit(0)
+def match_exact(lines = None, matches = None):
+ """
+ """
+ if not type(lines) is ListType:
+ lines = split(lines, "\n")
+ if not type(matches) is ListType:
+ matches = split(matches, "\n")
+ if len(lines) != len(matches):
+ return
+ for i in range(len(lines)):
+ if lines[i] != matches[i]:
+ return
+ return 1
+def match_re(lines = None, res = None):
+ """
+ """
+ if not type(lines) is ListType:
+ lines = split(lines, "\n")
+ if not type(res) is ListType:
+ res = split(res, "\n")
+ if len(lines) != len(res):
+ return
+ for i in range(len(lines)):
+ if not re.compile("^" + res[i] + "$").search(lines[i]):
+ return
+ return 1
+class TestCmd:
+ """Class TestCmd
+ """
+ def __init__(self, description = None,
+ program = None,
+ interpreter = None,
+ workdir = None,
+ subdir = None,
+ verbose = 0,
+ match = None):
+ self._cwd = os.getcwd()
+ self.description_set(description)
+ self.program_set(program)
+ self.interpreter_set(interpreter)
+ self.verbose_set(verbose)
+ if not match is None:
+ self.match_func = match
+ else:
+ self.match_func = match_re
+ self._dirlist = []
+ self._preserve = {'pass_test': 0, 'fail_test': 0, 'no_result': 0}
+ if os.environ.has_key('PRESERVE') and not os.environ['PRESERVE'] is '':
+ self._preserve['pass_test'] = os.environ['PRESERVE']
+ self._preserve['fail_test'] = os.environ['PRESERVE']
+ self._preserve['no_result'] = os.environ['PRESERVE']
+ else:
+ try:
+ self._preserve['pass_test'] = os.environ['PRESERVE_PASS']
+ except KeyError:
+ pass
+ try:
+ self._preserve['fail_test'] = os.environ['PRESERVE_FAIL']
+ except KeyError:
+ pass
+ try:
+ self._preserve['no_result'] = os.environ['PRESERVE_NO_RESULT']
+ except KeyError:
+ pass
+ self._stdout = []
+ self._stderr = []
+ self.status = None
+ self.condition = 'no_result'
+ self.workdir_set(workdir)
+ self.subdir(subdir)
+ def __del__(self):
+ self.cleanup()
+ def __repr__(self):
+ return "%x" % id(self)
+ def cleanup(self, condition = None):
+ """Removes any temporary working directories for the specified
+ TestCmd environment. If the environment variable PRESERVE was
+ set when the TestCmd environment was created, temporary working
+ directories are not removed. If any of the environment variables
+ when the TestCmd environment was created, then temporary working
+ directories are not removed if the test passed, failed, or had
+ no result, respectively. Temporary working directories are also
+ preserved for conditions specified via the preserve method.
+ Typically, this method is not called directly, but is used when
+ the script exits to clean up temporary working directories as
+ appropriate for the exit status.
+ """
+ if not self._dirlist:
+ return
+ if condition is None:
+ condition = self.condition
+ #print "cleanup(" + condition + "): ", self._preserve
+ if self._preserve[condition]:
+ return
+ os.chdir(self._cwd)
+ self.workdir = None
+ list = self._dirlist[:]
+ self._dirlist = []
+ list.reverse()
+ for dir in list:
+ self.writable(dir, 1)
+ shutil.rmtree(dir, ignore_errors = 1)
+ try:
+ global _Cleanup
+ _Cleanup.remove(self)
+ except (AttributeError, ValueError):
+ pass
+ def description_set(self, description):
+ """Set the description of the functionality being tested.
+ """
+ self.description = description
+# def diff(self):
+# """Diff two arrays.
+# """
+ def fail_test(self, condition = 1, function = None, skip = 0):
+ """Cause the test to fail.
+ """
+ if not condition:
+ return
+ self.condition = 'fail_test'
+ fail_test(self = self,
+ condition = condition,
+ function = function,
+ skip = skip)
+ def interpreter_set(self, interpreter):
+ """Set the program to be used to interpret the program
+ under test as a script.
+ """
+ self.interpreter = interpreter
+ def match(self, lines, matches):
+ """Compare actual and expected file contents.
+ """
+ return self.match_func(lines, matches)
+ def match_exact(self, lines, matches):
+ """Compare actual and expected file contents.
+ """
+ return match_exact(lines, matches)
+ def match_re(self, lines, res):
+ """Compare actual and expected file contents.
+ """
+ return match_re(lines, res)
+ def no_result(self, condition = 1, function = None, skip = 0):
+ """Report that the test could not be run.
+ """
+ if not condition:
+ return
+ self.condition = 'no_result'
+ no_result(self = self,
+ condition = condition,
+ function = function,
+ skip = skip)
+ def pass_test(self, condition = 1, function = None):
+ """Cause the test to pass.
+ """
+ if not condition:
+ return
+ self.condition = 'pass_test'
+ pass_test(self = self, condition = condition, function = function)
+ def preserve(self, *conditions):
+ """Arrange for the temporary working directories for the
+ specified TestCmd environment to be preserved for one or more
+ conditions. If no conditions are specified, arranges for
+ the temporary working directories to be preserved for all
+ conditions.
+ """
+ if conditions is ():
+ conditions = ('pass_test', 'fail_test', 'no_result')
+ for cond in conditions:
+ self._preserve[cond] = 1
+ def program_set(self, program):
+ """Set the executable program or script to be tested.
+ """
+ if program and not os.path.isabs(program):
+ program = os.path.join(self._cwd, program)
+ self.program = program
+ def read(self, file):
+ """Reads and returns the contents of the specified file name.
+ The file name may be a list, in which case the elements are
+ concatenated with the os.path.join() method. The file is
+ assumed to be under the temporary working directory unless it
+ is an absolute path name.
+ """
+ if type(file) is ListType:
+ file = apply(os.path.join, tuple(file))
+ if not os.path.isabs(file):
+ file = os.path.join(self.workdir, file)
+ f = os.fdopen(os.open(file, FCNTL.O_RDONLY))
+ contents = f.read()
+ f.close()
+ return contents
+ def run(self, program = None,
+ interpreter = None,
+ arguments = None,
+ chdir = None,
+ stdin = None):
+ """Runs a test of the program or script for the test
+ environment. Standard output and error output are saved for
+ future retrieval via the stdout() and stderr() methods.
+ """
+ if chdir:
+ oldcwd = os.getcwd()
+ if not os.path.isabs(chdir):
+ chdir = os.path.join(self.workpath(chdir))
+ if self.verbose:
+ sys.stderr.write("chdir(" + chdir + ")\n")
+ os.chdir(chdir)
+ cmd = None
+ if program:
+ if not os.path.isabs(program):
+ program = os.path.join(self._cwd, program)
+ cmd = program
+ if interpreter:
+ cmd = interpreter + " " + cmd
+ else:
+ cmd = self.program
+ if self.interpreter:
+ cmd = self.interpreter + " " + cmd
+ if arguments:
+ cmd = cmd + " " + arguments
+ if self.verbose:
+ sys.stderr.write(cmd + "\n")
+ p = popen2.Popen3(cmd, 1)
+ if stdin:
+ if type(stdin) is ListType:
+ for line in stdin:
+ p.tochild.write(line)
+ else:
+ p.tochild.write(stdin)
+ p.tochild.close()
+ self._stdout.append(p.fromchild.read())
+ self._stderr.append(p.childerr.read())
+ self.status = p.wait()
+ if chdir:
+ os.chdir(oldcwd)
+ def stderr(self, run = None):
+ """Returns the error output from the specified run number.
+ If there is no specified run number, then returns the error
+ output of the last run. If the run number is less than zero,
+ then returns the error output from that many runs back from the
+ current run.
+ """
+ if not run:
+ run = len(self._stderr)
+ elif run < 0:
+ run = len(self._stderr) + run
+ run = run - 1
+ return self._stderr[run]
+ def stdout(self, run = None):
+ """Returns the standard output from the specified run number.
+ If there is no specified run number, then returns the standard
+ output of the last run. If the run number is less than zero,
+ then returns the standard output from that many runs back from
+ the current run.
+ """
+ if not run:
+ run = len(self._stdout)
+ elif run < 0:
+ run = len(self._stdout) + run
+ run = run - 1
+ return self._stdout[run]
+ def subdir(self, *subdirs):
+ """Create new subdirectories under the temporary working
+ directory, one for each argument. An argument may be a list,
+ in which case the list elements are concatenated using the
+ os.path.join() method. Subdirectories multiple levels deep
+ must be created using a separate argument for each level:
+ test.subdir('sub', ['sub', 'dir'], ['sub', 'dir', 'ectory'])
+ Returns the number of subdirectories actually created.
+ """
+ count = 0
+ for sub in subdirs:
+ if sub is None:
+ continue
+ if type(sub) is ListType:
+ sub = apply(os.path.join, tuple(sub))
+ new = os.path.join(self.workdir, sub)
+ try:
+ os.mkdir(new)
+ except:
+ pass
+ else:
+ count = count + 1
+ return count
+ def verbose_set(self, verbose):
+ """Set the verbose level.
+ """
+ self.verbose = verbose
+ def workdir_set(self, path):
+ """Creates a temporary working directory with the specified
+ path name. If the path is a null string (''), a unique
+ directory name is created.
+ """
+ if (path != None):
+ if path == '':
+ path = tempfile.mktemp()
+ if path != None:
+ os.mkdir(path)
+ self._dirlist.append(path)
+ global _Cleanup
+ try:
+ _Cleanup.index(self)
+ except ValueError:
+ _Cleanup.append(self)
+ # We'd like to set self.workdir like this:
+ # self.workdir = path
+ # But symlinks in the path will report things
+ # differently from os.getcwd(), so chdir there
+ # and back to fetch the canonical path.
+ cwd = os.getcwd()
+ os.chdir(path)
+ self.workdir = os.getcwd()
+ os.chdir(cwd)
+ else:
+ self.workdir = None
+ def workpath(self, *args):
+ """Returns the absolute path name to a subdirectory or file
+ within the current temporary working directory. Concatenates
+ the temporary working directory name with the specified
+ arguments using the os.path.join() method.
+ """
+ return apply(os.path.join, (self.workdir,) + tuple(args))
+ def writable(self, top, write):
+ """Make the specified directory tree writable (write == 1)
+ or not (write == None).
+ """
+ def _walk_chmod(arg, dirname, names):
+ st = os.stat(dirname)
+ os.chmod(dirname, arg(st[stat.ST_MODE]))
+ for name in names:
+ n = os.path.join(dirname, name)
+ st = os.stat(n)
+ os.chmod(n, arg(st[stat.ST_MODE]))
+ def _mode_writable(mode):
+ return stat.S_IMODE(mode|0200)
+ def _mode_non_writable(mode):
+ return stat.S_IMODE(mode&~0200)
+ if write:
+ f = _mode_writable
+ else:
+ f = _mode_non_writable
+ os.path.walk(top, _walk_chmod, f)
+ def write(self, file, content):
+ """Writes the specified content text (second argument) to the
+ specified file name (first argument). The file name may be
+ a list, in which case the elements are concatenated with the
+ os.path.join() method. The file is created under the temporary
+ working directory. Any subdirectories in the path must already
+ exist. """
+ if type(file) is ListType:
+ file = apply(os.path.join, tuple(file))
+ if not os.path.isabs(file):
+ file = os.path.join(self.workdir, file)
+ fd = os.open(file, FCNTL.O_CREAT|FCNTL.O_WRONLY)
+ os.write(fd, content)
+ os.close(fd)
--- /dev/null
+#!/usr/bin/env python
+Python unit testing framework, based on Erich Gamma's JUnit and Kent Beck's
+Smalltalk testing framework.
+Further information is available in the bundled documentation, and from
+ http://pyunit.sourceforge.net/
+This module contains the core framework classes that form the basis of
+specific test cases and suites (TestCase, TestSuite etc.), and also a
+text-based utility class for running the tests and reporting the results
+Copyright (c) 1999, 2000, 2001 Steve Purcell
+This module is free software, and you may redistribute it and/or modify
+it under the same terms as Python itself, so long as this copyright message
+and disclaimer are retained in their original form.
+__author__ = "Steve Purcell (stephen_purcell@yahoo.com)"
+__version__ = "$ Revision: 1.23 $"[11:-2]
+import time
+import sys
+import traceback
+import string
+import os
+# A platform-specific concession to help the code work for JPython users
+plat = string.lower(sys.platform)
+_isJPython = string.find(plat, 'java') >= 0 or string.find(plat, 'jdk') >= 0
+del plat
+# Test framework core
+class TestResult:
+ """Holder for test result information.
+ Test results are automatically managed by the TestCase and TestSuite
+ classes, and do not need to be explicitly manipulated by writers of tests.
+ Each instance holds the total number of tests run, and collections of
+ failures and errors that occurred among those test runs. The collections
+ contain tuples of (testcase, exceptioninfo), where exceptioninfo is a
+ tuple of values as returned by sys.exc_info().
+ """
+ def __init__(self):
+ self.failures = []
+ self.errors = []
+ self.testsRun = 0
+ self.shouldStop = 0
+ def startTest(self, test):
+ "Called when the given test is about to be run"
+ self.testsRun = self.testsRun + 1
+ def stopTest(self, test):
+ "Called when the given test has been run"
+ pass
+ def addError(self, test, err):
+ "Called when an error has occurred"
+ self.errors.append((test, err))
+ def addFailure(self, test, err):
+ "Called when a failure has occurred"
+ self.failures.append((test, err))
+ def wasSuccessful(self):
+ "Tells whether or not this result was a success"
+ return len(self.failures) == len(self.errors) == 0
+ def stop(self):
+ "Indicates that the tests should be aborted"
+ self.shouldStop = 1
+ def __repr__(self):
+ return "<%s run=%i errors=%i failures=%i>" % \
+ (self.__class__, self.testsRun, len(self.errors),
+ len(self.failures))
+class TestCase:
+ """A class whose instances are single test cases.
+ Test authors should subclass TestCase for their own tests. Construction
+ and deconstruction of the test's environment ('fixture') can be
+ implemented by overriding the 'setUp' and 'tearDown' methods respectively.
+ By default, the test code itself should be placed in a method named
+ 'runTest'.
+ If the fixture may be used for many test cases, create as
+ many test methods as are needed. When instantiating such a TestCase
+ subclass, specify in the constructor arguments the name of the test method
+ that the instance is to execute.
+ If it is necessary to override the __init__ method, the base class
+ __init__ method must always be called.
+ """
+ def __init__(self, methodName='runTest'):
+ """Create an instance of the class that will use the named test
+ method when executed. Raises a ValueError if the instance does
+ not have a method with the specified name.
+ """
+ try:
+ self.__testMethod = getattr(self,methodName)
+ except AttributeError:
+ raise ValueError, "no such test method in %s: %s" % \
+ (self.__class__, methodName)
+ def setUp(self):
+ "Hook method for setting up the test fixture before exercising it."
+ pass
+ def tearDown(self):
+ "Hook method for deconstructing the test fixture after testing it."
+ pass
+ def countTestCases(self):
+ return 1
+ def defaultTestResult(self):
+ return TestResult()
+ def shortDescription(self):
+ """Returns a one-line description of the test, or None if no
+ description has been provided.
+ The default implementation of this method returns the first line of
+ the specified test method's docstring.
+ """
+ doc = self.__testMethod.__doc__
+ return doc and string.strip(string.split(doc, "\n")[0]) or None
+ def id(self):
+ return "%s.%s" % (self.__class__, self.__testMethod.__name__)
+ def __str__(self):
+ return "%s (%s)" % (self.__testMethod.__name__, self.__class__)
+ def __repr__(self):
+ return "<%s testMethod=%s>" % \
+ (self.__class__, self.__testMethod.__name__)
+ def run(self, result=None):
+ return self(result)
+ def __call__(self, result=None):
+ if result is None: result = self.defaultTestResult()
+ result.startTest(self)
+ try:
+ try:
+ self.setUp()
+ except:
+ result.addError(self,self.__exc_info())
+ return
+ try:
+ self.__testMethod()
+ except AssertionError, e:
+ result.addFailure(self,self.__exc_info())
+ except:
+ result.addError(self,self.__exc_info())
+ try:
+ self.tearDown()
+ except:
+ result.addError(self,self.__exc_info())
+ finally:
+ result.stopTest(self)
+ def debug(self):
+ """Run the test without collecting errors in a TestResult"""
+ self.setUp()
+ self.__testMethod()
+ self.tearDown()
+ def assert_(self, expr, msg=None):
+ """Equivalent of built-in 'assert', but is not optimised out when
+ __debug__ is false.
+ """
+ if not expr:
+ raise AssertionError, msg
+ failUnless = assert_
+ def failIf(self, expr, msg=None):
+ "Fail the test if the expression is true."
+ apply(self.assert_,(not expr,msg))
+ def assertRaises(self, excClass, callableObj, *args, **kwargs):
+ """Assert that an exception of class excClass is thrown
+ by callableObj when invoked with arguments args and keyword
+ arguments kwargs. If a different type of exception is
+ thrown, it will not be caught, and the test case will be
+ deemed to have suffered an error, exactly as for an
+ unexpected exception.
+ """
+ try:
+ apply(callableObj, args, kwargs)
+ except excClass:
+ return
+ else:
+ if hasattr(excClass,'__name__'): excName = excClass.__name__
+ else: excName = str(excClass)
+ raise AssertionError, excName
+ def fail(self, msg=None):
+ """Fail immediately, with the given message."""
+ raise AssertionError, msg
+ def __exc_info(self):
+ """Return a version of sys.exc_info() with the traceback frame
+ minimised; usually the top level of the traceback frame is not
+ needed.
+ """
+ exctype, excvalue, tb = sys.exc_info()
+ newtb = tb.tb_next
+ if newtb is None:
+ return (exctype, excvalue, tb)
+ return (exctype, excvalue, newtb)
+class TestSuite:
+ """A test suite is a composite test consisting of a number of TestCases.
+ For use, create an instance of TestSuite, then add test case instances.
+ When all tests have been added, the suite can be passed to a test
+ runner, such as TextTestRunner. It will run the individual test cases
+ in the order in which they were added, aggregating the results. When
+ subclassing, do not forget to call the base class constructor.
+ """
+ def __init__(self, tests=()):
+ self._tests = []
+ self.addTests(tests)
+ def __repr__(self):
+ return "<%s tests=%s>" % (self.__class__, self._tests)
+ __str__ = __repr__
+ def countTestCases(self):
+ cases = 0
+ for test in self._tests:
+ cases = cases + test.countTestCases()
+ return cases
+ def addTest(self, test):
+ self._tests.append(test)
+ def addTests(self, tests):
+ for test in tests:
+ self.addTest(test)
+ def run(self, result):
+ return self(result)
+ def __call__(self, result):
+ for test in self._tests:
+ if result.shouldStop:
+ break
+ test(result)
+ return result
+ def debug(self):
+ """Run the tests without collecting errors in a TestResult"""
+ for test in self._tests: test.debug()
+class FunctionTestCase(TestCase):
+ """A test case that wraps a test function.
+ This is useful for slipping pre-existing test functions into the
+ PyUnit framework. Optionally, set-up and tidy-up functions can be
+ supplied. As with TestCase, the tidy-up ('tearDown') function will
+ always be called if the set-up ('setUp') function ran successfully.
+ """
+ def __init__(self, testFunc, setUp=None, tearDown=None,
+ description=None):
+ TestCase.__init__(self)
+ self.__setUpFunc = setUp
+ self.__tearDownFunc = tearDown
+ self.__testFunc = testFunc
+ self.__description = description
+ def setUp(self):
+ if self.__setUpFunc is not None:
+ self.__setUpFunc()
+ def tearDown(self):
+ if self.__tearDownFunc is not None:
+ self.__tearDownFunc()
+ def runTest(self):
+ self.__testFunc()
+ def id(self):
+ return self.__testFunc.__name__
+ def __str__(self):
+ return "%s (%s)" % (self.__class__, self.__testFunc.__name__)
+ def __repr__(self):
+ return "<%s testFunc=%s>" % (self.__class__, self.__testFunc)
+ def shortDescription(self):
+ if self.__description is not None: return self.__description
+ doc = self.__testFunc.__doc__
+ return doc and string.strip(string.split(doc, "\n")[0]) or None
+# Convenience functions
+def getTestCaseNames(testCaseClass, prefix, sortUsing=cmp):
+ """Extracts all the names of functions in the given test case class
+ and its base classes that start with the given prefix. This is used
+ by makeSuite().
+ """
+ testFnNames = filter(lambda n,p=prefix: n[:len(p)] == p,
+ dir(testCaseClass))
+ for baseclass in testCaseClass.__bases__:
+ testFnNames = testFnNames + \
+ getTestCaseNames(baseclass, prefix, sortUsing=None)
+ if sortUsing:
+ testFnNames.sort(sortUsing)
+ return testFnNames
+def makeSuite(testCaseClass, prefix='test', sortUsing=cmp):
+ """Returns a TestSuite instance built from all of the test functions
+ in the given test case class whose names begin with the given
+ prefix. The cases are sorted by their function names
+ using the supplied comparison function, which defaults to 'cmp'.
+ """
+ cases = map(testCaseClass,
+ getTestCaseNames(testCaseClass, prefix, sortUsing))
+ return TestSuite(cases)
+def createTestInstance(name, module=None):
+ """Finds tests by their name, optionally only within the given module.
+ Return the newly-constructed test, ready to run. If the name contains a ':'
+ then the portion of the name after the colon is used to find a specific
+ test case within the test case class named before the colon.
+ Examples:
+ findTest('examples.listtests.suite')
+ -- returns result of calling 'suite'
+ findTest('examples.listtests.ListTestCase:checkAppend')
+ -- returns result of calling ListTestCase('checkAppend')
+ findTest('examples.listtests.ListTestCase:check-')
+ -- returns result of calling makeSuite(ListTestCase, prefix="check")
+ """
+ spec = string.split(name, ':')
+ if len(spec) > 2: raise ValueError, "illegal test name: %s" % name
+ if len(spec) == 1:
+ testName = spec[0]
+ caseName = None
+ else:
+ testName, caseName = spec
+ parts = string.split(testName, '.')
+ if module is None:
+ if len(parts) < 2:
+ raise ValueError, "incomplete test name: %s" % name
+ constructor = __import__(string.join(parts[:-1],'.'))
+ parts = parts[1:]
+ else:
+ constructor = module
+ for part in parts:
+ constructor = getattr(constructor, part)
+ if not callable(constructor):
+ raise ValueError, "%s is not a callable object" % constructor
+ if caseName:
+ if caseName[-1] == '-':
+ prefix = caseName[:-1]
+ if not prefix:
+ raise ValueError, "prefix too short: %s" % name
+ test = makeSuite(constructor, prefix=prefix)
+ else:
+ test = constructor(caseName)
+ else:
+ test = constructor()
+ if not hasattr(test,"countTestCases"):
+ raise TypeError, \
+ "object %s found with spec %s is not a test" % (test, name)
+ return test
+# Text UI
+class _WritelnDecorator:
+ """Used to decorate file-like objects with a handy 'writeln' method"""
+ def __init__(self,stream):
+ self.stream = stream
+ if _isJPython:
+ import java.lang.System
+ self.linesep = java.lang.System.getProperty("line.separator")
+ else:
+ self.linesep = os.linesep
+ def __getattr__(self, attr):
+ return getattr(self.stream,attr)
+ def writeln(self, *args):
+ if args: apply(self.write, args)
+ self.write(self.linesep)
+class _JUnitTextTestResult(TestResult):
+ """A test result class that can print formatted text results to a stream.
+ Used by JUnitTextTestRunner.
+ """
+ def __init__(self, stream):
+ self.stream = stream
+ TestResult.__init__(self)
+ def addError(self, test, error):
+ TestResult.addError(self,test,error)
+ self.stream.write('E')
+ self.stream.flush()
+ if error[0] is KeyboardInterrupt:
+ self.shouldStop = 1
+ def addFailure(self, test, error):
+ TestResult.addFailure(self,test,error)
+ self.stream.write('F')
+ self.stream.flush()
+ def startTest(self, test):
+ TestResult.startTest(self,test)
+ self.stream.write('.')
+ self.stream.flush()
+ def printNumberedErrors(self,errFlavour,errors):
+ if not errors: return
+ if len(errors) == 1:
+ self.stream.writeln("There was 1 %s:" % errFlavour)
+ else:
+ self.stream.writeln("There were %i %ss:" %
+ (len(errors), errFlavour))
+ i = 1
+ for test,error in errors:
+ errString = string.join(apply(traceback.format_exception,error),"")
+ self.stream.writeln("%i) %s" % (i, test))
+ self.stream.writeln(errString)
+ i = i + 1
+ def printErrors(self):
+ self.printNumberedErrors("error",self.errors)
+ def printFailures(self):
+ self.printNumberedErrors("failure",self.failures)
+ def printHeader(self):
+ self.stream.writeln()
+ if self.wasSuccessful():
+ self.stream.writeln("OK (%i tests)" % self.testsRun)
+ else:
+ self.stream.writeln("!!!FAILURES!!!")
+ self.stream.writeln("Test Results")
+ self.stream.writeln()
+ self.stream.writeln("Run: %i ; Failures: %i ; Errors: %i" %
+ (self.testsRun, len(self.failures),
+ len(self.errors)))
+ def printResult(self):
+ self.printHeader()
+ self.printErrors()
+ self.printFailures()
+class JUnitTextTestRunner:
+ """A test runner class that displays results in textual form.
+ The display format approximates that of JUnit's 'textui' test runner.
+ This test runner may be removed in a future version of PyUnit.
+ """
+ def __init__(self, stream=sys.stderr):
+ self.stream = _WritelnDecorator(stream)
+ def run(self, test):
+ "Run the given test case or test suite."
+ result = _JUnitTextTestResult(self.stream)
+ startTime = time.time()
+ test(result)
+ stopTime = time.time()
+ self.stream.writeln()
+ self.stream.writeln("Time: %.3fs" % float(stopTime - startTime))
+ result.printResult()
+ return result
+# Verbose text UI
+class _VerboseTextTestResult(TestResult):
+ """A test result class that can print formatted text results to a stream.
+ Used by VerboseTextTestRunner.
+ """
+ def __init__(self, stream, descriptions):
+ TestResult.__init__(self)
+ self.stream = stream
+ self.lastFailure = None
+ self.descriptions = descriptions
+ def startTest(self, test):
+ TestResult.startTest(self, test)
+ if self.descriptions:
+ self.stream.write(test.shortDescription() or str(test))
+ else:
+ self.stream.write(str(test))
+ self.stream.write(" ... ")
+ def stopTest(self, test):
+ TestResult.stopTest(self, test)
+ if self.lastFailure is not test:
+ self.stream.writeln("ok")
+ def addError(self, test, err):
+ TestResult.addError(self, test, err)
+ self._printError("ERROR", test, err)
+ self.lastFailure = test
+ if err[0] is KeyboardInterrupt:
+ self.shouldStop = 1
+ def addFailure(self, test, err):
+ TestResult.addFailure(self, test, err)
+ self._printError("FAIL", test, err)
+ self.lastFailure = test
+ def _printError(self, flavour, test, err):
+ errLines = []
+ separator1 = "\t" + '=' * 70
+ separator2 = "\t" + '-' * 70
+ if not self.lastFailure is test:
+ self.stream.writeln()
+ self.stream.writeln(separator1)
+ self.stream.writeln("\t%s" % flavour)
+ self.stream.writeln(separator2)
+ for line in apply(traceback.format_exception, err):
+ for l in string.split(line,"\n")[:-1]:
+ self.stream.writeln("\t%s" % l)
+ self.stream.writeln(separator1)
+class VerboseTextTestRunner:
+ """A test runner class that displays results in textual form.
+ It prints out the names of tests as they are run, errors as they
+ occur, and a summary of the results at the end of the test run.
+ """
+ def __init__(self, stream=sys.stderr, descriptions=1):
+ self.stream = _WritelnDecorator(stream)
+ self.descriptions = descriptions
+ def run(self, test):
+ "Run the given test case or test suite."
+ result = _VerboseTextTestResult(self.stream, self.descriptions)
+ startTime = time.time()
+ test(result)
+ stopTime = time.time()
+ timeTaken = float(stopTime - startTime)
+ self.stream.writeln("-" * 78)
+ run = result.testsRun
+ self.stream.writeln("Ran %d test%s in %.3fs" %
+ (run, run > 1 and "s" or "", timeTaken))
+ self.stream.writeln()
+ if not result.wasSuccessful():
+ self.stream.write("FAILED (")
+ failed, errored = map(len, (result.failures, result.errors))
+ if failed:
+ self.stream.write("failures=%d" % failed)
+ if errored:
+ if failed: self.stream.write(", ")
+ self.stream.write("errors=%d" % errored)
+ self.stream.writeln(")")
+ else:
+ self.stream.writeln("OK")
+ return result
+# Which flavour of TextTestRunner is the default?
+TextTestRunner = VerboseTextTestRunner
+# Facilities for running tests from the command line
+class TestProgram:
+ """A command-line program that runs a set of tests; this is primarily
+ for making test modules conveniently executable.
+ """
+ USAGE = """\
+Usage: %(progName)s [-h|--help] [test[:(casename|prefix-)]] [...]
+ %(progName)s - run default set of tests
+ %(progName)s MyTestSuite - run suite 'MyTestSuite'
+ %(progName)s MyTestCase:checkSomething - run MyTestCase.checkSomething
+ %(progName)s MyTestCase:check- - run all 'check*' test methods
+ in MyTestCase
+ def __init__(self, module='__main__', defaultTest=None,
+ argv=None, testRunner=None):
+ if type(module) == type(''):
+ self.module = __import__(module)
+ for part in string.split(module,'.')[1:]:
+ self.module = getattr(self.module, part)
+ else:
+ self.module = module
+ if argv is None:
+ argv = sys.argv
+ self.defaultTest = defaultTest
+ self.testRunner = testRunner
+ self.progName = os.path.basename(argv[0])
+ self.parseArgs(argv)
+ self.createTests()
+ self.runTests()
+ def usageExit(self, msg=None):
+ if msg: print msg
+ print self.USAGE % self.__dict__
+ sys.exit(2)
+ def parseArgs(self, argv):
+ import getopt
+ try:
+ options, args = getopt.getopt(argv[1:], 'hH', ['help'])
+ opts = {}
+ for opt, value in options:
+ if opt in ('-h','-H','--help'):
+ self.usageExit()
+ if len(args) == 0 and self.defaultTest is None:
+ raise getopt.error, "No default test is defined."
+ if len(args) > 0:
+ self.testNames = args
+ else:
+ self.testNames = (self.defaultTest,)
+ except getopt.error, msg:
+ self.usageExit(msg)
+ def createTests(self):
+ tests = []
+ for testName in self.testNames:
+ tests.append(createTestInstance(testName, self.module))
+ self.test = TestSuite(tests)
+ def runTests(self):
+ if self.testRunner is None:
+ self.testRunner = TextTestRunner()
+ result = self.testRunner.run(self.test)
+ sys.exit(not result.wasSuccessful())
+main = TestProgram
+# Executing this module from the command line
+if __name__ == "__main__":
+ main(module=None)