2 TestCmd.py: a testing framework for commands and scripts.
4 The TestCmd module provides a framework for portable automated testing
5 of executable commands and scripts (in any language, not just Python),
6 especially commands and scripts that require file system interaction.
8 In addition to running tests and evaluating conditions, the TestCmd
9 module manages and cleans up one or more temporary workspace
10 directories, and provides methods for creating files and directories in
11 those workspace directories from in-line data, here-documents), allowing
12 tests to be completely self-contained.
14 A TestCmd environment object is created via the usual invocation:
17 test = TestCmd.TestCmd()
19 There are a bunch of keyword arguments available at instantiation:
21 test = TestCmd.TestCmd(description = 'string',
22 program = 'program_or_script_to_test',
23 interpreter = 'script_interpreter',
27 match = default_match_function,
28 diff = default_diff_function,
31 There are a bunch of methods that let you do different things:
35 test.description_set('string')
37 test.program_set('program_or_script_to_test')
39 test.interpreter_set('script_interpreter')
40 test.interpreter_set(['script_interpreter', 'arg'])
42 test.workdir_set('prefix')
46 test.workpath('subdir', 'file')
48 test.subdir('subdir', ...)
50 test.rmdir('subdir', ...)
52 test.write('file', "contents\n")
53 test.write(['subdir', 'file'], "contents\n")
56 test.read(['subdir', 'file'])
57 test.read('file', mode)
58 test.read(['subdir', 'file'], mode)
60 test.writable('dir', 1)
61 test.writable('dir', None)
63 test.preserve(condition, ...)
65 test.cleanup(condition)
67 test.command_args(program = 'program_or_script_to_run',
68 interpreter = 'script_interpreter',
69 arguments = 'arguments to pass to program')
71 test.run(program = 'program_or_script_to_run',
72 interpreter = 'script_interpreter',
73 arguments = 'arguments to pass to program',
74 chdir = 'directory_to_chdir_to',
75 stdin = 'input to feed to the program\n')
76 universal_newlines = True)
78 p = test.start(program = 'program_or_script_to_run',
79 interpreter = 'script_interpreter',
80 arguments = 'arguments to pass to program',
81 universal_newlines = None)
86 test.pass_test(condition)
87 test.pass_test(condition, function)
90 test.fail_test(condition)
91 test.fail_test(condition, function)
92 test.fail_test(condition, function, skip)
95 test.no_result(condition)
96 test.no_result(condition, function)
97 test.no_result(condition, function, skip)
105 test.symlink(target, link)
108 test.banner(string, width)
110 test.diff(actual, expected)
112 test.match(actual, expected)
114 test.match_exact("actual 1\nactual 2\n", "expected 1\nexpected 2\n")
115 test.match_exact(["actual 1\n", "actual 2\n"],
116 ["expected 1\n", "expected 2\n"])
118 test.match_re("actual 1\nactual 2\n", regex_string)
119 test.match_re(["actual 1\n", "actual 2\n"], list_of_regexes)
121 test.match_re_dotall("actual 1\nactual 2\n", regex_string)
122 test.match_re_dotall(["actual 1\n", "actual 2\n"], list_of_regexes)
125 test.tempdir('temporary-directory')
131 test.where_is('foo', 'PATH1:PATH2')
132 test.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4')
135 test.unlink('subdir', 'file')
137 The TestCmd module provides pass_test(), fail_test(), and no_result()
138 unbound functions that report test results for use with the Aegis change
139 management system. These methods terminate the test immediately,
140 reporting PASSED, FAILED, or NO RESULT respectively, and exiting with
141 status 0 (success), 1 or 2 respectively. This allows for a distinction
142 between an actual failed test and a test that could not be properly
143 evaluated because of an external condition (such as a full file system
144 or incorrect permissions).
149 TestCmd.pass_test(condition)
150 TestCmd.pass_test(condition, function)
153 TestCmd.fail_test(condition)
154 TestCmd.fail_test(condition, function)
155 TestCmd.fail_test(condition, function, skip)
158 TestCmd.no_result(condition)
159 TestCmd.no_result(condition, function)
160 TestCmd.no_result(condition, function, skip)
162 The TestCmd module also provides unbound functions that handle matching
163 in the same way as the match_*() methods described above.
167 test = TestCmd.TestCmd(match = TestCmd.match_exact)
169 test = TestCmd.TestCmd(match = TestCmd.match_re)
171 test = TestCmd.TestCmd(match = TestCmd.match_re_dotall)
173 The TestCmd module provides unbound functions that can be used for the
174 "diff" argument to TestCmd.TestCmd instantiation:
178 test = TestCmd.TestCmd(match = TestCmd.match_re,
179 diff = TestCmd.diff_re)
181 test = TestCmd.TestCmd(diff = TestCmd.simple_diff)
183 The "diff" argument can also be used with standard difflib functions:
187 test = TestCmd.TestCmd(diff = difflib.context_diff)
189 test = TestCmd.TestCmd(diff = difflib.unified_diff)
191 Lastly, the where_is() method also exists in an unbound function
196 TestCmd.where_is('foo')
197 TestCmd.where_is('foo', 'PATH1:PATH2')
198 TestCmd.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4')
201 # Copyright 2000-2010 Steven Knight
202 # This module is free software, and you may redistribute it and/or modify
203 # it under the same terms as Python itself, so long as this copyright message
204 # and disclaimer are retained in their original form.
206 # IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
207 # SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
208 # THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
211 # THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
212 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
213 # PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
214 # AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
215 # SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
216 from __future__ import generators ### KEEP FOR COMPATIBILITY FIXERS
218 __author__ = "Steven Knight <knight at baldmt dot com>"
219 __revision__ = "TestCmd.py 0.37.D001 2010/01/11 16:55:50 knight"
235 # pre-2.7 doesn't have the memoryview() built-in
239 from types import SliceType
240 def __init__(self, obj):
241 # wrapping buffer in () keeps the fixer from changing it
242 self.obj = (buffer)(obj)
243 def __getitem__(self, indx):
244 if isinstance(indx, self.SliceType):
245 return self.obj[indx.start:indx.stop]
247 return self.obj[indx]
264 __all__.append('simple_diff')
267 return isinstance(e, list) \
268 or isinstance(e, UserList.UserList)
271 from UserString import UserString
279 return isinstance(e, str) or isinstance(e, UserString)
282 return isinstance(e, str) \
283 or isinstance(e, unicode) \
284 or isinstance(e, UserString)
286 tempfile.template = 'testcmd.'
287 if os.name in ('posix', 'nt'):
288 tempfile.template = 'testcmd.' + str(os.getpid()) + '.'
290 tempfile.template = 'testcmd.'
292 re_space = re.compile('\s')
296 _chain_to_exitfunc = None
300 cleanlist = [_f for _f in _Cleanup if _f]
303 for test in cleanlist:
305 if _chain_to_exitfunc:
311 # TODO(1.5): atexit requires python 2.0, so chain sys.exitfunc
313 _chain_to_exitfunc = sys.exitfunc
314 except AttributeError:
316 sys.exitfunc = _clean
318 atexit.register(_clean)
321 def __init__(self, top):
323 def __call__(self, arg, dirname, names):
324 pathjoin = lambda n: os.path.join(dirname, n)
325 self.entries.extend(list(map(pathjoin, names)))
327 def _caller(tblist, skip):
330 for file, line, name, text in tblist:
331 if file[-10:] == "TestCmd.py":
333 arr = [(file, line, name, text)] + arr
335 for file, line, name, text in arr[skip:]:
336 if name in ("?", "<module>"):
339 name = " (" + name + ")"
340 string = string + ("%s line %d of %s%s\n" % (atfrom, line, file, name))
344 def fail_test(self = None, condition = 1, function = None, skip = 0):
345 """Cause the test to fail.
347 By default, the fail_test() method reports that the test FAILED
348 and exits with a status of 1. If a condition argument is supplied,
349 the test fails only if the condition is true.
353 if not function is None:
360 of = " of " + self.program
363 desc = " [" + self.description + "]"
366 at = _caller(traceback.extract_stack(), skip)
367 sys.stderr.write("FAILED test" + of + desc + sep + at)
371 def no_result(self = None, condition = 1, function = None, skip = 0):
372 """Causes a test to exit with no valid result.
374 By default, the no_result() method reports NO RESULT for the test
375 and exits with a status of 2. If a condition argument is supplied,
376 the test fails only if the condition is true.
380 if not function is None:
387 of = " of " + self.program
390 desc = " [" + self.description + "]"
393 at = _caller(traceback.extract_stack(), skip)
394 sys.stderr.write("NO RESULT for test" + of + desc + sep + at)
398 def pass_test(self = None, condition = 1, function = None):
399 """Causes a test to pass.
401 By default, the pass_test() method reports PASSED for the test
402 and exits with a status of 0. If a condition argument is supplied,
403 the test passes only if the condition is true.
407 if not function is None:
409 sys.stderr.write("PASSED\n")
412 def match_exact(lines = None, matches = None):
415 if not is_List(lines):
416 lines = lines.split("\n")
417 if not is_List(matches):
418 matches = matches.split("\n")
419 if len(lines) != len(matches):
421 for i in range(len(lines)):
422 if lines[i] != matches[i]:
426 def match_re(lines = None, res = None):
429 if not is_List(lines):
430 lines = lines.split("\n")
432 res = res.split("\n")
433 if len(lines) != len(res):
435 for i in range(len(lines)):
436 s = "^" + res[i] + "$"
440 msg = "Regular expression error in %s: %s"
441 raise re.error(msg % (repr(s), e[0]))
442 if not expr.search(lines[i]):
446 def match_re_dotall(lines = None, res = None):
449 if not isinstance(lines, str):
450 lines = "\n".join(lines)
451 if not isinstance(res, str):
455 expr = re.compile(s, re.DOTALL)
457 msg = "Regular expression error in %s: %s"
458 raise re.error(msg % (repr(s), e[0]))
459 if expr.match(lines):
467 def simple_diff(a, b, fromfile='', tofile='',
468 fromfiledate='', tofiledate='', n=3, lineterm='\n'):
470 A function with the same calling signature as difflib.context_diff
471 (diff -c) and difflib.unified_diff (diff -u) but which prints
472 output like the simple, unadorned 'diff" command.
474 sm = difflib.SequenceMatcher(None, a, b)
476 return x1+1 == x2 and str(x2) or '%s,%s' % (x1+1, x2)
478 for op, a1, a2, b1, b2 in sm.get_opcodes():
480 result.append("%sd%d" % (comma(a1, a2), b1))
481 result.extend(['< ' + l for l in a[a1:a2]])
483 result.append("%da%s" % (a1, comma(b1, b2)))
484 result.extend(['> ' + l for l in b[b1:b2]])
485 elif op == 'replace':
486 result.append("%sc%s" % (comma(a1, a2), comma(b1, b2)))
487 result.extend(['< ' + l for l in a[a1:a2]])
489 result.extend(['> ' + l for l in b[b1:b2]])
492 def diff_re(a, b, fromfile='', tofile='',
493 fromfiledate='', tofiledate='', n=3, lineterm='\n'):
495 A simple "diff" of two sets of lines when the expected lines
496 are regular expressions. This is a really dumb thing that
497 just compares each line in turn, so it doesn't look for
498 chunks of matching lines and the like--but at least it lets
499 you know exactly which line first didn't compare correctl...
502 diff = len(a) - len(b)
508 for aline, bline in zip(a, b):
509 s = "^" + aline + "$"
513 msg = "Regular expression error in %s: %s"
514 raise re.error(msg % (repr(s), e[0]))
515 if not expr.search(bline):
516 result.append("%sc%s" % (i+1, i+1))
517 result.append('< ' + repr(a[i]))
519 result.append('> ' + repr(b[i]))
523 if os.name == 'java':
525 python_executable = os.path.join(sys.prefix, 'jython')
529 python_executable = sys.executable
531 if sys.platform == 'win32':
533 default_sleep_seconds = 2
535 def where_is(file, path=None, pathext=None):
537 path = os.environ['PATH']
539 path = path.split(os.pathsep)
541 pathext = os.environ['PATHEXT']
542 if is_String(pathext):
543 pathext = pathext.split(os.pathsep)
545 if ext.lower() == file[-len(ext):].lower():
549 f = os.path.join(dir, file)
552 if os.path.isfile(fext):
558 def where_is(file, path=None, pathext=None):
560 path = os.environ['PATH']
562 path = path.split(os.pathsep)
564 f = os.path.join(dir, file)
565 if os.path.isfile(f):
570 if stat.S_IMODE(st[stat.ST_MODE]) & 0111:
574 default_sleep_seconds = 1
581 # The subprocess module doesn't exist in this version of Python,
582 # so we're going to cobble up something that looks just enough
583 # like its API for our purposes below.
584 from types import ModuleType
585 class subprocess(ModuleType): pass
587 subprocess.PIPE = 'PIPE'
588 subprocess.STDOUT = 'STDOUT'
589 subprocess.mswindows = (sys.platform == 'win32')
594 except AttributeError:
596 universal_newlines = 1
597 def __init__(self, command, **kw):
598 if sys.platform == 'win32' and command[0] == '"':
599 command = '"' + command + '"'
600 (stdin, stdout, stderr) = os.popen3(' ' + command)
604 def close_output(self):
606 self.resultcode = self.stderr.close()
608 resultcode = self.resultcode
609 if os.WIFEXITED(resultcode):
610 return os.WEXITSTATUS(resultcode)
611 elif os.WIFSIGNALED(resultcode):
612 return os.WTERMSIG(resultcode)
619 except AttributeError:
620 # A cribbed Popen4 class, with some retrofitted code from
621 # the Python 1.5 Popen3 class methods to do certain things
623 class Popen4(popen2.Popen3):
626 def __init__(self, cmd, bufsize=-1):
627 p2cread, p2cwrite = os.pipe()
628 c2pread, c2pwrite = os.pipe()
635 for i in range(3, popen2.MAXFD):
640 os.execvp(cmd[0], cmd)
643 # Shouldn't come here, I guess
646 self.tochild = os.fdopen(p2cwrite, 'w', bufsize)
648 self.fromchild = os.fdopen(c2pread, 'r', bufsize)
649 popen2._active.append(self)
651 popen2.Popen4 = Popen4
653 class Popen3(popen2.Popen3, popen2.Popen4):
654 universal_newlines = 1
655 def __init__(self, command, **kw):
656 if kw.get('stderr') == 'STDOUT':
657 popen2.Popen4.__init__(self, command, 1)
659 popen2.Popen3.__init__(self, command, 1)
660 self.stdin = self.tochild
661 self.stdout = self.fromchild
662 self.stderr = self.childerr
663 def wait(self, *args, **kw):
664 resultcode = popen2.Popen3.wait(self, *args, **kw)
665 if os.WIFEXITED(resultcode):
666 return os.WEXITSTATUS(resultcode)
667 elif os.WIFSIGNALED(resultcode):
668 return os.WTERMSIG(resultcode)
672 subprocess.Popen = Popen3
676 # From Josiah Carlson,
677 # ASPN : Python Cookbook : Module to allow Asynchronous subprocess use on Windows and Posix platforms
678 # http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/440554
680 PIPE = subprocess.PIPE
682 if subprocess.mswindows:
683 from win32file import ReadFile, WriteFile
684 from win32pipe import PeekNamedPipe
691 except AttributeError: fcntl.F_GETFL = 3
694 except AttributeError: fcntl.F_SETFL = 4
696 class Popen(subprocess.Popen):
697 def recv(self, maxsize=None):
698 return self._recv('stdout', maxsize)
700 def recv_err(self, maxsize=None):
701 return self._recv('stderr', maxsize)
703 def send_recv(self, input='', maxsize=None):
704 return self.send(input), self.recv(maxsize), self.recv_err(maxsize)
706 def get_conn_maxsize(self, which, maxsize):
711 return getattr(self, which), maxsize
713 def _close(self, which):
714 getattr(self, which).close()
715 setattr(self, which, None)
717 if subprocess.mswindows:
718 def send(self, input):
723 x = msvcrt.get_osfhandle(self.stdin.fileno())
724 (errCode, written) = WriteFile(x, input)
726 return self._close('stdin')
727 except (subprocess.pywintypes.error, Exception), why:
728 if why[0] in (109, errno.ESHUTDOWN):
729 return self._close('stdin')
734 def _recv(self, which, maxsize):
735 conn, maxsize = self.get_conn_maxsize(which, maxsize)
740 x = msvcrt.get_osfhandle(conn.fileno())
741 (read, nAvail, nMessage) = PeekNamedPipe(x, 0)
745 (errCode, read) = ReadFile(x, nAvail, None)
747 return self._close(which)
748 except (subprocess.pywintypes.error, Exception), why:
749 if why[0] in (109, errno.ESHUTDOWN):
750 return self._close(which)
753 #if self.universal_newlines:
754 # read = self._translate_newlines(read)
758 def send(self, input):
762 if not select.select([], [self.stdin], [], 0)[1]:
766 written = os.write(self.stdin.fileno(), input)
768 if why[0] == errno.EPIPE: #broken pipe
769 return self._close('stdin')
774 def _recv(self, which, maxsize):
775 conn, maxsize = self.get_conn_maxsize(which, maxsize)
780 flags = fcntl.fcntl(conn, fcntl.F_GETFL)
785 fcntl.fcntl(conn, fcntl.F_SETFL, flags| os.O_NONBLOCK)
788 if not select.select([conn], [], [], 0)[0]:
791 r = conn.read(maxsize)
793 return self._close(which)
795 #if self.universal_newlines:
796 # r = self._translate_newlines(r)
799 if not conn.closed and not flags is None:
800 fcntl.fcntl(conn, fcntl.F_SETFL, flags)
802 disconnect_message = "Other end disconnected!"
804 def recv_some(p, t=.1, e=1, tr=5, stderr=0):
813 while time.time() < x or r:
817 raise Exception(disconnect_message)
823 time.sleep(max((x-time.time())/tr, 0))
826 def send_all(p, data):
830 raise Exception(disconnect_message)
831 data = memoryview(data)[sent:]
843 class TestCmd(object):
847 def __init__(self, description = None,
856 universal_newlines = 1):
857 self._cwd = os.getcwd()
858 self.description_set(description)
859 self.program_set(program)
860 self.interpreter_set(interpreter)
863 verbose = max( 0, int(os.environ.get('TESTCMD_VERBOSE', 0)) )
866 self.verbose_set(verbose)
867 self.combine = combine
868 self.universal_newlines = universal_newlines
869 if not match is None:
870 self.match_function = match
872 self.match_function = match_re
874 self.diff_function = diff
881 self.diff_function = simple_diff
882 #self.diff_function = difflib.context_diff
883 #self.diff_function = difflib.unified_diff
885 self._preserve = {'pass_test': 0, 'fail_test': 0, 'no_result': 0}
886 if 'PRESERVE' in os.environ and not os.environ['PRESERVE'] is '':
887 self._preserve['pass_test'] = os.environ['PRESERVE']
888 self._preserve['fail_test'] = os.environ['PRESERVE']
889 self._preserve['no_result'] = os.environ['PRESERVE']
892 self._preserve['pass_test'] = os.environ['PRESERVE_PASS']
896 self._preserve['fail_test'] = os.environ['PRESERVE_FAIL']
900 self._preserve['no_result'] = os.environ['PRESERVE_NO_RESULT']
906 self.condition = 'no_result'
907 self.workdir_set(workdir)
914 return "%x" % id(self)
919 def banner(self, s, width=None):
921 width = self.banner_width
922 return s + self.banner_char * (width - len(s))
924 if os.name == 'posix':
926 def escape(self, arg):
927 "escape shell special characters"
931 arg = arg.replace(slash, slash+slash)
933 arg = arg.replace(c, slash+c)
935 if re_space.search(arg):
936 arg = '"' + arg + '"'
941 # Windows does not allow special characters in file names
942 # anyway, so no need for an escape function, we will just quote
944 def escape(self, arg):
945 if re_space.search(arg):
946 arg = '"' + arg + '"'
949 def canonicalize(self, path):
951 path = os.path.join(*tuple(path))
952 if not os.path.isabs(path):
953 path = os.path.join(self.workdir, path)
956 def chmod(self, path, mode):
957 """Changes permissions on the specified file or directory
959 path = self.canonicalize(path)
962 def cleanup(self, condition = None):
963 """Removes any temporary working directories for the specified
964 TestCmd environment. If the environment variable PRESERVE was
965 set when the TestCmd environment was created, temporary working
966 directories are not removed. If any of the environment variables
967 PRESERVE_PASS, PRESERVE_FAIL, or PRESERVE_NO_RESULT were set
968 when the TestCmd environment was created, then temporary working
969 directories are not removed if the test passed, failed, or had
970 no result, respectively. Temporary working directories are also
971 preserved for conditions specified via the preserve method.
973 Typically, this method is not called directly, but is used when
974 the script exits to clean up temporary working directories as
975 appropriate for the exit status.
977 if not self._dirlist:
981 if condition is None:
982 condition = self.condition
983 if self._preserve[condition]:
984 for dir in self._dirlist:
985 print "Preserved directory", dir
987 list = self._dirlist[:]
990 self.writable(dir, 1)
991 shutil.rmtree(dir, ignore_errors = 1)
996 _Cleanup.remove(self)
997 except (AttributeError, ValueError):
1000 def command_args(self, program = None,
1004 if isinstance(program, str) and not os.path.isabs(program):
1005 program = os.path.join(self._cwd, program)
1007 program = self.program
1009 interpreter = self.interpreter
1010 if not type(program) in [list, tuple]:
1014 if not type(interpreter) in [list, tuple]:
1015 interpreter = [interpreter]
1016 cmd = list(interpreter) + cmd
1018 if isinstance(arguments, str):
1019 arguments = arguments.split()
1020 cmd.extend(arguments)
1023 def description_set(self, description):
1024 """Set the description of the functionality being tested.
1026 self.description = description
1031 def diff(self, a, b, name, *args, **kw):
1032 print self.banner('Expected %s' % name)
1034 print self.banner('Actual %s' % name)
1037 def diff(self, a, b, name, *args, **kw):
1038 print self.banner(name)
1039 args = (a.splitlines(), b.splitlines()) + args
1040 lines = self.diff_function(*args, **kw)
1044 def fail_test(self, condition = 1, function = None, skip = 0):
1045 """Cause the test to fail.
1049 self.condition = 'fail_test'
1050 fail_test(self = self,
1051 condition = condition,
1052 function = function,
1055 def interpreter_set(self, interpreter):
1056 """Set the program to be used to interpret the program
1057 under test as a script.
1059 self.interpreter = interpreter
1061 def match(self, lines, matches):
1062 """Compare actual and expected file contents.
1064 return self.match_function(lines, matches)
1066 def match_exact(self, lines, matches):
1067 """Compare actual and expected file contents.
1069 return match_exact(lines, matches)
1071 def match_re(self, lines, res):
1072 """Compare actual and expected file contents.
1074 return match_re(lines, res)
1076 def match_re_dotall(self, lines, res):
1077 """Compare actual and expected file contents.
1079 return match_re_dotall(lines, res)
1081 def no_result(self, condition = 1, function = None, skip = 0):
1082 """Report that the test could not be run.
1086 self.condition = 'no_result'
1087 no_result(self = self,
1088 condition = condition,
1089 function = function,
1092 def pass_test(self, condition = 1, function = None):
1093 """Cause the test to pass.
1097 self.condition = 'pass_test'
1098 pass_test(self = self, condition = condition, function = function)
1100 def preserve(self, *conditions):
1101 """Arrange for the temporary working directories for the
1102 specified TestCmd environment to be preserved for one or more
1103 conditions. If no conditions are specified, arranges for
1104 the temporary working directories to be preserved for all
1107 if conditions is ():
1108 conditions = ('pass_test', 'fail_test', 'no_result')
1109 for cond in conditions:
1110 self._preserve[cond] = 1
1112 def program_set(self, program):
1113 """Set the executable program or script to be tested.
1115 if program and not os.path.isabs(program):
1116 program = os.path.join(self._cwd, program)
1117 self.program = program
1119 def read(self, file, mode = 'rb'):
1120 """Reads and returns the contents of the specified file name.
1121 The file name may be a list, in which case the elements are
1122 concatenated with the os.path.join() method. The file is
1123 assumed to be under the temporary working directory unless it
1124 is an absolute path name. The I/O mode for the file may
1125 be specified; it must begin with an 'r'. The default is
1128 file = self.canonicalize(file)
1130 raise ValueError("mode must begin with 'r'")
1131 return open(file, mode).read()
1133 def rmdir(self, dir):
1134 """Removes the specified dir name.
1135 The dir name may be a list, in which case the elements are
1136 concatenated with the os.path.join() method. The dir is
1137 assumed to be under the temporary working directory unless it
1138 is an absolute path name.
1139 The dir must be empty.
1141 dir = self.canonicalize(dir)
1144 def start(self, program = None,
1147 universal_newlines = None,
1150 Starts a program or script for the test environment.
1152 The specified program will have the original directory
1153 prepended unless it is enclosed in a [list].
1155 cmd = self.command_args(program, interpreter, arguments)
1156 cmd_string = ' '.join(map(self.escape, cmd))
1158 sys.stderr.write(cmd_string + "\n")
1159 if universal_newlines is None:
1160 universal_newlines = self.universal_newlines
1162 # On Windows, if we make stdin a pipe when we plan to send
1163 # no input, and the test program exits before
1164 # Popen calls msvcrt.open_osfhandle, that call will fail.
1165 # So don't use a pipe for stdin if we don't need one.
1166 stdin = kw.get('stdin', None)
1167 if stdin is not None:
1168 stdin = subprocess.PIPE
1170 combine = kw.get('combine', self.combine)
1172 stderr_value = subprocess.STDOUT
1174 stderr_value = subprocess.PIPE
1178 stdout=subprocess.PIPE,
1179 stderr=stderr_value,
1180 universal_newlines=universal_newlines)
1182 def finish(self, popen, **kw):
1184 Finishes and waits for the process being run under control of
1185 the specified popen argument, recording the exit status,
1186 standard output and error output.
1189 self.status = popen.wait()
1192 self._stdout.append(popen.stdout.read())
1194 stderr = popen.stderr.read()
1197 self._stderr.append(stderr)
1199 def run(self, program = None,
1204 universal_newlines = None):
1205 """Runs a test of the program or script for the test
1206 environment. Standard output and error output are saved for
1207 future retrieval via the stdout() and stderr() methods.
1209 The specified program will have the original directory
1210 prepended unless it is enclosed in a [list].
1213 oldcwd = os.getcwd()
1214 if not os.path.isabs(chdir):
1215 chdir = os.path.join(self.workpath(chdir))
1217 sys.stderr.write("chdir(" + chdir + ")\n")
1219 p = self.start(program,
1229 p.stdin.write(stdin)
1232 out = p.stdout.read()
1233 if p.stderr is None:
1236 err = p.stderr.read()
1238 close_output = p.close_output
1239 except AttributeError:
1241 if not p.stderr is None:
1246 self._stdout.append(out)
1247 self._stderr.append(err)
1249 self.status = p.wait()
1255 if self.verbose >= 2:
1256 write = sys.stdout.write
1257 write('============ STATUS: %d\n' % self.status)
1259 if out or self.verbose >= 3:
1260 write('============ BEGIN STDOUT (len=%d):\n' % len(out))
1262 write('============ END STDOUT\n')
1264 if err or self.verbose >= 3:
1265 write('============ BEGIN STDERR (len=%d)\n' % len(err))
1267 write('============ END STDERR\n')
1269 def sleep(self, seconds = default_sleep_seconds):
1270 """Sleeps at least the specified number of seconds. If no
1271 number is specified, sleeps at least the minimum number of
1272 seconds necessary to advance file time stamps on the current
1273 system. Sleeping more seconds is all right.
1277 def stderr(self, run = None):
1278 """Returns the error output from the specified run number.
1279 If there is no specified run number, then returns the error
1280 output of the last run. If the run number is less than zero,
1281 then returns the error output from that many runs back from the
1285 run = len(self._stderr)
1287 run = len(self._stderr) + run
1289 return self._stderr[run]
1291 def stdout(self, run = None):
1292 """Returns the standard output from the specified run number.
1293 If there is no specified run number, then returns the standard
1294 output of the last run. If the run number is less than zero,
1295 then returns the standard output from that many runs back from
1299 run = len(self._stdout)
1301 run = len(self._stdout) + run
1303 return self._stdout[run]
1305 def subdir(self, *subdirs):
1306 """Create new subdirectories under the temporary working
1307 directory, one for each argument. An argument may be a list,
1308 in which case the list elements are concatenated using the
1309 os.path.join() method. Subdirectories multiple levels deep
1310 must be created using a separate argument for each level:
1312 test.subdir('sub', ['sub', 'dir'], ['sub', 'dir', 'ectory'])
1314 Returns the number of subdirectories actually created.
1321 sub = os.path.join(*tuple(sub))
1322 new = os.path.join(self.workdir, sub)
1331 def symlink(self, target, link):
1332 """Creates a symlink to the specified target.
1333 The link name may be a list, in which case the elements are
1334 concatenated with the os.path.join() method. The link is
1335 assumed to be under the temporary working directory unless it
1336 is an absolute path name. The target is *not* assumed to be
1337 under the temporary working directory.
1339 link = self.canonicalize(link)
1340 os.symlink(target, link)
1342 def tempdir(self, path=None):
1343 """Creates a temporary directory.
1344 A unique directory name is generated if no path name is specified.
1345 The directory is created, and will be removed when the TestCmd
1346 object is destroyed.
1350 path = tempfile.mktemp(prefix=tempfile.template)
1352 path = tempfile.mktemp()
1355 # Symlinks in the path will report things
1356 # differently from os.getcwd(), so chdir there
1357 # and back to fetch the canonical path.
1365 # Uppercase the drive letter since the case of drive
1366 # letters is pretty much random on win32:
1367 drive,rest = os.path.splitdrive(path)
1369 path = drive.upper() + rest
1372 self._dirlist.append(path)
1375 _Cleanup.index(self)
1377 _Cleanup.append(self)
1381 def touch(self, path, mtime=None):
1382 """Updates the modification time on the specified file or
1383 directory path name. The default is to update to the
1384 current time if no explicit modification time is specified.
1386 path = self.canonicalize(path)
1387 atime = os.path.getatime(path)
1390 os.utime(path, (atime, mtime))
1392 def unlink(self, file):
1393 """Unlinks the specified file name.
1394 The file name may be a list, in which case the elements are
1395 concatenated with the os.path.join() method. The file is
1396 assumed to be under the temporary working directory unless it
1397 is an absolute path name.
1399 file = self.canonicalize(file)
1402 def verbose_set(self, verbose):
1403 """Set the verbose level.
1405 self.verbose = verbose
1407 def where_is(self, file, path=None, pathext=None):
1408 """Find an executable file.
1411 file = os.path.join(*tuple(file))
1412 if not os.path.isabs(file):
1413 file = where_is(file, path, pathext)
1416 def workdir_set(self, path):
1417 """Creates a temporary working directory with the specified
1418 path name. If the path is a null string (''), a unique
1419 directory name is created.
1424 path = self.tempdir(path)
1427 def workpath(self, *args):
1428 """Returns the absolute path name to a subdirectory or file
1429 within the current temporary working directory. Concatenates
1430 the temporary working directory name with the specified
1431 arguments using the os.path.join() method.
1433 return os.path.join(self.workdir, *tuple(args))
1435 def readable(self, top, read=1):
1436 """Make the specified directory tree readable (read == 1)
1437 or not (read == None).
1439 This method has no effect on Windows systems, which use a
1440 completely different mechanism to control file readability.
1443 if sys.platform == 'win32':
1447 def do_chmod(fname):
1448 try: st = os.stat(fname)
1449 except OSError: pass
1450 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IREAD))
1452 def do_chmod(fname):
1453 try: st = os.stat(fname)
1454 except OSError: pass
1455 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IREAD))
1457 if os.path.isfile(top):
1458 # If it's a file, that's easy, just chmod it.
1461 # It's a directory and we're trying to turn on read
1462 # permission, so it's also pretty easy, just chmod the
1463 # directory and then chmod every entry on our walk down the
1464 # tree. Because os.path.walk() is top-down, we'll enable
1465 # read permission on any directories that have it disabled
1466 # before os.path.walk() tries to list their contents.
1469 def chmod_entries(arg, dirname, names, do_chmod=do_chmod):
1471 do_chmod(os.path.join(dirname, n))
1473 os.path.walk(top, chmod_entries, None)
1475 # It's a directory and we're trying to turn off read
1476 # permission, which means we have to chmod the directoreis
1477 # in the tree bottom-up, lest disabling read permission from
1478 # the top down get in the way of being able to get at lower
1479 # parts of the tree. But os.path.walk() visits things top
1480 # down, so we just use an object to collect a list of all
1481 # of the entries in the tree, reverse the list, and then
1482 # chmod the reversed (bottom-up) list.
1483 col = Collector(top)
1484 os.path.walk(top, col, None)
1485 col.entries.reverse()
1486 for d in col.entries: do_chmod(d)
1488 def writable(self, top, write=1):
1489 """Make the specified directory tree writable (write == 1)
1490 or not (write == None).
1493 if sys.platform == 'win32':
1496 def do_chmod(fname):
1497 try: os.chmod(fname, stat.S_IWRITE)
1498 except OSError: pass
1500 def do_chmod(fname):
1501 try: os.chmod(fname, stat.S_IREAD)
1502 except OSError: pass
1507 def do_chmod(fname):
1508 try: st = os.stat(fname)
1509 except OSError: pass
1510 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|0200))
1512 def do_chmod(fname):
1513 try: st = os.stat(fname)
1514 except OSError: pass
1515 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~0200))
1517 if os.path.isfile(top):
1520 col = Collector(top)
1521 os.path.walk(top, col, None)
1522 for d in col.entries: do_chmod(d)
1524 def executable(self, top, execute=1):
1525 """Make the specified directory tree executable (execute == 1)
1526 or not (execute == None).
1528 This method has no effect on Windows systems, which use a
1529 completely different mechanism to control file executability.
1532 if sys.platform == 'win32':
1536 def do_chmod(fname):
1537 try: st = os.stat(fname)
1538 except OSError: pass
1539 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IEXEC))
1541 def do_chmod(fname):
1542 try: st = os.stat(fname)
1543 except OSError: pass
1544 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IEXEC))
1546 if os.path.isfile(top):
1547 # If it's a file, that's easy, just chmod it.
1550 # It's a directory and we're trying to turn on execute
1551 # permission, so it's also pretty easy, just chmod the
1552 # directory and then chmod every entry on our walk down the
1553 # tree. Because os.path.walk() is top-down, we'll enable
1554 # execute permission on any directories that have it disabled
1555 # before os.path.walk() tries to list their contents.
1558 def chmod_entries(arg, dirname, names, do_chmod=do_chmod):
1560 do_chmod(os.path.join(dirname, n))
1562 os.path.walk(top, chmod_entries, None)
1564 # It's a directory and we're trying to turn off execute
1565 # permission, which means we have to chmod the directories
1566 # in the tree bottom-up, lest disabling execute permission from
1567 # the top down get in the way of being able to get at lower
1568 # parts of the tree. But os.path.walk() visits things top
1569 # down, so we just use an object to collect a list of all
1570 # of the entries in the tree, reverse the list, and then
1571 # chmod the reversed (bottom-up) list.
1572 col = Collector(top)
1573 os.path.walk(top, col, None)
1574 col.entries.reverse()
1575 for d in col.entries: do_chmod(d)
1577 def write(self, file, content, mode = 'wb'):
1578 """Writes the specified content text (second argument) to the
1579 specified file name (first argument). The file name may be
1580 a list, in which case the elements are concatenated with the
1581 os.path.join() method. The file is created under the temporary
1582 working directory. Any subdirectories in the path must already
1583 exist. The I/O mode for the file may be specified; it must
1584 begin with a 'w'. The default is 'wb' (binary write).
1586 file = self.canonicalize(file)
1588 raise ValueError("mode must begin with 'w'")
1589 open(file, mode).write(content)
1593 # indent-tabs-mode:nil
1595 # vim: set expandtab tabstop=4 shiftwidth=4: