From: W. Trevor King Date: Wed, 13 Mar 2013 21:30:23 +0000 (-0400) Subject: ui.wsgi: Add a preliminary WSGI/HTML user interface X-Git-Tag: v0.4~10 X-Git-Url: http://git.tremily.us/?a=commitdiff_plain;h=f959c94d6b55d683a082f76888bb0d7c4b1ef2e6;p=quizzer.git ui.wsgi: Add a preliminary WSGI/HTML user interface The basic WSGI handling (e.g. HandlerErrorApp and WSGI_DataObject) is largely based on code I'd written for Bugs Everywhere. --- diff --git a/quizzer/ui/__init__.py b/quizzer/ui/__init__.py index 011a84b..04c560c 100644 --- a/quizzer/ui/__init__.py +++ b/quizzer/ui/__init__.py @@ -19,7 +19,7 @@ import importlib as _importlib from .. import answerdb as _answerdb -INTERFACES = ['cli'] +INTERFACES = ['cli', 'wsgi'] class UserInterface (object): diff --git a/quizzer/ui/wsgi.py b/quizzer/ui/wsgi.py new file mode 100644 index 0000000..303db6d --- /dev/null +++ b/quizzer/ui/wsgi.py @@ -0,0 +1,348 @@ +# Copyright + +import logging as _logging +import select as _select +import socket as _socket +import re as _re +import urllib.parse as _urllib_parse +import wsgiref.simple_server as _wsgiref_simple_server + +from . import UserInterface as _UserInterface + + +LOG = _logging.getLogger(__name__) + + +class HandlerError (Exception): + def __init__(self, code, msg, headers=[]): + super(HandlerError, self).__init__('{} {}'.format(code, msg)) + self.code = code + self.msg = msg + self.headers = headers + + +class HandlerErrorApp (object): + """Catch HandlerErrors and return HTTP error pages. + """ + def __init__(self, app): + self.app = app + + def __call__(self, environ, start_response): + try: + return self.app(environ, start_response) + except HandlerError as e: + LOG.error(e) + start_response('{} {}'.format(e.code, e.msg), e.headers) + return [] + + +class WSGI_DataObject (object): + "Useful WSGI utilities for handling POST data" + def __init__(self, **kwargs): + super(WSGI_DataObject, self).__init__(**kwargs) + + # Maximum input we will accept when REQUEST_METHOD is POST + # 0 ==> unlimited input + self.maxlen = 0 + + def ok_response(self, environ, start_response, content=None, + encoding=None, content_type='application/octet-stream', + headers=[]): + response = '200 OK' + if content is None: + content_length = 0 + else: + if encoding: + if hasattr(content, 'encode'): + content = content.encode(encoding) + content_type = '{}; charset={}'.format( + content_type, encoding.upper()) + content_length = len(content) + start_response(response, [ + ('Content-Type', content_type), + ('Content-Length', str(content_length)), + ]) + if content is None: + return [] + return [content] + + def _parse_query(self, query, encoding='utf-8'): + if len(query) == 0: + return {} + data = _urllib_parse.parse_qs( + query, keep_blank_values=True, strict_parsing=True) + data = {str(k, encoding): [str(v, encoding) for v in vs] + for k,vs in data.items()} + for k,v in data.items(): + if len(v) == 1: + data[k] = v[0] + return data + + def post_data(self, environ): + if environ['REQUEST_METHOD'] != 'POST': + raise HandlerError(404, 'Not Found') + post_data = self._read_post_data(environ) + return self._parse_post(post_data) + + def _parse_post(self, post): + return self._parse_query(post) + + def _read_post_data(self, environ): + try: + clen = int(environ.get('CONTENT_LENGTH', '0')) + except ValueError: + clen = 0 + if clen != 0: + if self.maxlen > 0 and clen > self.maxlen: + raise HandlerError(413, 'Request Entity Too Large') + return environ['wsgi.input'].read(clen) + return '' + + +class QuestionApp (WSGI_DataObject): + """WSGI client serving quiz questions + + For details on WGSI, see `PEP 333`_. + + .. _PEP 333: http://www.python.org/dev/peps/pep-0333/ + """ + def __init__(self, ui, **kwargs): + super(QuestionApp, self).__init__(**kwargs) + self.ui = ui + self.urls = [ + (_re.compile('^$'), self._index), + (_re.compile('^question/'), self._question), + (_re.compile('^answer/'), self._answer), + (_re.compile('^results/'), self._results), + ] + self.setting = 'quizzer' + + def __call__(self, environ, start_response): + "WSGI entry point" + path = environ.get('PATH_INFO', '').lstrip('/') + for regexp,callback in self.urls: + match = regexp.match(path) + if match is not None: + setting = '{}.url_args'.format(self.setting) + environ[setting] = match.groups() + return callback(environ, start_response) + raise HandlerError(404, 'Not Found') + + def _index(self, environ, start_response): + if self.ui.stack: + return self._start(environ, start_response) + else: + return self._results(environ, start_response) + + def _start(self, environ, start_response): + lines = [ + '', + ' ', + ' Quizzer', + ' ', + ' ', + '

Quizzer

', + ] + if self.ui.quiz.introduction: + lines.append('

