Fix a few deadlock errors in pysawsim.manager.thread.
[sawsim.git] / pysawsim / manager / thread.py
1 # Copyright (C) 2010  W. Trevor King <wking@drexel.edu>
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU General Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License
14 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
15 #
16 # The author may be contacted at <wking@drexel.edu> on the Internet, or
17 # write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
18 # Philadelphia PA 19104, USA.
19
"""Functions for running jobs asynchronously on a pool of local worker threads.
"""
22
23 import copy
24 from Queue import Queue, Empty
25 import threading
26
27 from .. import log
28 from . import Job, JobManager
29
30
# Sentinel placed on the spawn queue (one per worker) to tell a
# `WorkerThread` to leave its run loop and exit.
CLOSE_MESSAGE = "close"
32
33
class WorkerThread (threading.Thread):
    """Thread that runs jobs taken from one queue and returns them on another.

    `spawn_queue` supplies the work: each item is either a `Job` to
    run or `CLOSE_MESSAGE`, which ends the thread.  Every job that has
    been run is put on `receive_queue`.  Any remaining positional and
    keyword arguments are passed on to `threading.Thread`.
    """
    def __init__(self, spawn_queue, receive_queue, *args, **kwargs):
        super(WorkerThread, self).__init__(*args, **kwargs)
        # Mirror getName() into `name` (work around Pythons < 2.6).
        self.name = self.getName()
        self.receive_queue = receive_queue
        self.spawn_queue = spawn_queue

    def run(self):
        """Process queued items until `CLOSE_MESSAGE` arrives."""
        fetch = self.spawn_queue.get  # blocks until an item is available
        while True:
            item = fetch()
            if item == CLOSE_MESSAGE:
                break
            self._execute(item)
        log().debug('%s closing' % self.name)

    def _execute(self, job):
        """Run a single `job` and hand it back on the receive queue."""
        assert isinstance(job, Job), job
        log().debug('%s running job %s' % (self.name, job))
        job.run()
        self.receive_queue.put(job)
51
52
class ThreadManager (JobManager):
    """Manage asynchronous `Job` execution via :mod:`threading`.

    Jobs are handed to a fixed pool of `WorkerThread` instances through
    a spawn queue and come back, once run, on a receive queue.  A job
    whose `blocks_on` list names unfinished jobs is held back until
    those jobs have been received.

    >>> from time import sleep
    >>> from math import sqrt
    >>> m = ThreadManager()
    >>> group_A = []
    >>> for i in range(10):
    ...     t = max(0, 5-i)
    ...     group_A.append(m.async_invoke(Job(target=sleep, args=[t])))
    >>> group_B = []
    >>> for i in range(10):
    ...     group_B.append(m.async_invoke(Job(target=sqrt, args=[i],
    ...                 blocks_on=[j.id for j in group_A])))
    >>> jobs = m.wait(ids=[j.id for j in group_A[5:8]])
    >>> print sorted(jobs.values(), key=lambda j: j.id)
    [<Job 5>, <Job 6>, <Job 7>]
    >>> jobs = m.wait()
    >>> print sorted(jobs.values(), key=lambda j: j.id)
    ... # doctest: +NORMALIZE_WHITESPACE
    [<Job 0>, <Job 1>, <Job 2>, <Job 3>, <Job 4>, <Job 8>, <Job 9>, <Job 10>,
     <Job 11>, <Job 12>, <Job 13>, <Job 14>, <Job 15>, <Job 16>, <Job 17>,
     <Job 18>, <Job 19>]
    >>> m.teardown()
    """
    def __init__(self, worker_pool=5):
        """Create the queues and start `worker_pool` worker threads."""
        super(ThreadManager, self).__init__()
        self._blocked = []  # jobs waiting on unfinished `blocks_on` ids
        self._spawn_queue = Queue()  # manager -> workers
        self._receive_queue = Queue()  # workers -> manager
        self._workers = []
        for i in range(worker_pool):
            worker = WorkerThread(spawn_queue=self._spawn_queue,
                                  receive_queue=self._receive_queue,
                                  name='worker-%d' % i)
            log().debug('start thread %s' % worker.name)
            worker.start()
            self._workers.append(worker)

    def teardown(self):
        """Stop and join every worker thread, then tear down the base."""
        # Each worker consumes exactly one close message before exiting,
        # so queue one message per worker.
        for _ in self._workers:
            self._spawn_queue.put(CLOSE_MESSAGE)
        for worker in self._workers:
            log().debug('join thread %s' % worker.name)
            worker.join()
        super(ThreadManager, self).teardown()

    def _job_is_blocked(self, job, ignore_id=None):
        """Return True if `job` still depends on an unfinished job.

        A dependency counts as unfinished when its id is in
        `self._jobs` and that job's `status` is None.  `ignore_id`
        names one dependency to skip; `_receive_job` uses it for the
        job it has just dequeued, whose `self._jobs` entry may not have
        been updated yet.
        """
        for job_id in job.blocks_on:
            if job_id == ignore_id:
                continue
            if job_id in self._jobs and self._jobs[job_id].status is None:
                return True
        return False

    def _put_job_in_spawn_queue(self, job):
        """Queue a deep copy of `job` for the workers.

        Threads share memory, so workers get a copy; the manager's own
        `Job` object is then never mutated from a worker thread.
        """
        self._spawn_queue.put(copy.deepcopy(job))

    def _spawn_job(self, job):
        """Queue `job` for execution, or hold it until it is unblocked."""
        # Opportunistically drain one finished job so that anything it
        # was blocking gets released.
        finished = self._receive_job(block=False)
        if finished is not None:
            # Workers run deep copies, so the dequeued object is the only
            # carrier of the result.  Dropping it would leave the entry in
            # `self._jobs` with `status` None forever, stalling every job
            # (and caller) that waits on it.
            # NOTE(review): assumes replacing the `self._jobs` entry is how
            # the base `JobManager` expects results to be recorded -- confirm.
            self._jobs[finished.id] = finished
        if self._job_is_blocked(job):
            log().debug('block job %s' % job)
            self._blocked.append(job)
            return
        self._put_job_in_spawn_queue(job)

    def _receive_job(self, block=True):
        """Take one finished job off the receive queue.

        Any held-back job whose last unfinished dependency was the
        received job is released to the workers.

        Returns the received job, or None when `block` is false and no
        job is waiting on the queue.
        """
        try:
            job = self._receive_queue.get(block=block)
        except Empty:
            return None
        # Iterate over a copy: released jobs are removed from the list.
        for held in list(self._blocked):
            if (job.id in held.blocks_on
                    and not self._job_is_blocked(held, ignore_id=job.id)):
                log().debug('unblock job %s' % held)
                self._blocked.remove(held)
                # Copy here too, exactly as for directly spawned jobs.
                self._put_job_in_spawn_queue(held)
        return job