question: Question.check() now returns (correct, details)
[quizzer.git] / quizzer / util.py
1 # Copyright (C) 2013 W. Trevor King <wking@tremily.us>
2 #
3 # This file is part of quizzer.
4 #
5 # quizzer is free software: you can redistribute it and/or modify it under the
6 # terms of the GNU General Public License as published by the Free Software
7 # Foundation, either version 3 of the License, or (at your option) any later
8 # version.
9 #
10 # quizzer is distributed in the hope that it will be useful, but WITHOUT ANY
11 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
12 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License along with
15 # quizzer.  If not, see <http://www.gnu.org/licenses/>.
16
17 import logging as _logging
18 import os.path as _os_path
19 import subprocess as _subprocess
20 import tempfile as _tempfile
21
22 from . import error as _error
23
24
25 def invoke(args, stdin=None, stdout=_subprocess.PIPE, stderr=_subprocess.PIPE,
26            universal_newlines=False, timeout=None, expect=None, **kwargs):
27     if stdin:
28         stdin_pipe = _subprocess.PIPE
29     else:
30         stdin_pipe = None
31     try:
32         p = _subprocess.Popen(
33             args, stdin=stdin_pipe, stdout=stdout, stderr=stderr,
34             universal_newlines=universal_newlines, **kwargs)
35     except FileNotFoundError as e:
36         raise _error.CommandError(arguments=args, stdin=stdin) from e
37     try:
38         stdout,stderr = p.communicate(input=stdin, timeout=timeout)
39     except _subprocess.TimeoutExpired as e:
40         p.kill()
41         stdout,stderr = p.communicate()
42         status = p.wait()
43         raise _error.CommandError(
44             msg='timeout ({}s) expired'.format(timeout),
45             arguments=args, stdin=stdin, stdout=stdout, stderr=stderr,
46             status=status) from e
47     status = p.wait()
48     if expect and status not in expect:
49         raise _error.CommandError(
50             msg='unexpected exit status ({} not in {})'.format(status, expect),
51             args=args, stdin=stdin, stdout=stdout, stderr=stderr,
52             status=status)
53     return (status, stdout, stderr)
54
55 def invocation_difference(a_status, a_stdout, a_stderr,
56                           b_status, b_stdout, b_stderr):
57     for (name, a, b) in [
58         ('stderr', a_stderr, b_stderr),
59         ('status', a_status, b_status),
60         ('stdout', a_stdout, b_stdout),
61         ]:
62         if a != b:
63             return (name, a, b)
64
65 def format_invocation_difference(name, a, b):
66     if name == 'status':
67         return 'missmatched {}, expected {!r} but got {!r}'.format(name, a, b)
68     else:
69         return '\n'.join([
70                 'missmatched {}, expected:'.format(name),
71                 a,
72                 'but got:',
73                 b,
74                 ])
75
76
77 class TemporaryDirectory (object):
78     """A temporary directory for testing answers
79
80     >>> t = TemporaryDirectory()
81
82     Basic command execution:
83
84     >>> t.invoke('/bin/sh', 'touch a b c')
85     (0, '', '')
86     >>> t.invoke('/bin/sh', 'ls')
87     (0, 'a\nb\nc\n', '')
88
89     Captured stdout and stderr have instances of the random temporary
90     directory name normalized for easy comparison:
91
92     >>> t.invoke('/bin/sh', 'pwd')
93     (0, '/tmp/TemporaryDirectory-XXXXXX\n', '')
94
95     >>> t.cleanup()
96     """
97     def __init__(self):
98         self.prefix = '{}-'.format(type(self).__name__)
99         self.tempdir = _tempfile.TemporaryDirectory(prefix=self.prefix)
100
101     def cleanup(self):
102         if self.tempdir:
103             self.tempdir.cleanup()
104             self.tempdir = None
105
106     def __del__(self):
107         self.cleanup()
108
109     def __enter__(self):
110         return self
111
112     def __exit__(self, type, value, traceback):
113         self.cleanup()
114
115     def invoke(self, interpreter, text, universal_newlines=True, **kwargs):
116         if not self.tempdir:
117             raise RuntimeError(
118                 'cannot invoke() on a cleaned up {}'.format(
119                     type(self).__name__))
120         with _tempfile.NamedTemporaryFile(
121                 mode='w', prefix='{}script-'.format(self.prefix)
122                 ) as tempscript:
123             tempscript.write(text)
124             tempscript.flush()
125             status,stdout,stderr = invoke(
126                 args=[interpreter, tempscript.name],
127                 cwd=self.tempdir.name,
128                 universal_newlines=universal_newlines,
129                 **kwargs)
130             dirname = _os_path.basename(self.tempdir.name)
131         if stdout:
132             stdout = stdout.replace(dirname, '{}XXXXXX'.format(self.prefix))
133         if stderr:
134             stderr = stderr.replace(dirname, '{}XXXXXX'.format(self.prefix))
135         return (status, stdout, stderr)