2 TestCmd.py: a testing framework for commands and scripts.
4 The TestCmd module provides a framework for portable automated testing
5 of executable commands and scripts (in any language, not just Python),
6 especially commands and scripts that require file system interaction.
8 In addition to running tests and evaluating conditions, the TestCmd
9 module manages and cleans up one or more temporary workspace
10 directories, and provides methods for creating files and directories in
11 those workspace directories from in-line data, here-documents), allowing
12 tests to be completely self-contained.
14 A TestCmd environment object is created via the usual invocation:
17 test = TestCmd.TestCmd()
19 There are a bunch of keyword arguments available at instantiation:
21 test = TestCmd.TestCmd(description = 'string',
22 program = 'program_or_script_to_test',
23 interpreter = 'script_interpreter',
27 match = default_match_function,
30 There are a bunch of methods that let you do different things:
34 test.description_set('string')
36 test.program_set('program_or_script_to_test')
38 test.interpreter_set('script_interpreter')
39 test.interpreter_set(['script_interpreter', 'arg'])
41 test.workdir_set('prefix')
45 test.workpath('subdir', 'file')
47 test.subdir('subdir', ...)
49 test.rmdir('subdir', ...)
51 test.write('file', "contents\n")
52 test.write(['subdir', 'file'], "contents\n")
55 test.read(['subdir', 'file'])
56 test.read('file', mode)
57 test.read(['subdir', 'file'], mode)
59 test.writable('dir', 1)
60 test.writable('dir', None)
62 test.preserve(condition, ...)
64 test.cleanup(condition)
66 test.command_args(program = 'program_or_script_to_run',
67 interpreter = 'script_interpreter',
68 arguments = 'arguments to pass to program')
70 test.run(program = 'program_or_script_to_run',
71 interpreter = 'script_interpreter',
72 arguments = 'arguments to pass to program',
73 chdir = 'directory_to_chdir_to',
74 stdin = 'input to feed to the program\n')
75 universal_newlines = True)
77 p = test.start(program = 'program_or_script_to_run',
78 interpreter = 'script_interpreter',
79 arguments = 'arguments to pass to program',
80 universal_newlines = None)
85 test.pass_test(condition)
86 test.pass_test(condition, function)
89 test.fail_test(condition)
90 test.fail_test(condition, function)
91 test.fail_test(condition, function, skip)
94 test.no_result(condition)
95 test.no_result(condition, function)
96 test.no_result(condition, function, skip)
104 test.symlink(target, link)
106 test.match(actual, expected)
108 test.match_exact("actual 1\nactual 2\n", "expected 1\nexpected 2\n")
109 test.match_exact(["actual 1\n", "actual 2\n"],
110 ["expected 1\n", "expected 2\n"])
112 test.match_re("actual 1\nactual 2\n", regex_string)
113 test.match_re(["actual 1\n", "actual 2\n"], list_of_regexes)
115 test.match_re_dotall("actual 1\nactual 2\n", regex_string)
116 test.match_re_dotall(["actual 1\n", "actual 2\n"], list_of_regexes)
119 test.tempdir('temporary-directory')
125 test.where_is('foo', 'PATH1:PATH2')
126 test.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4')
129 test.unlink('subdir', 'file')
131 The TestCmd module provides pass_test(), fail_test(), and no_result()
132 unbound functions that report test results for use with the Aegis change
133 management system. These methods terminate the test immediately,
134 reporting PASSED, FAILED, or NO RESULT respectively, and exiting with
135 status 0 (success), 1 or 2 respectively. This allows for a distinction
136 between an actual failed test and a test that could not be properly
137 evaluated because of an external condition (such as a full file system
138 or incorrect permissions).
143 TestCmd.pass_test(condition)
144 TestCmd.pass_test(condition, function)
147 TestCmd.fail_test(condition)
148 TestCmd.fail_test(condition, function)
149 TestCmd.fail_test(condition, function, skip)
152 TestCmd.no_result(condition)
153 TestCmd.no_result(condition, function)
154 TestCmd.no_result(condition, function, skip)
156 The TestCmd module also provides unbound functions that handle matching
157 in the same way as the match_*() methods described above.
161 test = TestCmd.TestCmd(match = TestCmd.match_exact)
163 test = TestCmd.TestCmd(match = TestCmd.match_re)
165 test = TestCmd.TestCmd(match = TestCmd.match_re_dotall)
167 Lastly, the where_is() method also exists in an unbound function
172 TestCmd.where_is('foo')
173 TestCmd.where_is('foo', 'PATH1:PATH2')
174 TestCmd.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4')
177 # Copyright 2000, 2001, 2002, 2003, 2004 Steven Knight
178 # This module is free software, and you may redistribute it and/or modify
179 # it under the same terms as Python itself, so long as this copyright message
180 # and disclaimer are retained in their original form.
182 # IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
183 # SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
184 # THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
187 # THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
188 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
189 # PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
190 # AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
191 # SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
193 __author__ = "Steven Knight <knight at baldmt dot com>"
194 __revision__ = "TestCmd.py 0.35.D001 2009/02/08 07:10:39 knight"
224 return type(e) is types.ListType \
225 or isinstance(e, UserList.UserList)
228 from UserString import UserString
233 if hasattr(types, 'UnicodeType'):
235 return type(e) is types.StringType \
236 or type(e) is types.UnicodeType \
237 or isinstance(e, UserString)
240 return type(e) is types.StringType or isinstance(e, UserString)
242 tempfile.template = 'testcmd.'
243 if os.name in ('posix', 'nt'):
244 tempfile.template = 'testcmd.' + str(os.getpid()) + '.'
246 tempfile.template = 'testcmd.'
248 re_space = re.compile('\s')
252 _chain_to_exitfunc = None
256 cleanlist = filter(None, _Cleanup)
259 for test in cleanlist:
261 if _chain_to_exitfunc:
267 # TODO(1.5): atexit requires python 2.0, so chain sys.exitfunc
269 _chain_to_exitfunc = sys.exitfunc
270 except AttributeError:
272 sys.exitfunc = _clean
274 atexit.register(_clean)
281 for i in xrange(min(map(len, lists))):
282 result.append(tuple(map(lambda l, i=i: l[i], lists)))
286 def __init__(self, top):
288 def __call__(self, arg, dirname, names):
289 pathjoin = lambda n, d=dirname: os.path.join(d, n)
290 self.entries.extend(map(pathjoin, names))
292 def _caller(tblist, skip):
295 for file, line, name, text in tblist:
296 if file[-10:] == "TestCmd.py":
298 arr = [(file, line, name, text)] + arr
300 for file, line, name, text in arr[skip:]:
301 if name in ("?", "<module>"):
304 name = " (" + name + ")"
305 string = string + ("%s line %d of %s%s\n" % (atfrom, line, file, name))
309 def fail_test(self = None, condition = 1, function = None, skip = 0):
310 """Cause the test to fail.
312 By default, the fail_test() method reports that the test FAILED
313 and exits with a status of 1. If a condition argument is supplied,
314 the test fails only if the condition is true.
318 if not function is None:
325 of = " of " + self.program
328 desc = " [" + self.description + "]"
331 at = _caller(traceback.extract_stack(), skip)
332 sys.stderr.write("FAILED test" + of + desc + sep + at)
336 def no_result(self = None, condition = 1, function = None, skip = 0):
337 """Causes a test to exit with no valid result.
339 By default, the no_result() method reports NO RESULT for the test
340 and exits with a status of 2. If a condition argument is supplied,
341 the test fails only if the condition is true.
345 if not function is None:
352 of = " of " + self.program
355 desc = " [" + self.description + "]"
358 at = _caller(traceback.extract_stack(), skip)
359 sys.stderr.write("NO RESULT for test" + of + desc + sep + at)
363 def pass_test(self = None, condition = 1, function = None):
364 """Causes a test to pass.
366 By default, the pass_test() method reports PASSED for the test
367 and exits with a status of 0. If a condition argument is supplied,
368 the test passes only if the condition is true.
372 if not function is None:
374 sys.stderr.write("PASSED\n")
377 def match_exact(lines = None, matches = None):
380 if not is_List(lines):
381 lines = string.split(lines, "\n")
382 if not is_List(matches):
383 matches = string.split(matches, "\n")
384 if len(lines) != len(matches):
386 for i in range(len(lines)):
387 if lines[i] != matches[i]:
391 def match_re(lines = None, res = None):
394 if not is_List(lines):
395 lines = string.split(lines, "\n")
397 res = string.split(res, "\n")
398 if len(lines) != len(res):
400 for i in range(len(lines)):
401 s = "^" + res[i] + "$"
405 msg = "Regular expression error in %s: %s"
406 raise re.error, msg % (repr(s), e[0])
407 if not expr.search(lines[i]):
411 def match_re_dotall(lines = None, res = None):
414 if not type(lines) is type(""):
415 lines = string.join(lines, "\n")
416 if not type(res) is type(""):
417 res = string.join(res, "\n")
420 expr = re.compile(s, re.DOTALL)
422 msg = "Regular expression error in %s: %s"
423 raise re.error, msg % (repr(s), e[0])
424 if expr.match(lines):
427 def diff_re(a, b, fromfile='', tofile='',
428 fromfiledate='', tofiledate='', n=3, lineterm='\n'):
430 A simple "diff" of two sets of lines when the expected lines
431 are regular expressions. This is a really dumb thing that
432 just compares each line in turn, so it doesn't look for
433 chunks of matching lines and the like--but at least it lets
434 you know exactly which line first didn't compare correctl...
437 diff = len(a) - len(b)
443 for aline, bline in zip(a, b):
444 s = "^" + aline + "$"
448 msg = "Regular expression error in %s: %s"
449 raise re.error, msg % (repr(s), e[0])
450 if not expr.search(bline):
451 result.append("%sc%s" % (i+1, i+1))
452 result.append('< ' + repr(a[i]))
454 result.append('> ' + repr(b[i]))
458 if os.name == 'java':
460 python_executable = os.path.join(sys.prefix, 'jython')
464 python_executable = sys.executable
466 if sys.platform == 'win32':
468 default_sleep_seconds = 2
470 def where_is(file, path=None, pathext=None):
472 path = os.environ['PATH']
474 path = string.split(path, os.pathsep)
476 pathext = os.environ['PATHEXT']
477 if is_String(pathext):
478 pathext = string.split(pathext, os.pathsep)
480 if string.lower(ext) == string.lower(file[-len(ext):]):
484 f = os.path.join(dir, file)
487 if os.path.isfile(fext):
493 def where_is(file, path=None, pathext=None):
495 path = os.environ['PATH']
497 path = string.split(path, os.pathsep)
499 f = os.path.join(dir, file)
500 if os.path.isfile(f):
505 if stat.S_IMODE(st[stat.ST_MODE]) & 0111:
509 default_sleep_seconds = 1
516 # The subprocess module doesn't exist in this version of Python,
517 # so we're going to cobble up something that looks just enough
518 # like its API for our purposes below.
521 subprocess = new.module('subprocess')
523 subprocess.PIPE = 'PIPE'
524 subprocess.STDOUT = 'STDOUT'
525 subprocess.mswindows = (sys.platform == 'win32')
530 except AttributeError:
532 universal_newlines = 1
533 def __init__(self, command, **kw):
534 if sys.platform == 'win32' and command[0] == '"':
535 command = '"' + command + '"'
536 (stdin, stdout, stderr) = os.popen3(' ' + command)
540 def close_output(self):
542 self.resultcode = self.stderr.close()
544 resultcode = self.resultcode
545 if os.WIFEXITED(resultcode):
546 return os.WEXITSTATUS(resultcode)
547 elif os.WIFSIGNALED(resultcode):
548 return os.WTERMSIG(resultcode)
555 except AttributeError:
556 # A cribbed Popen4 class, with some retrofitted code from
557 # the Python 1.5 Popen3 class methods to do certain things
559 class Popen4(popen2.Popen3):
562 def __init__(self, cmd, bufsize=-1):
563 p2cread, p2cwrite = os.pipe()
564 c2pread, c2pwrite = os.pipe()
571 for i in range(3, popen2.MAXFD):
576 os.execvp(cmd[0], cmd)
579 # Shouldn't come here, I guess
582 self.tochild = os.fdopen(p2cwrite, 'w', bufsize)
584 self.fromchild = os.fdopen(c2pread, 'r', bufsize)
585 popen2._active.append(self)
587 popen2.Popen4 = Popen4
589 class Popen3(popen2.Popen3, popen2.Popen4):
590 universal_newlines = 1
591 def __init__(self, command, **kw):
592 if kw.get('stderr') == 'STDOUT':
593 apply(popen2.Popen4.__init__, (self, command, 1))
595 apply(popen2.Popen3.__init__, (self, command, 1))
596 self.stdin = self.tochild
597 self.stdout = self.fromchild
598 self.stderr = self.childerr
599 def wait(self, *args, **kw):
600 resultcode = apply(popen2.Popen3.wait, (self,)+args, kw)
601 if os.WIFEXITED(resultcode):
602 return os.WEXITSTATUS(resultcode)
603 elif os.WIFSIGNALED(resultcode):
604 return os.WTERMSIG(resultcode)
608 subprocess.Popen = Popen3
612 # From Josiah Carlson,
613 # ASPN : Python Cookbook : Module to allow Asynchronous subprocess use on Windows and Posix platforms
614 # http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/440554
616 PIPE = subprocess.PIPE
618 if subprocess.mswindows:
619 from win32file import ReadFile, WriteFile
620 from win32pipe import PeekNamedPipe
627 except AttributeError: fcntl.F_GETFL = 3
630 except AttributeError: fcntl.F_SETFL = 4
632 class Popen(subprocess.Popen):
633 def recv(self, maxsize=None):
634 return self._recv('stdout', maxsize)
636 def recv_err(self, maxsize=None):
637 return self._recv('stderr', maxsize)
639 def send_recv(self, input='', maxsize=None):
640 return self.send(input), self.recv(maxsize), self.recv_err(maxsize)
642 def get_conn_maxsize(self, which, maxsize):
647 return getattr(self, which), maxsize
649 def _close(self, which):
650 getattr(self, which).close()
651 setattr(self, which, None)
653 if subprocess.mswindows:
654 def send(self, input):
659 x = msvcrt.get_osfhandle(self.stdin.fileno())
660 (errCode, written) = WriteFile(x, input)
662 return self._close('stdin')
663 except (subprocess.pywintypes.error, Exception), why:
664 if why[0] in (109, errno.ESHUTDOWN):
665 return self._close('stdin')
670 def _recv(self, which, maxsize):
671 conn, maxsize = self.get_conn_maxsize(which, maxsize)
676 x = msvcrt.get_osfhandle(conn.fileno())
677 (read, nAvail, nMessage) = PeekNamedPipe(x, 0)
681 (errCode, read) = ReadFile(x, nAvail, None)
683 return self._close(which)
684 except (subprocess.pywintypes.error, Exception), why:
685 if why[0] in (109, errno.ESHUTDOWN):
686 return self._close(which)
689 #if self.universal_newlines:
690 # read = self._translate_newlines(read)
694 def send(self, input):
698 if not select.select([], [self.stdin], [], 0)[1]:
702 written = os.write(self.stdin.fileno(), input)
704 if why[0] == errno.EPIPE: #broken pipe
705 return self._close('stdin')
710 def _recv(self, which, maxsize):
711 conn, maxsize = self.get_conn_maxsize(which, maxsize)
716 flags = fcntl.fcntl(conn, fcntl.F_GETFL)
721 fcntl.fcntl(conn, fcntl.F_SETFL, flags| os.O_NONBLOCK)
724 if not select.select([conn], [], [], 0)[0]:
727 r = conn.read(maxsize)
729 return self._close(which)
731 #if self.universal_newlines:
732 # r = self._translate_newlines(r)
735 if not conn.closed and not flags is None:
736 fcntl.fcntl(conn, fcntl.F_SETFL, flags)
738 disconnect_message = "Other end disconnected!"
740 def recv_some(p, t=.1, e=1, tr=5, stderr=0):
749 while time.time() < x or r:
753 raise Exception(disconnect_message)
759 time.sleep(max((x-time.time())/tr, 0))
762 def send_all(p, data):
766 raise Exception(disconnect_message)
767 data = buffer(data, sent)
775 def __init__(self, description = None,
783 universal_newlines = 1):
784 self._cwd = os.getcwd()
785 self.description_set(description)
786 self.program_set(program)
787 self.interpreter_set(interpreter)
790 verbose = max( 0, int(os.environ.get('TESTCMD_VERBOSE', 0)) )
793 self.verbose_set(verbose)
794 self.combine = combine
795 self.universal_newlines = universal_newlines
796 if not match is None:
797 self.match_func = match
799 self.match_func = match_re
801 self._preserve = {'pass_test': 0, 'fail_test': 0, 'no_result': 0}
802 if os.environ.has_key('PRESERVE') and not os.environ['PRESERVE'] is '':
803 self._preserve['pass_test'] = os.environ['PRESERVE']
804 self._preserve['fail_test'] = os.environ['PRESERVE']
805 self._preserve['no_result'] = os.environ['PRESERVE']
808 self._preserve['pass_test'] = os.environ['PRESERVE_PASS']
812 self._preserve['fail_test'] = os.environ['PRESERVE_FAIL']
816 self._preserve['no_result'] = os.environ['PRESERVE_NO_RESULT']
822 self.condition = 'no_result'
823 self.workdir_set(workdir)
830 return "%x" % id(self)
832 if os.name == 'posix':
834 def escape(self, arg):
835 "escape shell special characters"
839 arg = string.replace(arg, slash, slash+slash)
841 arg = string.replace(arg, c, slash+c)
843 if re_space.search(arg):
844 arg = '"' + arg + '"'
849 # Windows does not allow special characters in file names
850 # anyway, so no need for an escape function, we will just quote
852 def escape(self, arg):
853 if re_space.search(arg):
854 arg = '"' + arg + '"'
857 def canonicalize(self, path):
859 path = apply(os.path.join, tuple(path))
860 if not os.path.isabs(path):
861 path = os.path.join(self.workdir, path)
864 def chmod(self, path, mode):
865 """Changes permissions on the specified file or directory
867 path = self.canonicalize(path)
870 def cleanup(self, condition = None):
871 """Removes any temporary working directories for the specified
872 TestCmd environment. If the environment variable PRESERVE was
873 set when the TestCmd environment was created, temporary working
874 directories are not removed. If any of the environment variables
875 PRESERVE_PASS, PRESERVE_FAIL, or PRESERVE_NO_RESULT were set
876 when the TestCmd environment was created, then temporary working
877 directories are not removed if the test passed, failed, or had
878 no result, respectively. Temporary working directories are also
879 preserved for conditions specified via the preserve method.
881 Typically, this method is not called directly, but is used when
882 the script exits to clean up temporary working directories as
883 appropriate for the exit status.
885 if not self._dirlist:
889 if condition is None:
890 condition = self.condition
891 if self._preserve[condition]:
892 for dir in self._dirlist:
893 print "Preserved directory", dir
895 list = self._dirlist[:]
898 self.writable(dir, 1)
899 shutil.rmtree(dir, ignore_errors = 1)
904 _Cleanup.remove(self)
905 except (AttributeError, ValueError):
908 def command_args(self, program = None,
912 if type(program) == type('') and not os.path.isabs(program):
913 program = os.path.join(self._cwd, program)
915 program = self.program
917 interpreter = self.interpreter
918 if not type(program) in [type([]), type(())]:
922 if not type(interpreter) in [type([]), type(())]:
923 interpreter = [interpreter]
924 cmd = list(interpreter) + cmd
926 if type(arguments) == type(''):
927 arguments = string.split(arguments)
928 cmd.extend(arguments)
931 def description_set(self, description):
932 """Set the description of the functionality being tested.
934 self.description = description
937 # """Diff two arrays.
940 def fail_test(self, condition = 1, function = None, skip = 0):
941 """Cause the test to fail.
945 self.condition = 'fail_test'
946 fail_test(self = self,
947 condition = condition,
951 def interpreter_set(self, interpreter):
952 """Set the program to be used to interpret the program
953 under test as a script.
955 self.interpreter = interpreter
957 def match(self, lines, matches):
958 """Compare actual and expected file contents.
960 return self.match_func(lines, matches)
962 def match_exact(self, lines, matches):
963 """Compare actual and expected file contents.
965 return match_exact(lines, matches)
967 def match_re(self, lines, res):
968 """Compare actual and expected file contents.
970 return match_re(lines, res)
972 def match_re_dotall(self, lines, res):
973 """Compare actual and expected file contents.
975 return match_re_dotall(lines, res)
977 def no_result(self, condition = 1, function = None, skip = 0):
978 """Report that the test could not be run.
982 self.condition = 'no_result'
983 no_result(self = self,
984 condition = condition,
988 def pass_test(self, condition = 1, function = None):
989 """Cause the test to pass.
993 self.condition = 'pass_test'
994 pass_test(self = self, condition = condition, function = function)
996 def preserve(self, *conditions):
997 """Arrange for the temporary working directories for the
998 specified TestCmd environment to be preserved for one or more
999 conditions. If no conditions are specified, arranges for
1000 the temporary working directories to be preserved for all
1003 if conditions is ():
1004 conditions = ('pass_test', 'fail_test', 'no_result')
1005 for cond in conditions:
1006 self._preserve[cond] = 1
1008 def program_set(self, program):
1009 """Set the executable program or script to be tested.
1011 if program and not os.path.isabs(program):
1012 program = os.path.join(self._cwd, program)
1013 self.program = program
1015 def read(self, file, mode = 'rb'):
1016 """Reads and returns the contents of the specified file name.
1017 The file name may be a list, in which case the elements are
1018 concatenated with the os.path.join() method. The file is
1019 assumed to be under the temporary working directory unless it
1020 is an absolute path name. The I/O mode for the file may
1021 be specified; it must begin with an 'r'. The default is
1024 file = self.canonicalize(file)
1026 raise ValueError, "mode must begin with 'r'"
1027 return open(file, mode).read()
1029 def rmdir(self, dir):
1030 """Removes the specified dir name.
1031 The dir name may be a list, in which case the elements are
1032 concatenated with the os.path.join() method. The dir is
1033 assumed to be under the temporary working directory unless it
1034 is an absolute path name.
1035 The dir must be empty.
1037 dir = self.canonicalize(dir)
1040 def start(self, program = None,
1043 universal_newlines = None,
1046 Starts a program or script for the test environment.
1048 The specified program will have the original directory
1049 prepended unless it is enclosed in a [list].
1051 cmd = self.command_args(program, interpreter, arguments)
1052 cmd_string = string.join(map(self.escape, cmd), ' ')
1054 sys.stderr.write(cmd_string + "\n")
1055 if universal_newlines is None:
1056 universal_newlines = self.universal_newlines
1058 combine = kw.get('combine', self.combine)
1060 stderr_value = subprocess.STDOUT
1062 stderr_value = subprocess.PIPE
1065 stdin=subprocess.PIPE,
1066 stdout=subprocess.PIPE,
1067 stderr=stderr_value,
1068 universal_newlines=universal_newlines)
1070 def finish(self, popen, **kw):
1072 Finishes and waits for the process being run under control of
1073 the specified popen argument, recording the exit status,
1074 standard output and error output.
1077 self.status = popen.wait()
1080 self._stdout.append(popen.stdout.read())
1082 stderr = popen.stderr.read()
1085 self._stderr.append(stderr)
1087 def run(self, program = None,
1092 universal_newlines = None):
1093 """Runs a test of the program or script for the test
1094 environment. Standard output and error output are saved for
1095 future retrieval via the stdout() and stderr() methods.
1097 The specified program will have the original directory
1098 prepended unless it is enclosed in a [list].
1101 oldcwd = os.getcwd()
1102 if not os.path.isabs(chdir):
1103 chdir = os.path.join(self.workpath(chdir))
1105 sys.stderr.write("chdir(" + chdir + ")\n")
1107 p = self.start(program, interpreter, arguments, universal_newlines)
1113 p.stdin.write(stdin)
1116 out = p.stdout.read()
1117 if p.stderr is None:
1120 err = p.stderr.read()
1122 close_output = p.close_output
1123 except AttributeError:
1125 if not p.stderr is None:
1130 self._stdout.append(out)
1131 self._stderr.append(err)
1133 self.status = p.wait()
1139 if self.verbose >= 2:
1140 write = sys.stdout.write
1141 write('============ STATUS: %d\n' % self.status)
1143 if out or self.verbose >= 3:
1144 write('============ BEGIN STDOUT (len=%d):\n' % len(out))
1146 write('============ END STDOUT\n')
1148 if err or self.verbose >= 3:
1149 write('============ BEGIN STDERR (len=%d)\n' % len(err))
1151 write('============ END STDERR\n')
1153 def sleep(self, seconds = default_sleep_seconds):
1154 """Sleeps at least the specified number of seconds. If no
1155 number is specified, sleeps at least the minimum number of
1156 seconds necessary to advance file time stamps on the current
1157 system. Sleeping more seconds is all right.
1161 def stderr(self, run = None):
1162 """Returns the error output from the specified run number.
1163 If there is no specified run number, then returns the error
1164 output of the last run. If the run number is less than zero,
1165 then returns the error output from that many runs back from the
1169 run = len(self._stderr)
1171 run = len(self._stderr) + run
1173 return self._stderr[run]
1175 def stdout(self, run = None):
1176 """Returns the standard output from the specified run number.
1177 If there is no specified run number, then returns the standard
1178 output of the last run. If the run number is less than zero,
1179 then returns the standard output from that many runs back from
1183 run = len(self._stdout)
1185 run = len(self._stdout) + run
1187 return self._stdout[run]
1189 def subdir(self, *subdirs):
1190 """Create new subdirectories under the temporary working
1191 directory, one for each argument. An argument may be a list,
1192 in which case the list elements are concatenated using the
1193 os.path.join() method. Subdirectories multiple levels deep
1194 must be created using a separate argument for each level:
1196 test.subdir('sub', ['sub', 'dir'], ['sub', 'dir', 'ectory'])
1198 Returns the number of subdirectories actually created.
1205 sub = apply(os.path.join, tuple(sub))
1206 new = os.path.join(self.workdir, sub)
1215 def symlink(self, target, link):
1216 """Creates a symlink to the specified target.
1217 The link name may be a list, in which case the elements are
1218 concatenated with the os.path.join() method. The link is
1219 assumed to be under the temporary working directory unless it
1220 is an absolute path name. The target is *not* assumed to be
1221 under the temporary working directory.
1223 link = self.canonicalize(link)
1224 os.symlink(target, link)
1226 def tempdir(self, path=None):
1227 """Creates a temporary directory.
1228 A unique directory name is generated if no path name is specified.
1229 The directory is created, and will be removed when the TestCmd
1230 object is destroyed.
1234 path = tempfile.mktemp(prefix=tempfile.template)
1236 path = tempfile.mktemp()
1239 # Symlinks in the path will report things
1240 # differently from os.getcwd(), so chdir there
1241 # and back to fetch the canonical path.
1249 # Uppercase the drive letter since the case of drive
1250 # letters is pretty much random on win32:
1251 drive,rest = os.path.splitdrive(path)
1253 path = string.upper(drive) + rest
1256 self._dirlist.append(path)
1259 _Cleanup.index(self)
1261 _Cleanup.append(self)
1265 def touch(self, path, mtime=None):
1266 """Updates the modification time on the specified file or
1267 directory path name. The default is to update to the
1268 current time if no explicit modification time is specified.
1270 path = self.canonicalize(path)
1271 atime = os.path.getatime(path)
1274 os.utime(path, (atime, mtime))
1276 def unlink(self, file):
1277 """Unlinks the specified file name.
1278 The file name may be a list, in which case the elements are
1279 concatenated with the os.path.join() method. The file is
1280 assumed to be under the temporary working directory unless it
1281 is an absolute path name.
1283 file = self.canonicalize(file)
1286 def verbose_set(self, verbose):
1287 """Set the verbose level.
1289 self.verbose = verbose
1291 def where_is(self, file, path=None, pathext=None):
1292 """Find an executable file.
1295 file = apply(os.path.join, tuple(file))
1296 if not os.path.isabs(file):
1297 file = where_is(file, path, pathext)
1300 def workdir_set(self, path):
1301 """Creates a temporary working directory with the specified
1302 path name. If the path is a null string (''), a unique
1303 directory name is created.
1308 path = self.tempdir(path)
1311 def workpath(self, *args):
1312 """Returns the absolute path name to a subdirectory or file
1313 within the current temporary working directory. Concatenates
1314 the temporary working directory name with the specified
1315 arguments using the os.path.join() method.
1317 return apply(os.path.join, (self.workdir,) + tuple(args))
1319 def readable(self, top, read=1):
1320 """Make the specified directory tree readable (read == 1)
1321 or not (read == None).
1323 This method has no effect on Windows systems, which use a
1324 completely different mechanism to control file readability.
1327 if sys.platform == 'win32':
1331 def do_chmod(fname):
1332 try: st = os.stat(fname)
1333 except OSError: pass
1334 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IREAD))
1336 def do_chmod(fname):
1337 try: st = os.stat(fname)
1338 except OSError: pass
1339 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IREAD))
1341 if os.path.isfile(top):
1342 # If it's a file, that's easy, just chmod it.
1345 # It's a directory and we're trying to turn on read
1346 # permission, so it's also pretty easy, just chmod the
1347 # directory and then chmod every entry on our walk down the
1348 # tree. Because os.path.walk() is top-down, we'll enable
1349 # read permission on any directories that have it disabled
1350 # before os.path.walk() tries to list their contents.
1353 def chmod_entries(arg, dirname, names, do_chmod=do_chmod):
1355 do_chmod(os.path.join(dirname, n))
1357 os.path.walk(top, chmod_entries, None)
1359 # It's a directory and we're trying to turn off read
1360 # permission, which means we have to chmod the directoreis
1361 # in the tree bottom-up, lest disabling read permission from
1362 # the top down get in the way of being able to get at lower
1363 # parts of the tree. But os.path.walk() visits things top
1364 # down, so we just use an object to collect a list of all
1365 # of the entries in the tree, reverse the list, and then
1366 # chmod the reversed (bottom-up) list.
1367 col = Collector(top)
1368 os.path.walk(top, col, None)
1369 col.entries.reverse()
1370 for d in col.entries: do_chmod(d)
1372 def writable(self, top, write=1):
1373 """Make the specified directory tree writable (write == 1)
1374 or not (write == None).
1377 if sys.platform == 'win32':
1380 def do_chmod(fname):
1381 try: os.chmod(fname, stat.S_IWRITE)
1382 except OSError: pass
1384 def do_chmod(fname):
1385 try: os.chmod(fname, stat.S_IREAD)
1386 except OSError: pass
1391 def do_chmod(fname):
1392 try: st = os.stat(fname)
1393 except OSError: pass
1394 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|0200))
1396 def do_chmod(fname):
1397 try: st = os.stat(fname)
1398 except OSError: pass
1399 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~0200))
1401 if os.path.isfile(top):
1404 col = Collector(top)
1405 os.path.walk(top, col, None)
1406 for d in col.entries: do_chmod(d)
1408 def executable(self, top, execute=1):
1409 """Make the specified directory tree executable (execute == 1)
1410 or not (execute == None).
1412 This method has no effect on Windows systems, which use a
1413 completely different mechanism to control file executability.
1416 if sys.platform == 'win32':
1420 def do_chmod(fname):
1421 try: st = os.stat(fname)
1422 except OSError: pass
1423 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IEXEC))
1425 def do_chmod(fname):
1426 try: st = os.stat(fname)
1427 except OSError: pass
1428 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IEXEC))
1430 if os.path.isfile(top):
1431 # If it's a file, that's easy, just chmod it.
1434 # It's a directory and we're trying to turn on execute
1435 # permission, so it's also pretty easy, just chmod the
1436 # directory and then chmod every entry on our walk down the
1437 # tree. Because os.path.walk() is top-down, we'll enable
1438 # execute permission on any directories that have it disabled
1439 # before os.path.walk() tries to list their contents.
1442 def chmod_entries(arg, dirname, names, do_chmod=do_chmod):
1444 do_chmod(os.path.join(dirname, n))
1446 os.path.walk(top, chmod_entries, None)
1448 # It's a directory and we're trying to turn off execute
1449 # permission, which means we have to chmod the directories
1450 # in the tree bottom-up, lest disabling execute permission from
1451 # the top down get in the way of being able to get at lower
1452 # parts of the tree. But os.path.walk() visits things top
1453 # down, so we just use an object to collect a list of all
1454 # of the entries in the tree, reverse the list, and then
1455 # chmod the reversed (bottom-up) list.
1456 col = Collector(top)
1457 os.path.walk(top, col, None)
1458 col.entries.reverse()
1459 for d in col.entries: do_chmod(d)
1461 def write(self, file, content, mode = 'wb'):
1462 """Writes the specified content text (second argument) to the
1463 specified file name (first argument). The file name may be
1464 a list, in which case the elements are concatenated with the
1465 os.path.join() method. The file is created under the temporary
1466 working directory. Any subdirectories in the path must already
1467 exist. The I/O mode for the file may be specified; it must
1468 begin with a 'w'. The default is 'wb' (binary write).
1470 file = self.canonicalize(file)
1472 raise ValueError, "mode must begin with 'w'"
1473 open(file, mode).write(content)