From: W. Trevor King <wking@drexel.edu>
Date: Tue, 19 Oct 2010 23:02:19 +0000 (-0400)
Subject: Move 'qwait' to pysawsim.manager.pbs and add the rest of PBSManager.
X-Git-Url: http://git.tremily.us/?a=commitdiff_plain;h=3e1e7fdf3176b63532e70b66c481e9b787950c3b;p=sawsim.git

Move 'qwait' to pysawsim.manager.pbs and add the rest of PBSManager.
---

diff --git a/pysawsim/manager/pbs.py b/pysawsim/manager/pbs.py
new file mode 100755
index 0000000..f0487c1
--- /dev/null
+++ b/pysawsim/manager/pbs.py
@@ -0,0 +1,360 @@
+# Copyright (C) 2010 W. Trevor King <wking@drexel.edu>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# The author may be contacted at <wking@drexel.edu> on the Internet, or
+# write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
+# Philadelphia PA 19104, USA.
+
+"""Functions for running external commands on other hosts.
+
+pbs_ is a Python wrapper around the Torque PBS server.
+
+.. _pbs: https://subtrac.sara.nl/oss/pbs_python
+"""
+
+from __future__ import absolute_import
+from __future__ import with_statement
+
+import os
+import os.path
+import pickle
+import re
+import shutil
+import socket
+import sys
+import tempfile
+import time
+import types
+
+try:
+    import pbs
+except ImportError, pbs_error:
+    pbs = None
+
+from .. import invoke
+from . import Job, JobManager
+
+
+SCRATCH_DIR = os.path.expanduser('~')
+
+
+class PBSManager (JobManager):
+    """Manage asynchronous `Job` execution via :mod:`pbs`.
+
+    >>> from math import sqrt
+    >>> m = PBSManager()
+    >>> group_A = []
+    >>> for i in range(10):
+    ...     group_A.append(m.async_invoke(Job(target=sqrt, args=[i])))
+    >>> group_B = []
+    >>> for i in range(10):
+    ...     group_B.append(m.async_invoke(Job(target=sqrt, args=[i],
+    ...             blocks_on=[j.id for j in group_A])))
+    >>> jobs = m.wait(ids=[j.id for j in group_A[5:8]])
+    >>> print sorted(jobs.values(), key=lambda j: j.id)
+    [<Job 5>, <Job 6>, <Job 7>]
+    >>> jobs = m.wait()
+    >>> print sorted(jobs.values(), key=lambda j: j.id)
+    ... # doctest: +NORMALIZE_WHITESPACE
+    [<Job 0>, <Job 1>, <Job 2>, <Job 3>, <Job 4>, <Job 8>, <Job 9>,
+     <Job 10>, <Job 11>, <Job 12>, <Job 13>, <Job 14>, <Job 15>,
+     <Job 16>, <Job 17>, <Job 18>, <Job 19>]
+    >>> m.teardown()
+    """
+    def __init__(self, workdir=None):
+        super(PBSManager, self).__init__()
+        self._cleanup = True
+        self._setup_pbs()
+        if workdir == None:
+            workdir = tempfile.mkdtemp(
+                prefix='tmp-PBSManager-', dir=SCRATCH_DIR)
+            self._temporary_workdir = True
+        else:
+            self._temporary_workdir = False
+        self._workdir = workdir
+        # example tracejob line:
+        #   10/19/2010 13:57:01  S    dequeuing from batch, state COMPLETE
+        self._tracejob_re = re.compile(
+            r'(\d*)/(\d*)/(\d*) (\d*):(\d*):(\d*) (\w*) (.*)')
+        self._tracejob_re_groups = [
+            'month', 'day', 'year', 'hour', 'minute', 'second', 'code', 'msg']
+        self._tracejob_re_int_groups = [
+            'month', 'day', 'year', 'hour', 'minute', 'second']
+        # example tracejob message:
+        #   dequeuing from batch, state COMPLETE
+        self._tracejob_re_dequeue = re.compile(
+            'dequeuing from (.*), state (.*)')
+
+    def _setup_pbs(self):
+        if pbs == None:
+            raise pbs_error
+        self._pbs_server = pbs.pbs_default()
+        if not self._pbs_server:
+            raise Exception('No default server: %s' % pbs.error())
+        self._pbs_connection = pbs.pbs_connect(self._pbs_server)
+        self._post_submit = False
+        self._post_submit_sleep = 3
+        self._receive_poll_sleep = 3
+        self._walltime = '3:00:00'
+
+    def teardown(self):
+        if self._cleanup == True and self._temporary_workdir == True:
+            shutil.rmtree(self._workdir)
+        super(PBSManager, self).teardown()
+
+    def _spawn_job(self, job):
+        job._pbs_paths = {}
+        for name,extension in [('script', 'py'), ('job', 'job.pkl'),
+                               ('stdout', 'stdout'), ('stderr', 'stderr'),
+                               ('data', 'pkl')]:
+            job._pbs_paths[name] = os.path.join(
+                self._workdir, '%d.%s' % (job.id, extension))
+        with open(job._pbs_paths['script'], 'w') as f:
+            self._write_script(job, f)
+        with open(job._pbs_paths['job'], 'w') as f:
+            pickle.dump(job, f)
+        environ = ','.join(['%s=%s' % (k,v) for k,v
+                            in self._environ().iteritems()])
+        host = socket.getfqdn(socket.gethostname())
+        attrib = self._make_attropl({
+                pbs.ATTR_e: '%s:%s' % (
+                    host, job._pbs_paths['stderr']),
+                #pbs.ATTR_h: 'u',  # user hold
+                pbs.ATTR_m: 'n',  # don't send any mail
+                pbs.ATTR_N: 'pysawsim-%d' % id(job),  # name the job
+                pbs.ATTR_o: '%s:%s' % (
+                    host, job._pbs_paths['stdout']),
+                pbs.ATTR_v: environ,
+                })
+        job._pbs_id = pbs.pbs_submit(
+            self._pbs_connection, attrib, job._pbs_paths['script'],
+            None, None)
+        if not job._pbs_id:
+            raise Exception('Error submitting job %s: %s'
+                            % (job, pbs.error()))
+        self._post_submit = True
+
+    def _write_script(self, job, stream):
+        """
+        >>> from . import InvokeJob
+        >>> m = PBSManager()
+        >>> j = InvokeJob(id=7, target='echo "testing %s"' % m)
+        >>> j._pbs_paths = {
+        ...     'job': os.path.join(m._workdir, '%d.job.pkl' % j.id),
+        ...     'data': os.path.join(m._workdir, '%d.pkl' % j.id),
+        ...     }
+        >>> m._write_script(j, sys.stdout)
+        ... # doctest: +ELLIPSIS, +REPORT_UDIFF
+        #!/usr/bin/env python
+        #PBS -l walltime=3:00:00
+        from __future__ import with_statement
+        import os
+        import pickle
+        import sys
+        sys.path = [...]
+        os.chdir(os.environ['PBS_O_WORKDIR'])
+        with open('/.../tmp-PBSManager-.../7.job.pkl', 'r') as f:
+            job = pickle.load(f)
+        job.run()
+        with open('/.../tmp-PBSManager-.../7.pkl', 'w') as f:
+            pickle.dump(job.data, f)
+        sys.exit(job.status)
+
+        >>> for id in [3, 4]:
+        ...     m._jobs[id] = Job(id=id)
+        ...     m._jobs[id]._pbs_id = '%d.big.iron.com' % id
+        >>> j = InvokeJob(id=8, target='echo "testing %s"' % m,
+        ...               blocks_on=[1,2,3,4])
+        >>> j._pbs_paths = {
+        ...     'job': os.path.join(m._workdir, '%d.job.pkl' % j.id),
+        ...     'data': os.path.join(m._workdir, '%d.pkl' % j.id),
+        ...     }
+        >>> m._write_script(j, sys.stdout)
+        ... # doctest: +ELLIPSIS, +REPORT_UDIFF
+        #!/usr/bin/env python
+        #PBS -l walltime=3:00:00
+        #PBS -W depend=afterok:3.big.iron.com:4.big.iron.com
+        from __future__ import with_statement
+        import os
+        import pickle
+        import sys
+        sys.path = [...]
+        os.chdir(os.environ['PBS_O_WORKDIR'])
+        with open('/.../tmp-PBSManager-.../8.job.pkl', 'r') as f:
+            job = pickle.load(f)
+        job.run()
+        with open('/.../tmp-PBSManager-.../8.pkl', 'w') as f:
+            pickle.dump(job.data, f)
+        sys.exit(job.status)
+        >>> m.teardown()
+        """
+        stream.write('#!/usr/bin/env python\n')
+        stream.write('#PBS -l walltime=%s\n' % self._walltime)
+        #stream.write('#PBS -l minwclimit:...')
+        if len(job.blocks_on) > 0:
+            blockers = [self._jobs.get(id, None) for id in job.blocks_on]
+            stream.write(
+                '#PBS -W depend=afterok:%s\n'
+                % ':'.join([j._pbs_id for j in blockers if j != None]))
+        stream.write('from __future__ import with_statement\n')
+        stream.write('import os\n')
+        stream.write('import pickle\n')
+        stream.write('import sys\n')
+        stream.write('sys.path = [%s]\n' % ', '.join(
+                ["'%s'" % p for p in sys.path]))
+        stream.write("os.chdir(os.environ['PBS_O_WORKDIR'])\n")
+        stream.write("with open('%s', 'r') as f:\n" % job._pbs_paths['job'])
+        stream.write('    job = pickle.load(f)\n')
+        stream.write('job.run()\n')
+        stream.write("with open('%s', 'w') as f:\n" % job._pbs_paths['data'])
+        stream.write('    pickle.dump(job.data, f)\n')
+        stream.write('sys.exit(job.status)\n')
+
+    def _environ(self):
+        """Mimic the environment you would get with `qsub -V`.
+        """
+        environ = dict(os.environ)
+        for key in ['HOME', 'LANG', 'LOGNAME', 'PATH', 'MAIL', 'SHELL', 'TZ']:
+            if key in environ:
+                environ['PBS_O_%s' % key] = environ[key]
+        environ['PBS_SERVER'] = self._pbs_server
+        environ['PBS_O_WORKDIR'] = os.getcwd()
+        #environ['PBS_ARRAYID']  # not an array job
+        return environ
+
+    def _receive_job(self):
+        if self._post_submit == True:
+            self._post_submit = False
+            time.sleep(self._post_submit_sleep)
+        while True:
+            for job in self._jobs.itervalues():
+                if job.status != None:
+                    continue
+                info = self._tracejob(job)
+                if info.get('state', None) == 'COMPLETE':
+                    break
+            if info.get('state', None):
+                break
+            time.sleep(self._receive_poll_sleep)
+        job._pbs_info = info
+        job.status = int(info['Exit_status'])
+        with open(job._pbs_paths['data'], 'r') as f:
+            job.data = pickle.load(f)
+        job._pbs_stdout = open(job._pbs_paths['stdout'], 'r').read()
+        job._pbs_stderr = open(job._pbs_paths['stderr'], 'r').read()
+        if self._cleanup == True:
+            for name,path in job._pbs_paths.iteritems():
+                os.remove(path)
+        del(job._pbs_paths)
+        return job
+
+    def _tracejob(self, job):
+        s,out,err = invoke.invoke('tracejob "%s"' % job._pbs_id)
+        lines = []
+        for line in out.splitlines():
+            m = self._tracejob_re.match(line.strip())
+            if m == None:
+                continue
+            data = {}
+            for group,value in zip(self._tracejob_re_groups, m.groups()):
+                if group in self._tracejob_re_int_groups:
+                    data[group] = int(value)
+                else:
+                    data[group] = value
+            lines.append(data)
+
+        info = {'lines': lines}
+        for line in lines:
+            if line['msg'].startswith('Exit_status='):
+                # Exit_status=0 resources_used.cput=00:00:00...
+                fields = line['msg'].split()
+                for field in fields:
+                    key,value = field.split('=', 1)
+                    info[key] = value
+            elif line['msg'].startswith('dequeuing from'):
+                m = self._tracejob_re_dequeue.match(line['msg'])
+                info['queue'],info['state'] = m.groups()
+        return info
+
+    @staticmethod
+    def _make_attr_structs (attributes, constructor):
+        """Create an array of structs (via the specified
+        `constructor`) which encode the specified `attributes`.
+
+        Code adapted from:
+
+          From: "Nate Woody"
+          Date: Wed Sep 6 09:01:23 MDT 2006
+          Subject: [torqueusers] python pbs_submit
+          URL: http://www.clusterresources.com/pipermail/torqueusers/2006-September/004212.html
+        """
+        # Determine the number of structs evinced by 'attributes'.
+        attr_count = len(attributes)
+
+        attr = constructor(attr_count)
+        index = 0
+
+        # Pack the struct array.
+        for pair in attributes.iteritems():
+            name, ds = pair
+
+            # If 'ds' is a dictionary, then treat it as containing
+            # valued resources.
+            if type(ds) == types.DictType:
+                for resource_pair in ds.iteritems():
+                    resource, value = resource_pair
+                    attr[index].name = name
+                    attr[index].resource = resource
+                    attr[index].value = str(value)
+                    index += 1
+            else:
+                # If 'ds' is a scalar object, then wrap a list around it.
+                if type(ds) != types.ListType:
+                    ds = [ds]
+                attr[index].name = name
+                attr[index].value = ','.join(map(str, ds))
+                index += 1
+
+        return attr
+
+    @staticmethod
+    def _make_attrl (attributes):
+        """Obtain an array of `attrl` structs which encode the
+        specified `attributes`.
+
+        Code adapted from:
+
+          From: "Nate Woody"
+          Date: Wed Sep 6 09:01:23 MDT 2006
+          Subject: [torqueusers] python pbs_submit
+          URL: http://www.clusterresources.com/pipermail/torqueusers/2006-September/004212.html
+        """
+        return PBSManager._make_attr_structs(
+            attributes, constructor=pbs.new_attrl)
+
+    @staticmethod
+    def _make_attropl (attributes):
+        """Obtain an array of `attropl` structs which encode the
+        specified `attributes`.
+
+        Code adapted from:
+
+          From: "Nate Woody"
+          Date: Wed Sep 6 09:01:23 MDT 2006
+          Subject: [torqueusers] python pbs_submit
+          URL: http://www.clusterresources.com/pipermail/torqueusers/2006-September/004212.html
+        """
+        return PBSManager._make_attr_structs(
+            attributes, constructor=pbs.new_attropl)
diff --git a/pysawsim/qwait b/pysawsim/qwait
deleted file mode 100755
index 6a3427b..0000000
--- a/pysawsim/qwait
+++ /dev/null
@@ -1,15 +0,0 @@
-#!/bin/bash
-#
-# Wait for a job to finish execution
-
-JOBID="$1"
-
-while sleep 3; do
-    STAT=`qstat $JOBID 2>&1 1>/dev/null`
-    UNKNOWN=`echo "$STAT" | grep "Unknown Job Id"`
-    if [ -n "$UNKNOWN" ]; then
-        exit 0 # job has completed, since qstat doesn't recognize it.
-    fi
-done
-
-exit 1
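Note: the deleted qwait script's job is now done inside PBSManager, which polls
`tracejob` from `_receive_job` instead of looping over `qstat` in the shell.
For external scripts that still want the old behavior, the shell loop translates
directly to Python.  The sketch below is a minimal stand-alone equivalent, not an
API exported by pysawsim.manager.pbs: the `qwait` name and the three-second poll
interval simply mirror the deleted script, and it assumes only that a Torque
`qstat` is on the PATH and prints "Unknown Job Id" to stderr for jobs it no
longer recognizes.

    import subprocess
    import time

    def qwait(job_id, poll_seconds=3):
        """Block until `job_id` leaves the queue, like the old qwait script."""
        while True:
            time.sleep(poll_seconds)
            q = subprocess.Popen(['qstat', job_id],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
            out,err = q.communicate()
            # qstat reports "Unknown Job Id ..." once the job has been
            # dequeued, which is the completion signal qwait relied on.
            if 'Unknown Job Id' in err:
                return

Unlike the shell version, the caller gets an ordinary Python return rather than
an exit status, so the helper composes with other Python code if needed.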