efbfbf: upgrade to Bugs Everywhere Directory v1.5
[sawsim.git] / pysawsim / manager / __init__.py
index 5deb701376bc5607ea6228c2fc762432860abe4f..30f6e874604f8224f35c41256a49574e2753e453 100644 (file)
 # Philadelphia PA 19104, USA.
 
 """Functions for running external commands on other hosts.
+
+See the Python wiki_ for a list of parallel processing modules.
+
+.. _wiki: http://wiki.python.org/moin/ParallelProcessing
 """
 
 from .. import invoke as invoke
+from .. import log
+
+
+MANAGERS = ['thread', 'subproc', 'mpi', 'pbs']
+"""Submodules with JobManager subclasses."""
 
 
 class Job (object):
-    """
+    """Job request structure for `JobManager`.
+
     >>> import copy
 
     Jobs execute a Python function (through an interface similar to
@@ -78,13 +88,14 @@ class Job (object):
     >>> print j.data
     None
     """
-    def __init__(self, id, target, blocks_on=None, args=None, kwargs=None):
-        if blocks_on == None:
-            blocks_on = []
+    def __init__(self, id=None, target=None, args=None, kwargs=None,
+                 blocks_on=None):
         if args == None:
             args = []
         if kwargs == None:
             kwargs = {}
+        if blocks_on == None:
+            blocks_on = []
         self.id = id
         self.target = target
         self.blocks_on = blocks_on
@@ -94,7 +105,7 @@ class Job (object):
         self.data = None
 
     def __str__(self):
-        return '<%s %d>' % (self.__class__.__name__, self.id)
+        return '<%s %s>' % (self.__class__.__name__, self.id)
 
     def __repr__(self):
         return self.__str__()
@@ -114,7 +125,7 @@ class Job (object):
 
 
 class InvokeJob (Job):
-    """Job subclass which `invoke()`\s a command line function.
+    """`Job` subclass which `invoke()`\s a command line function.
 
     >>> q = 'What is the meaning of life, the universe, and everything?'
     >>> j = InvokeJob(id=3, target='echo "%s"' % q)
@@ -128,17 +139,17 @@ class InvokeJob (Job):
 
     >>> j = InvokeJob(id=3, target='missing_command')
     >>> j.run()
-    >>> print j.status
+    >>> print j.status  # doctest: +ELLIPSIS
     Command failed (127):
-      /bin/sh: missing_command: command not found
+      /bin/sh: missing_command: ...not found
     <BLANKLINE>
     <BLANKLINE>
     while executing
       missing_command
     >>> j.data['stdout']
     ''
-    >>> j.data['stderr']
-    '/bin/sh: missing_command: command not found\\n'
+    >>> j.data['stderr']  # doctest: +ELLIPSIS
+    '/bin/sh: missing_command: ...not found\\n'
     """
     def run(self):
         try:
@@ -147,22 +158,31 @@ class InvokeJob (Job):
             self.data = {'stdout':stdout, 'stderr':stderr}
         except invoke.CommandError, e:
             self.status = e
-            self.data = {'stdout':e.stdout, 'stderr':e.stderr}
+            self.data = {'stdout':e.stdout, 'stderr':e.stderr, 'error':e}
 
 
 class JobManager (object):
+    """Base class for managing asynchronous `Job` execution."""
     def __init__(self):
+        log().debug('setup %s' % self)
         self._jobs = {}
-        self.next_id = 0
+        self._next_id = 0
+
+    def __str__(self):
+        return '<%s %#x>' % (self.__class__.__name__, id(self))
+
+    def __repr__(self):
+        return self.__str__()
 
     def teardown(self):
-        pass
+        log().debug('teardown %s' % self)
 
-    def async_invoke(self, cmd_string, blocks_on=None, *args, **kwargs):
-        id = self.next_id
-        self.next_id += 1
-        job = Job(id, cmd_string, *args, **kwargs)
+    def async_invoke(self, job):
+        id = self._next_id
+        self._next_id += 1
+        job.id = id
         self._jobs[id] = job
+        log().debug('spawn job %s' % job)
         self._spawn_job(job)
         return job
 
@@ -173,17 +193,99 @@ class JobManager (object):
         if ids == None:
             ids = self._jobs.keys()
         jobs = {}
+        log().debug('wait on %s' % ids)
+        log().debug('jobs: %s' % self._jobs)
         for id in list(ids):  # get already completed jobs
-            if self._jobs[id] != None:
-                jobs[id] = self._jobs.pop(id)
+            if id not in self._jobs:
+                log().debug('%d already gone' % id)
+                ids.remove(id)
+            elif self._jobs[id].status != None:
+                log().debug('%d already returned (%s)' % (id, self._jobs[id].status))
                 ids.remove(id)
+                jobs[id] = self._jobs.pop(id)
         while len(ids) > 0:  # wait for outstanding jobs
             job = self._receive_job()
-            job.copy_onto(self._jobs[job.id])
-            if job.id in ids:
+            self._handle_received_job(job)
+            if job.id in ids and job.id in self._jobs:
                 jobs[job.id] = self._jobs.pop(job.id)
-                ids.remove(id)
+                ids.remove(job.id)
         return jobs
 
+    def _handle_received_job(self, job):
+        log().debug('receive job %s (%s)' % (job, job.status))
+        job.copy_onto(self._jobs[job.id])
+
     def _receive_job(self):
         raise NotImplementedError
+
+
+class IsSubclass (object):
+    """A safe subclass comparator.
+
+    Examples
+    --------
+
+    >>> class A (object):
+    ...     pass
+    >>> class B (A):
+    ...     pass
+    >>> C = 5
+    >>> is_subclass = IsSubclass(A)
+    >>> is_subclass(A)
+    True
+    >>> is_subclass = IsSubclass(A, blacklist=[A])
+    >>> is_subclass(A)
+    False
+    >>> is_subclass(B)
+    True
+    >>> is_subclass(C)
+    False
+    """
+    def __init__(self, base_class, blacklist=None):
+        self.base_class = base_class
+        if blacklist == None:
+            blacklist = []
+        self.blacklist = blacklist
+    def __call__(self, other):
+        try:
+            subclass = issubclass(other, self.base_class)
+        except TypeError:
+            return False
+        if other in self.blacklist:
+            return False
+        return subclass
+
+
+def get_manager(submod=None, defaults=['subproc', 'thread']):
+    """
+    >>> get_manager('thread')
+    <class 'pysawsim.manager.thread.ThreadManager'>
+    >>> get_manager('wookie')
+    Traceback (most recent call last):
+      ...
+    AttributeError: 'module' object has no attribute 'wookie'
+    >>> m = get_manager()
+    >>> issubclass(m, JobManager)
+    True
+    """
+    if submod == None:
+        for submod in defaults:
+            try:
+                m = get_manager(submod)
+            except ImportError:
+                continue
+            if len(m._bugs) > 0:
+                continue
+            return m
+        raise Exception('none of the managers in %s were enabled' % defaults)
+    this_mod = __import__(__name__, fromlist=[submod])
+    sub_mod = getattr(this_mod, submod)
+    if sub_mod._ENABLED == False:
+        raise sub_mod._DISABLING_ERROR
+    class_selector = IsSubclass(base_class=JobManager, blacklist=[JobManager])
+    for x_name in dir(sub_mod):
+        x = getattr(sub_mod, x_name)
+        if class_selector(x) == True:
+            x._bugs = [a for a in dir(sub_mod) if a.startswith('_HAS_BUG_')]
+            return x
+    raise ValueError('no JobManager found in %s' % sub_mod.__name__)