ui.wsgi: Add a preliminary WSGI/HTML user interface
[quizzer.git] / quizzer / ui / wsgi.py
1 # Copyright
2
import errno as _errno
import html as _html
import logging as _logging
import re as _re
import select as _select
import socket as _socket
import urllib.parse as _urllib_parse
import wsgiref.simple_server as _wsgiref_simple_server
9
10 from . import UserInterface as _UserInterface
11
12
# Module-level logger, named after this module; the handlers below use
# it for request errors and server start-up messages.
LOG = _logging.getLogger(__name__)
14
15
class HandlerError (Exception):
    """An HTTP-level failure raised by a request handler.

    The exception message is ``'<code> <msg>'``, which is also the
    form used for the HTTP status line when the error is reported.

    Parameters:
      code: HTTP status code (e.g. ``404``).
      msg: HTTP reason phrase (e.g. ``'Not Found'``).
      headers: optional iterable of ``(name, value)`` response
        headers to send with the error.  It is copied, so every
        instance owns an independent list.
    """
    def __init__(self, code, msg, headers=None):
        super(HandlerError, self).__init__('{} {}'.format(code, msg))
        self.code = code
        self.msg = msg
        # Never share one list between instances (a ``headers=[]``
        # default would be shared by every error raised without
        # explicit headers).
        self.headers = [] if headers is None else list(headers)
22
23
class HandlerErrorApp (object):
    """WSGI middleware translating HandlerErrors into HTTP error replies.

    The wrapped application is called unchanged.  If it raises a
    ``HandlerError``, the error is logged and an empty-bodied response
    is started using the error's code, message and headers.
    """
    def __init__(self, app):
        # The wrapped WSGI application.
        self.app = app

    def __call__(self, environ, start_response):
        "WSGI entry point"
        try:
            result = self.app(environ, start_response)
        except HandlerError as error:
            LOG.error(error)
            status = '{} {}'.format(error.code, error.msg)
            start_response(status, error.headers)
            result = []
        return result
37
38
class WSGI_DataObject (object):
    """Useful WSGI utilities for handling POST data

    Provides helpers for sending a ``200 OK`` response and for reading
    and parsing URL-encoded request bodies.
    """
    def __init__(self, **kwargs):
        super(WSGI_DataObject, self).__init__(**kwargs)

        # Maximum input we will accept when REQUEST_METHOD is POST
        # 0 ==> unlimited input
        self.maxlen = 0

    def ok_response(self, environ, start_response, content=None,
                    encoding=None, content_type='application/octet-stream',
                    headers=None):
        """Start a ``200 OK`` response and return its body iterable.

        Parameters:
          environ: the WSGI environment (unused here).
          start_response: the WSGI ``start_response`` callable.
          content: response body, or ``None`` for an empty response.
          encoding: if set, ``content`` is encoded with it (when it
            has an ``encode`` method) and a ``charset`` parameter is
            appended to the content type.
          content_type: value for the ``Content-Type`` header.
          headers: optional extra ``(name, value)`` pairs sent after
            ``Content-Type`` and ``Content-Length``.

        Returns ``[]`` when ``content`` is ``None``, else ``[content]``.
        """
        if content is None:
            content_length = 0
        else:
            if encoding:
                if hasattr(content, 'encode'):
                    content = content.encode(encoding)
                content_type = '{}; charset={}'.format(
                    content_type, encoding.upper())
            content_length = len(content)
        response_headers = [
            ('Content-Type', content_type),
            ('Content-Length', str(content_length)),
            ]
        if headers:
            # Caller-supplied headers used to be silently dropped.
            response_headers.extend(headers)
        start_response('200 OK', response_headers)
        if content is None:
            return []
        return [content]

    def _parse_query(self, query, encoding='utf-8'):
        """Parse a URL-encoded query (``bytes`` or ``str``) into a dict.

        Keys and values are returned as ``str``.  A key that appears
        once maps to its single value; a repeated key maps to the list
        of its values.  An empty query gives ``{}``.  Malformed input
        raises ``ValueError`` (strict parsing).
        """
        if len(query) == 0:
            return {}
        if isinstance(query, (bytes, bytearray)):
            # Decode before parsing so percent-escapes are resolved
            # directly into text, whatever form the body was read in.
            query = bytes(query).decode(encoding)
        parsed = _urllib_parse.parse_qs(
            query, keep_blank_values=True, strict_parsing=True,
            encoding=encoding, errors='strict')
        return {key: values[0] if len(values) == 1 else values
                for key, values in parsed.items()}

    def post_data(self, environ):
        """Return the parsed body of a POST request.

        Raises ``HandlerError`` (404) if the request is not a POST.
        """
        if environ['REQUEST_METHOD'] != 'POST':
            raise HandlerError(404, 'Not Found')
        post_data = self._read_post_data(environ)
        return self._parse_post(post_data)

    def _parse_post(self, post):
        "Parse a raw POST body; here it is treated as a query string"
        return self._parse_query(post)

    def _read_post_data(self, environ):
        """Read the request body announced by ``CONTENT_LENGTH``.

        Returns ``b''`` when the length is missing, unparsable, zero
        or negative.  Raises ``HandlerError`` (413) when ``maxlen`` is
        positive and the announced length exceeds it.
        """
        try:
            clen = int(environ.get('CONTENT_LENGTH', '0'))
        except (TypeError, ValueError):
            clen = 0
        if clen <= 0:
            # A negative length would make read() consume the stream
            # until EOF, which can block on a live socket.
            return b''
        if self.maxlen > 0 and clen > self.maxlen:
            raise HandlerError(413,  'Request Entity Too Large')
        return environ['wsgi.input'].read(clen)
