48ba850b2be6a8e0bfb5c6b83eba4deb30367ad8
[scons.git] / QMTest / TestCmd.py
1 """
2 TestCmd.py:  a testing framework for commands and scripts.
3
4 The TestCmd module provides a framework for portable automated testing
5 of executable commands and scripts (in any language, not just Python),
6 especially commands and scripts that require file system interaction.
7
8 In addition to running tests and evaluating conditions, the TestCmd
9 module manages and cleans up one or more temporary workspace
10 directories, and provides methods for creating files and directories in
11 those workspace directories from in-line data, here-documents), allowing
12 tests to be completely self-contained.
13
14 A TestCmd environment object is created via the usual invocation:
15
16     import TestCmd
17     test = TestCmd.TestCmd()
18
19 There are a bunch of keyword arguments available at instantiation:
20
21     test = TestCmd.TestCmd(description = 'string',
22                            program = 'program_or_script_to_test',
23                            interpreter = 'script_interpreter',
24                            workdir = 'prefix',
25                            subdir = 'subdir',
26                            verbose = Boolean,
27                            match = default_match_function,
28                            combine = Boolean)
29
30 There are a bunch of methods that let you do different things:
31
32     test.verbose_set(1)
33
34     test.description_set('string')
35
36     test.program_set('program_or_script_to_test')
37
38     test.interpreter_set('script_interpreter')
39     test.interpreter_set(['script_interpreter', 'arg'])
40
41     test.workdir_set('prefix')
42     test.workdir_set('')
43
44     test.workpath('file')
45     test.workpath('subdir', 'file')
46
47     test.subdir('subdir', ...)
48
49     test.rmdir('subdir', ...)
50
51     test.write('file', "contents\n")
52     test.write(['subdir', 'file'], "contents\n")
53
54     test.read('file')
55     test.read(['subdir', 'file'])
56     test.read('file', mode)
57     test.read(['subdir', 'file'], mode)
58
59     test.writable('dir', 1)
60     test.writable('dir', None)
61
62     test.preserve(condition, ...)
63
64     test.cleanup(condition)
65
66     test.command_args(program = 'program_or_script_to_run',
67                       interpreter = 'script_interpreter',
68                       arguments = 'arguments to pass to program')
69
70     test.run(program = 'program_or_script_to_run',
71              interpreter = 'script_interpreter',
72              arguments = 'arguments to pass to program',
73              chdir = 'directory_to_chdir_to',
74              stdin = 'input to feed to the program\n')
75              universal_newlines = True)
76
77     p = test.start(program = 'program_or_script_to_run',
78                    interpreter = 'script_interpreter',
79                    arguments = 'arguments to pass to program',
80                    universal_newlines = None)
81
82     test.finish(self, p)
83
84     test.pass_test()
85     test.pass_test(condition)
86     test.pass_test(condition, function)
87
88     test.fail_test()
89     test.fail_test(condition)
90     test.fail_test(condition, function)
91     test.fail_test(condition, function, skip)
92
93     test.no_result()
94     test.no_result(condition)
95     test.no_result(condition, function)
96     test.no_result(condition, function, skip)
97
98     test.stdout()
99     test.stdout(run)
100
101     test.stderr()
102     test.stderr(run)
103
104     test.symlink(target, link)
105
106     test.match(actual, expected)
107
108     test.match_exact("actual 1\nactual 2\n", "expected 1\nexpected 2\n")
109     test.match_exact(["actual 1\n", "actual 2\n"],
110                      ["expected 1\n", "expected 2\n"])
111
112     test.match_re("actual 1\nactual 2\n", regex_string)
113     test.match_re(["actual 1\n", "actual 2\n"], list_of_regexes)
114
115     test.match_re_dotall("actual 1\nactual 2\n", regex_string)
116     test.match_re_dotall(["actual 1\n", "actual 2\n"], list_of_regexes)
117
118     test.tempdir()
119     test.tempdir('temporary-directory')
120
121     test.sleep()
122     test.sleep(seconds)
123
124     test.where_is('foo')
125     test.where_is('foo', 'PATH1:PATH2')
126     test.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4')
127
128     test.unlink('file')
129     test.unlink('subdir', 'file')
130
131 The TestCmd module provides pass_test(), fail_test(), and no_result()
132 unbound functions that report test results for use with the Aegis change
133 management system.  These methods terminate the test immediately,
134 reporting PASSED, FAILED, or NO RESULT respectively, and exiting with
135 status 0 (success), 1 or 2 respectively.  This allows for a distinction
136 between an actual failed test and a test that could not be properly
137 evaluated because of an external condition (such as a full file system
138 or incorrect permissions).
139
140     import TestCmd
141
142     TestCmd.pass_test()
143     TestCmd.pass_test(condition)
144     TestCmd.pass_test(condition, function)
145
146     TestCmd.fail_test()
147     TestCmd.fail_test(condition)
148     TestCmd.fail_test(condition, function)
149     TestCmd.fail_test(condition, function, skip)
150
151     TestCmd.no_result()
152     TestCmd.no_result(condition)
153     TestCmd.no_result(condition, function)
154     TestCmd.no_result(condition, function, skip)
155
156 The TestCmd module also provides unbound functions that handle matching
157 in the same way as the match_*() methods described above.
158
159     import TestCmd
160
161     test = TestCmd.TestCmd(match = TestCmd.match_exact)
162
163     test = TestCmd.TestCmd(match = TestCmd.match_re)
164
165     test = TestCmd.TestCmd(match = TestCmd.match_re_dotall)
166
167 Lastly, the where_is() method also exists in an unbound function
168 version.
169
170     import TestCmd
171
172     TestCmd.where_is('foo')
173     TestCmd.where_is('foo', 'PATH1:PATH2')
174     TestCmd.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4')
175 """
176
177 # Copyright 2000, 2001, 2002, 2003, 2004 Steven Knight
178 # This module is free software, and you may redistribute it and/or modify
179 # it under the same terms as Python itself, so long as this copyright message
180 # and disclaimer are retained in their original form.
181 #
182 # IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
183 # SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
184 # THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
185 # DAMAGE.
186 #
187 # THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
188 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
189 # PARTICULAR PURPOSE.  THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
190 # AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
191 # SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
192
193 __author__ = "Steven Knight <knight at baldmt dot com>"
194 __revision__ = "TestCmd.py 0.35.D001 2009/02/08 07:10:39 knight"
195 __version__ = "0.35"
196
197 import errno
198 import os
199 import os.path
200 import re
201 import shutil
202 import stat
203 import string
204 import sys
205 import tempfile
206 import time
207 import traceback
208 import types
209 import UserList
210
211 __all__ = [
212     'diff_re',
213     'fail_test',
214     'no_result',
215     'pass_test',
216     'match_exact',
217     'match_re',
218     'match_re_dotall',
219     'python_executable',
220     'TestCmd'
221 ]
222
223 def is_List(e):
224     return type(e) is types.ListType \
225         or isinstance(e, UserList.UserList)
226
227 try:
228     from UserString import UserString
229 except ImportError:
230     class UserString:
231         pass
232
233 if hasattr(types, 'UnicodeType'):
234     def is_String(e):
235         return type(e) is types.StringType \
236             or type(e) is types.UnicodeType \
237             or isinstance(e, UserString)
238 else:
239     def is_String(e):
240         return type(e) is types.StringType or isinstance(e, UserString)
241
242 tempfile.template = 'testcmd.'
243 if os.name in ('posix', 'nt'):
244     tempfile.template = 'testcmd.' + str(os.getpid()) + '.'
245 else:
246     tempfile.template = 'testcmd.'
247
248 re_space = re.compile('\s')
249
250 _Cleanup = []
251
252 _chain_to_exitfunc = None
253
254 def _clean():
255     global _Cleanup
256     cleanlist = filter(None, _Cleanup)
257     del _Cleanup[:]
258     cleanlist.reverse()
259     for test in cleanlist:
260         test.cleanup()
261     if _chain_to_exitfunc:
262         _chain_to_exitfunc()
263
264 try:
265     import atexit
266 except ImportError:
267     # TODO(1.5): atexit requires python 2.0, so chain sys.exitfunc
268     try:
269         _chain_to_exitfunc = sys.exitfunc
270     except AttributeError:
271         pass
272     sys.exitfunc = _clean
273 else:
274     atexit.register(_clean)
275
276 try:
277     zip
278 except NameError:
279     def zip(*lists):
280         result = []
281         for i in xrange(min(map(len, lists))):
282             result.append(tuple(map(lambda l, i=i: l[i], lists)))
283         return result
284
285 class Collector:
286     def __init__(self, top):
287         self.entries = [top]
288     def __call__(self, arg, dirname, names):
289         pathjoin = lambda n, d=dirname: os.path.join(d, n)
290         self.entries.extend(map(pathjoin, names))
291
292 def _caller(tblist, skip):
293     string = ""
294     arr = []
295     for file, line, name, text in tblist:
296         if file[-10:] == "TestCmd.py":
297                 break
298         arr = [(file, line, name, text)] + arr
299     atfrom = "at"
300     for file, line, name, text in arr[skip:]:
301         if name in ("?", "<module>"):
302             name = ""
303         else:
304             name = " (" + name + ")"
305         string = string + ("%s line %d of %s%s\n" % (atfrom, line, file, name))
306         atfrom = "\tfrom"
307     return string
308
309 def fail_test(self = None, condition = 1, function = None, skip = 0):
310     """Cause the test to fail.
311
312     By default, the fail_test() method reports that the test FAILED
313     and exits with a status of 1.  If a condition argument is supplied,
314     the test fails only if the condition is true.
315     """
316     if not condition:
317         return
318     if not function is None:
319         function()
320     of = ""
321     desc = ""
322     sep = " "
323     if not self is None:
324         if self.program:
325             of = " of " + self.program
326             sep = "\n\t"
327         if self.description:
328             desc = " [" + self.description + "]"
329             sep = "\n\t"
330
331     at = _caller(traceback.extract_stack(), skip)
332     sys.stderr.write("FAILED test" + of + desc + sep + at)
333
334     sys.exit(1)
335
336 def no_result(self = None, condition = 1, function = None, skip = 0):
337     """Causes a test to exit with no valid result.
338
339     By default, the no_result() method reports NO RESULT for the test
340     and exits with a status of 2.  If a condition argument is supplied,
341     the test fails only if the condition is true.
342     """
343     if not condition:
344         return
345     if not function is None:
346         function()
347     of = ""
348     desc = ""
349     sep = " "
350     if not self is None:
351         if self.program:
352             of = " of " + self.program
353             sep = "\n\t"
354         if self.description:
355             desc = " [" + self.description + "]"
356             sep = "\n\t"
357
358     at = _caller(traceback.extract_stack(), skip)
359     sys.stderr.write("NO RESULT for test" + of + desc + sep + at)
360
361     sys.exit(2)
362
363 def pass_test(self = None, condition = 1, function = None):
364     """Causes a test to pass.
365
366     By default, the pass_test() method reports PASSED for the test
367     and exits with a status of 0.  If a condition argument is supplied,
368     the test passes only if the condition is true.
369     """
370     if not condition:
371         return
372     if not function is None:
373         function()
374     sys.stderr.write("PASSED\n")
375     sys.exit(0)
376
377 def match_exact(lines = None, matches = None):
378     """
379     """
380     if not is_List(lines):
381         lines = string.split(lines, "\n")
382     if not is_List(matches):
383         matches = string.split(matches, "\n")
384     if len(lines) != len(matches):
385         return
386     for i in range(len(lines)):
387         if lines[i] != matches[i]:
388             return
389     return 1
390
391 def match_re(lines = None, res = None):
392     """
393     """
394     if not is_List(lines):
395         lines = string.split(lines, "\n")
396     if not is_List(res):
397         res = string.split(res, "\n")
398     if len(lines) != len(res):
399         return
400     for i in range(len(lines)):
401         s = "^" + res[i] + "$"
402         try:
403             expr = re.compile(s)
404         except re.error, e:
405             msg = "Regular expression error in %s: %s"
406             raise re.error, msg % (repr(s), e[0])
407         if not expr.search(lines[i]):
408             return
409     return 1
410
411 def match_re_dotall(lines = None, res = None):
412     """
413     """
414     if not type(lines) is type(""):
415         lines = string.join(lines, "\n")
416     if not type(res) is type(""):
417         res = string.join(res, "\n")
418     s = "^" + res + "$"
419     try:
420         expr = re.compile(s, re.DOTALL)
421     except re.error, e:
422         msg = "Regular expression error in %s: %s"
423         raise re.error, msg % (repr(s), e[0])
424     if expr.match(lines):
425         return 1
426
427 def diff_re(a, b, fromfile='', tofile='',
428                 fromfiledate='', tofiledate='', n=3, lineterm='\n'):
429     """
430     A simple "diff" of two sets of lines when the expected lines
431     are regular expressions.  This is a really dumb thing that
432     just compares each line in turn, so it doesn't look for
433     chunks of matching lines and the like--but at least it lets
434     you know exactly which line first didn't compare correctl...
435     """
436     result = []
437     diff = len(a) - len(b)
438     if diff < 0:
439         a = a + ['']*(-diff)
440     elif diff > 0:
441         b = b + ['']*diff
442     i = 0
443     for aline, bline in zip(a, b):
444         s = "^" + aline + "$"
445         try:
446             expr = re.compile(s)
447         except re.error, e:
448             msg = "Regular expression error in %s: %s"
449             raise re.error, msg % (repr(s), e[0])
450         if not expr.search(bline):
451             result.append("%sc%s" % (i+1, i+1))
452             result.append('< ' + repr(a[i]))
453             result.append('---')
454             result.append('> ' + repr(b[i]))
455         i = i+1
456     return result
457
458 if os.name == 'java':
459
460     python_executable = os.path.join(sys.prefix, 'jython')
461
462 else:
463
464     python_executable = sys.executable
465
466 if sys.platform == 'win32':
467
468     default_sleep_seconds = 2
469
470     def where_is(file, path=None, pathext=None):
471         if path is None:
472             path = os.environ['PATH']
473         if is_String(path):
474             path = string.split(path, os.pathsep)
475         if pathext is None:
476             pathext = os.environ['PATHEXT']
477         if is_String(pathext):
478             pathext = string.split(pathext, os.pathsep)
479         for ext in pathext:
480             if string.lower(ext) == string.lower(file[-len(ext):]):
481                 pathext = ['']
482                 break
483         for dir in path:
484             f = os.path.join(dir, file)
485             for ext in pathext:
486                 fext = f + ext
487                 if os.path.isfile(fext):
488                     return fext
489         return None
490
491 else:
492
493     def where_is(file, path=None, pathext=None):
494         if path is None:
495             path = os.environ['PATH']
496         if is_String(path):
497             path = string.split(path, os.pathsep)
498         for dir in path:
499             f = os.path.join(dir, file)
500             if os.path.isfile(f):
501                 try:
502                     st = os.stat(f)
503                 except OSError:
504                     continue
505                 if stat.S_IMODE(st[stat.ST_MODE]) & 0111:
506                     return f
507         return None
508
509     default_sleep_seconds = 1
510
511
512
513 try:
514     import subprocess
515 except ImportError:
516     # The subprocess module doesn't exist in this version of Python,
517     # so we're going to cobble up something that looks just enough
518     # like its API for our purposes below.
519     import new
520
521     subprocess = new.module('subprocess')
522
523     subprocess.PIPE = 'PIPE'
524     subprocess.STDOUT = 'STDOUT'
525     subprocess.mswindows = (sys.platform == 'win32')
526
527     try:
528         import popen2
529         popen2.Popen3
530     except AttributeError:
531         class Popen3:
532             universal_newlines = 1
533             def __init__(self, command, **kw):
534                 if sys.platform == 'win32' and command[0] == '"':
535                     command = '"' + command + '"'
536                 (stdin, stdout, stderr) = os.popen3(' ' + command)
537                 self.stdin = stdin
538                 self.stdout = stdout
539                 self.stderr = stderr
540             def close_output(self):
541                 self.stdout.close()
542                 self.resultcode = self.stderr.close()
543             def wait(self):
544                 resultcode = self.resultcode
545                 if os.WIFEXITED(resultcode):
546                     return os.WEXITSTATUS(resultcode)
547                 elif os.WIFSIGNALED(resultcode):
548                     return os.WTERMSIG(resultcode)
549                 else:
550                     return None
551
552     else:
553         try:
554             popen2.Popen4
555         except AttributeError:
556             # A cribbed Popen4 class, with some retrofitted code from
557             # the Python 1.5 Popen3 class methods to do certain things
558             # by hand.
559             class Popen4(popen2.Popen3):
560                 childerr = None
561
562                 def __init__(self, cmd, bufsize=-1):
563                     p2cread, p2cwrite = os.pipe()
564                     c2pread, c2pwrite = os.pipe()
565                     self.pid = os.fork()
566                     if self.pid == 0:
567                         # Child
568                         os.dup2(p2cread, 0)
569                         os.dup2(c2pwrite, 1)
570                         os.dup2(c2pwrite, 2)
571                         for i in range(3, popen2.MAXFD):
572                             try:
573                                 os.close(i)
574                             except: pass
575                         try:
576                             os.execvp(cmd[0], cmd)
577                         finally:
578                             os._exit(1)
579                         # Shouldn't come here, I guess
580                         os._exit(1)
581                     os.close(p2cread)
582                     self.tochild = os.fdopen(p2cwrite, 'w', bufsize)
583                     os.close(c2pwrite)
584                     self.fromchild = os.fdopen(c2pread, 'r', bufsize)
585                     popen2._active.append(self)
586
587             popen2.Popen4 = Popen4
588
589         class Popen3(popen2.Popen3, popen2.Popen4):
590             universal_newlines = 1
591             def __init__(self, command, **kw):
592                 if kw.get('stderr') == 'STDOUT':
593                     apply(popen2.Popen4.__init__, (self, command, 1))
594                 else:
595                     apply(popen2.Popen3.__init__, (self, command, 1))
596                 self.stdin = self.tochild
597                 self.stdout = self.fromchild
598                 self.stderr = self.childerr
599             def wait(self, *args, **kw):
600                 resultcode = apply(popen2.Popen3.wait, (self,)+args, kw)
601                 if os.WIFEXITED(resultcode):
602                     return os.WEXITSTATUS(resultcode)
603                 elif os.WIFSIGNALED(resultcode):
604                     return os.WTERMSIG(resultcode)
605                 else:
606                     return None
607
608     subprocess.Popen = Popen3
609
610
611
612 # From Josiah Carlson,
613 # ASPN : Python Cookbook : Module to allow Asynchronous subprocess use on Windows and Posix platforms
614 # http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/440554
615
616 PIPE = subprocess.PIPE
617
618 if subprocess.mswindows:
619     from win32file import ReadFile, WriteFile
620     from win32pipe import PeekNamedPipe
621     import msvcrt
622 else:
623     import select
624     import fcntl
625
626     try:                    fcntl.F_GETFL
627     except AttributeError:  fcntl.F_GETFL = 3
628
629     try:                    fcntl.F_SETFL
630     except AttributeError:  fcntl.F_SETFL = 4
631
632 class Popen(subprocess.Popen):
633     def recv(self, maxsize=None):
634         return self._recv('stdout', maxsize)
635
636     def recv_err(self, maxsize=None):
637         return self._recv('stderr', maxsize)
638
639     def send_recv(self, input='', maxsize=None):
640         return self.send(input), self.recv(maxsize), self.recv_err(maxsize)
641
642     def get_conn_maxsize(self, which, maxsize):
643         if maxsize is None:
644             maxsize = 1024
645         elif maxsize < 1:
646             maxsize = 1
647         return getattr(self, which), maxsize
648
649     def _close(self, which):
650         getattr(self, which).close()
651         setattr(self, which, None)
652
653     if subprocess.mswindows:
654         def send(self, input):
655             if not self.stdin:
656                 return None
657
658             try:
659                 x = msvcrt.get_osfhandle(self.stdin.fileno())
660                 (errCode, written) = WriteFile(x, input)
661             except ValueError:
662                 return self._close('stdin')
663             except (subprocess.pywintypes.error, Exception), why:
664                 if why[0] in (109, errno.ESHUTDOWN):
665                     return self._close('stdin')
666                 raise
667
668             return written
669
670         def _recv(self, which, maxsize):
671             conn, maxsize = self.get_conn_maxsize(which, maxsize)
672             if conn is None:
673                 return None
674
675             try:
676                 x = msvcrt.get_osfhandle(conn.fileno())
677                 (read, nAvail, nMessage) = PeekNamedPipe(x, 0)
678                 if maxsize < nAvail:
679                     nAvail = maxsize
680                 if nAvail > 0:
681                     (errCode, read) = ReadFile(x, nAvail, None)
682             except ValueError:
683                 return self._close(which)
684             except (subprocess.pywintypes.error, Exception), why:
685                 if why[0] in (109, errno.ESHUTDOWN):
686                     return self._close(which)
687                 raise
688
689             #if self.universal_newlines:
690             #    read = self._translate_newlines(read)
691             return read
692
693     else:
694         def send(self, input):
695             if not self.stdin:
696                 return None
697
698             if not select.select([], [self.stdin], [], 0)[1]:
699                 return 0
700
701             try:
702                 written = os.write(self.stdin.fileno(), input)
703             except OSError, why:
704                 if why[0] == errno.EPIPE: #broken pipe
705                     return self._close('stdin')
706                 raise
707
708             return written
709
710         def _recv(self, which, maxsize):
711             conn, maxsize = self.get_conn_maxsize(which, maxsize)
712             if conn is None:
713                 return None
714
715             try:
716                 flags = fcntl.fcntl(conn, fcntl.F_GETFL)
717             except TypeError:
718                 flags = None
719             else:
720                 if not conn.closed:
721                     fcntl.fcntl(conn, fcntl.F_SETFL, flags| os.O_NONBLOCK)
722
723             try:
724                 if not select.select([conn], [], [], 0)[0]:
725                     return ''
726
727                 r = conn.read(maxsize)
728                 if not r:
729                     return self._close(which)
730
731                 #if self.universal_newlines:
732                 #    r = self._translate_newlines(r)
733                 return r
734             finally:
735                 if not conn.closed and not flags is None:
736                     fcntl.fcntl(conn, fcntl.F_SETFL, flags)
737
738 disconnect_message = "Other end disconnected!"
739
740 def recv_some(p, t=.1, e=1, tr=5, stderr=0):
741     if tr < 1:
742         tr = 1
743     x = time.time()+t
744     y = []
745     r = ''
746     pr = p.recv
747     if stderr:
748         pr = p.recv_err
749     while time.time() < x or r:
750         r = pr()
751         if r is None:
752             if e:
753                 raise Exception(disconnect_message)
754             else:
755                 break
756         elif r:
757             y.append(r)
758         else:
759             time.sleep(max((x-time.time())/tr, 0))
760     return ''.join(y)
761
762 def send_all(p, data):
763     while len(data):
764         sent = p.send(data)
765         if sent is None:
766             raise Exception(disconnect_message)
767         data = buffer(data, sent)
768
769
770
771 class TestCmd:
772     """Class TestCmd
773     """
774
775     def __init__(self, description = None,
776                        program = None,
777                        interpreter = None,
778                        workdir = None,
779                        subdir = None,
780                        verbose = None,
781                        match = None,
782                        combine = 0,
783                        universal_newlines = 1):
784         self._cwd = os.getcwd()
785         self.description_set(description)
786         self.program_set(program)
787         self.interpreter_set(interpreter)
788         if verbose is None:
789             try:
790                 verbose = max( 0, int(os.environ.get('TESTCMD_VERBOSE', 0)) )
791             except ValueError:
792                 verbose = 0
793         self.verbose_set(verbose)
794         self.combine = combine
795         self.universal_newlines = universal_newlines
796         if not match is None:
797             self.match_func = match
798         else:
799             self.match_func = match_re
800         self._dirlist = []
801         self._preserve = {'pass_test': 0, 'fail_test': 0, 'no_result': 0}
802         if os.environ.has_key('PRESERVE') and not os.environ['PRESERVE'] is '':
803             self._preserve['pass_test'] = os.environ['PRESERVE']
804             self._preserve['fail_test'] = os.environ['PRESERVE']
805             self._preserve['no_result'] = os.environ['PRESERVE']
806         else:
807             try:
808                 self._preserve['pass_test'] = os.environ['PRESERVE_PASS']
809             except KeyError:
810                 pass
811             try:
812                 self._preserve['fail_test'] = os.environ['PRESERVE_FAIL']
813             except KeyError:
814                 pass
815             try:
816                 self._preserve['no_result'] = os.environ['PRESERVE_NO_RESULT']
817             except KeyError:
818                 pass
819         self._stdout = []
820         self._stderr = []
821         self.status = None
822         self.condition = 'no_result'
823         self.workdir_set(workdir)
824         self.subdir(subdir)
825
826     def __del__(self):
827         self.cleanup()
828
829     def __repr__(self):
830         return "%x" % id(self)
831
832     if os.name == 'posix':
833
834         def escape(self, arg):
835             "escape shell special characters"
836             slash = '\\'
837             special = '"$'
838
839             arg = string.replace(arg, slash, slash+slash)
840             for c in special:
841                 arg = string.replace(arg, c, slash+c)
842
843             if re_space.search(arg):
844                 arg = '"' + arg + '"'
845             return arg
846
847     else:
848
849         # Windows does not allow special characters in file names
850         # anyway, so no need for an escape function, we will just quote
851         # the arg.
852         def escape(self, arg):
853             if re_space.search(arg):
854                 arg = '"' + arg + '"'
855             return arg
856
857     def canonicalize(self, path):
858         if is_List(path):
859             path = apply(os.path.join, tuple(path))
860         if not os.path.isabs(path):
861             path = os.path.join(self.workdir, path)
862         return path
863
864     def chmod(self, path, mode):
865         """Changes permissions on the specified file or directory
866         path name."""
867         path = self.canonicalize(path)
868         os.chmod(path, mode)
869
870     def cleanup(self, condition = None):
871         """Removes any temporary working directories for the specified
872         TestCmd environment.  If the environment variable PRESERVE was
873         set when the TestCmd environment was created, temporary working
874         directories are not removed.  If any of the environment variables
875         PRESERVE_PASS, PRESERVE_FAIL, or PRESERVE_NO_RESULT were set
876         when the TestCmd environment was created, then temporary working
877         directories are not removed if the test passed, failed, or had
878         no result, respectively.  Temporary working directories are also
879         preserved for conditions specified via the preserve method.
880
881         Typically, this method is not called directly, but is used when
882         the script exits to clean up temporary working directories as
883         appropriate for the exit status.
884         """
885         if not self._dirlist:
886             return
887         os.chdir(self._cwd)
888         self.workdir = None
889         if condition is None:
890             condition = self.condition
891         if self._preserve[condition]:
892             for dir in self._dirlist:
893                 print "Preserved directory", dir
894         else:
895             list = self._dirlist[:]
896             list.reverse()
897             for dir in list:
898                 self.writable(dir, 1)
899                 shutil.rmtree(dir, ignore_errors = 1)
900             self._dirlist = []
901
902         try:
903             global _Cleanup
904             _Cleanup.remove(self)
905         except (AttributeError, ValueError):
906             pass
907
908     def command_args(self, program = None,
909                            interpreter = None,
910                            arguments = None):
911         if program:
912             if type(program) == type('') and not os.path.isabs(program):
913                 program = os.path.join(self._cwd, program)
914         else:
915             program = self.program
916             if not interpreter:
917                 interpreter = self.interpreter
918         if not type(program) in [type([]), type(())]:
919             program = [program]
920         cmd = list(program)
921         if interpreter:
922             if not type(interpreter) in [type([]), type(())]:
923                 interpreter = [interpreter]
924             cmd = list(interpreter) + cmd
925         if arguments:
926             if type(arguments) == type(''):
927                 arguments = string.split(arguments)
928             cmd.extend(arguments)
929         return cmd
930
931     def description_set(self, description):
932         """Set the description of the functionality being tested.
933         """
934         self.description = description
935
936 #    def diff(self):
937 #        """Diff two arrays.
938 #        """
939
940     def fail_test(self, condition = 1, function = None, skip = 0):
941         """Cause the test to fail.
942         """
943         if not condition:
944             return
945         self.condition = 'fail_test'
946         fail_test(self = self,
947                   condition = condition,
948                   function = function,
949                   skip = skip)
950
951     def interpreter_set(self, interpreter):
952         """Set the program to be used to interpret the program
953         under test as a script.
954         """
955         self.interpreter = interpreter
956
957     def match(self, lines, matches):
958         """Compare actual and expected file contents.
959         """
960         return self.match_func(lines, matches)
961
962     def match_exact(self, lines, matches):
963         """Compare actual and expected file contents.
964         """
965         return match_exact(lines, matches)
966
967     def match_re(self, lines, res):
968         """Compare actual and expected file contents.
969         """
970         return match_re(lines, res)
971
972     def match_re_dotall(self, lines, res):
973         """Compare actual and expected file contents.
974         """
975         return match_re_dotall(lines, res)
976
977     def no_result(self, condition = 1, function = None, skip = 0):
978         """Report that the test could not be run.
979         """
980         if not condition:
981             return
982         self.condition = 'no_result'
983         no_result(self = self,
984                   condition = condition,
985                   function = function,
986                   skip = skip)
987
988     def pass_test(self, condition = 1, function = None):
989         """Cause the test to pass.
990         """
991         if not condition:
992             return
993         self.condition = 'pass_test'
994         pass_test(self = self, condition = condition, function = function)
995
996     def preserve(self, *conditions):
997         """Arrange for the temporary working directories for the
998         specified TestCmd environment to be preserved for one or more
999         conditions.  If no conditions are specified, arranges for
1000         the temporary working directories to be preserved for all
1001         conditions.
1002         """
1003         if conditions is ():
1004             conditions = ('pass_test', 'fail_test', 'no_result')
1005         for cond in conditions:
1006             self._preserve[cond] = 1
1007
1008     def program_set(self, program):
1009         """Set the executable program or script to be tested.
1010         """
1011         if program and not os.path.isabs(program):
1012             program = os.path.join(self._cwd, program)
1013         self.program = program
1014
1015     def read(self, file, mode = 'rb'):
1016         """Reads and returns the contents of the specified file name.
1017         The file name may be a list, in which case the elements are
1018         concatenated with the os.path.join() method.  The file is
1019         assumed to be under the temporary working directory unless it
1020         is an absolute path name.  The I/O mode for the file may
1021         be specified; it must begin with an 'r'.  The default is
1022         'rb' (binary read).
1023         """
1024         file = self.canonicalize(file)
1025         if mode[0] != 'r':
1026             raise ValueError, "mode must begin with 'r'"
1027         return open(file, mode).read()
1028
1029     def rmdir(self, dir):
1030         """Removes the specified dir name.
1031         The dir name may be a list, in which case the elements are
1032         concatenated with the os.path.join() method.  The dir is
1033         assumed to be under the temporary working directory unless it
1034         is an absolute path name.
1035         The dir must be empty.
1036         """
1037         dir = self.canonicalize(dir)
1038         os.rmdir(dir)
1039
1040     def start(self, program = None,
1041                     interpreter = None,
1042                     arguments = None,
1043                     universal_newlines = None,
1044                     **kw):
1045         """
1046         Starts a program or script for the test environment.
1047
1048         The specified program will have the original directory
1049         prepended unless it is enclosed in a [list].
1050         """
1051         cmd = self.command_args(program, interpreter, arguments)
1052         cmd_string = string.join(map(self.escape, cmd), ' ')
1053         if self.verbose:
1054             sys.stderr.write(cmd_string + "\n")
1055         if universal_newlines is None:
1056             universal_newlines = self.universal_newlines
1057
1058         combine = kw.get('combine', self.combine)
1059         if combine:
1060             stderr_value = subprocess.STDOUT
1061         else:
1062             stderr_value = subprocess.PIPE
1063
1064         return Popen(cmd,
1065                      stdin=subprocess.PIPE,
1066                      stdout=subprocess.PIPE,
1067                      stderr=stderr_value,
1068                      universal_newlines=universal_newlines)
1069
1070     def finish(self, popen, **kw):
1071         """
1072         Finishes and waits for the process being run under control of
1073         the specified popen argument, recording the exit status,
1074         standard output and error output.
1075         """
1076         popen.stdin.close()
1077         self.status = popen.wait()
1078         if not self.status:
1079             self.status = 0
1080         self._stdout.append(popen.stdout.read())
1081         if popen.stderr:
1082             stderr = popen.stderr.read()
1083         else:
1084             stderr = ''
1085         self._stderr.append(stderr)
1086
1087     def run(self, program = None,
1088                   interpreter = None,
1089                   arguments = None,
1090                   chdir = None,
1091                   stdin = None,
1092                   universal_newlines = None):
1093         """Runs a test of the program or script for the test
1094         environment.  Standard output and error output are saved for
1095         future retrieval via the stdout() and stderr() methods.
1096
1097         The specified program will have the original directory
1098         prepended unless it is enclosed in a [list].
1099         """
1100         if chdir:
1101             oldcwd = os.getcwd()
1102             if not os.path.isabs(chdir):
1103                 chdir = os.path.join(self.workpath(chdir))
1104             if self.verbose:
1105                 sys.stderr.write("chdir(" + chdir + ")\n")
1106             os.chdir(chdir)
1107         p = self.start(program, interpreter, arguments, universal_newlines)
1108         if stdin:
1109             if is_List(stdin):
1110                 for line in stdin:
1111                     p.stdin.write(line)
1112             else:
1113                 p.stdin.write(stdin)
1114         p.stdin.close()
1115
1116         out = p.stdout.read()
1117         if p.stderr is None:
1118             err = ''
1119         else:
1120             err = p.stderr.read()
1121         try:
1122             close_output = p.close_output
1123         except AttributeError:
1124             p.stdout.close()
1125             if not p.stderr is None:
1126                 p.stderr.close()
1127         else:
1128             close_output()
1129
1130         self._stdout.append(out)
1131         self._stderr.append(err)
1132
1133         self.status = p.wait()
1134         if not self.status:
1135             self.status = 0
1136
1137         if chdir:
1138             os.chdir(oldcwd)
1139         if self.verbose >= 2:
1140             write = sys.stdout.write
1141             write('============ STATUS: %d\n' % self.status)
1142             out = self.stdout()
1143             if out or self.verbose >= 3:
1144                 write('============ BEGIN STDOUT (len=%d):\n' % len(out))
1145                 write(out)
1146                 write('============ END STDOUT\n')
1147             err = self.stderr()
1148             if err or self.verbose >= 3:
1149                 write('============ BEGIN STDERR (len=%d)\n' % len(err))
1150                 write(err)
1151                 write('============ END STDERR\n')
1152
1153     def sleep(self, seconds = default_sleep_seconds):
1154         """Sleeps at least the specified number of seconds.  If no
1155         number is specified, sleeps at least the minimum number of
1156         seconds necessary to advance file time stamps on the current
1157         system.  Sleeping more seconds is all right.
1158         """
1159         time.sleep(seconds)
1160
1161     def stderr(self, run = None):
1162         """Returns the error output from the specified run number.
1163         If there is no specified run number, then returns the error
1164         output of the last run.  If the run number is less than zero,
1165         then returns the error output from that many runs back from the
1166         current run.
1167         """
1168         if not run:
1169             run = len(self._stderr)
1170         elif run < 0:
1171             run = len(self._stderr) + run
1172         run = run - 1
1173         return self._stderr[run]
1174
1175     def stdout(self, run = None):
1176         """Returns the standard output from the specified run number.
1177         If there is no specified run number, then returns the standard
1178         output of the last run.  If the run number is less than zero,
1179         then returns the standard output from that many runs back from
1180         the current run.
1181         """
1182         if not run:
1183             run = len(self._stdout)
1184         elif run < 0:
1185             run = len(self._stdout) + run
1186         run = run - 1
1187         return self._stdout[run]
1188
1189     def subdir(self, *subdirs):
1190         """Create new subdirectories under the temporary working
1191         directory, one for each argument.  An argument may be a list,
1192         in which case the list elements are concatenated using the
1193         os.path.join() method.  Subdirectories multiple levels deep
1194         must be created using a separate argument for each level:
1195
1196                 test.subdir('sub', ['sub', 'dir'], ['sub', 'dir', 'ectory'])
1197
1198         Returns the number of subdirectories actually created.
1199         """
1200         count = 0
1201         for sub in subdirs:
1202             if sub is None:
1203                 continue
1204             if is_List(sub):
1205                 sub = apply(os.path.join, tuple(sub))
1206             new = os.path.join(self.workdir, sub)
1207             try:
1208                 os.mkdir(new)
1209             except OSError:
1210                 pass
1211             else:
1212                 count = count + 1
1213         return count
1214
1215     def symlink(self, target, link):
1216         """Creates a symlink to the specified target.
1217         The link name may be a list, in which case the elements are
1218         concatenated with the os.path.join() method.  The link is
1219         assumed to be under the temporary working directory unless it
1220         is an absolute path name. The target is *not* assumed to be
1221         under the temporary working directory.
1222         """
1223         link = self.canonicalize(link)
1224         os.symlink(target, link)
1225
1226     def tempdir(self, path=None):
1227         """Creates a temporary directory.
1228         A unique directory name is generated if no path name is specified.
1229         The directory is created, and will be removed when the TestCmd
1230         object is destroyed.
1231         """
1232         if path is None:
1233             try:
1234                 path = tempfile.mktemp(prefix=tempfile.template)
1235             except TypeError:
1236                 path = tempfile.mktemp()
1237         os.mkdir(path)
1238
1239         # Symlinks in the path will report things
1240         # differently from os.getcwd(), so chdir there
1241         # and back to fetch the canonical path.
1242         cwd = os.getcwd()
1243         try:
1244             os.chdir(path)
1245             path = os.getcwd()
1246         finally:
1247             os.chdir(cwd)
1248
1249         # Uppercase the drive letter since the case of drive
1250         # letters is pretty much random on win32:
1251         drive,rest = os.path.splitdrive(path)
1252         if drive:
1253             path = string.upper(drive) + rest
1254
1255         #
1256         self._dirlist.append(path)
1257         global _Cleanup
1258         try:
1259             _Cleanup.index(self)
1260         except ValueError:
1261             _Cleanup.append(self)
1262
1263         return path
1264
1265     def touch(self, path, mtime=None):
1266         """Updates the modification time on the specified file or
1267         directory path name.  The default is to update to the
1268         current time if no explicit modification time is specified.
1269         """
1270         path = self.canonicalize(path)
1271         atime = os.path.getatime(path)
1272         if mtime is None:
1273             mtime = time.time()
1274         os.utime(path, (atime, mtime))
1275
1276     def unlink(self, file):
1277         """Unlinks the specified file name.
1278         The file name may be a list, in which case the elements are
1279         concatenated with the os.path.join() method.  The file is
1280         assumed to be under the temporary working directory unless it
1281         is an absolute path name.
1282         """
1283         file = self.canonicalize(file)
1284         os.unlink(file)
1285
1286     def verbose_set(self, verbose):
1287         """Set the verbose level.
1288         """
1289         self.verbose = verbose
1290
1291     def where_is(self, file, path=None, pathext=None):
1292         """Find an executable file.
1293         """
1294         if is_List(file):
1295             file = apply(os.path.join, tuple(file))
1296         if not os.path.isabs(file):
1297             file = where_is(file, path, pathext)
1298         return file
1299
1300     def workdir_set(self, path):
1301         """Creates a temporary working directory with the specified
1302         path name.  If the path is a null string (''), a unique
1303         directory name is created.
1304         """
1305         if (path != None):
1306             if path == '':
1307                 path = None
1308             path = self.tempdir(path)
1309         self.workdir = path
1310
1311     def workpath(self, *args):
1312         """Returns the absolute path name to a subdirectory or file
1313         within the current temporary working directory.  Concatenates
1314         the temporary working directory name with the specified
1315         arguments using the os.path.join() method.
1316         """
1317         return apply(os.path.join, (self.workdir,) + tuple(args))
1318
1319     def readable(self, top, read=1):
1320         """Make the specified directory tree readable (read == 1)
1321         or not (read == None).
1322
1323         This method has no effect on Windows systems, which use a
1324         completely different mechanism to control file readability.
1325         """
1326
1327         if sys.platform == 'win32':
1328             return
1329
1330         if read:
1331             def do_chmod(fname):
1332                 try: st = os.stat(fname)
1333                 except OSError: pass
1334                 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IREAD))
1335         else:
1336             def do_chmod(fname):
1337                 try: st = os.stat(fname)
1338                 except OSError: pass
1339                 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IREAD))
1340
1341         if os.path.isfile(top):
1342             # If it's a file, that's easy, just chmod it.
1343             do_chmod(top)
1344         elif read:
1345             # It's a directory and we're trying to turn on read
1346             # permission, so it's also pretty easy, just chmod the
1347             # directory and then chmod every entry on our walk down the
1348             # tree.  Because os.path.walk() is top-down, we'll enable
1349             # read permission on any directories that have it disabled
1350             # before os.path.walk() tries to list their contents.
1351             do_chmod(top)
1352
1353             def chmod_entries(arg, dirname, names, do_chmod=do_chmod):
1354                 for n in names:
1355                     do_chmod(os.path.join(dirname, n))
1356
1357             os.path.walk(top, chmod_entries, None)
1358         else:
1359             # It's a directory and we're trying to turn off read
1360             # permission, which means we have to chmod the directoreis
1361             # in the tree bottom-up, lest disabling read permission from
1362             # the top down get in the way of being able to get at lower
1363             # parts of the tree.  But os.path.walk() visits things top
1364             # down, so we just use an object to collect a list of all
1365             # of the entries in the tree, reverse the list, and then
1366             # chmod the reversed (bottom-up) list.
1367             col = Collector(top)
1368             os.path.walk(top, col, None)
1369             col.entries.reverse()
1370             for d in col.entries: do_chmod(d)
1371
1372     def writable(self, top, write=1):
1373         """Make the specified directory tree writable (write == 1)
1374         or not (write == None).
1375         """
1376
1377         if sys.platform == 'win32':
1378
1379             if write:
1380                 def do_chmod(fname):
1381                     try: os.chmod(fname, stat.S_IWRITE)
1382                     except OSError: pass
1383             else:
1384                 def do_chmod(fname):
1385                     try: os.chmod(fname, stat.S_IREAD)
1386                     except OSError: pass
1387
1388         else:
1389
1390             if write:
1391                 def do_chmod(fname):
1392                     try: st = os.stat(fname)
1393                     except OSError: pass
1394                     else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|0200))
1395             else:
1396                 def do_chmod(fname):
1397                     try: st = os.stat(fname)
1398                     except OSError: pass
1399                     else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~0200))
1400
1401         if os.path.isfile(top):
1402             do_chmod(top)
1403         else:
1404             col = Collector(top)
1405             os.path.walk(top, col, None)
1406             for d in col.entries: do_chmod(d)
1407
1408     def executable(self, top, execute=1):
1409         """Make the specified directory tree executable (execute == 1)
1410         or not (execute == None).
1411
1412         This method has no effect on Windows systems, which use a
1413         completely different mechanism to control file executability.
1414         """
1415
1416         if sys.platform == 'win32':
1417             return
1418
1419         if execute:
1420             def do_chmod(fname):
1421                 try: st = os.stat(fname)
1422                 except OSError: pass
1423                 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IEXEC))
1424         else:
1425             def do_chmod(fname):
1426                 try: st = os.stat(fname)
1427                 except OSError: pass
1428                 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IEXEC))
1429
1430         if os.path.isfile(top):
1431             # If it's a file, that's easy, just chmod it.
1432             do_chmod(top)
1433         elif execute:
1434             # It's a directory and we're trying to turn on execute
1435             # permission, so it's also pretty easy, just chmod the
1436             # directory and then chmod every entry on our walk down the
1437             # tree.  Because os.path.walk() is top-down, we'll enable
1438             # execute permission on any directories that have it disabled
1439             # before os.path.walk() tries to list their contents.
1440             do_chmod(top)
1441
1442             def chmod_entries(arg, dirname, names, do_chmod=do_chmod):
1443                 for n in names:
1444                     do_chmod(os.path.join(dirname, n))
1445
1446             os.path.walk(top, chmod_entries, None)
1447         else:
1448             # It's a directory and we're trying to turn off execute
1449             # permission, which means we have to chmod the directories
1450             # in the tree bottom-up, lest disabling execute permission from
1451             # the top down get in the way of being able to get at lower
1452             # parts of the tree.  But os.path.walk() visits things top
1453             # down, so we just use an object to collect a list of all
1454             # of the entries in the tree, reverse the list, and then
1455             # chmod the reversed (bottom-up) list.
1456             col = Collector(top)
1457             os.path.walk(top, col, None)
1458             col.entries.reverse()
1459             for d in col.entries: do_chmod(d)
1460
1461     def write(self, file, content, mode = 'wb'):
1462         """Writes the specified content text (second argument) to the
1463         specified file name (first argument).  The file name may be
1464         a list, in which case the elements are concatenated with the
1465         os.path.join() method.  The file is created under the temporary
1466         working directory.  Any subdirectories in the path must already
1467         exist.  The I/O mode for the file may be specified; it must
1468         begin with a 'w'.  The default is 'wb' (binary write).
1469         """
1470         file = self.canonicalize(file)
1471         if mode[0] != 'w':
1472             raise ValueError, "mode must begin with 'w'"
1473         open(file, mode).write(content)