Fix a few deadlock errors in pysawsim.manager.thread.
author W. Trevor King <wking@drexel.edu>
Tue, 19 Oct 2010 20:43:26 +0000 (16:43 -0400)
committer W. Trevor King <wking@drexel.edu>
Tue, 19 Oct 2010 20:43:26 +0000 (16:43 -0400)
pysawsim/manager/__init__.py
pysawsim/manager/thread.py

index 1dbdf7d0bba8b616fb79e110b5a241c00e90ea33..6bdba75991aa6abe27fe768dda113c447ce09507 100644 (file)
@@ -21,6 +21,7 @@
 """
 
 from .. import invoke as invoke
+from .. import log
 
 
 class Job (object):
@@ -155,17 +156,25 @@ class InvokeJob (Job):
 class JobManager (object):
     """Base class for managing asynchronous `Job` execution."""
     def __init__(self):
+        log().debug('setup %s' % self)
         self._jobs = {}
-        self.next_id = 0
+        self._next_id = 0
+
+    def __str__(self):
+        return '<%s %#x>' % (self.__class__.__name__, id(self))
+
+    def __repr__(self):
+        return self.__str__()
 
     def teardown(self):
-        pass
+        log().debug('teardown %s' % self)
 
     def async_invoke(self, job):
-        id = self.next_id
-        self.next_id += 1
+        id = self._next_id
+        self._next_id += 1
         job.id = id
         self._jobs[id] = job
+        log().debug('spawn job %s' % job)
         self._spawn_job(job)
         return job
 
@@ -176,16 +185,23 @@ class JobManager (object):
         if ids == None:
             ids = self._jobs.keys()
         jobs = {}
+        log().debug('wait on %s' % ids)
+        log().debug('jobs: %s' % self._jobs)
         for id in list(ids):  # get already completed jobs
-            if self._jobs[id] != None:
-                jobs[id] = self._jobs.pop(id)
+            if id not in self._jobs:
+                log().debug('%d already gone' % id)
+                ids.remove(id)
+            elif self._jobs[id].status != None:
+                log().debug('%d already returned (%s)' % (id, self._jobs[id].status))
                 ids.remove(id)
+                jobs[id] = self._jobs.pop(id)
         while len(ids) > 0:  # wait for outstanding jobs
             job = self._receive_job()
+            log().debug('receive job %s (%d)' % (job, job.status))
             job.copy_onto(self._jobs[job.id])
-            if job.id in ids:
+            if job.id in ids and job.id in self._jobs:
                 jobs[job.id] = self._jobs.pop(job.id)
-                ids.remove(id)
+                ids.remove(job.id)
         return jobs
 
     def _receive_job(self):
index f963e41c9950dfa860c73ed9605d4c863fa3c453..95cae0ff36ba077093f7a8b4e97cee2d734d62ec 100644 (file)
@@ -20,7 +20,8 @@
 """Functions for running external commands on other hosts.
 """
 
-from Queue import Queue
+import copy
+from Queue import Queue, Empty
 import threading
 
 from .. import log
@@ -35,13 +36,16 @@ class WorkerThread (threading.Thread):
         super(WorkerThread, self).__init__(*args, **kwargs)
         self.spawn_queue = spawn_queue
         self.receive_queue = receive_queue
+        self.name = self.getName()  # work around Pythons < 2.6
 
     def run(self):
         while True:
             msg = self.spawn_queue.get()
             if msg == CLOSE_MESSAGE:
+                log().debug('%s closing' % self.name)
                 break
             assert isinstance(msg, Job), msg
+            log().debug('%s running job %s' % (self.name, msg))
             msg.run()
             self.receive_queue.put(msg)
 
@@ -49,25 +53,27 @@ class WorkerThread (threading.Thread):
 class ThreadManager (JobManager):
     """Manage asynchronous `Job` execution via :mod:`threading`.
 
+    >>> from time import sleep
     >>> from math import sqrt
-    >>> t = ThreadManager()
+    >>> m = ThreadManager()
     >>> group_A = []
     >>> for i in range(10):
-    ...     group_A.append(t.async_invoke(Job(target=sqrt, args=[i])))
+    ...     t = max(0, 5-i)
+    ...     group_A.append(m.async_invoke(Job(target=sleep, args=[t])))
     >>> group_B = []
     >>> for i in range(10):
-    ...     group_B.append(t.async_invoke(Job(target=sqrt, args=[i],
+    ...     group_B.append(m.async_invoke(Job(target=sqrt, args=[i],
     ...                 blocks_on=[j.id for j in group_A])))
-    >>> jobs = t.wait(ids=[j.id for j in group_A[5:8]])
+    >>> jobs = m.wait(ids=[j.id for j in group_A[5:8]])
     >>> print sorted(jobs.values(), key=lambda j: j.id)
     [<Job 5>, <Job 6>, <Job 7>]
-    >>> jobs = t.wait()
+    >>> jobs = m.wait()
     >>> print sorted(jobs.values(), key=lambda j: j.id)
     ... # doctest: +NORMALIZE_WHITESPACE
     [<Job 0>, <Job 1>, <Job 2>, <Job 3>, <Job 4>, <Job 8>, <Job 9>, <Job 10>,
      <Job 11>, <Job 12>, <Job 13>, <Job 14>, <Job 15>, <Job 16>, <Job 17>,
      <Job 18>, <Job 19>]
-    >>> t.teardown()
+    >>> m.teardown()
     """
     def __init__(self, worker_pool=5):
         super(ThreadManager, self).__init__()
@@ -91,25 +97,31 @@ class ThreadManager (JobManager):
             worker.join()
         super(ThreadManager, self).teardown()
 
-    def _job_is_blocked(self, job):
+    def _job_is_blocked(self, job, ignore_id=None):
         for id in job.blocks_on:
-            if id in self._jobs and self._jobs[id].status == None:
+            if id == ignore_id:
+                continue
+            elif id in self._jobs and self._jobs[id].status == None:
                 return True
         return False
 
     def _spawn_job(self, job):
+        self._receive_job(block=False)
         if self._job_is_blocked(job):
+            log().debug('block job %s' % job)
             self._blocked.append(job)
-        log().debug('queue job %s' % job)
-        self._spawn_queue.put(job)
+            return
+        self._spawn_queue.put(copy.deepcopy(job))  # protect from shared memory
 
-    def _receive_job(self):
-        job = self._receive_queue.get()
-        for j in self._blocked:
+    def _receive_job(self, block=True):
+        try:
+            job = self._receive_queue.get(block=block)
+        except Empty:
+            return
+        for j in list(self._blocked):
             if job.id in j.blocks_on:
-                if not self._job_is_blocked(j):
+                if not self._job_is_blocked(j, ignore_id=job.id):
+                    log().debug('unblock job %s' % j)
                     self._blocked.remove(j)
-                    log().debug('queue job %s' % j)
                     self._spawn_queue.put(j)
-        log().debug('receive job %s' % job)
         return job