From: W. Trevor King Date: Tue, 19 Oct 2010 20:43:26 +0000 (-0400) Subject: Fix a few deadlock errors in pysawsim.manager.thread. X-Git-Url: http://git.tremily.us/?a=commitdiff_plain;h=30560e70e17081300b49bb5898b806a5bb98beb8;p=sawsim.git Fix a few deadlock errors in pysawsim.manager.thread. --- diff --git a/pysawsim/manager/__init__.py b/pysawsim/manager/__init__.py index 1dbdf7d..6bdba75 100644 --- a/pysawsim/manager/__init__.py +++ b/pysawsim/manager/__init__.py @@ -21,6 +21,7 @@ """ from .. import invoke as invoke +from .. import log class Job (object): @@ -155,17 +156,25 @@ class InvokeJob (Job): class JobManager (object): """Base class for managing asynchronous `Job` execution.""" def __init__(self): + log().debug('setup %s' % self) self._jobs = {} - self.next_id = 0 + self._next_id = 0 + + def __str__(self): + return '<%s %#x>' % (self.__class__.__name__, id(self)) + + def __repr__(self): + return self.__str__() def teardown(self): - pass + log().debug('teardown %s' % self) def async_invoke(self, job): - id = self.next_id - self.next_id += 1 + id = self._next_id + self._next_id += 1 job.id = id self._jobs[id] = job + log().debug('spawn job %s' % job) self._spawn_job(job) return job @@ -176,16 +185,23 @@ class JobManager (object): if ids == None: ids = self._jobs.keys() jobs = {} + log().debug('wait on %s' % ids) + log().debug('jobs: %s' % self._jobs) for id in list(ids): # get already completed jobs - if self._jobs[id] != None: - jobs[id] = self._jobs.pop(id) + if id not in self._jobs: + log().debug('%d already gone' % id) + ids.remove(id) + elif self._jobs[id].status != None: + log().debug('%d already returned (%s)' % (id, self._jobs[id].status)) ids.remove(id) + jobs[id] = self._jobs.pop(id) while len(ids) > 0: # wait for outstanding jobs job = self._receive_job() + log().debug('receive job %s (%d)' % (job, job.status)) job.copy_onto(self._jobs[job.id]) - if job.id in ids: + if job.id in ids and job.id in self._jobs: jobs[job.id] = self._jobs.pop(job.id) - 
ids.remove(id) + ids.remove(job.id) return jobs def _receive_job(self): diff --git a/pysawsim/manager/thread.py b/pysawsim/manager/thread.py index f963e41..95cae0f 100644 --- a/pysawsim/manager/thread.py +++ b/pysawsim/manager/thread.py @@ -20,7 +20,8 @@ """Functions for running external commands on other hosts. """ -from Queue import Queue +import copy +from Queue import Queue, Empty import threading from .. import log @@ -35,13 +36,16 @@ class WorkerThread (threading.Thread): super(WorkerThread, self).__init__(*args, **kwargs) self.spawn_queue = spawn_queue self.receive_queue = receive_queue + self.name = self.getName() # work around Pythons < 2.6 def run(self): while True: msg = self.spawn_queue.get() if msg == CLOSE_MESSAGE: + log().debug('%s closing' % self.name) break assert isinstance(msg, Job), msg + log().debug('%s running job %s' % (self.name, msg)) msg.run() self.receive_queue.put(msg) @@ -49,25 +53,27 @@ class WorkerThread (threading.Thread): class ThreadManager (JobManager): """Manage asynchronous `Job` execution via :mod:`threading`. + >>> from time import sleep >>> from math import sqrt - >>> t = ThreadManager() + >>> m = ThreadManager() >>> group_A = [] >>> for i in range(10): - ... group_A.append(t.async_invoke(Job(target=sqrt, args=[i]))) + ... t = max(0, 5-i) + ... group_A.append(m.async_invoke(Job(target=sleep, args=[t]))) >>> group_B = [] >>> for i in range(10): - ... group_B.append(t.async_invoke(Job(target=sqrt, args=[i], + ... group_B.append(m.async_invoke(Job(target=sqrt, args=[i], ... blocks_on=[j.id for j in group_A]))) - >>> jobs = t.wait(ids=[j.id for j in group_A[5:8]]) + >>> jobs = m.wait(ids=[j.id for j in group_A[5:8]]) >>> print sorted(jobs.values(), key=lambda j: j.id) [<Job 5>, <Job 6>, <Job 7>] - >>> jobs = t.wait() + >>> jobs = m.wait() >>> print sorted(jobs.values(), key=lambda j: j.id) ... 
# doctest: +NORMALIZE_WHITESPACE [<Job 0>, <Job 1>, <Job 2>, <Job 3>, <Job 4>, <Job 8>, <Job 9>, <Job 10>, <Job 11>, <Job 12>, <Job 13>, <Job 14>, <Job 15>, <Job 16>, <Job 17>, <Job 18>, <Job 19>] - >>> t.teardown() + >>> m.teardown() """ def __init__(self, worker_pool=5): super(ThreadManager, self).__init__() @@ -91,25 +97,31 @@ class ThreadManager (JobManager): worker.join() super(ThreadManager, self).teardown() - def _job_is_blocked(self, job): + def _job_is_blocked(self, job, ignore_id=None): for id in job.blocks_on: - if id in self._jobs and self._jobs[id].status == None: + if id == ignore_id: + continue + elif id in self._jobs and self._jobs[id].status == None: return True return False def _spawn_job(self, job): + self._receive_job(block=False) if self._job_is_blocked(job): + log().debug('block job %s' % job) self._blocked.append(job) - log().debug('queue job %s' % job) - self._spawn_queue.put(job) + return + self._spawn_queue.put(copy.deepcopy(job)) # protect from shared memory - def _receive_job(self): - job = self._receive_queue.get() - for j in self._blocked: + def _receive_job(self, block=True): + try: + job = self._receive_queue.get(block=block) + except Empty: + return + for j in list(self._blocked): if job.id in j.blocks_on: - if not self._job_is_blocked(j): + if not self._job_is_blocked(j, ignore_id=job.id): + log().debug('unblock job %s' % j) self._blocked.remove(j) - log().debug('queue job %s' % j) self._spawn_queue.put(j) - log().debug('receive job %s' % job) return job