Adjust pysawsim.manager.pbs to skip most doctests if the pbs module is missing.
[sawsim.git] / pysawsim / manager / pbs.py
# Copyright (C) 2010  W. Trevor King <wking@drexel.edu>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# The author may be contacted at <wking@drexel.edu> on the Internet, or
# write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
# Philadelphia PA 19104, USA.

"""Functions for running external commands on other hosts.

pbs_ is a Python wrapper around the Torque PBS server.

.. _pbs: https://subtrac.sara.nl/oss/pbs_python
"""

from __future__ import absolute_import
from __future__ import with_statement

import os
import os.path
import pickle
import re
import shutil
import socket
import sys
import tempfile
import time

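# The pbs bindings are an optional dependency: if the import fails,
# `pbs` is set to None and `_SKIP` expands to a '  # doctest: +SKIP'
# suffix, so the doctests below that need a live PBS server are
# skipped instead of erroring out.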
try:
    import pbs
    _SKIP = ''
except ImportError, pbs_error:
    pbs = None
    _SKIP = '  # doctest: +SKIP'

from .. import invoke
from . import Job, JobManager


SCRATCH_DIR = os.path.expanduser('~')
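# Assumption: the home directory is shared (e.g. over NFS) with the
# compute nodes, so scripts and pickles written under SCRATCH_DIR are
# visible cluster-wide.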


class PBSManager (JobManager):
    __doc__ = """Manage asynchronous `Job` execution via :mod:`pbs`.

    >>> from math import sqrt
    >>> m = PBSManager()%(skip)s
    >>> group_A = []
    >>> for i in range(10):
    ...     group_A.append(m.async_invoke(Job(target=sqrt, args=[i])))%(skip)s
    >>> group_B = []
    >>> for i in range(10):
    ...     group_B.append(m.async_invoke(Job(target=sqrt, args=[i],
    ...                 blocks_on=[j.id for j in group_A])))%(skip)s
    >>> jobs = m.wait(ids=[j.id for j in group_A[5:8]])%(skip)s
    >>> print sorted(jobs.values(), key=lambda j: j.id)%(skip)s
    [<Job 5>, <Job 6>, <Job 7>]
    >>> jobs = m.wait()%(skip)s
    >>> print sorted(jobs.values(), key=lambda j: j.id)%(skip)s
    ... # doctest: +NORMALIZE_WHITESPACE
    [<Job 0>, <Job 1>, <Job 2>, <Job 3>, <Job 4>, <Job 8>, <Job 9>, <Job 10>,
     <Job 11>, <Job 12>, <Job 13>, <Job 14>, <Job 15>, <Job 16>, <Job 17>,
     <Job 18>, <Job 19>]
    >>> m.teardown()%(skip)s
    """ % {'skip': _SKIP}
    def __init__(self, workdir=None):
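        """Set up a PBS connection and a working directory.

        If `workdir` is None, a temporary directory is created under
        `SCRATCH_DIR`; it is removed again by `teardown`.
        """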
        super(PBSManager, self).__init__()
        self._cleanup = True
        self._setup_pbs()
        if workdir is None:
            workdir = tempfile.mkdtemp(
                prefix='tmp-PBSManager-', dir=SCRATCH_DIR)
            self._temporary_workdir = True
        else:
            self._temporary_workdir = False
        self._workdir = workdir
        # example tracejob line:
        #   10/19/2010 13:57:01  S    dequeuing from batch, state COMPLETE
        self._tracejob_re = re.compile(
            r'(\d*)/(\d*)/(\d*) (\d*):(\d*):(\d*)  (\w*)    (.*)')
        self._tracejob_re_groups = [
            'month', 'day', 'year', 'hour', 'minute', 'second', 'code', 'msg']
        self._tracejob_re_int_groups = [
            'month', 'day', 'year', 'hour', 'minute', 'second']
        # example tracejob message:
        #   dequeuing from batch, state COMPLETE
        self._tracejob_re_dequeue = re.compile(
            r'dequeuing from (.*), state (.*)')

    def _setup_pbs(self):
        if pbs is None:
            raise pbs_error
        self._pbs_server = pbs.pbs_default()
        if not self._pbs_server:
            raise Exception('No default server: %s' % pbs.error())
        self._pbs_connection = pbs.pbs_connect(self._pbs_server)
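        # Polling tunables (seconds): sleep once after a submission and
        # between tracejob polls in _receive_job.  _walltime is the
        # resource limit written into each generated job script.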
        self._post_submit = False
        self._post_submit_sleep = 3
        self._receive_poll_sleep = 3
        self._walltime = '3:00:00'

    def teardown(self):
        if self._cleanup and self._temporary_workdir:
            shutil.rmtree(self._workdir)
        super(PBSManager, self).teardown()

    def _spawn_job(self, job):
        job._pbs_paths = {}
        for name,extension in [('script', 'py'), ('job', 'job.pkl'),
                               ('stdout', 'stdout'), ('stderr', 'stderr'),
                               ('data', 'pkl')]:
            job._pbs_paths[name] = os.path.join(
                self._workdir, '%d.%s' % (job.id, extension))
        with open(job._pbs_paths['script'], 'w') as f:
            self._write_script(job, f)
        with open(job._pbs_paths['job'], 'w') as f:
            pickle.dump(job, f)
        environ = ','.join(['%s=%s' % (k,v) for k,v
                            in self._environ().iteritems()])
        host = socket.getfqdn(socket.gethostname())
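        # The stderr/stdout attributes use the `host:path` form so that
        # the PBS server delivers the output files back to this
        # (submitting) host.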
        attrib = self._make_attropl({
                pbs.ATTR_e: '%s:%s' % (
                    host, job._pbs_paths['stderr']),
                #pbs.ATTR_h: 'u',   # user hold
                pbs.ATTR_m: 'n',   # don't send any mail
                pbs.ATTR_N: 'pysawsim-%d' % id(job),  # name the job
                pbs.ATTR_o: '%s:%s' % (
                    host, job._pbs_paths['stdout']),
                pbs.ATTR_v: environ,
                })
        job._pbs_id = pbs.pbs_submit(
            self._pbs_connection, attrib, job._pbs_paths['script'], None, None)
        if not job._pbs_id:
            raise Exception('Error submitting job %s: %s'
                            % (job, pbs.error()))
        self._post_submit = True

    def _write_script(self, job, stream):
        stream.write('#!/usr/bin/env python\n')
        stream.write('#PBS -l walltime=%s\n' % self._walltime)
        #stream.write('#PBS -l minwclimit=...')
        if len(job.blocks_on) > 0:
            blockers = [self._jobs.get(id, None) for id in job.blocks_on]
            stream.write(
                '#PBS -W depend=afterok:%s\n'
                % ':'.join([j._pbs_id for j in blockers if j is not None]))
        stream.write('from __future__ import with_statement\n')
        stream.write('import os\n')
        stream.write('import pickle\n')
        stream.write('import sys\n')
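        # Hand the generated script the submitter's sys.path, so that
        # unpickling the Job on the compute node can import the same
        # modules visible here.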
        stream.write('sys.path = [%s]\n' % ', '.join(
                ["'%s'" % p for p in sys.path]))
        stream.write("os.chdir(os.environ['PBS_O_WORKDIR'])\n")
        stream.write("with open('%s', 'r') as f:\n" % job._pbs_paths['job'])
        stream.write('    job = pickle.load(f)\n')
        stream.write('job.run()\n')
        stream.write("with open('%s', 'w') as f:\n" % job._pbs_paths['data'])
        stream.write('    pickle.dump(job.data, f)\n')
        stream.write('sys.exit(job.status)\n')
    _write_script.__doc__ = """
        >>> from . import InvokeJob
        >>> m = PBSManager()%(skip)s
        >>> j = InvokeJob(id=7, target='echo "testing %%s"' %% m)%(skip)s
        >>> j._pbs_paths = {
        ...     'job': os.path.join(m._workdir, '%%d.job.pkl' %% j.id),
        ...     'data': os.path.join(m._workdir, '%%d.pkl' %% j.id),
        ...     }%(skip)s
        >>> m._write_script(j, sys.stdout)%(skip)s
        ... # doctest: +ELLIPSIS, +REPORT_UDIFF
        #!/usr/bin/env python
        #PBS -l walltime=3:00:00
        from __future__ import with_statement
        import os
        import pickle
        import sys
        sys.path = [...]
        os.chdir(os.environ['PBS_O_WORKDIR'])
        with open('/.../tmp-PBSManager-.../7.job.pkl', 'r') as f:
            job = pickle.load(f)
        job.run()
        with open('/.../tmp-PBSManager-.../7.pkl', 'w') as f:
            pickle.dump(job.data, f)
        sys.exit(job.status)

        >>> for id in [3, 4]:%(skip)s
        ...     m._jobs[id] = Job(id=id)
        ...     m._jobs[id]._pbs_id = '%%d.big.iron.com' %% id
        >>> j = InvokeJob(id=8, target='echo "testing %%s"' %% m,
        ...               blocks_on=[1,2,3,4])%(skip)s
        >>> j._pbs_paths = {
        ...     'job': os.path.join(m._workdir, '%%d.job.pkl' %% j.id),
        ...     'data': os.path.join(m._workdir, '%%d.pkl' %% j.id),
        ...     }%(skip)s
        >>> m._write_script(j, sys.stdout)%(skip)s
        ... # doctest: +ELLIPSIS, +REPORT_UDIFF
        #!/usr/bin/env python
        #PBS -l walltime=3:00:00
        #PBS -W depend=afterok:3.big.iron.com:4.big.iron.com
        from __future__ import with_statement
        import os
        import pickle
        import sys
        sys.path = [...]
        os.chdir(os.environ['PBS_O_WORKDIR'])
        with open('/.../tmp-PBSManager-.../8.job.pkl', 'r') as f:
            job = pickle.load(f)
        job.run()
        with open('/.../tmp-PBSManager-.../8.pkl', 'w') as f:
            pickle.dump(job.data, f)
        sys.exit(job.status)
        >>> m.teardown()%(skip)s
        """ % {'skip': _SKIP}

    def _environ(self):
        """Mimic the environment you would get with `qsub -V`.
        """
        environ = dict(os.environ)
        for key in ['HOME', 'LANG', 'LOGNAME', 'PATH', 'MAIL', 'SHELL', 'TZ']:
            if key in environ:
                environ['PBS_O_%s' % key] = environ[key]
        environ['PBS_SERVER'] = self._pbs_server
        environ['PBS_O_WORKDIR'] = os.getcwd()
        #environ['PBS_ARRAYID'] # not an array job
        return environ

    def _receive_job(self):
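        """Block until a pending job completes, and return that job.

        Poll `tracejob` for each unfinished job until one is dequeued
        with state COMPLETE, then collect its exit status, pickled
        data, and stdout/stderr files.
        """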
        if self._post_submit:
            self._post_submit = False
            time.sleep(self._post_submit_sleep)
        while True:
            info = {}  # in case no jobs are still pending
            for job in self._jobs.itervalues():
                if job.status is not None:
                    continue
                info = self._tracejob(job)
                if info.get('state', None) == 'COMPLETE':
                    break
            if info.get('state', None) == 'COMPLETE':
                break
            time.sleep(self._receive_poll_sleep)
        job._pbs_info = info
        job.status = int(info['Exit_status'])
        with open(job._pbs_paths['data'], 'r') as f:
            job.data = pickle.load(f)
        with open(job._pbs_paths['stdout'], 'r') as f:
            job._pbs_stdout = f.read()
        with open(job._pbs_paths['stderr'], 'r') as f:
            job._pbs_stderr = f.read()
        if self._cleanup:
            for name,path in job._pbs_paths.iteritems():
                os.remove(path)
        del job._pbs_paths
        return job

    def _tracejob(self, job):
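        """Parse `tracejob` output for `job` into an info dict.

        Matching log lines are stored under 'lines'.  An
        'Exit_status=...' message contributes its key=value fields,
        and a 'dequeuing from ...' message sets 'queue' and 'state'.
        """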
        s,out,err = invoke.invoke('tracejob "%s"' % job._pbs_id)
        lines = []
        for line in out.splitlines():
            m = self._tracejob_re.match(line.strip())
            if m is None:
                continue
            data = {}
            for group,value in zip(self._tracejob_re_groups, m.groups()):
                if group in self._tracejob_re_int_groups:
                    data[group] = int(value)
                else:
                    data[group] = value
            lines.append(data)

        info = {'lines': lines}
        for line in lines:
            if line['msg'].startswith('Exit_status='):
                # Exit_status=0 resources_used.cput=00:00:00...
                fields = line['msg'].split()
                for field in fields:
                    key,value = field.split('=', 1)
                    info[key] = value
            elif line['msg'].startswith('dequeuing from'):
                m = self._tracejob_re_dequeue.match(line['msg'])
                info['queue'],info['state'] = m.groups()
        return info

    @staticmethod
    def _make_attr_structs(attributes, constructor):
        """Create an array of structs (via the specified
        `constructor`) which encode the specified `attributes`.

        Code adapted from:

        From: "Nate Woody" <nathaniel.x.woody@gsk.com>
        Date: Wed Sep 6 09:01:23 MDT 2006
        Subject: [torqueusers] python pbs_submit
        URL: http://www.clusterresources.com/pipermail/torqueusers/2006-September/004212.html
        """
        # Count the structs needed: a dictionary value encodes one
        # valued resource per entry; anything else packs into a single
        # struct.
        attr_count = sum(len(ds) if isinstance(ds, dict) else 1
                         for ds in attributes.itervalues())

        attr = constructor(attr_count)
        index = 0

        # Pack the struct array.
        for name,ds in attributes.iteritems():
            # If 'ds' is a dictionary, treat it as containing valued
            # resources.
            if isinstance(ds, dict):
                for resource,value in ds.iteritems():
                    attr[index].name = name
                    attr[index].resource = resource
                    attr[index].value = str(value)
                    index += 1
            else:
                # If 'ds' is a scalar object, wrap a list around it.
                if not isinstance(ds, list):
                    ds = [ds]
                attr[index].name = name
                attr[index].value = ','.join([str(x) for x in ds])
                index += 1

        return attr

    @staticmethod
    def _make_attrl(attributes):
        """Obtain an array of `attrl` structs which encode the
        specified `attributes`.

        Code adapted from:

        From: "Nate Woody" <nathaniel.x.woody@gsk.com>
        Date: Wed Sep 6 09:01:23 MDT 2006
        Subject: [torqueusers] python pbs_submit
        URL: http://www.clusterresources.com/pipermail/torqueusers/2006-September/004212.html
        """
        return PBSManager._make_attr_structs(
            attributes, constructor=pbs.new_attrl)

    @staticmethod
    def _make_attropl(attributes):
        """Obtain an array of `attropl` structs which encode the specified
        `attributes`.

        Code adapted from:

        From: "Nate Woody" <nathaniel.x.woody@gsk.com>
        Date: Wed Sep 6 09:01:23 MDT 2006
        Subject: [torqueusers] python pbs_submit
        URL: http://www.clusterresources.com/pipermail/torqueusers/2006-September/004212.html
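
        A scalar value packs into a single struct, while a dictionary
        value expands into one struct per contained resource.  A
        minimal sketch (skipped, since it needs the :mod:`pbs`
        bindings):

        >>> attropl = PBSManager._make_attropl(
        ...     {pbs.ATTR_m: 'n',
        ...      pbs.ATTR_l: {'walltime': '3:00:00'}})  # doctest: +SKIP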
        """
        return PBSManager._make_attr_structs(
            attributes, constructor=pbs.new_attropl)