--- /dev/null
+# Copyright (C) 2010 W. Trevor King <wking@drexel.edu>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# The author may be contacted at <wking@drexel.edu> on the Internet, or
+# write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
+# Philadelphia PA 19104, USA.
+
+"""Functions for running external commands on other hosts.
+
+pbs_ is a Python wrapper around the Torque PBS server.
+
+.. _pbs: https://subtrac.sara.nl/oss/pbs_python
+"""
+
+from __future__ import absolute_import
+from __future__ import with_statement
+
+import os
+import os.path
+import pickle
+import re
+import shutil
+import socket
+import sys
+import tempfile
+import time
+import types
+
+try:
+ import pbs
+except ImportError, pbs_error:
+ pbs = None
+
+from .. import invoke
+from . import Job, JobManager
+
+
+# Parent directory in which `PBSManager` creates its temporary working
+# directory when no explicit `workdir` is given.
+# NOTE(review): presumably the home directory is used because it is
+# shared with the compute nodes -- confirm for the target cluster.
+SCRATCH_DIR = os.path.expanduser('~')
+
+
+class PBSManager (JobManager):
+ """Manage asynchronous `Job` execution via :mod:`pbs`.
+
+ >>> from math import sqrt
+ >>> m = PBSManager()
+ >>> group_A = []
+ >>> for i in range(10):
+ ... group_A.append(m.async_invoke(Job(target=sqrt, args=[i])))
+ >>> group_B = []
+ >>> for i in range(10):
+ ... group_B.append(m.async_invoke(Job(target=sqrt, args=[i],
+ ... blocks_on=[j.id for j in group_A])))
+ >>> jobs = m.wait(ids=[j.id for j in group_A[5:8]])
+ >>> print sorted(jobs.values(), key=lambda j: j.id)
+ [<Job 5>, <Job 6>, <Job 7>]
+ >>> jobs = m.wait()
+ >>> print sorted(jobs.values(), key=lambda j: j.id)
+ ... # doctest: +NORMALIZE_WHITESPACE
+ [<Job 0>, <Job 1>, <Job 2>, <Job 3>, <Job 4>, <Job 8>, <Job 9>, <Job 10>,
+ <Job 11>, <Job 12>, <Job 13>, <Job 14>, <Job 15>, <Job 16>, <Job 17>,
+ <Job 18>, <Job 19>]
+ >>> m.teardown()
+ """
+ def __init__(self, workdir=None):
+ super(PBSManager, self).__init__()
+ self._cleanup = True
+ self._setup_pbs()
+ if workdir == None:
+ workdir = tempfile.mkdtemp(
+ prefix='tmp-PBSManager-', dir=SCRATCH_DIR)
+ self._temporary_workdir = True
+ else:
+ self._temporary_workdir = False
+ self._workdir = workdir
+ # example tracejob line:
+ # 10/19/2010 13:57:01 S dequeuing from batch, state COMPLETE
+ self._tracejob_re = re.compile(
+ '(\d*)/(\d*)/(\d*) (\d*):(\d*):(\d*) (\w*) (.*)')
+ self._tracejob_re_groups = [
+ 'month', 'day', 'year', 'hour', 'minute', 'second', 'code', 'msg']
+ self._tracejob_re_int_groups = [
+ 'month', 'day', 'year', 'hour', 'minute', 'second']
+ # example tracejob message:
+ # dequeuing from batch, state COMPLETE
+ self._tracejob_re_dequeue = re.compile(
+ 'dequeuing from (.*), state (.*)')
+
+ def _setup_pbs(self):
+ if pbs == None:
+ raise pbs_error
+ self._pbs_server = pbs.pbs_default()
+ if not self._pbs_server:
+ raise Exception('No default server: %s' % pbs.error())
+ self._pbs_connection = pbs.pbs_connect(self._pbs_server)
+ self._post_submit = False
+ self._post_submit_sleep = 3
+ self._receive_poll_sleep = 3
+ self._walltime = '3:00:00'
+
+ def teardown(self):
+ if self._cleanup == True and self._temporary_workdir == True:
+ shutil.rmtree(self._workdir)
+ super(PBSManager, self).teardown()
+
+ def _spawn_job(self, job):
+ job._pbs_paths = {}
+ for name,extension in [('script', 'py'), ('job', 'job.pkl'),
+ ('stdout', 'stdout'), ('stderr', 'stderr'),
+ ('data', 'pkl')]:
+ job._pbs_paths[name] = os.path.join(
+ self._workdir, '%d.%s' % (job.id, extension))
+ with open(job._pbs_paths['script'], 'w') as f:
+ self._write_script(job, f)
+ with open(job._pbs_paths['job'], 'w') as f:
+ pickle.dump(job, f)
+ environ = ','.join(['%s=%s' % (k,v) for k,v
+ in self._environ().iteritems()])
+ host = socket.getfqdn(socket.gethostname())
+ attrib = self._make_attropl({
+ pbs.ATTR_e: '%s:%s' % (
+ host, job._pbs_paths['stderr']),
+ #pbs.ATTR_h: 'u', # user hold
+ pbs.ATTR_m: 'n', # don't send any mail
+ pbs.ATTR_N: 'pysawsim-%d' % id(job), # name the job
+ pbs.ATTR_o: '%s:%s' % (
+ host, job._pbs_paths['stdout']),
+ pbs.ATTR_v: environ,
+ })
+ job._pbs_id = pbs.pbs_submit(
+ self._pbs_connection, attrib, job._pbs_paths['script'], None, None)
+ if not job._pbs_id:
+ raise Exception('Error submitting job %s: %s'
+ % (job, pbs.error()))
+ self._post_submit = True
+
+ def _write_script(self, job, stream):
+ """
+ >>> from . import InvokeJob
+ >>> m = PBSManager()
+ >>> j = InvokeJob(id=7, target='echo "testing %s"' % m)
+ >>> j._pbs_paths = {
+ ... 'job': os.path.join(m._workdir, '%d.job.pkl' % j.id),
+ ... 'data': os.path.join(m._workdir, '%d.pkl' % j.id),
+ ... }
+ >>> m._write_script(j, sys.stdout)
+ ... # doctest: +ELLIPSIS, +REPORT_UDIFF
+ #!/usr/bin/env python
+ #PBS -l walltime:3:00:00
+ from __future__ import with_statement
+ import os
+ import pickle
+ import sys
+ sys.path = [...]
+ os.chdir(os.environ['PBS_O_WORKDIR'])
+ with open('/.../tmp-PBSManager-.../7.job.pkl', 'r') as f:
+ job = pickle.load(f)
+ job.run()
+ with open('/.../tmp-PBSManager-.../7.pkl', 'w') as f:
+ pickle.dump(job.data, f)
+ sys.exit(job.status)
+
+ >>> for id in [3, 4]:
+ ... m._jobs[id] = Job(id=id)
+ ... m._jobs[id]._pbs_id = '%d.big.iron.com' % id
+ >>> j = InvokeJob(id=8, target='echo "testing %s"' % m,
+ ... blocks_on=[1,2,3,4])
+ >>> j._pbs_paths = {
+ ... 'job': os.path.join(m._workdir, '%d.job.pkl' % j.id),
+ ... 'data': os.path.join(m._workdir, '%d.pkl' % j.id),
+ ... }
+ >>> m._write_script(j, sys.stdout)
+ ... # doctest: +ELLIPSIS, +REPORT_UDIFF
+ #!/usr/bin/env python
+ #PBS -l walltime:3:00:00
+ #PBS -w afterok:3.big.iron.com:4.big.iron.com
+ from __future__ import with_statement
+ import os
+ import pickle
+ import sys
+ sys.path = [...]
+ os.chdir(os.environ['PBS_O_WORKDIR'])
+ with open('/.../tmp-PBSManager-.../8.job.pkl', 'r') as f:
+ job = pickle.load(f)
+ job.run()
+ with open('/.../tmp-PBSManager-.../8.pkl', 'w') as f:
+ pickle.dump(job.data, f)
+ sys.exit(job.status)
+ >>> m.teardown()
+ """
+ stream.write('#!/usr/bin/env python\n')
+ stream.write('#PBS -l walltime:%s\n' % self._walltime)
+ #stream.write('#PBS -l minwclimit:...')
+ if len(job.blocks_on) > 0:
+ blockers = [self._jobs.get(id, None) for id in job.blocks_on]
+ stream.write(
+ '#PBS -w afterok:%s\n'
+ % ':'.join([j._pbs_id for j in blockers if j != None]))
+ stream.write('from __future__ import with_statement\n')
+ stream.write('import os\n')
+ stream.write('import pickle\n')
+ stream.write('import sys\n')
+ stream.write('sys.path = [%s]\n' % ', '.join(
+ ["'%s'" % p for p in sys.path]))
+ stream.write("os.chdir(os.environ['PBS_O_WORKDIR'])\n")
+ stream.write("with open('%s', 'r') as f:\n" % job._pbs_paths['job'])
+ stream.write(' job = pickle.load(f)\n')
+ stream.write('job.run()\n')
+ stream.write("with open('%s', 'w') as f:\n" % job._pbs_paths['data'])
+ stream.write(' pickle.dump(job.data, f)\n')
+ stream.write('sys.exit(job.status)\n')
+
+ def _environ(self):
+ """Mimic the environment you would get with `qsub -V`.
+ """
+ environ = dict(os.environ)
+ for key in ['HOME', 'LANG', 'LOGNAME', 'PATH', 'MAIL', 'SHELL', 'TZ']:
+ if key in environ:
+ environ['PBS_O_%s' % key] = environ[key]
+ environ['PBS_SERVER'] = self._pbs_server
+ environ['PBS_O_WORKDIR'] = os.getcwd()
+ #environ['PBS_ARRAYID'] # not an array job
+ return environ
+
+ def _receive_job(self):
+ if self._post_submit == True:
+ self._post_submit = False
+ time.sleep(self._post_submit_sleep)
+ while True:
+ for job in self._jobs.itervalues():
+ if job.status != None:
+ continue
+ info = self._tracejob(job)
+ if info.get('state', None) == 'COMPLETE':
+ break
+ if info.get('state', None):
+ break
+ time.sleep(self._receive_poll_sleep)
+ job._pbs_info = info
+ job.status = int(info['Exit_status'])
+ with open(job._pbs_paths['data'], 'r') as f:
+ job.data = pickle.load(f)
+ job._pbs_stdout = open(job._pbs_paths['stdout'], 'r').read()
+ job._pbs_stderr = open(job._pbs_paths['stderr'], 'r').read()
+ if self._cleanup == True:
+ for name,path in job._pbs_paths.iteritems():
+ os.remove(path)
+ del(job._pbs_paths)
+ return job
+
+ def _tracejob(self, job):
+ s,out,err = invoke.invoke('tracejob "%s"' % job._pbs_id)
+ lines = []
+ for line in out.splitlines():
+ m = self._tracejob_re.match(line.strip())
+ if m == None:
+ continue
+ data = {}
+ for group,value in zip(self._tracejob_re_groups, m.groups()):
+ if group in self._tracejob_re_int_groups:
+ data[group] = int(value)
+ else:
+ data[group] = value
+ lines.append(data)
+
+ info = {'lines': lines}
+ for line in lines:
+ if line['msg'].startswith('Exit_status='):
+ # Exit_status=0 resources_used.cput=00:00:00...
+ fields = line['msg'].split()
+ for field in fields:
+ key,value = field.split('=', 1)
+ info[key] = value
+ elif line['msg'].startswith('dequeuing from'):
+ m = self._tracejob_re_dequeue.match(line['msg'])
+ info['queue'],info['state'] = m.groups()
+ return info
+
+ @staticmethod
+ def _make_attr_structs (attributes, constructor):
+ """Create an array of structs (via the specified
+ `constructor`) which encode the specified `attributes`.
+
+ Code adapted from:
+
+ From: "Nate Woody" <nathaniel.x.woody@gsk.com>
+ Date: Wed Sep 6 09:01:23 MDT 2006
+ Subject: [torqueusers] python pbs_submit
+ URL: http://www.clusterresources.com/pipermail/torqueusers/2006-September/004212.html
+ """
+ # Determine the number of structs evinced by 'attributes'.
+ attr_count = len(attributes)
+
+ attr = constructor(attr_count)
+ index = 0
+
+ # Pack the struct array.
+ for pair in attributes.iteritems():
+ name, ds = pair
+
+ # If 'ds' is a dictionary, then treat it as containing valued resources.
+ if type(ds) == types.DictType:
+ for resource_pair in ds.iteritems():
+ resource, value = resource_pair
+ attr[index].name = name
+ attr[index].resource = resource
+ attr[index].value = str(value)
+ index += 1
+
+ else:
+ # If 'ds' is a scalar object, then wrap a list around it.
+ if type(ds) != types.ListType:
+ ds = [ ds ]
+
+ attr[index].name = name
+ attr[index].value = ",".join(map(lambda x: str(x), ds))
+ index += 1
+
+ return attr
+
+ @staticmethod
+ def _make_attrl (attributes):
+ """Obtain an array of `attrl` structs which encode the
+ specified `attributes`.
+
+ Code adapted from:
+
+ From: "Nate Woody" <nathaniel.x.woody@gsk.com>
+ Date: Wed Sep 6 09:01:23 MDT 2006
+ Subject: [torqueusers] python pbs_submit
+ URL: http://www.clusterresources.com/pipermail/torqueusers/2006-September/004212.html
+ """
+ return PBSManager._make_attr_structs(
+ attributes, constructor=pbs.new_attrl)
+
+ @staticmethod
+ def _make_attropl (attributes):
+ """Obtain an array of `attropl` structs which encode the specified
+ `attributes`.
+
+ Code adapted from:
+
+ From: "Nate Woody" <nathaniel.x.woody@gsk.com>
+ Date: Wed Sep 6 09:01:23 MDT 2006
+ Subject: [torqueusers] python pbs_submit
+ URL: http://www.clusterresources.com/pipermail/torqueusers/2006-September/004212.html
+ """
+ return PBSManager._make_attr_structs(
+ attributes, constructor=pbs.new_attropl)