<Job 11>, <Job 12>, <Job 13>, <Job 14>, <Job 15>, <Job 16>, <Job 17>,
<Job 18>, <Job 19>]
>>> m.teardown()
+
+ Note that Python's Global Interpreter Lock (GIL) currently limits
+ threads to a single core. See the following discussions:
+
+ * http://smoothspan.wordpress.com/2007/09/14/guido-is-right-to-leave-the-gil-in-python-not-for-multicore-but-for-utility-computing/
+ * http://docs.python.org/faq/library#id18
+ * http://www.artima.com/weblogs/viewpost.jsp?thread=214235
+ * http://www.snaplogic.com/blog/?p=94
+ * http://stackoverflow.com/questions/31340/
+
+ Increasing `worker_pool` will only help you get around IO blockin
+ at the cost increased time-slicing overhead.
"""
- def __init__(self, worker_pool=5):
+ def __init__(self, worker_pool=2):
super(ThreadManager, self).__init__()
self._blocked = []
+ self._setup_queues()
+ self._spawn_workers(worker_pool)
+
+ def _setup_queues(self):
self._spawn_queue = Queue()
self._receive_queue = Queue()
+
+ def _spawn_workers(self, worker_pool):
self._workers = []
for i in range(worker_pool):
worker = WorkerThread(spawn_queue=self._spawn_queue,
receive_queue=self._receive_queue,
name='worker-%d' % i)
- log().debug('start thread %s' % worker.name)
+ log().debug('start %s' % worker.name)
worker.start()
self._workers.append(worker)
for worker in self._workers:
self._spawn_queue.put(CLOSE_MESSAGE)
for worker in self._workers:
- log().debug('join thread %s' % worker.name)
+ log().debug('join %s' % worker.name)
worker.join()
super(ThreadManager, self).teardown()
log().debug('block job %s' % job)
self._blocked.append(job)
return
- self._spawn_queue.put(copy.deepcopy(job)) # protect from shared memory
+ self._put_job_in_spawn_queue(job)
+
+ def _put_job_in_spawn_queue(self, job):
+ """Place a job in the spawn queue.
+
+ Threads share memory, so we need to send a copy of `job` to
+ protect the local copy from unmanaged changes.
+
+ Broken out to a method to allow code sharing with
+ SubprocessManager.
+ """
+ self._spawn_queue.put(copy.deepcopy(job))
def _receive_job(self, block=True):
try: