d767da4011cac35dc66c544a78eff99bac7e6775
[sawsim.git] / pysawsim / manager / subproc.py
1 # Copyright (C) 2010  W. Trevor King <wking@drexel.edu>
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU General Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License
14 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
15 #
16 # The author may be contacted at <wking@drexel.edu> on the Internet, or
17 # write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
18 # Philadelphia PA 19104, USA.
19
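"""Run `Job` instances in a pool of worker processes.

Built on :mod:`multiprocessing`.  If that module cannot be imported,
the classes defined here raise the saved `ImportError` when
instantiated.
"""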
22
# :mod:`multiprocessing` is optional.  Record whether the import
# succeeded so the classes below can refuse to instantiate (re-raising
# the saved error) instead of making this module fail at import time.
# (The `except X, name` form below is Python 2 syntax.)
try:
    from multiprocessing import Manager, Process, Queue, cpu_count
    _ENABLED = True
    _DISABLING_ERROR = None
    # Suffix interpolated into the doctest examples; empty leaves them
    # active.
    _SKIP = ''
except ImportError, _DISABLING_ERROR:
    _ENABLED = False
    # Placeholder base class so `class WorkerProcess (Process)` can
    # still be defined without multiprocessing.
    Process = object
    # Appended to doctest example lines to mark them as skipped.
    _SKIP = '  # doctest: +SKIP'
32
33
34 from .. import log
35 from . import Job
36 from .thread import ThreadManager, CLOSE_MESSAGE
37
38
if _ENABLED:
    # Check susceptibility to python issue 5313:
    #   http://bugs.python.org/issue5155
    #   http://bugs.python.org/issue5313
    #   http://svn.python.org/view?view=rev&revision=73708
    #
    # Fix merged into the 2.6 maintenance branch with
    #
    #   changeset:   531:11e5b7504a71
    #   branch:      release26-maint
    #   user:        r.david.murray
    #   date:        Tue Jul 21 19:02:14 2009 +0200
    #   summary:     [svn r74145] Merged revisions 73708,73738 via svnmerge...
    #
    # Which came between 2.6.2 and 2.6.3rc1
    #
    #   $ hg blame -r 631 README | grep 'version 2\.6\.'
    #   631: This is Python version 2.6.3rc1
    #   $ hg blame -r 630 README | grep 'version 2\.6\.'
    #   345: This is Python version 2.6.2
    import sys
    # True when the running interpreter predates the 2.6.3 fix.
    _HAS_BUG_5313 = sys.version_info < (2, 6, 3)
62
63
class WorkerProcess (Process):
    """Worker process that runs jobs pulled from a queue.

    Each message read from `spawn_queue` is either `CLOSE_MESSAGE`,
    which stops the worker, or a `Job`, which is run and then placed
    on `receive_queue`.
    """
    def __init__(self, spawn_queue, receive_queue, *args, **kwargs):
        """Store the two queues; other arguments go to the base class.

        Raises the saved import error if :mod:`multiprocessing` could
        not be imported.
        """
        if not _ENABLED:
            raise _DISABLING_ERROR
        super(WorkerProcess, self).__init__(*args, **kwargs)
        self.receive_queue = receive_queue
        self.spawn_queue = spawn_queue

    def run(self):
        """Handle incoming messages until `CLOSE_MESSAGE` arrives."""
        incoming = self.spawn_queue
        outgoing = self.receive_queue
        while True:
            message = incoming.get()
            if message == CLOSE_MESSAGE:
                log().debug('%s closing' % self.name)
                return
            # Anything other than the close message must be a job.
            assert isinstance(message, Job), message
            log().debug('%s running job %s' % (self.name, message))
            message.run()
            # Hand the job object back after it has been run.
            outgoing.put(message)
82
83
class SubprocessManager (ThreadManager):
    # A formatted string is an expression rather than a string literal,
    # so Python does not treat it as the class docstring on its own.
    # Bind it to `__doc__` explicitly so the documentation (and its
    # doctests) are attached to the class.  `_SKIP` marks the examples
    # as skipped when :mod:`multiprocessing` is unavailable.
    __doc__ = """Manage asynchronous `Job` execution via :mod:`multiprocessing`.

    >>> from time import sleep
    >>> from math import sqrt
    >>> m = SubprocessManager()%(skip)s
    >>> group_A = []
    >>> for i in range(10):%(skip)s
    ...     t = max(0, 5-i)
    ...     group_A.append(m.async_invoke(Job(target=sleep, args=[t])))
    >>> group_B = []
    >>> for i in range(10):%(skip)s
    ...     group_B.append(m.async_invoke(Job(target=sqrt, args=[i],
    ...                 blocks_on=[j.id for j in group_A])))
    >>> jobs = m.wait(ids=[j.id for j in group_A[5:8]])%(skip)s
    >>> print sorted(jobs.values(), key=lambda j: j.id)%(skip)s
    [<Job 5>, <Job 6>, <Job 7>]
    >>> jobs = m.wait()%(skip)s
    >>> print sorted(jobs.values(), key=lambda j: j.id)%(skip)s
    ... # doctest: +NORMALIZE_WHITESPACE
    [<Job 0>, <Job 1>, <Job 2>, <Job 3>, <Job 4>, <Job 8>, <Job 9>, <Job 10>,
     <Job 11>, <Job 12>, <Job 13>, <Job 14>, <Job 15>, <Job 16>, <Job 17>,
     <Job 18>, <Job 19>]
    >>> m.teardown()%(skip)s
    """ % {'skip': _SKIP}

    def __init__(self, worker_pool=None):
        """Initialize the manager, passing `worker_pool` to the base class.

        Raises the saved import error if :mod:`multiprocessing` could
        not be imported.
        """
        if not _ENABLED:
            raise _DISABLING_ERROR
        super(SubprocessManager, self).__init__(worker_pool=worker_pool)

    def _setup_queues(self):
        """Create the spawn and receive queues."""
        self._spawn_queue = Queue()
        self._receive_queue = Queue()

    def _spawn_workers(self, worker_pool=None):
        """Start `worker_pool` worker processes.

        When `worker_pool` is `None`, one more worker than the number
        of CPUs reported by `cpu_count()` is started.
        """
        if worker_pool is None:
            worker_pool = cpu_count() + 1
        # NOTE(review): `_manager` is not referenced anywhere else in
        # this module -- confirm whether it is still needed.
        self._manager = Manager()
        self._workers = []
        for i in range(worker_pool):
            worker = WorkerProcess(spawn_queue=self._spawn_queue,
                                   receive_queue=self._receive_queue,
                                   name='worker-%d' % i)
            log().debug('start %s' % worker.name)
            worker.start()
            self._workers.append(worker)

    def _put_job_in_spawn_queue(self, job):
        """Place a job in the spawn queue."""
        self._spawn_queue.put(job)