command:serve_commands: remove duplicate cruft and get working unit tests.
authorW. Trevor King <wking@tremily.us>
Fri, 24 Aug 2012 14:13:45 +0000 (10:13 -0400)
committerW. Trevor King <wking@tremily.us>
Fri, 24 Aug 2012 14:13:45 +0000 (10:13 -0400)
libbe/command/serve_commands.py

index 1912180b3e8905ad0bdcea4ae7e28c8d3259ef35..810f6984d32d7b02b705ecfe0d42c841aeced63f 100644 (file)
@@ -79,6 +79,7 @@ if libbe.TESTING == True:
         cherrypy_test_webtest = None
 
     import libbe.bugdir
+    import libbe.command.list
 
     
 class ServerApp (libbe.command.serve.WSGI_AppObject):
@@ -281,7 +282,7 @@ class Serve_Commands (libbe.command.Command):
                 (params['host'], params['port']), app)
             #server.throw_errors = True
             #server.show_tracebacks = True
-            private_key,certificate = get_cert_filenames(
+            private_key,certificate = libbe.command.serve.get_cert_filenames(
                 'be-server', logger=self.logger)
             if cherrypy.wsgiserver.ssl_builtin == None:
                 server.ssl_module = 'builtin'
@@ -341,339 +342,30 @@ for example::
 # alias for libbe.command.base.get_command_class()
 Serve_commands = Serve_Commands
 
-def random_string(length=256):
-    if os.path.exists(os.path.join('dev', 'urandom')):
-        return open("/dev/urandom").read(length)
-    else:
-        import array
-        from random import randint
-        d = array.array('B')
-        for i in xrange(1000000):
-            d.append(randint(0,255))
-        return d.tostring()
-
 if libbe.TESTING == True:
-    class WSGITestCase (unittest.TestCase):
-        def setUp(self):
-            self.logstream = StringIO.StringIO()
-            self.logger = logging.getLogger('be-serve-test')
-            console = logging.StreamHandler(self.logstream)
-            console.setFormatter(logging.Formatter('%(message)s'))
-            self.logger.addHandler(console)
-            self.logger.propagate = False
-            console.setLevel(logging.INFO)
-            self.logger.setLevel(logging.INFO)
-            self.default_environ = { # required by PEP 333
-                'REQUEST_METHOD': 'GET', # 'POST', 'HEAD'
-                'REMOTE_ADDR': '192.168.0.123',
-                'SCRIPT_NAME':'',
-                'PATH_INFO': '',
-                #'QUERY_STRING':'',   # may be empty or absent
-                #'CONTENT_TYPE':'',   # may be empty or absent
-                #'CONTENT_LENGTH':'', # may be empty or absent
-                'SERVER_NAME':'example.com',
-                'SERVER_PORT':'80',
-                'SERVER_PROTOCOL':'HTTP/1.1',
-                'wsgi.version':(1,0),
-                'wsgi.url_scheme':'http',
-                'wsgi.input':StringIO.StringIO(),
-                'wsgi.errors':StringIO.StringIO(),
-                'wsgi.multithread':False,
-                'wsgi.multiprocess':False,
-                'wsgi.run_once':False,
-                }
-        def getURL(self, app, path='/', method='GET', data=None,
-                   scheme='http', environ={}):
-            env = copy.copy(self.default_environ)
-            env['PATH_INFO'] = path
-            env['REQUEST_METHOD'] = method
-            env['scheme'] = scheme
-            if data != None:
-                enc_data = urllib.urlencode(data)
-                if method == 'POST':
-                    env['CONTENT_LENGTH'] = len(enc_data)
-                    env['wsgi.input'] = StringIO.StringIO(enc_data)
-                else:
-                    assert method in ['GET', 'HEAD'], method
-                    env['QUERY_STRING'] = enc_data
-            for key,value in environ.items():
-                env[key] = value
-            return ''.join(app(env, self.start_response))
-        def start_response(self, status, response_headers, exc_info=None):
-            self.status = status
-            self.response_headers = response_headers
-            self.exc_info = exc_info
-
-    class WSGI_ObjectTestCase (WSGITestCase):
-        def setUp(self):
-            WSGITestCase.setUp(self)
-            self.app = WSGI_Object(self.logger)
-        def test_error(self):
-            contents = self.app.error(
-                environ=self.default_environ,
-                start_response=self.start_response,
-                error=123,
-                message='Dummy Error',
-                headers=[('X-Dummy-Header','Dummy Value')])
-            self.failUnless(contents == ['Dummy Error'], contents)
-            self.failUnless(self.status == '123 Dummy Error', self.status)
-            self.failUnless(self.response_headers == [
-                    ('Content-Type','text/plain'),
-                    ('X-Dummy-Header','Dummy Value')],
-                            self.response_headers)
-            self.failUnless(self.exc_info == None, self.exc_info)
-        def test_log_request(self):
-            self.app.log_request(
-                environ=self.default_environ, status='-1 OK', bytes=123)
-            log = self.logstream.getvalue()
-            self.failUnless(log.startswith('192.168.0.123 -'), log)
-
-    class ExceptionAppTestCase (WSGITestCase):
-        def setUp(self):
-            WSGITestCase.setUp(self)
-            def child_app(environ, start_response):
-                raise ValueError('Dummy Error')
-            self.app = ExceptionApp(child_app, self.logger)
-        def test_traceback(self):
-            try:
-                self.getURL(self.app)
-            except ValueError, e:
-                pass
-            log = self.logstream.getvalue()
-            self.failUnless(log.startswith('Traceback'), log)
-            self.failUnless('child_app' in log, log)
-            self.failUnless('ValueError: Dummy Error' in log, log)
-
-    class AdminAppTestCase (WSGITestCase):
-        def setUp(self):
-            WSGITestCase.setUp(self)
-            self.users = Users()
-            self.users.add_user(
-                User('Aladdin', 'Big Al', password='open sesame'))
-            self.users.add_user(
-                User('guest', 'Guest', password='guestpass'))
-            def child_app(environ, start_response):
-                pass
-            self.app = AdminApp(
-                child_app, users=self.users, logger=self.logger)
-            self.app = AuthenticationApp(
-                self.app, realm='Dummy Realm', users=self.users,
-                logger=self.logger)
-            self.app = UppercaseHeaderApp(self.app, logger=self.logger)
-        def basic_auth(self, uname, password):
-            """HTTP basic authorization string"""
-            return 'Basic %s' % \
-                ('%s:%s' % (uname, password)).encode('base64')
-        def test_new_name(self):
-            self.getURL(
-                self.app, '/admin/', method='POST',
-                data={'name':'Prince Al'},
-                environ={'HTTP_Authorization':
-                             self.basic_auth('Aladdin', 'open sesame')})
-            self.failUnless(self.status == '200 OK', self.status)
-            self.failUnless(self.response_headers == [],
-                            self.response_headers)
-            self.failUnless(self.exc_info == None, self.exc_info)
-            self.failUnless(self.users['Aladdin'].name == 'Prince Al',
-                            self.users['Aladdin'].name)
-            self.failUnless(self.users.changed == True,
-                            self.users.changed)
-        def test_new_password(self):
-            self.getURL(
-                self.app, '/admin/', method='POST',
-                data={'password':'New Pass'},
-                environ={'HTTP_Authorization':
-                             self.basic_auth('Aladdin', 'open sesame')})
-            self.failUnless(self.status == '200 OK', self.status)
-            self.failUnless(self.response_headers == [],
-                            self.response_headers)
-            self.failUnless(self.exc_info == None, self.exc_info)
-            self.failUnless(self.users['Aladdin'].passhash == \
-                            self.users['Aladdin'].hash('New Pass'),
-                            self.users['Aladdin'].passhash)
-            self.failUnless(self.users.changed == True,
-                            self.users.changed)
-        def test_guest_name(self):
-            self.getURL(
-                self.app, '/admin/', method='POST',
-                data={'name':'SPAM'},
-                environ={'HTTP_Authorization':
-                             self.basic_auth('guest', 'guestpass')})
-            self.failUnless(self.status.startswith('403 '), self.status)
-            self.failUnless(self.response_headers == [
-                    ('Content-Type', 'text/plain')],
-                            self.response_headers)
-            self.failUnless(self.exc_info == None, self.exc_info)
-            self.failUnless(self.users['guest'].name == 'Guest',
-                            self.users['guest'].name)
-            self.failUnless(self.users.changed == False,
-                            self.users.changed)
-        def test_guest_password(self):
-            self.getURL(
-                self.app, '/admin/', method='POST',
-                data={'password':'SPAM'},
-                environ={'HTTP_Authorization':
-                             self.basic_auth('guest', 'guestpass')})
-            self.failUnless(self.status.startswith('403 '), self.status)
-            self.failUnless(self.response_headers == [
-                    ('Content-Type', 'text/plain')],
-                            self.response_headers)
-            self.failUnless(self.exc_info == None, self.exc_info)
-            self.failUnless(self.users['guest'].name == 'Guest',
-                            self.users['guest'].name)
-            self.failUnless(self.users.changed == False,
-                            self.users.changed)
-
-    class ServerAppTestCase (WSGITestCase):
+    class ServerAppTestCase (libbe.command.serve.WSGITestCase):
         def setUp(self):
-            WSGITestCase.setUp(self)
+            libbe.command.serve.WSGITestCase.setUp(self)
             self.bd = libbe.bugdir.SimpleBugDir(memory=False)
             self.app = ServerApp(self.bd.storage, logger=self.logger)
         def tearDown(self):
             self.bd.cleanup()
-            WSGITestCase.tearDown(self)
-        def test_add_get(self):
-            self.getURL(self.app, '/add/', method='GET')
-            self.failUnless(self.status.startswith('404 '), self.status)
-            self.failUnless(self.response_headers == [
-                    ('Content-Type', 'text/plain')],
-                            self.response_headers)
+            libbe.command.serve.WSGITestCase.tearDown(self)
+        def test_run_list(self):
+            list = libbe.command.list.List()
+            params = list._parse_options_args()
+            data = yaml.safe_dump({
+                    'command': 'list',
+                    'parameters': params,
+                    })
+            self.getURL(self.app, '/run', method='POST', data=data)
+            self.failUnless(self.status.startswith('200 '), self.status)
+            self.failUnless(
+                ('Content-Type', 'application/octet-stream'
+                 ) in self.response_headers,
+                self.response_headers)
             self.failUnless(self.exc_info == None, self.exc_info)
-        def test_add_post(self):
-            self.getURL(self.app, '/add/', method='POST',
-                        data={'id':'123456', 'parent':'abc123',
-                              'directory':'True'})
-            self.failUnless(self.status == '200 OK', self.status)
-            self.failUnless(self.response_headers == [],
-                            self.response_headers)
-            self.failUnless(self.exc_info == None, self.exc_info)
-        # Note: other methods tested in libbe.storage.http
-
-        # TODO: integration tests on Serve?
+        # TODO: integration tests on ServeCommands?
 
     unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
     suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
-
-
-# The following certificate-creation code is adapted From pyOpenSSL's
-# examples.
-
-def get_cert_filenames(server_name, autogenerate=True, logger=None):
-    """
-    Generate private key and certification filenames.
-    get_cert_filenames(server_name) -> (pkey_filename, cert_filename)
-    """
-    pkey_file = '%s.pkey' % server_name
-    cert_file = '%s.cert' % server_name
-    if autogenerate == True:
-        for file in [pkey_file, cert_file]:
-            if not os.path.exists(file):
-                make_certs(server_name, logger)
-    return (pkey_file, cert_file)
-
-def createKeyPair(type, bits):
-    """Create a public/private key pair.
-
-    Returns the public/private key pair in a PKey object.
-
-    Parameters
-    ----------
-    type : TYPE_RSA or TYPE_DSA
-      Key type.
-    bits : int
-      Number of bits to use in the key.
-    """
-    pkey = OpenSSL.crypto.PKey()
-    pkey.generate_key(type, bits)
-    return pkey
-
-def createCertRequest(pkey, digest="md5", **name):
-    """Create a certificate request.
-
-    Returns the certificate request in an X509Req object.
-
-    Parameters
-    ----------
-    pkey : PKey
-      The key to associate with the request.
-    digest : "md5" or ?
-      Digestion method to use for signing, default is "md5",
-    `**name` :
-      The name of the subject of the request, possible.
-      Arguments are:
-
-      ============ ========================
-      C            Country name
-      ST           State or province name
-      L            Locality name
-      O            Organization name
-      OU           Organizational unit name
-      CN           Common name
-      emailAddress E-mail address
-      ============ ========================
-    """
-    req = OpenSSL.crypto.X509Req()
-    subj = req.get_subject()
-
-    for (key,value) in name.items():
-        setattr(subj, key, value)
-
-    req.set_pubkey(pkey)
-    req.sign(pkey, digest)
-    return req
-
-def createCertificate(req, (issuerCert, issuerKey), serial, (notBefore, notAfter), digest="md5"):
-    """Generate a certificate given a certificate request.
-
-    Returns the signed certificate in an X509 object.
-
-    Parameters
-    ----------
-    req :
-      Certificate reqeust to use
-    issuerCert :
-      The certificate of the issuer
-    issuerKey :
-      The private key of the issuer
-    serial :
-      Serial number for the certificate
-    notBefore :
-      Timestamp (relative to now) when the certificate
-      starts being valid
-    notAfter :
-      Timestamp (relative to now) when the certificate
-      stops being valid
-    digest :
-      Digest method to use for signing, default is md5
-    """
-    cert = OpenSSL.crypto.X509()
-    cert.set_serial_number(serial)
-    cert.gmtime_adj_notBefore(notBefore)
-    cert.gmtime_adj_notAfter(notAfter)
-    cert.set_issuer(issuerCert.get_subject())
-    cert.set_subject(req.get_subject())
-    cert.set_pubkey(req.get_pubkey())
-    cert.sign(issuerKey, digest)
-    return cert
-
-def make_certs(server_name, logger=None) :
-    """Generate private key and certification files.
-
-    `mk_certs(server_name) -> (pkey_filename, cert_filename)`
-    """
-    if OpenSSL == None:
-        raise libbe.command.UserError, \
-            'SSL certificate generation requires the OpenSSL module'
-    pkey_file,cert_file = get_cert_filenames(
-        server_name, autogenerate=False)
-    if logger != None:
-        logger.log(logger._server_level,
-                   'Generating certificates', pkey_file, cert_file)
-    cakey = createKeyPair(OpenSSL.crypto.TYPE_RSA, 1024)
-    careq = createCertRequest(cakey, CN='Certificate Authority')
-    cacert = createCertificate(
-        careq, (careq, cakey), 0, (0, 60*60*24*365*5)) # five years
-    open(pkey_file, 'w').write(OpenSSL.crypto.dump_privatekey(
-            OpenSSL.crypto.FILETYPE_PEM, cakey))
-    open(cert_file, 'w').write(OpenSSL.crypto.dump_certificate(
-            OpenSSL.crypto.FILETYPE_PEM, cacert))