return subclass
-def get_manager(submod):
+def get_manager(submod=None, defaults=['subproc', 'thread']):
"""
>>> get_manager('thread')
<class 'pysawsim.manager.thread.ThreadManager'>
Traceback (most recent call last):
...
AttributeError: 'module' object has no attribute 'wookie'
+ >>> m = get_manager()
+ >>> issubclass(m, JobManager)
+ True
"""
+ if submod == None:
+ for submod in defaults:
+ try:
+ m = get_manager(submod)
+ except ImportError:
+ continue
+ if len(m._bugs) > 0:
+ continue
+ return m
+ raise Exception('none of the managers in %s were enabled' % defaults)
this_mod = __import__(__name__, fromlist=[submod])
sub_mod = getattr(this_mod, submod)
+ if sub_mod._ENABLED == False:
+ raise sub_mod._DISABLING_ERROR
class_selector = IsSubclass(base_class=JobManager, blacklist=[JobManager])
for x_name in dir(sub_mod):
x = getattr(sub_mod, x_name)
if class_selector(x) == True:
+ x._bugs = [a for a in dir(sub_mod) if a.startswith('_HAS_BUG_')]
return x
raise ValueError('no JobManager found in %s' % sub_mod.__name__)
try:
from mpi4py import MPI
+ _ENABLED = True
+ _DISABLING_ERROR = None
if MPI.COMM_WORLD.Get_rank() == 0:
_SKIP = ''
else:
_SKIP = ' # doctest: +SKIP'
-except ImportError, MPI_error:
- MPI = None
+except ImportError, _DISABLING_ERROR:
+ _ENABLED = False
_SKIP = ' # doctest: +SKIP'
from .. import log
def MPI_worker_death():
- if MPI == None:
+ if _ENABLED != True:
return
if MPI.COMM_WORLD.Get_rank() != 0:
sys.exit(0)
def _manager_check():
- assert MPI != None, MPI_error
+ if _ENABLED == False:
+ raise _DISABLING_ERROR
rank = MPI.COMM_WORLD.Get_rank()
assert rank == 0, (
'process %d should have been killed by an MPI_worker_death() call'
try:
import pbs
+ _ENABLED = True
+ _DISABLING_ERROR = None
_SKIP = ''
-except ImportError, pbs_error:
- pbs = None
+except ImportError, _DISABLING_ERROR:
+ _ENABLED = False
_SKIP = ' # doctest: +SKIP'
from .. import invoke
'dequeuing from (.*), state (.*)')
def _setup_pbs(self):
- if pbs == None:
- raise pbs_error
+ if _ENABLED == False:
+ raise _DISABLING_ERROR
self._pbs_server = pbs.pbs_default()
if not self._pbs_server:
raise Exception('No default server: %s' % pbs.error())
try:
from multiprocessing import Manager, Process, Queue, cpu_count
+ _ENABLED = True
+ _DISABLING_ERROR = None
_SKIP = ''
-except ImportError, multiprocessing_error:
+except ImportError, _DISABLING_ERROR:
+ _ENABLED = False
Process = object
_SKIP = ' # doctest: +SKIP'
+
from .. import log
from . import Job
from .thread import ThreadManager, CLOSE_MESSAGE
+
+if _ENABLED == True:
+ """Check succeptibility to python issue 5313:
+ http://bugs.python.org/issue5155
+ http://bugs.python.org/issue5313
+ http://svn.python.org/view?view=rev&revision=73708
+
+ Fix merged into the 2.6 maintenance branch with
+
+ changeset: 531:11e5b7504a71
+ branch: release26-maint
+ user: r.david.murray
+ date: Tue Jul 21 19:02:14 2009 +0200
+ summary: [svn r74145] Merged revisions 73708,73738 via svnmerge...
+
+ Which came between 2.6.2 and 2.6.3rc1
+
+ $ hg blame -r 631 README | grep 'version 2\.6\.'
+ 631: This is Python version 2.6.3rc1
+ $ hg blame -r 630 README | grep 'version 2\.6\.'
+ 345: This is Python version 2.6.2
+ """
+ import sys
+ _HAS_BUG_5313 = sys.version_info < (2,6,3)
+
+
class WorkerProcess (Process):
def __init__(self, spawn_queue, receive_queue, *args, **kwargs):
- if Process == object:
- raise multiprocessing_error
+ if _ENABLED == False:
+ raise _DISABLING_ERROR
super(WorkerProcess, self).__init__(*args, **kwargs)
self.spawn_queue = spawn_queue
self.receive_queue = receive_queue
>>> m.teardown()%(skip)s
""" % {'skip': _SKIP}
def __init__(self, worker_pool=None):
- if Process == object:
- raise multiprocessing_error
+ if _ENABLED == False:
+ raise _DISABLING_ERROR
super(SubprocessManager, self).__init__(worker_pool=worker_pool)
def _setup_queues(self):
from . import Job, JobManager
+_ENABLED = True
+_DISABLING_ERROR = None
+
CLOSE_MESSAGE = "close"