import errno as _errno
import html as _html
import logging as _logging
import re as _re
import select as _select
import socket as _socket
import sys as _sys
import urllib.parse as _urllib_parse
import wsgiref.simple_server as _wsgiref_simple_server

from . import UserInterface as _UserInterface
# Logger named after this module; HTMLInterface uses it to report the
# address the server is listening on.
LOG = _logging.getLogger(__name__)
class HandlerError (Exception):
    """An HTTP error raised by a request handler.

    The exception message is ``'<code> <msg>'``.

    Attributes:
      code: HTTP status code (e.g. ``404``).
      msg: HTTP reason phrase (e.g. ``'Not Found'``).
      headers: list of ``(name, value)`` response headers to send with
        the error status.
    """
    def __init__(self, code, msg, headers=None):
        super(HandlerError, self).__init__('{} {}'.format(code, msg))
        self.code = code
        self.msg = msg
        # A fresh list per instance: a literal ``[]`` default would be
        # shared (and mutated) across every HandlerError ever raised.
        self.headers = [] if headers is None else headers
class HandlerErrorApp (object):
    """Catch HandlerErrors and return HTTP error pages.

    WSGI middleware: requests are passed straight to the wrapped
    ``app``; a ``HandlerError`` escaping from it is converted into a
    response carrying the error's status line and headers.
    """
    def __init__(self, app):
        self.app = app

    def __call__(self, environ, start_response):
        """WSGI entry point; returns the wrapped app's response iterable."""
        try:
            return self.app(environ, start_response)
        except HandlerError as e:
            LOG.error(e)
            # PEP 3333: an error handler passes exc_info so the server
            # can re-raise if the wrapped app already sent headers.
            # Pass a copy of the headers, since servers may modify the
            # list they are given.
            start_response(
                '{} {}'.format(e.code, e.msg), list(e.headers),
                _sys.exc_info())
            return []
class WSGI_DataObject (object):
    "Useful WSGI utilities for handling POST data"
    def __init__(self, **kwargs):
        super(WSGI_DataObject, self).__init__(**kwargs)

        # Maximum input we will accept when REQUEST_METHOD is POST
        # 0 ==> unlimited input
        self.maxlen = 0

    def ok_response(self, environ, start_response, content=None,
                    encoding=None, content_type='application/octet-stream',
                    headers=None):
        """Send a ``200 OK`` response and return the WSGI body iterable.

        ``content`` may be ``None`` (no headers, empty body), bytes, or
        text.  Text is encoded with ``encoding`` (UTF-8 when no encoding
        is given) and the charset is appended to ``content_type``.
        Extra ``headers`` are sent after Content-Type/Content-Length.
        The body is omitted for HEAD requests.
        """
        response = '200 OK'
        if content is None:
            start_response(response, [])
            return []
        if hasattr(content, 'encode'):
            if encoding is None:
                # Text cannot be sent without picking an encoding;
                # str.encode(None) would raise TypeError.
                encoding = 'utf-8'
            content = content.encode(encoding)
            content_type = '{}; charset={}'.format(
                content_type, encoding.upper())
        response_headers = [
            ('Content-Type', content_type),
            ('Content-Length', str(len(content))),
            ]
        if headers:
            response_headers.extend(headers)
        start_response(response, response_headers)
        if environ.get('REQUEST_METHOD') == 'HEAD':
            return []
        return [content]

    def _parse_query(self, query, encoding='utf-8'):
        """Parse a URL-encoded query into a dict of text values.

        Keys with a single value map to that value; keys given several
        times map to a list of values.  ``query`` may be bytes (decoded
        with ``encoding``) or text.  Malformed input raises
        ``ValueError`` (``parse_qs`` runs with ``strict_parsing``).
        """
        if len(query) == 0:
            return {}
        parsed = _urllib_parse.parse_qs(
            query, keep_blank_values=True, strict_parsing=True)

        def as_text(item):
            # parse_qs returns the same type it was given.
            if isinstance(item, bytes):
                return str(item, encoding)
            return item

        data = {}
        for key, values in parsed.items():
            values = [as_text(v) for v in values]
            data[as_text(key)] = values[0] if len(values) == 1 else values
        return data

    def post_data(self, environ):
        """Return the parsed POST fields; raise a 404 for non-POST requests."""
        if environ['REQUEST_METHOD'] != 'POST':
            raise HandlerError(404, 'Not Found')
        post_data = self._read_post_data(environ)
        return self._parse_post(post_data)

    def _parse_post(self, post):
        """Parse a raw POST body (hook for subclasses; defaults to a query)."""
        return self._parse_query(post)

    def _read_post_data(self, environ):
        """Read the request body announced by CONTENT_LENGTH as bytes.

        A missing, empty, unparsable, or non-positive length yields
        ``b''``.  Raises a 413 HandlerError when ``self.maxlen`` is
        positive and the announced length exceeds it.
        """
        try:
            clen = int(environ.get('CONTENT_LENGTH') or 0)
        except (TypeError, ValueError):
            clen = 0
        if clen <= 0:
            # read() with a negative size would read until EOF.
            return b''
        if self.maxlen > 0 and clen > self.maxlen:
            raise HandlerError(413, 'Request Entity Too Large')
        return environ['wsgi.input'].read(clen)
