"""Functions for running external commands on other hosts.
"""
-from multiprocessing import Manager, Process, Queue, cpu_count
+try:
+ from multiprocessing import Manager, Process, Queue, cpu_count
+ _ENABLED = True
+ _DISABLING_ERROR = None
+ _SKIP = ''
+except ImportError, _DISABLING_ERROR:
+ _ENABLED = False
+ Process = object
+ _SKIP = ' # doctest: +SKIP'
+
+import os
from .. import log
from . import Job
from .thread import ThreadManager, CLOSE_MESSAGE
+if _ENABLED == True:
+ """Check succeptibility to python issue 5313:
+ http://bugs.python.org/issue5155
+ http://bugs.python.org/issue5313
+ http://svn.python.org/view?view=rev&revision=73708
+
+ Fix merged into the 2.6 maintenance branch with
+
+ changeset: 531:11e5b7504a71
+ branch: release26-maint
+ user: r.david.murray
+ date: Tue Jul 21 19:02:14 2009 +0200
+ summary: [svn r74145] Merged revisions 73708,73738 via svnmerge...
+
+ Which came between 2.6.2 and 2.6.3rc1
+
+ $ hg blame -r 631 README | grep 'version 2\.6\.'
+ 631: This is Python version 2.6.3rc1
+ $ hg blame -r 630 README | grep 'version 2\.6\.'
+ 345: This is Python version 2.6.2
+ """
+ import sys
+ _HAS_BUG_5313 = sys.version_info < (2,6,3)
+
+
class WorkerProcess (Process):
def __init__(self, spawn_queue, receive_queue, *args, **kwargs):
+ if _ENABLED == False:
+ raise _DISABLING_ERROR
super(WorkerProcess, self).__init__(*args, **kwargs)
self.spawn_queue = spawn_queue
self.receive_queue = receive_queue
>>> from time import sleep
>>> from math import sqrt
- >>> m = SubprocessManager()
+ >>> m = SubprocessManager()%(skip)s
>>> group_A = []
- >>> for i in range(10):
+ >>> for i in range(10):%(skip)s
... t = max(0, 5-i)
... group_A.append(m.async_invoke(Job(target=sleep, args=[t])))
>>> group_B = []
- >>> for i in range(10):
+ >>> for i in range(10):%(skip)s
... group_B.append(m.async_invoke(Job(target=sqrt, args=[i],
... blocks_on=[j.id for j in group_A])))
- >>> jobs = m.wait(ids=[j.id for j in group_A[5:8]])
- >>> print sorted(jobs.values(), key=lambda j: j.id)
+ >>> jobs = m.wait(ids=[j.id for j in group_A[5:8]])%(skip)s
+ >>> print sorted(jobs.values(), key=lambda j: j.id)%(skip)s
[<Job 5>, <Job 6>, <Job 7>]
- >>> jobs = m.wait()
- >>> print sorted(jobs.values(), key=lambda j: j.id)
+ >>> jobs = m.wait()%(skip)s
+ >>> print sorted(jobs.values(), key=lambda j: j.id)%(skip)s
... # doctest: +NORMALIZE_WHITESPACE
[<Job 0>, <Job 1>, <Job 2>, <Job 3>, <Job 4>, <Job 8>, <Job 9>, <Job 10>,
<Job 11>, <Job 12>, <Job 13>, <Job 14>, <Job 15>, <Job 16>, <Job 17>,
<Job 18>, <Job 19>]
- >>> m.teardown()
- """
+ >>> m.teardown()%(skip)s
+ """ % {'skip': _SKIP}
def __init__(self, worker_pool=None):
+ if _ENABLED == False:
+ raise _DISABLING_ERROR
super(SubprocessManager, self).__init__(worker_pool=worker_pool)
def _setup_queues(self):
self._receive_queue = Queue()
def _spawn_workers(self, worker_pool=None):
- if worker_pool == None:
- worker_pool = cpu_count() + 1
+ if worker_pool is None:
+ worker_pool = int(os.environ.get('WORKER_POOL', cpu_count() + 1))
self._manager = Manager()
self._workers = []
for i in range(worker_pool):