1 # Copyright (C) 2010 W. Trevor King <wking@drexel.edu>
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 # The author may be contacted at <wking@drexel.edu> on the Internet, or
17 # write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
18 # Philadelphia PA 19104, USA.
20 """Functions for running external commands on other hosts.
22 pbs_ is a Python wrapper around the Torque PBS server.
24 .. _pbs: https://subtrac.sara.nl/oss/pbs_python
27 from __future__ import absolute_import
28 from __future__ import with_statement
43 except ImportError, pbs_error:
47 from . import Job, JobManager
# Base directory for temporary per-manager working directories -- the user's
# home directory, presumably because it is shared with the PBS execution
# hosts (TODO confirm against cluster configuration).
50 SCRATCH_DIR = os.path.expanduser('~')
# NOTE(review): this file appears to be a sampled extraction -- the embedded
# original line numbers jump, so statements are missing throughout, and the
# doctest below is truncated mid-example.
53 class PBSManager (JobManager):
54 """Manage asynchronous `Job` execution via :mod:`pbs`.
56 >>> from math import sqrt
59 >>> for i in range(10):
60 ... group_A.append(m.async_invoke(Job(target=sqrt, args=[i])))
62 >>> for i in range(10):
63 ... group_B.append(m.async_invoke(Job(target=sqrt, args=[i],
64 ... blocks_on=[j.id for j in group_A])))
65 >>> jobs = m.wait(ids=[j.id for j in group_A[5:8]])
66 >>> print sorted(jobs.values(), key=lambda j: j.id)
67 [<Job 5>, <Job 6>, <Job 7>]
69 >>> print sorted(jobs.values(), key=lambda j: j.id)
70 ... # doctest: +NORMALIZE_WHITESPACE
71 [<Job 0>, <Job 1>, <Job 2>, <Job 3>, <Job 4>, <Job 8>, <Job 9>, <Job 10>,
72 <Job 11>, <Job 12>, <Job 13>, <Job 14>, <Job 15>, <Job 16>, <Job 17>,
76 def __init__(self, workdir=None):
77 super(PBSManager, self).__init__()
# NOTE(review): original lines 78-80 are missing from this extraction;
# presumably a `if workdir is None:` guard precedes the mkdtemp call
# below -- confirm against the original file.
81 workdir = tempfile.mkdtemp(
82 prefix='tmp-PBSManager-', dir=SCRATCH_DIR)
# Remember whether we own the workdir so teardown() knows to delete it.
83 self._temporary_workdir = True
85 self._temporary_workdir = False
86 self._workdir = workdir
87 # example tracejob line:
88 # 10/19/2010 13:57:01 S dequeuing from batch, state COMPLETE
# Parses one tracejob log line into date/time fields, a one-letter code,
# and the free-form message.  NOTE(review): the pattern uses backslash
# escapes in a non-raw string; \d and \w pass through unchanged in
# Python 2, but a raw string r'...' would be safer.
89 self._tracejob_re = re.compile(
90 '(\d*)/(\d*)/(\d*) (\d*):(\d*):(\d*) (\w*) (.*)')
91 self._tracejob_re_groups = [
92 'month', 'day', 'year', 'hour', 'minute', 'second', 'code', 'msg']
# These groups are converted with int() when matched (see _tracejob).
93 self._tracejob_re_int_groups = [
94 'month', 'day', 'year', 'hour', 'minute', 'second']
95 # example tracejob message:
96 # dequeuing from batch, state COMPLETE
97 self._tracejob_re_dequeue = re.compile(
98 'dequeuing from (.*), state (.*)')
100 def _setup_pbs(self):
# Connect to the default Torque/PBS server and cache the tunables used
# by submission and polling.  NOTE(review): lines 101-102 are missing
# from this extraction, so part of this method's body is not visible.
103 self._pbs_server = pbs.pbs_default()
104 if not self._pbs_server:
105 raise Exception('No default server: %s' % pbs.error())
106 self._pbs_connection = pbs.pbs_connect(self._pbs_server)
# _post_submit is set after each submission so _receive_job() sleeps
# before its first tracejob poll; sleeps are in seconds.
107 self._post_submit = False
108 self._post_submit_sleep = 3
109 self._receive_poll_sleep = 3
# Default walltime requested in generated job scripts (see _write_script).
110 self._walltime = '3:00:00'
# NOTE(review): the `def teardown(self):` line (original ~111-112) is
# missing from this extraction; the lines below are teardown's body:
# delete the temporary workdir (if we created it), then defer to
# JobManager.teardown().
113 if self._cleanup == True and self._temporary_workdir == True:
114 shutil.rmtree(self._workdir)
115 super(PBSManager, self).teardown()
117 def _spawn_job(self, job):
# Write the job's driver script and pickled Job into the shared workdir,
# then submit the script to PBS.  NOTE(review): several lines are
# missing from this extraction (e.g. 118, 121, 127, 139-140, 143).
119 for name,extension in [('script', 'py'), ('job', 'job.pkl'),
120 ('stdout', 'stdout'), ('stderr', 'stderr'),
# Per-job file paths, keyed by role: <workdir>/<job.id>.<extension>.
122 job._pbs_paths[name] = os.path.join(
123 self._workdir, '%d.%s' % (job.id, extension))
124 with open(job._pbs_paths['script'], 'w') as f:
125 self._write_script(job, f)
126 with open(job._pbs_paths['job'], 'w') as f:
# Serialize the submitting environment as comma-joined name=value pairs
# (qsub -V style; values come from _environ()).
128 environ = ','.join(['%s=%s' % (k,v) for k,v
129 in self._environ().iteritems()])
130 host = socket.getfqdn(socket.gethostname())
# Build the attropl array: route stderr/stdout back to files on this
# host, suppress mail, and name the job.
131 attrib = self._make_attropl({
132 pbs.ATTR_e: '%s:%s' % (
133 host, job._pbs_paths['stderr']),
134 #pbs.ATTR_h: 'u', # user hold
135 pbs.ATTR_m: 'n', # don't send any mail
136 pbs.ATTR_N: 'pysawsim-%d' % id(job), # name the job
137 pbs.ATTR_o: '%s:%s' % (
138 host, job._pbs_paths['stdout']),
# pbs_submit returns the PBS job id string (empty/false on failure --
# the raise below presumably sits under an `if` on a missing line).
141 job._pbs_id = pbs.pbs_submit(
142 self._pbs_connection, attrib, job._pbs_paths['script'], None, None)
144 raise Exception('Error submitting job %s: %s'
145 % (job, pbs.error()))
# Tell _receive_job() a submission just happened so it pauses before
# its first poll.
146 self._post_submit = True
# Emit a standalone Python driver script for `job`: the script restores
# sys.path, chdirs to $PBS_O_WORKDIR, unpickles the Job, runs it, pickles
# job.data back into the shared workdir, and exits with the job's status.
148 def _write_script(self, job, stream):
150 >>> from . import InvokeJob
152 >>> j = InvokeJob(id=7, target='echo "testing %s"' % m)
154 ... 'job': os.path.join(m._workdir, '%d.job.pkl' % j.id),
155 ... 'data': os.path.join(m._workdir, '%d.pkl' % j.id),
157 >>> m._write_script(j, sys.stdout)
158 ... # doctest: +ELLIPSIS, +REPORT_UDIFF
159 #!/usr/bin/env python
160 #PBS -l walltime:3:00:00
161 from __future__ import with_statement
166 os.chdir(os.environ['PBS_O_WORKDIR'])
167 with open('/.../tmp-PBSManager-.../7.job.pkl', 'r') as f:
170 with open('/.../tmp-PBSManager-.../7.pkl', 'w') as f:
171 pickle.dump(job.data, f)
174 >>> for id in [3, 4]:
175 ... m._jobs[id] = Job(id=id)
176 ... m._jobs[id]._pbs_id = '%d.big.iron.com' % id
177 >>> j = InvokeJob(id=8, target='echo "testing %s"' % m,
178 ... blocks_on=[1,2,3,4])
180 ... 'job': os.path.join(m._workdir, '%d.job.pkl' % j.id),
181 ... 'data': os.path.join(m._workdir, '%d.pkl' % j.id),
183 >>> m._write_script(j, sys.stdout)
184 ... # doctest: +ELLIPSIS, +REPORT_UDIFF
185 #!/usr/bin/env python
186 #PBS -l walltime:3:00:00
187 #PBS -w afterok:3.big.iron.com:4.big.iron.com
188 from __future__ import with_statement
193 os.chdir(os.environ['PBS_O_WORKDIR'])
194 with open('/.../tmp-PBSManager-.../8.job.pkl', 'r') as f:
197 with open('/.../tmp-PBSManager-.../8.pkl', 'w') as f:
198 pickle.dump(job.data, f)
# NOTE(review): the docstring's closing quotes (original ~199-201) are
# missing from this extraction; executable code resumes below.
202 stream.write('#!/usr/bin/env python\n')
203 stream.write('#PBS -l walltime:%s\n' % self._walltime)
204 #stream.write('#PBS -l minwclimit:...')
# Express dependencies as a PBS `afterok` list of the blockers' PBS ids;
# blockers no longer tracked in self._jobs are silently skipped.
205 if len(job.blocks_on) > 0:
206 blockers = [self._jobs.get(id, None) for id in job.blocks_on]
208 '#PBS -w afterok:%s\n'
209 % ':'.join([j._pbs_id for j in blockers if j != None]))
210 stream.write('from __future__ import with_statement\n')
211 stream.write('import os\n')
212 stream.write('import pickle\n')
213 stream.write('import sys\n')
# Reproduce the submitting process's sys.path so unpickling the Job can
# import project modules on the execution host.
214 stream.write('sys.path = [%s]\n' % ', '.join(
215 ["'%s'" % p for p in sys.path]))
216 stream.write("os.chdir(os.environ['PBS_O_WORKDIR'])\n")
217 stream.write("with open('%s', 'r') as f:\n" % job._pbs_paths['job'])
218 stream.write(' job = pickle.load(f)\n')
219 stream.write('job.run()\n')
220 stream.write("with open('%s', 'w') as f:\n" % job._pbs_paths['data'])
221 stream.write(' pickle.dump(job.data, f)\n')
222 stream.write('sys.exit(job.status)\n')
# NOTE(review): the `def _environ(self):` line and the docstring opener are
# missing from this extraction; the fragment below is that method's body
# (it is called from _spawn_job as self._environ()).
225 """Mimic the environment you would get with `qsub -V`.
# Start from a copy of the full current environment...
227 environ = dict(os.environ)
228 for key in ['HOME', 'LANG', 'LOGNAME', 'PATH', 'MAIL', 'SHELL', 'TZ']:
# ...and mirror the usual submission-side variables as PBS_O_<KEY>,
# the names Torque exposes to the job.
230 environ['PBS_O_%s' % key] = environ[key]
231 environ['PBS_SERVER'] = self._pbs_server
232 environ['PBS_O_WORKDIR'] = os.getcwd()
233 #environ['PBS_ARRAYID'] # not an array job
236 def _receive_job(self):
# Poll tracejob over the tracked jobs until one reaches state COMPLETE,
# then harvest its exit status, pickled data, and stdout/stderr.
# NOTE(review): several lines are missing from this extraction (e.g.
# 240, 243, 246, 248, 250, 258-261), including loop-exit logic.
237 if self._post_submit == True:
238 self._post_submit = False
# Give the PBS server a moment after a submission before polling.
239 time.sleep(self._post_submit_sleep)
241 for job in self._jobs.itervalues():
# A non-None status means this job was already harvested.
242 if job.status != None:
244 info = self._tracejob(job)
245 if info.get('state', None) == 'COMPLETE':
247 if info.get('state', None):
249 time.sleep(self._receive_poll_sleep)
# Harvest: exit status from tracejob, job.data from the pickle the
# driver script wrote, and the captured output streams.
251 job.status = int(info['Exit_status'])
252 with open(job._pbs_paths['data'], 'r') as f:
253 job.data = pickle.load(f)
254 job._pbs_stdout = open(job._pbs_paths['stdout'], 'r').read()
255 job._pbs_stderr = open(job._pbs_paths['stderr'], 'r').read()
# Remove the job's files from the shared workdir when cleanup is on.
256 if self._cleanup == True:
257 for name,path in job._pbs_paths.iteritems():
262 def _tracejob(self, job):
# Run the `tracejob` command-line utility on the job's PBS id and parse
# its log lines into a dict containing (at least) 'lines' plus, when
# present, 'queue', 'state', and 'Exit_status'.  NOTE(review): lines are
# missing from this extraction (e.g. 264, 267-269, 273-276, 278, 282,
# 284, 288-289), including the construction of `lines` and the return.
263 s,out,err = invoke.invoke('tracejob "%s"' % job._pbs_id)
265 for line in out.splitlines():
266 m = self._tracejob_re.match(line.strip())
# Name each regex group and int()-convert the date/time fields.
270 for group,value in zip(self._tracejob_re_groups, m.groups()):
271 if group in self._tracejob_re_int_groups:
272 data[group] = int(value)
277 info = {'lines': lines}
279 if line['msg'].startswith('Exit_status='):
280 # Exit_status=0 resources_used.cput=00:00:00...
281 fields = line['msg'].split()
# Each whitespace-separated field is a key=value pair.
283 key,value = field.split('=', 1)
285 elif line['msg'].startswith('dequeuing from'):
286 m = self._tracejob_re_dequeue.match(line['msg'])
287 info['queue'],info['state'] = m.groups()
# Presumably decorated @staticmethod on a missing line -- it takes no
# `self` and is called as PBSManager._make_attr_structs() below; confirm.
291 def _make_attr_structs (attributes, constructor):
292 """Create an array of structs (via the specified
293 `constructor`) which encode the specified `attributes`.
297 From: "Nate Woody" <nathaniel.x.woody@gsk.com>
298 Date: Wed Sep 6 09:01:23 MDT 2006
299 Subject: [torqueusers] python pbs_submit
300 URL: http://www.clusterresources.com/pipermail/torqueusers/2006-September/004212.html
302 # Determine the number of structs evinced by 'attributes'.
303 attr_count = len(attributes)
# `constructor` is pbs.new_attrl or pbs.new_attropl (see the two
# wrappers below); it allocates an array of attr_count structs.
305 attr = constructor(attr_count)
308 # Pack the struct array.
309 for pair in attributes.iteritems():
# NOTE(review): the unpacking of `pair` into name/ds and the `index`
# bookkeeping are on lines missing from this extraction.
312 # If 'ds' is a dictionary, then treat it as containing valued resources.
313 if type(ds) == types.DictType:
314 for resource_pair in ds.iteritems():
315 resource, value = resource_pair
316 attr[index].name = name
317 attr[index].resource = resource
318 attr[index].value = str(value)
322 # If 'ds' is a scalar object, then wrap a list around it.
323 if type(ds) != types.ListType:
# Non-dict values become a comma-joined string of str()ed items.
326 attr[index].name = name
327 attr[index].value = ",".join(map(lambda x: str(x), ds))
# Presumably decorated @staticmethod on a missing line; thin wrapper
# selecting the `attrl` struct constructor.
333 def _make_attrl (attributes):
334 """Obtain an array of `attrl` structs which encode the
335 specified `attributes`.
339 From: "Nate Woody" <nathaniel.x.woody@gsk.com>
340 Date: Wed Sep 6 09:01:23 MDT 2006
341 Subject: [torqueusers] python pbs_submit
342 URL: http://www.clusterresources.com/pipermail/torqueusers/2006-September/004212.html
344 return PBSManager._make_attr_structs(
345 attributes, constructor=pbs.new_attrl)
# Presumably decorated @staticmethod on a missing line; thin wrapper
# selecting the `attropl` struct constructor (used by _spawn_job).
348 def _make_attropl (attributes):
349 """Obtain an array of `attropl` structs which encode the specified
354 From: "Nate Woody" <nathaniel.x.woody@gsk.com>
355 Date: Wed Sep 6 09:01:23 MDT 2006
356 Subject: [torqueusers] python pbs_submit
357 URL: http://www.clusterresources.com/pipermail/torqueusers/2006-September/004212.html
359 return PBSManager._make_attr_structs(
360 attributes, constructor=pbs.new_attropl)