1 # Copyright (C) 2010 W. Trevor King <wking@drexel.edu>
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 # The author may be contacted at <wking@drexel.edu> on the Internet, or
17 # write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
18 # Philadelphia PA 19104, USA.
20 """Functions for running external commands in a pool of worker threads.
23 from Queue import Queue
27 from . import Job, JobManager
# Sentinel message: worker threads compare every message taken from the
# spawn queue against this value, and the manager's shutdown code puts one
# copy per worker on that queue.
30 CLOSE_MESSAGE = "close"
# Thread subclass that takes messages from `spawn_queue` and puts each Job
# it receives onto `receive_queue`.
#
# NOTE(review): several statements of this class are not visible in this
# view: the header of the method that owns the queue-handling statements,
# the body of the CLOSE_MESSAGE branch, and whatever is done with the job
# between the assertion and the put.  Confirm them against the full file
# before changing this class.
33 class WorkerThread (threading.Thread):
# Store the two queues; any extra positional/keyword arguments are passed
# unchanged to threading.Thread.__init__.
34 def __init__(self, spawn_queue, receive_queue, *args, **kwargs):
35 super(WorkerThread, self).__init__(*args, **kwargs)
36 self.spawn_queue = spawn_queue
37 self.receive_queue = receive_queue
# Take the next message off the spawn queue.
41 msg = self.spawn_queue.get()
# CLOSE_MESSAGE is the shutdown sentinel (the statement guarded by this
# test is not visible here; presumably it ends the worker -- confirm).
42 if msg == CLOSE_MESSAGE:
# Anything other than the sentinel must be a Job; the offending message is
# used as the assertion text.
44 assert isinstance(msg, Job), msg
# Hand the job over to whoever reads the receive queue.
46 self.receive_queue.put(msg)
# JobManager subclass that passes jobs to a pool of WorkerThread instances
# through a spawn queue and collects them again from a receive queue.
#
# The doctest lines below submit ten jobs (group A), then ten more whose
# `blocks_on` lists every group-A job id (group B), and wait for the three
# jobs group_A[5:8] before printing a second, larger set of jobs.
# NOTE(review): the doctest is incomplete in this view (for example no
# initialisation of `group_A`/`group_B` and no closing of the final
# expected list is visible) -- confirm against the full file.
49 class ThreadManager (JobManager):
51 >>> t = ThreadManager()
53 >>> for i in range(10):
54 ... group_A.append(t.async_invoke('echo "%d"' % i))
56 >>> for i in range(10):
57 ... group_B.append(t.async_invoke('echo "%d"' % i,
58 ... blocks_on=[j.id for j in group_A]))
59 >>> jobs = t.wait(ids=[j.id for j in group_A[5:8]])
60 >>> print sorted(jobs.values(), key=lambda j: j.id)
61 [<Job 5>, <Job 6>, <Job 7>]
63 >>> print sorted(jobs.values(), key=lambda j: j.id)
64 ... # doctest: +NORMALIZE_WHITESPACE
65 [<Job 0>, <Job 1>, <Job 2>, <Job 3>, <Job 4>, <Job 8>, <Job 9>, <Job 10>,
66 <Job 11>, <Job 12>, <Job 13>, <Job 14>, <Job 15>, <Job 16>, <Job 17>,
# Create the spawn and receive queues and build `worker_pool` WorkerThread
# instances that share them.
#
# worker_pool -- number of WorkerThread instances to create (default 5).
#
# NOTE(review): not visible in this view: the initialisation of
# `self._workers` (appended to below) and of `self._blocked` (used by
# `_spawn_job`), the end of the WorkerThread(...) call, and the statement
# that actually starts each thread -- confirm against the full file.
70 def __init__(self, worker_pool=5):
71 super(ThreadManager, self).__init__()
# Jobs travel to the workers on _spawn_queue; workers put them on
# _receive_queue, which `_receive_job` reads.
73 self._spawn_queue = Queue()
74 self._receive_queue = Queue()
# Every worker is given the same pair of queues.
76 for i in range(worker_pool):
77 worker = WorkerThread(spawn_queue=self._spawn_queue,
78 receive_queue=self._receive_queue,
80 log().debug('start thread %s' % worker.name)
# Keep a reference to every worker; the shutdown code iterates this list.
82 self._workers.append(worker)
# Shutdown sequence.  NOTE(review): the header of the enclosing method is
# not visible in this view; the super() call below suggests it is
# `teardown` -- confirm.
#
# First pass: put one CLOSE_MESSAGE on the spawn queue per worker (the loop
# variable is only used for counting).
85 for worker in self._workers:
86 self._spawn_queue.put(CLOSE_MESSAGE)
# Second pass: log each worker that is about to be joined.
# NOTE(review): the join call itself is not visible here -- confirm.
87 for worker in self._workers:
88 log().debug('join thread %s' % worker.name)
# Finally delegate to the parent class's teardown.
90 super(ThreadManager, self).teardown()
def _job_is_blocked(self, job):
    """Return True if `job` is still waiting on another job.

    `job` counts as blocked while at least one id in `job.blocks_on`
    is a key of `self._jobs` and the job stored under that key has a
    `status` of None.  Ids that are not in `self._jobs` are ignored.

    Returns True or False; callers use the result as a condition.
    """
    for blocker_id in job.blocks_on:
        # Look the blocker up in `self._jobs`, the same mapping used for
        # the membership test.  (The lookup previously went through
        # `self.jobs`, a name nothing else in this class refers to.)
        if blocker_id in self._jobs and self._jobs[blocker_id].status is None:
            return True
    return False
def _spawn_job(self, job):
    """Queue `job` for the worker threads unless it is blocked.

    A job for which `_job_is_blocked` returns true is parked in
    `self._blocked` and is not put on the spawn queue here;
    `_receive_job` puts it there once it is no longer blocked.
    Any other job is logged and put on the spawn queue right away.
    """
    if self._job_is_blocked(job):
        self._blocked.append(job)
        # Stop here: falling through would queue the job immediately,
        # and `_receive_job` would later queue it a second time.
        return
    log().debug('queue job %s' % job)
    self._spawn_queue.put(job)
def _receive_job(self):
    """Take one job off the receive queue and release unblocked jobs.

    Every job parked in `self._blocked` that lists the received job's
    id in `blocks_on`, and for which `_job_is_blocked` is no longer
    true, is removed from `self._blocked`, logged, and put on the
    spawn queue.  The received job is logged last.
    """
    job = self._receive_queue.get()
    # Iterate over a snapshot: removing entries from `self._blocked`
    # while looping over that same list makes the loop skip the entry
    # that follows each removal, so releasable jobs could stay parked.
    for parked in list(self._blocked):
        if job.id in parked.blocks_on and not self._job_is_blocked(parked):
            self._blocked.remove(parked)
            log().debug('queue job %s' % parked)
            self._spawn_queue.put(parked)
    log().debug('receive job %s' % job)
    # NOTE(review): nothing after this log call is visible in this view;
    # confirm how the received job is handed back to the caller.