Reduce ThreadManager default worker_pool to 2.
[sawsim.git] / pysawsim / manager / thread.py
# Copyright (C) 2010  W. Trevor King <wking@drexel.edu>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# The author may be contacted at <wking@drexel.edu> on the Internet, or
# write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
# Philadelphia PA 19104, USA.

20 """Functions for running external commands on other hosts.
21 """
22
import copy
from Queue import Queue, Empty
import threading

from .. import log
from . import Job, JobManager


CLOSE_MESSAGE = "close"


class WorkerThread (threading.Thread):
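    """Run queued jobs in a dedicated thread.

    The worker loops forever, popping messages off `spawn_queue`: each
    `Job` is run and the finished job is pushed onto `receive_queue`;
    the sentinel `CLOSE_MESSAGE` ends the loop and the thread exits.
    """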
    def __init__(self, spawn_queue, receive_queue, *args, **kwargs):
        super(WorkerThread, self).__init__(*args, **kwargs)
        self.spawn_queue = spawn_queue
        self.receive_queue = receive_queue
        self.name = self.getName()  # work around Pythons < 2.6

    def run(self):
        while True:
            msg = self.spawn_queue.get()
            if msg == CLOSE_MESSAGE:
                log().debug('%s closing' % self.name)
                break
            assert isinstance(msg, Job), msg
            log().debug('%s running job %s' % (self.name, msg))
            msg.run()
            self.receive_queue.put(msg)


class ThreadManager (JobManager):
54     """Manage asynchronous `Job` execution via :mod:`threading`.
55
56     >>> from time import sleep
57     >>> from math import sqrt
58     >>> m = ThreadManager()
59     >>> group_A = []
60     >>> for i in range(10):
61     ...     t = max(0, 5-i)
62     ...     group_A.append(m.async_invoke(Job(target=sleep, args=[t])))
63     >>> group_B = []
64     >>> for i in range(10):
65     ...     group_B.append(m.async_invoke(Job(target=sqrt, args=[i],
66     ...                 blocks_on=[j.id for j in group_A])))
67     >>> jobs = m.wait(ids=[j.id for j in group_A[5:8]])
68     >>> print sorted(jobs.values(), key=lambda j: j.id)
69     [<Job 5>, <Job 6>, <Job 7>]
70     >>> jobs = m.wait()
71     >>> print sorted(jobs.values(), key=lambda j: j.id)
72     ... # doctest: +NORMALIZE_WHITESPACE
73     [<Job 0>, <Job 1>, <Job 2>, <Job 3>, <Job 4>, <Job 8>, <Job 9>, <Job 10>,
74      <Job 11>, <Job 12>, <Job 13>, <Job 14>, <Job 15>, <Job 16>, <Job 17>,
75      <Job 18>, <Job 19>]
76     >>> m.teardown()
77
78     Note that Python's Global Interpreter Lock (GIL) currently limits
79     threads to a single core.  See the following discussions:
80
81     * http://smoothspan.wordpress.com/2007/09/14/guido-is-right-to-leave-the-gil-in-python-not-for-multicore-but-for-utility-computing/
82     * http://docs.python.org/faq/library#id18
83     * http://www.artima.com/weblogs/viewpost.jsp?thread=214235
84     * http://www.snaplogic.com/blog/?p=94
85     * http://stackoverflow.com/questions/31340/
86
    Increasing `worker_pool` will only help you get around IO blocking,
    at the cost of increased time-slicing overhead.
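
    For IO-bound targets a larger pool lets the blocking calls overlap.
    A rough sketch (illustrative only, not run as a doctest)::

        m = ThreadManager(worker_pool=5)
        jobs = [m.async_invoke(Job(target=sleep, args=[10]))
                for i in range(5)]
        finished = m.wait()  # the five sleeps proceed concurrently
        m.teardown()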
89     """
    def __init__(self, worker_pool=2):
        super(ThreadManager, self).__init__()
        self._blocked = []
        self._spawn_queue = Queue()
        self._receive_queue = Queue()
        self._workers = []
        for i in range(worker_pool):
            worker = WorkerThread(spawn_queue=self._spawn_queue,
                                  receive_queue=self._receive_queue,
                                  name='worker-%d' % i)
            log().debug('start thread %s' % worker.name)
            worker.start()
            self._workers.append(worker)

    def teardown(self):
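        # Queue one CLOSE_MESSAGE per worker; each worker consumes a
        # single sentinel before exiting, so all threads shut down.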
        for worker in self._workers:
            self._spawn_queue.put(CLOSE_MESSAGE)
        for worker in self._workers:
            log().debug('join thread %s' % worker.name)
            worker.join()
        super(ThreadManager, self).teardown()

    def _job_is_blocked(self, job, ignore_id=None):
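        # A job is blocked while any job it blocks_on is still known to
        # this manager and unfinished (status is None); ignore_id lets
        # callers treat one job as if it had already completed.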
        for id in job.blocks_on:
            if id == ignore_id:
                continue
            elif id in self._jobs and self._jobs[id].status is None:
                return True
        return False

    def _spawn_job(self, job):
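        # Drain one finished job, if any, so that previously blocked
        # jobs waiting on it get re-queued promptly.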
        self._receive_job(block=False)
        if self._job_is_blocked(job):
            log().debug('block job %s' % job)
            self._blocked.append(job)
            return
        self._spawn_queue.put(copy.deepcopy(job))  # protect from shared memory

    def _receive_job(self, block=True):
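        # Pop one finished job off the receive queue (returning None if
        # non-blocking and nothing has finished yet), then re-queue any
        # blocked jobs whose dependencies are now satisfied.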
        try:
            job = self._receive_queue.get(block=block)
        except Empty:
            return
        for j in list(self._blocked):
            if job.id in j.blocks_on:
                if not self._job_is_blocked(j, ignore_id=job.id):
                    log().debug('unblock job %s' % j)
                    self._blocked.remove(j)
                    self._spawn_queue.put(j)
        return job