JobManager.async_invoke() should accept Job instances.
[sawsim.git] / pysawsim / manager / thread.py
1 # Copyright (C) 2010  W. Trevor King <wking@drexel.edu>
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU General Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License
14 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
15 #
16 # The author may be contacted at <wking@drexel.edu> on the Internet, or
17 # write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
18 # Philadelphia PA 19104, USA.
19
"""Functions for running jobs asynchronously in a pool of local threads.
"""
22
23 from Queue import Queue
24 import threading
25
26 from .. import log
27 from . import Job, JobManager
28
29
# Sentinel placed on the spawn queue to tell a `WorkerThread` to leave
# its run loop; `ThreadManager.teardown` sends one per worker.
CLOSE_MESSAGE = "close"
31
32
class WorkerThread (threading.Thread):
    """Thread that runs `Job`\\ s taken from a queue.

    Each message read from `spawn_queue` is either `CLOSE_MESSAGE`,
    which ends the thread, or a `Job`, which is run and then handed
    back on `receive_queue`.
    """
    def __init__(self, spawn_queue, receive_queue, *args, **kwargs):
        """Store the two queues; other arguments go to `Thread`."""
        super(WorkerThread, self).__init__(*args, **kwargs)
        self.receive_queue = receive_queue
        self.spawn_queue = spawn_queue

    def run(self):
        """Run queued jobs until `CLOSE_MESSAGE` is received."""
        # The two-argument form of iter() calls `get` repeatedly and
        # stops as soon as the returned message equals the sentinel.
        for job in iter(self.spawn_queue.get, CLOSE_MESSAGE):
            assert isinstance(job, Job), job
            job.run()
            self.receive_queue.put(job)
47
48
class ThreadManager (JobManager):
    """Manage asynchronous `Job` execution via :mod:`threading`.

    Jobs are handed to a fixed pool of `WorkerThread`\\ s through a
    spawn queue and come back through a receive queue.  A job whose
    `blocks_on` lists a known job without a status yet is held back
    and only queued once the jobs it waits on have been received.

    >>> from math import sqrt
    >>> t = ThreadManager()
    >>> group_A = []
    >>> for i in range(10):
    ...     group_A.append(t.async_invoke(Job(target=sqrt, args=[i])))
    >>> group_B = []
    >>> for i in range(10):
    ...     group_B.append(t.async_invoke(Job(target=sqrt, args=[i],
    ...                 blocks_on=[j.id for j in group_A])))
    >>> jobs = t.wait(ids=[j.id for j in group_A[5:8]])
    >>> print sorted(jobs.values(), key=lambda j: j.id)
    [<Job 5>, <Job 6>, <Job 7>]
    >>> jobs = t.wait()
    >>> print sorted(jobs.values(), key=lambda j: j.id)
    ... # doctest: +NORMALIZE_WHITESPACE
    [<Job 0>, <Job 1>, <Job 2>, <Job 3>, <Job 4>, <Job 8>, <Job 9>, <Job 10>,
     <Job 11>, <Job 12>, <Job 13>, <Job 14>, <Job 15>, <Job 16>, <Job 17>,
     <Job 18>, <Job 19>]
    >>> t.teardown()
    """
    def __init__(self, worker_pool=5):
        """Create the queues and start `worker_pool` worker threads."""
        super(ThreadManager, self).__init__()
        # Jobs held back until the jobs in their `blocks_on` are done.
        self._blocked = []
        # Manager -> workers: jobs to run, plus CLOSE_MESSAGE on teardown.
        self._spawn_queue = Queue()
        # Workers -> manager: jobs that have been run.
        self._receive_queue = Queue()
        self._workers = []
        for i in range(worker_pool):
            worker = WorkerThread(spawn_queue=self._spawn_queue,
                                  receive_queue=self._receive_queue,
                                  name='worker-%d' % i)
            log().debug('start thread %s' % worker.name)
            worker.start()
            self._workers.append(worker)

    def teardown(self):
        """Stop and join every worker, then tear down the base class."""
        # One close message per worker: each worker consumes exactly
        # one before leaving its run loop.
        for _ in self._workers:
            self._spawn_queue.put(CLOSE_MESSAGE)
        for worker in self._workers:
            log().debug('join thread %s' % worker.name)
            worker.join()
        super(ThreadManager, self).teardown()

    def _job_is_blocked(self, job):
        """Return True if `job` waits on a known job with no status yet.

        Ids in `job.blocks_on` that are not in `self._jobs` are
        ignored.  A `status` of None is treated as "not finished".
        """
        for blocker_id in job.blocks_on:
            if (blocker_id in self._jobs
                and self._jobs[blocker_id].status is None):
                return True
        return False

    def _spawn_job(self, job):
        """Queue `job` for the workers, or hold it while it is blocked."""
        if self._job_is_blocked(job):
            # A blocked job must not reach the workers yet;
            # `_receive_job` queues it once its blockers come back.
            log().debug('block job %s' % job)
            self._blocked.append(job)
            return
        log().debug('queue job %s' % job)
        self._spawn_queue.put(job)

    def _receive_job(self):
        """Wait for a finished job, release jobs it unblocked, return it."""
        job = self._receive_queue.get()
        # Iterate over a copy: removing from the list being iterated
        # would skip the entry following each removed job.
        for j in list(self._blocked):
            if job.id in j.blocks_on:
                if not self._job_is_blocked(j):
                    self._blocked.remove(j)
                    log().debug('queue job %s' % j)
                    self._spawn_queue.put(j)
        log().debug('receive job %s' % job)
        return job