Add ability to import a default manager with get_manager().
authorW. Trevor King <wking@drexel.edu>
Wed, 27 Oct 2010 19:10:58 +0000 (12:10 -0700)
committerW. Trevor King <wking@drexel.edu>
Wed, 27 Oct 2010 19:14:33 +0000 (12:14 -0700)
pysawsim/manager/__init__.py
pysawsim/manager/mpi.py
pysawsim/manager/pbs.py
pysawsim/manager/subproc.py
pysawsim/manager/thread.py

index acf2af6509078750286fd86b426c28a7bd9f8b58..30f6e874604f8224f35c41256a49574e2753e453 100644 (file)
@@ -256,7 +256,7 @@ class IsSubclass (object):
         return subclass
 
 
-def get_manager(submod):
+def get_manager(submod=None, defaults=['subproc', 'thread']):
     """
     >>> get_manager('thread')
     <class 'pysawsim.manager.thread.ThreadManager'>
@@ -264,12 +264,28 @@ def get_manager(submod):
     Traceback (most recent call last):
       ...
     AttributeError: 'module' object has no attribute 'wookie'
+    >>> m = get_manager()
+    >>> issubclass(m, JobManager)
+    True
     """
+    if submod == None:
+        for submod in defaults:
+            try:
+                m = get_manager(submod)
+            except ImportError:
+                continue
+            if len(m._bugs) > 0:
+                continue
+            return m
+        raise Exception('none of the managers in %s were enabled' % defaults)
     this_mod = __import__(__name__, fromlist=[submod])
     sub_mod = getattr(this_mod, submod)
+    if sub_mod._ENABLED == False:
+        raise sub_mod._DISABLING_ERROR
     class_selector = IsSubclass(base_class=JobManager, blacklist=[JobManager])
     for x_name in dir(sub_mod):
         x = getattr(sub_mod, x_name)
         if class_selector(x) == True:
+            x._bugs = [a for a in dir(sub_mod) if a.startswith('_HAS_BUG_')]
             return x
     raise ValueError('no JobManager found in %s' % sub_mod.__name__)
index 0f1b0f841659cf647c386ded8967b39134bef74a..4f2b2f338fac934975bb018c48f0148c2562ed0f 100644 (file)
@@ -55,12 +55,14 @@ from threading import Thread
 
 try:
     from mpi4py import MPI
+    _ENABLED = True
+    _DISABLING_ERROR = None
     if MPI.COMM_WORLD.Get_rank() == 0:
         _SKIP = ''
     else:
         _SKIP = '  # doctest: +SKIP'
-except ImportError, MPI_error:
-    MPI = None
+except ImportError, _DISABLING_ERROR:
+    _ENABLED = False
     _SKIP = '  # doctest: +SKIP'
 
 from .. import log
@@ -74,13 +76,14 @@ RECEIVE_TAG = 101
 
 
 def MPI_worker_death():
-    if MPI == None:
+    if _ENABLED != True:
         return
     if MPI.COMM_WORLD.Get_rank() != 0:
         sys.exit(0)
 
 def _manager_check():
-    assert MPI != None, MPI_error
+    if _ENABLED == False:
+        raise _DISABLING_ERROR
     rank = MPI.COMM_WORLD.Get_rank()
     assert rank == 0, (
         'process %d should have been killed by an MPI_worker_death() call'
index f98a0aaf9897bf9eb6800636a7fbd75c3d03e214..ca237b0f25e4b23e670552ca6033ce1c626967d4 100644 (file)
@@ -40,9 +40,11 @@ import types
 
 try:
     import pbs
+    _ENABLED = True
+    _DISABLING_ERROR = None
     _SKIP = ''
-except ImportError, pbs_error:
-    pbs = None
+except ImportError, _DISABLING_ERROR:
+    _ENABLED = False
     _SKIP = '  # doctest: +SKIP'
 
 from .. import invoke
@@ -100,8 +102,8 @@ class PBSManager (JobManager):
             'dequeuing from (.*), state (.*)')
 
     def _setup_pbs(self):
-        if pbs == None:
-            raise pbs_error
+        if _ENABLED == False:
+            raise _DISABLING_ERROR
         self._pbs_server = pbs.pbs_default()
         if not self._pbs_server:
             raise Exception('No default server: %s' % pbs.error())
index ad0d64bc693595754e2313555ec7078615a06b1c..d767da4011cac35dc66c544a78eff99bac7e6775 100644 (file)
 
 try:
     from multiprocessing import Manager, Process, Queue, cpu_count
+    _ENABLED = True
+    _DISABLING_ERROR = None
     _SKIP = ''
-except ImportError, multiprocessing_error:
+except ImportError, _DISABLING_ERROR:
+    _ENABLED = False
     Process = object
     _SKIP = '  # doctest: +SKIP'
 
+
 from .. import log
 from . import Job
 from .thread import ThreadManager, CLOSE_MESSAGE
 
+
+if _ENABLED == True:
+    """Check succeptibility to python issue 5313:
+      http://bugs.python.org/issue5155
+      http://bugs.python.org/issue5313
+      http://svn.python.org/view?view=rev&revision=73708
+
+    Fix merged into the 2.6 maintenance branch with
+
+      changeset:   531:11e5b7504a71
+      branch:      release26-maint
+      user:        r.david.murray
+      date:        Tue Jul 21 19:02:14 2009 +0200
+      summary:     [svn r74145] Merged revisions 73708,73738 via svnmerge...
+
+    Which came between 2.6.2 and 2.6.3rc1
+
+      $ hg blame -r 631 README | grep 'version 2\.6\.'
+      631: This is Python version 2.6.3rc1
+      $ hg blame -r 630 README | grep 'version 2\.6\.'
+      345: This is Python version 2.6.2
+    """
+    import sys
+    _HAS_BUG_5313 = sys.version_info < (2,6,3)
+
+
 class WorkerProcess (Process):
     def __init__(self, spawn_queue, receive_queue, *args, **kwargs):
-        if Process == object:
-            raise multiprocessing_error
+        if _ENABLED == False:
+            raise _DISABLING_ERROR
         super(WorkerProcess, self).__init__(*args, **kwargs)
         self.spawn_queue = spawn_queue
         self.receive_queue = receive_queue
@@ -77,8 +107,8 @@ class SubprocessManager (ThreadManager):
     >>> m.teardown()%(skip)s
     """ % {'skip': _SKIP}
     def __init__(self, worker_pool=None):
-        if Process == object:
-            raise multiprocessing_error
+        if _ENABLED == False:
+            raise _DISABLING_ERROR
         super(SubprocessManager, self).__init__(worker_pool=worker_pool)
 
     def _setup_queues(self):
index 5eb0edcfc0b904726b98bd6121dfdde271ba975f..7c62db091024c7af92fff18feda9b65cc71e4a92 100644 (file)
@@ -28,6 +28,9 @@ from .. import log
 from . import Job, JobManager
 
 
+_ENABLED = True
+_DISABLING_ERROR = None
+
 CLOSE_MESSAGE = "close"