question: Move ChoiceQuestion.open_ended to Question.accept_all
[quizzer.git] / quizzer / ui / wsgi.py
1 # Copyright
2
3 import logging as _logging
4 import select as _select
5 import socket as _socket
6 import re as _re
7 import urllib.parse as _urllib_parse
8 import wsgiref.simple_server as _wsgiref_simple_server
9
10 from .. import question as _question
11 from . import UserInterface as _UserInterface
12
13
14 LOG = _logging.getLogger(__name__)
15
16
17 class HandlerError (Exception):
18     def __init__(self, code, msg, headers=[]):
19         super(HandlerError, self).__init__('{} {}'.format(code, msg))
20         self.code = code
21         self.msg = msg
22         self.headers = headers
23
24
25 class HandlerErrorApp (object):
26     """Catch HandlerErrors and return HTTP error pages.
27     """
28     def __init__(self, app):
29         self.app = app
30
31     def __call__(self, environ, start_response):
32         try:
33             return self.app(environ, start_response)
34         except HandlerError as e:
35             LOG.error(e)
36             start_response('{} {}'.format(e.code, e.msg), e.headers)
37             return []
38
39
40 class WSGI_DataObject (object):
41     "Useful WSGI utilities for handling POST data"
42     def __init__(self, **kwargs):
43         super(WSGI_DataObject, self).__init__(**kwargs)
44
45         # Maximum input we will accept when REQUEST_METHOD is POST
46         # 0 ==> unlimited input
47         self.maxlen = 0
48
49     def ok_response(self, environ, start_response, content=None,
50                     encoding=None, content_type='application/octet-stream',
51                     headers=[]):
52         response = '200 OK'
53         if content is None:
54             content_length = 0
55         else:
56             if encoding:
57                 if hasattr(content, 'encode'):
58                     content = content.encode(encoding)
59                 content_type = '{}; charset={}'.format(
60                     content_type, encoding.upper())
61             content_length = len(content)
62         start_response(response, [
63                 ('Content-Type', content_type),
64                 ('Content-Length', str(content_length)),
65                 ])
66         if content is None:
67             return []
68         return [content]
69
70     def _parse_query(self, query, encoding='utf-8'):
71         if len(query) == 0:
72             return {}
73         data = _urllib_parse.parse_qs(
74             query, keep_blank_values=True, strict_parsing=True)
75         data = {str(k, encoding): [str(v, encoding) for v in vs]
76                 for k,vs in data.items()}
77         for k,v in data.items():
78             if len(v) == 1:
79                 data[k] = v[0]
80         return data
81
82     def post_data(self, environ):
83         if environ['REQUEST_METHOD'] != 'POST':
84             raise HandlerError(404, 'Not Found')
85         post_data = self._read_post_data(environ)
86         return self._parse_post(post_data)
87
88     def _parse_post(self, post):
89         return self._parse_query(post)
90
91     def _read_post_data(self, environ):
92         try:
93             clen = int(environ.get('CONTENT_LENGTH', '0'))
94         except ValueError:
95             clen = 0
96         if clen != 0:
97             if self.maxlen > 0 and clen > self.maxlen:
98                 raise HandlerError(413,  'Request Entity Too Large')
99             return environ['wsgi.input'].read(clen)
100         return ''
101
102
103 class QuestionApp (WSGI_DataObject):
104     """WSGI client serving quiz questions
105
106     For details on WGSI, see `PEP 333`_.
107
108     .. _PEP 333: http://www.python.org/dev/peps/pep-0333/
109     """
110     def __init__(self, ui, **kwargs):
111         super(QuestionApp, self).__init__(**kwargs)
112         self.ui = ui
113         self.urls = [
114             (_re.compile('^$'), self._index),
115             (_re.compile('^question/'), self._question),
116             (_re.compile('^answer/'), self._answer),
117             (_re.compile('^results/'), self._results),
118             ]
119         self.setting = 'quizzer'
120         self.user_regexp = _re.compile('^\w+$')
121
122     def __call__(self, environ, start_response):
123         "WSGI entry point"
124         path = environ.get('PATH_INFO', '').lstrip('/')
125         for regexp,callback in self.urls:
126             match = regexp.match(path)
127             if match is not None:
128                 setting = '{}.url_args'.format(self.setting)
129                 environ[setting] = match.groups()
130                 return callback(environ, start_response)
131         raise HandlerError(404, 'Not Found')
132
133     def _index(self, environ, start_response):
134         lines = [
135             '<html>',
136             '  <head>',
137             '    <title>Quizzer</title>',
138             '  </head>',
139             '  <body>',
140             '    <h1>Quizzer</h1>',
141             ]
142         if self.ui.quiz.introduction:
143             lines.append('    <p>{}</p>'.format(self.ui.quiz.introduction))
144         lines.extend([
145                 '    <form name="question" action="../question/" method="post">',
146                 '      <p>Username: <input type="text" size="20" name="user">',
147                 '        (required, alphanumeric)</p>',
148                 '      <input type="submit" value="Start the quiz">',
149                 '    </form>',
150                 '  </body>',
151                 '</html>',
152                 ])
153         content = '\n'.join(lines)
154         return self.ok_response(
155             environ, start_response, content=content, encoding='utf-8',
156             content_type='text/html')
157
158     def _results(self, environ, start_response):
159         data = self.post_data(environ)
160         user = data.get('user', '')
161         if not self.user_regexp.match(user):
162             raise HandlerError(303, 'See Other', headers=[('Location', '/')])
163         lines = [
164             '<html>',
165             '  <head>',
166             '    <title>Quizzer</title>',
167             '  </head>',
168             '  <body>',
169             '    <h1>Results</h1>',
170             ]
171         answers = self.ui.answers.get_answers(user=user)
172         for question in self.ui.quiz:
173             if question.id in answers:
174                 lines.extend(self._format_result(question=question, user=user))
175         lines.extend(self._format_totals(user=user))
176         lines.extend([
177                 '  </body>',
178                 '</html>',
179                 ])
180         content = '\n'.join(lines)
181         return self.ok_response(
182             environ, start_response, content=content, encoding='utf-8',
183             content_type='text/html')
184
185     def _format_result(self, question, user):
186         answers = self.ui.answers.get_answers(user=user).get(question.id, [])
187         la = len(answers)
188         lc = len([a for a in answers if a['correct']])
189         lines = [
190             '<h2>Question</h2>',
191             '<p>{}</p>'.format(question.format_prompt(newline='<br />')),
192             '<p>Answers: {}/{} ({:.2f})</p>'.format(lc, la, float(lc)/la),
193             ]
194         if answers:
195             lines.append('<ol>')
196             for answer in answers:
197                 if answer['correct']:
198                     correct = 'correct'
199                 else:
200                     correct = 'incorrect'
201                 ans = answer['answer']
202                 if question.multiline:
203                     ans = '\n'.join(ans)
204                 lines.extend([
205                         '<li>',
206                         '<p>You answered:</p>',
207                         '<pre>{}</pre>'.format(ans),
208                         '<p>which was {}</p>'.format(correct),
209                         '</li>',
210                         ])
211             lines.append('</ol>')
212         return lines
213
214     def _format_totals(self, user=None):
215         answered = self.ui.answers.get_answered(
216             questions=self.ui.quiz, user=user)
217         correctly_answered = self.ui.answers.get_correctly_answered(
218             questions=self.ui.quiz, user=user)
219         la = len(answered)
220         lc = len(correctly_answered)
221         return [
222             '<h2>Totals</h2>',
223             '<p>Answered {} of {} questions.'.format(la, len(self.ui.quiz)),
224             'Of the answered questions,',
225             '{} ({:.2f}) were answered correctly.'.format(lc, float(lc)/la),
226             '</p>',
227             ]
228
229     def _question(self, environ, start_response):
230         if environ['REQUEST_METHOD'] == 'POST':
231             data = self.post_data(environ)
232         else:
233             data = {}
234         user = data.get('user', '')
235         if not self.user_regexp.match(user):
236             raise HandlerError(303, 'See Other', headers=[('Location', '/')])
237         question = data.get('question', None)
238         if not question:
239             question = self.ui.get_question(user=user)
240             # put the question back on the stack until it's answered
241             self.ui.stack[user].insert(0, question)
242         if question is None:
243             raise HandlerError(
244                 307, 'Temporary Redirect', headers=[('Location', '/results/')])
245         if (isinstance(question, _question.ChoiceQuestion) and
246                 question.display_choices):
247             choices = [
248                 ('<input type="radio" name="answer" value="{0}"/>{0}<br/>'
249                  ).format(answer)
250                 for answer in question.answer]
251             if question.accept_all:
252                 choices.extend([
253                     '<input type="radio" name="answer" value="answer-other"/>',
254                     '<input type="text" size="60" name="answer-other"/>'])
255             answer_element = '\n'.join(choices)
256         elif question.multiline:
257             answer_element = (
258                 '<textarea rows="5" cols="60" name="answer"></textarea>')
259         else:
260             answer_element = '<input type="text" size="60" name="answer"/>'
261         lines = [
262             '<html>',
263             '  <head>',
264             '    <title>Quizzer</title>',
265             '  </head>',
266             '  <body>',
267             '    <h1>Question</h1>',
268             '    <form name="question" action="../answer/" method="post">',
269             '      <input type="hidden" name="user" value="{}">'.format(user),
270             '      <input type="hidden" name="question" value="{}">'.format(
271                 question.id),
272             '      <p>{}</p>'.format(
273                 question.format_prompt(newline='<br/>')),
274             answer_element,
275             '      <br />',
276             '      <input type="submit" value="submit">',
277             '    </form>',
278             '  </body>',
279             '</html>',
280             '',
281             ]
282         content = '\n'.join(lines)
283         return self.ok_response(
284             environ, start_response, content=content, encoding='utf-8',
285             content_type='text/html')
286
287     def _answer(self, environ, start_response):
288         data = self.post_data(environ)
289         user = data.get('user', '')
290         if not self.user_regexp.match(user):
291             raise HandlerError(303, 'See Other', headers=[('Location', '/')])
292         question_id = data.get('question', None)
293         raw_answer = data.get('answer', None)
294         if not question_id or not raw_answer:
295             LOG.error(data)
296             raise HandlerError(422, 'Unprocessable Entity')
297         try:
298             question = self.ui.quiz.get(id=question_id)
299         except KeyError as e:
300             raise HandlerError(404, 'Not Found') from e
301         if (isinstance(question, _question.ChoiceQuestion) and
302                 question.display_choices and
303                 question.accept_all and
304                 raw_answer == 'answer-other'):
305             answer = print_answer = data.get('answer-other', None)
306         elif question.multiline:
307             answer = raw_answer.splitlines()
308             print_answer = raw_answer
309         else:
310             answer = print_answer = raw_answer
311         correct,details = self.ui.process_answer(
312             question=question, answer=answer, user=user)
313         link_target = '../question/'
314         if correct:
315             correct_msg = 'correct'
316             self.ui.stack[user] = [q for q in self.ui.stack[user]
317                                    if q != question]
318             if self.ui.stack[user]:
319                 link_text = 'Next question'
320             else:
321                 link_text = 'Results'
322                 link_target = '../results/'
323         else:
324             correct_msg = 'incorrect'
325             link_text = 'Try again'
326         if details:
327             details = '<p>{}</p>'.format(details)
328         lines = [
329             '<html>',
330             '  <head>',
331             '    <title>Quizzer</title>',
332             '  </head>',
333             '  <body>',
334             '    <h1>Answer</h1>',
335             '    <p>{}</p>'.format(
336                 question.format_prompt(newline='<br/>')),
337             '    <pre>{}</pre>'.format(print_answer),
338             '    <p>{}</p>'.format(correct_msg),
339             details or '',
340             '    <form name="question" action="{}" method="post">'.format(
341                 link_target),
342             '      <input type="hidden" name="user" value="{}">'.format(user),
343             '      <input type="submit" value="{}">'.format(link_text),
344             '    </form>',
345             '  </body>',
346             '</html>',
347             '',
348             ]
349         content = '\n'.join(lines)
350         return self.ok_response(
351             environ, start_response, content=content, encoding='utf-8',
352             content_type='text/html')
353
354
355 class HTMLInterface (_UserInterface):
356     def run(self, host='', port=8000):
357         app = QuestionApp(ui=self)
358         app = HandlerErrorApp(app=app)
359         server = _wsgiref_simple_server.make_server(
360             host=host, port=port, app=app)
361         self._log_start(host=host, port=port)
362         try:
363             server.serve_forever()
364         except _select.error as e:
365             if len(e.args) == 2 and e.args[1] == 'Interrupted system call':
366                 pass
367             else:
368                 raise
369
370     def _log_start(self, host, port):
371         if not host:
372             host = _socket.getfqdn()
373         LOG.info('serving on {}:{}'.format(host, port))
374         try:
375             addrs = _socket.getaddrinfo(host=host, port=port)
376         except _socket.gaierror as e:
377             LOG.warning(e)
378         else:
379             seen = set()
380             for family,type_,proto,canonname,sockaddr in addrs:
381                 c = canonname or host
382                 if (c, sockaddr) not in seen:
383                     LOG.info('address: {} {}'.format(c, sockaddr))
384                     seen.add((c, sockaddr))