1 # Copyright (C) 2010 W. Trevor King <wking@drexel.edu>
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 # The author may be contacted at <wking@drexel.edu> on the Internet, or
17 # write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
18 # Philadelphia PA 19104, USA.
20 """Functions for running external commands on other hosts.
23 from .. import invoke as invoke
27 MANAGERS = ['thread', 'pbs']
28 """Submodules with JobManager subclasses."""
32 """Job request structure for `JobManager`.
36 Jobs execute a Python function (through an interface similar to
39 >>> def job(*args, **kwargs):
40 ... print 'executing job with %s and %s.' % (args, kwargs)
42 >>> j = Job(id=3, target=job, args=[1,2], kwargs={'kwargA':3})
43 >>> j2 = copy.deepcopy(j)
45 Job status is initially `None`.
51 executing job with (1, 2) and {'kwargA': 3}.
53 After execution, `status` and `data` are set.
60 You can copy these attributes over to another job with `copy_onto()`.
68 String representations are nice and simple.
75 If `target` raises an exception, it is stored in `status` (for
76 successful runs, the status value is `0`).
79 ... raise Exception('error running job')
80 >>> j = Job(id=3, target=job)
83 Exception('error running job',)
87 def __init__(self, id=None, target=None, args=None, kwargs=None,
97 self.blocks_on = blocks_on
104 return '<%s %d>' % (self.__class__.__name__, self.id)
107 return self.__str__()
111 self.data = self.target(*self.args, **self.kwargs)
def copy_onto(self, other):
    """Copy this job's run results onto `other`.

    Only the `status` and `data` attributes are transferred, in that
    order; every other attribute of `other` is left untouched.
    """
    other.status = self.status
    other.data = self.data
123 class InvokeJob (Job):
124 """`Job` subclass which `invoke()`\s a command line function.
126 >>> q = 'What is the meaning of life, the universe, and everything?'
127 >>> j = InvokeJob(id=3, target='echo "%s"' % q)
132 'What is the meaning of life, the universe, and everything?\\n'
136 >>> j = InvokeJob(id=3, target='missing_command')
139 Command failed (127):
140 /bin/sh: missing_command: command not found
148 '/bin/sh: missing_command: command not found\\n'
152 self.status,stdout,stderr = invoke.invoke(
153 self.target, *self.args, **self.kwargs)
154 self.data = {'stdout':stdout, 'stderr':stderr}
155 except invoke.CommandError, e:
157 self.data = {'stdout':e.stdout, 'stderr':e.stderr}
160 class JobManager (object):
161 """Base class for managing asynchronous `Job` execution."""
163 log().debug('setup %s' % self)
168 return '<%s %#x>' % (self.__class__.__name__, id(self))
171 return self.__str__()
174 log().debug('teardown %s' % self)
176 def async_invoke(self, job):
181 log().debug('spawn job %s' % job)
185 def _spawn_job(self, job):
186 raise NotImplementedError
188 def wait(self, ids=None):
190 ids = self._jobs.keys()
192 log().debug('wait on %s' % ids)
193 log().debug('jobs: %s' % self._jobs)
194 for id in list(ids): # get already completed jobs
195 if id not in self._jobs:
196 log().debug('%d already gone' % id)
198 elif self._jobs[id].status != None:
199 log().debug('%d already returned (%s)' % (id, self._jobs[id].status))
201 jobs[id] = self._jobs.pop(id)
202 while len(ids) > 0: # wait for outstanding jobs
203 job = self._receive_job()
204 log().debug('receive job %s (%d)' % (job, job.status))
205 job.copy_onto(self._jobs[job.id])
206 if job.id in ids and job.id in self._jobs:
207 jobs[job.id] = self._jobs.pop(job.id)
211 def _receive_job(self):
212 raise NotImplementedError
215 class IsSubclass (object):
216 """A safe subclass comparator.
221 >>> class A (object):
226 >>> is_subclass = IsSubclass(A)
229 >>> is_subclass = IsSubclass(A, blacklist=[A])
237 def __init__(self, base_class, blacklist=None):
238 self.base_class = base_class
239 if blacklist == None:
241 self.blacklist = blacklist
242 def __call__(self, other):
244 subclass = issubclass(other, self.base_class)
247 if other in self.blacklist:
252 def get_manager(submod):
254 >>> get_manager('thread')
255 <class 'pysawsim.manager.thread.ThreadManager'>
256 >>> get_manager('wookie')
257 Traceback (most recent call last):
259 AttributeError: 'module' object has no attribute 'wookie'
261 this_mod = __import__(__name__, fromlist=[submod])
262 sub_mod = getattr(this_mod, submod)
263 class_selector = IsSubclass(base_class=JobManager, blacklist=[JobManager])
264 for x_name in dir(sub_mod):
265 x = getattr(sub_mod, x_name)
266 if class_selector(x) == True:
268 raise ValueError('no JobManager found in %s' % sub_mod.__name__)