"""
from .. import invoke as invoke
+from .. import log
class Job (object):
class JobManager (object):
"""Base class for managing asynchronous `Job` execution."""
def __init__(self):
+ log().debug('setup %s' % self)
self._jobs = {}
- self.next_id = 0
+ self._next_id = 0
+
+ def __str__(self):
+ return '<%s %#x>' % (self.__class__.__name__, id(self))
+
+ def __repr__(self):
+ return self.__str__()
def teardown(self):
- pass
+ log().debug('teardown %s' % self)
def async_invoke(self, job):
- id = self.next_id
- self.next_id += 1
+ id = self._next_id
+ self._next_id += 1
job.id = id
self._jobs[id] = job
+ log().debug('spawn job %s' % job)
self._spawn_job(job)
return job
if ids == None:
ids = self._jobs.keys()
jobs = {}
+ log().debug('wait on %s' % ids)
+ log().debug('jobs: %s' % self._jobs)
for id in list(ids): # get already completed jobs
- if self._jobs[id] != None:
- jobs[id] = self._jobs.pop(id)
+ if id not in self._jobs:
+ log().debug('%d already gone' % id)
+ ids.remove(id)
+ elif self._jobs[id].status != None:
+ log().debug('%d already returned (%s)' % (id, self._jobs[id].status))
ids.remove(id)
+ jobs[id] = self._jobs.pop(id)
while len(ids) > 0: # wait for outstanding jobs
job = self._receive_job()
+ log().debug('receive job %s (%d)' % (job, job.status))
job.copy_onto(self._jobs[job.id])
- if job.id in ids:
+ if job.id in ids and job.id in self._jobs:
jobs[job.id] = self._jobs.pop(job.id)
- ids.remove(id)
+ ids.remove(job.id)
return jobs
def _receive_job(self):
"""Functions for running external commands on other hosts.
"""
-from Queue import Queue
+import copy
+from Queue import Queue, Empty
import threading
from .. import log
super(WorkerThread, self).__init__(*args, **kwargs)
self.spawn_queue = spawn_queue
self.receive_queue = receive_queue
+ self.name = self.getName() # work around Pythons < 2.6
def run(self):
while True:
msg = self.spawn_queue.get()
if msg == CLOSE_MESSAGE:
+ log().debug('%s closing' % self.name)
break
assert isinstance(msg, Job), msg
+ log().debug('%s running job %s' % (self.name, msg))
msg.run()
self.receive_queue.put(msg)
class ThreadManager (JobManager):
"""Manage asynchronous `Job` execution via :mod:`threading`.
+ >>> from time import sleep
>>> from math import sqrt
- >>> t = ThreadManager()
+ >>> m = ThreadManager()
>>> group_A = []
>>> for i in range(10):
- ... group_A.append(t.async_invoke(Job(target=sqrt, args=[i])))
+ ... t = max(0, 5-i)
+ ... group_A.append(m.async_invoke(Job(target=sleep, args=[t])))
>>> group_B = []
>>> for i in range(10):
- ... group_B.append(t.async_invoke(Job(target=sqrt, args=[i],
+ ... group_B.append(m.async_invoke(Job(target=sqrt, args=[i],
... blocks_on=[j.id for j in group_A])))
- >>> jobs = t.wait(ids=[j.id for j in group_A[5:8]])
+ >>> jobs = m.wait(ids=[j.id for j in group_A[5:8]])
>>> print sorted(jobs.values(), key=lambda j: j.id)
[<Job 5>, <Job 6>, <Job 7>]
- >>> jobs = t.wait()
+ >>> jobs = m.wait()
>>> print sorted(jobs.values(), key=lambda j: j.id)
... # doctest: +NORMALIZE_WHITESPACE
[<Job 0>, <Job 1>, <Job 2>, <Job 3>, <Job 4>, <Job 8>, <Job 9>, <Job 10>,
<Job 11>, <Job 12>, <Job 13>, <Job 14>, <Job 15>, <Job 16>, <Job 17>,
<Job 18>, <Job 19>]
- >>> t.teardown()
+ >>> m.teardown()
"""
def __init__(self, worker_pool=5):
super(ThreadManager, self).__init__()
worker.join()
super(ThreadManager, self).teardown()
- def _job_is_blocked(self, job):
+ def _job_is_blocked(self, job, ignore_id=None):
for id in job.blocks_on:
- if id in self._jobs and self._jobs[id].status == None:
+ if id == ignore_id:
+ continue
+ elif id in self._jobs and self._jobs[id].status == None:
return True
return False
def _spawn_job(self, job):
+ self._receive_job(block=False)
if self._job_is_blocked(job):
+ log().debug('block job %s' % job)
self._blocked.append(job)
- log().debug('queue job %s' % job)
- self._spawn_queue.put(job)
+ return
+ self._spawn_queue.put(copy.deepcopy(job)) # protect from shared memory
- def _receive_job(self):
- job = self._receive_queue.get()
- for j in self._blocked:
+ def _receive_job(self, block=True):
+ try:
+ job = self._receive_queue.get(block=block)
+ except Empty:
+ return
+ for j in list(self._blocked):
if job.id in j.blocks_on:
- if not self._job_is_blocked(j):
+ if not self._job_is_blocked(j, ignore_id=job.id):
+ log().debug('unblock job %s' % j)
self._blocked.remove(j)
- log().debug('queue job %s' % j)
self._spawn_queue.put(j)
- log().debug('receive job %s' % job)
return job