100
101
class QuestionApp (WSGI_DataObject):
    """WSGI client serving quiz questions

    Requests are dispatched on ``PATH_INFO`` to one of four pages: the
    index, ``question/``, ``answer/`` and ``results/``.  The ``ui``
    object supplies the quiz state; this class uses its ``quiz``,
    ``stack`` and ``answers`` attributes and its ``get_question`` and
    ``process_answer`` methods.

    For details on WSGI, see `PEP 333`_.

    .. _PEP 333: http://www.python.org/dev/peps/pep-0333/
    """
    def __init__(self, ui, **kwargs):
        super(QuestionApp, self).__init__(**kwargs)
        self.ui = ui
        # (compiled pattern, handler) pairs, tried in order against
        # PATH_INFO with leading slashes removed.
        self.urls = [
            (_re.compile('^$'), self._index),
            (_re.compile('^question/'), self._question),
            (_re.compile('^answer/'), self._answer),
            (_re.compile('^results/'), self._results),
            ]
        # Prefix for the keys this application stores in ``environ``.
        self.setting = 'quizzer'

    def __call__(self, environ, start_response):
        "WSGI entry point"
        path = environ.get('PATH_INFO', '').lstrip('/')
        for regexp,callback in self.urls:
            match = regexp.match(path)
            if match is not None:
                setting = '{}.url_args'.format(self.setting)
                environ[setting] = match.groups()
                return callback(environ, start_response)
        raise HandlerError(404, 'Not Found')

    @staticmethod
    def _ratio(numerator, denominator):
        "Return numerator/denominator as a float, or 0.0 if undefined"
        if not denominator:
            return 0.0
        return float(numerator) / denominator

    def _html_response(self, environ, start_response, lines):
        "Join ``lines`` and send them as a UTF-8 ``text/html`` page"
        content = '\n'.join(lines)
        return self.ok_response(
            environ, start_response, content=content, encoding='utf-8',
            content_type='text/html')

    def _index(self, environ, start_response):
        "Show the start page while questions remain, else the results"
        if self.ui.stack:
            return self._start(environ, start_response)
        else:
            return self._results(environ, start_response)

    def _start(self, environ, start_response):
        "Render the welcome page with a link to the first question"
        lines = [
            '<html>',
            '  <head>',
            '    <title>Quizzer</title>',
            '  </head>',
            '  <body>',
            '    <h1>Quizzer</h1>',
            ]
        if self.ui.quiz.introduction:
            lines.append('    <p>{}</p>'.format(self.ui.quiz.introduction))
        lines.extend([
                '    <p><a href="question/">Start the quiz</a>.</p>',
                '  </body>',
                '</html>',
                ])
        return self._html_response(environ, start_response, lines)

    def _results(self, environ, start_response):
        "Render per-question results followed by the overall totals"
        lines = [
            '<html>',
            '  <head>',
            '    <title>Quizzer</title>',
            '  </head>',
            '  <body>',
            '    <h1>Results</h1>',
            ]
        for question in self.ui.quiz:
            if question.id in self.ui.answers:
                lines.extend(self._format_result(question=question))
        lines.extend(self._format_totals())
        lines.extend([
                '  </body>',
                '</html>',
                ])
        return self._html_response(environ, start_response, lines)

    def _format_result(self, question):
        """Return HTML lines summarising the answers given to ``question``.

        Each recorded answer is read through its ``'correct'`` and
        ``'answer'`` keys.  Submitted answer text is HTML-escaped.
        """
        answers = self.ui.answers.get(question.id, [])
        la = len(answers)
        lc = len([a for a in answers if a['correct']])
        lines = [
            '<h2>Question</h2>',
            '<p>{}</p>'.format(question.format_prompt(newline='<br />')),
            # _ratio avoids a ZeroDivisionError for an empty answer list
            '<p>Answers: {}/{} ({:.2f})</p>'.format(
                lc, la, self._ratio(lc, la)),
            ]
        if answers:
            lines.append('<ol>')
            for answer in answers:
                if answer['correct']:
                    correct = 'correct'
                else:
                    correct = 'incorrect'
                ans = answer['answer']
                if question.multiline:
                    ans = '\n'.join(ans)
                lines.extend([
                        '<li>',
                        '<p>You answered:</p>',
                        # user-supplied text: escape before embedding
                        '<pre>{}</pre>'.format(_html.escape(str(ans))),
                        '<p>which was {}</p>'.format(correct),
                        '</li>',
                        ])
            lines.append('</ol>')
        return lines

    def _format_totals(self):
        "Return HTML lines with the answered/correct totals for the quiz"
        answered = self.ui.answers.get_answered(questions=self.ui.quiz)
        correctly_answered = self.ui.answers.get_correctly_answered(
            questions=self.ui.quiz)
        la = len(answered)
        lc = len(correctly_answered)
        return [
            '<h2>Totals</h2>',
            '<p>Answered {} of {} questions.'.format(la, len(self.ui.quiz)),
            'Of the answered questions,',
            # The results page can be requested before anything has
            # been answered, so la may be zero.
            '{} ({:.2f}) were answered correctly.'.format(
                lc, self._ratio(lc, la)),
            '</p>',
            ]

    def _question(self, environ, start_response):
        """Render the form for a question.

        A POST may name the question through a ``question`` field
        holding its id; otherwise the next question is taken from the
        user interface.  When no question is available the index
        handler is used instead.

        Raises ``HandlerError`` (404) if a posted id is unknown.
        """
        if environ['REQUEST_METHOD'] == 'POST':
            data = self.post_data(environ)
        else:
            data = {}
        question_id = data.get('question', None)
        if question_id:
            # The form field carries an id, not a question object.
            try:
                question = self.ui.quiz.get(id=question_id)
            except KeyError as e:
                raise HandlerError(404, 'Not Found') from e
        else:
            question = self.ui.get_question()
            if question is not None:
                # put the question back on the stack until it's answered
                self.ui.stack.insert(0, question)
        if question is None:
            return self._index(environ, start_response)
        if question.multiline:
            answer_element = (
                '<textarea rows="5" cols="60" name="answer"></textarea>')
        else:
            answer_element = '<input type="text" size="60" name="answer">'
        lines = [
            '<html>',
            '  <head>',
            '    <title>Quizzer</title>',
            '  </head>',
            '  <body>',
            '    <h1>Question</h1>',
            '    <form name="question" action="../answer/" method="post">',
            '      <input type="hidden" name="question" value="{}">'.format(
                _html.escape(str(question.id), quote=True)),
            '      <p>{}</p>'.format(
                question.format_prompt(newline='<br/>')),
            answer_element,
            '      <br />',
            '      <input type="submit" value="submit">',
            '    </form>',
            '  </body>',
            '</html>',
            '',
            ]
        return self._html_response(environ, start_response, lines)

    def _answer(self, environ, start_response):
        """Process a submitted answer and render the verdict page.

        Expects a POST with single-valued ``question`` (id) and
        ``answer`` fields.  A correctly answered question is removed
        from the stack.

        Raises ``HandlerError``: 404 for a non-POST request or an
        unknown question id, 422 when a field is missing, empty or
        repeated.
        """
        data = self.post_data(environ)
        question_id = data.get('question', None)
        raw_answer = data.get('answer', None)
        # A repeated form field is parsed into a list; treat that like
        # a missing field rather than failing later with a type error.
        if (not question_id or not raw_answer or
                not isinstance(question_id, str) or
                not isinstance(raw_answer, str)):
            LOG.error(data)
            raise HandlerError(422, 'Unprocessable Entity')
        try:
            question = self.ui.quiz.get(id=question_id)
        except KeyError as e:
            raise HandlerError(404, 'Not Found') from e
        if question.multiline:
            answer = raw_answer.splitlines()
        else:
            answer = raw_answer
        correct,details = self.ui.process_answer(
            question=question, answer=answer)
        link_target = '../question/'
        if correct:
            correct_msg = 'correct'
            self.ui.stack = [q for q in self.ui.stack if q != question]
            if self.ui.stack:
                link_text = 'Next question'
            else:
                link_text = 'Results'
                link_target = '../results/'
        else:
            correct_msg = 'incorrect'
            link_text = 'Try again'
        if details:
            details = '<p>{}</p>'.format(details)
        lines = [
            '<html>',
            '  <head>',
            '    <title>Quizzer</title>',
            '  </head>',
            '  <body>',
            '    <h1>Answer</h1>',
            '    <p>{}</p>'.format(
                question.format_prompt(newline='<br/>')),
            # user-supplied text: escape before embedding
            '    <pre>{}</pre>'.format(_html.escape(raw_answer)),
            '    <p>{}</p>'.format(correct_msg),
            details or '',
            '    <a href="{}">{}</a>.'.format(link_target, link_text),
            '  </body>',
            '</html>',
            '',
            ]
        return self._html_response(environ, start_response, lines)