class QuestionApp (WSGI_DataObject):
    """WSGI client serving quiz questions

    For details on WSGI, see `PEP 333`_.

    Requests are dispatched on ``PATH_INFO``: the empty path shows the
    index, ``question/`` asks a question, ``answer/`` grades a POSTed
    answer, and ``results/`` summarizes the answers so far.

    .. _PEP 333: http://www.python.org/dev/peps/pep-0333/
    """
    def __init__(self, ui, **kwargs):
        super(QuestionApp, self).__init__(**kwargs)
        self.ui = ui
        # (compiled pattern, handler) pairs, tried in order.
        self.urls = [
            (_re.compile('^$'), self._index),
            (_re.compile('^question/'), self._question),
            (_re.compile('^answer/'), self._answer),
            (_re.compile('^results/'), self._results),
            ]
        self.setting = 'quizzer'

    def __call__(self, environ, start_response):
        """WSGI entry point.

        Calls the first handler whose pattern matches the request path
        (leading slashes stripped), after storing the match groups in
        ``environ['<setting>.url_args']``.  Raises a 404 HandlerError
        when nothing matches.
        """
        path = environ.get('PATH_INFO', '').lstrip('/')
        for regexp, callback in self.urls:
            match = regexp.match(path)
            if match is not None:
                setting = '{}.url_args'.format(self.setting)
                environ[setting] = match.groups()
                return callback(environ, start_response)
        raise HandlerError(404, 'Not Found')

    @staticmethod
    def _ratio(part, whole):
        """Return ``part / whole`` as a float, or 0.0 when ``whole`` is 0."""
        return float(part) / whole if whole else 0.0

    def _render_page(self, environ, start_response, heading, body):
        """Wrap ``body`` (a list of HTML lines) in the page skeleton and send it."""
        lines = [
            '<html>',
            ' <head>',
            ' <title>Quizzer</title>',
            ' </head>',
            ' <body>',
            ' <h1>{}</h1>'.format(heading),
            ]
        lines.extend(body)
        lines.extend([
            ' </body>',
            '</html>',
            ])
        content = '\n'.join(lines)
        return self.ok_response(
            environ, start_response, content=content, encoding='utf-8',
            content_type='text/html')

    def _lookup_question(self, question_id):
        """Return the quiz question with ``question_id`` or raise a 404."""
        try:
            return self.ui.quiz.get(id=question_id)
        except KeyError as e:
            raise HandlerError(404, 'Not Found') from e

    def _index(self, environ, start_response):
        """Show the start page while questions remain, else the results."""
        if self.ui.stack:
            return self._start(environ, start_response)
        return self._results(environ, start_response)

    def _start(self, environ, start_response):
        """Serve the start page: optional introduction plus a start link."""
        body = []
        if self.ui.quiz.introduction:
            body.append(' <p>{}</p>'.format(self.ui.quiz.introduction))
        body.append(' <p><a href="question/">Start the quiz</a>.</p>')
        return self._render_page(
            environ, start_response, heading='Quizzer', body=body)

    def _results(self, environ, start_response):
        """Serve per-question results for answered questions, then totals."""
        body = []
        for question in self.ui.quiz:
            if question.id in self.ui.answers:
                body.extend(self._format_result(question=question))
        body.extend(self._format_totals())
        return self._render_page(
            environ, start_response, heading='Results', body=body)

    def _format_result(self, question):
        """Return HTML lines listing every recorded answer to ``question``."""
        answers = self.ui.answers.get(question.id, [])
        la = len(answers)
        lc = len([a for a in answers if a['correct']])
        lines = [
            '<h2>Question</h2>',
            '<p>{}</p>'.format(question.format_prompt(newline='<br />')),
            '<p>Answers: {}/{} ({:.2f})</p>'.format(
                lc, la, self._ratio(lc, la)),
            '<ol>',
            ]
        for answer in answers:
            if answer['correct']:
                correct = 'correct'
            else:
                correct = 'incorrect'
            ans = answer['answer']
            if question.multiline:
                # multiline answers are handled as lists of lines
                ans = '\n'.join(ans)
            lines.extend([
                '<li>',
                '<p>You answered:</p>',
                # The answer is user input: escape it before embedding.
                '<pre>{}</pre>'.format(_html.escape(str(ans))),
                '<p>which was {}</p>'.format(correct),
                '</li>',
                ])
        lines.append('</ol>')
        return lines

    def _format_totals(self):
        """Return HTML lines summarizing answered and correct counts."""
        answered = self.ui.answers.get_answered(questions=self.ui.quiz)
        correctly_answered = self.ui.answers.get_correctly_answered(
            questions=self.ui.quiz)
        la = len(answered)
        lc = len(correctly_answered)
        return [
            '<h2>Totals</h2>',
            '<p>Answered {} of {} questions.'.format(la, len(self.ui.quiz)),
            'Of the answered questions,',
            # _ratio avoids dividing by zero before anything is answered.
            '{} ({:.2f}) were answered correctly.'.format(
                lc, self._ratio(lc, la)),
            '</p>',
            ]

    def _question(self, environ, start_response):
        """Serve the form for a question.

        A POST may name the question by id in its ``question`` field
        (unknown ids raise a 404); otherwise the next question comes
        from the UI.  Falls back to the index when no question is left.
        """
        if environ['REQUEST_METHOD'] == 'POST':
            data = self.post_data(environ)
        else:
            data = {}
        question_id = data.get('question', None)
        if question_id:
            # POST data only carries the id; resolve it to the question.
            question = self._lookup_question(question_id)
        else:
            question = self.ui.get_question()
            if question is not None:
                # put the question back on the stack until it's answered
                self.ui.stack.insert(0, question)
        if question is None:
            return self._index(environ, start_response)
        if question.multiline:
            answer_element = (
                '<textarea rows="5" cols="60" name="answer"></textarea>')
        else:
            answer_element = '<input type="text" size="60" name="answer">'
        body = [
            ' <form name="question" action="../answer/" method="post">',
            ' <input type="hidden" name="question" value="{}">'.format(
                _html.escape(str(question.id), quote=True)),
            ' <p>{}</p>'.format(
                question.format_prompt(newline='<br/>')),
            answer_element,
            ' <br />',
            ' <input type="submit" value="submit">',
            ' </form>',
            ]
        return self._render_page(
            environ, start_response, heading='Question', body=body)

    def _answer(self, environ, start_response):
        """Grade a POSTed answer and serve the verdict page.

        Expects ``question`` (a question id) and ``answer`` fields.
        Raises a 422 HandlerError when either is missing or empty and
        a 404 when the id is unknown.  A correctly answered question is
        removed from the UI's stack.
        """
        data = self.post_data(environ)
        question_id = data.get('question', None)
        raw_answer = data.get('answer', None)
        if not question_id or not raw_answer:
            LOG.error('invalid answer submission: %r', data)
            raise HandlerError(422, 'Unprocessable Entity')
        question = self._lookup_question(question_id)
        if question.multiline:
            answer = raw_answer.splitlines()
        else:
            answer = raw_answer
        correct, details = self.ui.process_answer(
            question=question, answer=answer)
        link_target = '../question/'
        if correct:
            correct_msg = 'correct'
            self.ui.stack = [q for q in self.ui.stack if q != question]
            if self.ui.stack:
                link_text = 'Next question'
            else:
                link_text = 'Results'
                link_target = '../results/'
        else:
            correct_msg = 'incorrect'
            link_text = 'Try again'
        body = [
            ' <p>{}</p>'.format(
                question.format_prompt(newline='<br/>')),
            # Echo the submitted text escaped so it cannot inject markup.
            ' <pre>{}</pre>'.format(_html.escape(raw_answer)),
            ' <p>{}</p>'.format(correct_msg),
            ]
        if details:
            body.append('<p>{}</p>'.format(details))
        body.append(' <a href="{}">{}</a>.'.format(link_target, link_text))
        return self._render_page(
            environ, start_response, heading='Answer', body=body)
