+++ /dev/null
-"""
-TestCmd.py: a testing framework for commands and scripts.
-
-The TestCmd module provides a framework for portable automated testing
-of executable commands and scripts (in any language, not just Python),
-especially commands and scripts that require file system interaction.
-
-In addition to running tests and evaluating conditions, the TestCmd module
-manages and cleans up one or more temporary workspace directories, and
-provides methods for creating files and directories in those workspace
-directories from in-line data, here-documents), allowing tests to be
-completely self-contained.
-
-A TestCmd environment object is created via the usual invocation:
-
- test = TestCmd()
-
-The TestCmd module provides pass_test(), fail_test(), and no_result()
-unbound methods that report test results for use with the Aegis change
-management system. These methods terminate the test immediately,
-reporting PASSED, FAILED, or NO RESULT respectively, and exiting with
-status 0 (success), 1 or 2 respectively. This allows for a distinction
-between an actual failed test and a test that could not be properly
-evaluated because of an external condition (such as a full file system
-or incorrect permissions).
-"""
-
-# Copyright 2000 Steven Knight
-# This module is free software, and you may redistribute it and/or modify
-# it under the same terms as Python itself, so long as this copyright message
-# and disclaimer are retained in their original form.
-#
-# IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
-# SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
-# THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
-# DAMAGE.
-#
-# THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
-# PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
-# AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
-# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
-
-from string import join, split
-
-__author__ = "Steven Knight <knight@baldmt.com>"
-__revision__ = "TestCmd.py 0.D001 2001/01/14 00:43:41 software"
-__version__ = "0.01"
-
-from types import *
-
-import FCNTL
-import os
-import os.path
-import popen2
-import re
-import shutil
-import stat
-import sys
-import tempfile
-import traceback
-
-tempfile.template = 'testcmd.'
-
-_Cleanup = []
-
-def _clean():
- global _Cleanup
- list = _Cleanup[:]
- _Cleanup = []
- list.reverse()
- for test in list:
- test.cleanup()
-
-sys.exitfunc = _clean
-
-def _caller(tblist, skip):
- string = ""
- arr = []
- for file, line, name, text in tblist:
- if file[-10:] == "TestCmd.py":
- break
- arr = [(file, line, name, text)] + arr
- atfrom = "at"
- for file, line, name, text in arr[skip:]:
- if name == "?":
- name = ""
- else:
- name = " (" + name + ")"
- string = string + ("%s line %d of %s%s\n" % (atfrom, line, file, name))
- atfrom = "\tfrom"
- return string
-
-def fail_test(self = None, condition = 1, function = None, skip = 0):
- """Cause the test to fail.
-
- By default, the fail_test() method reports that the test FAILED
- and exits with a status of 1. If a condition argument is supplied,
- the test fails only if the condition is true.
- """
- if not condition:
- return
- if not function is None:
- function()
- of = ""
- desc = ""
- sep = " "
- if not self is None:
- if self.program:
- of = " of " + self.program
- sep = "\n\t"
- if self.description:
- desc = " [" + self.description + "]"
- sep = "\n\t"
-
- at = _caller(traceback.extract_stack(), skip)
- sys.stderr.write("FAILED test" + of + desc + sep + at)
-
- sys.exit(1)
-
-def no_result(self = None, condition = 1, function = None, skip = 0):
- """Causes a test to exit with no valid result.
-
- By default, the no_result() method reports NO RESULT for the test
- and exits with a status of 2. If a condition argument is supplied,
- the test fails only if the condition is true.
- """
- if not condition:
- return
- if not function is None:
- function()
- of = ""
- desc = ""
- sep = " "
- if not self is None:
- if self.program:
- of = " of " + self.program
- sep = "\n\t"
- if self.description:
- desc = " [" + self.description + "]"
- sep = "\n\t"
-
- at = _caller(traceback.extract_stack(), skip)
- sys.stderr.write("NO RESULT for test" + of + desc + sep + at)
-
- sys.exit(2)
-
-def pass_test(self = None, condition = 1, function = None):
- """Causes a test to pass.
-
- By default, the pass_test() method reports PASSED for the test
- and exits with a status of 0. If a condition argument is supplied,
- the test passes only if the condition is true.
- """
- if not condition:
- return
- if not function is None:
- function()
- sys.stderr.write("PASSED\n")
- sys.exit(0)
-
-def match_exact(lines = None, matches = None):
- """
- """
- if not type(lines) is ListType:
- lines = split(lines, "\n")
- if not type(matches) is ListType:
- matches = split(matches, "\n")
- if len(lines) != len(matches):
- return
- for i in range(len(lines)):
- if lines[i] != matches[i]:
- return
- return 1
-
-def match_re(lines = None, res = None):
- """
- """
- if not type(lines) is ListType:
- lines = split(lines, "\n")
- if not type(res) is ListType:
- res = split(res, "\n")
- if len(lines) != len(res):
- return
- for i in range(len(lines)):
- if not re.compile("^" + res[i] + "$").search(lines[i]):
- return
- return 1
-
-class TestCmd:
- """Class TestCmd
- """
-
- def __init__(self, description = None,
- program = None,
- interpreter = None,
- workdir = None,
- subdir = None,
- verbose = 0,
- match = None):
- self._cwd = os.getcwd()
- self.description_set(description)
- self.program_set(program)
- self.interpreter_set(interpreter)
- self.verbose_set(verbose)
- if not match is None:
- self.match_func = match
- else:
- self.match_func = match_re
- self._dirlist = []
- self._preserve = {'pass_test': 0, 'fail_test': 0, 'no_result': 0}
- if os.environ.has_key('PRESERVE') and not os.environ['PRESERVE'] is '':
- self._preserve['pass_test'] = os.environ['PRESERVE']
- self._preserve['fail_test'] = os.environ['PRESERVE']
- self._preserve['no_result'] = os.environ['PRESERVE']
- else:
- try:
- self._preserve['pass_test'] = os.environ['PRESERVE_PASS']
- except KeyError:
- pass
- try:
- self._preserve['fail_test'] = os.environ['PRESERVE_FAIL']
- except KeyError:
- pass
- try:
- self._preserve['no_result'] = os.environ['PRESERVE_NO_RESULT']
- except KeyError:
- pass
- self._stdout = []
- self._stderr = []
- self.status = None
- self.condition = 'no_result'
- self.workdir_set(workdir)
- self.subdir(subdir)
-
- def __del__(self):
- self.cleanup()
-
- def __repr__(self):
- return "%x" % id(self)
-
- def cleanup(self, condition = None):
- """Removes any temporary working directories for the specified
- TestCmd environment. If the environment variable PRESERVE was
- set when the TestCmd environment was created, temporary working
- directories are not removed. If any of the environment variables
- PRESERVE_PASS, PRESERVE_FAIL, or PRESERVE_NO_RESULT were set
- when the TestCmd environment was created, then temporary working
- directories are not removed if the test passed, failed, or had
- no result, respectively. Temporary working directories are also
- preserved for conditions specified via the preserve method.
-
- Typically, this method is not called directly, but is used when
- the script exits to clean up temporary working directories as
- appropriate for the exit status.
- """
- if not self._dirlist:
- return
- if condition is None:
- condition = self.condition
- #print "cleanup(" + condition + "): ", self._preserve
- if self._preserve[condition]:
- return
- os.chdir(self._cwd)
- self.workdir = None
- list = self._dirlist[:]
- self._dirlist = []
- list.reverse()
- for dir in list:
- self.writable(dir, 1)
- shutil.rmtree(dir, ignore_errors = 1)
- try:
- global _Cleanup
- _Cleanup.remove(self)
- except (AttributeError, ValueError):
- pass
-
- def description_set(self, description):
- """Set the description of the functionality being tested.
- """
- self.description = description
-
-# def diff(self):
-# """Diff two arrays.
-# """
-
- def fail_test(self, condition = 1, function = None, skip = 0):
- """Cause the test to fail.
- """
- if not condition:
- return
- self.condition = 'fail_test'
- fail_test(self = self,
- condition = condition,
- function = function,
- skip = skip)
-
- def interpreter_set(self, interpreter):
- """Set the program to be used to interpret the program
- under test as a script.
- """
- self.interpreter = interpreter
-
- def match(self, lines, matches):
- """Compare actual and expected file contents.
- """
- return self.match_func(lines, matches)
-
- def match_exact(self, lines, matches):
- """Compare actual and expected file contents.
- """
- return match_exact(lines, matches)
-
- def match_re(self, lines, res):
- """Compare actual and expected file contents.
- """
- return match_re(lines, res)
-
- def no_result(self, condition = 1, function = None, skip = 0):
- """Report that the test could not be run.
- """
- if not condition:
- return
- self.condition = 'no_result'
- no_result(self = self,
- condition = condition,
- function = function,
- skip = skip)
-
- def pass_test(self, condition = 1, function = None):
- """Cause the test to pass.
- """
- if not condition:
- return
- self.condition = 'pass_test'
- pass_test(self = self, condition = condition, function = function)
-
- def preserve(self, *conditions):
- """Arrange for the temporary working directories for the
- specified TestCmd environment to be preserved for one or more
- conditions. If no conditions are specified, arranges for
- the temporary working directories to be preserved for all
- conditions.
- """
- if conditions is ():
- conditions = ('pass_test', 'fail_test', 'no_result')
- for cond in conditions:
- self._preserve[cond] = 1
-
- def program_set(self, program):
- """Set the executable program or script to be tested.
- """
- if program and not os.path.isabs(program):
- program = os.path.join(self._cwd, program)
- self.program = program
-
- def read(self, file):
- """Reads and returns the contents of the specified file name.
- The file name may be a list, in which case the elements are
- concatenated with the os.path.join() method. The file is
- assumed to be under the temporary working directory unless it
- is an absolute path name.
- """
- if type(file) is ListType:
- file = apply(os.path.join, tuple(file))
- if not os.path.isabs(file):
- file = os.path.join(self.workdir, file)
- f = os.fdopen(os.open(file, FCNTL.O_RDONLY))
- contents = f.read()
- f.close()
- return contents
-
- def run(self, program = None,
- interpreter = None,
- arguments = None,
- chdir = None,
- stdin = None):
- """Runs a test of the program or script for the test
- environment. Standard output and error output are saved for
- future retrieval via the stdout() and stderr() methods.
- """
- if chdir:
- oldcwd = os.getcwd()
- if not os.path.isabs(chdir):
- chdir = os.path.join(self.workpath(chdir))
- if self.verbose:
- sys.stderr.write("chdir(" + chdir + ")\n")
- os.chdir(chdir)
- cmd = None
- if program:
- if not os.path.isabs(program):
- program = os.path.join(self._cwd, program)
- cmd = program
- if interpreter:
- cmd = interpreter + " " + cmd
- else:
- cmd = self.program
- if self.interpreter:
- cmd = self.interpreter + " " + cmd
- if arguments:
- cmd = cmd + " " + arguments
- if self.verbose:
- sys.stderr.write(cmd + "\n")
- p = popen2.Popen3(cmd, 1)
- if stdin:
- if type(stdin) is ListType:
- for line in stdin:
- p.tochild.write(line)
- else:
- p.tochild.write(stdin)
- p.tochild.close()
- self._stdout.append(p.fromchild.read())
- self._stderr.append(p.childerr.read())
- self.status = p.wait()
- if chdir:
- os.chdir(oldcwd)
-
- def stderr(self, run = None):
- """Returns the error output from the specified run number.
- If there is no specified run number, then returns the error
- output of the last run. If the run number is less than zero,
- then returns the error output from that many runs back from the
- current run.
- """
- if not run:
- run = len(self._stderr)
- elif run < 0:
- run = len(self._stderr) + run
- run = run - 1
- return self._stderr[run]
-
- def stdout(self, run = None):
- """Returns the standard output from the specified run number.
- If there is no specified run number, then returns the standard
- output of the last run. If the run number is less than zero,
- then returns the standard output from that many runs back from
- the current run.
- """
- if not run:
- run = len(self._stdout)
- elif run < 0:
- run = len(self._stdout) + run
- run = run - 1
- return self._stdout[run]
-
- def subdir(self, *subdirs):
- """Create new subdirectories under the temporary working
- directory, one for each argument. An argument may be a list,
- in which case the list elements are concatenated using the
- os.path.join() method. Subdirectories multiple levels deep
- must be created using a separate argument for each level:
-
- test.subdir('sub', ['sub', 'dir'], ['sub', 'dir', 'ectory'])
-
- Returns the number of subdirectories actually created.
- """
- count = 0
- for sub in subdirs:
- if sub is None:
- continue
- if type(sub) is ListType:
- sub = apply(os.path.join, tuple(sub))
- new = os.path.join(self.workdir, sub)
- try:
- os.mkdir(new)
- except:
- pass
- else:
- count = count + 1
- return count
-
- def verbose_set(self, verbose):
- """Set the verbose level.
- """
- self.verbose = verbose
-
- def workdir_set(self, path):
- """Creates a temporary working directory with the specified
- path name. If the path is a null string (''), a unique
- directory name is created.
- """
- if (path != None):
- if path == '':
- path = tempfile.mktemp()
- if path != None:
- os.mkdir(path)
- self._dirlist.append(path)
- global _Cleanup
- try:
- _Cleanup.index(self)
- except ValueError:
- _Cleanup.append(self)
- # We'd like to set self.workdir like this:
- # self.workdir = path
- # But symlinks in the path will report things
- # differently from os.getcwd(), so chdir there
- # and back to fetch the canonical path.
- cwd = os.getcwd()
- os.chdir(path)
- self.workdir = os.getcwd()
- os.chdir(cwd)
- else:
- self.workdir = None
-
- def workpath(self, *args):
- """Returns the absolute path name to a subdirectory or file
- within the current temporary working directory. Concatenates
- the temporary working directory name with the specified
- arguments using the os.path.join() method.
- """
- return apply(os.path.join, (self.workdir,) + tuple(args))
-
- def writable(self, top, write):
- """Make the specified directory tree writable (write == 1)
- or not (write == None).
- """
-
- def _walk_chmod(arg, dirname, names):
- st = os.stat(dirname)
- os.chmod(dirname, arg(st[stat.ST_MODE]))
- for name in names:
- n = os.path.join(dirname, name)
- st = os.stat(n)
- os.chmod(n, arg(st[stat.ST_MODE]))
-
- def _mode_writable(mode):
- return stat.S_IMODE(mode|0200)
-
- def _mode_non_writable(mode):
- return stat.S_IMODE(mode&~0200)
-
- if write:
- f = _mode_writable
- else:
- f = _mode_non_writable
- os.path.walk(top, _walk_chmod, f)
-
- def write(self, file, content):
- """Writes the specified content text (second argument) to the
- specified file name (first argument). The file name may be
- a list, in which case the elements are concatenated with the
- os.path.join() method. The file is created under the temporary
- working directory. Any subdirectories in the path must already
- exist. """
- if type(file) is ListType:
- file = apply(os.path.join, tuple(file))
- if not os.path.isabs(file):
- file = os.path.join(self.workdir, file)
- fd = os.open(file, FCNTL.O_CREAT|FCNTL.O_WRONLY)
- os.write(fd, content)
- os.close(fd)