X-Git-Url: http://git.tremily.us/?p=sawsim.git;a=blobdiff_plain;f=pysawsim%2Fmanager%2Fpbs.py;h=ca237b0f25e4b23e670552ca6033ce1c626967d4;hp=f0487c126fac6ed16588168c9c442de35048a198;hb=HEAD;hpb=3e1e7fdf3176b63532e70b66c481e9b787950c3b diff --git a/pysawsim/manager/pbs.py b/pysawsim/manager/pbs.py old mode 100755 new mode 100644 index f0487c1..ca237b0 --- a/pysawsim/manager/pbs.py +++ b/pysawsim/manager/pbs.py @@ -40,8 +40,12 @@ import types try: import pbs -except ImportError, pbs_error: - pbs = None + _ENABLED = True + _DISABLING_ERROR = None + _SKIP = '' +except ImportError, _DISABLING_ERROR: + _ENABLED = False + _SKIP = ' # doctest: +SKIP' from .. import invoke from . import Job, JobManager @@ -51,28 +55,28 @@ SCRATCH_DIR = os.path.expanduser('~') class PBSManager (JobManager): - """Manage asynchronous `Job` execution via :mod:`pbs`. + __doc__ = """Manage asynchronous `Job` execution via :mod:`pbs`. >>> from math import sqrt - >>> m = PBSManager() + >>> m = PBSManager()%(skip)s >>> group_A = [] >>> for i in range(10): - ... group_A.append(m.async_invoke(Job(target=sqrt, args=[i]))) + ... group_A.append(m.async_invoke(Job(target=sqrt, args=[i])))%(skip)s >>> group_B = [] >>> for i in range(10): ... group_B.append(m.async_invoke(Job(target=sqrt, args=[i], - ... blocks_on=[j.id for j in group_A]))) - >>> jobs = m.wait(ids=[j.id for j in group_A[5:8]]) - >>> print sorted(jobs.values(), key=lambda j: j.id) + ... blocks_on=[j.id for j in group_A])))%(skip)s + >>> jobs = m.wait(ids=[j.id for j in group_A[5:8]])%(skip)s + >>> print sorted(jobs.values(), key=lambda j: j.id)%(skip)s [, , ] - >>> jobs = m.wait() - >>> print sorted(jobs.values(), key=lambda j: j.id) + >>> jobs = m.wait()%(skip)s + >>> print sorted(jobs.values(), key=lambda j: j.id)%(skip)s ... # doctest: +NORMALIZE_WHITESPACE [, , , , , , , , , , , , , , , , ] - >>> m.teardown() - """ + >>> m.teardown()%(skip)s + """ % {'skip': _SKIP} def __init__(self, workdir=None): super(PBSManager, self).__init__() self._cleanup = True @@ -98,8 +102,8 @@ class PBSManager (JobManager): 'dequeuing from (.*), state (.*)') def _setup_pbs(self): - if pbs == None: - raise pbs_error + if _ENABLED == False: + raise _DISABLING_ERROR self._pbs_server = pbs.pbs_default() if not self._pbs_server: raise Exception('No default server: %s' % pbs.error()) @@ -146,15 +150,36 @@ class PBSManager (JobManager): self._post_submit = True def _write_script(self, job, stream): - """ + stream.write('#!/usr/bin/env python\n') + stream.write('#PBS -l walltime:%s\n' % self._walltime) + #stream.write('#PBS -l minwclimit:...') + if len(job.blocks_on) > 0: + blockers = [self._jobs.get(id, None) for id in job.blocks_on] + stream.write( + '#PBS -w afterok:%s\n' + % ':'.join([j._pbs_id for j in blockers if j != None])) + stream.write('from __future__ import with_statement\n') + stream.write('import os\n') + stream.write('import pickle\n') + stream.write('import sys\n') + stream.write('sys.path = [%s]\n' % ', '.join( + ["'%s'" % p for p in sys.path])) + stream.write("os.chdir(os.environ['PBS_O_WORKDIR'])\n") + stream.write("with open('%s', 'r') as f:\n" % job._pbs_paths['job']) + stream.write(' job = pickle.load(f)\n') + stream.write('job.run()\n') + stream.write("with open('%s', 'w') as f:\n" % job._pbs_paths['data']) + stream.write(' pickle.dump(job.data, f)\n') + stream.write('sys.exit(job.status)\n') + _write_script.__doc__ = """ >>> from . import InvokeJob - >>> m = PBSManager() - >>> j = InvokeJob(id=7, target='echo "testing %s"' % m) + >>> m = PBSManager()%(skip)s + >>> j = InvokeJob(id=7, target='echo "testing %%s"' %% m)%(skip)s >>> j._pbs_paths = { - ... 'job': os.path.join(m._workdir, '%d.job.pkl' % j.id), - ... 'data': os.path.join(m._workdir, '%d.pkl' % j.id), - ... } - >>> m._write_script(j, sys.stdout) + ... 'job': os.path.join(m._workdir, '%%d.job.pkl' %% j.id), + ... 'data': os.path.join(m._workdir, '%%d.pkl' %% j.id), + ... }%(skip)s + >>> m._write_script(j, sys.stdout)%(skip)s ... # doctest: +ELLIPSIS, +REPORT_UDIFF #!/usr/bin/env python #PBS -l walltime:3:00:00 @@ -171,16 +196,16 @@ class PBSManager (JobManager): pickle.dump(job.data, f) sys.exit(job.status) - >>> for id in [3, 4]: + >>> for id in [3, 4]:%(skip)s ... m._jobs[id] = Job(id=id) - ... m._jobs[id]._pbs_id = '%d.big.iron.com' % id - >>> j = InvokeJob(id=8, target='echo "testing %s"' % m, - ... blocks_on=[1,2,3,4]) + ... m._jobs[id]._pbs_id = '%%d.big.iron.com' %% id + >>> j = InvokeJob(id=8, target='echo "testing %%s"' %% m, + ... blocks_on=[1,2,3,4])%(skip)s >>> j._pbs_paths = { - ... 'job': os.path.join(m._workdir, '%d.job.pkl' % j.id), - ... 'data': os.path.join(m._workdir, '%d.pkl' % j.id), - ... } - >>> m._write_script(j, sys.stdout) + ... 'job': os.path.join(m._workdir, '%%d.job.pkl' %% j.id), + ... 'data': os.path.join(m._workdir, '%%d.pkl' %% j.id), + ... }%(skip)s + >>> m._write_script(j, sys.stdout)%(skip)s ... # doctest: +ELLIPSIS, +REPORT_UDIFF #!/usr/bin/env python #PBS -l walltime:3:00:00 @@ -197,29 +222,8 @@ class PBSManager (JobManager): with open('/.../tmp-PBSManager-.../8.pkl', 'w') as f: pickle.dump(job.data, f) sys.exit(job.status) - >>> m.teardown() - """ - stream.write('#!/usr/bin/env python\n') - stream.write('#PBS -l walltime:%s\n' % self._walltime) - #stream.write('#PBS -l minwclimit:...') - if len(job.blocks_on) > 0: - blockers = [self._jobs.get(id, None) for id in job.blocks_on] - stream.write( - '#PBS -w afterok:%s\n' - % ':'.join([j._pbs_id for j in blockers if j != None])) - stream.write('from __future__ import with_statement\n') - stream.write('import os\n') - stream.write('import pickle\n') - stream.write('import sys\n') - stream.write('sys.path = [%s]\n' % ', '.join( - ["'%s'" % p for p in sys.path])) - stream.write("os.chdir(os.environ['PBS_O_WORKDIR'])\n") - stream.write("with open('%s', 'r') as f:\n" % job._pbs_paths['job']) - stream.write(' job = pickle.load(f)\n') - stream.write('job.run()\n') - stream.write("with open('%s', 'w') as f:\n" % job._pbs_paths['data']) - stream.write(' pickle.dump(job.data, f)\n') - stream.write('sys.exit(job.status)\n') + >>> m.teardown()%(skip)s + """ % {'skip': _SKIP} def _environ(self): """Mimic the environment you would get with `qsub -V`.