1 # Copyright (C) 2010 W. Trevor King <wking@drexel.edu>
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 # The author may be contacted at <wking@drexel.edu> on the Internet, or
17 # write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
18 # Philadelphia PA 19104, USA.
20 """Functions for running external commands on other hosts.
24 from multiprocessing import Manager, Process, Queue, cpu_count
26 _DISABLING_ERROR = None
28 except ImportError, _DISABLING_ERROR:
31 _SKIP = ' # doctest: +SKIP'
37 from .thread import ThreadManager, CLOSE_MESSAGE
41 """Check succeptibility to python issue 5313:
42 http://bugs.python.org/issue5155
43 http://bugs.python.org/issue5313
44 http://svn.python.org/view?view=rev&revision=73708
46 Fix merged into the 2.6 maintenance branch with
48 changeset: 531:11e5b7504a71
49 branch: release26-maint
51 date: Tue Jul 21 19:02:14 2009 +0200
52 summary: [svn r74145] Merged revisions 73708,73738 via svnmerge...
54 Which came between 2.6.2 and 2.6.3rc1
56 $ hg blame -r 631 README | grep 'version 2\.6\.'
57 631: This is Python version 2.6.3rc1
58 $ hg blame -r 630 README | grep 'version 2\.6\.'
59 345: This is Python version 2.6.2
62 _HAS_BUG_5313 = sys.version_info < (2,6,3)
65 class WorkerProcess (Process):
66 def __init__(self, spawn_queue, receive_queue, *args, **kwargs):
68 raise _DISABLING_ERROR
69 super(WorkerProcess, self).__init__(*args, **kwargs)
70 self.spawn_queue = spawn_queue
71 self.receive_queue = receive_queue
75 msg = self.spawn_queue.get()
76 if msg == CLOSE_MESSAGE:
77 log().debug('%s closing' % self.name)
79 assert isinstance(msg, Job), msg
80 log().debug('%s running job %s' % (self.name, msg))
82 self.receive_queue.put(msg)
85 class SubprocessManager (ThreadManager):
86 """Manage asynchronous `Job` execution via :mod:`subprocess`.
88 >>> from time import sleep
89 >>> from math import sqrt
90 >>> m = SubprocessManager()%(skip)s
92 >>> for i in range(10):%(skip)s
94 ... group_A.append(m.async_invoke(Job(target=sleep, args=[t])))
96 >>> for i in range(10):%(skip)s
97 ... group_B.append(m.async_invoke(Job(target=sqrt, args=[i],
98 ... blocks_on=[j.id for j in group_A])))
99 >>> jobs = m.wait(ids=[j.id for j in group_A[5:8]])%(skip)s
100 >>> print sorted(jobs.values(), key=lambda j: j.id)%(skip)s
101 [<Job 5>, <Job 6>, <Job 7>]
102 >>> jobs = m.wait()%(skip)s
103 >>> print sorted(jobs.values(), key=lambda j: j.id)%(skip)s
104 ... # doctest: +NORMALIZE_WHITESPACE
105 [<Job 0>, <Job 1>, <Job 2>, <Job 3>, <Job 4>, <Job 8>, <Job 9>, <Job 10>,
106 <Job 11>, <Job 12>, <Job 13>, <Job 14>, <Job 15>, <Job 16>, <Job 17>,
108 >>> m.teardown()%(skip)s
109 """ % {'skip': _SKIP}
110 def __init__(self, worker_pool=None):
111 if _ENABLED == False:
112 raise _DISABLING_ERROR
113 super(SubprocessManager, self).__init__(worker_pool=worker_pool)
115 def _setup_queues(self):
116 self._spawn_queue = Queue()
117 self._receive_queue = Queue()
119 def _spawn_workers(self, worker_pool=None):
120 if worker_pool is None:
121 worker_pool = int(os.environ.get('WORKER_POOL', cpu_count() + 1))
122 self._manager = Manager()
124 for i in range(worker_pool):
125 worker = WorkerProcess(spawn_queue=self._spawn_queue,
126 receive_queue=self._receive_queue,
127 name='worker-%d' % i)
128 log().debug('start %s' % worker.name)
130 self._workers.append(worker)
132 def _put_job_in_spawn_queue(self, job):
133 """Place a job in the spawn queue."""
134 self._spawn_queue.put(job)