Add WORKER_POOL environment variable to facilitate pysawsim.manager benchmarking.
[sawsim.git] / pysawsim / manager / thread.py
index 95cae0ff36ba077093f7a8b4e97cee2d734d62ec..a4e98f5cf5a0b508c520b713c4b644616ecb4ae5 100644 (file)
@@ -21,6 +21,7 @@
 """
 
 import copy
+import os
 from Queue import Queue, Empty
 import threading
 
@@ -28,6 +29,9 @@ from .. import log
 from . import Job, JobManager
 
 
+_ENABLED = True
+_DISABLING_ERROR = None
+
 CLOSE_MESSAGE = "close"
 
 
@@ -74,18 +78,38 @@ class ThreadManager (JobManager):
      <Job 11>, <Job 12>, <Job 13>, <Job 14>, <Job 15>, <Job 16>, <Job 17>,
      <Job 18>, <Job 19>]
     >>> m.teardown()
+
+    Note that Python's Global Interpreter Lock (GIL) currently limits
+    threads to a single core.  See the following discussions:
+
+    * http://smoothspan.wordpress.com/2007/09/14/guido-is-right-to-leave-the-gil-in-python-not-for-multicore-but-for-utility-computing/
+    * http://docs.python.org/faq/library#can-t-we-get-rid-of-the-global-interpreter-lock
+    * http://www.artima.com/weblogs/viewpost.jsp?thread=214235
+    * http://www.snaplogic.com/blog/?p=94
+    * http://stackoverflow.com/questions/31340/
+
+    Increasing `worker_pool` will only help you get around IO blocking,
+    at the cost of increased time-slicing overhead.
     """
-    def __init__(self, worker_pool=5):
+    def __init__(self, worker_pool=None):
         super(ThreadManager, self).__init__()
         self._blocked = []
+        self._setup_queues()
+        self._spawn_workers(worker_pool)
+
+    def _setup_queues(self):
         self._spawn_queue = Queue()
         self._receive_queue = Queue()
+
+    def _spawn_workers(self, worker_pool):
+        if worker_pool is None:
+            worker_pool = int(os.environ.get('WORKER_POOL', 2))
         self._workers = []
         for i in range(worker_pool):
             worker = WorkerThread(spawn_queue=self._spawn_queue,
                                   receive_queue=self._receive_queue,
                                   name='worker-%d' % i)
-            log().debug('start thread %s' % worker.name)
+            log().debug('start %s' % worker.name)
             worker.start()
             self._workers.append(worker)
 
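The new environment variable is what makes benchmarking convenient: the same script can be timed under different pool sizes without code changes. A rough sketch of such a run follows; the timing harness and the job-submission step are assumptions, only WORKER_POOL, ThreadManager() and teardown() come from this diff.

    import os
    import time

    os.environ['WORKER_POOL'] = '8'   # must be set before ThreadManager() spawns its workers

    from pysawsim.manager.thread import ThreadManager

    manager = ThreadManager()         # worker_pool=None -> pool size read from WORKER_POOL
    start = time.time()
    # ... submit and wait on the benchmark's jobs here (submission API omitted) ...
    manager.teardown()
    print('WORKER_POOL=%s: %.2f s' % (os.environ['WORKER_POOL'], time.time() - start))

As the docstring above notes, higher settings only pay off when the jobs block on IO; for CPU-bound jobs the GIL's single-core limit applies regardless of pool size.
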
@@ -93,7 +117,7 @@ class ThreadManager (JobManager):
         for worker in self._workers:
             self._spawn_queue.put(CLOSE_MESSAGE)
         for worker in self._workers:
-            log().debug('join thread %s' % worker.name)
+            log().debug('join %s' % worker.name)
             worker.join()
         super(ThreadManager, self).teardown()
 
@@ -106,12 +130,25 @@ class ThreadManager (JobManager):
         return False
 
     def _spawn_job(self, job):
-        self._receive_job(block=False)
+        j = self._receive_job(block=False)
+        if j is not None:
+            self._handle_received_job(j)
         if self._job_is_blocked(job):
             log().debug('block job %s' % job)
             self._blocked.append(job)
             return
-        self._spawn_queue.put(copy.deepcopy(job))  # protect from shared memory
+        self._put_job_in_spawn_queue(job)
+
+    def _put_job_in_spawn_queue(self, job):
+        """Place a job in the spawn queue.
+
+        Threads share memory, so we need to send a copy of `job` to
+        protect the local copy from unmanaged changes.
+
+        Broken out to a method to allow code sharing with
+        SubprocessManager.
+        """
+        self._spawn_queue.put(copy.deepcopy(job))
 
     def _receive_job(self, block=True):
         try:
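
The deepcopy in _put_job_in_spawn_queue exists because the queue hands the very same object to a worker thread; without the copy, worker-side mutation would leak back into the manager's copy. A toy illustration of that hazard, using a plain dict as a stand-in for a Job instance (the real Job API is not shown here):

    import copy
    from Queue import Queue   # 'queue' on Python 3

    spawn_queue = Queue()
    job = {'data': [1, 2, 3]}              # stand-in for a Job

    spawn_queue.put(copy.deepcopy(job))    # what ThreadManager does
    worker_side = spawn_queue.get()
    worker_side['data'].append(4)          # mutation inside the worker...

    assert job['data'] == [1, 2, 3]        # ...never reaches the manager's copy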