Add WORKER_POOL environment variable to facilitate pysawsim.manager benchmarking.
[sawsim.git] / pysawsim / manager / thread.py
index cc5b05d23d277a73c13db0cf1ad8e65e78d65ef8..a4e98f5cf5a0b508c520b713c4b644616ecb4ae5 100644 (file)
 """Functions for running external commands on other hosts.
 """
 
-from Queue import Queue
+import copy
+import os
+from Queue import Queue, Empty
 import threading
 
 from .. import log
 from . import Job, JobManager
 
 
+_ENABLED = True
+_DISABLING_ERROR = None
+
 CLOSE_MESSAGE = "close"
 
 
@@ -35,49 +40,76 @@ class WorkerThread (threading.Thread):
         super(WorkerThread, self).__init__(*args, **kwargs)
         self.spawn_queue = spawn_queue
         self.receive_queue = receive_queue
+        self.name = self.getName()  # work around Pythons < 2.6
 
     def run(self):
         while True:
             msg = self.spawn_queue.get()
             if msg == CLOSE_MESSAGE:
+                log().debug('%s closing' % self.name)
                 break
             assert isinstance(msg, Job), msg
+            log().debug('%s running job %s' % (self.name, msg))
             msg.run()
             self.receive_queue.put(msg)
 
 
 class ThreadManager (JobManager):
-    """
-    >>> t = ThreadManager()
+    """Manage asynchronous `Job` execution via :mod:`threading`.
+
+    >>> from time import sleep
+    >>> from math import sqrt
+    >>> m = ThreadManager()
     >>> group_A = []
     >>> for i in range(10):
-    ...     group_A.append(t.async_invoke('echo "%d"' % i))
+    ...     t = max(0, 5-i)
+    ...     group_A.append(m.async_invoke(Job(target=sleep, args=[t])))
     >>> group_B = []
     >>> for i in range(10):
-    ...     group_B.append(t.async_invoke('echo "%d"' % i,
-    ...                                   blocks_on=[j.id for j in group_A]))
-    >>> jobs = t.wait(ids=[j.id for j in group_A[5:8]])
+    ...     group_B.append(m.async_invoke(Job(target=sqrt, args=[i],
+    ...                 blocks_on=[j.id for j in group_A])))
+    >>> jobs = m.wait(ids=[j.id for j in group_A[5:8]])
     >>> print sorted(jobs.values(), key=lambda j: j.id)
     [<Job 5>, <Job 6>, <Job 7>]
-    >>> jobs = t.wait()
+    >>> jobs = m.wait()
     >>> print sorted(jobs.values(), key=lambda j: j.id)
     ... # doctest: +NORMALIZE_WHITESPACE
     [<Job 0>, <Job 1>, <Job 2>, <Job 3>, <Job 4>, <Job 8>, <Job 9>, <Job 10>,
      <Job 11>, <Job 12>, <Job 13>, <Job 14>, <Job 15>, <Job 16>, <Job 17>,
      <Job 18>, <Job 19>]
-    >>> t.teardown()
+    >>> m.teardown()
+
+    Note that Python's Global Interpreter Lock (GIL) currently limits
+    threads to a single core.  See the following discussions:
+
+    * http://smoothspan.wordpress.com/2007/09/14/guido-is-right-to-leave-the-gil-in-python-not-for-multicore-but-for-utility-computing/
+    * http://docs.python.org/faq/library#can-t-we-get-rid-of-the-global-interpreter-lock
+    * http://www.artima.com/weblogs/viewpost.jsp?thread=214235
+    * http://www.snaplogic.com/blog/?p=94
+    * http://stackoverflow.com/questions/31340/
+
+    Increasing `worker_pool` will only help you get around IO blocking
+    at the cost of increased time-slicing overhead.
     """
-    def __init__(self, worker_pool=5):
+    def __init__(self, worker_pool=None):
         super(ThreadManager, self).__init__()
         self._blocked = []
+        self._setup_queues()
+        self._spawn_workers(worker_pool)
+
+    def _setup_queues(self):
         self._spawn_queue = Queue()
         self._receive_queue = Queue()
+
+    def _spawn_workers(self, worker_pool):
+        if worker_pool is None:
+            worker_pool = int(os.environ.get('WORKER_POOL', 2))
         self._workers = []
         for i in range(worker_pool):
             worker = WorkerThread(spawn_queue=self._spawn_queue,
                                   receive_queue=self._receive_queue,
                                   name='worker-%d' % i)
-            log().debug('start thread %s' % worker.name)
+            log().debug('start %s' % worker.name)
             worker.start()
             self._workers.append(worker)
 
@@ -85,29 +117,48 @@ class ThreadManager (JobManager):
         for worker in self._workers:
             self._spawn_queue.put(CLOSE_MESSAGE)
         for worker in self._workers:
-            log().debug('join thread %s' % worker.name)
+            log().debug('join %s' % worker.name)
             worker.join()
         super(ThreadManager, self).teardown()
 
-    def _job_is_blocked(self, job):
+    def _job_is_blocked(self, job, ignore_id=None):
         for id in job.blocks_on:
-            if id in self._jobs and self.jobs[id].status == None:
+            if id == ignore_id:
+                continue
+            elif id in self._jobs and self._jobs[id].status == None:
                 return True
         return False
 
     def _spawn_job(self, job):
+        j = self._receive_job(block=False)
+        if j != None:
+            self._handle_received_job(j)
         if self._job_is_blocked(job):
+            log().debug('block job %s' % job)
             self._blocked.append(job)
-        log().debug('queue job %s' % job)
-        self._spawn_queue.put(job)
+            return
+        self._put_job_in_spawn_queue(job)
+
+    def _put_job_in_spawn_queue(self, job):
+        """Place a job in the spawn queue.
+
+        Threads share memory, so we need to send a copy of `job` to
+        protect the local copy from unmanaged changes.
+
+        Broken out to a method to allow code sharing with
+        SubprocessManager.
+        """
+        self._spawn_queue.put(copy.deepcopy(job))
 
-    def _receive_job(self):
-        job = self._receive_queue.get()
-        for j in self._blocked:
+    def _receive_job(self, block=True):
+        try:
+            job = self._receive_queue.get(block=block)
+        except Empty:
+            return
+        for j in list(self._blocked):
             if job.id in j.blocks_on:
-                if not self._job_is_blocked(j):
+                if not self._job_is_blocked(j, ignore_id=job.id):
+                    log().debug('unblock job %s' % j)
                     self._blocked.remove(j)
-                    log().debug('queue job %s' % j)
                     self._spawn_queue.put(j)
-        log().debug('receive job %s' % job)
         return job