-# Copyright
+# Copyright (C) 2010-2012 Chris Ball <cjb@laptop.org>
+# W. Trevor King <wking@tremily.us>
+#
+# This file is part of Bugs Everywhere.
+#
+# Bugs Everywhere is free software: you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 2 of the License, or (at your option) any
+# later version.
+#
+# Bugs Everywhere is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# Bugs Everywhere. If not, see <http://www.gnu.org/licenses/>.
"""Utilities for building WSGI commands.
See Also
--------
-:py:mod:`libbe.command.serve` and :py:mod:`libbe.command.serve_commands`.
+:py:mod:`libbe.command.serve_storage` and
+:py:mod:`libbe.command.serve_commands`.
"""
+import copy
import hashlib
import logging
import re
+import StringIO
import sys
import time
import traceback
import libbe.util.encoding
import libbe.command
import libbe.command.base
+import libbe.storage
+
if libbe.TESTING == True:
- import copy
import doctest
- import StringIO
import unittest
import wsgiref.validate
try:
raise
+class HandlerErrorApp (WSGI_Middleware):
+ """Catch HandlerErrors and return HTTP error pages.
+ """
+ def _call(self, environ, start_response):
+ try:
+ return self.app(environ, start_response)
+ except HandlerError, e:
+ self.log_request(environ, status=str(e), bytes=0)
+ start_response('{} {}'.format(e.code, e.msg), e.headers)
+ return []
+
+
+class BEExceptionApp (WSGI_Middleware):
+ """Translate BE-specific exceptions
+ """
+ def __init__(self, *args, **kwargs):
+ super(BEExceptionApp, self).__init__(*args, **kwargs)
+ self.http_user_error = 418
+
+ def _call(self, environ, start_response):
+ try:
+ return self.app(environ, start_response)
+ except libbe.storage.NotReadable as e:
+ raise libbe.util.wsgi.HandlerError(403, 'Read permission denied')
+ except libbe.storage.NotWriteable as e:
+ raise libbe.util.wsgi.HandlerError(403, 'Write permission denied')
+ except libbe.storage.InvalidID as e:
+ raise libbe.util.wsgi.HandlerError(
+ self.http_user_error, 'InvalidID {}'.format(e))
+
+
class UppercaseHeaderApp (WSGI_Middleware):
"""WSGI middleware that uppercases incoming HTTP headers.
*args, **kwargs):
super(WSGI_AppObject, self).__init__(*args, **kwargs)
self.urls = [(re.compile(regexp),callback) for regexp,callback in urls]
- self.default_callback = default_handler
+ self.default_handler = default_handler
self.setting = setting
def _call(self, environ, start_response):
super(ServerCommand, self).__init__(*args, **kwargs)
self.options.extend([
libbe.command.Option(name='port',
- help='Bind server to port (%default)',
+ help='Bind server to port',
arg=libbe.command.Argument(
name='port', metavar='INT', type='int', default=8000)),
libbe.command.Option(name='host',
- help='Set host string (blank for localhost, %default)',
+ help='Set host string (blank for localhost)',
arg=libbe.command.Argument(
name='host', metavar='HOST', default='localhost')),
libbe.command.Option(name='read-only', short_name='r',
'socket-name':params['host'],
'port':params['port'],
}
+ app = BEExceptionApp(app, logger=self.logger)
+ app = HandlerErrorApp(app, logger=self.logger)
app = ExceptionApp(app, logger=self.logger)
if params['ssl'] == True:
details['protocol'] = 'HTTPS'
server.serve_forever()
def _stop_server(self, params, server):
- self.logger.log(self.log_level, 'Clossing server')
+ self.logger.log(self.log_level, 'Closing server')
if params['ssl'] == True:
server.stop()
else:
raise NotImplementedError()
+class WSGICaller (object):
+ """Call into WSGI apps programmatically
+ """
+ def __init__(self, *args, **kwargs):
+ super(WSGICaller, self).__init__(*args, **kwargs)
+ self.default_environ = { # required by PEP 333
+ 'REQUEST_METHOD': 'GET', # 'POST', 'HEAD'
+ 'REMOTE_ADDR': '192.168.0.123',
+ 'SCRIPT_NAME':'',
+ 'PATH_INFO': '',
+ #'QUERY_STRING':'', # may be empty or absent
+ #'CONTENT_TYPE':'', # may be empty or absent
+ #'CONTENT_LENGTH':'', # may be empty or absent
+ 'SERVER_NAME':'example.com',
+ 'SERVER_PORT':'80',
+ 'SERVER_PROTOCOL':'HTTP/1.1',
+ 'wsgi.version':(1,0),
+ 'wsgi.url_scheme':'http',
+ 'wsgi.input':StringIO.StringIO(),
+ 'wsgi.errors':StringIO.StringIO(),
+ 'wsgi.multithread':False,
+ 'wsgi.multiprocess':False,
+ 'wsgi.run_once':False,
+ }
+
+ def getURL(self, app, path='/', method='GET', data=None,
+ data_dict=None, scheme='http', environ={}):
+ env = copy.copy(self.default_environ)
+ env['PATH_INFO'] = path
+ env['REQUEST_METHOD'] = method
+ env['scheme'] = scheme
+ if data_dict is not None:
+ assert data is None, (data, data_dict)
+ data = urllib.urlencode(data_dict)
+ if data is not None:
+ if data_dict is None:
+ assert method == 'POST', (method, data)
+ if method == 'POST':
+ env['CONTENT_LENGTH'] = len(data)
+ env['wsgi.input'] = StringIO.StringIO(data)
+ else:
+ assert method in ['GET', 'HEAD'], method
+ env['QUERY_STRING'] = data
+ for key,value in environ.items():
+ env[key] = value
+ return ''.join(app(env, self.start_response))
+
+ def start_response(self, status, response_headers, exc_info=None):
+ self.status = status
+ self.response_headers = response_headers
+ self.exc_info = exc_info
+
+
if libbe.TESTING:
class WSGITestCase (unittest.TestCase):
def setUp(self):
self.logger.propagate = False
console.setLevel(logging.INFO)
self.logger.setLevel(logging.INFO)
- self.default_environ = { # required by PEP 333
- 'REQUEST_METHOD': 'GET', # 'POST', 'HEAD'
- 'REMOTE_ADDR': '192.168.0.123',
- 'SCRIPT_NAME':'',
- 'PATH_INFO': '',
- #'QUERY_STRING':'', # may be empty or absent
- #'CONTENT_TYPE':'', # may be empty or absent
- #'CONTENT_LENGTH':'', # may be empty or absent
- 'SERVER_NAME':'example.com',
- 'SERVER_PORT':'80',
- 'SERVER_PROTOCOL':'HTTP/1.1',
- 'wsgi.version':(1,0),
- 'wsgi.url_scheme':'http',
- 'wsgi.input':StringIO.StringIO(),
- 'wsgi.errors':StringIO.StringIO(),
- 'wsgi.multithread':False,
- 'wsgi.multiprocess':False,
- 'wsgi.run_once':False,
- }
-
- def getURL(self, app, path='/', method='GET', data=None,
- data_dict=None, scheme='http', environ={}):
- env = copy.copy(self.default_environ)
- env['PATH_INFO'] = path
- env['REQUEST_METHOD'] = method
- env['scheme'] = scheme
- if data_dict is not None:
- assert data is None, (data, data_dict)
- data = urllib.urlencode(data_dict)
- if data is not None:
- if data_dict is None:
- assert method == 'POST', (method, data)
- if method == 'POST':
- env['CONTENT_LENGTH'] = len(data)
- env['wsgi.input'] = StringIO.StringIO(data)
- else:
- assert method in ['GET', 'HEAD'], method
- env['QUERY_STRING'] = data
- for key,value in environ.items():
- env[key] = value
- return ''.join(app(env, self.start_response))
-
- def start_response(self, status, response_headers, exc_info=None):
- self.status = status
- self.response_headers = response_headers
- self.exc_info = exc_info
+ self.caller = WSGICaller()
+ def getURL(self, *args, **kwargs):
+ content = self.caller.getURL(*args, **kwargs)
+ self.status = self.caller.status
+ self.response_headers = self.caller.response_headers
+ self.exc_info = self.caller.exc_info
+ return content
class WSGI_ObjectTestCase (WSGITestCase):
def setUp(self):
def test_error(self):
contents = self.app.error(
- environ=self.default_environ,
- start_response=self.start_response,
+ environ=self.caller.default_environ,
+ start_response=self.caller.start_response,
error=123,
message='Dummy Error',
headers=[('X-Dummy-Header','Dummy Value')])
self.failUnless(contents == ['Dummy Error'], contents)
- self.failUnless(self.status == '123 Dummy Error', self.status)
- self.failUnless(self.response_headers == [
+ self.failUnless(
+ self.caller.status == '123 Dummy Error', self.caller.status)
+ self.failUnless(self.caller.response_headers == [
('Content-Type','text/plain'),
('X-Dummy-Header','Dummy Value')],
- self.response_headers)
- self.failUnless(self.exc_info == None, self.exc_info)
+ self.caller.response_headers)
+ self.failUnless(self.caller.exc_info == None, self.caller.exc_info)
def test_log_request(self):
self.app.log_request(
- environ=self.default_environ, status='-1 OK', bytes=123)
+ environ=self.caller.default_environ, status='-1 OK', bytes=123)
log = self.logstream.getvalue()
self.failUnless(log.startswith('192.168.0.123 -'), log)