[sawsim.git] / pysawsim/manager/pbs.py
# Copyright (C) 2010  W. Trevor King <wking@drexel.edu>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# The author may be contacted at <wking@drexel.edu> on the Internet, or
# write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
# Philadelphia PA 19104, USA.

20 """Functions for running external commands on other hosts.
21
22 pbs_ is a Python wrapper around the Tourque PBS server.
23
24 .. _pbs: https://subtrac.sara.nl/oss/pbs_python
25 """

from __future__ import absolute_import
from __future__ import with_statement

import os
import os.path
import pickle
import re
import shutil
import socket
import sys
import tempfile
import time

try:
    import pbs
    _ENABLED = True
    _DISABLING_ERROR = None
    _SKIP = ''
except ImportError, _DISABLING_ERROR:
    _ENABLED = False
    # Doctests append this to each line so they skip when pbs is missing.
    _SKIP = '  # doctest: +SKIP'

from .. import invoke
from . import Job, JobManager


SCRATCH_DIR = os.path.expanduser('~')
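# Job working directories are created under SCRATCH_DIR.  The generated
# job scripts open files under the working directory directly, so the
# compute nodes are assumed to share this filesystem with the
# submitting host; that is why it defaults to the user's home directory.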


class PBSManager(JobManager):
    __doc__ = """Manage asynchronous `Job` execution via :mod:`pbs`.

    >>> from math import sqrt
    >>> m = PBSManager()%(skip)s
    >>> group_A = []
    >>> for i in range(10):
    ...     group_A.append(m.async_invoke(Job(target=sqrt, args=[i])))%(skip)s
    >>> group_B = []
    >>> for i in range(10):
    ...     group_B.append(m.async_invoke(Job(target=sqrt, args=[i],
    ...                 blocks_on=[j.id for j in group_A])))%(skip)s
    >>> jobs = m.wait(ids=[j.id for j in group_A[5:8]])%(skip)s
    >>> print sorted(jobs.values(), key=lambda j: j.id)%(skip)s
    [<Job 5>, <Job 6>, <Job 7>]
    >>> jobs = m.wait()%(skip)s
    >>> print sorted(jobs.values(), key=lambda j: j.id)%(skip)s
    ... # doctest: +NORMALIZE_WHITESPACE
    [<Job 0>, <Job 1>, <Job 2>, <Job 3>, <Job 4>, <Job 8>, <Job 9>, <Job 10>,
     <Job 11>, <Job 12>, <Job 13>, <Job 14>, <Job 15>, <Job 16>, <Job 17>,
     <Job 18>, <Job 19>]
    >>> m.teardown()%(skip)s
    """ % {'skip': _SKIP}
    def __init__(self, workdir=None):
        super(PBSManager, self).__init__()
        self._cleanup = True
        self._setup_pbs()
        if workdir is None:
            workdir = tempfile.mkdtemp(
                prefix='tmp-PBSManager-', dir=SCRATCH_DIR)
            self._temporary_workdir = True
        else:
            self._temporary_workdir = False
        self._workdir = workdir
        # example tracejob line:
        #   10/19/2010 13:57:01  S    dequeuing from batch, state COMPLETE
        self._tracejob_re = re.compile(
            r'(\d*)/(\d*)/(\d*) (\d*):(\d*):(\d*)  (\w*)    (.*)')
        self._tracejob_re_groups = [
            'month', 'day', 'year', 'hour', 'minute', 'second', 'code', 'msg']
        self._tracejob_re_int_groups = [
            'month', 'day', 'year', 'hour', 'minute', 'second']
        # example tracejob message:
        #   dequeuing from batch, state COMPLETE
        self._tracejob_re_dequeue = re.compile(
            r'dequeuing from (.*), state (.*)')

    def _setup_pbs(self):
        if not _ENABLED:
            raise _DISABLING_ERROR
        self._pbs_server = pbs.pbs_default()
        if not self._pbs_server:
            raise Exception('No default server: %s' % pbs.error())
        self._pbs_connection = pbs.pbs_connect(self._pbs_server)
        self._post_submit = False
        self._post_submit_sleep = 3    # seconds to wait after a submission
        self._receive_poll_sleep = 3   # seconds between tracejob polls
        self._walltime = '3:00:00'     # requested walltime (HH:MM:SS)

    def teardown(self):
        if self._cleanup and self._temporary_workdir:
            shutil.rmtree(self._workdir)
        super(PBSManager, self).teardown()

    def _spawn_job(self, job):
        job._pbs_paths = {}
        for name,extension in [('script', 'py'), ('job', 'job.pkl'),
                               ('stdout', 'stdout'), ('stderr', 'stderr'),
                               ('data', 'pkl')]:
            job._pbs_paths[name] = os.path.join(
                self._workdir, '%d.%s' % (job.id, extension))
        with open(job._pbs_paths['script'], 'w') as f:
            self._write_script(job, f)
        with open(job._pbs_paths['job'], 'w') as f:
            pickle.dump(job, f)
        environ = ','.join(['%s=%s' % (k,v) for k,v
                            in self._environ().iteritems()])
        host = socket.getfqdn(socket.gethostname())
        attrib = self._make_attropl({
                pbs.ATTR_e: '%s:%s' % (
                    host, job._pbs_paths['stderr']),
                #pbs.ATTR_h: 'u',   # user hold
                pbs.ATTR_m: 'n',   # don't send any mail
                pbs.ATTR_N: 'pysawsim-%d' % id(job),  # name the job
                pbs.ATTR_o: '%s:%s' % (
                    host, job._pbs_paths['stdout']),
                pbs.ATTR_v: environ,
                })
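        # The `host:path` destinations tell PBS where to deliver the
        # job's spooled stdout/stderr; using the fully qualified
        # domain name lets delivery find the submitting host even
        # from a remote compute node.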
        job._pbs_id = pbs.pbs_submit(
            self._pbs_connection, attrib, job._pbs_paths['script'], None, None)
        if not job._pbs_id:
            raise Exception('Error submitting job %s: %s'
                            % (job, pbs.error()))
        self._post_submit = True

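    # The script written by `_write_script` runs on the compute node:
    # it restores the submitting host's `sys.path`, unpickles the
    # `Job`, runs it, pickles the resulting `job.data`, and exits
    # with `job.status`.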
    def _write_script(self, job, stream):
        stream.write('#!/usr/bin/env python\n')
        stream.write('#PBS -l walltime=%s\n' % self._walltime)
        #stream.write('#PBS -l minwclimit=...')
        if job.blocks_on:
            blockers = [self._jobs.get(id, None) for id in job.blocks_on]
            stream.write(
                '#PBS -W depend=afterok:%s\n'
                % ':'.join([j._pbs_id for j in blockers if j is not None]))
        stream.write('from __future__ import with_statement\n')
        stream.write('import os\n')
        stream.write('import pickle\n')
        stream.write('import sys\n')
        stream.write('sys.path = [%s]\n' % ', '.join(
                ["'%s'" % p for p in sys.path]))
        stream.write("os.chdir(os.environ['PBS_O_WORKDIR'])\n")
        stream.write("with open('%s', 'r') as f:\n" % job._pbs_paths['job'])
        stream.write('    job = pickle.load(f)\n')
        stream.write('job.run()\n')
        stream.write("with open('%s', 'w') as f:\n" % job._pbs_paths['data'])
        stream.write('    pickle.dump(job.data, f)\n')
        stream.write('sys.exit(job.status)\n')
    _write_script.__doc__ = """
        >>> from . import InvokeJob
        >>> m = PBSManager()%(skip)s
        >>> j = InvokeJob(id=7, target='echo "testing %%s"' %% m)%(skip)s
        >>> j._pbs_paths = {
        ...     'job': os.path.join(m._workdir, '%%d.job.pkl' %% j.id),
        ...     'data': os.path.join(m._workdir, '%%d.pkl' %% j.id),
        ...     }%(skip)s
        >>> m._write_script(j, sys.stdout)%(skip)s
        ... # doctest: +ELLIPSIS, +REPORT_UDIFF
        #!/usr/bin/env python
        #PBS -l walltime=3:00:00
        from __future__ import with_statement
        import os
        import pickle
        import sys
        sys.path = [...]
        os.chdir(os.environ['PBS_O_WORKDIR'])
        with open('/.../tmp-PBSManager-.../7.job.pkl', 'r') as f:
            job = pickle.load(f)
        job.run()
        with open('/.../tmp-PBSManager-.../7.pkl', 'w') as f:
            pickle.dump(job.data, f)
        sys.exit(job.status)

        >>> for id in [3, 4]:%(skip)s
        ...     m._jobs[id] = Job(id=id)
        ...     m._jobs[id]._pbs_id = '%%d.big.iron.com' %% id
        >>> j = InvokeJob(id=8, target='echo "testing %%s"' %% m,
        ...               blocks_on=[1,2,3,4])%(skip)s
        >>> j._pbs_paths = {
        ...     'job': os.path.join(m._workdir, '%%d.job.pkl' %% j.id),
        ...     'data': os.path.join(m._workdir, '%%d.pkl' %% j.id),
        ...     }%(skip)s
        >>> m._write_script(j, sys.stdout)%(skip)s
        ... # doctest: +ELLIPSIS, +REPORT_UDIFF
        #!/usr/bin/env python
        #PBS -l walltime=3:00:00
        #PBS -W depend=afterok:3.big.iron.com:4.big.iron.com
        from __future__ import with_statement
        import os
        import pickle
        import sys
        sys.path = [...]
        os.chdir(os.environ['PBS_O_WORKDIR'])
        with open('/.../tmp-PBSManager-.../8.job.pkl', 'r') as f:
            job = pickle.load(f)
        job.run()
        with open('/.../tmp-PBSManager-.../8.pkl', 'w') as f:
            pickle.dump(job.data, f)
        sys.exit(job.status)
        >>> m.teardown()%(skip)s
        """ % {'skip': _SKIP}

    def _environ(self):
        """Mimic the environment you would get with `qsub -V`.
        """
        environ = dict(os.environ)
        for key in ['HOME', 'LANG', 'LOGNAME', 'PATH', 'MAIL', 'SHELL', 'TZ']:
            if key in environ:
                environ['PBS_O_%s' % key] = environ[key]
        environ['PBS_SERVER'] = self._pbs_server
        environ['PBS_O_WORKDIR'] = os.getcwd()
        #environ['PBS_ARRAYID'] # not an array job
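        # For example (hypothetical values), SHELL=/bin/bash in the
        # submitting environment shows up as both SHELL and
        # PBS_O_SHELL in the job's environment.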
        return environ

    def _receive_job(self):
        if self._post_submit:
            self._post_submit = False
            time.sleep(self._post_submit_sleep)
        # Poll `tracejob` until some outstanding job reports COMPLETE.
        while True:
            info = {}
            for job in self._jobs.itervalues():
                if job.status is not None:
                    continue
                info = self._tracejob(job)
                if info.get('state', None) == 'COMPLETE':
                    break
            if info.get('state', None) == 'COMPLETE':
                break
            time.sleep(self._receive_poll_sleep)
        job._pbs_info = info
        job.status = int(info['Exit_status'])
        with open(job._pbs_paths['data'], 'r') as f:
            job.data = pickle.load(f)
        with open(job._pbs_paths['stdout'], 'r') as f:
            job._pbs_stdout = f.read()
        with open(job._pbs_paths['stderr'], 'r') as f:
            job._pbs_stderr = f.read()
        if self._cleanup:
            for name,path in job._pbs_paths.iteritems():
                os.remove(path)
        del job._pbs_paths
        return job

    def _tracejob(self, job):
        """Run `tracejob` for `job` and parse its output into a dict."""
        s,out,err = invoke.invoke('tracejob "%s"' % job._pbs_id)
        lines = []
        for line in out.splitlines():
            m = self._tracejob_re.match(line.strip())
            if m is None:
                continue
            data = {}
            for group,value in zip(self._tracejob_re_groups, m.groups()):
                if group in self._tracejob_re_int_groups:
                    data[group] = int(value)
                else:
                    data[group] = value
            lines.append(data)

        info = {'lines': lines}
        for line in lines:
            if line['msg'].startswith('Exit_status='):
                # Exit_status=0 resources_used.cput=00:00:00...
                fields = line['msg'].split()
                for field in fields:
                    key,value = field.split('=', 1)
                    info[key] = value
            elif line['msg'].startswith('dequeuing from'):
                m = self._tracejob_re_dequeue.match(line['msg'])
                info['queue'],info['state'] = m.groups()
        return info
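
    # For a completed job, `_tracejob` returns something like
    # (hypothetical values):
    #   {'lines': [...], 'Exit_status': '0',
    #    'resources_used.cput': '00:00:00',
    #    'queue': 'batch', 'state': 'COMPLETE'}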

    @staticmethod
    def _make_attr_structs(attributes, constructor):
        """Create an array of structs (via the specified
        `constructor`) which encode the specified `attributes`.

        Code adapted from:

        From: "Nate Woody" <nathaniel.x.woody@gsk.com>
        Date: Wed Sep 6 09:01:23 MDT 2006
        Subject: [torqueusers] python pbs_submit
        URL: http://www.clusterresources.com/pipermail/torqueusers/2006-September/004212.html
        """
        # Count the structs needed: a dictionary-valued attribute
        # contributes one struct per resource it contains.
        attr_count = 0
        for ds in attributes.itervalues():
            if isinstance(ds, dict):
                attr_count += len(ds)
            else:
                attr_count += 1

        attr = constructor(attr_count)
        index = 0

        # Pack the struct array.
        for name, ds in attributes.iteritems():
            if isinstance(ds, dict):
                # A dictionary value holds valued resources.
                for resource, value in ds.iteritems():
                    attr[index].name = name
                    attr[index].resource = resource
                    attr[index].value = str(value)
                    index += 1
            else:
                # Wrap a scalar value in a list.
                if not isinstance(ds, list):
                    ds = [ds]
                attr[index].name = name
                attr[index].value = ','.join([str(x) for x in ds])
                index += 1

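        # At this point a (hypothetical) input such as
        # {pbs.ATTR_m: 'n', pbs.ATTR_l: {'walltime': '3:00:00'}} has
        # packed two structs: a plain name/value pair and a
        # name/resource/value triple.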
        return attr

    @staticmethod
    def _make_attrl(attributes):
        """Obtain an array of `attrl` structs which encode the
        specified `attributes`.

        Code adapted from:

        From: "Nate Woody" <nathaniel.x.woody@gsk.com>
        Date: Wed Sep 6 09:01:23 MDT 2006
        Subject: [torqueusers] python pbs_submit
        URL: http://www.clusterresources.com/pipermail/torqueusers/2006-September/004212.html
        """
        return PBSManager._make_attr_structs(
            attributes, constructor=pbs.new_attrl)

    @staticmethod
    def _make_attropl(attributes):
        """Obtain an array of `attropl` structs which encode the specified
        `attributes`.

        Code adapted from:

        From: "Nate Woody" <nathaniel.x.woody@gsk.com>
        Date: Wed Sep 6 09:01:23 MDT 2006
        Subject: [torqueusers] python pbs_submit
        URL: http://www.clusterresources.com/pipermail/torqueusers/2006-September/004212.html
        """
        return PBSManager._make_attr_structs(
            attributes, constructor=pbs.new_attropl)