# Philadelphia PA 19104, USA.
"""Functions for running external commands on other hosts.
+
+See the Python wiki_ for a list of parallel processing modules.
+
+.. _wiki: http://wiki.python.org/moin/ParallelProcessing
"""
from .. import invoke as invoke
+from .. import log
+
+
+MANAGERS = ['thread', 'subproc', 'mpi', 'pbs']
+"""Submodules with JobManager subclasses."""
class Job (object):
self.data = None
def __str__(self):
- return '<%s %d>' % (self.__class__.__name__, self.id)
+ return '<%s %s>' % (self.__class__.__name__, self.id)
def __repr__(self):
return self.__str__()
>>> j = InvokeJob(id=3, target='missing_command')
>>> j.run()
- >>> print j.status
+ >>> print j.status # doctest: +ELLIPSIS
Command failed (127):
- /bin/sh: missing_command: command not found
+ /bin/sh: missing_command: ...not found
<BLANKLINE>
<BLANKLINE>
while executing
missing_command
>>> j.data['stdout']
''
- >>> j.data['stderr']
- '/bin/sh: missing_command: command not found\\n'
+ >>> j.data['stderr'] # doctest: +ELLIPSIS
+ '/bin/sh: missing_command: ...not found\\n'
"""
def run(self):
try:
self.data = {'stdout':stdout, 'stderr':stderr}
except invoke.CommandError, e:
self.status = e
- self.data = {'stdout':e.stdout, 'stderr':e.stderr}
+ self.data = {'stdout':e.stdout, 'stderr':e.stderr, 'error':e}
class JobManager (object):
"""Base class for managing asynchronous `Job` execution."""
def __init__(self):
+ log().debug('setup %s' % self)
self._jobs = {}
- self.next_id = 0
+ self._next_id = 0
+
+ def __str__(self):
+ return '<%s %#x>' % (self.__class__.__name__, id(self))
+
+ def __repr__(self):
+ return self.__str__()
def teardown(self):
- pass
+ log().debug('teardown %s' % self)
def async_invoke(self, job):
- id = self.next_id
- self.next_id += 1
+ id = self._next_id
+ self._next_id += 1
job.id = id
self._jobs[id] = job
+ log().debug('spawn job %s' % job)
self._spawn_job(job)
return job
if ids == None:
ids = self._jobs.keys()
jobs = {}
+ log().debug('wait on %s' % ids)
+ log().debug('jobs: %s' % self._jobs)
for id in list(ids): # get already completed jobs
- if self._jobs[id] != None:
- jobs[id] = self._jobs.pop(id)
+ if id not in self._jobs:
+ log().debug('%d already gone' % id)
ids.remove(id)
+ elif self._jobs[id].status != None:
+ log().debug('%d already returned (%s)' % (id, self._jobs[id].status))
+ ids.remove(id)
+ jobs[id] = self._jobs.pop(id)
while len(ids) > 0: # wait for outstanding jobs
job = self._receive_job()
- job.copy_onto(self._jobs[job.id])
- if job.id in ids:
+ self._handle_received_job(job)
+ if job.id in ids and job.id in self._jobs:
jobs[job.id] = self._jobs.pop(job.id)
- ids.remove(id)
+ ids.remove(job.id)
return jobs
+ def _handle_received_job(self, job):
+ log().debug('receive job %s (%s)' % (job, job.status))
+ job.copy_onto(self._jobs[job.id])
+
def _receive_job(self):
raise NotImplementedError
+
+
+class IsSubclass (object):
+ """A safe subclass comparator.
+
+ Examples
+ --------
+
+ >>> class A (object):
+ ... pass
+ >>> class B (A):
+ ... pass
+ >>> C = 5
+ >>> is_subclass = IsSubclass(A)
+ >>> is_subclass(A)
+ True
+ >>> is_subclass = IsSubclass(A, blacklist=[A])
+ >>> is_subclass(A)
+ False
+ >>> is_subclass(B)
+ True
+ >>> is_subclass(C)
+ False
+ """
+ def __init__(self, base_class, blacklist=None):
+ self.base_class = base_class
+ if blacklist == None:
+ blacklist = []
+ self.blacklist = blacklist
+ def __call__(self, other):
+ try:
+ subclass = issubclass(other, self.base_class)
+ except TypeError:
+ return False
+ if other in self.blacklist:
+ return False
+ return subclass
+
+
+def get_manager(submod=None, defaults=['subproc', 'thread']):
+ """
+ >>> get_manager('thread')
+ <class 'pysawsim.manager.thread.ThreadManager'>
+ >>> get_manager('wookie')
+ Traceback (most recent call last):
+ ...
+ AttributeError: 'module' object has no attribute 'wookie'
+ >>> m = get_manager()
+ >>> issubclass(m, JobManager)
+ True
+ """
+ if submod == None:
+ for submod in defaults:
+ try:
+ m = get_manager(submod)
+ except ImportError:
+ continue
+ if len(m._bugs) > 0:
+ continue
+ return m
+ raise Exception('none of the managers in %s were enabled' % defaults)
+ this_mod = __import__(__name__, fromlist=[submod])
+ sub_mod = getattr(this_mod, submod)
+ if sub_mod._ENABLED == False:
+ raise sub_mod._DISABLING_ERROR
+ class_selector = IsSubclass(base_class=JobManager, blacklist=[JobManager])
+ for x_name in dir(sub_mod):
+ x = getattr(sub_mod, x_name)
+ if class_selector(x) == True:
+ x._bugs = [a for a in dir(sub_mod) if a.startswith('_HAS_BUG_')]
+ return x
+ raise ValueError('no JobManager found in %s' % sub_mod.__name__)