48a2f9fec3b0f97315c18dc787cc513423f00851
[sawsim.git] / pysawsim / manager / thread.py
1 # Copyright (C) 2010  W. Trevor King <wking@drexel.edu>
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU General Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License
14 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
15 #
16 # The author may be contacted at <wking@drexel.edu> on the Internet, or
17 # write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
18 # Philadelphia PA 19104, USA.
19
20 """Functions for running external commands on other hosts.
21 """
22
23 import copy
24 from Queue import Queue, Empty
25 import threading
26
27 from .. import log
28 from . import Job, JobManager
29
30
31 _ENABLED = True
32 _DISABLING_ERROR = None
33
34 CLOSE_MESSAGE = "close"
35
36
class WorkerThread (threading.Thread):
    """Thread that executes `Job`s pulled from a shared queue.

    Jobs arrive on `spawn_queue`; each completed job is placed on
    `receive_queue`.  A `CLOSE_MESSAGE` on `spawn_queue` shuts the
    worker down.
    """
    def __init__(self, spawn_queue, receive_queue, *args, **kwargs):
        super(WorkerThread, self).__init__(*args, **kwargs)
        self.spawn_queue = spawn_queue
        self.receive_queue = receive_queue
        self.name = self.getName()  # work around Pythons < 2.6

    def run(self):
        # Pull messages until the manager sends a close request.
        msg = self.spawn_queue.get()
        while msg != CLOSE_MESSAGE:
            assert isinstance(msg, Job), msg
            log().debug('%s running job %s' % (self.name, msg))
            msg.run()
            self.receive_queue.put(msg)
            msg = self.spawn_queue.get()
        log().debug('%s closing' % self.name)
54
55
class ThreadManager (JobManager):
    """Manage asynchronous `Job` execution via :mod:`threading`.

    >>> from time import sleep
    >>> from math import sqrt
    >>> m = ThreadManager()
    >>> group_A = []
    >>> for i in range(10):
    ...     t = max(0, 5-i)
    ...     group_A.append(m.async_invoke(Job(target=sleep, args=[t])))
    >>> group_B = []
    >>> for i in range(10):
    ...     group_B.append(m.async_invoke(Job(target=sqrt, args=[i],
    ...                 blocks_on=[j.id for j in group_A])))
    >>> jobs = m.wait(ids=[j.id for j in group_A[5:8]])
    >>> print sorted(jobs.values(), key=lambda j: j.id)
    [<Job 5>, <Job 6>, <Job 7>]
    >>> jobs = m.wait()
    >>> print sorted(jobs.values(), key=lambda j: j.id)
    ... # doctest: +NORMALIZE_WHITESPACE
    [<Job 0>, <Job 1>, <Job 2>, <Job 3>, <Job 4>, <Job 8>, <Job 9>, <Job 10>,
     <Job 11>, <Job 12>, <Job 13>, <Job 14>, <Job 15>, <Job 16>, <Job 17>,
     <Job 18>, <Job 19>]
    >>> m.teardown()

    Note that Python's Global Interpreter Lock (GIL) currently limits
    threads to a single core.  See the following discussions:

    * http://smoothspan.wordpress.com/2007/09/14/guido-is-right-to-leave-the-gil-in-python-not-for-multicore-but-for-utility-computing/
    * http://docs.python.org/faq/library#can-t-we-get-rid-of-the-global-interpreter-lock
    * http://www.artima.com/weblogs/viewpost.jsp?thread=214235
    * http://www.snaplogic.com/blog/?p=94
    * http://stackoverflow.com/questions/31340/

    Increasing `worker_pool` will only help you get around IO blocking
    at the cost of increased time-slicing overhead.
    """
    def __init__(self, worker_pool=2):
        super(ThreadManager, self).__init__()
        self._blocked = []  # jobs waiting on other jobs' completion
        self._setup_queues()
        self._spawn_workers(worker_pool)

    def _setup_queues(self):
        """Create the outgoing (spawn) and incoming (receive) queues."""
        self._spawn_queue = Queue()
        self._receive_queue = Queue()

    def _spawn_workers(self, worker_pool):
        """Start `worker_pool` `WorkerThread`s sharing our two queues."""
        self._workers = []
        for i in range(worker_pool):
            worker = WorkerThread(spawn_queue=self._spawn_queue,
                                  receive_queue=self._receive_queue,
                                  name='worker-%d' % i)
            log().debug('start %s' % worker.name)
            worker.start()
            self._workers.append(worker)

    def teardown(self):
        """Shut down all workers and then the base manager.

        One `CLOSE_MESSAGE` is queued per worker; each worker consumes
        exactly one and exits, so every `join` below terminates.
        """
        for worker in self._workers:
            self._spawn_queue.put(CLOSE_MESSAGE)
        for worker in self._workers:
            log().debug('join %s' % worker.name)
            worker.join()
        super(ThreadManager, self).teardown()

    def _job_is_blocked(self, job, ignore_id=None):
        """Return True if `job` waits on an unfinished job.

        `ignore_id` lets the caller treat one prerequisite (usually a
        job that just completed but is not yet recorded) as finished.
        """
        for job_id in job.blocks_on:
            if job_id == ignore_id:
                continue
            # Unfinished prerequisites have status still unset (None).
            elif job_id in self._jobs and self._jobs[job_id].status is None:
                return True
        return False

    def _spawn_job(self, job):
        """Queue `job` for execution, or park it if its prerequisites
        have not completed yet.
        """
        # Drain one completed job first (non-blocking) so blocking
        # decisions see the freshest status information.
        j = self._receive_job(block=False)
        if j is not None:
            self._handle_received_job(j)
        if self._job_is_blocked(job):
            log().debug('block job %s' % job)
            self._blocked.append(job)
            return
        self._put_job_in_spawn_queue(job)

    def _put_job_in_spawn_queue(self, job):
        """Place a job in the spawn queue.

        Threads share memory, so we need to send a copy of `job` to
        protect the local copy from unmanaged changes.

        Broken out to a method to allow code sharing with
        SubprocessManager.
        """
        self._spawn_queue.put(copy.deepcopy(job))

    def _receive_job(self, block=True):
        """Pop one completed job from the receive queue.

        Returns the job, or None if `block` is False and the queue is
        empty.  Any parked jobs that were waiting only on the received
        job are re-queued for execution.
        """
        try:
            job = self._receive_queue.get(block=block)
        except Empty:
            return
        # Iterate over a copy so we can remove while looping.
        for j in list(self._blocked):
            if job.id in j.blocks_on:
                if not self._job_is_blocked(j, ignore_id=job.id):
                    log().debug('unblock job %s' % j)
                    self._blocked.remove(j)
                    # Route through _put_job_in_spawn_queue so the
                    # unblocked job is deepcopied like any other spawn
                    # (previously `j` was shared directly with the
                    # worker thread, exposing our copy to mutation).
                    self._put_job_in_spawn_queue(j)
        return job