1 # Copyright (C) 2010 W. Trevor King <wking@drexel.edu>
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 # The author may be contacted at <wking@drexel.edu> on the Internet, or
17 # write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
18 # Philadelphia PA 19104, USA.
20 """Functions for running external commands on other hosts.
pbs_ is a Python wrapper around the Torque PBS server.
24 .. _pbs: https://subtrac.sara.nl/oss/pbs_python
27 from __future__ import absolute_import
28 from __future__ import with_statement
44 except ImportError, pbs_error:
46 _SKIP = ' # doctest: +SKIP'
49 from . import Job, JobManager
52 SCRATCH_DIR = os.path.expanduser('~')
class PBSManager (JobManager):
    # The class docstring is built as a template: the '%(skip)s' placeholders
    # are later expanded (presumably with _SKIP, as is done explicitly for
    # _write_script.__doc__ below) so the doctests are skipped when the pbs
    # module is unavailable.  NOTE(review): this excerpt appears truncated;
    # the '%' expansion line that closes this string is not visible here.
    __doc__ = """Manage asynchronous `Job` execution via :mod:`pbs`.
    >>> from math import sqrt
    >>> m = PBSManager()%(skip)s
    >>> for i in range(10):
    ...     group_A.append(m.async_invoke(Job(target=sqrt, args=[i])))%(skip)s
    >>> for i in range(10):
    ...     group_B.append(m.async_invoke(Job(target=sqrt, args=[i],
    ...         blocks_on=[j.id for j in group_A])))%(skip)s
    >>> jobs = m.wait(ids=[j.id for j in group_A[5:8]])%(skip)s
    >>> print sorted(jobs.values(), key=lambda j: j.id)%(skip)s
    [<Job 5>, <Job 6>, <Job 7>]
    >>> jobs = m.wait()%(skip)s
    >>> print sorted(jobs.values(), key=lambda j: j.id)%(skip)s
    ... # doctest: +NORMALIZE_WHITESPACE
    [<Job 0>, <Job 1>, <Job 2>, <Job 3>, <Job 4>, <Job 8>, <Job 9>, <Job 10>,
     <Job 11>, <Job 12>, <Job 13>, <Job 14>, <Job 15>, <Job 16>, <Job 17>,
    >>> m.teardown()%(skip)s
    def __init__(self, workdir=None):
        # Prepare a working directory to hold the per-job script, pickle, and
        # stdout/stderr files, and precompile the tracejob-output parsers.
        #
        # NOTE(review): this excerpt appears truncated; the two assignments to
        # _temporary_workdir presumably sit on the branches of an
        # `if workdir is None: ... else: ...` test -- confirm against the
        # full source.
        super(PBSManager, self).__init__()
        workdir = tempfile.mkdtemp(
            prefix='tmp-PBSManager-', dir=SCRATCH_DIR)
        self._temporary_workdir = True
        self._temporary_workdir = False
        self._workdir = workdir
        # example tracejob line:
        # 10/19/2010 13:57:01 S dequeuing from batch, state COMPLETE
        # Splits a tracejob line into timestamp fields, a code letter, and
        # the free-form message.  NOTE(review): the pattern is not a raw
        # string; '\d' and '\w' survive only because they are not recognized
        # string escapes -- r'...' would be safer.
        self._tracejob_re = re.compile(
            '(\d*)/(\d*)/(\d*) (\d*):(\d*):(\d*) (\w*) (.*)')
        # Names for the positional regex groups above.
        self._tracejob_re_groups = [
            'month', 'day', 'year', 'hour', 'minute', 'second', 'code', 'msg']
        # The subset of groups converted to int when parsed (see _tracejob).
        self._tracejob_re_int_groups = [
            'month', 'day', 'year', 'hour', 'minute', 'second']
        # example tracejob message:
        # dequeuing from batch, state COMPLETE
        self._tracejob_re_dequeue = re.compile(
            'dequeuing from (.*), state (.*)')
    def _setup_pbs(self):
        # Connect to the default Torque/PBS server and initialize the
        # submission/polling parameters used by _spawn_job and _receive_job.
        self._pbs_server = pbs.pbs_default()
        if not self._pbs_server:
            raise Exception('No default server: %s' % pbs.error())
        self._pbs_connection = pbs.pbs_connect(self._pbs_server)
        self._post_submit = False       # set True by _spawn_job after a submit
        self._post_submit_sleep = 3     # seconds to pause after a submission
        self._receive_poll_sleep = 3    # seconds between tracejob polls
        self._walltime = '3:00:00'      # walltime requested for each job
        # (teardown, continued -- the `def` line is missing from this
        # excerpt.)  Remove the working directory only if cleanup was
        # requested AND we created the directory ourselves in __init__.
        if self._cleanup == True and self._temporary_workdir == True:
            shutil.rmtree(self._workdir)
        super(PBSManager, self).teardown()
    def _spawn_job(self, job):
        # Write the job's driver script and pickled Job to disk, then submit
        # the script to PBS via pbs_submit.
        #
        # NOTE(review): this excerpt appears truncated in several places
        # (path-table initialization, the pickle dump of the job, part of the
        # attribute dict, and the submission-error test before the raise).
        for name,extension in [('script', 'py'), ('job', 'job.pkl'),
                               ('stdout', 'stdout'), ('stderr', 'stderr'),
            # Per-job files live under the manager's workdir, keyed by role:
            # <workdir>/<job.id>.<extension>
            job._pbs_paths[name] = os.path.join(
                self._workdir, '%d.%s' % (job.id, extension))
        with open(job._pbs_paths['script'], 'w') as f:
            self._write_script(job, f)
        with open(job._pbs_paths['job'], 'w') as f:
        # Environment forwarded to the batch job, comma-joined KEY=VALUE.
        environ = ','.join(['%s=%s' % (k,v) for k,v
                            in self._environ().iteritems()])
        host = socket.getfqdn(socket.gethostname())
        attrib = self._make_attropl({
            pbs.ATTR_e: '%s:%s' % (
                host, job._pbs_paths['stderr']),
            #pbs.ATTR_h: 'u', # user hold
            pbs.ATTR_m: 'n', # don't send any mail
            pbs.ATTR_N: 'pysawsim-%d' % id(job), # name the job
            pbs.ATTR_o: '%s:%s' % (
                host, job._pbs_paths['stdout']),
        job._pbs_id = pbs.pbs_submit(
            self._pbs_connection, attrib, job._pbs_paths['script'], None, None)
        raise Exception('Error submitting job %s: %s'
                        % (job, pbs.error()))
        # Let _receive_job know a submission just happened so it can pause
        # before the first poll.
        self._post_submit = True
    def _write_script(self, job, stream):
        # Emit a self-contained Python driver script for `job`: it restores
        # sys.path, unpickles the Job, runs it, pickles job.data back out,
        # and exits with the job's status.
        stream.write('#!/usr/bin/env python\n')
        # NOTE(review): standard PBS syntax is '-l walltime=...' (equals
        # sign), not 'walltime:...' -- confirm this directive is honored by
        # the target server.
        stream.write('#PBS -l walltime:%s\n' % self._walltime)
        #stream.write('#PBS -l minwclimit:...')
        if len(job.blocks_on) > 0:
            # Only blockers this manager knows about can gate the job;
            # unknown ids drop out via the `if j != None` filter below.
            blockers = [self._jobs.get(id, None) for id in job.blocks_on]
            # NOTE(review): job dependencies are normally declared with
            # '-W depend=afterok:...'; the '-w afterok:...' form looks
            # suspect -- verify.  (The `stream.write(` opener for this call
            # appears elided from this excerpt.)
                '#PBS -w afterok:%s\n'
                % ':'.join([j._pbs_id for j in blockers if j != None]))
        stream.write('from __future__ import with_statement\n')
        stream.write('import os\n')
        stream.write('import pickle\n')
        stream.write('import sys\n')
        # Reproduce the submitting interpreter's sys.path so the pickled
        # Job's classes can be imported on the execution host.
        stream.write('sys.path = [%s]\n' % ', '.join(
            ["'%s'" % p for p in sys.path]))
        stream.write("os.chdir(os.environ['PBS_O_WORKDIR'])\n")
        stream.write("with open('%s', 'r') as f:\n" % job._pbs_paths['job'])
        stream.write(' job = pickle.load(f)\n')
        stream.write('job.run()\n')
        stream.write("with open('%s', 'w') as f:\n" % job._pbs_paths['data'])
        stream.write(' pickle.dump(job.data, f)\n')
        stream.write('sys.exit(job.status)\n')
    # Doctest examples for _write_script, attached after the fact so the
    # '%(skip)s' placeholders can be expanded with _SKIP (the examples are
    # skipped when the pbs module is unavailable; literal '%' characters in
    # the examples are doubled to survive the expansion).
    _write_script.__doc__ = """
    >>> from . import InvokeJob
    >>> m = PBSManager()%(skip)s
    >>> j = InvokeJob(id=7, target='echo "testing %%s"' %% m)%(skip)s
    ...     'job': os.path.join(m._workdir, '%%d.job.pkl' %% j.id),
    ...     'data': os.path.join(m._workdir, '%%d.pkl' %% j.id),
    >>> m._write_script(j, sys.stdout)%(skip)s
    ... # doctest: +ELLIPSIS, +REPORT_UDIFF
    #!/usr/bin/env python
    #PBS -l walltime:3:00:00
    from __future__ import with_statement
    os.chdir(os.environ['PBS_O_WORKDIR'])
    with open('/.../tmp-PBSManager-.../7.job.pkl', 'r') as f:
    with open('/.../tmp-PBSManager-.../7.pkl', 'w') as f:
    pickle.dump(job.data, f)
    >>> for id in [3, 4]:%(skip)s
    ...     m._jobs[id] = Job(id=id)
    ...     m._jobs[id]._pbs_id = '%%d.big.iron.com' %% id
    >>> j = InvokeJob(id=8, target='echo "testing %%s"' %% m,
    ...               blocks_on=[1,2,3,4])%(skip)s
    ...     'job': os.path.join(m._workdir, '%%d.job.pkl' %% j.id),
    ...     'data': os.path.join(m._workdir, '%%d.pkl' %% j.id),
    >>> m._write_script(j, sys.stdout)%(skip)s
    ... # doctest: +ELLIPSIS, +REPORT_UDIFF
    #!/usr/bin/env python
    #PBS -l walltime:3:00:00
    #PBS -w afterok:3.big.iron.com:4.big.iron.com
    from __future__ import with_statement
    os.chdir(os.environ['PBS_O_WORKDIR'])
    with open('/.../tmp-PBSManager-.../8.job.pkl', 'r') as f:
    with open('/.../tmp-PBSManager-.../8.pkl', 'w') as f:
    pickle.dump(job.data, f)
    >>> m.teardown()%(skip)s
    """ % {'skip': _SKIP}
        # (_environ, continued -- the `def` line is missing from this
        # excerpt.)  NOTE(review): a guard around the PBS_O_* copy (e.g.
        # `if key in environ:`) and the trailing `return environ` also appear
        # elided -- confirm against the full source.
        """Mimic the environment you would get with `qsub -V`.
        """
        environ = dict(os.environ)
        # Re-expose the standard login variables under their PBS_O_* names.
        for key in ['HOME', 'LANG', 'LOGNAME', 'PATH', 'MAIL', 'SHELL', 'TZ']:
            environ['PBS_O_%s' % key] = environ[key]
        environ['PBS_SERVER'] = self._pbs_server
        environ['PBS_O_WORKDIR'] = os.getcwd()
        #environ['PBS_ARRAYID'] # not an array job
    def _receive_job(self):
        # Poll `tracejob` until a submitted job completes, then harvest its
        # exit status, pickled data, and captured stdout/stderr.
        #
        # NOTE(review): this excerpt appears truncated (the polling-loop
        # structure and the cleanup loop body are incomplete).
        if self._post_submit == True:
            # Give the server a moment to register a fresh submission before
            # the first poll.
            self._post_submit = False
            time.sleep(self._post_submit_sleep)
        for job in self._jobs.itervalues():
            if job.status != None:
            info = self._tracejob(job)
            if info.get('state', None) == 'COMPLETE':
            if info.get('state', None):
            time.sleep(self._receive_poll_sleep)
        job.status = int(info['Exit_status'])
        with open(job._pbs_paths['data'], 'r') as f:
            job.data = pickle.load(f)
        job._pbs_stdout = open(job._pbs_paths['stdout'], 'r').read()
        job._pbs_stderr = open(job._pbs_paths['stderr'], 'r').read()
        if self._cleanup == True:
            # Remove the per-job files once the results are in hand.
            for name,path in job._pbs_paths.iteritems():
    def _tracejob(self, job):
        # Run `tracejob` on the job's PBS id and parse its output into a
        # dict of timestamped lines plus summary keys such as 'Exit_status',
        # 'queue', and 'state'.
        #
        # NOTE(review): this excerpt appears truncated in several places
        # (match guard, `data` initialization, `lines` accumulation, field
        # loop, and the return statement).
        s,out,err = invoke.invoke('tracejob "%s"' % job._pbs_id)
        for line in out.splitlines():
            m = self._tracejob_re.match(line.strip())
            # Convert the numeric timestamp fields to ints; leave the code
            # letter and message as strings.
            for group,value in zip(self._tracejob_re_groups, m.groups()):
                if group in self._tracejob_re_int_groups:
                    data[group] = int(value)
        info = {'lines': lines}
        if line['msg'].startswith('Exit_status='):
            # Exit_status=0 resources_used.cput=00:00:00...
            fields = line['msg'].split()
            key,value = field.split('=', 1)
        elif line['msg'].startswith('dequeuing from'):
            # e.g. "dequeuing from batch, state COMPLETE"
            m = self._tracejob_re_dequeue.match(line['msg'])
            info['queue'],info['state'] = m.groups()
    def _make_attr_structs (attributes, constructor):
        """Create an array of structs (via the specified
        `constructor`) which encode the specified `attributes`.

        From: "Nate Woody" <nathaniel.x.woody@gsk.com>
        Date: Wed Sep 6 09:01:23 MDT 2006
        Subject: [torqueusers] python pbs_submit
        URL: http://www.clusterresources.com/pipermail/torqueusers/2006-September/004212.html
        """
        # NOTE(review): this excerpt appears truncated -- the `name, ds`
        # unpacking from `pair` and the `index` counter bookkeeping are not
        # visible here.
        # Determine the number of structs evinced by 'attributes'.
        attr_count = len(attributes)
        attr = constructor(attr_count)
        # Pack the struct array.
        for pair in attributes.iteritems():
            # If 'ds' is a dictionary, then treat it as containing valued resources.
            if type(ds) == types.DictType:
                for resource_pair in ds.iteritems():
                    resource, value = resource_pair
                    attr[index].name = name
                    attr[index].resource = resource
                    attr[index].value = str(value)
            # If 'ds' is a scalar object, then wrap a list around it.
            if type(ds) != types.ListType:
            attr[index].name = name
            # Multiple values are comma-joined into a single string.
            attr[index].value = ",".join(map(lambda x: str(x), ds))
    def _make_attrl (attributes):
        """Obtain an array of `attrl` structs which encode the
        specified `attributes`.

        From: "Nate Woody" <nathaniel.x.woody@gsk.com>
        Date: Wed Sep 6 09:01:23 MDT 2006
        Subject: [torqueusers] python pbs_submit
        URL: http://www.clusterresources.com/pipermail/torqueusers/2006-September/004212.html
        """
        # Delegate to the shared packer with the plain-attrl constructor.
        return PBSManager._make_attr_structs(
            attributes, constructor=pbs.new_attrl)
    def _make_attropl (attributes):
        """Obtain an array of `attropl` structs which encode the specified
        `attributes`.

        From: "Nate Woody" <nathaniel.x.woody@gsk.com>
        Date: Wed Sep 6 09:01:23 MDT 2006
        Subject: [torqueusers] python pbs_submit
        URL: http://www.clusterresources.com/pipermail/torqueusers/2006-September/004212.html
        """
        # Delegate to the shared packer with the attropl constructor (the
        # operation-capable variant used for job submission).
        return PBSManager._make_attr_structs(
            attributes, constructor=pbs.new_attropl)