1 # Copyright (C) 2010 W. Trevor King <wking@drexel.edu>
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 # The author may be contacted at <wking@drexel.edu> on the Internet, or
17 # write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
18 # Philadelphia PA 19104, USA.
20 """Functions for running external commands on other hosts.
23 from .. import invoke as invoke
30 Jobs execute a Python function (through an interface similar to
33 >>> def job(*args, **kwargs):
34 ... print 'executing job with %s and %s.' % (args, kwargs)
36 >>> j = Job(id=3, target=job, args=[1,2], kwargs={'kwargA':3})
37 >>> j2 = copy.deepcopy(j)
39 Job status is initially `None`.
45 executing job with (1, 2) and {'kwargA': 3}.
47 After execution, `status` and `data` are set.
54 You can copy these attributes over to another job with `copy_onto()`.
62 String representations are nice and simple.
69 If `target` raises an exception, it is stored in `status` (for
70 successful runs, the status value is `0`).
73 ... raise Exception('error running job')
74 >>> j = Job(id=3, target=job)
77 Exception('error running job',)
81 def __init__(self, id, target, blocks_on=None, args=None, kwargs=None):
90 self.blocks_on = blocks_on
97 return '<%s %d>' % (self.__class__.__name__, self.id)
100 return self.__str__()
104 self.data = self.target(*self.args, **self.kwargs)
110 def copy_onto(self, other):
111 """Merge results of run onto initial job-request instance."""
112 for attr in ['status', 'data']:
113 setattr(other, attr, getattr(self, attr))
116 class InvokeJob (Job):
117 """Job subclass which `invoke()`\s a command line function.
119 >>> q = 'What is the meaning of life, the universe, and everything?'
120 >>> j = InvokeJob(id=3, target='echo "%s"' % q)
125 'What is the meaning of life, the universe, and everything?\\n'
129 >>> j = InvokeJob(id=3, target='missing_command')
132 Command failed (127):
133 /bin/sh: missing_command: command not found
141 '/bin/sh: missing_command: command not found\\n'
145 self.status,stdout,stderr = invoke.invoke(
146 self.target, *self.args, **self.kwargs)
147 self.data = {'stdout':stdout, 'stderr':stderr}
148 except invoke.CommandError, e:
150 self.data = {'stdout':e.stdout, 'stderr':e.stderr}
153 class JobManager (object):
161 def async_invoke(self, cmd_string, blocks_on=None, *args, **kwargs):
164 job = Job(id, cmd_string, *args, **kwargs)
169 def _spawn_job(self, job):
170 raise NotImplementedError
172 def wait(self, ids=None):
174 ids = self._jobs.keys()
176 for id in list(ids): # get already completed jobs
177 if self._jobs[id] != None:
178 jobs[id] = self._jobs.pop(id)
180 while len(ids) > 0: # wait for outstanding jobs
181 job = self._receive_job()
182 job.copy_onto(self._jobs[job.id])
184 jobs[job.id] = self._jobs.pop(job.id)
188 def _receive_job(self):
189 raise NotImplementedError