f0487c126fac6ed16588168c9c442de35048a198
[sawsim.git] / pysawsim / manager / pbs.py
1 # Copyright (C) 2010  W. Trevor King <wking@drexel.edu>
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU General Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License
14 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
15 #
16 # The author may be contacted at <wking@drexel.edu> on the Internet, or
17 # write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
18 # Philadelphia PA 19104, USA.
19
20 """Functions for running external commands on other hosts.
21
22 pbs_ is a Python wrapper around the Tourque PBS server.
23
24 .. _pbs: https://subtrac.sara.nl/oss/pbs_python
25 """
26
27 from __future__ import absolute_import
28 from __future__ import with_statement
29
30 import os
31 import os.path
32 import pickle
33 import re
34 import shutil
35 import socket
36 import sys
37 import tempfile
38 import time
39 import types
40
41 try:
42     import pbs
43 except ImportError, pbs_error:
44     pbs = None
45
46 from .. import invoke
47 from . import Job, JobManager
48
49
50 SCRATCH_DIR = os.path.expanduser('~')
51
52
53 class PBSManager (JobManager):
54     """Manage asynchronous `Job` execution via :mod:`pbs`.
55
56     >>> from math import sqrt
57     >>> m = PBSManager()
58     >>> group_A = []
59     >>> for i in range(10):
60     ...     group_A.append(m.async_invoke(Job(target=sqrt, args=[i])))
61     >>> group_B = []
62     >>> for i in range(10):
63     ...     group_B.append(m.async_invoke(Job(target=sqrt, args=[i],
64     ...                 blocks_on=[j.id for j in group_A])))
65     >>> jobs = m.wait(ids=[j.id for j in group_A[5:8]])
66     >>> print sorted(jobs.values(), key=lambda j: j.id)
67     [<Job 5>, <Job 6>, <Job 7>]
68     >>> jobs = m.wait()
69     >>> print sorted(jobs.values(), key=lambda j: j.id)
70     ... # doctest: +NORMALIZE_WHITESPACE
71     [<Job 0>, <Job 1>, <Job 2>, <Job 3>, <Job 4>, <Job 8>, <Job 9>, <Job 10>,
72      <Job 11>, <Job 12>, <Job 13>, <Job 14>, <Job 15>, <Job 16>, <Job 17>,
73      <Job 18>, <Job 19>]
74     >>> m.teardown()
75     """
76     def __init__(self, workdir=None):
77         super(PBSManager, self).__init__()
78         self._cleanup = True
79         self._setup_pbs()
80         if workdir == None:
81             workdir = tempfile.mkdtemp(
82                 prefix='tmp-PBSManager-', dir=SCRATCH_DIR)
83             self._temporary_workdir = True
84         else:
85             self._temporary_workdir = False
86         self._workdir = workdir
87         # example tracejob line:
88         #   10/19/2010 13:57:01  S    dequeuing from batch, state COMPLETE
89         self._tracejob_re = re.compile(
90             '(\d*)/(\d*)/(\d*) (\d*):(\d*):(\d*)  (\w*)    (.*)')
91         self._tracejob_re_groups = [
92             'month', 'day', 'year', 'hour', 'minute', 'second', 'code', 'msg']
93         self._tracejob_re_int_groups = [
94             'month', 'day', 'year', 'hour', 'minute', 'second']
95         # example tracejob message:
96         #   dequeuing from batch, state COMPLETE
97         self._tracejob_re_dequeue = re.compile(
98             'dequeuing from (.*), state (.*)')
99
100     def _setup_pbs(self):
101         if pbs == None:
102             raise pbs_error
103         self._pbs_server = pbs.pbs_default()
104         if not self._pbs_server:
105             raise Exception('No default server: %s' % pbs.error())
106         self._pbs_connection = pbs.pbs_connect(self._pbs_server)
107         self._post_submit = False
108         self._post_submit_sleep = 3
109         self._receive_poll_sleep = 3
110         self._walltime = '3:00:00'
111
112     def teardown(self):
113         if self._cleanup == True and self._temporary_workdir == True:
114             shutil.rmtree(self._workdir)
115         super(PBSManager, self).teardown()
116
117     def _spawn_job(self, job):
118         job._pbs_paths = {}
119         for name,extension in [('script', 'py'), ('job', 'job.pkl'),
120                                ('stdout', 'stdout'), ('stderr', 'stderr'),
121                                ('data', 'pkl')]:
122             job._pbs_paths[name] = os.path.join(
123                 self._workdir, '%d.%s' % (job.id, extension))
124         with open(job._pbs_paths['script'], 'w') as f:
125             self._write_script(job, f)
126         with open(job._pbs_paths['job'], 'w') as f:
127             pickle.dump(job, f)
128         environ = ','.join(['%s=%s' % (k,v) for k,v
129                             in self._environ().iteritems()])
130         host = socket.getfqdn(socket.gethostname())
131         attrib = self._make_attropl({
132                 pbs.ATTR_e: '%s:%s' % (
133                     host, job._pbs_paths['stderr']),
134                 #pbs.ATTR_h: 'u',   # user hold
135                 pbs.ATTR_m: 'n',   # don't send any mail
136                 pbs.ATTR_N: 'pysawsim-%d' % id(job),  # name the job
137                 pbs.ATTR_o: '%s:%s' % (
138                     host, job._pbs_paths['stdout']),
139                 pbs.ATTR_v: environ,
140                 })
141         job._pbs_id = pbs.pbs_submit(
142             self._pbs_connection, attrib, job._pbs_paths['script'], None, None)
143         if not job._pbs_id:
144             raise Exception('Error submitting job %s: %s'
145                             % (job, pbs.error()))
146         self._post_submit = True
147
148     def _write_script(self, job, stream):
149         """
150         >>> from . import InvokeJob
151         >>> m = PBSManager()
152         >>> j = InvokeJob(id=7, target='echo "testing %s"' % m)
153         >>> j._pbs_paths = {
154         ...     'job': os.path.join(m._workdir, '%d.job.pkl' % j.id),
155         ...     'data': os.path.join(m._workdir, '%d.pkl' % j.id),
156         ...     }
157         >>> m._write_script(j, sys.stdout)
158         ... # doctest: +ELLIPSIS, +REPORT_UDIFF
159         #!/usr/bin/env python
160         #PBS -l walltime:3:00:00
161         from __future__ import with_statement
162         import os
163         import pickle
164         import sys
165         sys.path = [...]
166         os.chdir(os.environ['PBS_O_WORKDIR'])
167         with open('/.../tmp-PBSManager-.../7.job.pkl', 'r') as f:
168             job = pickle.load(f)
169         job.run()
170         with open('/.../tmp-PBSManager-.../7.pkl', 'w') as f:
171             pickle.dump(job.data, f)
172         sys.exit(job.status)
173
174         >>> for id in [3, 4]:
175         ...     m._jobs[id] = Job(id=id)
176         ...     m._jobs[id]._pbs_id = '%d.big.iron.com' % id
177         >>> j = InvokeJob(id=8, target='echo "testing %s"' % m,
178         ...               blocks_on=[1,2,3,4])
179         >>> j._pbs_paths = {
180         ...     'job': os.path.join(m._workdir, '%d.job.pkl' % j.id),
181         ...     'data': os.path.join(m._workdir, '%d.pkl' % j.id),
182         ...     }
183         >>> m._write_script(j, sys.stdout)
184         ... # doctest: +ELLIPSIS, +REPORT_UDIFF
185         #!/usr/bin/env python
186         #PBS -l walltime:3:00:00
187         #PBS -w afterok:3.big.iron.com:4.big.iron.com
188         from __future__ import with_statement
189         import os
190         import pickle
191         import sys
192         sys.path = [...]
193         os.chdir(os.environ['PBS_O_WORKDIR'])
194         with open('/.../tmp-PBSManager-.../8.job.pkl', 'r') as f:
195             job = pickle.load(f)
196         job.run()
197         with open('/.../tmp-PBSManager-.../8.pkl', 'w') as f:
198             pickle.dump(job.data, f)
199         sys.exit(job.status)
200         >>> m.teardown()
201         """
202         stream.write('#!/usr/bin/env python\n')
203         stream.write('#PBS -l walltime:%s\n' % self._walltime)
204         #stream.write('#PBS -l minwclimit:...')
205         if len(job.blocks_on) > 0:
206             blockers = [self._jobs.get(id, None) for id in job.blocks_on]
207             stream.write(
208                 '#PBS -w afterok:%s\n'
209                 % ':'.join([j._pbs_id for j in blockers if j != None]))
210         stream.write('from __future__ import with_statement\n')
211         stream.write('import os\n')
212         stream.write('import pickle\n')
213         stream.write('import sys\n')
214         stream.write('sys.path = [%s]\n' % ', '.join(
215                 ["'%s'" % p for p in sys.path]))
216         stream.write("os.chdir(os.environ['PBS_O_WORKDIR'])\n")
217         stream.write("with open('%s', 'r') as f:\n" % job._pbs_paths['job'])
218         stream.write('    job = pickle.load(f)\n')
219         stream.write('job.run()\n')
220         stream.write("with open('%s', 'w') as f:\n" % job._pbs_paths['data'])
221         stream.write('    pickle.dump(job.data, f)\n')
222         stream.write('sys.exit(job.status)\n')
223
224     def _environ(self):
225         """Mimic the environment you would get with `qsub -V`.
226         """
227         environ = dict(os.environ)
228         for key in ['HOME', 'LANG', 'LOGNAME', 'PATH', 'MAIL', 'SHELL', 'TZ']:
229             if key in environ:
230                 environ['PBS_O_%s' % key] = environ[key]
231         environ['PBS_SERVER'] = self._pbs_server
232         environ['PBS_O_WORKDIR'] = os.getcwd()
233         #environ['PBS_ARRAYID'] # not an array job
234         return environ
235
236     def _receive_job(self):
237         if self._post_submit == True:
238             self._post_submit = False
239             time.sleep(self._post_submit_sleep)
240         while True:
241             for job in self._jobs.itervalues():
242                 if job.status != None:
243                     continue
244                 info = self._tracejob(job)
245                 if info.get('state', None) == 'COMPLETE':
246                     break
247             if info.get('state', None):
248                 break
249             time.sleep(self._receive_poll_sleep)
250         job._pbs_info = info
251         job.status = int(info['Exit_status'])
252         with open(job._pbs_paths['data'], 'r') as f:
253             job.data = pickle.load(f)
254         job._pbs_stdout = open(job._pbs_paths['stdout'], 'r').read()
255         job._pbs_stderr = open(job._pbs_paths['stderr'], 'r').read()
256         if self._cleanup == True:
257             for name,path in job._pbs_paths.iteritems():
258                 os.remove(path)
259         del(job._pbs_paths)
260         return job
261
262     def _tracejob(self, job):
263         s,out,err = invoke.invoke('tracejob "%s"' % job._pbs_id)
264         lines = []
265         for line in out.splitlines():
266             m = self._tracejob_re.match(line.strip())
267             if m == None:
268                 continue
269             data = {}
270             for group,value in zip(self._tracejob_re_groups, m.groups()):
271                 if group in self._tracejob_re_int_groups:
272                     data[group] = int(value)
273                 else:
274                     data[group] = value
275             lines.append(data)
276
277         info = {'lines': lines}
278         for line in lines:
279             if line['msg'].startswith('Exit_status='):
280                 # Exit_status=0 resources_used.cput=00:00:00...
281                 fields = line['msg'].split()
282                 for field in fields:
283                     key,value = field.split('=', 1)
284                     info[key] = value
285             elif line['msg'].startswith('dequeuing from'):
286                 m = self._tracejob_re_dequeue.match(line['msg'])
287                 info['queue'],info['state'] = m.groups()
288         return info
289
290     @staticmethod
291     def _make_attr_structs (attributes, constructor):
292         """Create an array of structs (via the specified
293         `constructor`) which encode the specified `attributes`.
294
295         Code adapted from:
296
297         From: "Nate Woody" <nathaniel.x.woody@gsk.com>
298         Date: Wed Sep 6 09:01:23 MDT 2006
299         Subject: [torqueusers] python pbs_submit
300         URL: http://www.clusterresources.com/pipermail/torqueusers/2006-September/004212.html
301         """
302         # Determine the number of structs evinced by 'attributes'.
303         attr_count = len(attributes)
304
305         attr  = constructor(attr_count)
306         index = 0
307
308         # Pack the struct array.
309         for pair in attributes.iteritems():
310             name, ds = pair
311
312             # If 'ds' is a dictionary, then treat it as containing valued resources.
313             if type(ds) == types.DictType:
314                 for resource_pair in ds.iteritems():
315                     resource, value       = resource_pair
316                     attr[index].name      = name
317                     attr[index].resource  = resource
318                     attr[index].value     = str(value)
319                     index                += 1
320
321             else:
322                 # If 'ds' is a scalar object, then wrap a list around it.
323                 if type(ds) != types.ListType:
324                     ds = [ ds ]
325
326                 attr[index].name  = name
327                 attr[index].value = ",".join(map(lambda x: str(x), ds))
328                 index += 1
329
330         return attr
331
332     @staticmethod
333     def _make_attrl (attributes):
334         """Obtain an array of `attrl` structs which encode the
335         specified `attributes`.
336
337         Code adapted from:
338
339         From: "Nate Woody" <nathaniel.x.woody@gsk.com>
340         Date: Wed Sep 6 09:01:23 MDT 2006
341         Subject: [torqueusers] python pbs_submit
342         URL: http://www.clusterresources.com/pipermail/torqueusers/2006-September/004212.html
343         """
344         return PBSManager._make_attr_structs(
345             attributes, constructor=pbs.new_attrl)
346
347     @staticmethod
348     def _make_attropl (attributes):
349         """Obtain an array of `attropl` structs which encode the specified
350         `attributes`.
351
352         Code adapted from:
353
354         From: "Nate Woody" <nathaniel.x.woody@gsk.com>
355         Date: Wed Sep 6 09:01:23 MDT 2006
356         Subject: [torqueusers] python pbs_submit
357         URL: http://www.clusterresources.com/pipermail/torqueusers/2006-September/004212.html
358         """
359         return PBSManager._make_attr_structs(
360             attributes, constructor=pbs.new_attropl)