From 326f038f597bb51fb80f267032a963bcdde337f0 Mon Sep 17 00:00:00 2001 From: "W. Trevor King" Date: Wed, 20 Oct 2010 06:38:36 -0400 Subject: [PATCH] Add pysawsim.manager.subproc using subprocessing. --- pysawsim/manager/__init__.py | 2 +- pysawsim/manager/subproc.py | 95 ++++++++++++++++++++++++++++++++++++ pysawsim/manager/thread.py | 23 +++++++-- 3 files changed, 116 insertions(+), 4 deletions(-) create mode 100644 pysawsim/manager/subproc.py diff --git a/pysawsim/manager/__init__.py b/pysawsim/manager/__init__.py index d596785..5cc1d6e 100644 --- a/pysawsim/manager/__init__.py +++ b/pysawsim/manager/__init__.py @@ -24,7 +24,7 @@ from .. import invoke as invoke from .. import log -MANAGERS = ['thread', 'pbs'] +MANAGERS = ['thread', 'subproc', 'pbs'] """Submodules with JobManager subclasses.""" diff --git a/pysawsim/manager/subproc.py b/pysawsim/manager/subproc.py new file mode 100644 index 0000000..d0bb007 --- /dev/null +++ b/pysawsim/manager/subproc.py @@ -0,0 +1,95 @@ +# Copyright (C) 2010 W. Trevor King +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# The author may be contacted at on the Internet, or +# write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St., +# Philadelphia PA 19104, USA. + +"""Functions for running external commands on other hosts. +""" + +from multiprocessing import Manager, Process, Queue, cpu_count + +from .. import log +from . import Job +from .thread import ThreadManager, CLOSE_MESSAGE + + +class WorkerProcess (Process): + def __init__(self, spawn_queue, receive_queue, *args, **kwargs): + super(WorkerProcess, self).__init__(*args, **kwargs) + self.spawn_queue = spawn_queue + self.receive_queue = receive_queue + + def run(self): + while True: + msg = self.spawn_queue.get() + if msg == CLOSE_MESSAGE: + log().debug('%s closing' % self.name) + break + assert isinstance(msg, Job), msg + log().debug('%s running job %s' % (self.name, msg)) + msg.run() + self.receive_queue.put(msg) + + +class SubprocessManager (ThreadManager): + """Manage asynchronous `Job` execution via :mod:`subprocess`. + + >>> from time import sleep + >>> from math import sqrt + >>> m = SubprocessManager() + >>> group_A = [] + >>> for i in range(10): + ... t = max(0, 5-i) + ... group_A.append(m.async_invoke(Job(target=sleep, args=[t]))) + >>> group_B = [] + >>> for i in range(10): + ... group_B.append(m.async_invoke(Job(target=sqrt, args=[i], + ... blocks_on=[j.id for j in group_A]))) + >>> jobs = m.wait(ids=[j.id for j in group_A[5:8]]) + >>> print sorted(jobs.values(), key=lambda j: j.id) + [, , ] + >>> jobs = m.wait() + >>> print sorted(jobs.values(), key=lambda j: j.id) + ... # doctest: +NORMALIZE_WHITESPACE + [, , , , , , , , + , , , , , , , + , ] + >>> m.teardown() + """ + def __init__(self, worker_pool=None): + super(SubprocessManager, self).__init__(worker_pool=worker_pool) + + def _setup_queues(self): + self._spawn_queue = Queue() + self._receive_queue = Queue() + + def _spawn_workers(self, worker_pool=None): + if worker_pool == None: + worker_pool = cpu_count() + 1 + self._manager = Manager() + self._workers = [] + for i in range(worker_pool): + worker = WorkerProcess(spawn_queue=self._spawn_queue, + receive_queue=self._receive_queue, + name='worker-%d' % i) + log().debug('start %s' % worker.name) + worker.start() + self._workers.append(worker) + + def _put_job_in_spawn_queue(self, job): + """Place a job in the spawn queue.""" + self._spawn_queue.put(job) diff --git a/pysawsim/manager/thread.py b/pysawsim/manager/thread.py index 289b06c..636545b 100644 --- a/pysawsim/manager/thread.py +++ b/pysawsim/manager/thread.py @@ -90,14 +90,20 @@ class ThreadManager (JobManager): def __init__(self, worker_pool=2): super(ThreadManager, self).__init__() self._blocked = [] + self._setup_queues() + self._spawn_workers(worker_pool) + + def _setup_queues(self): self._spawn_queue = Queue() self._receive_queue = Queue() + + def _spawn_workers(self, worker_pool): self._workers = [] for i in range(worker_pool): worker = WorkerThread(spawn_queue=self._spawn_queue, receive_queue=self._receive_queue, name='worker-%d' % i) - log().debug('start thread %s' % worker.name) + log().debug('start %s' % worker.name) worker.start() self._workers.append(worker) @@ -105,7 +111,7 @@ class ThreadManager (JobManager): for worker in self._workers: self._spawn_queue.put(CLOSE_MESSAGE) for worker in self._workers: - log().debug('join thread %s' % worker.name) + log().debug('join %s' % worker.name) worker.join() super(ThreadManager, self).teardown() @@ -123,7 +129,18 @@ class ThreadManager (JobManager): log().debug('block job %s' % job) self._blocked.append(job) return - self._spawn_queue.put(copy.deepcopy(job)) # protect from shared memory + self._put_job_in_spawn_queue(job) + + def _put_job_in_spawn_queue(self, job): + """Place a job in the spawn queue. + + Threads share memory, so we need to send a copy of `job` to + protect the local copy from unmanaged changes. + + Broken out to a method to allow code sharing with + SubprocessManager. + """ + self._spawn_queue.put(copy.deepcopy(job)) def _receive_job(self, block=True): try: -- 2.26.2