--- /dev/null
+import os
+import sys
+import time
+from subprocess import Popen, PIPE, STDOUT
+import signal
+import socket
+import errno
+import shutil
+
+
+class LaunchError(Exception):
+ """ Exception class to signal startup error"""
+ pass
+
+class AdminError(Exception):
+ """ Exception class to handle admin errors"""
+ pass
+
+
+class Launcher:
+
+ def __init__(self, path):
+ self._buildDir = path
+ self._confDir = '%s/tests/kdc_realm/input_conf' % self._buildDir
+ confFile ='%s/test_setup.conf' % self._confDir
+ confParams = self._testSetup(confFile)
+ self._sandboxDir = '%s/%s' % (self._buildDir,confParams['sandboxDir'])
+ self._sandboxTier1 = '%s/%s' % (self._sandboxDir, 'tier1')
+ self._sandboxTier2 = '%s/%s' % (self._sandboxDir, 'tier2')
+ self._configurations = self._readServerConfiguration('%s/%s' % (self._confDir,confParams['testKDCconf']))
+ self._principals = self._readTestInputs('%s/%s' % (self._confDir,confParams['principals']))
+ os.environ["LD_LIBRARY_PATH"] = '%s/lib' % self._buildDir
+ self._pidRefKDC = 0
+ self._pidMap = dict()
+ self._initialized = False
+ self._tier1Init = False
+ self._tier2Init = False
+ self._vars = {'srcdir': self._buildDir,
+ 'tier1':self._sandboxTier1,
+ 'tier2':self._sandboxTier2,
+ 'localFQDN':socket.getfqdn()}
+
+ def _launchKDC(self, tierId, args, env):
+ """
+ Launching KDC server
+ """
+ cmd = '%s/kdc/krb5kdc' % self._buildDir
+ handle = Popen([cmd, args], env=env)
+ time.sleep(1)
+ # make sure that process is running
+ rc = handle.poll()
+ if rc is None:
+ print 'KDC server has been launched: pid=%s, tier=%s' % (handle.pid, tierId)
+ self._pidMap[handle.pid] = 1
+ return handle.pid
+ else:
+ raise LaunchError, 'Failed to launch kdc server'
+
+
+ def _prepSandbox(self):
+ for tierId in range(1,3):
+ tierdir = '%s/tier%i' % (self._sandboxDir, tierId)
+ if os.path.exists(tierdir):
+ shutil.rmtree(tierdir)
+ os.makedirs(tierdir, 0777)
+
+
+ def _kill(self, pid = None):
+ """
+ Kill specific process or group saved in pidMap
+ """
+ if pid is None:
+ target = self._pidMap.keys()
+ else:
+ target = [pid]
+ for p in target:
+ if p in self._pidMap:
+ del self._pidMap[p]
+ try:
+ os.kill(p, signal.SIGKILL)
+ except OSError:
+ pass
+
+
+ def _createDB(self, env):
+ """
+ Creating DB
+ """
+ cmd = '%s/kadmin/dbutil/kdb5_util' % self._buildDir
+ p = Popen([cmd, 'create', '-s'], env = env, stdin=PIPE, stdout=PIPE, stderr=PIPE)
+ (out, err) = p.communicate('a\na\n')
+ if p.returncode != 0:
+ err_msg = 'Failed to create DB: %s' % err
+ raise LaunchError, err_msg
+
+
+ def _launchClient(self, args, env):
+ """
+ kinit & kvno
+ """
+ self._addPrinc(args, env)
+ p = Popen(['kinit', args], env = env, stdin=PIPE, stdout=PIPE, stderr=PIPE)
+ (out, err) = p.communicate('a\n')
+ if int(p.wait()) == 0:
+ self._initialized = True
+ else:
+ err_msg = 'Failed to kinit client: %s' % err
+ raise AdminError, err_msg
+
+ # testHost', 'mybox.mit.edu is a srv defined in referral KDC. Get its kvno
+ cmd = '%s/clients/kvno/kvno' % self._buildDir
+ handle = Popen([cmd, '-C', '-S', 'testHost', 'mybox.mit.edu'],
+ env = env, stdin=PIPE, stdout=PIPE, stderr=PIPE)
+ (out, err) = handle.communicate()
+ handle.wait()
+ print 'kvno return code: %s' % handle.returncode
+
+ # Cleanup cached info
+ p = Popen(['kdestroy'], env = env, stdin=PIPE, stdout=PIPE, stderr=PIPE)
+ (out, err) = p.communicate()
+ if int(p.wait()) != 0:
+ err_msg = 'Failed to kdestroy cashed tickets: %s' % err
+ raise AdminError, err_msg
+
+ return handle.returncode
+
+
+ def _addPrinc(self, args, env):
+ """
+ Add Principal
+ """
+ msg = 'addprinc -pw a %s' % args
+ p = Popen(['kadmin.local' ], env = env, stdin=PIPE, stdout=PIPE, stderr=PIPE)
+ (out, err) = p.communicate(msg)
+ if int(p.wait()) != 0:
+ err_msg = 'Failed to add principal %s' % err_msg
+ raise AdminError, err_msg
+
+
+ def _crossRealm(self, r_local, r_remote, env):
+ """
+ Croos-realm setup
+ """
+ msg = 'addprinc -pw a krbtgt/%s@%s' % (r_remote, r_local)
+ p = Popen(['kadmin.local' ], env = env, stdin=PIPE, stdout=PIPE, stderr=PIPE)
+ (out, err) = p.communicate(msg)
+ if int(p.wait()) != 0:
+ err_msg = 'Failed to set cross-realm: %s' % err
+ raise AdminError, err_msg
+
+ msg = 'addprinc -pw a krbtgt/%s@%s' % (r_local, r_remote)
+ p = Popen(['kadmin.local' ], env = env, stdin=PIPE, stdout=PIPE, stderr=PIPE)
+ (out, err) = p.communicate(msg)
+ if int(p.wait()) != 0:
+ err_msg = 'Failed to set cross-realm: %s' % err
+ raise AdminError, err_msg
+
+
+ def _launchRefKDC(self,test_env):
+ """
+ Launch referral KDC
+ """
+ test_env["KRB5_CONFIG"] = '%s/krb5.conf' % self._sandboxTier1
+ test_env["KRB5_KDC_PROFILE"] = '%s/kdc.conf' % self._sandboxTier1
+ server_args = '-n'
+ if self._tier1Init == False:
+ # Create adequate to the environment config files
+ self._createFileFromTemplate('%s' % test_env["KRB5_CONFIG"],
+ '%s/%s' % (self._confDir,'krb5_ref_template.conf'),
+ self._vars)
+ self._createFileFromTemplate('%s' % test_env["KRB5_KDC_PROFILE"],
+ '%s/%s' % (self._confDir, 'kdc_ref_template.conf'),
+ self._vars)
+
+ # create DB for KDC to be referred to
+ pid = self._createDB(test_env)
+
+ # launch KDC to be referred to
+ self._pidRefKDC = self._launchKDC(1, server_args, test_env)
+
+ # The tests run against 'testHost/mybox.mit.edu' srv.
+ args = 'testHost/mybox.mit.edu'
+ self._addPrinc(args, test_env)
+ self._crossRealm('Z.COM', 'Y.COM', test_env)
+ self._tier1Init = True
+
+
+ def _launchTestingPair(self, srvParam,clntParam):
+ # launch KDC
+ server_env = os.environ.copy()
+ server_env["KRB5_KDC_PROFILE"] = '%s/kdc.conf' % self._sandboxTier2
+ server_env["KRB5_CONFIG"] = '%s/krb5_KDC.conf' % self._sandboxTier2
+ server_args = '-n'
+ self._createFileFromTemplate('%s' % server_env["KRB5_CONFIG"],
+ '%s/%s' % (self._confDir,srvParam),
+ self._vars)
+ self._createFileFromTemplate('%s' % server_env["KRB5_KDC_PROFILE"],
+ '%s/%s' % (self._confDir,'kdc_pri_template.conf'),
+ self._vars)
+ if self._tier2Init == False:
+ pid = self._createDB(server_env)
+ self._crossRealm('Y.COM', 'Z.COM', server_env)
+ self._tier2Init = True
+
+ server = self._launchKDC( 2, server_args, server_env)
+
+ # launch client
+ client_env = os.environ.copy()
+ client_env["KRB5_CONFIG"] = '%s/krb5_CL.conf' % self._sandboxTier2
+ self._createFileFromTemplate('%s' % client_env["KRB5_CONFIG"],
+ '%s/%s' % (self._confDir, 'krb5_priCL_template.conf'),
+ self._vars)
+ client_env["KRB5_KDC_PROFILE"] = server_env["KRB5_KDC_PROFILE"]
+ rc = self._launchClient(clntParam, client_env)
+ self._kill(server)
+ return rc
+
+
+ def run(self, args):
+ """
+ run the test
+ """
+ test_env = os.environ.copy()
+ test_env["SRCDIR"] = '%s' % self._buildDir
+
+ # create sandbox file directory if it does not exist
+ self._prepSandbox()
+
+ if self._tier1Init == False:
+ self._launchRefKDC(test_env)
+
+ result = dict()
+ for princs in self._principals:
+ for conf in self._configurations:
+ rc = self._launchTestingPair( conf['confName'], princs % self._vars)
+ result[conf['confName']] = {'expected':conf['expected'], 'actual':rc}
+ print 'Test code for configuration %s principal %s: %s' % (conf, princs, rc)
+ return result
+
+
+ def _readTestInputs(self, path):
+ f = open(path, 'r')
+ result = []
+ for line in f:
+ result.append(line.rstrip())
+ f.close()
+ return result
+
+
+ def _readServerConfiguration(self, path):
+ f = open(path, 'r')
+ result = []
+ for line in f:
+ fields = (line.rstrip()).split(',')
+ result.append({'confName':fields[0],'expected':fields[1]})
+ f.close()
+ return result
+
+
+ def _testSetup(self, path):
+ print path
+ f = open(path, 'r')
+ result = dict()
+ for line in f:
+ try:
+ (a,v) = line.rstrip().split('=')
+ result[a]=v
+ except:
+ print 'bad format for config file, line: %s' % line
+ return None
+ f.close()
+ return result
+
+
+ def _createFileFromTemplate(self, outpath, template, vars):
+ fin = open(template, 'r')
+ result = fin.read() % vars
+ fin.close()
+ fout = open(outpath, 'w')
+ fout.write(result)
+ fout.close()
+
+
+ def _getDNS(self):
+ print socket.getfqdn()
+
+
+ def printTestResults(self, testResults):
+ success_count = 0
+ fail_count = 0
+ print '\n'
+ print '------------------- Test Results ------------------------'
+ for (conf_name, result) in testResults.iteritems():
+ if int(result['expected']) == int(result['actual']):
+ print 'Test for configuration %s has succeeded' % conf_name
+ success_count += 1
+ else:
+ print 'Test for configuration %s has failed' % conf_name
+ fail_count += 1
+
+ print '------------------- Summary -----------------------------'
+ print 'Of %i tests %i failed, %i succeeded' % (len(testResults),
+ fail_count,
+ success_count)
+ print '---------------------------------------------------------'
+
+
+ def clean(self):
+ self._kill()
+
+
+if __name__ == '__main__':
+ src_path = os.environ["PWD"]
+ print "SOURCE PATH ==>" , src_path
+ test = None
+ try:
+ test = Launcher(src_path)
+ result = test.run('main')
+ test.clean()
+ test.printTestResults(result)
+
+ except:
+ if test is not None:
+ test.clean()
+ raise