2 TestCmd.py: a testing framework for commands and scripts.
4 The TestCmd module provides a framework for portable automated testing
5 of executable commands and scripts (in any language, not just Python),
6 especially commands and scripts that require file system interaction.
8 In addition to running tests and evaluating conditions, the TestCmd
9 module manages and cleans up one or more temporary workspace
10 directories, and provides methods for creating files and directories in
11 those workspace directories from in-line data (here-documents), allowing
12 tests to be completely self-contained.
14 A TestCmd environment object is created via the usual invocation:
17 test = TestCmd.TestCmd()
19 There are a bunch of keyword arguments available at instantiation:
21 test = TestCmd.TestCmd(description = 'string',
22 program = 'program_or_script_to_test',
23 interpreter = 'script_interpreter',
27 match = default_match_function,
28 diff = default_diff_function,
31 There are a bunch of methods that let you do different things:
35 test.description_set('string')
37 test.program_set('program_or_script_to_test')
39 test.interpreter_set('script_interpreter')
40 test.interpreter_set(['script_interpreter', 'arg'])
42 test.workdir_set('prefix')
46 test.workpath('subdir', 'file')
48 test.subdir('subdir', ...)
50 test.rmdir('subdir', ...)
52 test.write('file', "contents\n")
53 test.write(['subdir', 'file'], "contents\n")
56 test.read(['subdir', 'file'])
57 test.read('file', mode)
58 test.read(['subdir', 'file'], mode)
60 test.writable('dir', 1)
61 test.writable('dir', None)
63 test.preserve(condition, ...)
65 test.cleanup(condition)
67 test.command_args(program = 'program_or_script_to_run',
68 interpreter = 'script_interpreter',
69 arguments = 'arguments to pass to program')
71 test.run(program = 'program_or_script_to_run',
72 interpreter = 'script_interpreter',
73 arguments = 'arguments to pass to program',
74 chdir = 'directory_to_chdir_to',
75 stdin = 'input to feed to the program\n',
76 universal_newlines = True)
78 p = test.start(program = 'program_or_script_to_run',
79 interpreter = 'script_interpreter',
80 arguments = 'arguments to pass to program',
81 universal_newlines = None)
86 test.pass_test(condition)
87 test.pass_test(condition, function)
90 test.fail_test(condition)
91 test.fail_test(condition, function)
92 test.fail_test(condition, function, skip)
95 test.no_result(condition)
96 test.no_result(condition, function)
97 test.no_result(condition, function, skip)
105 test.symlink(target, link)
108 test.banner(string, width)
110 test.diff(actual, expected)
112 test.match(actual, expected)
114 test.match_exact("actual 1\nactual 2\n", "expected 1\nexpected 2\n")
115 test.match_exact(["actual 1\n", "actual 2\n"],
116 ["expected 1\n", "expected 2\n"])
118 test.match_re("actual 1\nactual 2\n", regex_string)
119 test.match_re(["actual 1\n", "actual 2\n"], list_of_regexes)
121 test.match_re_dotall("actual 1\nactual 2\n", regex_string)
122 test.match_re_dotall(["actual 1\n", "actual 2\n"], list_of_regexes)
125 test.tempdir('temporary-directory')
131 test.where_is('foo', 'PATH1:PATH2')
132 test.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4')
135 test.unlink('subdir', 'file')
137 The TestCmd module provides pass_test(), fail_test(), and no_result()
138 unbound functions that report test results for use with the Aegis change
139 management system. These functions terminate the test immediately,
140 reporting PASSED, FAILED, or NO RESULT respectively, and exiting with
141 status 0 (success), 1 or 2 respectively. This allows for a distinction
142 between an actual failed test and a test that could not be properly
143 evaluated because of an external condition (such as a full file system
144 or incorrect permissions).
149 TestCmd.pass_test(condition)
150 TestCmd.pass_test(condition, function)
153 TestCmd.fail_test(condition)
154 TestCmd.fail_test(condition, function)
155 TestCmd.fail_test(condition, function, skip)
158 TestCmd.no_result(condition)
159 TestCmd.no_result(condition, function)
160 TestCmd.no_result(condition, function, skip)
162 The TestCmd module also provides unbound functions that handle matching
163 in the same way as the match_*() methods described above.
167 test = TestCmd.TestCmd(match = TestCmd.match_exact)
169 test = TestCmd.TestCmd(match = TestCmd.match_re)
171 test = TestCmd.TestCmd(match = TestCmd.match_re_dotall)
173 The TestCmd module provides unbound functions that can be used for the
174 "diff" argument to TestCmd.TestCmd instantiation:
178 test = TestCmd.TestCmd(match = TestCmd.match_re,
179 diff = TestCmd.diff_re)
181 test = TestCmd.TestCmd(diff = TestCmd.simple_diff)
183 The "diff" argument can also be used with standard difflib functions:
187 test = TestCmd.TestCmd(diff = difflib.context_diff)
189 test = TestCmd.TestCmd(diff = difflib.unified_diff)
191 Lastly, the where_is() method also exists in an unbound function
196 TestCmd.where_is('foo')
197 TestCmd.where_is('foo', 'PATH1:PATH2')
198 TestCmd.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4')
201 # Copyright 2000-2010 Steven Knight
202 # This module is free software, and you may redistribute it and/or modify
203 # it under the same terms as Python itself, so long as this copyright message
204 # and disclaimer are retained in their original form.
206 # IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
207 # SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
208 # THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
211 # THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
212 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
213 # PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
214 # AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
215 # SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
216 from __future__ import generators ### KEEP FOR COMPATIBILITY FIXERS
218 __author__ = "Steven Knight <knight at baldmt dot com>"
219 __revision__ = "TestCmd.py 0.37.D001 2010/01/11 16:55:50 knight"
250 __all__.append('simple_diff')
253 return type(e) is types.ListType \
254 or isinstance(e, UserList.UserList)
257 from UserString import UserString
262 if hasattr(types, 'UnicodeType'):
264 return type(e) is types.StringType \
265 or type(e) is types.UnicodeType \
266 or isinstance(e, UserString)
269 return type(e) is types.StringType or isinstance(e, UserString)
271 tempfile.template = 'testcmd.'
272 if os.name in ('posix', 'nt'):
273 tempfile.template = 'testcmd.' + str(os.getpid()) + '.'
275 tempfile.template = 'testcmd.'
277 re_space = re.compile('\s')
281 _chain_to_exitfunc = None
285 cleanlist = [_f for _f in _Cleanup if _f]
288 for test in cleanlist:
290 if _chain_to_exitfunc:
296 # TODO(1.5): atexit requires python 2.0, so chain sys.exitfunc
298 _chain_to_exitfunc = sys.exitfunc
299 except AttributeError:
301 sys.exitfunc = _clean
303 atexit.register(_clean)
310 for i in xrange(min(list(map(len, lists)))):
311 result.append(tuple([l[i] for l in lists]))
315 def __init__(self, top):
317 def __call__(self, arg, dirname, names):
318 pathjoin = lambda n: os.path.join(dirname, n)
319 self.entries.extend(list(map(pathjoin, names)))
321 def _caller(tblist, skip):
324 for file, line, name, text in tblist:
325 if file[-10:] == "TestCmd.py":
327 arr = [(file, line, name, text)] + arr
329 for file, line, name, text in arr[skip:]:
330 if name in ("?", "<module>"):
333 name = " (" + name + ")"
334 string = string + ("%s line %d of %s%s\n" % (atfrom, line, file, name))
338 def fail_test(self = None, condition = 1, function = None, skip = 0):
339 """Cause the test to fail.
341 By default, the fail_test() method reports that the test FAILED
342 and exits with a status of 1. If a condition argument is supplied,
343 the test fails only if the condition is true.
347 if not function is None:
354 of = " of " + self.program
357 desc = " [" + self.description + "]"
360 at = _caller(traceback.extract_stack(), skip)
361 sys.stderr.write("FAILED test" + of + desc + sep + at)
365 def no_result(self = None, condition = 1, function = None, skip = 0):
366 """Causes a test to exit with no valid result.
368 By default, the no_result() method reports NO RESULT for the test
369 and exits with a status of 2. If a condition argument is supplied,
370 the test exits with no result only if the condition is true.
374 if not function is None:
381 of = " of " + self.program
384 desc = " [" + self.description + "]"
387 at = _caller(traceback.extract_stack(), skip)
388 sys.stderr.write("NO RESULT for test" + of + desc + sep + at)
392 def pass_test(self = None, condition = 1, function = None):
393 """Causes a test to pass.
395 By default, the pass_test() method reports PASSED for the test
396 and exits with a status of 0. If a condition argument is supplied,
397 the test passes only if the condition is true.
401 if not function is None:
403 sys.stderr.write("PASSED\n")
406 def match_exact(lines = None, matches = None):
409 if not is_List(lines):
410 lines = lines.split("\n")
411 if not is_List(matches):
412 matches = matches.split("\n")
413 if len(lines) != len(matches):
415 for i in range(len(lines)):
416 if lines[i] != matches[i]:
420 def match_re(lines = None, res = None):
423 if not is_List(lines):
424 lines = lines.split("\n")
426 res = res.split("\n")
427 if len(lines) != len(res):
429 for i in range(len(lines)):
430 s = "^" + res[i] + "$"
434 msg = "Regular expression error in %s: %s"
435 raise re.error, msg % (repr(s), e[0])
436 if not expr.search(lines[i]):
440 def match_re_dotall(lines = None, res = None):
443 if not type(lines) is type(""):
444 lines = "\n".join(lines)
445 if not type(res) is type(""):
449 expr = re.compile(s, re.DOTALL)
451 msg = "Regular expression error in %s: %s"
452 raise re.error, msg % (repr(s), e[0])
453 if expr.match(lines):
461 def simple_diff(a, b, fromfile='', tofile='',
462 fromfiledate='', tofiledate='', n=3, lineterm='\n'):
464 A function with the same calling signature as difflib.context_diff
465 (diff -c) and difflib.unified_diff (diff -u) but which prints
466 output like the simple, unadorned 'diff' command.
468 sm = difflib.SequenceMatcher(None, a, b)
470 return x1+1 == x2 and str(x2) or '%s,%s' % (x1+1, x2)
472 for op, a1, a2, b1, b2 in sm.get_opcodes():
474 result.append("%sd%d" % (comma(a1, a2), b1))
475 result.extend(['< ' + l for l in a[a1:a2]])
477 result.append("%da%s" % (a1, comma(b1, b2)))
478 result.extend(['> ' + l for l in b[b1:b2]])
479 elif op == 'replace':
480 result.append("%sc%s" % (comma(a1, a2), comma(b1, b2)))
481 result.extend(['< ' + l for l in a[a1:a2]])
483 result.extend(['> ' + l for l in b[b1:b2]])
486 def diff_re(a, b, fromfile='', tofile='',
487 fromfiledate='', tofiledate='', n=3, lineterm='\n'):
489 A simple "diff" of two sets of lines when the expected lines
490 are regular expressions. This is a really dumb thing that
491 just compares each line in turn, so it doesn't look for
492 chunks of matching lines and the like--but at least it lets
493 you know exactly which line first didn't compare correctl...
496 diff = len(a) - len(b)
502 for aline, bline in zip(a, b):
503 s = "^" + aline + "$"
507 msg = "Regular expression error in %s: %s"
508 raise re.error, msg % (repr(s), e[0])
509 if not expr.search(bline):
510 result.append("%sc%s" % (i+1, i+1))
511 result.append('< ' + repr(a[i]))
513 result.append('> ' + repr(b[i]))
517 if os.name == 'java':
519 python_executable = os.path.join(sys.prefix, 'jython')
523 python_executable = sys.executable
525 if sys.platform == 'win32':
527 default_sleep_seconds = 2
529 def where_is(file, path=None, pathext=None):
531 path = os.environ['PATH']
533 path = path.split(os.pathsep)
535 pathext = os.environ['PATHEXT']
536 if is_String(pathext):
537 pathext = pathext.split(os.pathsep)
539 if ext.lower() == file[-len(ext):].lower():
543 f = os.path.join(dir, file)
546 if os.path.isfile(fext):
552 def where_is(file, path=None, pathext=None):
554 path = os.environ['PATH']
556 path = path.split(os.pathsep)
558 f = os.path.join(dir, file)
559 if os.path.isfile(f):
564 if stat.S_IMODE(st[stat.ST_MODE]) & 0111:
568 default_sleep_seconds = 1
575 # The subprocess module doesn't exist in this version of Python,
576 # so we're going to cobble up something that looks just enough
577 # like its API for our purposes below.
580 subprocess = new.module('subprocess')
582 subprocess.PIPE = 'PIPE'
583 subprocess.STDOUT = 'STDOUT'
584 subprocess.mswindows = (sys.platform == 'win32')
589 except AttributeError:
591 universal_newlines = 1
592 def __init__(self, command, **kw):
593 if sys.platform == 'win32' and command[0] == '"':
594 command = '"' + command + '"'
595 (stdin, stdout, stderr) = os.popen3(' ' + command)
599 def close_output(self):
601 self.resultcode = self.stderr.close()
603 resultcode = self.resultcode
604 if os.WIFEXITED(resultcode):
605 return os.WEXITSTATUS(resultcode)
606 elif os.WIFSIGNALED(resultcode):
607 return os.WTERMSIG(resultcode)
614 except AttributeError:
615 # A cribbed Popen4 class, with some retrofitted code from
616 # the Python 1.5 Popen3 class methods to do certain things
618 class Popen4(popen2.Popen3):
621 def __init__(self, cmd, bufsize=-1):
622 p2cread, p2cwrite = os.pipe()
623 c2pread, c2pwrite = os.pipe()
630 for i in range(3, popen2.MAXFD):
635 os.execvp(cmd[0], cmd)
638 # Shouldn't come here, I guess
641 self.tochild = os.fdopen(p2cwrite, 'w', bufsize)
643 self.fromchild = os.fdopen(c2pread, 'r', bufsize)
644 popen2._active.append(self)
646 popen2.Popen4 = Popen4
648 class Popen3(popen2.Popen3, popen2.Popen4):
649 universal_newlines = 1
650 def __init__(self, command, **kw):
651 if kw.get('stderr') == 'STDOUT':
652 popen2.Popen4.__init__(self, command, 1)
654 popen2.Popen3.__init__(self, command, 1)
655 self.stdin = self.tochild
656 self.stdout = self.fromchild
657 self.stderr = self.childerr
658 def wait(self, *args, **kw):
659 resultcode = popen2.Popen3.wait(self, *args, **kw)
660 if os.WIFEXITED(resultcode):
661 return os.WEXITSTATUS(resultcode)
662 elif os.WIFSIGNALED(resultcode):
663 return os.WTERMSIG(resultcode)
667 subprocess.Popen = Popen3
671 # From Josiah Carlson,
672 # ASPN : Python Cookbook : Module to allow Asynchronous subprocess use on Windows and Posix platforms
673 # http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/440554
675 PIPE = subprocess.PIPE
677 if subprocess.mswindows:
678 from win32file import ReadFile, WriteFile
679 from win32pipe import PeekNamedPipe
686 except AttributeError: fcntl.F_GETFL = 3
689 except AttributeError: fcntl.F_SETFL = 4
691 class Popen(subprocess.Popen):
692 def recv(self, maxsize=None):
693 return self._recv('stdout', maxsize)
695 def recv_err(self, maxsize=None):
696 return self._recv('stderr', maxsize)
698 def send_recv(self, input='', maxsize=None):
699 return self.send(input), self.recv(maxsize), self.recv_err(maxsize)
701 def get_conn_maxsize(self, which, maxsize):
706 return getattr(self, which), maxsize
708 def _close(self, which):
709 getattr(self, which).close()
710 setattr(self, which, None)
712 if subprocess.mswindows:
713 def send(self, input):
718 x = msvcrt.get_osfhandle(self.stdin.fileno())
719 (errCode, written) = WriteFile(x, input)
721 return self._close('stdin')
722 except (subprocess.pywintypes.error, Exception), why:
723 if why[0] in (109, errno.ESHUTDOWN):
724 return self._close('stdin')
729 def _recv(self, which, maxsize):
730 conn, maxsize = self.get_conn_maxsize(which, maxsize)
735 x = msvcrt.get_osfhandle(conn.fileno())
736 (read, nAvail, nMessage) = PeekNamedPipe(x, 0)
740 (errCode, read) = ReadFile(x, nAvail, None)
742 return self._close(which)
743 except (subprocess.pywintypes.error, Exception), why:
744 if why[0] in (109, errno.ESHUTDOWN):
745 return self._close(which)
748 #if self.universal_newlines:
749 # read = self._translate_newlines(read)
753 def send(self, input):
757 if not select.select([], [self.stdin], [], 0)[1]:
761 written = os.write(self.stdin.fileno(), input)
763 if why[0] == errno.EPIPE: #broken pipe
764 return self._close('stdin')
769 def _recv(self, which, maxsize):
770 conn, maxsize = self.get_conn_maxsize(which, maxsize)
775 flags = fcntl.fcntl(conn, fcntl.F_GETFL)
780 fcntl.fcntl(conn, fcntl.F_SETFL, flags| os.O_NONBLOCK)
783 if not select.select([conn], [], [], 0)[0]:
786 r = conn.read(maxsize)
788 return self._close(which)
790 #if self.universal_newlines:
791 # r = self._translate_newlines(r)
794 if not conn.closed and not flags is None:
795 fcntl.fcntl(conn, fcntl.F_SETFL, flags)
797 disconnect_message = "Other end disconnected!"
799 def recv_some(p, t=.1, e=1, tr=5, stderr=0):
808 while time.time() < x or r:
812 raise Exception(disconnect_message)
818 time.sleep(max((x-time.time())/tr, 0))
821 # TODO(3.0): rewrite to use memoryview()
822 def send_all(p, data):
826 raise Exception(disconnect_message)
827 data = buffer(data, sent)
839 class TestCmd(object):
843 def __init__(self, description = None,
852 universal_newlines = 1):
853 self._cwd = os.getcwd()
854 self.description_set(description)
855 self.program_set(program)
856 self.interpreter_set(interpreter)
859 verbose = max( 0, int(os.environ.get('TESTCMD_VERBOSE', 0)) )
862 self.verbose_set(verbose)
863 self.combine = combine
864 self.universal_newlines = universal_newlines
865 if not match is None:
866 self.match_function = match
868 self.match_function = match_re
870 self.diff_function = diff
877 self.diff_function = simple_diff
878 #self.diff_function = difflib.context_diff
879 #self.diff_function = difflib.unified_diff
881 self._preserve = {'pass_test': 0, 'fail_test': 0, 'no_result': 0}
882 if 'PRESERVE' in os.environ and not os.environ['PRESERVE'] is '':
883 self._preserve['pass_test'] = os.environ['PRESERVE']
884 self._preserve['fail_test'] = os.environ['PRESERVE']
885 self._preserve['no_result'] = os.environ['PRESERVE']
888 self._preserve['pass_test'] = os.environ['PRESERVE_PASS']
892 self._preserve['fail_test'] = os.environ['PRESERVE_FAIL']
896 self._preserve['no_result'] = os.environ['PRESERVE_NO_RESULT']
902 self.condition = 'no_result'
903 self.workdir_set(workdir)
910 return "%x" % id(self)
915 def banner(self, s, width=None):
917 width = self.banner_width
918 return s + self.banner_char * (width - len(s))
920 if os.name == 'posix':
922 def escape(self, arg):
923 "escape shell special characters"
927 arg = arg.replace(slash, slash+slash)
929 arg = arg.replace(c, slash+c)
931 if re_space.search(arg):
932 arg = '"' + arg + '"'
937 # Windows does not allow special characters in file names
938 # anyway, so no need for an escape function, we will just quote
940 def escape(self, arg):
941 if re_space.search(arg):
942 arg = '"' + arg + '"'
945 def canonicalize(self, path):
947 path = os.path.join(*tuple(path))
948 if not os.path.isabs(path):
949 path = os.path.join(self.workdir, path)
952 def chmod(self, path, mode):
953 """Changes permissions on the specified file or directory
955 path = self.canonicalize(path)
958 def cleanup(self, condition = None):
959 """Removes any temporary working directories for the specified
960 TestCmd environment. If the environment variable PRESERVE was
961 set when the TestCmd environment was created, temporary working
962 directories are not removed. If any of the environment variables
963 PRESERVE_PASS, PRESERVE_FAIL, or PRESERVE_NO_RESULT were set
964 when the TestCmd environment was created, then temporary working
965 directories are not removed if the test passed, failed, or had
966 no result, respectively. Temporary working directories are also
967 preserved for conditions specified via the preserve method.
969 Typically, this method is not called directly, but is used when
970 the script exits to clean up temporary working directories as
971 appropriate for the exit status.
973 if not self._dirlist:
977 if condition is None:
978 condition = self.condition
979 if self._preserve[condition]:
980 for dir in self._dirlist:
981 print "Preserved directory", dir
983 list = self._dirlist[:]
986 self.writable(dir, 1)
987 shutil.rmtree(dir, ignore_errors = 1)
992 _Cleanup.remove(self)
993 except (AttributeError, ValueError):
996 def command_args(self, program = None,
1000 if type(program) == type('') and not os.path.isabs(program):
1001 program = os.path.join(self._cwd, program)
1003 program = self.program
1005 interpreter = self.interpreter
1006 if not type(program) in [type([]), type(())]:
1010 if not type(interpreter) in [type([]), type(())]:
1011 interpreter = [interpreter]
1012 cmd = list(interpreter) + cmd
1014 if type(arguments) == type(''):
1015 arguments = arguments.split()
1016 cmd.extend(arguments)
1019 def description_set(self, description):
1020 """Set the description of the functionality being tested.
1022 self.description = description
1027 def diff(self, a, b, name, *args, **kw):
1028 print self.banner('Expected %s' % name)
1030 print self.banner('Actual %s' % name)
1033 def diff(self, a, b, name, *args, **kw):
1034 print self.banner(name)
1035 args = (a.splitlines(), b.splitlines()) + args
1036 lines = self.diff_function(*args, **kw)
1040 def fail_test(self, condition = 1, function = None, skip = 0):
1041 """Cause the test to fail.
1045 self.condition = 'fail_test'
1046 fail_test(self = self,
1047 condition = condition,
1048 function = function,
1051 def interpreter_set(self, interpreter):
1052 """Set the program to be used to interpret the program
1053 under test as a script.
1055 self.interpreter = interpreter
1057 def match(self, lines, matches):
1058 """Compare actual and expected file contents.
1060 return self.match_function(lines, matches)
1062 def match_exact(self, lines, matches):
1063 """Compare actual and expected file contents.
1065 return match_exact(lines, matches)
1067 def match_re(self, lines, res):
1068 """Compare actual and expected file contents.
1070 return match_re(lines, res)
1072 def match_re_dotall(self, lines, res):
1073 """Compare actual and expected file contents.
1075 return match_re_dotall(lines, res)
1077 def no_result(self, condition = 1, function = None, skip = 0):
1078 """Report that the test could not be run.
1082 self.condition = 'no_result'
1083 no_result(self = self,
1084 condition = condition,
1085 function = function,
1088 def pass_test(self, condition = 1, function = None):
1089 """Cause the test to pass.
1093 self.condition = 'pass_test'
1094 pass_test(self = self, condition = condition, function = function)
1096 def preserve(self, *conditions):
1097 """Arrange for the temporary working directories for the
1098 specified TestCmd environment to be preserved for one or more
1099 conditions. If no conditions are specified, arranges for
1100 the temporary working directories to be preserved for all
1103 if conditions is ():
1104 conditions = ('pass_test', 'fail_test', 'no_result')
1105 for cond in conditions:
1106 self._preserve[cond] = 1
1108 def program_set(self, program):
1109 """Set the executable program or script to be tested.
1111 if program and not os.path.isabs(program):
1112 program = os.path.join(self._cwd, program)
1113 self.program = program
1115 def read(self, file, mode = 'rb'):
1116 """Reads and returns the contents of the specified file name.
1117 The file name may be a list, in which case the elements are
1118 concatenated with the os.path.join() method. The file is
1119 assumed to be under the temporary working directory unless it
1120 is an absolute path name. The I/O mode for the file may
1121 be specified; it must begin with an 'r'. The default is
1124 file = self.canonicalize(file)
1126 raise ValueError, "mode must begin with 'r'"
1127 return open(file, mode).read()
1129 def rmdir(self, dir):
1130 """Removes the specified dir name.
1131 The dir name may be a list, in which case the elements are
1132 concatenated with the os.path.join() method. The dir is
1133 assumed to be under the temporary working directory unless it
1134 is an absolute path name.
1135 The dir must be empty.
1137 dir = self.canonicalize(dir)
1140 def start(self, program = None,
1143 universal_newlines = None,
1146 Starts a program or script for the test environment.
1148 The specified program will have the original directory
1149 prepended unless it is enclosed in a [list].
1151 cmd = self.command_args(program, interpreter, arguments)
1152 cmd_string = ' '.join(map(self.escape, cmd))
1154 sys.stderr.write(cmd_string + "\n")
1155 if universal_newlines is None:
1156 universal_newlines = self.universal_newlines
1158 # On Windows, if we make stdin a pipe when we plan to send
1159 # no input, and the test program exits before
1160 # Popen calls msvcrt.open_osfhandle, that call will fail.
1161 # So don't use a pipe for stdin if we don't need one.
1162 stdin = kw.get('stdin', None)
1163 if stdin is not None:
1164 stdin = subprocess.PIPE
1166 combine = kw.get('combine', self.combine)
1168 stderr_value = subprocess.STDOUT
1170 stderr_value = subprocess.PIPE
1174 stdout=subprocess.PIPE,
1175 stderr=stderr_value,
1176 universal_newlines=universal_newlines)
1178 def finish(self, popen, **kw):
1180 Finishes and waits for the process being run under control of
1181 the specified popen argument, recording the exit status,
1182 standard output and error output.
1185 self.status = popen.wait()
1188 self._stdout.append(popen.stdout.read())
1190 stderr = popen.stderr.read()
1193 self._stderr.append(stderr)
1195 def run(self, program = None,
1200 universal_newlines = None):
1201 """Runs a test of the program or script for the test
1202 environment. Standard output and error output are saved for
1203 future retrieval via the stdout() and stderr() methods.
1205 The specified program will have the original directory
1206 prepended unless it is enclosed in a [list].
1209 oldcwd = os.getcwd()
1210 if not os.path.isabs(chdir):
1211 chdir = os.path.join(self.workpath(chdir))
1213 sys.stderr.write("chdir(" + chdir + ")\n")
1215 p = self.start(program,
1225 p.stdin.write(stdin)
1228 out = p.stdout.read()
1229 if p.stderr is None:
1232 err = p.stderr.read()
1234 close_output = p.close_output
1235 except AttributeError:
1237 if not p.stderr is None:
1242 self._stdout.append(out)
1243 self._stderr.append(err)
1245 self.status = p.wait()
1251 if self.verbose >= 2:
1252 write = sys.stdout.write
1253 write('============ STATUS: %d\n' % self.status)
1255 if out or self.verbose >= 3:
1256 write('============ BEGIN STDOUT (len=%d):\n' % len(out))
1258 write('============ END STDOUT\n')
1260 if err or self.verbose >= 3:
1261 write('============ BEGIN STDERR (len=%d)\n' % len(err))
1263 write('============ END STDERR\n')
1265 def sleep(self, seconds = default_sleep_seconds):
1266 """Sleeps at least the specified number of seconds. If no
1267 number is specified, sleeps at least the minimum number of
1268 seconds necessary to advance file time stamps on the current
1269 system. Sleeping more seconds is all right.
1273 def stderr(self, run = None):
1274 """Returns the error output from the specified run number.
1275 If there is no specified run number, then returns the error
1276 output of the last run. If the run number is less than zero,
1277 then returns the error output from that many runs back from the
1281 run = len(self._stderr)
1283 run = len(self._stderr) + run
1285 return self._stderr[run]
1287 def stdout(self, run = None):
1288 """Returns the standard output from the specified run number.
1289 If there is no specified run number, then returns the standard
1290 output of the last run. If the run number is less than zero,
1291 then returns the standard output from that many runs back from
1295 run = len(self._stdout)
1297 run = len(self._stdout) + run
1299 return self._stdout[run]
1301 def subdir(self, *subdirs):
1302 """Create new subdirectories under the temporary working
1303 directory, one for each argument. An argument may be a list,
1304 in which case the list elements are concatenated using the
1305 os.path.join() method. Subdirectories multiple levels deep
1306 must be created using a separate argument for each level:
1308 test.subdir('sub', ['sub', 'dir'], ['sub', 'dir', 'ectory'])
1310 Returns the number of subdirectories actually created.
1317 sub = os.path.join(*tuple(sub))
1318 new = os.path.join(self.workdir, sub)
1327 def symlink(self, target, link):
1328 """Creates a symlink to the specified target.
1329 The link name may be a list, in which case the elements are
1330 concatenated with the os.path.join() method. The link is
1331 assumed to be under the temporary working directory unless it
1332 is an absolute path name. The target is *not* assumed to be
1333 under the temporary working directory.
1335 link = self.canonicalize(link)
1336 os.symlink(target, link)
1338 def tempdir(self, path=None):
1339 """Creates a temporary directory.
1340 A unique directory name is generated if no path name is specified.
1341 The directory is created, and will be removed when the TestCmd
1342 object is destroyed.
1346 path = tempfile.mktemp(prefix=tempfile.template)
1348 path = tempfile.mktemp()
1351 # Symlinks in the path will report things
1352 # differently from os.getcwd(), so chdir there
1353 # and back to fetch the canonical path.
1361 # Uppercase the drive letter since the case of drive
1362 # letters is pretty much random on win32:
1363 drive,rest = os.path.splitdrive(path)
1365 path = drive.upper() + rest
1368 self._dirlist.append(path)
1371 _Cleanup.index(self)
1373 _Cleanup.append(self)
1377 def touch(self, path, mtime=None):
1378 """Updates the modification time on the specified file or
1379 directory path name. The default is to update to the
1380 current time if no explicit modification time is specified.
1382 path = self.canonicalize(path)
1383 atime = os.path.getatime(path)
1386 os.utime(path, (atime, mtime))
1388 def unlink(self, file):
1389 """Unlinks the specified file name.
1390 The file name may be a list, in which case the elements are
1391 concatenated with the os.path.join() method. The file is
1392 assumed to be under the temporary working directory unless it
1393 is an absolute path name.
1395 file = self.canonicalize(file)
1398 def verbose_set(self, verbose):
1399 """Set the verbose level.
1401 self.verbose = verbose
1403 def where_is(self, file, path=None, pathext=None):
1404 """Find an executable file.
1407 file = os.path.join(*tuple(file))
1408 if not os.path.isabs(file):
1409 file = where_is(file, path, pathext)
1412 def workdir_set(self, path):
1413 """Creates a temporary working directory with the specified
1414 path name. If the path is a null string (''), a unique
1415 directory name is created.
1420 path = self.tempdir(path)
1423 def workpath(self, *args):
1424 """Returns the absolute path name to a subdirectory or file
1425 within the current temporary working directory. Concatenates
1426 the temporary working directory name with the specified
1427 arguments using the os.path.join() method.
1429 return os.path.join(self.workdir, *tuple(args))
def readable(self, top, read=1):
    """Make the specified directory tree readable (read == 1)
    or not (read == None).

    top is a file or the top of a directory tree.  The owner read
    bit (stat.S_IREAD) is added to, or removed from, top and every
    entry below it.  Entries that cannot be stat'ed are skipped.

    This method has no effect on Windows systems, which use a
    completely different mechanism to control file readability.
    """
    if sys.platform == 'win32':
        return

    def do_chmod(fname):
        try:
            st = os.stat(fname)
        except OSError:
            # Best effort: skip anything we cannot examine.
            return
        mode = st[stat.ST_MODE]
        if read:
            mode = mode | stat.S_IREAD
        else:
            mode = mode & ~stat.S_IREAD
        os.chmod(fname, stat.S_IMODE(mode))

    if os.path.isfile(top):
        # If it's a file, that's easy, just chmod it.
        do_chmod(top)
    elif read:
        # It's a directory and we're trying to turn on read
        # permission, so it's also pretty easy, just chmod the
        # directory and then chmod every entry on our walk down the
        # tree.  Because os.walk() is top-down and lazy, we'll enable
        # read permission on any directories that have it disabled
        # before os.walk() tries to list their contents.
        do_chmod(top)
        for dirpath, dirnames, filenames in os.walk(top):
            for name in dirnames + filenames:
                do_chmod(os.path.join(dirpath, name))
    else:
        # It's a directory and we're trying to turn off read
        # permission, which means we have to chmod the directories
        # in the tree bottom-up, lest disabling read permission from
        # the top down get in the way of being able to get at lower
        # parts of the tree.  But os.walk() visits things top down
        # here, so we collect a list of all of the entries in the
        # tree first, reverse the list, and then chmod the reversed
        # (bottom-up) list.
        entries = [top]
        for dirpath, dirnames, filenames in os.walk(top):
            for name in dirnames + filenames:
                entries.append(os.path.join(dirpath, name))
        entries.reverse()
        for entry in entries:
            do_chmod(entry)
def writable(self, top, write=1):
    """Make the specified directory tree writable (write == 1)
    or not (write == None).

    top is a file or the top of a directory tree; top and every
    entry below it are changed.  On Windows the mode is set outright
    to stat.S_IWRITE or stat.S_IREAD; elsewhere the owner write bit
    (stat.S_IWRITE) is added to, or removed from, the existing mode.
    Entries that cannot be examined or (on Windows) changed are
    skipped.
    """
    if sys.platform == 'win32':
        if write:
            win_mode = stat.S_IWRITE
        else:
            win_mode = stat.S_IREAD

        def do_chmod(fname):
            try:
                os.chmod(fname, win_mode)
            except OSError:
                pass
    else:
        def do_chmod(fname):
            try:
                st = os.stat(fname)
            except OSError:
                # Best effort: skip anything we cannot examine.
                return
            mode = st[stat.ST_MODE]
            if write:
                mode = mode | stat.S_IWRITE
            else:
                mode = mode & ~stat.S_IWRITE
            os.chmod(fname, stat.S_IMODE(mode))

    if os.path.isfile(top):
        do_chmod(top)
        return

    # Collect the whole tree first, then chmod it top-down.
    entries = [top]
    for dirpath, dirnames, filenames in os.walk(top):
        for name in dirnames + filenames:
            entries.append(os.path.join(dirpath, name))
    for entry in entries:
        do_chmod(entry)
def executable(self, top, execute=1):
    """Make the specified directory tree executable (execute == 1)
    or not (execute == None).

    top is a file or the top of a directory tree.  The owner execute
    bit (stat.S_IEXEC) is added to, or removed from, top and every
    entry below it.  Entries that cannot be stat'ed are skipped.

    This method has no effect on Windows systems, which use a
    completely different mechanism to control file executability.
    """
    if sys.platform == 'win32':
        return

    def do_chmod(fname):
        try:
            st = os.stat(fname)
        except OSError:
            # Best effort: skip anything we cannot examine.
            return
        mode = st[stat.ST_MODE]
        if execute:
            mode = mode | stat.S_IEXEC
        else:
            mode = mode & ~stat.S_IEXEC
        os.chmod(fname, stat.S_IMODE(mode))

    if os.path.isfile(top):
        # If it's a file, that's easy, just chmod it.
        do_chmod(top)
    elif execute:
        # It's a directory and we're trying to turn on execute
        # permission, so it's also pretty easy, just chmod the
        # directory and then chmod every entry on our walk down the
        # tree.  Because os.walk() is top-down and lazy, we'll enable
        # execute permission on any directories that have it disabled
        # before os.walk() tries to list their contents.
        do_chmod(top)
        for dirpath, dirnames, filenames in os.walk(top):
            for name in dirnames + filenames:
                do_chmod(os.path.join(dirpath, name))
    else:
        # It's a directory and we're trying to turn off execute
        # permission, which means we have to chmod the directories
        # in the tree bottom-up, lest disabling execute permission from
        # the top down get in the way of being able to get at lower
        # parts of the tree.  But os.walk() visits things top down
        # here, so we collect a list of all of the entries in the
        # tree first, reverse the list, and then chmod the reversed
        # (bottom-up) list.
        entries = [top]
        for dirpath, dirnames, filenames in os.walk(top):
            for name in dirnames + filenames:
                entries.append(os.path.join(dirpath, name))
        entries.reverse()
        for entry in entries:
            do_chmod(entry)
def write(self, file, content, mode = 'wb'):
    """Writes the specified content text (second argument) to the
    specified file name (first argument).  The file name may be
    a list, in which case the elements are concatenated with the
    os.path.join() method.  The file is created under the temporary
    working directory.  Any subdirectories in the path must already
    exist.  The I/O mode for the file may be specified; it must
    begin with a 'w'.  The default is 'wb' (binary write).

    Raises ValueError if mode does not begin with 'w'.
    """
    file = self.canonicalize(file)
    # Slice instead of indexing so an empty mode string is rejected
    # with the same ValueError rather than an IndexError.
    if mode[:1] != 'w':
        raise ValueError("mode must begin with 'w'")
    # Close the handle explicitly so the data is flushed even when
    # the interpreter does not reclaim the file object immediately.
    f = open(file, mode)
    try:
        f.write(content)
    finally:
        f.close()
1589 # indent-tabs-mode:nil
1591 # vim: set expandtab tabstop=4 shiftwidth=4: