Move 'qwait' to pysawsim.manager.pbs and add the rest of PBSManager.
author W. Trevor King <wking@drexel.edu>
Tue, 19 Oct 2010 23:02:19 +0000 (19:02 -0400)
committer W. Trevor King <wking@drexel.edu>
Tue, 19 Oct 2010 23:02:19 +0000 (19:02 -0400)
pysawsim/manager/pbs.py [new file with mode: 0755]
pysawsim/qwait [deleted file]

diff --git a/pysawsim/manager/pbs.py b/pysawsim/manager/pbs.py
new file mode 100755 (executable)
index 0000000..f0487c1
--- /dev/null
@@ -0,0 +1,360 @@
+# Copyright (C) 2010  W. Trevor King <wking@drexel.edu>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# The author may be contacted at <wking@drexel.edu> on the Internet, or
+# write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
+# Philadelphia PA 19104, USA.
+
+"""Functions for running external commands on other hosts.
+
+pbs_ is a Python wrapper around the TORQUE PBS server.
+
+.. _pbs: https://subtrac.sara.nl/oss/pbs_python
+"""
+
+from __future__ import absolute_import
+from __future__ import with_statement
+
+import os
+import os.path
+import pickle
+import re
+import shutil
+import socket
+import sys
+import tempfile
+import time
+import types
+
+try:
+    import pbs
+except ImportError, pbs_error:
+    pbs = None
+
+from .. import invoke
+from . import Job, JobManager
+
+
+SCRATCH_DIR = os.path.expanduser('~')
+
+
+class PBSManager (JobManager):
+    """Manage asynchronous `Job` execution via :mod:`pbs`.
+
+    >>> from math import sqrt
+    >>> m = PBSManager()
+    >>> group_A = []
+    >>> for i in range(10):
+    ...     group_A.append(m.async_invoke(Job(target=sqrt, args=[i])))
+    >>> group_B = []
+    >>> for i in range(10):
+    ...     group_B.append(m.async_invoke(Job(target=sqrt, args=[i],
+    ...                 blocks_on=[j.id for j in group_A])))
+    >>> jobs = m.wait(ids=[j.id for j in group_A[5:8]])
+    >>> print sorted(jobs.values(), key=lambda j: j.id)
+    [<Job 5>, <Job 6>, <Job 7>]
+    >>> jobs = m.wait()
+    >>> print sorted(jobs.values(), key=lambda j: j.id)
+    ... # doctest: +NORMALIZE_WHITESPACE
+    [<Job 0>, <Job 1>, <Job 2>, <Job 3>, <Job 4>, <Job 8>, <Job 9>, <Job 10>,
+     <Job 11>, <Job 12>, <Job 13>, <Job 14>, <Job 15>, <Job 16>, <Job 17>,
+     <Job 18>, <Job 19>]
+    >>> m.teardown()
+    """
+    def __init__(self, workdir=None):
+        """Connect to the default PBS server and choose a working directory.
+
+        `workdir` is the directory that holds each job's script,
+        pickles, and stdout/stderr files.  When it is `None`, a fresh
+        temporary directory is created under `SCRATCH_DIR`; only such
+        a temporary directory is ever removed by `teardown`.
+        """
+        super(PBSManager, self).__init__()
+        self._cleanup = True
+        self._setup_pbs()
+        # Remember whether we own the directory before replacing `workdir`.
+        self._temporary_workdir = workdir is None
+        if self._temporary_workdir:
+            workdir = tempfile.mkdtemp(
+                prefix='tmp-PBSManager-', dir=SCRATCH_DIR)
+        self._workdir = workdir
+        # example tracejob line:
+        #   10/19/2010 13:57:01  S    dequeuing from batch, state COMPLETE
+        # (raw strings keep the regex escapes out of Python's own
+        # string-escape processing)
+        self._tracejob_re = re.compile(
+            r'(\d*)/(\d*)/(\d*) (\d*):(\d*):(\d*)  (\w*)    (.*)')
+        # Names for the groups of `_tracejob_re`, in match order.
+        self._tracejob_re_groups = [
+            'month', 'day', 'year', 'hour', 'minute', 'second', 'code', 'msg']
+        # The subset of groups that `_tracejob` converts to `int`.
+        self._tracejob_re_int_groups = [
+            'month', 'day', 'year', 'hour', 'minute', 'second']
+        # example tracejob message:
+        #   dequeuing from batch, state COMPLETE
+        self._tracejob_re_dequeue = re.compile(
+            r'dequeuing from (.*), state (.*)')
+
+    def _setup_pbs(self):
+        """Connect to the default PBS server and set submission defaults.
+
+        Raises the `ImportError` saved at module import time if the
+        :mod:`pbs` bindings could not be imported, and `Exception` if
+        no default server is configured or the connection fails.
+        """
+        if pbs is None:
+            raise pbs_error
+        self._pbs_server = pbs.pbs_default()
+        if not self._pbs_server:
+            # Wrap in a 1-tuple: if pbs.error() returns a tuple, a bare
+            # `% pbs.error()` would itself fail with a TypeError.
+            raise Exception('No default server: %s' % (pbs.error(),))
+        self._pbs_connection = pbs.pbs_connect(self._pbs_server)
+        if self._pbs_connection < 0:
+            # pbs_connect() reports failure with a negative handle.
+            raise Exception('Error connecting to %s: %s'
+                            % (self._pbs_server, pbs.error()))
+        # Set by `_spawn_job`; makes the next `_receive_job` call sleep
+        # `_post_submit_sleep` seconds before it starts polling.
+        self._post_submit = False
+        self._post_submit_sleep = 3
+        # Seconds between polling passes in `_receive_job`.
+        self._receive_poll_sleep = 3
+        # Wall-clock limit written into each job script.
+        self._walltime = '3:00:00'
+
+    def teardown(self):
+        """Remove the temporary working directory, then tear down the base.
+
+        The directory is removed only when it was created by
+        `__init__` (no `workdir` was supplied) and `_cleanup` is
+        still enabled.
+        """
+        try:
+            if self._cleanup and self._temporary_workdir:
+                shutil.rmtree(self._workdir)
+        finally:
+            # Run the base-class teardown even if removing the
+            # directory failed; the original error still propagates.
+            super(PBSManager, self).teardown()
+
+    def _spawn_job(self, job):
+        """Write `job`'s script and pickle into the workdir and submit it.
+
+        On return `job._pbs_paths` maps 'script', 'job', 'stdout',
+        'stderr', and 'data' to the job's files in the working
+        directory, and `job._pbs_id` holds the identifier returned by
+        `pbs.pbs_submit`.  Raises `Exception` if the submission
+        returns no identifier.
+        """
+        job._pbs_paths = {}
+        for name,extension in [('script', 'py'), ('job', 'job.pkl'),
+                               ('stdout', 'stdout'), ('stderr', 'stderr'),
+                               ('data', 'pkl')]:
+            job._pbs_paths[name] = os.path.join(
+                self._workdir, '%d.%s' % (job.id, extension))
+        with open(job._pbs_paths['script'], 'w') as f:
+            self._write_script(job, f)
+        with open(job._pbs_paths['job'], 'w') as f:
+            pickle.dump(job, f)
+        # NOTE(review): the variable list is comma-separated, so a value
+        # that itself contains a comma would presumably be split by PBS
+        # -- confirm none of the exported variables do.
+        environ = ','.join(['%s=%s' % (k,v) for k,v
+                            in self._environ().iteritems()])
+        host = socket.getfqdn(socket.gethostname())
+        attrib = self._make_attropl({
+                pbs.ATTR_e: '%s:%s' % (
+                    host, job._pbs_paths['stderr']),
+                #pbs.ATTR_h: 'u',   # user hold
+                pbs.ATTR_m: 'n',   # don't send any mail
+                # Name the job after the manager's job id (the same id
+                # used for the file names above).  The builtin id(job)
+                # is a memory address: meaningless to a reader and long
+                # enough to exceed PBS's job-name length limit
+                # (15 characters for qsub -N -- TODO confirm for the
+                # server in use).
+                pbs.ATTR_N: 'pysawsim-%d' % job.id,
+                pbs.ATTR_o: '%s:%s' % (
+                    host, job._pbs_paths['stdout']),
+                pbs.ATTR_v: environ,
+                })
+        job._pbs_id = pbs.pbs_submit(
+            self._pbs_connection, attrib, job._pbs_paths['script'], None, None)
+        if not job._pbs_id:
+            raise Exception('Error submitting job %s: %s'
+                            % (job, pbs.error()))
+        # Tell `_receive_job` to pause before its first poll.
+        self._post_submit = True
+
+    def _write_script(self, job, stream):
+        """Write the Python script that PBS runs for `job` to `stream`.
+
+        The script restores this process's `sys.path`, changes to
+        `PBS_O_WORKDIR`, unpickles the job from
+        `job._pbs_paths['job']`, runs it, pickles `job.data` to
+        `job._pbs_paths['data']`, and exits with `job.status`.  Jobs
+        listed in `job.blocks_on` that this manager knows about are
+        written as an `afterok` dependency; unknown ids are ignored.
+
+        >>> from . import InvokeJob
+        >>> m = PBSManager()
+        >>> j = InvokeJob(id=7, target='echo "testing %s"' % m)
+        >>> j._pbs_paths = {
+        ...     'job': os.path.join(m._workdir, '%d.job.pkl' % j.id),
+        ...     'data': os.path.join(m._workdir, '%d.pkl' % j.id),
+        ...     }
+        >>> m._write_script(j, sys.stdout)
+        ... # doctest: +ELLIPSIS, +REPORT_UDIFF
+        #!/usr/bin/env python
+        #PBS -l walltime=3:00:00
+        from __future__ import with_statement
+        import os
+        import pickle
+        import sys
+        sys.path = [...]
+        os.chdir(os.environ['PBS_O_WORKDIR'])
+        with open('/.../tmp-PBSManager-.../7.job.pkl', 'r') as f:
+            job = pickle.load(f)
+        job.run()
+        with open('/.../tmp-PBSManager-.../7.pkl', 'w') as f:
+            pickle.dump(job.data, f)
+        sys.exit(job.status)
+
+        >>> for id in [3, 4]:
+        ...     m._jobs[id] = Job(id=id)
+        ...     m._jobs[id]._pbs_id = '%d.big.iron.com' % id
+        >>> j = InvokeJob(id=8, target='echo "testing %s"' % m,
+        ...               blocks_on=[1,2,3,4])
+        >>> j._pbs_paths = {
+        ...     'job': os.path.join(m._workdir, '%d.job.pkl' % j.id),
+        ...     'data': os.path.join(m._workdir, '%d.pkl' % j.id),
+        ...     }
+        >>> m._write_script(j, sys.stdout)
+        ... # doctest: +ELLIPSIS, +REPORT_UDIFF
+        #!/usr/bin/env python
+        #PBS -l walltime=3:00:00
+        #PBS -W depend=afterok:3.big.iron.com:4.big.iron.com
+        from __future__ import with_statement
+        import os
+        import pickle
+        import sys
+        sys.path = [...]
+        os.chdir(os.environ['PBS_O_WORKDIR'])
+        with open('/.../tmp-PBSManager-.../8.job.pkl', 'r') as f:
+            job = pickle.load(f)
+        job.run()
+        with open('/.../tmp-PBSManager-.../8.pkl', 'w') as f:
+            pickle.dump(job.data, f)
+        sys.exit(job.status)
+        >>> m.teardown()
+        """
+        # NOTE(review): `#PBS` directives are interpreted by qsub;
+        # confirm they also take effect for scripts submitted through
+        # pbs_submit().
+        stream.write('#!/usr/bin/env python\n')
+        # Resource limits use `name=value`.
+        stream.write('#PBS -l walltime=%s\n' % self._walltime)
+        #stream.write('#PBS -l minwclimit:...')
+        blockers = [self._jobs.get(job_id, None) for job_id in job.blocks_on]
+        blocker_ids = [j._pbs_id for j in blockers if j is not None]
+        if blocker_ids:
+            # Dependencies are an additional attribute (`-W depend=`);
+            # skip the line entirely rather than emit an empty list.
+            stream.write(
+                '#PBS -W depend=afterok:%s\n' % ':'.join(blocker_ids))
+        stream.write('from __future__ import with_statement\n')
+        stream.write('import os\n')
+        stream.write('import pickle\n')
+        stream.write('import sys\n')
+        # repr() yields a valid Python literal even for paths that
+        # contain quotes or backslashes.
+        stream.write('sys.path = [%s]\n' % ', '.join(
+                [repr(p) for p in sys.path]))
+        stream.write("os.chdir(os.environ['PBS_O_WORKDIR'])\n")
+        stream.write("with open(%r, 'r') as f:\n" % job._pbs_paths['job'])
+        stream.write('    job = pickle.load(f)\n')
+        stream.write('job.run()\n')
+        stream.write("with open(%r, 'w') as f:\n" % job._pbs_paths['data'])
+        stream.write('    pickle.dump(job.data, f)\n')
+        stream.write('sys.exit(job.status)\n')
+
+    def _environ(self):
+        """Build the environment handed to submitted jobs.
+
+        Starts from a copy of the current process environment and adds
+        `PBS_O_*` mirrors of a few standard variables plus
+        `PBS_SERVER` and `PBS_O_WORKDIR`, in imitation of `qsub -V`.
+        """
+        env = dict(os.environ.items())
+        mirrored = ('HOME', 'LANG', 'LOGNAME', 'PATH', 'MAIL', 'SHELL', 'TZ')
+        for name in mirrored:
+            try:
+                value = env[name]
+            except KeyError:
+                # Variable not set in this process; nothing to mirror.
+                continue
+            env['PBS_O_' + name] = value
+        env.update({
+                'PBS_SERVER': self._pbs_server,
+                'PBS_O_WORKDIR': os.getcwd(),
+                })
+        #env['PBS_ARRAYID'] # not an array job
+        return env
+
+    def _receive_job(self):
+        """Block until one pending job completes, then collect and return it.
+
+        Runs `_tracejob` on every job whose `status` is still `None`,
+        sleeping `_receive_poll_sleep` seconds between passes.  The
+        first job reported in state COMPLETE gets `status`, `data`,
+        `_pbs_info`, `_pbs_stdout`, and `_pbs_stderr` filled in; its
+        files are removed when `_cleanup` is enabled.
+
+        Raises `Exception` if no job is pending, since there would be
+        nothing to wait for.
+        """
+        if self._post_submit:
+            # Presumably gives the server time to register a fresh
+            # submission before tracejob is asked about it.
+            self._post_submit = False
+            time.sleep(self._post_submit_sleep)
+        while True:
+            completed = None
+            pending = False
+            for job in self._jobs.itervalues():
+                if job.status is not None:
+                    continue  # already received
+                pending = True
+                info = self._tracejob(job)
+                if info.get('state', None) == 'COMPLETE':
+                    completed = job
+                    break
+            if completed is not None:
+                # Only a COMPLETE job ends the wait; `info` still
+                # belongs to `completed` because we broke out right
+                # after fetching it.
+                break
+            if not pending:
+                raise Exception('No pending jobs to receive')
+            time.sleep(self._receive_poll_sleep)
+        job = completed
+        job._pbs_info = info
+        job.status = int(info['Exit_status'])
+        # This pickle is written by the job's own generated script.
+        with open(job._pbs_paths['data'], 'r') as f:
+            job.data = pickle.load(f)
+        with open(job._pbs_paths['stdout'], 'r') as f:
+            job._pbs_stdout = f.read()
+        with open(job._pbs_paths['stderr'], 'r') as f:
+            job._pbs_stderr = f.read()
+        if self._cleanup:
+            for path in job._pbs_paths.itervalues():
+                os.remove(path)
+        del job._pbs_paths
+        return job
+
+    def _tracejob(self, job):
+        """Run `tracejob` on `job` and parse its output into a dict.
+
+        The returned dict always has a 'lines' entry: a list with one
+        dict per recognized log line, keyed by the names in
+        `_tracejob_re_groups`, with the date/time fields converted to
+        `int`.  If an `Exit_status=...` message is present, each of
+        its `key=value` tokens is added as a string entry.  If a
+        `dequeuing from QUEUE, state STATE` message is present,
+        'queue' and 'state' are added.
+        """
+        s,out,err = invoke.invoke('tracejob "%s"' % job._pbs_id)
+        lines = []
+        for line in out.splitlines():
+            m = self._tracejob_re.match(line.strip())
+            if m is None:
+                continue  # header or other non-log output
+            data = {}
+            for group,value in zip(self._tracejob_re_groups, m.groups()):
+                if group in self._tracejob_re_int_groups:
+                    data[group] = int(value)
+                else:
+                    data[group] = value
+            lines.append(data)
+
+        info = {'lines': lines}
+        for line in lines:
+            msg = line['msg']
+            if msg.startswith('Exit_status='):
+                # Exit_status=0 resources_used.cput=00:00:00...
+                for field in msg.split():
+                    if '=' not in field:
+                        continue  # not a key=value token; nothing to store
+                    key,value = field.split('=', 1)
+                    info[key] = value
+            elif msg.startswith('dequeuing from'):
+                m = self._tracejob_re_dequeue.match(msg)
+                if m is not None:
+                    # Messages without a ', state ...' part are skipped
+                    # instead of crashing on `None.groups()`.
+                    info['queue'],info['state'] = m.groups()
+        return info
+
+    @staticmethod
+    def _make_attr_structs (attributes, constructor):
+        """Create an array of structs (via the specified
+        `constructor`) which encode the specified `attributes`.
+
+        `attributes` maps attribute names to values.  A dict value is
+        treated as `{resource: value}` and yields one struct per
+        resource; a list value is joined with commas into a single
+        struct; any other value is converted with `str`.
+        `constructor` is called once with the total number of structs
+        and must return an indexable array of them.
+
+        Code adapted from:
+
+        From: "Nate Woody" <nathaniel.x.woody@gsk.com>
+        Date: Wed Sep 6 09:01:23 MDT 2006
+        Subject: [torqueusers] python pbs_submit
+        URL: http://www.clusterresources.com/pipermail/torqueusers/2006-September/004212.html
+        """
+        # Determine the number of structs evinced by 'attributes'.  A
+        # dict value expands to one struct per resource, so counting
+        # top-level keys alone would under-allocate the array.
+        attr_count = 0
+        for ds in attributes.itervalues():
+            if isinstance(ds, dict):
+                attr_count += len(ds)
+            else:
+                attr_count += 1
+
+        attr  = constructor(attr_count)
+        index = 0
+
+        # Pack the struct array.
+        for name,ds in attributes.iteritems():
+            # If 'ds' is a dictionary, then treat it as containing valued resources.
+            if isinstance(ds, dict):
+                for resource,value in ds.iteritems():
+                    attr[index].name      = name
+                    attr[index].resource  = resource
+                    attr[index].value     = str(value)
+                    index                += 1
+            else:
+                # If 'ds' is a scalar object, then wrap a list around it.
+                if not isinstance(ds, list):
+                    ds = [ ds ]
+
+                attr[index].name  = name
+                attr[index].value = ",".join([str(x) for x in ds])
+                index += 1
+
+        return attr
+
+    @staticmethod
+    def _make_attrl (attributes):
+        """Encode `attributes` as an array of `attrl` structs.
+
+        Thin wrapper that hands `pbs.new_attrl` to
+        `_make_attr_structs` as the array constructor.
+
+        Code adapted from:
+
+        From: "Nate Woody" <nathaniel.x.woody@gsk.com>
+        Date: Wed Sep 6 09:01:23 MDT 2006
+        Subject: [torqueusers] python pbs_submit
+        URL: http://www.clusterresources.com/pipermail/torqueusers/2006-September/004212.html
+        """
+        build = PBSManager._make_attr_structs
+        return build(attributes, pbs.new_attrl)
+
+    @staticmethod
+    def _make_attropl (attributes):
+        """Encode `attributes` as an array of `attropl` structs.
+
+        Thin wrapper that hands `pbs.new_attropl` to
+        `_make_attr_structs` as the array constructor.
+
+        Code adapted from:
+
+        From: "Nate Woody" <nathaniel.x.woody@gsk.com>
+        Date: Wed Sep 6 09:01:23 MDT 2006
+        Subject: [torqueusers] python pbs_submit
+        URL: http://www.clusterresources.com/pipermail/torqueusers/2006-September/004212.html
+        """
+        build = PBSManager._make_attr_structs
+        return build(attributes, pbs.new_attropl)
diff --git a/pysawsim/qwait b/pysawsim/qwait
deleted file mode 100755 (executable)
index 6a3427b..0000000
+++ /dev/null
@@ -1,15 +0,0 @@
-#!/bin/bash
-#
-# Wait for a job to finish execution
-
-JOBID="$1"
-
-while sleep 3; do
-  STAT=`qstat $JOBID 2>&1 1>/dev/null`
-  UNKOWN=`echo "$STAT" | grep "Unknown Job Id"`
-  if [ -n "$UNKOWN" ]; then
-    exit 0 # job has completed, since qstat doesn't recognize it.
-  fi
-done
-
-exit 1