"""Functions for running external commands on other hosts.
"""
-from Queue import Queue
+import copy
+import os
+from Queue import Queue, Empty
import threading
from .. import log
from . import Job, JobManager
+_ENABLED = True
+_DISABLING_ERROR = None
+
CLOSE_MESSAGE = "close"
super(WorkerThread, self).__init__(*args, **kwargs)
self.spawn_queue = spawn_queue
self.receive_queue = receive_queue
+ self.name = self.getName() # work around Pythons < 2.6
def run(self):
while True:
msg = self.spawn_queue.get()
if msg == CLOSE_MESSAGE:
+ log().debug('%s closing' % self.name)
break
assert isinstance(msg, Job), msg
+ log().debug('%s running job %s' % (self.name, msg))
msg.run()
self.receive_queue.put(msg)
class ThreadManager (JobManager):
"""Manage asynchronous `Job` execution via :mod:`threading`.
+ >>> from time import sleep
>>> from math import sqrt
- >>> t = ThreadManager()
+ >>> m = ThreadManager()
>>> group_A = []
>>> for i in range(10):
- ... group_A.append(t.async_invoke(Job(target=sqrt, args=[i])))
+ ... t = max(0, 5-i)
+ ... group_A.append(m.async_invoke(Job(target=sleep, args=[t])))
>>> group_B = []
>>> for i in range(10):
- ... group_B.append(t.async_invoke(Job(target=sqrt, args=[i],
+ ... group_B.append(m.async_invoke(Job(target=sqrt, args=[i],
... blocks_on=[j.id for j in group_A])))
- >>> jobs = t.wait(ids=[j.id for j in group_A[5:8]])
+ >>> jobs = m.wait(ids=[j.id for j in group_A[5:8]])
>>> print sorted(jobs.values(), key=lambda j: j.id)
[<Job 5>, <Job 6>, <Job 7>]
- >>> jobs = t.wait()
+ >>> jobs = m.wait()
>>> print sorted(jobs.values(), key=lambda j: j.id)
... # doctest: +NORMALIZE_WHITESPACE
[<Job 0>, <Job 1>, <Job 2>, <Job 3>, <Job 4>, <Job 8>, <Job 9>, <Job 10>,
<Job 11>, <Job 12>, <Job 13>, <Job 14>, <Job 15>, <Job 16>, <Job 17>,
<Job 18>, <Job 19>]
- >>> t.teardown()
+ >>> m.teardown()
+
+ Note that Python's Global Interpreter Lock (GIL) currently limits
+ threads to a single core. See the following discussions:
+
+ * http://smoothspan.wordpress.com/2007/09/14/guido-is-right-to-leave-the-gil-in-python-not-for-multicore-but-for-utility-computing/
+ * http://docs.python.org/faq/library#can-t-we-get-rid-of-the-global-interpreter-lock
+ * http://www.artima.com/weblogs/viewpost.jsp?thread=214235
+ * http://www.snaplogic.com/blog/?p=94
+ * http://stackoverflow.com/questions/31340/
+
+    Increasing `worker_pool` will only help you get around IO blocking
+    at the cost of increased time-slicing overhead.
"""
- def __init__(self, worker_pool=5):
+ def __init__(self, worker_pool=None):
super(ThreadManager, self).__init__()
self._blocked = []
+ self._setup_queues()
+ self._spawn_workers(worker_pool)
+
+ def _setup_queues(self):
self._spawn_queue = Queue()
self._receive_queue = Queue()
+
+ def _spawn_workers(self, worker_pool):
+ if worker_pool is None:
+ worker_pool = int(os.environ.get('WORKER_POOL', 2))
self._workers = []
for i in range(worker_pool):
worker = WorkerThread(spawn_queue=self._spawn_queue,
receive_queue=self._receive_queue,
name='worker-%d' % i)
- log().debug('start thread %s' % worker.name)
+ log().debug('start %s' % worker.name)
worker.start()
self._workers.append(worker)
for worker in self._workers:
self._spawn_queue.put(CLOSE_MESSAGE)
for worker in self._workers:
- log().debug('join thread %s' % worker.name)
+ log().debug('join %s' % worker.name)
worker.join()
super(ThreadManager, self).teardown()
- def _job_is_blocked(self, job):
+ def _job_is_blocked(self, job, ignore_id=None):
for id in job.blocks_on:
- if id in self._jobs and self._jobs[id].status == None:
+ if id == ignore_id:
+ continue
+ elif id in self._jobs and self._jobs[id].status == None:
return True
return False
def _spawn_job(self, job):
+ j = self._receive_job(block=False)
+ if j != None:
+ self._handle_received_job(j)
if self._job_is_blocked(job):
+ log().debug('block job %s' % job)
self._blocked.append(job)
- log().debug('queue job %s' % job)
- self._spawn_queue.put(job)
+ return
+ self._put_job_in_spawn_queue(job)
+
+ def _put_job_in_spawn_queue(self, job):
+ """Place a job in the spawn queue.
+
+ Threads share memory, so we need to send a copy of `job` to
+ protect the local copy from unmanaged changes.
+
+ Broken out to a method to allow code sharing with
+ SubprocessManager.
+ """
+ self._spawn_queue.put(copy.deepcopy(job))
- def _receive_job(self):
- job = self._receive_queue.get()
- for j in self._blocked:
+ def _receive_job(self, block=True):
+ try:
+ job = self._receive_queue.get(block=block)
+ except Empty:
+ return
+ for j in list(self._blocked):
if job.id in j.blocks_on:
- if not self._job_is_blocked(j):
+ if not self._job_is_blocked(j, ignore_id=job.id):
+ log().debug('unblock job %s' % j)
self._blocked.remove(j)
- log().debug('queue job %s' % j)
self._spawn_queue.put(j)
- log().debug('receive job %s' % job)
return job