1 # Copyright (C) 2010 W. Trevor King <wking@drexel.edu>
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 # The author may be contacted at <wking@drexel.edu> on the Internet, or
17 # write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
18 # Philadelphia PA 19104, USA.
20 """Functions for running external commands on other hosts.
22 pbs_ is a Python wrapper around the Torque PBS server.
24 .. _pbs: https://subtrac.sara.nl/oss/pbs_python
27 from __future__ import absolute_import
28 from __future__ import with_statement
44 _DISABLING_ERROR = None
# Python 2 `except ExcType, name` syntax: the ImportError raised by the
# (not visible here) `try: import pbs` block is captured in
# _DISABLING_ERROR so PBSManager._setup_pbs() can re-raise it later.
46 except ImportError, _DISABLING_ERROR:
# When the pbs bindings are missing, _SKIP is interpolated into the
# doctests below as ' # doctest: +SKIP' so they are not executed.
# (presumably the success branch sets _SKIP = '' — elided from this view.)
48 _SKIP = ' # doctest: +SKIP'
51 from . import Job, JobManager
# Default parent directory for the per-manager temporary work directories
# created in PBSManager.__init__ (user's home directory).
54 SCRATCH_DIR = os.path.expanduser('~')
57 class PBSManager (JobManager):
# NOTE(review): the class docstring below is a %-template so that
# '%(skip)s' expands to ' # doctest: +SKIP' when the pbs bindings are
# unavailable (see _SKIP above); it is rendered on an elided line.
58 __doc__ = """Manage asynchronous `Job` execution via :mod:`pbs`.
60 >>> from math import sqrt
61 >>> m = PBSManager()%(skip)s
63 >>> for i in range(10):
64 ... group_A.append(m.async_invoke(Job(target=sqrt, args=[i])))%(skip)s
66 >>> for i in range(10):
67 ... group_B.append(m.async_invoke(Job(target=sqrt, args=[i],
68 ... blocks_on=[j.id for j in group_A])))%(skip)s
69 >>> jobs = m.wait(ids=[j.id for j in group_A[5:8]])%(skip)s
70 >>> print sorted(jobs.values(), key=lambda j: j.id)%(skip)s
71 [<Job 5>, <Job 6>, <Job 7>]
72 >>> jobs = m.wait()%(skip)s
73 >>> print sorted(jobs.values(), key=lambda j: j.id)%(skip)s
74 ... # doctest: +NORMALIZE_WHITESPACE
75 [<Job 0>, <Job 1>, <Job 2>, <Job 3>, <Job 4>, <Job 8>, <Job 9>, <Job 10>,
76 <Job 11>, <Job 12>, <Job 13>, <Job 14>, <Job 15>, <Job 16>, <Job 17>,
78 >>> m.teardown()%(skip)s
# Initialize the manager.  If `workdir` is None, a temporary scratch
# directory (prefix 'tmp-PBSManager-') is created under SCRATCH_DIR and
# flagged via _temporary_workdir for removal in teardown().
80 def __init__(self, workdir=None):
81 super(PBSManager, self).__init__()
85 workdir = tempfile.mkdtemp(
86 prefix='tmp-PBSManager-', dir=SCRATCH_DIR)
87 self._temporary_workdir = True
89 self._temporary_workdir = False
90 self._workdir = workdir
# Precompiled patterns for parsing `tracejob` output in _tracejob():
# one per log line, one for the dequeue summary message.
91 # example tracejob line:
92 # 10/19/2010 13:57:01 S dequeuing from batch, state COMPLETE
93 self._tracejob_re = re.compile(
94 '(\d*)/(\d*)/(\d*) (\d*):(\d*):(\d*) (\w*) (.*)')
95 self._tracejob_re_groups = [
96 'month', 'day', 'year', 'hour', 'minute', 'second', 'code', 'msg']
97 self._tracejob_re_int_groups = [
98 'month', 'day', 'year', 'hour', 'minute', 'second']
99 # example tracejob message:
100 # dequeuing from batch, state COMPLETE
101 self._tracejob_re_dequeue = re.compile(
102 'dequeuing from (.*), state (.*)')
# Connect to the default PBS server and set submission/polling defaults.
# Re-raises the ImportError stored in _DISABLING_ERROR when the pbs
# bindings failed to import (_ENABLED presumably set False in the elided
# import guard -- confirm against the full file).
104 def _setup_pbs(self):
105 if _ENABLED == False:
106 raise _DISABLING_ERROR
107 self._pbs_server = pbs.pbs_default()
108 if not self._pbs_server:
109 raise Exception('No default server: %s' % pbs.error())
110 self._pbs_connection = pbs.pbs_connect(self._pbs_server)
# _post_submit marks that a submit just happened so _receive_job() can
# pause before its first poll; sleeps are in seconds.
111 self._post_submit = False
112 self._post_submit_sleep = 3
113 self._receive_poll_sleep = 3
114 self._walltime = '3:00:00'
# teardown (the `def` line is elided from this view): remove the
# auto-created scratch directory only when both cleanup and the
# temporary-workdir flag are set, then defer to JobManager.teardown().
117 if self._cleanup == True and self._temporary_workdir == True:
118 shutil.rmtree(self._workdir)
119 super(PBSManager, self).teardown()
# Submit `job` to PBS: lay out per-job file paths in the workdir, write
# the driver script (_write_script) and the pickled Job, then call
# pbs.pbs_submit(); the returned PBS id is stored on job._pbs_id.
121 def _spawn_job(self, job):
123 for name,extension in [('script', 'py'), ('job', 'job.pkl'),
124 ('stdout', 'stdout'), ('stderr', 'stderr'),
126 job._pbs_paths[name] = os.path.join(
127 self._workdir, '%d.%s' % (job.id, extension))
128 with open(job._pbs_paths['script'], 'w') as f:
129 self._write_script(job, f)
130 with open(job._pbs_paths['job'], 'w') as f:
# Serialize the qsub-style environment (see _environ) as 'K=V,K=V,...'.
132 environ = ','.join(['%s=%s' % (k,v) for k,v
133 in self._environ().iteritems()])
# stdout/stderr are staged back to this host via 'host:path' specs.
134 host = socket.getfqdn(socket.gethostname())
135 attrib = self._make_attropl({
136 pbs.ATTR_e: '%s:%s' % (
137 host, job._pbs_paths['stderr']),
138 #pbs.ATTR_h: 'u', # user hold
139 pbs.ATTR_m: 'n', # don't send any mail
140 pbs.ATTR_N: 'pysawsim-%d' % id(job), # name the job
141 pbs.ATTR_o: '%s:%s' % (
142 host, job._pbs_paths['stdout']),
145 job._pbs_id = pbs.pbs_submit(
146 self._pbs_connection, attrib, job._pbs_paths['script'], None, None)
147 raise Exception('Error submitting job %s: %s'
149 % (job, pbs.error()))
150 self._post_submit = True
# Emit the Python driver script that PBS executes: #PBS directives
# (walltime, and `-w afterok:` dependencies for jobs whose blockers have
# known PBS ids), then code to restore sys.path, chdir to PBS_O_WORKDIR,
# unpickle the Job, run it, pickle job.data, and exit with job.status.
152 def _write_script(self, job, stream):
153 stream.write('#!/usr/bin/env python\n')
154 stream.write('#PBS -l walltime:%s\n' % self._walltime)
155 #stream.write('#PBS -l minwclimit:...')
156 if len(job.blocks_on) > 0:
# Blockers not present in self._jobs (or without a _pbs_id) are dropped
# from the dependency list.
157 blockers = [self._jobs.get(id, None) for id in job.blocks_on]
159 '#PBS -w afterok:%s\n'
160 % ':'.join([j._pbs_id for j in blockers if j != None]))
161 stream.write('from __future__ import with_statement\n')
162 stream.write('import os\n')
163 stream.write('import pickle\n')
164 stream.write('import sys\n')
# Reproduce the submitting process's sys.path so the pickled Job's
# classes can be imported on the execution host.
165 stream.write('sys.path = [%s]\n' % ', '.join(
166 ["'%s'" % p for p in sys.path]))
167 stream.write("os.chdir(os.environ['PBS_O_WORKDIR'])\n")
168 stream.write("with open('%s', 'r') as f:\n" % job._pbs_paths['job'])
169 stream.write(' job = pickle.load(f)\n')
170 stream.write('job.run()\n')
171 stream.write("with open('%s', 'w') as f:\n" % job._pbs_paths['data'])
172 stream.write(' pickle.dump(job.data, f)\n')
173 stream.write('sys.exit(job.status)\n')
174 _write_script.__doc__ = """
175 >>> from . import InvokeJob
176 >>> m = PBSManager()%(skip)s
177 >>> j = InvokeJob(id=7, target='echo "testing %%s"' %% m)%(skip)s
179 ... 'job': os.path.join(m._workdir, '%%d.job.pkl' %% j.id),
180 ... 'data': os.path.join(m._workdir, '%%d.pkl' %% j.id),
182 >>> m._write_script(j, sys.stdout)%(skip)s
183 ... # doctest: +ELLIPSIS, +REPORT_UDIFF
184 #!/usr/bin/env python
185 #PBS -l walltime:3:00:00
186 from __future__ import with_statement
191 os.chdir(os.environ['PBS_O_WORKDIR'])
192 with open('/.../tmp-PBSManager-.../7.job.pkl', 'r') as f:
195 with open('/.../tmp-PBSManager-.../7.pkl', 'w') as f:
196 pickle.dump(job.data, f)
199 >>> for id in [3, 4]:%(skip)s
200 ... m._jobs[id] = Job(id=id)
201 ... m._jobs[id]._pbs_id = '%%d.big.iron.com' %% id
202 >>> j = InvokeJob(id=8, target='echo "testing %%s"' %% m,
203 ... blocks_on=[1,2,3,4])%(skip)s
205 ... 'job': os.path.join(m._workdir, '%%d.job.pkl' %% j.id),
206 ... 'data': os.path.join(m._workdir, '%%d.pkl' %% j.id),
208 >>> m._write_script(j, sys.stdout)%(skip)s
209 ... # doctest: +ELLIPSIS, +REPORT_UDIFF
210 #!/usr/bin/env python
211 #PBS -l walltime:3:00:00
212 #PBS -w afterok:3.big.iron.com:4.big.iron.com
213 from __future__ import with_statement
218 os.chdir(os.environ['PBS_O_WORKDIR'])
219 with open('/.../tmp-PBSManager-.../8.job.pkl', 'r') as f:
222 with open('/.../tmp-PBSManager-.../8.pkl', 'w') as f:
223 pickle.dump(job.data, f)
225 >>> m.teardown()%(skip)s
226 """ % {'skip': _SKIP}
# _environ (the `def` line is elided from this view): build the
# environment a job would see under `qsub -V` -- a copy of os.environ
# plus PBS_O_<KEY> mirrors, PBS_SERVER, and PBS_O_WORKDIR.
229 """Mimic the environment you would get with `qsub -V`.
231 environ = dict(os.environ)
232 for key in ['HOME', 'LANG', 'LOGNAME', 'PATH', 'MAIL', 'SHELL', 'TZ']:
234 environ['PBS_O_%s' % key] = environ[key]
235 environ['PBS_SERVER'] = self._pbs_server
236 environ['PBS_O_WORKDIR'] = os.getcwd()
237 #environ['PBS_ARRAYID'] # not an array job
# Poll PBS (via _tracejob) until a submitted job completes, then read
# back its exit status, its pickled job.data, and its stdout/stderr
# files; when cleanup is enabled the per-job files are removed.
240 def _receive_job(self):
# Give the server a moment after a fresh submit before the first poll.
241 if self._post_submit == True:
242 self._post_submit = False
243 time.sleep(self._post_submit_sleep)
245 for job in self._jobs.itervalues():
# Skip jobs that already have a recorded status.
246 if job.status != None:
248 info = self._tracejob(job)
249 if info.get('state', None) == 'COMPLETE':
251 if info.get('state', None):
253 time.sleep(self._receive_poll_sleep)
# 'Exit_status' comes from the tracejob log (see _tracejob()).
255 job.status = int(info['Exit_status'])
256 with open(job._pbs_paths['data'], 'r') as f:
257 job.data = pickle.load(f)
258 job._pbs_stdout = open(job._pbs_paths['stdout'], 'r').read()
259 job._pbs_stderr = open(job._pbs_paths['stderr'], 'r').read()
260 if self._cleanup == True:
261 for name,path in job._pbs_paths.iteritems():
# Run the external `tracejob` command for this job's PBS id and parse
# its output into a dict: per-line timestamp/code/msg records plus
# summary keys such as 'Exit_status', 'queue', and 'state'.
266 def _tracejob(self, job):
267 s,out,err = invoke.invoke('tracejob "%s"' % job._pbs_id)
269 for line in out.splitlines():
270 m = self._tracejob_re.match(line.strip())
274 for group,value in zip(self._tracejob_re_groups, m.groups()):
# Date/time fields are converted to int; 'code' and 'msg' stay strings.
275 if group in self._tracejob_re_int_groups:
276 data[group] = int(value)
281 info = {'lines': lines}
283 if line['msg'].startswith('Exit_status='):
284 # Exit_status=0 resources_used.cput=00:00:00...
285 fields = line['msg'].split()
287 key,value = field.split('=', 1)
289 elif line['msg'].startswith('dequeuing from'):
290 m = self._tracejob_re_dequeue.match(line['msg'])
291 info['queue'],info['state'] = m.groups()
# Static helper (its @staticmethod decorator is elided from this view):
# pack an `attributes` dict into an array of pbs attrl/attropl structs,
# handling dict values (valued resources), list values, and scalars.
295 def _make_attr_structs (attributes, constructor):
296 """Create an array of structs (via the specified
297 `constructor`) which encode the specified `attributes`.
301 From: "Nate Woody" <nathaniel.x.woody@gsk.com>
302 Date: Wed Sep 6 09:01:23 MDT 2006
303 Subject: [torqueusers] python pbs_submit
304 URL: http://www.clusterresources.com/pipermail/torqueusers/2006-September/004212.html
306 # Determine the number of structs evinced by 'attributes'.
307 attr_count = len(attributes)
309 attr = constructor(attr_count)
312 # Pack the struct array.
313 for pair in attributes.iteritems():
316 # If 'ds' is a dictionary, then treat it as containing valued resources.
317 if type(ds) == types.DictType:
318 for resource_pair in ds.iteritems():
319 resource, value = resource_pair
320 attr[index].name = name
321 attr[index].resource = resource
322 attr[index].value = str(value)
326 # If 'ds' is a scalar object, then wrap a list around it.
327 if type(ds) != types.ListType:
330 attr[index].name = name
331 attr[index].value = ",".join(map(lambda x: str(x), ds))
# Convenience wrapper: encode `attributes` as pbs `attrl` structs.
337 def _make_attrl (attributes):
338 """Obtain an array of `attrl` structs which encode the
339 specified `attributes`.
343 From: "Nate Woody" <nathaniel.x.woody@gsk.com>
344 Date: Wed Sep 6 09:01:23 MDT 2006
345 Subject: [torqueusers] python pbs_submit
346 URL: http://www.clusterresources.com/pipermail/torqueusers/2006-September/004212.html
348 return PBSManager._make_attr_structs(
349 attributes, constructor=pbs.new_attrl)
# Convenience wrapper: encode `attributes` as pbs `attropl` structs
# (the form accepted by pbs.pbs_submit, see _spawn_job above).
352 def _make_attropl (attributes):
353 """Obtain an array of `attropl` structs which encode the specified
358 From: "Nate Woody" <nathaniel.x.woody@gsk.com>
359 Date: Wed Sep 6 09:01:23 MDT 2006
360 Subject: [torqueusers] python pbs_submit
361 URL: http://www.clusterresources.com/pipermail/torqueusers/2006-September/004212.html
363 return PBSManager._make_attr_structs(
364 attributes, constructor=pbs.new_attropl)