-#!/usr/bin/env python
+#!/usr/bin/env python3.3
import quizzer.cli
__version__ = '0.1'
LOG = _logging.getLogger(__name__)
-LOG.setLevel(_logging.ERROR)
+LOG.setLevel(_logging.DEBUG)
+LOG.addHandler(_logging.StreamHandler())
--- /dev/null
+class CommandError (RuntimeError):
+ def __init__(self, arguments, stdin=None, stdout=None, stderr=None,
+ status=None, msg=None):
+ error_msg = 'error executing {}'.format(arguments)
+ if msg:
+ error_msg = '{}: {}'.format(error_msg, msg)
+ super(CommandError, self).__init__(error_msg)
+ self.arguments = arguments
+ self.stdin = stdin
+ self.stdout = stdout
+ self.stderr = stderr
+ self.status = status
+import logging as _logging
+import tempfile as _tempfile
+
+from . import error as _error
+from . import util as _util
+
+
+LOG = _logging.getLogger(__name__)
QUESTION_CLASS = {}
'interpreter',
'setup',
'teardown',
+ 'timeout',
]
def __setstate__(self, state):
if 'interpreter' not in state:
state['interpreter'] = 'sh' # POSIX-compatible shell
+ if 'timeout' not in state:
+ state['timeout'] = 3
for key in ['setup', 'teardown']:
if key not in state:
state[key] = []
super(ScriptQuestion, self).__setstate__(state)
def check(self, answer):
- script = '\n'.join(self.setup + [answer] + self.teardown)
- raise ValueError(script)
-
+ # figure out the expected values
+ e_status,e_stdout,e_stderr = self._invoke(self.answer)
+ # get values for the user-supplied answer
+ try:
+ a_status,a_stdout,a_stderr = self._invoke(answer)
+ except _error.CommandError as e:
+ LOG.warning(e)
+ return False
+ for (name, e, a) in [
+ ('stderr', e_stderr, a_stderr),
+ ('status', e_status, a_status),
+ ('stdout', e_stdout, a_stdout),
+ ]:
+ if a != e:
+ if name == 'status':
+ LOG.info(
+ 'missmatched {}, expected {!r} but got {!r}'.format(
+ name, e, a))
+ else:
+ LOG.info('missmatched {}, expected:'.format(name))
+ LOG.info(e)
+ LOG.info('but got:')
+ LOG.info(a)
+ return False
+ return True
+
+ def _invoke(self, answer):
+ with _tempfile.TemporaryDirectory(
+ prefix='{}-'.format(type(self).__name__),
+ ) as tempdir:
+ script = '\n'.join(self.setup + [answer] + self.teardown)
+ return _util.invoke(
+ args=[self.interpreter],
+ stdin=script,
+ cwd=tempdir,
+ universal_newlines=True,
+ timeout=self.timeout,)
for name,obj in list(locals().items()):
if name.startswith('_'):
answers = _answerdb.AnswerDatabase()
self.answers = answers
if stack is None:
- stack = quiz.leaf_questions()
+ stack = self.answers.get_never_correctly_answered(
+ questions=quiz.leaf_questions())
self.stack = stack
def run(self):
--- /dev/null
+import subprocess as _subprocess
+
+from . import error as _error
+
+
+def invoke(args, stdin=None, universal_newlines=False, timeout=None,
+ expect=None, **kwargs):
+ if stdin:
+ stdin_pipe = _subprocess.PIPE
+ else:
+ stdin_pipe = None
+ try:
+ p = _subprocess.Popen(
+ args, stdin=stdin_pipe, stdout=_subprocess.PIPE,
+ stderr=_subprocess.PIPE, universal_newlines=universal_newlines,
+ **kwargs)
+ except FileNotFoundError as e:
+ raise _error.CommandError(arguments=args, stdin=stdin) from e
+ try:
+ stdout,stderr = p.communicate(input=stdin, timeout=timeout)
+ except _subprocess.TimeoutExpired as e:
+ p.kill()
+ stdout,stderr = p.communicate()
+ status = p.wait()
+ raise _error.CommandError(
+ msg='timeout ({}s) expired'.format(timeout),
+ arguments=args, stdin=stdin, stdout=stdout, stderr=stderr,
+ status=status) from e
+ status = p.wait()
+ if expect and status not in expect:
+ raise _error.CommandError(
+ msg='unexpected exit status ({} not in {})'.format(status, expect),
+ args=args, stdin=stdin, stdout=stdout, stderr=stderr,
+ status=status)
+ return (status, stdout, stderr)