JobManager.async_invoke() should accept Job instances.
authorW. Trevor King <wking@drexel.edu>
Tue, 19 Oct 2010 13:04:07 +0000 (09:04 -0400)
committerW. Trevor King <wking@drexel.edu>
Tue, 19 Oct 2010 13:04:07 +0000 (09:04 -0400)
It used to attempt to construct them on its own, but with multiple
subclasses (e.g. InvokeJob), that becomes too difficult.

pysawsim/manager/__init__.py
pysawsim/manager/thread.py

index 5deb701376bc5607ea6228c2fc762432860abe4f..1dbdf7d0bba8b616fb79e110b5a241c00e90ea33 100644 (file)
@@ -24,7 +24,8 @@ from .. import invoke as invoke
 
 
 class Job (object):
-    """
+    """Job request structure for `JobManager`.
+
     >>> import copy
 
     Jobs execute a Python function (through an interface similar to
@@ -78,13 +79,14 @@ class Job (object):
     >>> print j.data
     None
     """
-    def __init__(self, id, target, blocks_on=None, args=None, kwargs=None):
-        if blocks_on == None:
-            blocks_on = []
+    def __init__(self, id=None, target=None, args=None, kwargs=None,
+                 blocks_on=None):
         if args == None:
             args = []
         if kwargs == None:
             kwargs = {}
+        if blocks_on == None:
+            blocks_on = []
         self.id = id
         self.target = target
         self.blocks_on = blocks_on
@@ -114,7 +116,7 @@ class Job (object):
 
 
 class InvokeJob (Job):
-    """Job subclass which `invoke()`\s a command line function.
+    """`Job` subclass which `invoke()`\s a command line function.
 
     >>> q = 'What is the meaning of life, the universe, and everything?'
     >>> j = InvokeJob(id=3, target='echo "%s"' % q)
@@ -151,6 +153,7 @@ class InvokeJob (Job):
 
 
 class JobManager (object):
+    """Base class for managing asynchronous `Job` execution."""
     def __init__(self):
         self._jobs = {}
         self.next_id = 0
@@ -158,10 +161,10 @@ class JobManager (object):
     def teardown(self):
         pass
 
-    def async_invoke(self, cmd_string, blocks_on=None, *args, **kwargs):
+    def async_invoke(self, job):
         id = self.next_id
         self.next_id += 1
-        job = Job(id, cmd_string, *args, **kwargs)
+        job.id = id
         self._jobs[id] = job
         self._spawn_job(job)
         return job
index cc5b05d23d277a73c13db0cf1ad8e65e78d65ef8..f963e41c9950dfa860c73ed9605d4c863fa3c453 100644 (file)
@@ -47,15 +47,17 @@ class WorkerThread (threading.Thread):
 
 
 class ThreadManager (JobManager):
-    """
+    """Manage asynchronous `Job` execution via :mod:`threading`.
+
+    >>> from math import sqrt
     >>> t = ThreadManager()
     >>> group_A = []
     >>> for i in range(10):
-    ...     group_A.append(t.async_invoke('echo "%d"' % i))
+    ...     group_A.append(t.async_invoke(Job(target=sqrt, args=[i])))
     >>> group_B = []
     >>> for i in range(10):
-    ...     group_B.append(t.async_invoke('echo "%d"' % i,
-    ...                                   blocks_on=[j.id for j in group_A]))
+    ...     group_B.append(t.async_invoke(Job(target=sqrt, args=[i],
+    ...                 blocks_on=[j.id for j in group_A])))
     >>> jobs = t.wait(ids=[j.id for j in group_A[5:8]])
     >>> print sorted(jobs.values(), key=lambda j: j.id)
     [<Job 5>, <Job 6>, <Job 7>]
@@ -91,7 +93,7 @@ class ThreadManager (JobManager):
 
     def _job_is_blocked(self, job):
         for id in job.blocks_on:
-            if id in self._jobs and self.jobs[id].status == None:
+            if id in self._jobs and self._jobs[id].status == None:
                 return True
         return False