class Job (object):
- """
+ """Job request structure for `JobManager`.
+
>>> import copy
Jobs execute a Python function (through an interface similar to
>>> print j.data
None
"""
- def __init__(self, id, target, blocks_on=None, args=None, kwargs=None):
- if blocks_on == None:
- blocks_on = []
+ def __init__(self, id=None, target=None, args=None, kwargs=None,
+ blocks_on=None):
if args == None:
args = []
if kwargs == None:
kwargs = {}
+ if blocks_on == None:
+ blocks_on = []
self.id = id
self.target = target
self.blocks_on = blocks_on
class InvokeJob (Job):
- """Job subclass which `invoke()`\s a command line function.
+ """`Job` subclass which `invoke()`\s a command line function.
>>> q = 'What is the meaning of life, the universe, and everything?'
>>> j = InvokeJob(id=3, target='echo "%s"' % q)
class JobManager (object):
+ """Base class for managing asynchronous `Job` execution."""
def __init__(self):
self._jobs = {}
self.next_id = 0
def teardown(self):
pass
- def async_invoke(self, cmd_string, blocks_on=None, *args, **kwargs):
+ def async_invoke(self, job):
id = self.next_id
self.next_id += 1
- job = Job(id, cmd_string, *args, **kwargs)
+ job.id = id
self._jobs[id] = job
self._spawn_job(job)
return job
class ThreadManager (JobManager):
- """
+ """Manage asynchronous `Job` execution via :mod:`threading`.
+
+ >>> from math import sqrt
>>> t = ThreadManager()
>>> group_A = []
>>> for i in range(10):
- ... group_A.append(t.async_invoke('echo "%d"' % i))
+ ... group_A.append(t.async_invoke(Job(target=sqrt, args=[i])))
>>> group_B = []
>>> for i in range(10):
- ... group_B.append(t.async_invoke('echo "%d"' % i,
- ... blocks_on=[j.id for j in group_A]))
+ ... group_B.append(t.async_invoke(Job(target=sqrt, args=[i],
+ ... blocks_on=[j.id for j in group_A])))
>>> jobs = t.wait(ids=[j.id for j in group_A[5:8]])
>>> print sorted(jobs.values(), key=lambda j: j.id)
[<Job 5>, <Job 6>, <Job 7>]
def _job_is_blocked(self, job):
for id in job.blocks_on:
- if id in self._jobs and self.jobs[id].status == None:
+ if id in self._jobs and self._jobs[id].status == None:
return True
return False