Add pysawsim.manager.subproc using subprocessing.
[sawsim.git] / pysawsim / manager / __init__.py
index 1dbdf7d0bba8b616fb79e110b5a241c00e90ea33..5cc1d6e2161caa7d44353a575c2813ead6412d72 100644 (file)
 """
 
 from .. import invoke as invoke
+from .. import log
+
+
+MANAGERS = ['thread', 'subproc', 'pbs']
+"""Submodules with JobManager subclasses."""
 
 
 class Job (object):
@@ -96,7 +101,7 @@ class Job (object):
         self.data = None
 
     def __str__(self):
-        return '<%s %d>' % (self.__class__.__name__, self.id)
+        return '<%s %s>' % (self.__class__.__name__, self.id)
 
     def __repr__(self):
         return self.__str__()
@@ -155,17 +160,25 @@ class InvokeJob (Job):
 class JobManager (object):
     """Base class for managing asynchronous `Job` execution."""
     def __init__(self):
+        log().debug('setup %s' % self)
         self._jobs = {}
-        self.next_id = 0
+        self._next_id = 0
+
+    def __str__(self):
+        return '<%s %#x>' % (self.__class__.__name__, id(self))
+
+    def __repr__(self):
+        return self.__str__()
 
     def teardown(self):
-        pass
+        log().debug('teardown %s' % self)
 
     def async_invoke(self, job):
-        id = self.next_id
-        self.next_id += 1
+        id = self._next_id
+        self._next_id += 1
         job.id = id
         self._jobs[id] = job
+        log().debug('spawn job %s' % job)
         self._spawn_job(job)
         return job
 
@@ -176,17 +189,80 @@ class JobManager (object):
         if ids == None:
             ids = self._jobs.keys()
         jobs = {}
+        log().debug('wait on %s' % ids)
+        log().debug('jobs: %s' % self._jobs)
         for id in list(ids):  # get already completed jobs
-            if self._jobs[id] != None:
-                jobs[id] = self._jobs.pop(id)
+            if id not in self._jobs:
+                log().debug('%d already gone' % id)
                 ids.remove(id)
+            elif self._jobs[id].status != None:
+                log().debug('%d already returned (%s)' % (id, self._jobs[id].status))
+                ids.remove(id)
+                jobs[id] = self._jobs.pop(id)
         while len(ids) > 0:  # wait for outstanding jobs
             job = self._receive_job()
+            log().debug('receive job %s (%s)' % (job, job.status))
             job.copy_onto(self._jobs[job.id])
-            if job.id in ids:
+            if job.id in ids and job.id in self._jobs:
                 jobs[job.id] = self._jobs.pop(job.id)
-                ids.remove(id)
+                ids.remove(job.id)
         return jobs
 
     def _receive_job(self):
         raise NotImplementedError
+
+
+class IsSubclass (object):
+    """A safe subclass comparator.
+
+    Examples
+    --------
+
+    >>> class A (object):
+    ...     pass
+    >>> class B (A):
+    ...     pass
+    >>> C = 5
+    >>> is_subclass = IsSubclass(A)
+    >>> is_subclass(A)
+    True
+    >>> is_subclass = IsSubclass(A, blacklist=[A])
+    >>> is_subclass(A)
+    False
+    >>> is_subclass(B)
+    True
+    >>> is_subclass(C)
+    False
+    """
+    def __init__(self, base_class, blacklist=None):
+        self.base_class = base_class
+        if blacklist == None:
+            blacklist = []
+        self.blacklist = blacklist
+    def __call__(self, other):
+        try:
+            subclass = issubclass(other, self.base_class)
+        except TypeError:
+            return False
+        if other in self.blacklist:
+            return False
+        return subclass
+
+
+def get_manager(submod):
+    """
+    >>> get_manager('thread')
+    <class 'pysawsim.manager.thread.ThreadManager'>
+    >>> get_manager('wookie')
+    Traceback (most recent call last):
+      ...
+    AttributeError: 'module' object has no attribute 'wookie'
+    """
+    this_mod = __import__(__name__, fromlist=[submod])
+    sub_mod = getattr(this_mod, submod)
+    class_selector = IsSubclass(base_class=JobManager, blacklist=[JobManager])
+    for x_name in dir(sub_mod):
+        x = getattr(sub_mod, x_name)
+        if class_selector(x) == True:
+            return x
+    raise ValueError('no JobManager found in %s' % sub_mod.__name__)