"""Functions for running external commands on other hosts.
"""
-from multiprocessing import Manager, Process, Queue, cpu_count
+try:
+ from multiprocessing import Manager, Process, Queue, cpu_count
+ _SKIP = ''
+except ImportError, multiprocessing_error:
+ Process = object
+ _SKIP = ' # doctest: +SKIP'
from .. import log
from . import Job
from .thread import ThreadManager, CLOSE_MESSAGE
-
class WorkerProcess (Process):
def __init__(self, spawn_queue, receive_queue, *args, **kwargs):
+ if Process == object:
+ raise multiprocessing_error
super(WorkerProcess, self).__init__(*args, **kwargs)
self.spawn_queue = spawn_queue
self.receive_queue = receive_queue
>>> from time import sleep
>>> from math import sqrt
- >>> m = SubprocessManager()
+ >>> m = SubprocessManager()%(skip)s
>>> group_A = []
- >>> for i in range(10):
+ >>> for i in range(10):%(skip)s
... t = max(0, 5-i)
... group_A.append(m.async_invoke(Job(target=sleep, args=[t])))
>>> group_B = []
- >>> for i in range(10):
+ >>> for i in range(10):%(skip)s
... group_B.append(m.async_invoke(Job(target=sqrt, args=[i],
... blocks_on=[j.id for j in group_A])))
- >>> jobs = m.wait(ids=[j.id for j in group_A[5:8]])
- >>> print sorted(jobs.values(), key=lambda j: j.id)
+ >>> jobs = m.wait(ids=[j.id for j in group_A[5:8]])%(skip)s
+ >>> print sorted(jobs.values(), key=lambda j: j.id)%(skip)s
[<Job 5>, <Job 6>, <Job 7>]
- >>> jobs = m.wait()
- >>> print sorted(jobs.values(), key=lambda j: j.id)
+ >>> jobs = m.wait()%(skip)s
+ >>> print sorted(jobs.values(), key=lambda j: j.id)%(skip)s
... # doctest: +NORMALIZE_WHITESPACE
[<Job 0>, <Job 1>, <Job 2>, <Job 3>, <Job 4>, <Job 8>, <Job 9>, <Job 10>,
<Job 11>, <Job 12>, <Job 13>, <Job 14>, <Job 15>, <Job 16>, <Job 17>,
<Job 18>, <Job 19>]
- >>> m.teardown()
- """
+ >>> m.teardown()%(skip)s
+ """ % {'skip': _SKIP}
def __init__(self, worker_pool=None):
+ if Process == object:
+ raise multiprocessing_error
super(SubprocessManager, self).__init__(worker_pool=worker_pool)
def _setup_queues(self):