Add WORKER_POOL environment variable to facilitate pysawsim.manager benchmarking.
[sawsim.git] / pysawsim / manager / thread.py
1 # Copyright (C) 2010  W. Trevor King <wking@drexel.edu>
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU General Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License
14 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
15 #
16 # The author may be contacted at <wking@drexel.edu> on the Internet, or
17 # write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
18 # Philadelphia PA 19104, USA.
19
"""Manage asynchronous `Job` execution via :mod:`threading`.
"""
22
23 import copy
24 import os
25 from Queue import Queue, Empty
26 import threading
27
28 from .. import log
29 from . import Job, JobManager
30
31
32 _ENABLED = True
33 _DISABLING_ERROR = None
34
35 CLOSE_MESSAGE = "close"
36
37
38 class WorkerThread (threading.Thread):
39     def __init__(self, spawn_queue, receive_queue, *args, **kwargs):
40         super(WorkerThread, self).__init__(*args, **kwargs)
41         self.spawn_queue = spawn_queue
42         self.receive_queue = receive_queue
43         self.name = self.getName()  # work around Pythons < 2.6
44
45     def run(self):
46         while True:
47             msg = self.spawn_queue.get()
48             if msg == CLOSE_MESSAGE:
49                 log().debug('%s closing' % self.name)
50                 break
51             assert isinstance(msg, Job), msg
52             log().debug('%s running job %s' % (self.name, msg))
53             msg.run()
54             self.receive_queue.put(msg)
55
56
57 class ThreadManager (JobManager):
58     """Manage asynchronous `Job` execution via :mod:`threading`.
59
60     >>> from time import sleep
61     >>> from math import sqrt
62     >>> m = ThreadManager()
63     >>> group_A = []
64     >>> for i in range(10):
65     ...     t = max(0, 5-i)
66     ...     group_A.append(m.async_invoke(Job(target=sleep, args=[t])))
67     >>> group_B = []
68     >>> for i in range(10):
69     ...     group_B.append(m.async_invoke(Job(target=sqrt, args=[i],
70     ...                 blocks_on=[j.id for j in group_A])))
71     >>> jobs = m.wait(ids=[j.id for j in group_A[5:8]])
72     >>> print sorted(jobs.values(), key=lambda j: j.id)
73     [<Job 5>, <Job 6>, <Job 7>]
74     >>> jobs = m.wait()
75     >>> print sorted(jobs.values(), key=lambda j: j.id)
76     ... # doctest: +NORMALIZE_WHITESPACE
77     [<Job 0>, <Job 1>, <Job 2>, <Job 3>, <Job 4>, <Job 8>, <Job 9>, <Job 10>,
78      <Job 11>, <Job 12>, <Job 13>, <Job 14>, <Job 15>, <Job 16>, <Job 17>,
79      <Job 18>, <Job 19>]
80     >>> m.teardown()
81
82     Note that Python's Global Interpreter Lock (GIL) currently limits
83     threads to a single core.  See the following discussions:
84
85     * http://smoothspan.wordpress.com/2007/09/14/guido-is-right-to-leave-the-gil-in-python-not-for-multicore-but-for-utility-computing/
86     * http://docs.python.org/faq/library#can-t-we-get-rid-of-the-global-interpreter-lock
87     * http://www.artima.com/weblogs/viewpost.jsp?thread=214235
88     * http://www.snaplogic.com/blog/?p=94
89     * http://stackoverflow.com/questions/31340/
90
91     Increasing `worker_pool` will only help you get around IO blockin
92     at the cost increased time-slicing overhead.
93     """
94     def __init__(self, worker_pool=None):
95         super(ThreadManager, self).__init__()
96         self._blocked = []
97         self._setup_queues()
98         self._spawn_workers(worker_pool)
99
100     def _setup_queues(self):
101         self._spawn_queue = Queue()
102         self._receive_queue = Queue()
103
104     def _spawn_workers(self, worker_pool):
105         if worker_pool is None:
106             worker_pool = int(os.environ.get('WORKER_POOL', 2))
107         self._workers = []
108         for i in range(worker_pool):
109             worker = WorkerThread(spawn_queue=self._spawn_queue,
110                                   receive_queue=self._receive_queue,
111                                   name='worker-%d' % i)
112             log().debug('start %s' % worker.name)
113             worker.start()
114             self._workers.append(worker)
115
116     def teardown(self):
117         for worker in self._workers:
118             self._spawn_queue.put(CLOSE_MESSAGE)
119         for worker in self._workers:
120             log().debug('join %s' % worker.name)
121             worker.join()
122         super(ThreadManager, self).teardown()
123
124     def _job_is_blocked(self, job, ignore_id=None):
125         for id in job.blocks_on:
126             if id == ignore_id:
127                 continue
128             elif id in self._jobs and self._jobs[id].status == None:
129                 return True
130         return False
131
132     def _spawn_job(self, job):
133         j = self._receive_job(block=False)
134         if j != None:
135             self._handle_received_job(j)
136         if self._job_is_blocked(job):
137             log().debug('block job %s' % job)
138             self._blocked.append(job)
139             return
140         self._put_job_in_spawn_queue(job)
141
142     def _put_job_in_spawn_queue(self, job):
143         """Place a job in the spawn queue.
144
145         Threads share memory, so we need to send a copy of `job` to
146         protect the local copy from unmanaged changes.
147
148         Broken out to a method to allow code sharing with
149         SubprocessManager.
150         """
151         self._spawn_queue.put(copy.deepcopy(job))
152
153     def _receive_job(self, block=True):
154         try:
155             job = self._receive_queue.get(block=block)
156         except Empty:
157             return
158         for j in list(self._blocked):
159             if job.id in j.blocks_on:
160                 if not self._job_is_blocked(j, ignore_id=job.id):
161                     log().debug('unblock job %s' % j)
162                     self._blocked.remove(j)
163                     self._spawn_queue.put(j)
164         return job