1 # Copyright (C) 2010 W. Trevor King <wking@drexel.edu>
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 # The author may be contacted at <wking@drexel.edu> on the Internet, or
17 # write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
18 # Philadelphia PA 19104, USA.
20 """Functions for running external commands on other hosts.
25 from Queue import Queue, Empty
29 from . import Job, JobManager
33 _DISABLING_ERROR = None
35 CLOSE_MESSAGE = "close"
class WorkerThread (threading.Thread):
    """Worker thread that executes `Job`\s pulled off a shared queue.

    The thread loops on `spawn_queue` until it sees `CLOSE_MESSAGE`;
    every other message must be a `Job`, which is executed and then
    pushed onto `receive_queue` for the manager to collect.
    """
    def __init__(self, spawn_queue, receive_queue, *args, **kwargs):
        super(WorkerThread, self).__init__(*args, **kwargs)
        self.spawn_queue = spawn_queue      # manager -> worker messages
        self.receive_queue = receive_queue  # worker -> manager results
        self.name = self.getName() # work around Pythons < 2.6

    def run(self):
        """Process jobs until `CLOSE_MESSAGE` arrives, then exit."""
        while True:
            msg = self.spawn_queue.get()
            if msg == CLOSE_MESSAGE:
                log().debug('%s closing' % self.name)
                break  # leave the loop so the thread terminates
            assert isinstance(msg, Job), msg
            log().debug('%s running job %s' % (self.name, msg))
            # NOTE(review): reconstructed — the job must be executed
            # before being returned; confirm `Job.run` is the entry point.
            msg.run()
            self.receive_queue.put(msg)
57 class ThreadManager (JobManager):
58 """Manage asynchronous `Job` execution via :mod:`threading`.
60 >>> from time import sleep
61 >>> from math import sqrt
62 >>> m = ThreadManager()
64 >>> for i in range(10):
66 ... group_A.append(m.async_invoke(Job(target=sleep, args=[t])))
68 >>> for i in range(10):
69 ... group_B.append(m.async_invoke(Job(target=sqrt, args=[i],
70 ... blocks_on=[j.id for j in group_A])))
71 >>> jobs = m.wait(ids=[j.id for j in group_A[5:8]])
72 >>> print sorted(jobs.values(), key=lambda j: j.id)
73 [<Job 5>, <Job 6>, <Job 7>]
75 >>> print sorted(jobs.values(), key=lambda j: j.id)
76 ... # doctest: +NORMALIZE_WHITESPACE
77 [<Job 0>, <Job 1>, <Job 2>, <Job 3>, <Job 4>, <Job 8>, <Job 9>, <Job 10>,
78 <Job 11>, <Job 12>, <Job 13>, <Job 14>, <Job 15>, <Job 16>, <Job 17>,
82 Note that Python's Global Interpreter Lock (GIL) currently limits
83 threads to a single core. See the following discussions:
85 * http://smoothspan.wordpress.com/2007/09/14/guido-is-right-to-leave-the-gil-in-python-not-for-multicore-but-for-utility-computing/
86 * http://docs.python.org/faq/library#can-t-we-get-rid-of-the-global-interpreter-lock
87 * http://www.artima.com/weblogs/viewpost.jsp?thread=214235
88 * http://www.snaplogic.com/blog/?p=94
89 * http://stackoverflow.com/questions/31340/
    Increasing `worker_pool` will only help you get around IO blocking
    at the cost of increased time-slicing overhead.
94 def __init__(self, worker_pool=None):
95 super(ThreadManager, self).__init__()
98 self._spawn_workers(worker_pool)
100 def _setup_queues(self):
101 self._spawn_queue = Queue()
102 self._receive_queue = Queue()
104 def _spawn_workers(self, worker_pool):
105 if worker_pool is None:
106 worker_pool = int(os.environ.get('WORKER_POOL', 2))
108 for i in range(worker_pool):
109 worker = WorkerThread(spawn_queue=self._spawn_queue,
110 receive_queue=self._receive_queue,
111 name='worker-%d' % i)
112 log().debug('start %s' % worker.name)
114 self._workers.append(worker)
117 for worker in self._workers:
118 self._spawn_queue.put(CLOSE_MESSAGE)
119 for worker in self._workers:
120 log().debug('join %s' % worker.name)
122 super(ThreadManager, self).teardown()
124 def _job_is_blocked(self, job, ignore_id=None):
125 for id in job.blocks_on:
128 elif id in self._jobs and self._jobs[id].status == None:
132 def _spawn_job(self, job):
133 j = self._receive_job(block=False)
135 self._handle_received_job(j)
136 if self._job_is_blocked(job):
137 log().debug('block job %s' % job)
138 self._blocked.append(job)
140 self._put_job_in_spawn_queue(job)
142 def _put_job_in_spawn_queue(self, job):
143 """Place a job in the spawn queue.
145 Threads share memory, so we need to send a copy of `job` to
146 protect the local copy from unmanaged changes.
148 Broken out to a method to allow code sharing with
151 self._spawn_queue.put(copy.deepcopy(job))
153 def _receive_job(self, block=True):
155 job = self._receive_queue.get(block=block)
158 for j in list(self._blocked):
159 if job.id in j.blocks_on:
160 if not self._job_is_blocked(j, ignore_id=job.id):
161 log().debug('unblock job %s' % j)
162 self._blocked.remove(j)
163 self._spawn_queue.put(j)