--- /dev/null
+# Copyright (C) 2010 W. Trevor King <wking@drexel.edu>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# The author may be contacted at <wking@drexel.edu> on the Internet, or
+# write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
+# Philadelphia PA 19104, USA.
+
+"""Functions for running external commands on other hosts.
+"""
+
+from multiprocessing import Manager, Process, Queue, cpu_count
+
+from .. import log
+from . import Job
+from .thread import ThreadManager, CLOSE_MESSAGE
+
+
+class WorkerProcess (Process):
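+    """Worker process that runs jobs pulled from `spawn_queue`.
+
+    Completed jobs are pushed back on `receive_queue`.  A
+    `CLOSE_MESSAGE` on `spawn_queue` shuts the worker down.
+    """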
+ def __init__(self, spawn_queue, receive_queue, *args, **kwargs):
+ super(WorkerProcess, self).__init__(*args, **kwargs)
+ self.spawn_queue = spawn_queue
+ self.receive_queue = receive_queue
+
+ def run(self):
+ while True:
+ msg = self.spawn_queue.get()
+ if msg == CLOSE_MESSAGE:
+ log().debug('%s closing' % self.name)
+ break
+ assert isinstance(msg, Job), msg
+ log().debug('%s running job %s' % (self.name, msg))
+ msg.run()
+ self.receive_queue.put(msg)
+
+
+class SubprocessManager (ThreadManager):
+    """Manage asynchronous `Job` execution via :mod:`multiprocessing`.
+
+ >>> from time import sleep
+ >>> from math import sqrt
+ >>> m = SubprocessManager()
+ >>> group_A = []
+ >>> for i in range(10):
+ ... t = max(0, 5-i)
+ ... group_A.append(m.async_invoke(Job(target=sleep, args=[t])))
+ >>> group_B = []
+ >>> for i in range(10):
+ ... group_B.append(m.async_invoke(Job(target=sqrt, args=[i],
+ ... blocks_on=[j.id for j in group_A])))
+ >>> jobs = m.wait(ids=[j.id for j in group_A[5:8]])
+ >>> print sorted(jobs.values(), key=lambda j: j.id)
+ [<Job 5>, <Job 6>, <Job 7>]
+ >>> jobs = m.wait()
+ >>> print sorted(jobs.values(), key=lambda j: j.id)
+ ... # doctest: +NORMALIZE_WHITESPACE
+ [<Job 0>, <Job 1>, <Job 2>, <Job 3>, <Job 4>, <Job 8>, <Job 9>, <Job 10>,
+ <Job 11>, <Job 12>, <Job 13>, <Job 14>, <Job 15>, <Job 16>, <Job 17>,
+ <Job 18>, <Job 19>]
+ >>> m.teardown()
+ """
+ def __init__(self, worker_pool=None):
+ super(SubprocessManager, self).__init__(worker_pool=worker_pool)
+
+ def _setup_queues(self):
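+        """Create multiprocessing-safe spawn and receive queues."""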
+ self._spawn_queue = Queue()
+ self._receive_queue = Queue()
+
+ def _spawn_workers(self, worker_pool=None):
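+        """Start `worker_pool` worker processes (default ``cpu_count() + 1``)."""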
+        if worker_pool is None:
+ worker_pool = cpu_count() + 1
+ self._manager = Manager()
+ self._workers = []
+ for i in range(worker_pool):
+ worker = WorkerProcess(spawn_queue=self._spawn_queue,
+ receive_queue=self._receive_queue,
+ name='worker-%d' % i)
+ log().debug('start %s' % worker.name)
+ worker.start()
+ self._workers.append(worker)
+
+ def _put_job_in_spawn_queue(self, job):
+        """Place a job in the spawn queue.
+
+        No copy is needed here: :class:`multiprocessing.Queue` pickles
+        the job, so the worker process gets its own copy anyway.
+        """
+ self._spawn_queue.put(job)
def __init__(self, worker_pool=2):
super(ThreadManager, self).__init__()
self._blocked = []
+ self._setup_queues()
+ self._spawn_workers(worker_pool)
+
+ def _setup_queues(self):
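+        """Create the spawn and receive queues for the worker threads."""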
self._spawn_queue = Queue()
self._receive_queue = Queue()
+
+ def _spawn_workers(self, worker_pool):
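+        """Start `worker_pool` threads listening on the spawn queue."""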
self._workers = []
for i in range(worker_pool):
worker = WorkerThread(spawn_queue=self._spawn_queue,
receive_queue=self._receive_queue,
name='worker-%d' % i)
- log().debug('start thread %s' % worker.name)
+ log().debug('start %s' % worker.name)
worker.start()
self._workers.append(worker)
for worker in self._workers:
self._spawn_queue.put(CLOSE_MESSAGE)
for worker in self._workers:
- log().debug('join thread %s' % worker.name)
+ log().debug('join %s' % worker.name)
worker.join()
super(ThreadManager, self).teardown()
log().debug('block job %s' % job)
self._blocked.append(job)
return
- self._spawn_queue.put(copy.deepcopy(job)) # protect from shared memory
+ self._put_job_in_spawn_queue(job)
+
+ def _put_job_in_spawn_queue(self, job):
+ """Place a job in the spawn queue.
+
+ Threads share memory, so we need to send a copy of `job` to
+ protect the local copy from unmanaged changes.
+
+        Broken out into a separate method so that `SubprocessManager`
+        can reuse `_spawn_job` and override only this step.
+ """
+ self._spawn_queue.put(copy.deepcopy(job))
def _receive_job(self, block=True):
try: