Add pysawsim.manager and pysawsim.manager.thread for running asynchronous jobs.
[sawsim.git] / pysawsim / manager / thread.py
1 # Copyright (C) 2010  W. Trevor King <wking@drexel.edu>
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU General Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License
14 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
15 #
16 # The author may be contacted at <wking@drexel.edu> on the Internet, or
17 # write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
18 # Philadelphia PA 19104, USA.
19
"""Functions for running jobs asynchronously in a pool of local worker
threads.
"""
22
23 from Queue import Queue
24 import threading
25
26 from .. import log
27 from . import Job, JobManager
28
29
30 CLOSE_MESSAGE = "close"
31
32
class WorkerThread (threading.Thread):
    """Thread that runs jobs pulled from a shared queue.

    Messages are read from `spawn_queue` one at a time.  Each message
    is expected to be a `Job`; it is run and then handed back on
    `receive_queue`.  Receiving `CLOSE_MESSAGE` ends the thread.
    """
    def __init__(self, spawn_queue, receive_queue, *args, **kwargs):
        """Store the two queues; remaining arguments go to `Thread`."""
        super(WorkerThread, self).__init__(*args, **kwargs)
        self.receive_queue = receive_queue
        self.spawn_queue = spawn_queue

    def run(self):
        """Process incoming jobs until `CLOSE_MESSAGE` arrives."""
        # iter(callable, sentinel) keeps calling .get() and stops as
        # soon as the returned message compares equal to the sentinel.
        for job in iter(self.spawn_queue.get, CLOSE_MESSAGE):
            assert isinstance(job, Job), job
            job.run()
            # Hand the finished job back to whoever is collecting.
            self.receive_queue.put(job)
47
48
class ThreadManager (JobManager):
    """Manage asynchronous `Job` execution with a pool of worker threads.

    Jobs passed to `_spawn_job` are placed on a shared spawn queue,
    from which `WorkerThread` instances pull and run them.  Finished
    jobs come back through a shared receive queue and are collected by
    `_receive_job`.  A job whose `blocks_on` ids refer to known jobs
    that have not finished yet (their `status` is still `None`) is
    parked in `_blocked` and is only queued once it is no longer
    blocked.

    >>> t = ThreadManager()
    >>> group_A = []
    >>> for i in range(10):
    ...     group_A.append(t.async_invoke('echo "%d"' % i))
    >>> group_B = []
    >>> for i in range(10):
    ...     group_B.append(t.async_invoke('echo "%d"' % i,
    ...                                   blocks_on=[j.id for j in group_A]))
    >>> jobs = t.wait(ids=[j.id for j in group_A[5:8]])
    >>> print sorted(jobs.values(), key=lambda j: j.id)
    [<Job 5>, <Job 6>, <Job 7>]
    >>> jobs = t.wait()
    >>> print sorted(jobs.values(), key=lambda j: j.id)
    ... # doctest: +NORMALIZE_WHITESPACE
    [<Job 0>, <Job 1>, <Job 2>, <Job 3>, <Job 4>, <Job 8>, <Job 9>, <Job 10>,
     <Job 11>, <Job 12>, <Job 13>, <Job 14>, <Job 15>, <Job 16>, <Job 17>,
     <Job 18>, <Job 19>]
    >>> t.teardown()
    """
    def __init__(self, worker_pool=5):
        """Create the queues and start `worker_pool` worker threads."""
        super(ThreadManager, self).__init__()
        # Jobs waiting for the jobs listed in their `blocks_on`.
        self._blocked = []
        # Manager -> workers: jobs to run (or CLOSE_MESSAGE).
        self._spawn_queue = Queue()
        # Workers -> manager: jobs that have been run.
        self._receive_queue = Queue()
        self._workers = []
        for i in range(worker_pool):
            worker = WorkerThread(spawn_queue=self._spawn_queue,
                                  receive_queue=self._receive_queue,
                                  name='worker-%d' % i)
            log().debug('start thread %s' % worker.name)
            worker.start()
            self._workers.append(worker)

    def teardown(self):
        """Stop and join every worker thread, then tear down the base."""
        # Each worker exits after consuming exactly one close message,
        # so send one per worker before joining any of them.
        for _ in self._workers:
            self._spawn_queue.put(CLOSE_MESSAGE)
        for worker in self._workers:
            log().debug('join thread %s' % worker.name)
            worker.join()
        super(ThreadManager, self).teardown()

    def _job_is_blocked(self, job):
        """Return True if `job` depends on a known, unfinished job.

        Ids in `job.blocks_on` that are not present in `self._jobs`
        are ignored; a known job counts as unfinished while its
        `status` is `None`.
        """
        # Look the job up in `_jobs`, the same mapping used for the
        # membership test (the original read `self.jobs` here).
        return any(
            job_id in self._jobs and self._jobs[job_id].status is None
            for job_id in job.blocks_on)

    def _spawn_job(self, job):
        """Queue `job` for the workers, or park it if it is blocked."""
        if self._job_is_blocked(job):
            # Do not queue yet: `_receive_job` will queue it once the
            # jobs it depends on have finished.  Queuing it here as
            # well would run it early and then a second time.
            log().debug('block job %s' % job)
            self._blocked.append(job)
            return
        log().debug('queue job %s' % job)
        self._spawn_queue.put(job)

    def _receive_job(self):
        """Wait for one finished job, release unblocked jobs, return it.

        Blocks until a worker puts a job on the receive queue.  Every
        parked job that was waiting on the received job and is no
        longer blocked is moved to the spawn queue.
        """
        job = self._receive_queue.get()
        # Iterate over a snapshot: removing from `_blocked` while
        # looping over it directly would skip the following entry.
        for waiting in list(self._blocked):
            if job.id not in waiting.blocks_on:
                continue
            if self._job_is_blocked(waiting):
                continue
            self._blocked.remove(waiting)
            log().debug('queue job %s' % waiting)
            self._spawn_queue.put(waiting)
        log().debug('receive job %s' % job)
        return job
113         return job