3 import logging as _logging
4 import select as _select
5 import socket as _socket
7 import urllib.parse as _urllib_parse
8 import wsgiref.simple_server as _wsgiref_simple_server
10 from .. import question as _question
11 from . import UserInterface as _UserInterface
14 LOG = _logging.getLogger(__name__)
class HandlerError (Exception):
    """An HTTP error response expressed as an exception.

    Handlers raise this to abort a request; the error-catching WSGI
    wrapper formats ``code`` and ``msg`` into the status line and
    passes ``headers`` to ``start_response``.

    code: numeric HTTP status (e.g. ``404``).
    msg: reason phrase that follows the code (e.g. ``'Not Found'``).
    headers: optional list of ``(name, value)`` response header
        tuples; defaults to no extra headers.
    """
    def __init__(self, code, msg, headers=None):
        super(HandlerError, self).__init__('{} {}'.format(code, msg))
        self.code = code
        self.msg = msg
        # Build a fresh list per instance.  A ``headers=[]`` default is
        # evaluated once, so every header-less error would otherwise
        # share (and could mutate) the same list object.
        self.headers = [] if headers is None else list(headers)
25 class HandlerErrorApp (object):
26 """Catch HandlerErrors and return HTTP error pages.
28 def __init__(self, app):
31 def __call__(self, environ, start_response):
33 return self.app(environ, start_response)
34 except HandlerError as e:
36 start_response('{} {}'.format(e.code, e.msg), e.headers)
40 class WSGI_DataObject (object):
41 "Useful WSGI utilities for handling POST data"
42 def __init__(self, **kwargs):
43 super(WSGI_DataObject, self).__init__(**kwargs)
45 # Maximum input we will accept when REQUEST_METHOD is POST
46 # 0 ==> unlimited input
49 def ok_response(self, environ, start_response, content=None,
50 encoding=None, content_type='application/octet-stream',
57 if hasattr(content, 'encode'):
58 content = content.encode(encoding)
59 content_type = '{}; charset={}'.format(
60 content_type, encoding.upper())
61 content_length = len(content)
62 start_response(response, [
63 ('Content-Type', content_type),
64 ('Content-Length', str(content_length)),
70 def _parse_query(self, query, encoding='utf-8'):
73 data = _urllib_parse.parse_qs(
74 query, keep_blank_values=True, strict_parsing=True)
75 data = {str(k, encoding): [str(v, encoding) for v in vs]
76 for k,vs in data.items()}
77 for k,v in data.items():
82 def post_data(self, environ):
83 if environ['REQUEST_METHOD'] != 'POST':
84 raise HandlerError(404, 'Not Found')
85 post_data = self._read_post_data(environ)
86 return self._parse_post(post_data)
88 def _parse_post(self, post):
89 return self._parse_query(post)
91 def _read_post_data(self, environ):
93 clen = int(environ.get('CONTENT_LENGTH', '0'))
97 if self.maxlen > 0 and clen > self.maxlen:
98 raise HandlerError(413, 'Request Entity Too Large')
99 return environ['wsgi.input'].read(clen)
103 class QuestionApp (WSGI_DataObject):
104 """WSGI client serving quiz questions
106 For details on WGSI, see `PEP 333`_.
108 .. _PEP 333: http://www.python.org/dev/peps/pep-0333/
110 def __init__(self, ui, **kwargs):
111 super(QuestionApp, self).__init__(**kwargs)
114 (_re.compile('^$'), self._index),
115 (_re.compile('^question/'), self._question),
116 (_re.compile('^answer/'), self._answer),
117 (_re.compile('^results/'), self._results),
119 self.setting = 'quizzer'
120 self.user_regexp = _re.compile('^\w+$')
122 def __call__(self, environ, start_response):
124 path = environ.get('PATH_INFO', '').lstrip('/')
125 for regexp,callback in self.urls:
126 match = regexp.match(path)
127 if match is not None:
128 setting = '{}.url_args'.format(self.setting)
129 environ[setting] = match.groups()
130 return callback(environ, start_response)
131 raise HandlerError(404, 'Not Found')
133 def _index(self, environ, start_response):
137 ' <title>Quizzer</title>',
142 if self.ui.quiz.introduction:
143 lines.append(' <p>{}</p>'.format(self.ui.quiz.introduction))
145 ' <form name="question" action="../question/" method="post">',
146 ' <p>Username: <input type="text" size="20" name="user">',
147 ' (required, alphanumeric)</p>',
148 ' <input type="submit" value="Start the quiz">',
153 content = '\n'.join(lines)
154 return self.ok_response(
155 environ, start_response, content=content, encoding='utf-8',
156 content_type='text/html')
158 def _results(self, environ, start_response):
159 data = self.post_data(environ)
160 user = data.get('user', '')
161 if not self.user_regexp.match(user):
162 raise HandlerError(303, 'See Other', headers=[('Location', '/')])
166 ' <title>Quizzer</title>',
171 answers = self.ui.answers.get_answers(user=user)
172 for question in self.ui.quiz:
173 if question.id in answers:
174 lines.extend(self._format_result(question=question, user=user))
175 lines.extend(self._format_totals(user=user))
180 content = '\n'.join(lines)
181 return self.ok_response(
182 environ, start_response, content=content, encoding='utf-8',
183 content_type='text/html')
185 def _format_result(self, question, user):
186 answers = self.ui.answers.get_answers(user=user).get(question.id, [])
188 lc = len([a for a in answers if a['correct']])
191 '<p>{}</p>'.format(question.format_prompt(newline='<br />')),
192 '<p>Answers: {}/{} ({:.2f})</p>'.format(lc, la, float(lc)/la),
196 for answer in answers:
197 if answer['correct']:
200 correct = 'incorrect'
201 ans = answer['answer']
202 if question.multiline:
206 '<p>You answered:</p>',
207 '<pre>{}</pre>'.format(ans),
208 '<p>which was {}</p>'.format(correct),
211 lines.append('</ol>')
214 def _format_totals(self, user=None):
215 answered = self.ui.answers.get_answered(
216 questions=self.ui.quiz, user=user)
217 correctly_answered = self.ui.answers.get_correctly_answered(
218 questions=self.ui.quiz, user=user)
220 lc = len(correctly_answered)
223 '<p>Answered {} of {} questions.'.format(la, len(self.ui.quiz)),
224 'Of the answered questions,',
225 '{} ({:.2f}) were answered correctly.'.format(lc, float(lc)/la),
229 def _question(self, environ, start_response):
230 if environ['REQUEST_METHOD'] == 'POST':
231 data = self.post_data(environ)
234 user = data.get('user', '')
235 if not self.user_regexp.match(user):
236 raise HandlerError(303, 'See Other', headers=[('Location', '/')])
237 question = data.get('question', None)
239 question = self.ui.get_question(user=user)
240 # put the question back on the stack until it's answered
241 self.ui.stack[user].insert(0, question)
244 307, 'Temporary Redirect', headers=[('Location', '/results/')])
245 if (isinstance(question, _question.ChoiceQuestion) and
246 question.display_choices):
248 ('<input type="radio" name="answer" value="{0}"/>{0}<br/>'
250 for answer in question.answer]
251 if question.accept_all:
253 '<input type="radio" name="answer" value="answer-other"/>',
254 '<input type="text" size="60" name="answer-other"/>'])
255 answer_element = '\n'.join(choices)
256 elif question.multiline:
258 '<textarea rows="5" cols="60" name="answer"></textarea>')
260 answer_element = '<input type="text" size="60" name="answer"/>'
264 ' <title>Quizzer</title>',
267 ' <h1>Question</h1>',
268 ' <form name="question" action="../answer/" method="post">',
269 ' <input type="hidden" name="user" value="{}">'.format(user),
270 ' <input type="hidden" name="question" value="{}">'.format(
273 question.format_prompt(newline='<br/>')),
276 ' <input type="submit" value="submit">',
282 content = '\n'.join(lines)
283 return self.ok_response(
284 environ, start_response, content=content, encoding='utf-8',
285 content_type='text/html')
287 def _answer(self, environ, start_response):
288 data = self.post_data(environ)
289 user = data.get('user', '')
290 if not self.user_regexp.match(user):
291 raise HandlerError(303, 'See Other', headers=[('Location', '/')])
292 question_id = data.get('question', None)
293 raw_answer = data.get('answer', None)
294 if not question_id or not raw_answer:
296 raise HandlerError(422, 'Unprocessable Entity')
298 question = self.ui.quiz.get(id=question_id)
299 except KeyError as e:
300 raise HandlerError(404, 'Not Found') from e
301 if (isinstance(question, _question.ChoiceQuestion) and
302 question.display_choices and
303 question.accept_all and
304 raw_answer == 'answer-other'):
305 answer = print_answer = data.get('answer-other', None)
306 elif question.multiline:
307 answer = raw_answer.splitlines()
308 print_answer = raw_answer
310 answer = print_answer = raw_answer
311 correct,details = self.ui.process_answer(
312 question=question, answer=answer, user=user)
313 link_target = '../question/'
315 correct_msg = 'correct'
316 self.ui.stack[user] = [q for q in self.ui.stack[user]
318 if self.ui.stack[user]:
319 link_text = 'Next question'
321 link_text = 'Results'
322 link_target = '../results/'
324 correct_msg = 'incorrect'
325 link_text = 'Try again'
327 details = '<p>{}</p>'.format(details)
331 ' <title>Quizzer</title>',
336 question.format_prompt(newline='<br/>')),
337 ' <pre>{}</pre>'.format(print_answer),
338 ' <p>{}</p>'.format(correct_msg),
340 ' <form name="question" action="{}" method="post">'.format(
342 ' <input type="hidden" name="user" value="{}">'.format(user),
343 ' <input type="submit" value="{}">'.format(link_text),
349 content = '\n'.join(lines)
350 return self.ok_response(
351 environ, start_response, content=content, encoding='utf-8',
352 content_type='text/html')
355 class HTMLInterface (_UserInterface):
356 def run(self, host='', port=8000):
357 app = QuestionApp(ui=self)
358 app = HandlerErrorApp(app=app)
359 server = _wsgiref_simple_server.make_server(
360 host=host, port=port, app=app)
361 self._log_start(host=host, port=port)
363 server.serve_forever()
364 except _select.error as e:
365 if len(e.args) == 2 and e.args[1] == 'Interrupted system call':
370 def _log_start(self, host, port):
372 host = _socket.getfqdn()
373 LOG.info('serving on {}:{}'.format(host, port))
375 addrs = _socket.getaddrinfo(host=host, port=port)
376 except _socket.gaierror as e:
380 for family,type_,proto,canonname,sockaddr in addrs:
381 c = canonname or host
382 if (c, sockaddr) not in seen:
383 LOG.info('address: {} {}'.format(c, sockaddr))
384 seen.add((c, sockaddr))