1 # Copyright (C) 2010 W. Trevor King <wking@drexel.edu>
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 # The author may be contacted at <wking@drexel.edu> on the Internet, or
17 # write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
18 # Philadelphia PA 19104, USA.
20 """Functions for running external commands on other hosts.
23 from .. import invoke as invoke
# Names of submodules of this package.
# NOTE(review): presumably these are the values accepted by `get_manager()`,
# each submodule defining a `JobManager` subclass -- only 'thread' is
# exercised by the doctests visible here; confirm for 'pbs'.
SUBMODS = ['thread', 'pbs']
31 """Job request structure for `JobManager`.
35 Jobs execute a Python function (through an interface similar to
38 >>> def job(*args, **kwargs):
39 ... print 'executing job with %s and %s.' % (args, kwargs)
41 >>> j = Job(id=3, target=job, args=[1,2], kwargs={'kwargA':3})
42 >>> j2 = copy.deepcopy(j)
44 Job status is initially `None`.
50 executing job with (1, 2) and {'kwargA': 3}.
52 After execution, `status` and `data` are set.
59 You can copy these attributes over to another job with `copy_onto()`.
67 String representations are nice and simple.
74 If `target` raises an exception, it is stored in `status` (for
75 successful runs, the status value is `0`).
78 ... raise Exception('error running job')
79 >>> j = Job(id=3, target=job)
82 Exception('error running job',)
86 def __init__(self, id=None, target=None, args=None, kwargs=None,
96 self.blocks_on = blocks_on
103 return '<%s %d>' % (self.__class__.__name__, self.id)
106 return self.__str__()
110 self.data = self.target(*self.args, **self.kwargs)
def copy_onto(self, other):
    """Transfer this job's run results onto `other`.

    Overwrites `other.status` and `other.data` with the values held by
    `self` (status first, then data).  No other attribute of `other` is
    touched and nothing is returned.
    """
    other.status = self.status
    other.data = self.data
122 class InvokeJob (Job):
123 """`Job` subclass which `invoke()`\s a command line function.
125 >>> q = 'What is the meaning of life, the universe, and everything?'
126 >>> j = InvokeJob(id=3, target='echo "%s"' % q)
131 'What is the meaning of life, the universe, and everything?\\n'
135 >>> j = InvokeJob(id=3, target='missing_command')
138 Command failed (127):
139 /bin/sh: missing_command: command not found
147 '/bin/sh: missing_command: command not found\\n'
151 self.status,stdout,stderr = invoke.invoke(
152 self.target, *self.args, **self.kwargs)
153 self.data = {'stdout':stdout, 'stderr':stderr}
154 except invoke.CommandError, e:
156 self.data = {'stdout':e.stdout, 'stderr':e.stderr}
159 class JobManager (object):
160 """Base class for managing asynchronous `Job` execution."""
162 log().debug('setup %s' % self)
167 return '<%s %#x>' % (self.__class__.__name__, id(self))
170 return self.__str__()
173 log().debug('teardown %s' % self)
175 def async_invoke(self, job):
180 log().debug('spawn job %s' % job)
184 def _spawn_job(self, job):
185 raise NotImplementedError
187 def wait(self, ids=None):
189 ids = self._jobs.keys()
191 log().debug('wait on %s' % ids)
192 log().debug('jobs: %s' % self._jobs)
193 for id in list(ids): # get already completed jobs
194 if id not in self._jobs:
195 log().debug('%d already gone' % id)
197 elif self._jobs[id].status != None:
198 log().debug('%d already returned (%s)' % (id, self._jobs[id].status))
200 jobs[id] = self._jobs.pop(id)
201 while len(ids) > 0: # wait for outstanding jobs
202 job = self._receive_job()
203 log().debug('receive job %s (%d)' % (job, job.status))
204 job.copy_onto(self._jobs[job.id])
205 if job.id in ids and job.id in self._jobs:
206 jobs[job.id] = self._jobs.pop(job.id)
210 def _receive_job(self):
211 raise NotImplementedError
214 class IsSubclass (object):
215 """A safe subclass comparator.
220 >>> class A (object):
225 >>> is_subclass = IsSubclass(A)
228 >>> is_subclass = IsSubclass(A, blacklist=[A])
236 def __init__(self, base_class, blacklist=None):
237 self.base_class = base_class
238 if blacklist == None:
240 self.blacklist = blacklist
241 def __call__(self, other):
243 subclass = issubclass(other, self.base_class)
246 if other in self.blacklist:
251 def get_manager(submod):
253 >>> get_manager('thread')
254 <class 'pysawsim.manager.thread.ThreadManager'>
255 >>> get_manager('wookie')
256 Traceback (most recent call last):
258 AttributeError: 'module' object has no attribute 'wookie'
260 this_mod = __import__(__name__, fromlist=[submod])
261 sub_mod = getattr(this_mod, submod)
262 class_selector = IsSubclass(base_class=JobManager, blacklist=[JobManager])
263 for x_name in dir(sub_mod):
264 x = getattr(sub_mod, x_name)
265 if class_selector(x) == True:
267 raise ValueError('no JobManager found in %s' % sub_mod.__name__)