class HTMLInterface (_UserInterface):
    """User interface that serves the quiz as HTML over HTTP."""
    def run(self, host='', port=8000):
        """Serve the quiz on ``host:port`` until the server is interrupted.

        An interrupted system call ends the loop quietly; any other
        error propagates.  The listening socket is closed on the way
        out in every case.
        """
        app = QuestionApp(ui=self)
        app = HandlerErrorApp(app=app)
        server = _wsgiref_simple_server.make_server(
            host=host, port=port, app=app)
        try:
            self._log_start(host=host, port=port)
            server.serve_forever()
        except _select.error as e:
            # Compare the errno rather than the strerror text, which
            # depends on the platform and locale.
            code = getattr(e, 'errno', None)
            if code is None and e.args:
                code = e.args[0]
            if code != _errno.EINTR:
                raise
        finally:
            server.server_close()

    def _log_start(self, host, port):
        """Log the serving address and each distinct resolved address."""
        if not host:
            host = _socket.getfqdn()
        LOG.info('serving on %s:%s', host, port)
        try:
            addrs = _socket.getaddrinfo(host=host, port=port)
        except _socket.gaierror as e:
            LOG.warning(e)
            return
        seen = set()
        for _family, _type, _proto, canonname, sockaddr in addrs:
            entry = (canonname or host, sockaddr)
            # getaddrinfo returns one row per socket type/protocol, so
            # the same address can show up several times.
            if entry not in seen:
                LOG.info('address: %s %s', *entry)
                seen.add(entry)