Fix a few deadlock errors in pysawsim.manager.thread.
[sawsim.git] / pysawsim / manager / __init__.py
1 # Copyright (C) 2010  W. Trevor King <wking@drexel.edu>
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU General Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License
14 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
15 #
16 # The author may be contacted at <wking@drexel.edu> on the Internet, or
17 # write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
18 # Philadelphia PA 19104, USA.
19
20 """Functions for running external commands on other hosts.
21 """
22
23 from .. import invoke as invoke
24 from .. import log
25
26
27 class Job (object):
28     """Job request structure for `JobManager`.
29
30     >>> import copy
31
32     Jobs execute a Python function (through an interface similar to
33     threading.Thread).
34
35     >>> def job(*args, **kwargs):
36     ...     print 'executing job with %s and %s.' % (args, kwargs)
37     ...     return [4, 5]
38     >>> j = Job(id=3, target=job, args=[1,2], kwargs={'kwargA':3})
39     >>> j2 = copy.deepcopy(j)
40
41     Job status is initially `None`.
42
43     >>> print j.status
44     None
45
46     >>> j.run()
47     executing job with (1, 2) and {'kwargA': 3}.
48
49     After execution, `status` and `data` are set.
50
51     >>> j.status
52     0
53     >>> j.data
54     [4, 5]
55
56     You can copy these attributes over to another job with `copy_onto()`.
57
58     >>> j.copy_onto(j2)
59     >>> j2.status
60     0
61     >>> j2.data
62     [4, 5]
63
64     String representations are nice and simple.
65
66     >>> str(j)
67     '<Job 3>'
68     >>> repr(j)
69     '<Job 3>'
70
71     If `target` raises an exception, it is stored in `status` (for
72     successful runs, the status value is `0`).
73
74     >>> def job():
75     ...     raise Exception('error running job')
76     >>> j = Job(id=3, target=job)
77     >>> j.run()
78     >>> j.status
79     Exception('error running job',)
80     >>> print j.data
81     None
82     """
83     def __init__(self, id=None, target=None, args=None, kwargs=None,
84                  blocks_on=None):
85         if args == None:
86             args = []
87         if kwargs == None:
88             kwargs = {}
89         if blocks_on == None:
90             blocks_on = []
91         self.id = id
92         self.target = target
93         self.blocks_on = blocks_on
94         self.args = args
95         self.kwargs = kwargs
96         self.status = None
97         self.data = None
98
99     def __str__(self):
100         return '<%s %d>' % (self.__class__.__name__, self.id)
101
102     def __repr__(self):
103         return self.__str__()
104
105     def run(self):
106         try:
107             self.data = self.target(*self.args, **self.kwargs)
108         except Exception, e:
109             self.status = e
110         else:
111             self.status = 0
112
113     def copy_onto(self, other):
114         """Merge results of run onto initial job-request instance."""
115         for attr in ['status', 'data']:
116             setattr(other, attr, getattr(self, attr))
117
118
119 class InvokeJob (Job):
120     """`Job` subclass which `invoke()`\s a command line function.
121
122     >>> q = 'What is the meaning of life, the universe, and everything?'
123     >>> j = InvokeJob(id=3, target='echo "%s"' % q)
124     >>> j.run()
125     >>> j.status
126     0
127     >>> j.data['stdout']
128     'What is the meaning of life, the universe, and everything?\\n'
129     >>> j.data['stderr']
130     ''
131
132     >>> j = InvokeJob(id=3, target='missing_command')
133     >>> j.run()
134     >>> print j.status
135     Command failed (127):
136       /bin/sh: missing_command: command not found
137     <BLANKLINE>
138     <BLANKLINE>
139     while executing
140       missing_command
141     >>> j.data['stdout']
142     ''
143     >>> j.data['stderr']
144     '/bin/sh: missing_command: command not found\\n'
145     """
146     def run(self):
147         try:
148             self.status,stdout,stderr = invoke.invoke(
149                 self.target, *self.args, **self.kwargs)
150             self.data = {'stdout':stdout, 'stderr':stderr}
151         except invoke.CommandError, e:
152             self.status = e
153             self.data = {'stdout':e.stdout, 'stderr':e.stderr}
154
155
156 class JobManager (object):
157     """Base class for managing asynchronous `Job` execution."""
158     def __init__(self):
159         log().debug('setup %s' % self)
160         self._jobs = {}
161         self._next_id = 0
162
163     def __str__(self):
164         return '<%s %#x>' % (self.__class__.__name__, id(self))
165
166     def __repr__(self):
167         return self.__str__()
168
169     def teardown(self):
170         log().debug('teardown %s' % self)
171
172     def async_invoke(self, job):
173         id = self._next_id
174         self._next_id += 1
175         job.id = id
176         self._jobs[id] = job
177         log().debug('spawn job %s' % job)
178         self._spawn_job(job)
179         return job
180
181     def _spawn_job(self, job):
182         raise NotImplementedError
183
184     def wait(self, ids=None):
185         if ids == None:
186             ids = self._jobs.keys()
187         jobs = {}
188         log().debug('wait on %s' % ids)
189         log().debug('jobs: %s' % self._jobs)
190         for id in list(ids):  # get already completed jobs
191             if id not in self._jobs:
192                 log().debug('%d already gone' % id)
193                 ids.remove(id)
194             elif self._jobs[id].status != None:
195                 log().debug('%d already returned (%s)' % (id, self._jobs[id].status))
196                 ids.remove(id)
197                 jobs[id] = self._jobs.pop(id)
198         while len(ids) > 0:  # wait for outstanding jobs
199             job = self._receive_job()
200             log().debug('receive job %s (%d)' % (job, job.status))
201             job.copy_onto(self._jobs[job.id])
202             if job.id in ids and job.id in self._jobs:
203                 jobs[job.id] = self._jobs.pop(job.id)
204                 ids.remove(job.id)
205         return jobs
206
207     def _receive_job(self):
208         raise NotImplementedError