From aa7458bfc4d1007def966baa0108994bc51184fc Mon Sep 17 00:00:00 2001 From: "W. Trevor King" Date: Wed, 20 Oct 2010 07:30:45 -0400 Subject: [PATCH] Adjust pysawsim.manager.pbs to skip most doctests if pbs mod is missing. --- pysawsim/manager/pbs.py | 98 +++++++++++++++++++++-------------------- 1 file changed, 50 insertions(+), 48 deletions(-) diff --git a/pysawsim/manager/pbs.py b/pysawsim/manager/pbs.py index f0487c1..f98a0aa 100644 --- a/pysawsim/manager/pbs.py +++ b/pysawsim/manager/pbs.py @@ -40,8 +40,10 @@ import types try: import pbs + _SKIP = '' except ImportError, pbs_error: pbs = None + _SKIP = ' # doctest: +SKIP' from .. import invoke from . import Job, JobManager @@ -51,28 +53,28 @@ SCRATCH_DIR = os.path.expanduser('~') class PBSManager (JobManager): - """Manage asynchronous `Job` execution via :mod:`pbs`. + __doc__ = """Manage asynchronous `Job` execution via :mod:`pbs`. >>> from math import sqrt - >>> m = PBSManager() + >>> m = PBSManager()%(skip)s >>> group_A = [] >>> for i in range(10): - ... group_A.append(m.async_invoke(Job(target=sqrt, args=[i]))) + ... group_A.append(m.async_invoke(Job(target=sqrt, args=[i])))%(skip)s >>> group_B = [] >>> for i in range(10): ... group_B.append(m.async_invoke(Job(target=sqrt, args=[i], - ... blocks_on=[j.id for j in group_A]))) - >>> jobs = m.wait(ids=[j.id for j in group_A[5:8]]) - >>> print sorted(jobs.values(), key=lambda j: j.id) + ... blocks_on=[j.id for j in group_A])))%(skip)s + >>> jobs = m.wait(ids=[j.id for j in group_A[5:8]])%(skip)s + >>> print sorted(jobs.values(), key=lambda j: j.id)%(skip)s [, , ] - >>> jobs = m.wait() - >>> print sorted(jobs.values(), key=lambda j: j.id) + >>> jobs = m.wait()%(skip)s + >>> print sorted(jobs.values(), key=lambda j: j.id)%(skip)s ... # doctest: +NORMALIZE_WHITESPACE [, , , , , , , , , , , , , , , , ] - >>> m.teardown() - """ + >>> m.teardown()%(skip)s + """ % {'skip': _SKIP} def __init__(self, workdir=None): super(PBSManager, self).__init__() self._cleanup = True @@ -146,15 +148,36 @@ class PBSManager (JobManager): self._post_submit = True def _write_script(self, job, stream): - """ + stream.write('#!/usr/bin/env python\n') + stream.write('#PBS -l walltime:%s\n' % self._walltime) + #stream.write('#PBS -l minwclimit:...') + if len(job.blocks_on) > 0: + blockers = [self._jobs.get(id, None) for id in job.blocks_on] + stream.write( + '#PBS -w afterok:%s\n' + % ':'.join([j._pbs_id for j in blockers if j != None])) + stream.write('from __future__ import with_statement\n') + stream.write('import os\n') + stream.write('import pickle\n') + stream.write('import sys\n') + stream.write('sys.path = [%s]\n' % ', '.join( + ["'%s'" % p for p in sys.path])) + stream.write("os.chdir(os.environ['PBS_O_WORKDIR'])\n") + stream.write("with open('%s', 'r') as f:\n" % job._pbs_paths['job']) + stream.write(' job = pickle.load(f)\n') + stream.write('job.run()\n') + stream.write("with open('%s', 'w') as f:\n" % job._pbs_paths['data']) + stream.write(' pickle.dump(job.data, f)\n') + stream.write('sys.exit(job.status)\n') + _write_script.__doc__ = """ >>> from . import InvokeJob - >>> m = PBSManager() - >>> j = InvokeJob(id=7, target='echo "testing %s"' % m) + >>> m = PBSManager()%(skip)s + >>> j = InvokeJob(id=7, target='echo "testing %%s"' %% m)%(skip)s >>> j._pbs_paths = { - ... 'job': os.path.join(m._workdir, '%d.job.pkl' % j.id), - ... 'data': os.path.join(m._workdir, '%d.pkl' % j.id), - ... } - >>> m._write_script(j, sys.stdout) + ... 'job': os.path.join(m._workdir, '%%d.job.pkl' %% j.id), + ... 'data': os.path.join(m._workdir, '%%d.pkl' %% j.id), + ... }%(skip)s + >>> m._write_script(j, sys.stdout)%(skip)s ... # doctest: +ELLIPSIS, +REPORT_UDIFF #!/usr/bin/env python #PBS -l walltime:3:00:00 @@ -171,16 +194,16 @@ class PBSManager (JobManager): pickle.dump(job.data, f) sys.exit(job.status) - >>> for id in [3, 4]: + >>> for id in [3, 4]:%(skip)s ... m._jobs[id] = Job(id=id) - ... m._jobs[id]._pbs_id = '%d.big.iron.com' % id - >>> j = InvokeJob(id=8, target='echo "testing %s"' % m, - ... blocks_on=[1,2,3,4]) + ... m._jobs[id]._pbs_id = '%%d.big.iron.com' %% id + >>> j = InvokeJob(id=8, target='echo "testing %%s"' %% m, + ... blocks_on=[1,2,3,4])%(skip)s >>> j._pbs_paths = { - ... 'job': os.path.join(m._workdir, '%d.job.pkl' % j.id), - ... 'data': os.path.join(m._workdir, '%d.pkl' % j.id), - ... } - >>> m._write_script(j, sys.stdout) + ... 'job': os.path.join(m._workdir, '%%d.job.pkl' %% j.id), + ... 'data': os.path.join(m._workdir, '%%d.pkl' %% j.id), + ... }%(skip)s + >>> m._write_script(j, sys.stdout)%(skip)s ... # doctest: +ELLIPSIS, +REPORT_UDIFF #!/usr/bin/env python #PBS -l walltime:3:00:00 @@ -197,29 +220,8 @@ class PBSManager (JobManager): with open('/.../tmp-PBSManager-.../8.pkl', 'w') as f: pickle.dump(job.data, f) sys.exit(job.status) - >>> m.teardown() - """ - stream.write('#!/usr/bin/env python\n') - stream.write('#PBS -l walltime:%s\n' % self._walltime) - #stream.write('#PBS -l minwclimit:...') - if len(job.blocks_on) > 0: - blockers = [self._jobs.get(id, None) for id in job.blocks_on] - stream.write( - '#PBS -w afterok:%s\n' - % ':'.join([j._pbs_id for j in blockers if j != None])) - stream.write('from __future__ import with_statement\n') - stream.write('import os\n') - stream.write('import pickle\n') - stream.write('import sys\n') - stream.write('sys.path = [%s]\n' % ', '.join( - ["'%s'" % p for p in sys.path])) - stream.write("os.chdir(os.environ['PBS_O_WORKDIR'])\n") - stream.write("with open('%s', 'r') as f:\n" % job._pbs_paths['job']) - stream.write(' job = pickle.load(f)\n') - stream.write('job.run()\n') - stream.write("with open('%s', 'w') as f:\n" % job._pbs_paths['data']) - stream.write(' pickle.dump(job.data, f)\n') - stream.write('sys.exit(job.status)\n') + >>> m.teardown()%(skip)s + """ % {'skip': _SKIP} def _environ(self): """Mimic the environment you would get with `qsub -V`. -- 2.26.2