1 # Copyright (C) 2010 W. Trevor King <wking@drexel.edu>
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 # The author may be contacted at <wking@drexel.edu> on the Internet, or
17 # write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
18 # Philadelphia PA 19104, USA.
20 """Functions for running external commands on other hosts.
22 See the Python wiki_ for a list of parallel processing modules.
24 .. _wiki: http://wiki.python.org/moin/ParallelProcessing
27 from .. import invoke as invoke
# Names that `get_manager()` accepts as `submod`; each is looked up as an
# attribute (submodule) of this module.
MANAGERS = [
    'thread',
    'subproc',
    'mpi',
    'pbs',
    ]
"""Submodules with JobManager subclasses."""
36 """Job request structure for `JobManager`.
40 Jobs execute a Python function (through an interface similar to
43 >>> def job(*args, **kwargs):
44 ... print 'executing job with %s and %s.' % (args, kwargs)
46 >>> j = Job(id=3, target=job, args=[1,2], kwargs={'kwargA':3})
47 >>> j2 = copy.deepcopy(j)
49 Job status is initially `None`.
55 executing job with (1, 2) and {'kwargA': 3}.
57 After execution, `status` and `data` are set.
64 You can copy these attributes over to another job with `copy_onto()`.
72 String representations are nice and simple.
79 If `target` raises an exception, it is stored in `status` (for
80 successful runs, the status value is `0`).
83 ... raise Exception('error running job')
84 >>> j = Job(id=3, target=job)
87 Exception('error running job',)
91 def __init__(self, id=None, target=None, args=None, kwargs=None,
101 self.blocks_on = blocks_on
108 return '<%s %s>' % (self.__class__.__name__, self.id)
111 return self.__str__()
115 self.data = self.target(*self.args, **self.kwargs)
def copy_onto(self, other):
    """Transfer this job's results onto another job instance.

    Only the `status` and `data` attributes are copied (by reference,
    `status` first); every other attribute of `other` is left as it was.
    """
    other.status = self.status
    other.data = self.data
127 class InvokeJob (Job):
128 """`Job` subclass which `invoke()`\s a command line function.
130 >>> q = 'What is the meaning of life, the universe, and everything?'
131 >>> j = InvokeJob(id=3, target='echo "%s"' % q)
136 'What is the meaning of life, the universe, and everything?\\n'
140 >>> j = InvokeJob(id=3, target='missing_command')
142 >>> print j.status # doctest: +ELLIPSIS
143 Command failed (127):
144 /bin/sh: missing_command: ...not found
151 >>> j.data['stderr'] # doctest: +ELLIPSIS
152 '/bin/sh: missing_command: ...not found\\n'
156 self.status,stdout,stderr = invoke.invoke(
157 self.target, *self.args, **self.kwargs)
158 self.data = {'stdout':stdout, 'stderr':stderr}
159 except invoke.CommandError, e:
161 self.data = {'stdout':e.stdout, 'stderr':e.stderr, 'error':e}
164 class JobManager (object):
165 """Base class for managing asynchronous `Job` execution."""
167 log().debug('setup %s' % self)
172 return '<%s %#x>' % (self.__class__.__name__, id(self))
175 return self.__str__()
178 log().debug('teardown %s' % self)
180 def async_invoke(self, job):
185 log().debug('spawn job %s' % job)
189 def _spawn_job(self, job):
190 raise NotImplementedError
192 def wait(self, ids=None):
194 ids = self._jobs.keys()
196 log().debug('wait on %s' % ids)
197 log().debug('jobs: %s' % self._jobs)
198 for id in list(ids): # get already completed jobs
199 if id not in self._jobs:
200 log().debug('%d already gone' % id)
202 elif self._jobs[id].status != None:
203 log().debug('%d already returned (%s)' % (id, self._jobs[id].status))
205 jobs[id] = self._jobs.pop(id)
206 while len(ids) > 0: # wait for outstanding jobs
207 job = self._receive_job()
208 self._handle_received_job(job)
209 if job.id in ids and job.id in self._jobs:
210 jobs[job.id] = self._jobs.pop(job.id)
def _handle_received_job(self, job):
    """Log a received job and merge its results into the stored request.

    The entry of `self._jobs` stored under `job.id` receives the
    received job's results via `job.copy_onto()`.
    """
    logger = log()
    logger.debug('receive job %s (%s)' % (job, job.status))
    request = self._jobs[job.id]
    job.copy_onto(request)
218 def _receive_job(self):
219 raise NotImplementedError
222 class IsSubclass (object):
223 """A safe subclass comparator.
228 >>> class A (object):
233 >>> is_subclass = IsSubclass(A)
236 >>> is_subclass = IsSubclass(A, blacklist=[A])
244 def __init__(self, base_class, blacklist=None):
245 self.base_class = base_class
246 if blacklist == None:
248 self.blacklist = blacklist
249 def __call__(self, other):
251 subclass = issubclass(other, self.base_class)
254 if other in self.blacklist:
259 def get_manager(submod=None, defaults=['subproc', 'thread']):
261 >>> get_manager('thread')
262 <class 'pysawsim.manager.thread.ThreadManager'>
263 >>> get_manager('wookie')
264 Traceback (most recent call last):
266 AttributeError: 'module' object has no attribute 'wookie'
267 >>> m = get_manager()
268 >>> issubclass(m, JobManager)
272 for submod in defaults:
274 m = get_manager(submod)
280 raise Exception('none of the managers in %s were enabled' % defaults)
281 this_mod = __import__(__name__, fromlist=[submod])
282 sub_mod = getattr(this_mod, submod)
283 if sub_mod._ENABLED == False:
284 raise sub_mod._DISABLING_ERROR
285 class_selector = IsSubclass(base_class=JobManager, blacklist=[JobManager])
286 for x_name in dir(sub_mod):
287 x = getattr(sub_mod, x_name)
288 if class_selector(x) == True:
289 x._bugs = [a for a in dir(sub_mod) if a.startswith('_HAS_BUG_')]
291 raise ValueError('no JobManager found in %s' % sub_mod.__name__)