import os
import sys
import tempfile

try:
    import pbs
+    _SKIP = ''
except ImportError, pbs_error:
    pbs = None
+    _SKIP = ' # doctest: +SKIP'

from .. import invoke
from . import Job, JobManager

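+# When :mod:`pbs` is unavailable, every `%(skip)s` placeholder in the
+# docstrings below expands to ' # doctest: +SKIP', turning, e.g.,
+#     >>> m = PBSManager()%(skip)s
+# into
+#     >>> m = PBSManager() # doctest: +SKIP
+# so the examples are skipped rather than failing; with pbs installed the
+# placeholder expands to the empty string and the examples run normally.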
class PBSManager (JobManager):
-    """Manage asynchronous `Job` execution via :mod:`pbs`.
+    __doc__ = """Manage asynchronous `Job` execution via :mod:`pbs`.

    >>> from math import sqrt
-    >>> m = PBSManager()
+    >>> m = PBSManager()%(skip)s
    >>> group_A = []
    >>> for i in range(10):
-    ...     group_A.append(m.async_invoke(Job(target=sqrt, args=[i])))
+    ...     group_A.append(m.async_invoke(Job(target=sqrt, args=[i])))%(skip)s
    >>> group_B = []
    >>> for i in range(10):
    ...     group_B.append(m.async_invoke(Job(target=sqrt, args=[i],
-    ...         blocks_on=[j.id for j in group_A])))
-    >>> jobs = m.wait(ids=[j.id for j in group_A[5:8]])
-    >>> print sorted(jobs.values(), key=lambda j: j.id)
+    ...         blocks_on=[j.id for j in group_A])))%(skip)s
+    >>> jobs = m.wait(ids=[j.id for j in group_A[5:8]])%(skip)s
+    >>> print sorted(jobs.values(), key=lambda j: j.id)%(skip)s
    [<Job 5>, <Job 6>, <Job 7>]
-    >>> jobs = m.wait()
-    >>> print sorted(jobs.values(), key=lambda j: j.id)
+    >>> jobs = m.wait()%(skip)s
+    >>> print sorted(jobs.values(), key=lambda j: j.id)%(skip)s
    ... # doctest: +NORMALIZE_WHITESPACE
    [<Job 0>, <Job 1>, <Job 2>, <Job 3>, <Job 4>, <Job 8>, <Job 9>, <Job 10>,
     <Job 11>, <Job 12>, <Job 13>, <Job 14>, <Job 15>, <Job 16>, <Job 17>,
     <Job 18>, <Job 19>]
-    >>> m.teardown()
-    """
+    >>> m.teardown()%(skip)s
+    """ % {'skip': _SKIP}
    def __init__(self, workdir=None):
        super(PBSManager, self).__init__()
        self._walltime = '3:00:00'  # default limit; matches the doctests below
        if workdir is None:  # scratch directory; the doctests expect this prefix
            workdir = tempfile.mkdtemp(prefix='tmp-PBSManager-')
        self._workdir = os.path.abspath(workdir)
        self._cleanup = True
        self._post_submit = True
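+    # _write_script() emits a single file that serves both as the qsub
+    # submission script (the `#PBS` directive header) and as the Python
+    # payload PBS executes: it restores sys.path and the submission
+    # directory, unpickles the Job, runs it, pickles job.data, and returns
+    # job.status through the exit code.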
    def _write_script(self, job, stream):
-        """
+        stream.write('#!/usr/bin/env python\n')
+        stream.write('#PBS -l walltime=%s\n' % self._walltime)
+        #stream.write('#PBS -l minwclimit:...')
+        if len(job.blocks_on) > 0:
+            # hold this job until all of its blockers have finished successfully
+            blockers = [self._jobs.get(id, None) for id in job.blocks_on]
+            stream.write(
+                '#PBS -W depend=afterok:%s\n'
+                % ':'.join([j._pbs_id for j in blockers if j is not None]))
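+        # The dependency header above is the script-level equivalent of, e.g.,
+        #   qsub -W depend=afterok:3.big.iron.com:4.big.iron.com script.py
+        # (TORQUE/PBS syntax): the queue holds the job until every listed job
+        # has exited successfully.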
+        stream.write('from __future__ import with_statement\n')
+        stream.write('import os\n')
+        stream.write('import pickle\n')
+        stream.write('import sys\n')
+        # export the manager's sys.path so the job sees the same modules
+        stream.write('sys.path = [%s]\n' % ', '.join(
+                ["'%s'" % p for p in sys.path]))
+        stream.write("os.chdir(os.environ['PBS_O_WORKDIR'])\n")
+        stream.write("with open('%s', 'r') as f:\n" % job._pbs_paths['job'])
+        stream.write('    job = pickle.load(f)\n')
+        stream.write('job.run()\n')
+        stream.write("with open('%s', 'w') as f:\n" % job._pbs_paths['data'])
+        stream.write('    pickle.dump(job.data, f)\n')
+        stream.write('sys.exit(job.status)\n')
+    _write_script.__doc__ = """
        >>> from . import InvokeJob

-        >>> m = PBSManager()
-        >>> j = InvokeJob(id=7, target='echo "testing %s"' % m)
+        >>> m = PBSManager()%(skip)s
+        >>> j = InvokeJob(id=7, target='echo "testing %%s"' %% m)%(skip)s
        >>> j._pbs_paths = {
-        ...     'job': os.path.join(m._workdir, '%d.job.pkl' % j.id),
-        ...     'data': os.path.join(m._workdir, '%d.pkl' % j.id),
-        ...     }
-        >>> m._write_script(j, sys.stdout)
+        ...     'job': os.path.join(m._workdir, '%%d.job.pkl' %% j.id),
+        ...     'data': os.path.join(m._workdir, '%%d.pkl' %% j.id),
+        ...     }%(skip)s
+        >>> m._write_script(j, sys.stdout)%(skip)s
        ... # doctest: +ELLIPSIS, +REPORT_UDIFF
        #!/usr/bin/env python
-        #PBS -l walltime:3:00:00
+        #PBS -l walltime=3:00:00
        from __future__ import with_statement
        import os
        import pickle
        import sys
        sys.path = [...]
        os.chdir(os.environ['PBS_O_WORKDIR'])
        with open('/.../tmp-PBSManager-.../7.job.pkl', 'r') as f:
            job = pickle.load(f)
        job.run()
        with open('/.../tmp-PBSManager-.../7.pkl', 'w') as f:
            pickle.dump(job.data, f)
        sys.exit(job.status)
-        >>> for id in [3, 4]:
+        >>> for id in [3, 4]:%(skip)s
        ...     m._jobs[id] = Job(id=id)
-        ...     m._jobs[id]._pbs_id = '%d.big.iron.com' % id
-        >>> j = InvokeJob(id=8, target='echo "testing %s"' % m,
-        ...               blocks_on=[1,2,3,4])
+        ...     m._jobs[id]._pbs_id = '%%d.big.iron.com' %% id
+        >>> j = InvokeJob(id=8, target='echo "testing %%s"' %% m,
+        ...               blocks_on=[1,2,3,4])%(skip)s
        >>> j._pbs_paths = {
-        ...     'job': os.path.join(m._workdir, '%d.job.pkl' % j.id),
-        ...     'data': os.path.join(m._workdir, '%d.pkl' % j.id),
-        ...     }
-        >>> m._write_script(j, sys.stdout)
+        ...     'job': os.path.join(m._workdir, '%%d.job.pkl' %% j.id),
+        ...     'data': os.path.join(m._workdir, '%%d.pkl' %% j.id),
+        ...     }%(skip)s
+        >>> m._write_script(j, sys.stdout)%(skip)s
        ... # doctest: +ELLIPSIS, +REPORT_UDIFF
        #!/usr/bin/env python
-        #PBS -l walltime:3:00:00
-        #PBS -w afterok:3.big.iron.com:4.big.iron.com
+        #PBS -l walltime=3:00:00
+        #PBS -W depend=afterok:3.big.iron.com:4.big.iron.com
        from __future__ import with_statement
        import os
        import pickle
        import sys
        sys.path = [...]
        os.chdir(os.environ['PBS_O_WORKDIR'])
        with open('/.../tmp-PBSManager-.../8.job.pkl', 'r') as f:
            job = pickle.load(f)
        job.run()
        with open('/.../tmp-PBSManager-.../8.pkl', 'w') as f:
            pickle.dump(job.data, f)
        sys.exit(job.status)
-        >>> m.teardown()
-        """
-        stream.write('#!/usr/bin/env python\n')
-        stream.write('#PBS -l walltime:%s\n' % self._walltime)
-        #stream.write('#PBS -l minwclimit:...')
-        if len(job.blocks_on) > 0:
-            blockers = [self._jobs.get(id, None) for id in job.blocks_on]
-            stream.write(
-                '#PBS -w afterok:%s\n'
-                % ':'.join([j._pbs_id for j in blockers if j != None]))
-        stream.write('from __future__ import with_statement\n')
-        stream.write('import os\n')
-        stream.write('import pickle\n')
-        stream.write('import sys\n')
-        stream.write('sys.path = [%s]\n' % ', '.join(
-                ["'%s'" % p for p in sys.path]))
-        stream.write("os.chdir(os.environ['PBS_O_WORKDIR'])\n")
-        stream.write("with open('%s', 'r') as f:\n" % job._pbs_paths['job'])
-        stream.write('    job = pickle.load(f)\n')
-        stream.write('job.run()\n')
-        stream.write("with open('%s', 'w') as f:\n" % job._pbs_paths['data'])
-        stream.write('    pickle.dump(job.data, f)\n')
-        stream.write('sys.exit(job.status)\n')
+        >>> m.teardown()%(skip)s
+        """ % {'skip': _SKIP}
    def _environ(self):
        """Mimic the environment you would get with `qsub -V`.