{}

'.format(self.ui.quiz.introduction)) + lines.extend([ + '

Start the quiz.

', + ' ', + '', + ]) + content = '\n'.join(lines) + return self.ok_response( + environ, start_response, content=content, encoding='utf-8', + content_type='text/html') + + def _results(self, environ, start_response): + lines = [ + '', + ' ', + ' Quizzer', + ' ', + ' ', + '

Results

', + ] + for question in self.ui.quiz: + if question.id in self.ui.answers: + lines.extend(self._format_result(question=question)) + lines.extend(self._format_totals()) + lines.extend([ + ' ', + '', + ]) + content = '\n'.join(lines) + return self.ok_response( + environ, start_response, content=content, encoding='utf-8', + content_type='text/html') + + def _format_result(self, question): + answers = self.ui.answers.get(question.id, []) + la = len(answers) + lc = len([a for a in answers if a['correct']]) + lines = [ + '

Question

', + '

{}

'.format(question.format_prompt(newline='
')), + '

Answers: {}/{} ({:.2f})

'.format(lc, la, float(lc)/la), + ] + if answers: + lines.append('
    ') + for answer in answers: + if answer['correct']: + correct = 'correct' + else: + correct = 'incorrect' + ans = answer['answer'] + if question.multiline: + ans = '\n'.join(ans) + lines.extend([ + '
  1. ', + '

    You answered:

    ', + '
    {}
    '.format(ans), + '

    which was {}

    '.format(correct), + '
  2. ', + ]) + lines.append('
') + return lines + + def _format_totals(self): + answered = self.ui.answers.get_answered(questions=self.ui.quiz) + correctly_answered = self.ui.answers.get_correctly_answered( + questions=self.ui.quiz) + la = len(answered) + lc = len(correctly_answered) + return [ + '

Totals

', + '

Answered {} of {} questions.'.format(la, len(self.ui.quiz)), + 'Of the answered questions,', + '{} ({:.2f}) were answered correctly.'.format(lc, float(lc)/la), + '

', + ] + + def _question(self, environ, start_response): + if environ['REQUEST_METHOD'] == 'POST': + data = self.post_data(environ) + else: + data = {} + question = data.get('question', None) + if not question: + question = self.ui.get_question() + # put the question back on the stack until it's answered + self.ui.stack.insert(0, question) + if question is None: + return self._index(environ, start_response) + if question.multiline: + answer_element = ( + '') + else: + answer_element = '' + lines = [ + '', + ' ', + ' Quizzer', + ' ', + ' ', + '

Question

', + '
', + ' '.format( + question.id), + '

{}

'.format( + question.format_prompt(newline='
')), + answer_element, + '
', + ' ', + '
', + ' ', + '', + '', + ] + content = '\n'.join(lines) + return self.ok_response( + environ, start_response, content=content, encoding='utf-8', + content_type='text/html') + + def _answer(self, environ, start_response): + data = self.post_data(environ) + question_id = data.get('question', None) + raw_answer = data.get('answer', None) + if not question_id or not raw_answer: + LOG.error(data) + raise HandlerError(422, 'Unprocessable Entity') + try: + question = self.ui.quiz.get(id=question_id) + except KeyError as e: + raise HandlerError(404, 'Not Found') from e + if question.multiline: + answer = raw_answer.splitlines() + else: + answer = raw_answer + correct,details = self.ui.process_answer( + question=question, answer=answer) + link_target = '../question/' + if correct: + correct_msg = 'correct' + self.ui.stack = [q for q in self.ui.stack if q != question] + if self.ui.stack: + link_text = 'Next question' + else: + link_text = 'Results' + link_target = '../results/' + else: + correct_msg = 'incorrect' + link_text = 'Try again' + if details: + details = '

{}

'.format(details) + lines = [ + '', + ' ', + ' Quizzer', + ' ', + ' ', + '

Answer

', + '

{}

'.format( + question.format_prompt(newline='
')), + '
{}
'.format(raw_answer), + '

{}

'.format(correct_msg), + details or '', + ' {}.'.format(link_target, link_text), + ' ', + '', + '', + ] + content = '\n'.join(lines) + return self.ok_response( + environ, start_response, content=content, encoding='utf-8', + content_type='text/html') + + +class HTMLInterface (_UserInterface): + def run(self, host='', port=8000): + app = QuestionApp(ui=self) + app = HandlerErrorApp(app=app) + server = _wsgiref_simple_server.make_server( + host=host, port=port, app=app) + self._log_start(host=host, port=port) + try: + server.serve_forever() + except _select.error as e: + if len(e.args) == 2 and e.args[1] == 'Interrupted system call': + pass + else: + raise + + def _log_start(self, host, port): + if not host: + host = _socket.getfqdn() + LOG.info('serving on {}:{}'.format(host, port)) + try: + addrs = _socket.getaddrinfo(host=host, port=port) + except _socket.gaierror as e: + LOG.warning(e) + else: + seen = set() + for family,type_,proto,canonname,sockaddr in addrs: + c = canonname or host + if (c, sockaddr) not in seen: + LOG.info('address: {} {}'.format(c, sockaddr)) + seen.add((c, sockaddr))