X-Git-Url: http://git.tremily.us/?p=sawsim.git;a=blobdiff_plain;f=pysawsim%2Fmanager%2F__init__.py;h=30f6e874604f8224f35c41256a49574e2753e453;hp=d596785b0293c1630ceda8a3b78a71964680ff9a;hb=HEAD;hpb=14427ffdf0976d9763f77295f1b02f6a6bda527a diff --git a/pysawsim/manager/__init__.py b/pysawsim/manager/__init__.py index d596785..30f6e87 100644 --- a/pysawsim/manager/__init__.py +++ b/pysawsim/manager/__init__.py @@ -18,13 +18,17 @@ # Philadelphia PA 19104, USA. """Functions for running external commands on other hosts. + +See the Python wiki_ for a list of parallel processing modules. + +.. _wiki: http://wiki.python.org/moin/ParallelProcessing """ from .. import invoke as invoke from .. import log -MANAGERS = ['thread', 'pbs'] +MANAGERS = ['thread', 'subproc', 'mpi', 'pbs'] """Submodules with JobManager subclasses.""" @@ -135,17 +139,17 @@ class InvokeJob (Job): >>> j = InvokeJob(id=3, target='missing_command') >>> j.run() - >>> print j.status + >>> print j.status # doctest: +ELLIPSIS Command failed (127): - /bin/sh: missing_command: command not found + /bin/sh: missing_command: ...not found while executing missing_command >>> j.data['stdout'] '' - >>> j.data['stderr'] - '/bin/sh: missing_command: command not found\\n' + >>> j.data['stderr'] # doctest: +ELLIPSIS + '/bin/sh: missing_command: ...not found\\n' """ def run(self): try: @@ -154,7 +158,7 @@ class InvokeJob (Job): self.data = {'stdout':stdout, 'stderr':stderr} except invoke.CommandError, e: self.status = e - self.data = {'stdout':e.stdout, 'stderr':e.stderr} + self.data = {'stdout':e.stdout, 'stderr':e.stderr, 'error':e} class JobManager (object): @@ -201,13 +205,16 @@ class JobManager (object): jobs[id] = self._jobs.pop(id) while len(ids) > 0: # wait for outstanding jobs job = self._receive_job() - log().debug('receive job %s (%s)' % (job, job.status)) - job.copy_onto(self._jobs[job.id]) + self._handle_received_job(job) if job.id in ids and job.id in self._jobs: jobs[job.id] = self._jobs.pop(job.id) ids.remove(job.id) return jobs + def _handle_received_job(self, job): + log().debug('receive job %s (%s)' % (job, job.status)) + job.copy_onto(self._jobs[job.id]) + def _receive_job(self): raise NotImplementedError @@ -249,7 +256,7 @@ class IsSubclass (object): return subclass -def get_manager(submod): +def get_manager(submod=None, defaults=['subproc', 'thread']): """ >>> get_manager('thread') @@ -257,12 +264,28 @@ def get_manager(submod): Traceback (most recent call last): ... AttributeError: 'module' object has no attribute 'wookie' + >>> m = get_manager() + >>> issubclass(m, JobManager) + True """ + if submod == None: + for submod in defaults: + try: + m = get_manager(submod) + except ImportError: + continue + if len(m._bugs) > 0: + continue + return m + raise Exception('none of the managers in %s were enabled' % defaults) this_mod = __import__(__name__, fromlist=[submod]) sub_mod = getattr(this_mod, submod) + if sub_mod._ENABLED == False: + raise sub_mod._DISABLING_ERROR class_selector = IsSubclass(base_class=JobManager, blacklist=[JobManager]) for x_name in dir(sub_mod): x = getattr(sub_mod, x_name) if class_selector(x) == True: + x._bugs = [a for a in dir(sub_mod) if a.startswith('_HAS_BUG_')] return x raise ValueError('no JobManager found in %s' % sub_mod.__name__)