317
318
class HTMLInterface (_UserInterface):
    """User interface that serves the quiz as HTML over HTTP.

    The quiz pages are produced by ``QuestionApp`` wrapped in
    ``HandlerErrorApp`` and served with ``wsgiref.simple_server``.
    """
    def run(self, host='', port=8000):
        """Serve the quiz on ``host``:``port`` until interrupted.

        An interrupted system call (``EINTR``) ends the loop quietly;
        any other error propagates.  The listening socket is closed
        on the way out in every case.
        """
        app = QuestionApp(ui=self)
        app = HandlerErrorApp(app=app)
        server = _wsgiref_simple_server.make_server(
            host=host, port=port, app=app)
        try:
            self._log_start(host=host, port=port)
            try:
                server.serve_forever()
            except _select.error as e:
                # Compare the error number rather than the strerror
                # text, which depends on platform and locale.
                if getattr(e, 'errno', None) != _errno.EINTR:
                    raise
        finally:
            server.server_close()

    def _log_start(self, host, port):
        """Log the address(es) the server is reachable on.

        An empty ``host`` is replaced by the fully qualified domain
        name for the log output.  Address-resolution failures are
        logged as warnings, not raised.
        """
        if not host:
            host = _socket.getfqdn()
        LOG.info('serving on {}:{}'.format(host, port))
        try:
            addrs = _socket.getaddrinfo(host=host, port=port)
        except _socket.gaierror as e:
            LOG.warning(e)
        else:
            # getaddrinfo may list one address several times (once per
            # socket type/protocol); log each (name, address) only once.
            seen = set()
            for family,type_,proto,canonname,sockaddr in addrs:
                c = canonname or host
                if (c, sockaddr) not in seen:
                    LOG.info('address: {} {}'.format(c, sockaddr))
                    seen.add((c, sockaddr))