From 61f65db36071309fc9e6cd8c7a143be6c236fb62 Mon Sep 17 00:00:00 2001 From: "W. Trevor King" Date: Mon, 3 Sep 2012 13:09:06 -0400 Subject: [PATCH] util:wsgi: Pull WSGICaller out of WSGITestCase. --- libbe/util/wsgi.py | 105 ++++++++++++++++++++++++--------------------- 1 file changed, 56 insertions(+), 49 deletions(-) diff --git a/libbe/util/wsgi.py b/libbe/util/wsgi.py index 0753295..37d0e94 100644 --- a/libbe/util/wsgi.py +++ b/libbe/util/wsgi.py @@ -8,9 +8,11 @@ See Also :py:mod:`libbe.command.serve_commands`. """ +import copy import hashlib import logging import re +import StringIO import sys import time import traceback @@ -43,9 +45,7 @@ import libbe.storage if libbe.TESTING == True: - import copy import doctest - import StringIO import unittest import wsgiref.validate try: @@ -680,8 +680,61 @@ class ServerCommand (libbe.command.base.Command): raise NotImplementedError() +class WSGICaller (object): + """Call into WSGI apps programmatically + """ + def __init__(self, *args, **kwargs): + super(WSGICaller, self).__init__(*args, **kwargs) + self.default_environ = { # required by PEP 333 + 'REQUEST_METHOD': 'GET', # 'POST', 'HEAD' + 'REMOTE_ADDR': '192.168.0.123', + 'SCRIPT_NAME':'', + 'PATH_INFO': '', + #'QUERY_STRING':'', # may be empty or absent + #'CONTENT_TYPE':'', # may be empty or absent + #'CONTENT_LENGTH':'', # may be empty or absent + 'SERVER_NAME':'example.com', + 'SERVER_PORT':'80', + 'SERVER_PROTOCOL':'HTTP/1.1', + 'wsgi.version':(1,0), + 'wsgi.url_scheme':'http', + 'wsgi.input':StringIO.StringIO(), + 'wsgi.errors':StringIO.StringIO(), + 'wsgi.multithread':False, + 'wsgi.multiprocess':False, + 'wsgi.run_once':False, + } + + def getURL(self, app, path='/', method='GET', data=None, + data_dict=None, scheme='http', environ={}): + env = copy.copy(self.default_environ) + env['PATH_INFO'] = path + env['REQUEST_METHOD'] = method + env['scheme'] = scheme + if data_dict is not None: + assert data is None, (data, data_dict) + data = urllib.urlencode(data_dict) + if data is not None: + if data_dict is None: + assert method == 'POST', (method, data) + if method == 'POST': + env['CONTENT_LENGTH'] = len(data) + env['wsgi.input'] = StringIO.StringIO(data) + else: + assert method in ['GET', 'HEAD'], method + env['QUERY_STRING'] = data + for key,value in environ.items(): + env[key] = value + return ''.join(app(env, self._start_response)) + + def _start_response(self, status, response_headers, exc_info=None): + self.status = status + self.response_headers = response_headers + self.exc_info = exc_info + + if libbe.TESTING: - class WSGITestCase (unittest.TestCase): + class WSGITestCase (unittest.TestCase, WSGICaller): def setUp(self): self.logstream = StringIO.StringIO() self.logger = logging.getLogger('be-wsgi-test') @@ -691,52 +744,6 @@ if libbe.TESTING: self.logger.propagate = False console.setLevel(logging.INFO) self.logger.setLevel(logging.INFO) - self.default_environ = { # required by PEP 333 - 'REQUEST_METHOD': 'GET', # 'POST', 'HEAD' - 'REMOTE_ADDR': '192.168.0.123', - 'SCRIPT_NAME':'', - 'PATH_INFO': '', - #'QUERY_STRING':'', # may be empty or absent - #'CONTENT_TYPE':'', # may be empty or absent - #'CONTENT_LENGTH':'', # may be empty or absent - 'SERVER_NAME':'example.com', - 'SERVER_PORT':'80', - 'SERVER_PROTOCOL':'HTTP/1.1', - 'wsgi.version':(1,0), - 'wsgi.url_scheme':'http', - 'wsgi.input':StringIO.StringIO(), - 'wsgi.errors':StringIO.StringIO(), - 'wsgi.multithread':False, - 'wsgi.multiprocess':False, - 'wsgi.run_once':False, - } - - def getURL(self, app, path='/', method='GET', data=None, - data_dict=None, scheme='http', environ={}): - env = copy.copy(self.default_environ) - env['PATH_INFO'] = path - env['REQUEST_METHOD'] = method - env['scheme'] = scheme - if data_dict is not None: - assert data is None, (data, data_dict) - data = urllib.urlencode(data_dict) - if data is not None: - if data_dict is None: - assert method == 'POST', (method, data) - if method == 'POST': - env['CONTENT_LENGTH'] = len(data) - env['wsgi.input'] = StringIO.StringIO(data) - else: - assert method in ['GET', 'HEAD'], method - env['QUERY_STRING'] = data - for key,value in environ.items(): - env[key] = value - return ''.join(app(env, self.start_response)) - - def start_response(self, status, response_headers, exc_info=None): - self.status = status - self.response_headers = response_headers - self.exc_info = exc_info class WSGI_ObjectTestCase (WSGITestCase): -- 2.26.2