1 # Copyright (C) 2010 W. Trevor King <wking@drexel.edu>
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 # The author may be contacted at <wking@drexel.edu> on the Internet, or
17 # write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
18 # Philadelphia PA 19104, USA.
20 """Functions for running external commands on other hosts.
23 from .. import invoke as invoke
28 """Job request structure for `JobManager`.
32 Jobs execute a Python function (through an interface similar to
35 >>> def job(*args, **kwargs):
36 ... print 'executing job with %s and %s.' % (args, kwargs)
38 >>> j = Job(id=3, target=job, args=[1,2], kwargs={'kwargA':3})
39 >>> j2 = copy.deepcopy(j)
41 Job status is initially `None`.
47 executing job with (1, 2) and {'kwargA': 3}.
49 After execution, `status` and `data` are set.
56 You can copy these attributes over to another job with `copy_onto()`.
64 String representations are nice and simple.
71 If `target` raises an exception, it is stored in `status` (for
72 successful runs, the status value is `0`).
75 ... raise Exception('error running job')
76 >>> j = Job(id=3, target=job)
79 Exception('error running job',)
83 def __init__(self, id=None, target=None, args=None, kwargs=None,
93 self.blocks_on = blocks_on
100 return '<%s %d>' % (self.__class__.__name__, self.id)
103 return self.__str__()
107 self.data = self.target(*self.args, **self.kwargs)
113 def copy_onto(self, other):
114 """Merge results of run onto initial job-request instance."""
115 for attr in ['status', 'data']:
116 setattr(other, attr, getattr(self, attr))
119 class InvokeJob (Job):
120 """`Job` subclass which `invoke()`\s a command line function.
122 >>> q = 'What is the meaning of life, the universe, and everything?'
123 >>> j = InvokeJob(id=3, target='echo "%s"' % q)
128 'What is the meaning of life, the universe, and everything?\\n'
132 >>> j = InvokeJob(id=3, target='missing_command')
135 Command failed (127):
136 /bin/sh: missing_command: command not found
144 '/bin/sh: missing_command: command not found\\n'
148 self.status,stdout,stderr = invoke.invoke(
149 self.target, *self.args, **self.kwargs)
150 self.data = {'stdout':stdout, 'stderr':stderr}
151 except invoke.CommandError, e:
153 self.data = {'stdout':e.stdout, 'stderr':e.stderr}
156 class JobManager (object):
157 """Base class for managing asynchronous `Job` execution."""
159 log().debug('setup %s' % self)
164 return '<%s %#x>' % (self.__class__.__name__, id(self))
167 return self.__str__()
170 log().debug('teardown %s' % self)
172 def async_invoke(self, job):
177 log().debug('spawn job %s' % job)
181 def _spawn_job(self, job):
182 raise NotImplementedError
184 def wait(self, ids=None):
186 ids = self._jobs.keys()
188 log().debug('wait on %s' % ids)
189 log().debug('jobs: %s' % self._jobs)
190 for id in list(ids): # get already completed jobs
191 if id not in self._jobs:
192 log().debug('%d already gone' % id)
194 elif self._jobs[id].status != None:
195 log().debug('%d already returned (%s)' % (id, self._jobs[id].status))
197 jobs[id] = self._jobs.pop(id)
198 while len(ids) > 0: # wait for outstanding jobs
199 job = self._receive_job()
200 log().debug('receive job %s (%d)' % (job, job.status))
201 job.copy_onto(self._jobs[job.id])
202 if job.id in ids and job.id in self._jobs:
203 jobs[job.id] = self._jobs.pop(job.id)
207 def _receive_job(self):
208 raise NotImplementedError