1 # Copyright (C) 2010 W. Trevor King <wking@drexel.edu>
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 # The author may be contacted at <wking@drexel.edu> on the Internet, or
17 # write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
18 # Philadelphia PA 19104, USA.
20 """Functions for running external commands on other hosts.
24 from Queue import Queue, Empty
28 from . import Job, JobManager
# Error that disabled this backend, if any; None means the backend is usable.
# NOTE(review): only the initialization is visible in this excerpt -- where it
# gets set is on missing lines; confirm against the full file.
32 _DISABLING_ERROR = None

# Sentinel placed on the spawn queue to tell a WorkerThread to shut down.
34 CLOSE_MESSAGE = "close"
# Worker thread that pulls Job instances off a shared spawn queue, runs them,
# and pushes the finished jobs onto a shared receive queue.  It exits when it
# reads CLOSE_MESSAGE from the spawn queue.
# NOTE(review): this excerpt is missing lines (the embedded original numbering
# jumps); in particular the "def run(self):" header and its loop structure are
# not visible below -- verify against the original file.
37 class WorkerThread (threading.Thread):
38 def __init__(self, spawn_queue, receive_queue, *args, **kwargs):
39 super(WorkerThread, self).__init__(*args, **kwargs)
# Queue this worker reads Job messages (or CLOSE_MESSAGE) from.
40 self.spawn_queue = spawn_queue
# Queue completed jobs are written back to for the manager to collect.
41 self.receive_queue = receive_queue
42 self.name = self.getName() # work around Pythons < 2.6
# --- body of the truncated run() method follows; its "def run(self):" line
# --- and enclosing loop (original lines 43-45) are missing from this excerpt
46 msg = self.spawn_queue.get()
47 if msg == CLOSE_MESSAGE:
48 log().debug('%s closing' % self.name)
# presumably a "return"/"break" followed here (original line 49) -- confirm
50 assert isinstance(msg, Job), msg
51 log().debug('%s running job %s' % (self.name, msg))
# original line 52 (actually running the job, e.g. msg.run()) is missing --
# confirm against the original file
53 self.receive_queue.put(msg)
# NOTE(review): the class docstring below is truncated in this excerpt -- the
# doctest is missing lines (group setup, later wait() calls) and the closing
# triple quotes are on a missing line.  Verify against the original file.
56 class ThreadManager (JobManager):
57 """Manage asynchronous `Job` execution via :mod:`threading`.
59 >>> from time import sleep
60 >>> from math import sqrt
61 >>> m = ThreadManager()
63 >>> for i in range(10):
65 ... group_A.append(m.async_invoke(Job(target=sleep, args=[t])))
67 >>> for i in range(10):
68 ... group_B.append(m.async_invoke(Job(target=sqrt, args=[i],
69 ... blocks_on=[j.id for j in group_A])))
70 >>> jobs = m.wait(ids=[j.id for j in group_A[5:8]])
71 >>> print sorted(jobs.values(), key=lambda j: j.id)
72 [<Job 5>, <Job 6>, <Job 7>]
74 >>> print sorted(jobs.values(), key=lambda j: j.id)
75 ... # doctest: +NORMALIZE_WHITESPACE
76 [<Job 0>, <Job 1>, <Job 2>, <Job 3>, <Job 4>, <Job 8>, <Job 9>, <Job 10>,
77 <Job 11>, <Job 12>, <Job 13>, <Job 14>, <Job 15>, <Job 16>, <Job 17>,
81 Note that Python's Global Interpreter Lock (GIL) currently limits
82 threads to a single core. See the following discussions:
84 * http://smoothspan.wordpress.com/2007/09/14/guido-is-right-to-leave-the-gil-in-python-not-for-multicore-but-for-utility-computing/
85 * http://docs.python.org/faq/library#can-t-we-get-rid-of-the-global-interpreter-lock
86 * http://www.artima.com/weblogs/viewpost.jsp?thread=214235
87 * http://www.snaplogic.com/blog/?p=94
88 * http://stackoverflow.com/questions/31340/
90 Increasing `worker_pool` will only help you get around IO blocking
91 at the cost of increased time-slicing overhead.
# Initialize the manager and start `worker_pool` worker threads.
# worker_pool: number of WorkerThreads to spawn (default 2).
93 def __init__(self, worker_pool=2):
94 super(ThreadManager, self).__init__()
# original lines 95-96 are missing from this excerpt -- presumably the
# initialization of self._blocked and a call to self._setup_queues();
# verify against the original file
97 self._spawn_workers(worker_pool)
99 def _setup_queues(self):
100 self._spawn_queue = Queue()
101 self._receive_queue = Queue()
# Create (and presumably start) `worker_pool` WorkerThreads wired to our
# spawn/receive queues, remembering them in self._workers for teardown.
103 def _spawn_workers(self, worker_pool):
# original line 104 is missing from this excerpt -- presumably
# "self._workers = []"; verify against the original file
105 for i in range(worker_pool):
106 worker = WorkerThread(spawn_queue=self._spawn_queue,
107 receive_queue=self._receive_queue,
108 name='worker-%d' % i)
109 log().debug('start %s' % worker.name)
# original line 110 is missing -- presumably "worker.start()"; the debug
# message above suggests the thread is started here -- confirm
111 self._workers.append(worker)
# --- body of the truncated teardown() method; its "def teardown(self):"
# --- header (original lines 112-113) is missing from this excerpt ---
# Send one CLOSE_MESSAGE per worker so every thread eventually reads one
# from the shared spawn queue and exits.
114 for worker in self._workers:
115 self._spawn_queue.put(CLOSE_MESSAGE)
116 for worker in self._workers:
117 log().debug('join %s' % worker.name)
# original line 118 is missing -- presumably "worker.join()"; the debug
# message above suggests each worker is joined here -- confirm
119 super(ThreadManager, self).teardown()
# Return whether `job` must wait on an unfinished dependency.
# ignore_id: a job id to treat as satisfied (used when that job has just
# finished but its status is not yet recorded).
121 def _job_is_blocked(self, job, ignore_id=None):
122 for id in job.blocks_on:
# original lines 123-124 are missing from this excerpt -- presumably the
# "if id == ignore_id: continue/pass" branch matching the elif below
# status == None appears to mean "not completed yet" -- TODO confirm
125 elif id in self._jobs and self._jobs[id].status == None:
# original lines 126-128 (the return True / return False paths) are
# missing from this excerpt -- verify against the original file
# Hand `job` to the workers, or park it in self._blocked if it still waits
# on unfinished dependencies.  First drains any already-finished jobs from
# the receive queue so blocking decisions use fresh status information.
129 def _spawn_job(self, job):
130 j = self._receive_job(block=False)
# original line 131 is missing from this excerpt -- presumably a loop or
# None-check around the non-blocking receive; verify against the original
132 self._handle_received_job(j)
133 if self._job_is_blocked(job):
134 log().debug('block job %s' % job)
135 self._blocked.append(job)
# original line 136 is missing -- presumably "else:" so the put below only
# happens for unblocked jobs -- confirm against the original file
137 self._put_job_in_spawn_queue(job)
139 def _put_job_in_spawn_queue(self, job):
140 """Place a job in the spawn queue.
142 Threads share memory, so we need to send a copy of `job` to
143 protect the local copy from unmanaged changes.
145 Broken out to a method to allow code sharing with
# NOTE(review): the docstring is truncated in this excerpt -- its last line
# and closing triple quotes (original lines 146-147) are missing
# deepcopy isolates the worker's copy from later local mutation (and vice
# versa), per the docstring above
148 self._spawn_queue.put(copy.deepcopy(job))
# Pull one finished job off the receive queue and release anything that was
# blocked on it.
# block: when False, presumably returns None if the queue is empty (the
# try/except Empty would be on the missing lines) -- TODO confirm.
150 def _receive_job(self, block=True):
# original lines 151 and 153-154 are missing from this excerpt -- verify
# the surrounding try/except and status bookkeeping against the original
152 job = self._receive_queue.get(block=block)
# `job` just completed: re-check every parked job that listed it as a
# dependency and move any now-unblocked ones into the spawn queue.
# ignore_id=job.id because job's completed status may not be recorded yet.
155 for j in list(self._blocked):
156 if job.id in j.blocks_on:
157 if not self._job_is_blocked(j, ignore_id=job.id):
158 log().debug('unblock job %s' % j)
159 self._blocked.remove(j)
160 self._spawn_queue.put(j)
# NOTE(review): the method continues past this excerpt -- presumably
# "return job"; verify against the original file