2 TestCmd.py: a testing framework for commands and scripts.
4 The TestCmd module provides a framework for portable automated testing
5 of executable commands and scripts (in any language, not just Python),
6 especially commands and scripts that require file system interaction.
8 In addition to running tests and evaluating conditions, the TestCmd
9 module manages and cleans up one or more temporary workspace
10 directories, and provides methods for creating files and directories in
11 those workspace directories from in-line data (here-documents), allowing
12 tests to be completely self-contained.
14 A TestCmd environment object is created via the usual invocation:
17 test = TestCmd.TestCmd()
19 There are a bunch of keyword arguments available at instantiation:
21 test = TestCmd.TestCmd(description = 'string',
22 program = 'program_or_script_to_test',
23 interpreter = 'script_interpreter',
27 match = default_match_function,
28 diff = default_diff_function,
31 There are a bunch of methods that let you do different things:
35 test.description_set('string')
37 test.program_set('program_or_script_to_test')
39 test.interpreter_set('script_interpreter')
40 test.interpreter_set(['script_interpreter', 'arg'])
42 test.workdir_set('prefix')
46 test.workpath('subdir', 'file')
48 test.subdir('subdir', ...)
50 test.rmdir('subdir', ...)
52 test.write('file', "contents\n")
53 test.write(['subdir', 'file'], "contents\n")
56 test.read(['subdir', 'file'])
57 test.read('file', mode)
58 test.read(['subdir', 'file'], mode)
60 test.writable('dir', 1)
61 test.writable('dir', None)
63 test.preserve(condition, ...)
65 test.cleanup(condition)
67 test.command_args(program = 'program_or_script_to_run',
68 interpreter = 'script_interpreter',
69 arguments = 'arguments to pass to program')
71 test.run(program = 'program_or_script_to_run',
72 interpreter = 'script_interpreter',
73 arguments = 'arguments to pass to program',
74 chdir = 'directory_to_chdir_to',
75 stdin = 'input to feed to the program\n',
76 universal_newlines = True)
78 p = test.start(program = 'program_or_script_to_run',
79 interpreter = 'script_interpreter',
80 arguments = 'arguments to pass to program',
81 universal_newlines = None)
86 test.pass_test(condition)
87 test.pass_test(condition, function)
90 test.fail_test(condition)
91 test.fail_test(condition, function)
92 test.fail_test(condition, function, skip)
95 test.no_result(condition)
96 test.no_result(condition, function)
97 test.no_result(condition, function, skip)
105 test.symlink(target, link)
108 test.banner(string, width)
110 test.diff(actual, expected)
112 test.match(actual, expected)
114 test.match_exact("actual 1\nactual 2\n", "expected 1\nexpected 2\n")
115 test.match_exact(["actual 1\n", "actual 2\n"],
116 ["expected 1\n", "expected 2\n"])
118 test.match_re("actual 1\nactual 2\n", regex_string)
119 test.match_re(["actual 1\n", "actual 2\n"], list_of_regexes)
121 test.match_re_dotall("actual 1\nactual 2\n", regex_string)
122 test.match_re_dotall(["actual 1\n", "actual 2\n"], list_of_regexes)
125 test.tempdir('temporary-directory')
131 test.where_is('foo', 'PATH1:PATH2')
132 test.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4')
135 test.unlink('subdir', 'file')
137 The TestCmd module provides pass_test(), fail_test(), and no_result()
138 unbound functions that report test results for use with the Aegis change
139 management system. These methods terminate the test immediately,
140 reporting PASSED, FAILED, or NO RESULT respectively, and exiting with
141 status 0 (success), 1 or 2 respectively. This allows for a distinction
142 between an actual failed test and a test that could not be properly
143 evaluated because of an external condition (such as a full file system
144 or incorrect permissions).
149 TestCmd.pass_test(condition)
150 TestCmd.pass_test(condition, function)
153 TestCmd.fail_test(condition)
154 TestCmd.fail_test(condition, function)
155 TestCmd.fail_test(condition, function, skip)
158 TestCmd.no_result(condition)
159 TestCmd.no_result(condition, function)
160 TestCmd.no_result(condition, function, skip)
162 The TestCmd module also provides unbound functions that handle matching
163 in the same way as the match_*() methods described above.
167 test = TestCmd.TestCmd(match = TestCmd.match_exact)
169 test = TestCmd.TestCmd(match = TestCmd.match_re)
171 test = TestCmd.TestCmd(match = TestCmd.match_re_dotall)
173 The TestCmd module provides unbound functions that can be used for the
174 "diff" argument to TestCmd.TestCmd instantiation:
178 test = TestCmd.TestCmd(match = TestCmd.match_re,
179 diff = TestCmd.diff_re)
181 test = TestCmd.TestCmd(diff = TestCmd.simple_diff)
183 The "diff" argument can also be used with standard difflib functions:
187 test = TestCmd.TestCmd(diff = difflib.context_diff)
189 test = TestCmd.TestCmd(diff = difflib.unified_diff)
191 Lastly, the where_is() method also exists in an unbound function
196 TestCmd.where_is('foo')
197 TestCmd.where_is('foo', 'PATH1:PATH2')
198 TestCmd.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4')
201 # Copyright 2000-2010 Steven Knight
202 # This module is free software, and you may redistribute it and/or modify
203 # it under the same terms as Python itself, so long as this copyright message
204 # and disclaimer are retained in their original form.
206 # IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
207 # SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
208 # THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
211 # THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
212 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
213 # PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
214 # AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
215 # SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
216 from __future__ import generators ### KEEP FOR COMPATIBILITY FIXERS
218 __author__ = "Steven Knight <knight at baldmt dot com>"
219 __revision__ = "TestCmd.py 0.37.D001 2010/01/11 16:55:50 knight"
249 __all__.append('simple_diff')
252 return isinstance(e, list) \
253 or isinstance(e, UserList.UserList)
256 from UserString import UserString
264 return isinstance(e, str) or isinstance(e, UserString)
267 return isinstance(e, str) \
268 or isinstance(e, unicode) \
269 or isinstance(e, UserString)
271 tempfile.template = 'testcmd.'
272 if os.name in ('posix', 'nt'):
273 tempfile.template = 'testcmd.' + str(os.getpid()) + '.'
275 tempfile.template = 'testcmd.'
# Matches any single whitespace character; escape() uses it to decide
# whether an argument has to be wrapped in double quotes.  The pattern is
# a raw string so that "\s" reaches the regex engine verbatim instead of
# depending on Python passing an unrecognised string escape through.
re_space = re.compile(r'\s')
281 _chain_to_exitfunc = None
285 cleanlist = [_f for _f in _Cleanup if _f]
288 for test in cleanlist:
290 if _chain_to_exitfunc:
296 # TODO(1.5): atexit requires python 2.0, so chain sys.exitfunc
298 _chain_to_exitfunc = sys.exitfunc
299 except AttributeError:
301 sys.exitfunc = _clean
303 atexit.register(_clean)
306 def __init__(self, top):
308 def __call__(self, arg, dirname, names):
309 pathjoin = lambda n: os.path.join(dirname, n)
310 self.entries.extend(list(map(pathjoin, names)))
312 def _caller(tblist, skip):
315 for file, line, name, text in tblist:
316 if file[-10:] == "TestCmd.py":
318 arr = [(file, line, name, text)] + arr
320 for file, line, name, text in arr[skip:]:
321 if name in ("?", "<module>"):
324 name = " (" + name + ")"
325 string = string + ("%s line %d of %s%s\n" % (atfrom, line, file, name))
329 def fail_test(self = None, condition = 1, function = None, skip = 0):
330 """Cause the test to fail.
332 By default, the fail_test() method reports that the test FAILED
333 and exits with a status of 1. If a condition argument is supplied,
334 the test fails only if the condition is true.
338 if not function is None:
345 of = " of " + self.program
348 desc = " [" + self.description + "]"
351 at = _caller(traceback.extract_stack(), skip)
352 sys.stderr.write("FAILED test" + of + desc + sep + at)
356 def no_result(self = None, condition = 1, function = None, skip = 0):
357 """Causes a test to exit with no valid result.
359 By default, the no_result() method reports NO RESULT for the test
360 and exits with a status of 2. If a condition argument is supplied,
361 the test exits with no result only if the condition is true.
365 if not function is None:
372 of = " of " + self.program
375 desc = " [" + self.description + "]"
378 at = _caller(traceback.extract_stack(), skip)
379 sys.stderr.write("NO RESULT for test" + of + desc + sep + at)
383 def pass_test(self = None, condition = 1, function = None):
384 """Causes a test to pass.
386 By default, the pass_test() method reports PASSED for the test
387 and exits with a status of 0. If a condition argument is supplied,
388 the test passes only if the condition is true.
392 if not function is None:
394 sys.stderr.write("PASSED\n")
397 def match_exact(lines = None, matches = None):
400 if not is_List(lines):
401 lines = lines.split("\n")
402 if not is_List(matches):
403 matches = matches.split("\n")
404 if len(lines) != len(matches):
406 for i in range(len(lines)):
407 if lines[i] != matches[i]:
411 def match_re(lines = None, res = None):
414 if not is_List(lines):
415 lines = lines.split("\n")
417 res = res.split("\n")
418 if len(lines) != len(res):
420 for i in range(len(lines)):
421 s = "^" + res[i] + "$"
425 msg = "Regular expression error in %s: %s"
426 raise re.error, msg % (repr(s), e[0])
427 if not expr.search(lines[i]):
431 def match_re_dotall(lines = None, res = None):
434 if not isinstance(lines, str):
435 lines = "\n".join(lines)
436 if not isinstance(res, str):
440 expr = re.compile(s, re.DOTALL)
442 msg = "Regular expression error in %s: %s"
443 raise re.error, msg % (repr(s), e[0])
444 if expr.match(lines):
452 def simple_diff(a, b, fromfile='', tofile='',
453 fromfiledate='', tofiledate='', n=3, lineterm='\n'):
455 A function with the same calling signature as difflib.context_diff
456 (diff -c) and difflib.unified_diff (diff -u) but which prints
457 output like the simple, unadorned 'diff' command.
459 sm = difflib.SequenceMatcher(None, a, b)
461 return x1+1 == x2 and str(x2) or '%s,%s' % (x1+1, x2)
463 for op, a1, a2, b1, b2 in sm.get_opcodes():
465 result.append("%sd%d" % (comma(a1, a2), b1))
466 result.extend(['< ' + l for l in a[a1:a2]])
468 result.append("%da%s" % (a1, comma(b1, b2)))
469 result.extend(['> ' + l for l in b[b1:b2]])
470 elif op == 'replace':
471 result.append("%sc%s" % (comma(a1, a2), comma(b1, b2)))
472 result.extend(['< ' + l for l in a[a1:a2]])
474 result.extend(['> ' + l for l in b[b1:b2]])
477 def diff_re(a, b, fromfile='', tofile='',
478 fromfiledate='', tofiledate='', n=3, lineterm='\n'):
480 A simple "diff" of two sets of lines when the expected lines
481 are regular expressions. This is a really dumb thing that
482 just compares each line in turn, so it doesn't look for
483 chunks of matching lines and the like--but at least it lets
484 you know exactly which line first didn't compare correctly.
487 diff = len(a) - len(b)
493 for aline, bline in zip(a, b):
494 s = "^" + aline + "$"
498 msg = "Regular expression error in %s: %s"
499 raise re.error, msg % (repr(s), e[0])
500 if not expr.search(bline):
501 result.append("%sc%s" % (i+1, i+1))
502 result.append('< ' + repr(a[i]))
504 result.append('> ' + repr(b[i]))
508 if os.name == 'java':
510 python_executable = os.path.join(sys.prefix, 'jython')
514 python_executable = sys.executable
516 if sys.platform == 'win32':
518 default_sleep_seconds = 2
520 def where_is(file, path=None, pathext=None):
522 path = os.environ['PATH']
524 path = path.split(os.pathsep)
526 pathext = os.environ['PATHEXT']
527 if is_String(pathext):
528 pathext = pathext.split(os.pathsep)
530 if ext.lower() == file[-len(ext):].lower():
534 f = os.path.join(dir, file)
537 if os.path.isfile(fext):
543 def where_is(file, path=None, pathext=None):
545 path = os.environ['PATH']
547 path = path.split(os.pathsep)
549 f = os.path.join(dir, file)
550 if os.path.isfile(f):
555 if stat.S_IMODE(st[stat.ST_MODE]) & 0111:
559 default_sleep_seconds = 1
566 # The subprocess module doesn't exist in this version of Python,
567 # so we're going to cobble up something that looks just enough
568 # like its API for our purposes below.
571 subprocess = new.module('subprocess')
573 subprocess.PIPE = 'PIPE'
574 subprocess.STDOUT = 'STDOUT'
575 subprocess.mswindows = (sys.platform == 'win32')
580 except AttributeError:
582 universal_newlines = 1
583 def __init__(self, command, **kw):
584 if sys.platform == 'win32' and command[0] == '"':
585 command = '"' + command + '"'
586 (stdin, stdout, stderr) = os.popen3(' ' + command)
590 def close_output(self):
592 self.resultcode = self.stderr.close()
594 resultcode = self.resultcode
595 if os.WIFEXITED(resultcode):
596 return os.WEXITSTATUS(resultcode)
597 elif os.WIFSIGNALED(resultcode):
598 return os.WTERMSIG(resultcode)
605 except AttributeError:
606 # A cribbed Popen4 class, with some retrofitted code from
607 # the Python 1.5 Popen3 class methods to do certain things
609 class Popen4(popen2.Popen3):
612 def __init__(self, cmd, bufsize=-1):
613 p2cread, p2cwrite = os.pipe()
614 c2pread, c2pwrite = os.pipe()
621 for i in range(3, popen2.MAXFD):
626 os.execvp(cmd[0], cmd)
629 # Shouldn't come here, I guess
632 self.tochild = os.fdopen(p2cwrite, 'w', bufsize)
634 self.fromchild = os.fdopen(c2pread, 'r', bufsize)
635 popen2._active.append(self)
637 popen2.Popen4 = Popen4
639 class Popen3(popen2.Popen3, popen2.Popen4):
640 universal_newlines = 1
641 def __init__(self, command, **kw):
642 if kw.get('stderr') == 'STDOUT':
643 popen2.Popen4.__init__(self, command, 1)
645 popen2.Popen3.__init__(self, command, 1)
646 self.stdin = self.tochild
647 self.stdout = self.fromchild
648 self.stderr = self.childerr
649 def wait(self, *args, **kw):
650 resultcode = popen2.Popen3.wait(self, *args, **kw)
651 if os.WIFEXITED(resultcode):
652 return os.WEXITSTATUS(resultcode)
653 elif os.WIFSIGNALED(resultcode):
654 return os.WTERMSIG(resultcode)
658 subprocess.Popen = Popen3
662 # From Josiah Carlson,
663 # ASPN : Python Cookbook : Module to allow Asynchronous subprocess use on Windows and Posix platforms
664 # http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/440554
666 PIPE = subprocess.PIPE
668 if subprocess.mswindows:
669 from win32file import ReadFile, WriteFile
670 from win32pipe import PeekNamedPipe
677 except AttributeError: fcntl.F_GETFL = 3
680 except AttributeError: fcntl.F_SETFL = 4
682 class Popen(subprocess.Popen):
683 def recv(self, maxsize=None):
684 return self._recv('stdout', maxsize)
686 def recv_err(self, maxsize=None):
687 return self._recv('stderr', maxsize)
689 def send_recv(self, input='', maxsize=None):
690 return self.send(input), self.recv(maxsize), self.recv_err(maxsize)
692 def get_conn_maxsize(self, which, maxsize):
697 return getattr(self, which), maxsize
699 def _close(self, which):
700 getattr(self, which).close()
701 setattr(self, which, None)
703 if subprocess.mswindows:
704 def send(self, input):
709 x = msvcrt.get_osfhandle(self.stdin.fileno())
710 (errCode, written) = WriteFile(x, input)
712 return self._close('stdin')
713 except (subprocess.pywintypes.error, Exception), why:
714 if why[0] in (109, errno.ESHUTDOWN):
715 return self._close('stdin')
720 def _recv(self, which, maxsize):
721 conn, maxsize = self.get_conn_maxsize(which, maxsize)
726 x = msvcrt.get_osfhandle(conn.fileno())
727 (read, nAvail, nMessage) = PeekNamedPipe(x, 0)
731 (errCode, read) = ReadFile(x, nAvail, None)
733 return self._close(which)
734 except (subprocess.pywintypes.error, Exception), why:
735 if why[0] in (109, errno.ESHUTDOWN):
736 return self._close(which)
739 #if self.universal_newlines:
740 # read = self._translate_newlines(read)
744 def send(self, input):
748 if not select.select([], [self.stdin], [], 0)[1]:
752 written = os.write(self.stdin.fileno(), input)
754 if why[0] == errno.EPIPE: #broken pipe
755 return self._close('stdin')
760 def _recv(self, which, maxsize):
761 conn, maxsize = self.get_conn_maxsize(which, maxsize)
766 flags = fcntl.fcntl(conn, fcntl.F_GETFL)
771 fcntl.fcntl(conn, fcntl.F_SETFL, flags| os.O_NONBLOCK)
774 if not select.select([conn], [], [], 0)[0]:
777 r = conn.read(maxsize)
779 return self._close(which)
781 #if self.universal_newlines:
782 # r = self._translate_newlines(r)
785 if not conn.closed and not flags is None:
786 fcntl.fcntl(conn, fcntl.F_SETFL, flags)
788 disconnect_message = "Other end disconnected!"
790 def recv_some(p, t=.1, e=1, tr=5, stderr=0):
799 while time.time() < x or r:
803 raise Exception(disconnect_message)
809 time.sleep(max((x-time.time())/tr, 0))
812 # TODO(3.0): rewrite to use memoryview()
813 def send_all(p, data):
817 raise Exception(disconnect_message)
818 data = buffer(data, sent)
830 class TestCmd(object):
834 def __init__(self, description = None,
843 universal_newlines = 1):
844 self._cwd = os.getcwd()
845 self.description_set(description)
846 self.program_set(program)
847 self.interpreter_set(interpreter)
850 verbose = max( 0, int(os.environ.get('TESTCMD_VERBOSE', 0)) )
853 self.verbose_set(verbose)
854 self.combine = combine
855 self.universal_newlines = universal_newlines
856 if not match is None:
857 self.match_function = match
859 self.match_function = match_re
861 self.diff_function = diff
868 self.diff_function = simple_diff
869 #self.diff_function = difflib.context_diff
870 #self.diff_function = difflib.unified_diff
872 self._preserve = {'pass_test': 0, 'fail_test': 0, 'no_result': 0}
873 if 'PRESERVE' in os.environ and not os.environ['PRESERVE'] is '':
874 self._preserve['pass_test'] = os.environ['PRESERVE']
875 self._preserve['fail_test'] = os.environ['PRESERVE']
876 self._preserve['no_result'] = os.environ['PRESERVE']
879 self._preserve['pass_test'] = os.environ['PRESERVE_PASS']
883 self._preserve['fail_test'] = os.environ['PRESERVE_FAIL']
887 self._preserve['no_result'] = os.environ['PRESERVE_NO_RESULT']
893 self.condition = 'no_result'
894 self.workdir_set(workdir)
901 return "%x" % id(self)
906 def banner(self, s, width=None):
908 width = self.banner_width
909 return s + self.banner_char * (width - len(s))
911 if os.name == 'posix':
913 def escape(self, arg):
914 "escape shell special characters"
918 arg = arg.replace(slash, slash+slash)
920 arg = arg.replace(c, slash+c)
922 if re_space.search(arg):
923 arg = '"' + arg + '"'
928 # Windows does not allow special characters in file names
929 # anyway, so no need for an escape function, we will just quote
931 def escape(self, arg):
932 if re_space.search(arg):
933 arg = '"' + arg + '"'
936 def canonicalize(self, path):
938 path = os.path.join(*tuple(path))
939 if not os.path.isabs(path):
940 path = os.path.join(self.workdir, path)
943 def chmod(self, path, mode):
944 """Changes permissions on the specified file or directory
946 path = self.canonicalize(path)
949 def cleanup(self, condition = None):
950 """Removes any temporary working directories for the specified
951 TestCmd environment. If the environment variable PRESERVE was
952 set when the TestCmd environment was created, temporary working
953 directories are not removed. If any of the environment variables
954 PRESERVE_PASS, PRESERVE_FAIL, or PRESERVE_NO_RESULT were set
955 when the TestCmd environment was created, then temporary working
956 directories are not removed if the test passed, failed, or had
957 no result, respectively. Temporary working directories are also
958 preserved for conditions specified via the preserve method.
960 Typically, this method is not called directly, but is used when
961 the script exits to clean up temporary working directories as
962 appropriate for the exit status.
964 if not self._dirlist:
968 if condition is None:
969 condition = self.condition
970 if self._preserve[condition]:
971 for dir in self._dirlist:
972 print "Preserved directory", dir
974 list = self._dirlist[:]
977 self.writable(dir, 1)
978 shutil.rmtree(dir, ignore_errors = 1)
983 _Cleanup.remove(self)
984 except (AttributeError, ValueError):
987 def command_args(self, program = None,
991 if isinstance(program, str) and not os.path.isabs(program):
992 program = os.path.join(self._cwd, program)
994 program = self.program
996 interpreter = self.interpreter
997 if not type(program) in [list, tuple]:
1001 if not type(interpreter) in [list, tuple]:
1002 interpreter = [interpreter]
1003 cmd = list(interpreter) + cmd
1005 if isinstance(arguments, str):
1006 arguments = arguments.split()
1007 cmd.extend(arguments)
1010 def description_set(self, description):
1011 """Set the description of the functionality being tested.
1013 self.description = description
1018 def diff(self, a, b, name, *args, **kw):
1019 print self.banner('Expected %s' % name)
1021 print self.banner('Actual %s' % name)
1024 def diff(self, a, b, name, *args, **kw):
1025 print self.banner(name)
1026 args = (a.splitlines(), b.splitlines()) + args
1027 lines = self.diff_function(*args, **kw)
1031 def fail_test(self, condition = 1, function = None, skip = 0):
1032 """Cause the test to fail.
1036 self.condition = 'fail_test'
1037 fail_test(self = self,
1038 condition = condition,
1039 function = function,
1042 def interpreter_set(self, interpreter):
1043 """Set the program to be used to interpret the program
1044 under test as a script.
1046 self.interpreter = interpreter
1048 def match(self, lines, matches):
1049 """Compare actual and expected file contents.
1051 return self.match_function(lines, matches)
1053 def match_exact(self, lines, matches):
1054 """Compare actual and expected file contents.
1056 return match_exact(lines, matches)
1058 def match_re(self, lines, res):
1059 """Compare actual and expected file contents.
1061 return match_re(lines, res)
1063 def match_re_dotall(self, lines, res):
1064 """Compare actual and expected file contents.
1066 return match_re_dotall(lines, res)
1068 def no_result(self, condition = 1, function = None, skip = 0):
1069 """Report that the test could not be run.
1073 self.condition = 'no_result'
1074 no_result(self = self,
1075 condition = condition,
1076 function = function,
1079 def pass_test(self, condition = 1, function = None):
1080 """Cause the test to pass.
1084 self.condition = 'pass_test'
1085 pass_test(self = self, condition = condition, function = function)
def preserve(self, *conditions):
    """Arrange for the temporary working directories to be preserved.

    Each positional argument names a test outcome ('pass_test',
    'fail_test' or 'no_result') for which the temporary working
    directories of this TestCmd environment should be kept.  If no
    conditions are specified, the directories are preserved for all
    three outcomes.
    """
    # Test emptiness by truth value rather than by identity with a ()
    # literal: the identity test depends on the interpreter sharing one
    # empty-tuple object and triggers a SyntaxWarning on Python 3.8+.
    if not conditions:
        conditions = ('pass_test', 'fail_test', 'no_result')
    for cond in conditions:
        self._preserve[cond] = 1
1099 def program_set(self, program):
1100 """Set the executable program or script to be tested.
1102 if program and not os.path.isabs(program):
1103 program = os.path.join(self._cwd, program)
1104 self.program = program
1106 def read(self, file, mode = 'rb'):
1107 """Reads and returns the contents of the specified file name.
1108 The file name may be a list, in which case the elements are
1109 concatenated with the os.path.join() method. The file is
1110 assumed to be under the temporary working directory unless it
1111 is an absolute path name. The I/O mode for the file may
1112 be specified; it must begin with an 'r'. The default is
1115 file = self.canonicalize(file)
1117 raise ValueError, "mode must begin with 'r'"
1118 return open(file, mode).read()
1120 def rmdir(self, dir):
1121 """Removes the specified dir name.
1122 The dir name may be a list, in which case the elements are
1123 concatenated with the os.path.join() method. The dir is
1124 assumed to be under the temporary working directory unless it
1125 is an absolute path name.
1126 The dir must be empty.
1128 dir = self.canonicalize(dir)
1131 def start(self, program = None,
1134 universal_newlines = None,
1137 Starts a program or script for the test environment.
1139 The specified program will have the original directory
1140 prepended unless it is enclosed in a [list].
1142 cmd = self.command_args(program, interpreter, arguments)
1143 cmd_string = ' '.join(map(self.escape, cmd))
1145 sys.stderr.write(cmd_string + "\n")
1146 if universal_newlines is None:
1147 universal_newlines = self.universal_newlines
1149 # On Windows, if we make stdin a pipe when we plan to send
1150 # no input, and the test program exits before
1151 # Popen calls msvcrt.open_osfhandle, that call will fail.
1152 # So don't use a pipe for stdin if we don't need one.
1153 stdin = kw.get('stdin', None)
1154 if stdin is not None:
1155 stdin = subprocess.PIPE
1157 combine = kw.get('combine', self.combine)
1159 stderr_value = subprocess.STDOUT
1161 stderr_value = subprocess.PIPE
1165 stdout=subprocess.PIPE,
1166 stderr=stderr_value,
1167 universal_newlines=universal_newlines)
1169 def finish(self, popen, **kw):
1171 Finishes and waits for the process being run under control of
1172 the specified popen argument, recording the exit status,
1173 standard output and error output.
1176 self.status = popen.wait()
1179 self._stdout.append(popen.stdout.read())
1181 stderr = popen.stderr.read()
1184 self._stderr.append(stderr)
1186 def run(self, program = None,
1191 universal_newlines = None):
1192 """Runs a test of the program or script for the test
1193 environment. Standard output and error output are saved for
1194 future retrieval via the stdout() and stderr() methods.
1196 The specified program will have the original directory
1197 prepended unless it is enclosed in a [list].
1200 oldcwd = os.getcwd()
1201 if not os.path.isabs(chdir):
1202 chdir = os.path.join(self.workpath(chdir))
1204 sys.stderr.write("chdir(" + chdir + ")\n")
1206 p = self.start(program,
1216 p.stdin.write(stdin)
1219 out = p.stdout.read()
1220 if p.stderr is None:
1223 err = p.stderr.read()
1225 close_output = p.close_output
1226 except AttributeError:
1228 if not p.stderr is None:
1233 self._stdout.append(out)
1234 self._stderr.append(err)
1236 self.status = p.wait()
1242 if self.verbose >= 2:
1243 write = sys.stdout.write
1244 write('============ STATUS: %d\n' % self.status)
1246 if out or self.verbose >= 3:
1247 write('============ BEGIN STDOUT (len=%d):\n' % len(out))
1249 write('============ END STDOUT\n')
1251 if err or self.verbose >= 3:
1252 write('============ BEGIN STDERR (len=%d)\n' % len(err))
1254 write('============ END STDERR\n')
1256 def sleep(self, seconds = default_sleep_seconds):
1257 """Sleeps at least the specified number of seconds. If no
1258 number is specified, sleeps at least the minimum number of
1259 seconds necessary to advance file time stamps on the current
1260 system. Sleeping more seconds is all right.
1264 def stderr(self, run = None):
1265 """Returns the error output from the specified run number.
1266 If there is no specified run number, then returns the error
1267 output of the last run. If the run number is less than zero,
1268 then returns the error output from that many runs back from the
1272 run = len(self._stderr)
1274 run = len(self._stderr) + run
1276 return self._stderr[run]
1278 def stdout(self, run = None):
1279 """Returns the standard output from the specified run number.
1280 If there is no specified run number, then returns the standard
1281 output of the last run. If the run number is less than zero,
1282 then returns the standard output from that many runs back from
1286 run = len(self._stdout)
1288 run = len(self._stdout) + run
1290 return self._stdout[run]
1292 def subdir(self, *subdirs):
1293 """Create new subdirectories under the temporary working
1294 directory, one for each argument. An argument may be a list,
1295 in which case the list elements are concatenated using the
1296 os.path.join() method. Subdirectories multiple levels deep
1297 must be created using a separate argument for each level:
1299 test.subdir('sub', ['sub', 'dir'], ['sub', 'dir', 'ectory'])
1301 Returns the number of subdirectories actually created.
1308 sub = os.path.join(*tuple(sub))
1309 new = os.path.join(self.workdir, sub)
1318 def symlink(self, target, link):
1319 """Creates a symlink to the specified target.
1320 The link name may be a list, in which case the elements are
1321 concatenated with the os.path.join() method. The link is
1322 assumed to be under the temporary working directory unless it
1323 is an absolute path name. The target is *not* assumed to be
1324 under the temporary working directory.
1326 link = self.canonicalize(link)
1327 os.symlink(target, link)
1329 def tempdir(self, path=None):
1330 """Creates a temporary directory.
1331 A unique directory name is generated if no path name is specified.
1332 The directory is created, and will be removed when the TestCmd
1333 object is destroyed.
1337 path = tempfile.mktemp(prefix=tempfile.template)
1339 path = tempfile.mktemp()
1342 # Symlinks in the path will report things
1343 # differently from os.getcwd(), so chdir there
1344 # and back to fetch the canonical path.
1352 # Uppercase the drive letter since the case of drive
1353 # letters is pretty much random on win32:
1354 drive,rest = os.path.splitdrive(path)
1356 path = drive.upper() + rest
1359 self._dirlist.append(path)
1362 _Cleanup.index(self)
1364 _Cleanup.append(self)
1368 def touch(self, path, mtime=None):
1369 """Updates the modification time on the specified file or
1370 directory path name. The default is to update to the
1371 current time if no explicit modification time is specified.
1373 path = self.canonicalize(path)
1374 atime = os.path.getatime(path)
1377 os.utime(path, (atime, mtime))
1379 def unlink(self, file):
1380 """Unlinks the specified file name.
1381 The file name may be a list, in which case the elements are
1382 concatenated with the os.path.join() method. The file is
1383 assumed to be under the temporary working directory unless it
1384 is an absolute path name.
1386 file = self.canonicalize(file)
1389 def verbose_set(self, verbose):
1390 """Set the verbose level.
1392 self.verbose = verbose
1394 def where_is(self, file, path=None, pathext=None):
1395 """Find an executable file.
1398 file = os.path.join(*tuple(file))
1399 if not os.path.isabs(file):
1400 file = where_is(file, path, pathext)
1403 def workdir_set(self, path):
1404 """Creates a temporary working directory with the specified
1405 path name. If the path is a null string (''), a unique
1406 directory name is created.
1411 path = self.tempdir(path)
1414 def workpath(self, *args):
1415 """Returns the absolute path name to a subdirectory or file
1416 within the current temporary working directory. Concatenates
1417 the temporary working directory name with the specified
1418 arguments using the os.path.join() method.
1420 return os.path.join(self.workdir, *tuple(args))
1422 def readable(self, top, read=1):
1423 """Make the specified directory tree readable (read == 1)
1424 or not (read == None).
1426 This method has no effect on Windows systems, which use a
1427 completely different mechanism to control file readability.
1430 if sys.platform == 'win32':
1434 def do_chmod(fname):
1435 try: st = os.stat(fname)
1436 except OSError: pass
1437 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IREAD))
1439 def do_chmod(fname):
1440 try: st = os.stat(fname)
1441 except OSError: pass
1442 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IREAD))
1444 if os.path.isfile(top):
1445 # If it's a file, that's easy, just chmod it.
1448 # It's a directory and we're trying to turn on read
1449 # permission, so it's also pretty easy, just chmod the
1450 # directory and then chmod every entry on our walk down the
1451 # tree. Because os.path.walk() is top-down, we'll enable
1452 # read permission on any directories that have it disabled
1453 # before os.path.walk() tries to list their contents.
1456 def chmod_entries(arg, dirname, names, do_chmod=do_chmod):
1458 do_chmod(os.path.join(dirname, n))
1460 os.path.walk(top, chmod_entries, None)
1462 # It's a directory and we're trying to turn off read
1463 # permission, which means we have to chmod the directories
1464 # in the tree bottom-up, lest disabling read permission from
1465 # the top down get in the way of being able to get at lower
1466 # parts of the tree. But os.path.walk() visits things top
1467 # down, so we just use an object to collect a list of all
1468 # of the entries in the tree, reverse the list, and then
1469 # chmod the reversed (bottom-up) list.
1470 col = Collector(top)
1471 os.path.walk(top, col, None)
1472 col.entries.reverse()
1473 for d in col.entries: do_chmod(d)
1475 def writable(self, top, write=1):
1476 """Make the specified directory tree writable (write == 1)
1477 or not (write == None).
1480 if sys.platform == 'win32':
1483 def do_chmod(fname):
1484 try: os.chmod(fname, stat.S_IWRITE)
1485 except OSError: pass
1487 def do_chmod(fname):
1488 try: os.chmod(fname, stat.S_IREAD)
1489 except OSError: pass
1494 def do_chmod(fname):
1495 try: st = os.stat(fname)
1496 except OSError: pass
1497 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|0200))
1499 def do_chmod(fname):
1500 try: st = os.stat(fname)
1501 except OSError: pass
1502 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~0200))
1504 if os.path.isfile(top):
1507 col = Collector(top)
1508 os.path.walk(top, col, None)
1509 for d in col.entries: do_chmod(d)
1511 def executable(self, top, execute=1):
1512 """Make the specified directory tree executable (execute == 1)
1513 or not (execute == None).
1515 This method has no effect on Windows systems, which use a
1516 completely different mechanism to control file executability.
1519 if sys.platform == 'win32':
1523 def do_chmod(fname):
1524 try: st = os.stat(fname)
1525 except OSError: pass
1526 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IEXEC))
1528 def do_chmod(fname):
1529 try: st = os.stat(fname)
1530 except OSError: pass
1531 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IEXEC))
1533 if os.path.isfile(top):
1534 # If it's a file, that's easy, just chmod it.
1537 # It's a directory and we're trying to turn on execute
1538 # permission, so it's also pretty easy, just chmod the
1539 # directory and then chmod every entry on our walk down the
1540 # tree. Because os.path.walk() is top-down, we'll enable
1541 # execute permission on any directories that have it disabled
1542 # before os.path.walk() tries to list their contents.
1545 def chmod_entries(arg, dirname, names, do_chmod=do_chmod):
1547 do_chmod(os.path.join(dirname, n))
1549 os.path.walk(top, chmod_entries, None)
1551 # It's a directory and we're trying to turn off execute
1552 # permission, which means we have to chmod the directories
1553 # in the tree bottom-up, lest disabling execute permission from
1554 # the top down get in the way of being able to get at lower
1555 # parts of the tree. But os.path.walk() visits things top
1556 # down, so we just use an object to collect a list of all
1557 # of the entries in the tree, reverse the list, and then
1558 # chmod the reversed (bottom-up) list.
1559 col = Collector(top)
1560 os.path.walk(top, col, None)
1561 col.entries.reverse()
1562 for d in col.entries: do_chmod(d)
1564 def write(self, file, content, mode = 'wb'):
1565 """Writes the specified content text (second argument) to the
1566 specified file name (first argument). The file name may be
1567 a list, in which case the elements are concatenated with the
1568 os.path.join() method. The file is created under the temporary
1569 working directory. Any subdirectories in the path must already
1570 exist. The I/O mode for the file may be specified; it must
1571 begin with a 'w'. The default is 'wb' (binary write).
1573 file = self.canonicalize(file)
1575 raise ValueError, "mode must begin with 'w'"
1576 open(file, mode).write(content)
1580 # indent-tabs-mode:nil
1582 # vim: set expandtab tabstop=4 shiftwidth=4: