try:
from multiprocessing import Manager, Process, Queue, cpu_count
+ _ENABLED = True
+ _DISABLING_ERROR = None
_SKIP = ''
-except ImportError, multiprocessing_error:
+except ImportError, _DISABLING_ERROR:
+ _ENABLED = False
Process = object
_SKIP = ' # doctest: +SKIP'
+import os
+
from .. import log
from . import Job
from .thread import ThreadManager, CLOSE_MESSAGE
+
+if _ENABLED == True:
+ """Check succeptibility to python issue 5313:
+ http://bugs.python.org/issue5155
+ http://bugs.python.org/issue5313
+ http://svn.python.org/view?view=rev&revision=73708
+
+ Fix merged into the 2.6 maintenance branch with
+
+ changeset: 531:11e5b7504a71
+ branch: release26-maint
+ user: r.david.murray
+ date: Tue Jul 21 19:02:14 2009 +0200
+ summary: [svn r74145] Merged revisions 73708,73738 via svnmerge...
+
+ Which came between 2.6.2 and 2.6.3rc1
+
+ $ hg blame -r 631 README | grep 'version 2\.6\.'
+ 631: This is Python version 2.6.3rc1
+ $ hg blame -r 630 README | grep 'version 2\.6\.'
+ 345: This is Python version 2.6.2
+ """
+ import sys
+ _HAS_BUG_5313 = sys.version_info < (2,6,3)
+
+
class WorkerProcess (Process):
def __init__(self, spawn_queue, receive_queue, *args, **kwargs):
- if Process == object:
- raise multiprocessing_error
+ if _ENABLED == False:
+ raise _DISABLING_ERROR
super(WorkerProcess, self).__init__(*args, **kwargs)
self.spawn_queue = spawn_queue
self.receive_queue = receive_queue
>>> m.teardown()%(skip)s
""" % {'skip': _SKIP}
def __init__(self, worker_pool=None):
- if Process == object:
- raise multiprocessing_error
+ if _ENABLED == False:
+ raise _DISABLING_ERROR
super(SubprocessManager, self).__init__(worker_pool=worker_pool)
def _setup_queues(self):
self._receive_queue = Queue()
def _spawn_workers(self, worker_pool=None):
- if worker_pool == None:
- worker_pool = cpu_count() + 1
+ if worker_pool is None:
+ worker_pool = int(os.environ.get('WORKER_POOL', cpu_count() + 1))
self._manager = Manager()
self._workers = []
for i in range(worker_pool):