2 TestCmd.py: a testing framework for commands and scripts.
4 The TestCmd module provides a framework for portable automated testing
5 of executable commands and scripts (in any language, not just Python),
6 especially commands and scripts that require file system interaction.
8 In addition to running tests and evaluating conditions, the TestCmd
9 module manages and cleans up one or more temporary workspace
10 directories, and provides methods for creating files and directories in
11 those workspace directories from in-line data, here-documents), allowing
12 tests to be completely self-contained.
14 A TestCmd environment object is created via the usual invocation:
17 test = TestCmd.TestCmd()
19 There are a bunch of keyword arguments available at instantiation:
21 test = TestCmd.TestCmd(description = 'string',
22 program = 'program_or_script_to_test',
23 interpreter = 'script_interpreter',
27 match = default_match_function,
28 diff = default_diff_function,
31 There are a bunch of methods that let you do different things:
35 test.description_set('string')
37 test.program_set('program_or_script_to_test')
39 test.interpreter_set('script_interpreter')
40 test.interpreter_set(['script_interpreter', 'arg'])
42 test.workdir_set('prefix')
46 test.workpath('subdir', 'file')
48 test.subdir('subdir', ...)
50 test.rmdir('subdir', ...)
52 test.write('file', "contents\n")
53 test.write(['subdir', 'file'], "contents\n")
56 test.read(['subdir', 'file'])
57 test.read('file', mode)
58 test.read(['subdir', 'file'], mode)
60 test.writable('dir', 1)
61 test.writable('dir', None)
63 test.preserve(condition, ...)
65 test.cleanup(condition)
67 test.command_args(program = 'program_or_script_to_run',
68 interpreter = 'script_interpreter',
69 arguments = 'arguments to pass to program')
71 test.run(program = 'program_or_script_to_run',
72 interpreter = 'script_interpreter',
73 arguments = 'arguments to pass to program',
74 chdir = 'directory_to_chdir_to',
75 stdin = 'input to feed to the program\n')
76 universal_newlines = True)
78 p = test.start(program = 'program_or_script_to_run',
79 interpreter = 'script_interpreter',
80 arguments = 'arguments to pass to program',
81 universal_newlines = None)
86 test.pass_test(condition)
87 test.pass_test(condition, function)
90 test.fail_test(condition)
91 test.fail_test(condition, function)
92 test.fail_test(condition, function, skip)
95 test.no_result(condition)
96 test.no_result(condition, function)
97 test.no_result(condition, function, skip)
105 test.symlink(target, link)
108 test.banner(string, width)
110 test.diff(actual, expected)
112 test.match(actual, expected)
114 test.match_exact("actual 1\nactual 2\n", "expected 1\nexpected 2\n")
115 test.match_exact(["actual 1\n", "actual 2\n"],
116 ["expected 1\n", "expected 2\n"])
118 test.match_re("actual 1\nactual 2\n", regex_string)
119 test.match_re(["actual 1\n", "actual 2\n"], list_of_regexes)
121 test.match_re_dotall("actual 1\nactual 2\n", regex_string)
122 test.match_re_dotall(["actual 1\n", "actual 2\n"], list_of_regexes)
125 test.tempdir('temporary-directory')
131 test.where_is('foo', 'PATH1:PATH2')
132 test.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4')
135 test.unlink('subdir', 'file')
137 The TestCmd module provides pass_test(), fail_test(), and no_result()
138 unbound functions that report test results for use with the Aegis change
139 management system. These methods terminate the test immediately,
140 reporting PASSED, FAILED, or NO RESULT respectively, and exiting with
141 status 0 (success), 1 or 2 respectively. This allows for a distinction
142 between an actual failed test and a test that could not be properly
143 evaluated because of an external condition (such as a full file system
144 or incorrect permissions).
149 TestCmd.pass_test(condition)
150 TestCmd.pass_test(condition, function)
153 TestCmd.fail_test(condition)
154 TestCmd.fail_test(condition, function)
155 TestCmd.fail_test(condition, function, skip)
158 TestCmd.no_result(condition)
159 TestCmd.no_result(condition, function)
160 TestCmd.no_result(condition, function, skip)
162 The TestCmd module also provides unbound functions that handle matching
163 in the same way as the match_*() methods described above.
167 test = TestCmd.TestCmd(match = TestCmd.match_exact)
169 test = TestCmd.TestCmd(match = TestCmd.match_re)
171 test = TestCmd.TestCmd(match = TestCmd.match_re_dotall)
173 The TestCmd module provides unbound functions that can be used for the
174 "diff" argument to TestCmd.TestCmd instantiation:
178 test = TestCmd.TestCmd(match = TestCmd.match_re,
179 diff = TestCmd.diff_re)
181 test = TestCmd.TestCmd(diff = TestCmd.simple_diff)
183 The "diff" argument can also be used with standard difflib functions:
187 test = TestCmd.TestCmd(diff = difflib.context_diff)
189 test = TestCmd.TestCmd(diff = difflib.unified_diff)
191 Lastly, the where_is() method also exists in an unbound function
196 TestCmd.where_is('foo')
197 TestCmd.where_is('foo', 'PATH1:PATH2')
198 TestCmd.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4')
201 # Copyright 2000-2010 Steven Knight
202 # This module is free software, and you may redistribute it and/or modify
203 # it under the same terms as Python itself, so long as this copyright message
204 # and disclaimer are retained in their original form.
206 # IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
207 # SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
208 # THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
211 # THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
212 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
213 # PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
214 # AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
215 # SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
216 from __future__ import generators ### KEEP FOR COMPATIBILITY FIXERS
218 __author__ = "Steven Knight <knight at baldmt dot com>"
219 __revision__ = "TestCmd.py 0.37.D001 2010/01/11 16:55:50 knight"
235 # pre-2.7 doesn't have the memoryview() built-in
239 from types import SliceType
240 def __init__(self, obj):
241 # wrapping buffer in () keeps the fixer from changing it
242 self.obj = (buffer)(obj)
243 def __getitem__(self, indx):
244 if isinstance(indx, self.SliceType):
245 return self.obj[indx.start:indx.stop]
247 return self.obj[indx]
264 __all__.append('simple_diff')
267 return isinstance(e, list) \
268 or isinstance(e, UserList.UserList)
271 from UserString import UserString
279 return isinstance(e, str) or isinstance(e, UserString)
282 return isinstance(e, str) \
283 or isinstance(e, unicode) \
284 or isinstance(e, UserString)
286 tempfile.template = 'testcmd.'
287 if os.name in ('posix', 'nt'):
288 tempfile.template = 'testcmd.' + str(os.getpid()) + '.'
290 tempfile.template = 'testcmd.'
292 re_space = re.compile('\s')
296 _chain_to_exitfunc = None
300 cleanlist = [_f for _f in _Cleanup if _f]
303 for test in cleanlist:
305 if _chain_to_exitfunc:
311 # TODO(1.5): atexit requires python 2.0, so chain sys.exitfunc
313 _chain_to_exitfunc = sys.exitfunc
314 except AttributeError:
316 sys.exitfunc = _clean
318 atexit.register(_clean)
321 def __init__(self, top):
323 def __call__(self, arg, dirname, names):
324 pathjoin = lambda n: os.path.join(dirname, n)
325 self.entries.extend(list(map(pathjoin, names)))
327 def _caller(tblist, skip):
330 for file, line, name, text in tblist:
331 if file[-10:] == "TestCmd.py":
333 arr = [(file, line, name, text)] + arr
335 for file, line, name, text in arr[skip:]:
336 if name in ("?", "<module>"):
339 name = " (" + name + ")"
340 string = string + ("%s line %d of %s%s\n" % (atfrom, line, file, name))
344 def fail_test(self = None, condition = 1, function = None, skip = 0):
345 """Cause the test to fail.
347 By default, the fail_test() method reports that the test FAILED
348 and exits with a status of 1. If a condition argument is supplied,
349 the test fails only if the condition is true.
353 if not function is None:
360 of = " of " + self.program
363 desc = " [" + self.description + "]"
366 at = _caller(traceback.extract_stack(), skip)
367 sys.stderr.write("FAILED test" + of + desc + sep + at)
371 def no_result(self = None, condition = 1, function = None, skip = 0):
372 """Causes a test to exit with no valid result.
374 By default, the no_result() method reports NO RESULT for the test
375 and exits with a status of 2. If a condition argument is supplied,
376 the test fails only if the condition is true.
380 if not function is None:
387 of = " of " + self.program
390 desc = " [" + self.description + "]"
393 at = _caller(traceback.extract_stack(), skip)
394 sys.stderr.write("NO RESULT for test" + of + desc + sep + at)
398 def pass_test(self = None, condition = 1, function = None):
399 """Causes a test to pass.
401 By default, the pass_test() method reports PASSED for the test
402 and exits with a status of 0. If a condition argument is supplied,
403 the test passes only if the condition is true.
407 if not function is None:
409 sys.stderr.write("PASSED\n")
412 def match_exact(lines = None, matches = None):
415 if not is_List(lines):
416 lines = lines.split("\n")
417 if not is_List(matches):
418 matches = matches.split("\n")
419 if len(lines) != len(matches):
421 for i in range(len(lines)):
422 if lines[i] != matches[i]:
426 def match_re(lines = None, res = None):
429 if not is_List(lines):
430 lines = lines.split("\n")
432 res = res.split("\n")
433 if len(lines) != len(res):
435 for i in range(len(lines)):
436 s = "^" + res[i] + "$"
440 msg = "Regular expression error in %s: %s"
441 raise re.error, msg % (repr(s), e[0])
442 if not expr.search(lines[i]):
446 def match_re_dotall(lines = None, res = None):
449 if not isinstance(lines, str):
450 lines = "\n".join(lines)
451 if not isinstance(res, str):
455 expr = re.compile(s, re.DOTALL)
457 msg = "Regular expression error in %s: %s"
458 raise re.error, msg % (repr(s), e[0])
459 if expr.match(lines):
467 def simple_diff(a, b, fromfile='', tofile='',
468 fromfiledate='', tofiledate='', n=3, lineterm='\n'):
470 A function with the same calling signature as difflib.context_diff
471 (diff -c) and difflib.unified_diff (diff -u) but which prints
472 output like the simple, unadorned 'diff" command.
474 sm = difflib.SequenceMatcher(None, a, b)
476 return x1+1 == x2 and str(x2) or '%s,%s' % (x1+1, x2)
478 for op, a1, a2, b1, b2 in sm.get_opcodes():
480 result.append("%sd%d" % (comma(a1, a2), b1))
481 result.extend(['< ' + l for l in a[a1:a2]])
483 result.append("%da%s" % (a1, comma(b1, b2)))
484 result.extend(['> ' + l for l in b[b1:b2]])
485 elif op == 'replace':
486 result.append("%sc%s" % (comma(a1, a2), comma(b1, b2)))
487 result.extend(['< ' + l for l in a[a1:a2]])
489 result.extend(['> ' + l for l in b[b1:b2]])
492 def diff_re(a, b, fromfile='', tofile='',
493 fromfiledate='', tofiledate='', n=3, lineterm='\n'):
495 A simple "diff" of two sets of lines when the expected lines
496 are regular expressions. This is a really dumb thing that
497 just compares each line in turn, so it doesn't look for
498 chunks of matching lines and the like--but at least it lets
499 you know exactly which line first didn't compare correctl...
502 diff = len(a) - len(b)
508 for aline, bline in zip(a, b):
509 s = "^" + aline + "$"
513 msg = "Regular expression error in %s: %s"
514 raise re.error, msg % (repr(s), e[0])
515 if not expr.search(bline):
516 result.append("%sc%s" % (i+1, i+1))
517 result.append('< ' + repr(a[i]))
519 result.append('> ' + repr(b[i]))
523 if os.name == 'java':
525 python_executable = os.path.join(sys.prefix, 'jython')
529 python_executable = sys.executable
531 if sys.platform == 'win32':
533 default_sleep_seconds = 2
535 def where_is(file, path=None, pathext=None):
537 path = os.environ['PATH']
539 path = path.split(os.pathsep)
541 pathext = os.environ['PATHEXT']
542 if is_String(pathext):
543 pathext = pathext.split(os.pathsep)
545 if ext.lower() == file[-len(ext):].lower():
549 f = os.path.join(dir, file)
552 if os.path.isfile(fext):
558 def where_is(file, path=None, pathext=None):
560 path = os.environ['PATH']
562 path = path.split(os.pathsep)
564 f = os.path.join(dir, file)
565 if os.path.isfile(f):
570 if stat.S_IMODE(st[stat.ST_MODE]) & 0111:
574 default_sleep_seconds = 1
581 # The subprocess module doesn't exist in this version of Python,
582 # so we're going to cobble up something that looks just enough
583 # like its API for our purposes below.
586 subprocess = new.module('subprocess')
588 subprocess.PIPE = 'PIPE'
589 subprocess.STDOUT = 'STDOUT'
590 subprocess.mswindows = (sys.platform == 'win32')
595 except AttributeError:
597 universal_newlines = 1
598 def __init__(self, command, **kw):
599 if sys.platform == 'win32' and command[0] == '"':
600 command = '"' + command + '"'
601 (stdin, stdout, stderr) = os.popen3(' ' + command)
605 def close_output(self):
607 self.resultcode = self.stderr.close()
609 resultcode = self.resultcode
610 if os.WIFEXITED(resultcode):
611 return os.WEXITSTATUS(resultcode)
612 elif os.WIFSIGNALED(resultcode):
613 return os.WTERMSIG(resultcode)
620 except AttributeError:
621 # A cribbed Popen4 class, with some retrofitted code from
622 # the Python 1.5 Popen3 class methods to do certain things
624 class Popen4(popen2.Popen3):
627 def __init__(self, cmd, bufsize=-1):
628 p2cread, p2cwrite = os.pipe()
629 c2pread, c2pwrite = os.pipe()
636 for i in range(3, popen2.MAXFD):
641 os.execvp(cmd[0], cmd)
644 # Shouldn't come here, I guess
647 self.tochild = os.fdopen(p2cwrite, 'w', bufsize)
649 self.fromchild = os.fdopen(c2pread, 'r', bufsize)
650 popen2._active.append(self)
652 popen2.Popen4 = Popen4
654 class Popen3(popen2.Popen3, popen2.Popen4):
655 universal_newlines = 1
656 def __init__(self, command, **kw):
657 if kw.get('stderr') == 'STDOUT':
658 popen2.Popen4.__init__(self, command, 1)
660 popen2.Popen3.__init__(self, command, 1)
661 self.stdin = self.tochild
662 self.stdout = self.fromchild
663 self.stderr = self.childerr
664 def wait(self, *args, **kw):
665 resultcode = popen2.Popen3.wait(self, *args, **kw)
666 if os.WIFEXITED(resultcode):
667 return os.WEXITSTATUS(resultcode)
668 elif os.WIFSIGNALED(resultcode):
669 return os.WTERMSIG(resultcode)
673 subprocess.Popen = Popen3
677 # From Josiah Carlson,
678 # ASPN : Python Cookbook : Module to allow Asynchronous subprocess use on Windows and Posix platforms
679 # http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/440554
681 PIPE = subprocess.PIPE
683 if subprocess.mswindows:
684 from win32file import ReadFile, WriteFile
685 from win32pipe import PeekNamedPipe
692 except AttributeError: fcntl.F_GETFL = 3
695 except AttributeError: fcntl.F_SETFL = 4
697 class Popen(subprocess.Popen):
698 def recv(self, maxsize=None):
699 return self._recv('stdout', maxsize)
701 def recv_err(self, maxsize=None):
702 return self._recv('stderr', maxsize)
704 def send_recv(self, input='', maxsize=None):
705 return self.send(input), self.recv(maxsize), self.recv_err(maxsize)
707 def get_conn_maxsize(self, which, maxsize):
712 return getattr(self, which), maxsize
714 def _close(self, which):
715 getattr(self, which).close()
716 setattr(self, which, None)
718 if subprocess.mswindows:
719 def send(self, input):
724 x = msvcrt.get_osfhandle(self.stdin.fileno())
725 (errCode, written) = WriteFile(x, input)
727 return self._close('stdin')
728 except (subprocess.pywintypes.error, Exception), why:
729 if why[0] in (109, errno.ESHUTDOWN):
730 return self._close('stdin')
735 def _recv(self, which, maxsize):
736 conn, maxsize = self.get_conn_maxsize(which, maxsize)
741 x = msvcrt.get_osfhandle(conn.fileno())
742 (read, nAvail, nMessage) = PeekNamedPipe(x, 0)
746 (errCode, read) = ReadFile(x, nAvail, None)
748 return self._close(which)
749 except (subprocess.pywintypes.error, Exception), why:
750 if why[0] in (109, errno.ESHUTDOWN):
751 return self._close(which)
754 #if self.universal_newlines:
755 # read = self._translate_newlines(read)
759 def send(self, input):
763 if not select.select([], [self.stdin], [], 0)[1]:
767 written = os.write(self.stdin.fileno(), input)
769 if why[0] == errno.EPIPE: #broken pipe
770 return self._close('stdin')
775 def _recv(self, which, maxsize):
776 conn, maxsize = self.get_conn_maxsize(which, maxsize)
781 flags = fcntl.fcntl(conn, fcntl.F_GETFL)
786 fcntl.fcntl(conn, fcntl.F_SETFL, flags| os.O_NONBLOCK)
789 if not select.select([conn], [], [], 0)[0]:
792 r = conn.read(maxsize)
794 return self._close(which)
796 #if self.universal_newlines:
797 # r = self._translate_newlines(r)
800 if not conn.closed and not flags is None:
801 fcntl.fcntl(conn, fcntl.F_SETFL, flags)
803 disconnect_message = "Other end disconnected!"
805 def recv_some(p, t=.1, e=1, tr=5, stderr=0):
814 while time.time() < x or r:
818 raise Exception(disconnect_message)
824 time.sleep(max((x-time.time())/tr, 0))
827 def send_all(p, data):
831 raise Exception(disconnect_message)
832 data = memoryview(data)[sent:]
844 class TestCmd(object):
848 def __init__(self, description = None,
857 universal_newlines = 1):
858 self._cwd = os.getcwd()
859 self.description_set(description)
860 self.program_set(program)
861 self.interpreter_set(interpreter)
864 verbose = max( 0, int(os.environ.get('TESTCMD_VERBOSE', 0)) )
867 self.verbose_set(verbose)
868 self.combine = combine
869 self.universal_newlines = universal_newlines
870 if not match is None:
871 self.match_function = match
873 self.match_function = match_re
875 self.diff_function = diff
882 self.diff_function = simple_diff
883 #self.diff_function = difflib.context_diff
884 #self.diff_function = difflib.unified_diff
886 self._preserve = {'pass_test': 0, 'fail_test': 0, 'no_result': 0}
887 if 'PRESERVE' in os.environ and not os.environ['PRESERVE'] is '':
888 self._preserve['pass_test'] = os.environ['PRESERVE']
889 self._preserve['fail_test'] = os.environ['PRESERVE']
890 self._preserve['no_result'] = os.environ['PRESERVE']
893 self._preserve['pass_test'] = os.environ['PRESERVE_PASS']
897 self._preserve['fail_test'] = os.environ['PRESERVE_FAIL']
901 self._preserve['no_result'] = os.environ['PRESERVE_NO_RESULT']
907 self.condition = 'no_result'
908 self.workdir_set(workdir)
915 return "%x" % id(self)
920 def banner(self, s, width=None):
922 width = self.banner_width
923 return s + self.banner_char * (width - len(s))
925 if os.name == 'posix':
927 def escape(self, arg):
928 "escape shell special characters"
932 arg = arg.replace(slash, slash+slash)
934 arg = arg.replace(c, slash+c)
936 if re_space.search(arg):
937 arg = '"' + arg + '"'
942 # Windows does not allow special characters in file names
943 # anyway, so no need for an escape function, we will just quote
945 def escape(self, arg):
946 if re_space.search(arg):
947 arg = '"' + arg + '"'
950 def canonicalize(self, path):
952 path = os.path.join(*tuple(path))
953 if not os.path.isabs(path):
954 path = os.path.join(self.workdir, path)
957 def chmod(self, path, mode):
958 """Changes permissions on the specified file or directory
960 path = self.canonicalize(path)
963 def cleanup(self, condition = None):
964 """Removes any temporary working directories for the specified
965 TestCmd environment. If the environment variable PRESERVE was
966 set when the TestCmd environment was created, temporary working
967 directories are not removed. If any of the environment variables
968 PRESERVE_PASS, PRESERVE_FAIL, or PRESERVE_NO_RESULT were set
969 when the TestCmd environment was created, then temporary working
970 directories are not removed if the test passed, failed, or had
971 no result, respectively. Temporary working directories are also
972 preserved for conditions specified via the preserve method.
974 Typically, this method is not called directly, but is used when
975 the script exits to clean up temporary working directories as
976 appropriate for the exit status.
978 if not self._dirlist:
982 if condition is None:
983 condition = self.condition
984 if self._preserve[condition]:
985 for dir in self._dirlist:
986 print "Preserved directory", dir
988 list = self._dirlist[:]
991 self.writable(dir, 1)
992 shutil.rmtree(dir, ignore_errors = 1)
997 _Cleanup.remove(self)
998 except (AttributeError, ValueError):
1001 def command_args(self, program = None,
1005 if isinstance(program, str) and not os.path.isabs(program):
1006 program = os.path.join(self._cwd, program)
1008 program = self.program
1010 interpreter = self.interpreter
1011 if not type(program) in [list, tuple]:
1015 if not type(interpreter) in [list, tuple]:
1016 interpreter = [interpreter]
1017 cmd = list(interpreter) + cmd
1019 if isinstance(arguments, str):
1020 arguments = arguments.split()
1021 cmd.extend(arguments)
1024 def description_set(self, description):
1025 """Set the description of the functionality being tested.
1027 self.description = description
1032 def diff(self, a, b, name, *args, **kw):
1033 print self.banner('Expected %s' % name)
1035 print self.banner('Actual %s' % name)
1038 def diff(self, a, b, name, *args, **kw):
1039 print self.banner(name)
1040 args = (a.splitlines(), b.splitlines()) + args
1041 lines = self.diff_function(*args, **kw)
1045 def fail_test(self, condition = 1, function = None, skip = 0):
1046 """Cause the test to fail.
1050 self.condition = 'fail_test'
1051 fail_test(self = self,
1052 condition = condition,
1053 function = function,
1056 def interpreter_set(self, interpreter):
1057 """Set the program to be used to interpret the program
1058 under test as a script.
1060 self.interpreter = interpreter
1062 def match(self, lines, matches):
1063 """Compare actual and expected file contents.
1065 return self.match_function(lines, matches)
1067 def match_exact(self, lines, matches):
1068 """Compare actual and expected file contents.
1070 return match_exact(lines, matches)
1072 def match_re(self, lines, res):
1073 """Compare actual and expected file contents.
1075 return match_re(lines, res)
1077 def match_re_dotall(self, lines, res):
1078 """Compare actual and expected file contents.
1080 return match_re_dotall(lines, res)
1082 def no_result(self, condition = 1, function = None, skip = 0):
1083 """Report that the test could not be run.
1087 self.condition = 'no_result'
1088 no_result(self = self,
1089 condition = condition,
1090 function = function,
1093 def pass_test(self, condition = 1, function = None):
1094 """Cause the test to pass.
1098 self.condition = 'pass_test'
1099 pass_test(self = self, condition = condition, function = function)
1101 def preserve(self, *conditions):
1102 """Arrange for the temporary working directories for the
1103 specified TestCmd environment to be preserved for one or more
1104 conditions. If no conditions are specified, arranges for
1105 the temporary working directories to be preserved for all
1108 if conditions is ():
1109 conditions = ('pass_test', 'fail_test', 'no_result')
1110 for cond in conditions:
1111 self._preserve[cond] = 1
1113 def program_set(self, program):
1114 """Set the executable program or script to be tested.
1116 if program and not os.path.isabs(program):
1117 program = os.path.join(self._cwd, program)
1118 self.program = program
1120 def read(self, file, mode = 'rb'):
1121 """Reads and returns the contents of the specified file name.
1122 The file name may be a list, in which case the elements are
1123 concatenated with the os.path.join() method. The file is
1124 assumed to be under the temporary working directory unless it
1125 is an absolute path name. The I/O mode for the file may
1126 be specified; it must begin with an 'r'. The default is
1129 file = self.canonicalize(file)
1131 raise ValueError, "mode must begin with 'r'"
1132 return open(file, mode).read()
1134 def rmdir(self, dir):
1135 """Removes the specified dir name.
1136 The dir name may be a list, in which case the elements are
1137 concatenated with the os.path.join() method. The dir is
1138 assumed to be under the temporary working directory unless it
1139 is an absolute path name.
1140 The dir must be empty.
1142 dir = self.canonicalize(dir)
1145 def start(self, program = None,
1148 universal_newlines = None,
1151 Starts a program or script for the test environment.
1153 The specified program will have the original directory
1154 prepended unless it is enclosed in a [list].
1156 cmd = self.command_args(program, interpreter, arguments)
1157 cmd_string = ' '.join(map(self.escape, cmd))
1159 sys.stderr.write(cmd_string + "\n")
1160 if universal_newlines is None:
1161 universal_newlines = self.universal_newlines
1163 # On Windows, if we make stdin a pipe when we plan to send
1164 # no input, and the test program exits before
1165 # Popen calls msvcrt.open_osfhandle, that call will fail.
1166 # So don't use a pipe for stdin if we don't need one.
1167 stdin = kw.get('stdin', None)
1168 if stdin is not None:
1169 stdin = subprocess.PIPE
1171 combine = kw.get('combine', self.combine)
1173 stderr_value = subprocess.STDOUT
1175 stderr_value = subprocess.PIPE
1179 stdout=subprocess.PIPE,
1180 stderr=stderr_value,
1181 universal_newlines=universal_newlines)
1183 def finish(self, popen, **kw):
1185 Finishes and waits for the process being run under control of
1186 the specified popen argument, recording the exit status,
1187 standard output and error output.
1190 self.status = popen.wait()
1193 self._stdout.append(popen.stdout.read())
1195 stderr = popen.stderr.read()
1198 self._stderr.append(stderr)
1200 def run(self, program = None,
1205 universal_newlines = None):
1206 """Runs a test of the program or script for the test
1207 environment. Standard output and error output are saved for
1208 future retrieval via the stdout() and stderr() methods.
1210 The specified program will have the original directory
1211 prepended unless it is enclosed in a [list].
1214 oldcwd = os.getcwd()
1215 if not os.path.isabs(chdir):
1216 chdir = os.path.join(self.workpath(chdir))
1218 sys.stderr.write("chdir(" + chdir + ")\n")
1220 p = self.start(program,
1230 p.stdin.write(stdin)
1233 out = p.stdout.read()
1234 if p.stderr is None:
1237 err = p.stderr.read()
1239 close_output = p.close_output
1240 except AttributeError:
1242 if not p.stderr is None:
1247 self._stdout.append(out)
1248 self._stderr.append(err)
1250 self.status = p.wait()
1256 if self.verbose >= 2:
1257 write = sys.stdout.write
1258 write('============ STATUS: %d\n' % self.status)
1260 if out or self.verbose >= 3:
1261 write('============ BEGIN STDOUT (len=%d):\n' % len(out))
1263 write('============ END STDOUT\n')
1265 if err or self.verbose >= 3:
1266 write('============ BEGIN STDERR (len=%d)\n' % len(err))
1268 write('============ END STDERR\n')
1270 def sleep(self, seconds = default_sleep_seconds):
1271 """Sleeps at least the specified number of seconds. If no
1272 number is specified, sleeps at least the minimum number of
1273 seconds necessary to advance file time stamps on the current
1274 system. Sleeping more seconds is all right.
1278 def stderr(self, run = None):
1279 """Returns the error output from the specified run number.
1280 If there is no specified run number, then returns the error
1281 output of the last run. If the run number is less than zero,
1282 then returns the error output from that many runs back from the
1286 run = len(self._stderr)
1288 run = len(self._stderr) + run
1290 return self._stderr[run]
1292 def stdout(self, run = None):
1293 """Returns the standard output from the specified run number.
1294 If there is no specified run number, then returns the standard
1295 output of the last run. If the run number is less than zero,
1296 then returns the standard output from that many runs back from
1300 run = len(self._stdout)
1302 run = len(self._stdout) + run
1304 return self._stdout[run]
1306 def subdir(self, *subdirs):
1307 """Create new subdirectories under the temporary working
1308 directory, one for each argument. An argument may be a list,
1309 in which case the list elements are concatenated using the
1310 os.path.join() method. Subdirectories multiple levels deep
1311 must be created using a separate argument for each level:
1313 test.subdir('sub', ['sub', 'dir'], ['sub', 'dir', 'ectory'])
1315 Returns the number of subdirectories actually created.
1322 sub = os.path.join(*tuple(sub))
1323 new = os.path.join(self.workdir, sub)
1332 def symlink(self, target, link):
1333 """Creates a symlink to the specified target.
1334 The link name may be a list, in which case the elements are
1335 concatenated with the os.path.join() method. The link is
1336 assumed to be under the temporary working directory unless it
1337 is an absolute path name. The target is *not* assumed to be
1338 under the temporary working directory.
1340 link = self.canonicalize(link)
1341 os.symlink(target, link)
1343 def tempdir(self, path=None):
1344 """Creates a temporary directory.
1345 A unique directory name is generated if no path name is specified.
1346 The directory is created, and will be removed when the TestCmd
1347 object is destroyed.
1351 path = tempfile.mktemp(prefix=tempfile.template)
1353 path = tempfile.mktemp()
1356 # Symlinks in the path will report things
1357 # differently from os.getcwd(), so chdir there
1358 # and back to fetch the canonical path.
1366 # Uppercase the drive letter since the case of drive
1367 # letters is pretty much random on win32:
1368 drive,rest = os.path.splitdrive(path)
1370 path = drive.upper() + rest
1373 self._dirlist.append(path)
1376 _Cleanup.index(self)
1378 _Cleanup.append(self)
1382 def touch(self, path, mtime=None):
1383 """Updates the modification time on the specified file or
1384 directory path name. The default is to update to the
1385 current time if no explicit modification time is specified.
1387 path = self.canonicalize(path)
1388 atime = os.path.getatime(path)
1391 os.utime(path, (atime, mtime))
1393 def unlink(self, file):
1394 """Unlinks the specified file name.
1395 The file name may be a list, in which case the elements are
1396 concatenated with the os.path.join() method. The file is
1397 assumed to be under the temporary working directory unless it
1398 is an absolute path name.
1400 file = self.canonicalize(file)
1403 def verbose_set(self, verbose):
1404 """Set the verbose level.
1406 self.verbose = verbose
1408 def where_is(self, file, path=None, pathext=None):
1409 """Find an executable file.
1412 file = os.path.join(*tuple(file))
1413 if not os.path.isabs(file):
1414 file = where_is(file, path, pathext)
1417 def workdir_set(self, path):
1418 """Creates a temporary working directory with the specified
1419 path name. If the path is a null string (''), a unique
1420 directory name is created.
1425 path = self.tempdir(path)
1428 def workpath(self, *args):
1429 """Returns the absolute path name to a subdirectory or file
1430 within the current temporary working directory. Concatenates
1431 the temporary working directory name with the specified
1432 arguments using the os.path.join() method.
1434 return os.path.join(self.workdir, *tuple(args))
1436 def readable(self, top, read=1):
1437 """Make the specified directory tree readable (read == 1)
1438 or not (read == None).
1440 This method has no effect on Windows systems, which use a
1441 completely different mechanism to control file readability.
1444 if sys.platform == 'win32':
1448 def do_chmod(fname):
1449 try: st = os.stat(fname)
1450 except OSError: pass
1451 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IREAD))
1453 def do_chmod(fname):
1454 try: st = os.stat(fname)
1455 except OSError: pass
1456 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IREAD))
1458 if os.path.isfile(top):
1459 # If it's a file, that's easy, just chmod it.
1462 # It's a directory and we're trying to turn on read
1463 # permission, so it's also pretty easy, just chmod the
1464 # directory and then chmod every entry on our walk down the
1465 # tree. Because os.path.walk() is top-down, we'll enable
1466 # read permission on any directories that have it disabled
1467 # before os.path.walk() tries to list their contents.
1470 def chmod_entries(arg, dirname, names, do_chmod=do_chmod):
1472 do_chmod(os.path.join(dirname, n))
1474 os.path.walk(top, chmod_entries, None)
1476 # It's a directory and we're trying to turn off read
1477 # permission, which means we have to chmod the directoreis
1478 # in the tree bottom-up, lest disabling read permission from
1479 # the top down get in the way of being able to get at lower
1480 # parts of the tree. But os.path.walk() visits things top
1481 # down, so we just use an object to collect a list of all
1482 # of the entries in the tree, reverse the list, and then
1483 # chmod the reversed (bottom-up) list.
1484 col = Collector(top)
1485 os.path.walk(top, col, None)
1486 col.entries.reverse()
1487 for d in col.entries: do_chmod(d)
1489 def writable(self, top, write=1):
1490 """Make the specified directory tree writable (write == 1)
1491 or not (write == None).
1494 if sys.platform == 'win32':
1497 def do_chmod(fname):
1498 try: os.chmod(fname, stat.S_IWRITE)
1499 except OSError: pass
1501 def do_chmod(fname):
1502 try: os.chmod(fname, stat.S_IREAD)
1503 except OSError: pass
1508 def do_chmod(fname):
1509 try: st = os.stat(fname)
1510 except OSError: pass
1511 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|0200))
1513 def do_chmod(fname):
1514 try: st = os.stat(fname)
1515 except OSError: pass
1516 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~0200))
1518 if os.path.isfile(top):
1521 col = Collector(top)
1522 os.path.walk(top, col, None)
1523 for d in col.entries: do_chmod(d)
1525 def executable(self, top, execute=1):
1526 """Make the specified directory tree executable (execute == 1)
1527 or not (execute == None).
1529 This method has no effect on Windows systems, which use a
1530 completely different mechanism to control file executability.
1533 if sys.platform == 'win32':
1537 def do_chmod(fname):
1538 try: st = os.stat(fname)
1539 except OSError: pass
1540 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IEXEC))
1542 def do_chmod(fname):
1543 try: st = os.stat(fname)
1544 except OSError: pass
1545 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IEXEC))
1547 if os.path.isfile(top):
1548 # If it's a file, that's easy, just chmod it.
1551 # It's a directory and we're trying to turn on execute
1552 # permission, so it's also pretty easy, just chmod the
1553 # directory and then chmod every entry on our walk down the
1554 # tree. Because os.path.walk() is top-down, we'll enable
1555 # execute permission on any directories that have it disabled
1556 # before os.path.walk() tries to list their contents.
1559 def chmod_entries(arg, dirname, names, do_chmod=do_chmod):
1561 do_chmod(os.path.join(dirname, n))
1563 os.path.walk(top, chmod_entries, None)
1565 # It's a directory and we're trying to turn off execute
1566 # permission, which means we have to chmod the directories
1567 # in the tree bottom-up, lest disabling execute permission from
1568 # the top down get in the way of being able to get at lower
1569 # parts of the tree. But os.path.walk() visits things top
1570 # down, so we just use an object to collect a list of all
1571 # of the entries in the tree, reverse the list, and then
1572 # chmod the reversed (bottom-up) list.
1573 col = Collector(top)
1574 os.path.walk(top, col, None)
1575 col.entries.reverse()
1576 for d in col.entries: do_chmod(d)
1578 def write(self, file, content, mode = 'wb'):
1579 """Writes the specified content text (second argument) to the
1580 specified file name (first argument). The file name may be
1581 a list, in which case the elements are concatenated with the
1582 os.path.join() method. The file is created under the temporary
1583 working directory. Any subdirectories in the path must already
1584 exist. The I/O mode for the file may be specified; it must
1585 begin with a 'w'. The default is 'wb' (binary write).
1587 file = self.canonicalize(file)
1589 raise ValueError, "mode must begin with 'w'"
1590 open(file, mode).write(content)
1594 # indent-tabs-mode:nil
1596 # vim: set expandtab tabstop=4 shiftwidth=4: