Add pysawsim.manager.subproc using subprocessing.
[sawsim.git] / pysawsim / manager / thread.py
1 # Copyright (C) 2010  W. Trevor King <wking@drexel.edu>
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU General Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License
14 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
15 #
16 # The author may be contacted at <wking@drexel.edu> on the Internet, or
17 # write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
18 # Philadelphia PA 19104, USA.
19
20 """Functions for running external commands on other hosts.
21 """
22
23 import copy
24 from Queue import Queue, Empty
25 import threading
26
27 from .. import log
28 from . import Job, JobManager
29
30
31 CLOSE_MESSAGE = "close"
32
33
34 class WorkerThread (threading.Thread):
35     def __init__(self, spawn_queue, receive_queue, *args, **kwargs):
36         super(WorkerThread, self).__init__(*args, **kwargs)
37         self.spawn_queue = spawn_queue
38         self.receive_queue = receive_queue
39         self.name = self.getName()  # work around Pythons < 2.6
40
41     def run(self):
42         while True:
43             msg = self.spawn_queue.get()
44             if msg == CLOSE_MESSAGE:
45                 log().debug('%s closing' % self.name)
46                 break
47             assert isinstance(msg, Job), msg
48             log().debug('%s running job %s' % (self.name, msg))
49             msg.run()
50             self.receive_queue.put(msg)
51
52
53 class ThreadManager (JobManager):
54     """Manage asynchronous `Job` execution via :mod:`threading`.
55
56     >>> from time import sleep
57     >>> from math import sqrt
58     >>> m = ThreadManager()
59     >>> group_A = []
60     >>> for i in range(10):
61     ...     t = max(0, 5-i)
62     ...     group_A.append(m.async_invoke(Job(target=sleep, args=[t])))
63     >>> group_B = []
64     >>> for i in range(10):
65     ...     group_B.append(m.async_invoke(Job(target=sqrt, args=[i],
66     ...                 blocks_on=[j.id for j in group_A])))
67     >>> jobs = m.wait(ids=[j.id for j in group_A[5:8]])
68     >>> print sorted(jobs.values(), key=lambda j: j.id)
69     [<Job 5>, <Job 6>, <Job 7>]
70     >>> jobs = m.wait()
71     >>> print sorted(jobs.values(), key=lambda j: j.id)
72     ... # doctest: +NORMALIZE_WHITESPACE
73     [<Job 0>, <Job 1>, <Job 2>, <Job 3>, <Job 4>, <Job 8>, <Job 9>, <Job 10>,
74      <Job 11>, <Job 12>, <Job 13>, <Job 14>, <Job 15>, <Job 16>, <Job 17>,
75      <Job 18>, <Job 19>]
76     >>> m.teardown()
77
78     Note that Python's Global Interpreter Lock (GIL) currently limits
79     threads to a single core.  See the following discussions:
80
81     * http://smoothspan.wordpress.com/2007/09/14/guido-is-right-to-leave-the-gil-in-python-not-for-multicore-but-for-utility-computing/
82     * http://docs.python.org/faq/library#id18
83     * http://www.artima.com/weblogs/viewpost.jsp?thread=214235
84     * http://www.snaplogic.com/blog/?p=94
85     * http://stackoverflow.com/questions/31340/
86
87     Increasing `worker_pool` will only help you get around IO blockin
88     at the cost increased time-slicing overhead.
89     """
90     def __init__(self, worker_pool=2):
91         super(ThreadManager, self).__init__()
92         self._blocked = []
93         self._setup_queues()
94         self._spawn_workers(worker_pool)
95
96     def _setup_queues(self):
97         self._spawn_queue = Queue()
98         self._receive_queue = Queue()
99
100     def _spawn_workers(self, worker_pool):
101         self._workers = []
102         for i in range(worker_pool):
103             worker = WorkerThread(spawn_queue=self._spawn_queue,
104                                   receive_queue=self._receive_queue,
105                                   name='worker-%d' % i)
106             log().debug('start %s' % worker.name)
107             worker.start()
108             self._workers.append(worker)
109
110     def teardown(self):
111         for worker in self._workers:
112             self._spawn_queue.put(CLOSE_MESSAGE)
113         for worker in self._workers:
114             log().debug('join %s' % worker.name)
115             worker.join()
116         super(ThreadManager, self).teardown()
117
118     def _job_is_blocked(self, job, ignore_id=None):
119         for id in job.blocks_on:
120             if id == ignore_id:
121                 continue
122             elif id in self._jobs and self._jobs[id].status == None:
123                 return True
124         return False
125
126     def _spawn_job(self, job):
127         self._receive_job(block=False)
128         if self._job_is_blocked(job):
129             log().debug('block job %s' % job)
130             self._blocked.append(job)
131             return
132         self._put_job_in_spawn_queue(job)
133
134     def _put_job_in_spawn_queue(self, job):
135         """Place a job in the spawn queue.
136
137         Threads share memory, so we need to send a copy of `job` to
138         protect the local copy from unmanaged changes.
139
140         Broken out to a method to allow code sharing with
141         SubprocessManager.
142         """
143         self._spawn_queue.put(copy.deepcopy(job))
144
145     def _receive_job(self, block=True):
146         try:
147             job = self._receive_queue.get(block=block)
148         except Empty:
149             return
150         for j in list(self._blocked):
151             if job.id in j.blocks_on:
152                 if not self._job_is_blocked(j, ignore_id=job.id):
153                     log().debug('unblock job %s' % j)
154                     self._blocked.remove(j)
155                     self._spawn_queue.put(j)
156         return job