Add pysawsim.manager.subproc using multiprocessing.
author: W. Trevor King <wking@drexel.edu>
Wed, 20 Oct 2010 10:38:36 +0000 (06:38 -0400)
committer: W. Trevor King <wking@drexel.edu>
Wed, 20 Oct 2010 10:38:36 +0000 (06:38 -0400)
pysawsim/manager/__init__.py
pysawsim/manager/subproc.py [new file with mode: 0644]
pysawsim/manager/thread.py

index d596785b0293c1630ceda8a3b78a71964680ff9a..5cc1d6e2161caa7d44353a575c2813ead6412d72 100644 (file)
@@ -24,7 +24,7 @@ from .. import invoke as invoke
 from .. import log
 
 
-MANAGERS = ['thread', 'pbs']
+MANAGERS = ['thread', 'subproc', 'pbs']
 """Submodules with JobManager subclasses."""
 
 
diff --git a/pysawsim/manager/subproc.py b/pysawsim/manager/subproc.py
new file mode 100644 (file)
index 0000000..d0bb007
--- /dev/null
@@ -0,0 +1,95 @@
+# Copyright (C) 2010  W. Trevor King <wking@drexel.edu>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# The author may be contacted at <wking@drexel.edu> on the Internet, or
+# write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
+# Philadelphia PA 19104, USA.
+
+"""Functions for running external commands on other hosts.
+"""
+
+from multiprocessing import Manager, Process, Queue, cpu_count
+
+from .. import log
+from . import Job
+from .thread import ThreadManager, CLOSE_MESSAGE
+
+
+class WorkerProcess (Process):
+    def __init__(self, spawn_queue, receive_queue, *args, **kwargs):
+        super(WorkerProcess, self).__init__(*args, **kwargs)
+        self.spawn_queue = spawn_queue
+        self.receive_queue = receive_queue
+
+    def run(self):
+        while True:
+            msg = self.spawn_queue.get()
+            if msg == CLOSE_MESSAGE:
+                log().debug('%s closing' % self.name)
+                break
+            assert isinstance(msg, Job), msg
+            log().debug('%s running job %s' % (self.name, msg))
+            msg.run()
+            self.receive_queue.put(msg)
+
+
+class SubprocessManager (ThreadManager):
+    """Manage asynchronous `Job` execution via :mod:`subprocess`.
+
+    >>> from time import sleep
+    >>> from math import sqrt
+    >>> m = SubprocessManager()
+    >>> group_A = []
+    >>> for i in range(10):
+    ...     t = max(0, 5-i)
+    ...     group_A.append(m.async_invoke(Job(target=sleep, args=[t])))
+    >>> group_B = []
+    >>> for i in range(10):
+    ...     group_B.append(m.async_invoke(Job(target=sqrt, args=[i],
+    ...                 blocks_on=[j.id for j in group_A])))
+    >>> jobs = m.wait(ids=[j.id for j in group_A[5:8]])
+    >>> print sorted(jobs.values(), key=lambda j: j.id)
+    [<Job 5>, <Job 6>, <Job 7>]
+    >>> jobs = m.wait()
+    >>> print sorted(jobs.values(), key=lambda j: j.id)
+    ... # doctest: +NORMALIZE_WHITESPACE
+    [<Job 0>, <Job 1>, <Job 2>, <Job 3>, <Job 4>, <Job 8>, <Job 9>, <Job 10>,
+     <Job 11>, <Job 12>, <Job 13>, <Job 14>, <Job 15>, <Job 16>, <Job 17>,
+     <Job 18>, <Job 19>]
+    >>> m.teardown()
+    """
+    def __init__(self, worker_pool=None):
+        super(SubprocessManager, self).__init__(worker_pool=worker_pool)
+
+    def _setup_queues(self):
+        self._spawn_queue = Queue()
+        self._receive_queue = Queue()
+
+    def _spawn_workers(self, worker_pool=None):
+        if worker_pool == None:
+            worker_pool = cpu_count() + 1
+        self._manager = Manager()
+        self._workers = []
+        for i in range(worker_pool):
+            worker = WorkerProcess(spawn_queue=self._spawn_queue,
+                                   receive_queue=self._receive_queue,
+                                   name='worker-%d' % i)
+            log().debug('start %s' % worker.name)
+            worker.start()
+            self._workers.append(worker)
+
+    def _put_job_in_spawn_queue(self, job):
+        """Place a job in the spawn queue."""
+        self._spawn_queue.put(job)
index 289b06c1a4bead0dc24567307c646f6b10b236fc..636545b1f9e5b285618e8281d3994b4419b1a763 100644 (file)
@@ -90,14 +90,20 @@ class ThreadManager (JobManager):
     def __init__(self, worker_pool=2):
         super(ThreadManager, self).__init__()
         self._blocked = []
+        self._setup_queues()
+        self._spawn_workers(worker_pool)
+
+    def _setup_queues(self):
         self._spawn_queue = Queue()
         self._receive_queue = Queue()
+
+    def _spawn_workers(self, worker_pool):
         self._workers = []
         for i in range(worker_pool):
             worker = WorkerThread(spawn_queue=self._spawn_queue,
                                   receive_queue=self._receive_queue,
                                   name='worker-%d' % i)
-            log().debug('start thread %s' % worker.name)
+            log().debug('start %s' % worker.name)
             worker.start()
             self._workers.append(worker)
 
@@ -105,7 +111,7 @@ class ThreadManager (JobManager):
         for worker in self._workers:
             self._spawn_queue.put(CLOSE_MESSAGE)
         for worker in self._workers:
-            log().debug('join thread %s' % worker.name)
+            log().debug('join %s' % worker.name)
             worker.join()
         super(ThreadManager, self).teardown()
 
@@ -123,7 +129,18 @@ class ThreadManager (JobManager):
             log().debug('block job %s' % job)
             self._blocked.append(job)
             return
-        self._spawn_queue.put(copy.deepcopy(job))  # protect from shared memory
+        self._put_job_in_spawn_queue(job)
+
+    def _put_job_in_spawn_queue(self, job):
+        """Place a job in the spawn queue.
+
+        Threads share memory, so we need to send a copy of `job` to
+        protect the local copy from unmanaged changes.
+
+        Broken out to a method to allow code sharing with
+        SubprocessManager.
+        """
+        self._spawn_queue.put(copy.deepcopy(job))
 
     def _receive_job(self, block=True):
         try: