question: Add the Question.multimedia attribute
[quizzer.git] / quizzer / ui / wsgi.py
1 # Copyright (C) 2013 W. Trevor King <wking@tremily.us>
2 #
3 # This file is part of quizzer.
4 #
5 # quizzer is free software: you can redistribute it and/or modify it under the
6 # terms of the GNU General Public License as published by the Free Software
7 # Foundation, either version 3 of the License, or (at your option) any later
8 # version.
9 #
10 # quizzer is distributed in the hope that it will be useful, but WITHOUT ANY
11 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
12 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License along with
15 # quizzer.  If not, see <http://www.gnu.org/licenses/>.
16
17 import hashlib as _hashlib
18 import logging as _logging
19 import os.path as _os_path
20 import select as _select
21 import socket as _socket
22 import re as _re
23 import urllib.parse as _urllib_parse
24 import wsgiref.simple_server as _wsgiref_simple_server
25
26 from .. import question as _question
27 from . import UserInterface as _UserInterface
28
29
30 LOG = _logging.getLogger(__name__)
31
32
33 class HandlerError (Exception):
34     def __init__(self, code, msg, headers=[]):
35         super(HandlerError, self).__init__('{} {}'.format(code, msg))
36         self.code = code
37         self.msg = msg
38         self.headers = headers
39
40
41 class HandlerErrorApp (object):
42     """Catch HandlerErrors and return HTTP error pages.
43     """
44     def __init__(self, app):
45         self.app = app
46
47     def __call__(self, environ, start_response):
48         try:
49             return self.app(environ, start_response)
50         except HandlerError as e:
51             LOG.error(e)
52             start_response('{} {}'.format(e.code, e.msg), e.headers)
53             return []
54
55
56 class WSGI_UrlObject (object):
57     """Useful WSGI utilities for handling URL delegation.
58     """
59     def __init__(self, urls=tuple(), default_handler=None, setting='quizzer',
60                  *args, **kwargs):
61         super(WSGI_UrlObject, self).__init__(*args, **kwargs)
62         self.urls = [
63             (_re.compile(regexp),callback) for regexp,callback in urls]
64         self.default_handler = default_handler
65         self.setting = setting
66
67     def __call__(self, environ, start_response):
68         "WSGI entry point"
69         path = environ.get('PATH_INFO', '').lstrip('/')
70         for regexp,callback in self.urls:
71             match = regexp.match(path)
72             if match is not None:
73                 setting = '{}.url_args'.format(self.setting)
74                 environ[setting] = match.groups()
75                 return callback(environ, start_response)
76         if self.default_handler is None:
77             raise HandlerError(404, 'Not Found')
78         return self.default_handler(environ, start_response)
79
80
81 class WSGI_DataObject (object):
82     "Useful WSGI utilities for handling POST data"
83     def __init__(self, **kwargs):
84         super(WSGI_DataObject, self).__init__(**kwargs)
85
86         # Maximum input we will accept when REQUEST_METHOD is POST
87         # 0 ==> unlimited input
88         self.maxlen = 0
89
90     def ok_response(self, environ, start_response, content=None,
91                     encoding=None, content_type='application/octet-stream',
92                     headers=[]):
93         response = '200 OK'
94         if content is None:
95             content_length = 0
96         else:
97             if encoding:
98                 if hasattr(content, 'encode'):
99                     content = content.encode(encoding)
100                 content_type = '{}; charset={}'.format(
101                     content_type, encoding.upper())
102             content_length = len(content)
103         start_response(response, [
104                 ('Content-Type', content_type),
105                 ('Content-Length', str(content_length)),
106                 ])
107         if content is None:
108             return []
109         return [content]
110
111     def _parse_query(self, query, encoding='utf-8'):
112         if len(query) == 0:
113             return {}
114         data = _urllib_parse.parse_qs(
115             query, keep_blank_values=True, strict_parsing=True)
116         data = {str(k, encoding): [str(v, encoding) for v in vs]
117                 for k,vs in data.items()}
118         for k,v in data.items():
119             if len(v) == 1:
120                 data[k] = v[0]
121         return data
122
123     def post_data(self, environ):
124         if environ['REQUEST_METHOD'] != 'POST':
125             raise HandlerError(404, 'Not Found')
126         post_data = self._read_post_data(environ)
127         return self._parse_post(post_data)
128
129     def _parse_post(self, post):
130         return self._parse_query(post)
131
132     def _read_post_data(self, environ):
133         try:
134             clen = int(environ.get('CONTENT_LENGTH', '0'))
135         except ValueError:
136             clen = 0
137         if clen != 0:
138             if self.maxlen > 0 and clen > self.maxlen:
139                 raise HandlerError(413,  'Request Entity Too Large')
140             return environ['wsgi.input'].read(clen)
141         return ''
142
143
144 class QuestionApp (WSGI_UrlObject, WSGI_DataObject):
145     """WSGI client serving quiz questions
146
147     For details on WGSI, see `PEP 333`_.
148
149     .. _PEP 333: http://www.python.org/dev/peps/pep-0333/
150     """
151     def __init__(self, ui, **kwargs):
152         super(QuestionApp, self).__init__(
153             urls=[
154                 ('^$', self._index),
155                 ('^question/', self._question),
156                 ('^answer/', self._answer),
157                 ('^results/', self._results),
158                 ('^media/([^/]+)', self._media),
159             ],
160             **kwargs)
161         self.ui = ui
162
163         self.user_regexp = _re.compile('^\w+$')
164         self._local_media = {}
165
166     def _index(self, environ, start_response):
167         lines = [
168             '<html>',
169             '  <head>',
170             '    <title>Quizzer</title>',
171             '  </head>',
172             '  <body>',
173             '    <h1>Quizzer</h1>',
174             ]
175         if self.ui.quiz.introduction:
176             lines.append('    <p>{}</p>'.format(self.ui.quiz.introduction))
177         lines.extend([
178                 '    <form name="question" action="../question/" method="post">',
179                 '      <p>Username: <input type="text" size="20" name="user">',
180                 '        (required, alphanumeric)</p>',
181                 '      <input type="submit" value="Start the quiz">',
182                 '    </form>',
183                 '  </body>',
184                 '</html>',
185                 ])
186         content = '\n'.join(lines)
187         return self.ok_response(
188             environ, start_response, content=content, encoding='utf-8',
189             content_type='text/html')
190
191     def _results(self, environ, start_response):
192         data = self.post_data(environ)
193         user = data.get('user', '')
194         if not self.user_regexp.match(user):
195             raise HandlerError(303, 'See Other', headers=[('Location', '/')])
196         lines = [
197             '<html>',
198             '  <head>',
199             '    <title>Quizzer</title>',
200             '  </head>',
201             '  <body>',
202             '    <h1>Results</h1>',
203             ]
204         answers = self.ui.answers.get_answers(user=user)
205         for question in self.ui.quiz:
206             if question.id in answers:
207                 lines.extend(self._format_result(question=question, user=user))
208         lines.extend(self._format_totals(user=user))
209         lines.extend([
210                 '  </body>',
211                 '</html>',
212                 ])
213         content = '\n'.join(lines)
214         return self.ok_response(
215             environ, start_response, content=content, encoding='utf-8',
216             content_type='text/html')
217
218     def _format_result(self, question, user):
219         answers = self.ui.answers.get_answers(user=user).get(question.id, [])
220         la = len(answers)
221         lc = len([a for a in answers if a['correct']])
222         lines = [
223             '<h2>Question</h2>',
224             self._format_prompt(question=question),
225             '<p>Answers: {}/{} ({:.2f})</p>'.format(lc, la, float(lc)/la),
226             ]
227         if answers:
228             lines.append('<ol>')
229             for answer in answers:
230                 if answer['correct']:
231                     correct = 'correct'
232                 else:
233                     correct = 'incorrect'
234                 ans = answer['answer']
235                 if question.multiline:
236                     ans = '\n'.join(ans)
237                 lines.extend([
238                         '<li>',
239                         '<p>You answered:</p>',
240                         '<pre>{}</pre>'.format(ans),
241                         '<p>which was {}</p>'.format(correct),
242                         '</li>',
243                         ])
244             lines.append('</ol>')
245         return lines
246
247     def _format_totals(self, user=None):
248         answered = self.ui.answers.get_answered(
249             questions=self.ui.quiz, user=user)
250         correctly_answered = self.ui.answers.get_correctly_answered(
251             questions=self.ui.quiz, user=user)
252         la = len(answered)
253         lc = len(correctly_answered)
254         return [
255             '<h2>Totals</h2>',
256             '<p>Answered {} of {} questions.'.format(la, len(self.ui.quiz)),
257             'Of the answered questions,',
258             '{} ({:.2f}) were answered correctly.'.format(lc, float(lc)/la),
259             '</p>',
260             ]
261
262     def _question(self, environ, start_response):
263         if environ['REQUEST_METHOD'] == 'POST':
264             data = self.post_data(environ)
265         else:
266             data = {}
267         user = data.get('user', '')
268         if not self.user_regexp.match(user):
269             raise HandlerError(303, 'See Other', headers=[('Location', '/')])
270         question = data.get('question', None)
271         if not question:
272             question = self.ui.get_question(user=user)
273             # put the question back on the stack until it's answered
274             self.ui.stack[user].insert(0, question)
275         if question is None:
276             raise HandlerError(
277                 307, 'Temporary Redirect', headers=[('Location', '/results/')])
278         if (isinstance(question, _question.ChoiceQuestion) and
279                 question.display_choices):
280             if question.multiple_answers:
281                 itype = 'checkbox'
282             else:
283                 itype = 'radio'
284             choices = [
285                 ('<input type="{0}" name="answer" value="{1}"/>{1}<br/>'
286                  ).format(itype, answer)
287                 for answer in question.answer]
288             if question.accept_all:
289                 choices.extend([
290                     ('<input type="{}" name="answer" value="answer-other"/>'
291                      ).format(itype),
292                     '<input type="text" size="60" name="answer-other"/>'])
293             answer_element = '\n'.join(choices)
294         elif question.multiline:
295             answer_element = (
296                 '<textarea rows="5" cols="60" name="answer"></textarea>')
297         else:
298             answer_element = '<input type="text" size="60" name="answer"/>'
299         lines = [
300             '<html>',
301             '  <head>',
302             '    <title>Quizzer</title>',
303             '  </head>',
304             '  <body>',
305             '    <h1>Question</h1>',
306             '    <form name="question" action="../answer/" method="post">',
307             '      <input type="hidden" name="user" value="{}">'.format(user),
308             '      <input type="hidden" name="question" value="{}">'.format(
309                 question.id),
310             self._format_prompt(question=question),
311             answer_element,
312             '      <br />',
313             '      <input type="submit" value="submit">',
314             '    </form>',
315             '  </body>',
316             '</html>',
317             '',
318             ]
319         content = '\n'.join(lines)
320         return self.ok_response(
321             environ, start_response, content=content, encoding='utf-8',
322             content_type='text/html')
323
324     def _answer(self, environ, start_response):
325         data = self.post_data(environ)
326         user = data.get('user', '')
327         if not self.user_regexp.match(user):
328             raise HandlerError(303, 'See Other', headers=[('Location', '/')])
329         question_id = data.get('question', None)
330         raw_answer = data.get('answer', None)
331         if not question_id or not raw_answer:
332             LOG.error(data)
333             raise HandlerError(422, 'Unprocessable Entity')
334         try:
335             question = self.ui.quiz.get(id=question_id)
336         except KeyError as e:
337             raise HandlerError(404, 'Not Found') from e
338         if (isinstance(question, _question.ChoiceQuestion) and
339                 question.display_choices and
340                 question.accept_all):
341             if raw_answer == 'answer-other':
342                 answer = print_answer = data.get('answer-other', None)
343             elif 'answer-other' in raw_answer:
344                 i = raw_answer.index('answer-other')
345                 raw_answer[i] = data.get('answer-other', None)
346                 answer = print_answer = raw_answer
347             else:
348                 answer = print_answer = raw_answer
349         elif question.multiline:
350             answer = raw_answer.splitlines()
351             print_answer = raw_answer
352         else:
353             answer = print_answer = raw_answer
354         correct,details = self.ui.process_answer(
355             question=question, answer=answer, user=user)
356         link_target = '../question/'
357         if correct:
358             correct_msg = 'correct'
359             self.ui.stack[user] = [q for q in self.ui.stack[user]
360                                    if q != question]
361             if self.ui.stack[user]:
362                 link_text = 'Next question'
363             else:
364                 link_text = 'Results'
365                 link_target = '../results/'
366         else:
367             correct_msg = 'incorrect'
368             link_text = 'Try again'
369         if details:
370             details = '<p>{}</p>'.format(details)
371         lines = [
372             '<html>',
373             '  <head>',
374             '    <title>Quizzer</title>',
375             '  </head>',
376             '  <body>',
377             '    <h1>Answer</h1>',
378             self._format_prompt(question=question),
379             '    <pre>{}</pre>'.format(print_answer),
380             '    <p>{}</p>'.format(correct_msg),
381             details or '',
382             '    <form name="question" action="{}" method="post">'.format(
383                 link_target),
384             '      <input type="hidden" name="user" value="{}">'.format(user),
385             '      <input type="submit" value="{}">'.format(link_text),
386             '    </form>',
387             '  </body>',
388             '</html>',
389             '',
390             ]
391         content = '\n'.join(lines)
392         return self.ok_response(
393             environ, start_response, content=content, encoding='utf-8',
394             content_type='text/html')
395
396     def _format_prompt(self, question):
397         lines = ['<p>{}</p>'.format(question.format_prompt(newline='<br/>\n'))]
398         for multimedia in question.multimedia:
399             lines.append(self._format_multimedia(multimedia=multimedia))
400         return '\n'.join(lines)
401
402     def _format_multimedia(self, multimedia):
403         content_type = multimedia['content-type']
404         if 'path' in multimedia:
405             uid = _hashlib.sha1(
406                 str(multimedia).encode('unicode-escape')
407                 ).hexdigest()
408             path = self.ui.quiz.multimedia_path(multimedia)
409             self._local_media[uid] = {
410                 'content-type': content_type,
411                 'path': path,
412                 }
413             url = '../media/{}'.format(uid)
414         else:
415             raise NotImplementedError(multimedia)
416         if content_type.startswith('image/'):
417             return '<p><img src="{}" /></p>'.format(url)
418         else:
419             raise NotImplementedError(content_type)
420
421     def _media(self, environ, start_response):
422         try:
423             uid, = environ['{}.url_args'.format(self.setting)]
424         except:
425             raise HandlerError(404, 'Not Found')
426         if uid not in self._local_media:
427             raise HandlerError(404, 'Not Found')
428         content_type = self._local_media[uid]['content-type']
429         with open(self._local_media[uid]['path'], 'rb') as f:
430             content = f.read()
431         return self.ok_response(
432             environ, start_response, content=content,
433             content_type=content_type)
434
435
436 class HTMLInterface (_UserInterface):
437     def run(self, host='', port=8000):
438         app = QuestionApp(ui=self)
439         app = HandlerErrorApp(app=app)
440         server = _wsgiref_simple_server.make_server(
441             host=host, port=port, app=app)
442         self._log_start(host=host, port=port)
443         try:
444             server.serve_forever()
445         except _select.error as e:
446             if len(e.args) == 2 and e.args[1] == 'Interrupted system call':
447                 pass
448             else:
449                 raise
450
451     def _log_start(self, host, port):
452         if not host:
453             host = _socket.getfqdn()
454         LOG.info('serving on {}:{}'.format(host, port))
455         try:
456             addrs = _socket.getaddrinfo(host=host, port=port)
457         except _socket.gaierror as e:
458             LOG.warning(e)
459         else:
460             seen = set()
461             for family,type_,proto,canonname,sockaddr in addrs:
462                 c = canonname or host
463                 if (c, sockaddr) not in seen:
464                     LOG.info('address: {} {}'.format(c, sockaddr))
465                     seen.add((c, sockaddr))