efbfbf: upgrade to Bugs Everywhere Directory v1.5
[sawsim.git] / pysawsim / manager / __init__.py
1 # Copyright (C) 2010  W. Trevor King <wking@drexel.edu>
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU General Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License
14 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
15 #
16 # The author may be contacted at <wking@drexel.edu> on the Internet, or
17 # write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
18 # Philadelphia PA 19104, USA.
19
20 """Functions for running external commands on other hosts.
21
22 See the Python wiki_ for a list of parallel processing modules.
23
24 .. _wiki: http://wiki.python.org/moin/ParallelProcessing
25 """
26
27 from .. import invoke as invoke
28 from .. import log
29
30
31 MANAGERS = ['thread', 'subproc', 'mpi', 'pbs']
32 """Submodules with JobManager subclasses."""
33
34
35 class Job (object):
36     """Job request structure for `JobManager`.
37
38     >>> import copy
39
40     Jobs execute a Python function (through an interface similar to
41     threading.Thread).
42
43     >>> def job(*args, **kwargs):
44     ...     print 'executing job with %s and %s.' % (args, kwargs)
45     ...     return [4, 5]
46     >>> j = Job(id=3, target=job, args=[1,2], kwargs={'kwargA':3})
47     >>> j2 = copy.deepcopy(j)
48
49     Job status is initially `None`.
50
51     >>> print j.status
52     None
53
54     >>> j.run()
55     executing job with (1, 2) and {'kwargA': 3}.
56
57     After execution, `status` and `data` are set.
58
59     >>> j.status
60     0
61     >>> j.data
62     [4, 5]
63
64     You can copy these attributes over to another job with `copy_onto()`.
65
66     >>> j.copy_onto(j2)
67     >>> j2.status
68     0
69     >>> j2.data
70     [4, 5]
71
72     String representations are nice and simple.
73
74     >>> str(j)
75     '<Job 3>'
76     >>> repr(j)
77     '<Job 3>'
78
79     If `target` raises an exception, it is stored in `status` (for
80     successful runs, the status value is `0`).
81
82     >>> def job():
83     ...     raise Exception('error running job')
84     >>> j = Job(id=3, target=job)
85     >>> j.run()
86     >>> j.status
87     Exception('error running job',)
88     >>> print j.data
89     None
90     """
91     def __init__(self, id=None, target=None, args=None, kwargs=None,
92                  blocks_on=None):
93         if args == None:
94             args = []
95         if kwargs == None:
96             kwargs = {}
97         if blocks_on == None:
98             blocks_on = []
99         self.id = id
100         self.target = target
101         self.blocks_on = blocks_on
102         self.args = args
103         self.kwargs = kwargs
104         self.status = None
105         self.data = None
106
107     def __str__(self):
108         return '<%s %s>' % (self.__class__.__name__, self.id)
109
110     def __repr__(self):
111         return self.__str__()
112
113     def run(self):
114         try:
115             self.data = self.target(*self.args, **self.kwargs)
116         except Exception, e:
117             self.status = e
118         else:
119             self.status = 0
120
121     def copy_onto(self, other):
122         """Merge results of run onto initial job-request instance."""
123         for attr in ['status', 'data']:
124             setattr(other, attr, getattr(self, attr))
125
126
127 class InvokeJob (Job):
128     """`Job` subclass which `invoke()`\s a command line function.
129
130     >>> q = 'What is the meaning of life, the universe, and everything?'
131     >>> j = InvokeJob(id=3, target='echo "%s"' % q)
132     >>> j.run()
133     >>> j.status
134     0
135     >>> j.data['stdout']
136     'What is the meaning of life, the universe, and everything?\\n'
137     >>> j.data['stderr']
138     ''
139
140     >>> j = InvokeJob(id=3, target='missing_command')
141     >>> j.run()
142     >>> print j.status  # doctest: +ELLIPSIS
143     Command failed (127):
144       /bin/sh: missing_command: ...not found
145     <BLANKLINE>
146     <BLANKLINE>
147     while executing
148       missing_command
149     >>> j.data['stdout']
150     ''
151     >>> j.data['stderr']  # doctest: +ELLIPSIS
152     '/bin/sh: missing_command: ...not found\\n'
153     """
154     def run(self):
155         try:
156             self.status,stdout,stderr = invoke.invoke(
157                 self.target, *self.args, **self.kwargs)
158             self.data = {'stdout':stdout, 'stderr':stderr}
159         except invoke.CommandError, e:
160             self.status = e
161             self.data = {'stdout':e.stdout, 'stderr':e.stderr, 'error':e}
162
163
164 class JobManager (object):
165     """Base class for managing asynchronous `Job` execution."""
166     def __init__(self):
167         log().debug('setup %s' % self)
168         self._jobs = {}
169         self._next_id = 0
170
171     def __str__(self):
172         return '<%s %#x>' % (self.__class__.__name__, id(self))
173
174     def __repr__(self):
175         return self.__str__()
176
177     def teardown(self):
178         log().debug('teardown %s' % self)
179
180     def async_invoke(self, job):
181         id = self._next_id
182         self._next_id += 1
183         job.id = id
184         self._jobs[id] = job
185         log().debug('spawn job %s' % job)
186         self._spawn_job(job)
187         return job
188
189     def _spawn_job(self, job):
190         raise NotImplementedError
191
192     def wait(self, ids=None):
193         if ids == None:
194             ids = self._jobs.keys()
195         jobs = {}
196         log().debug('wait on %s' % ids)
197         log().debug('jobs: %s' % self._jobs)
198         for id in list(ids):  # get already completed jobs
199             if id not in self._jobs:
200                 log().debug('%d already gone' % id)
201                 ids.remove(id)
202             elif self._jobs[id].status != None:
203                 log().debug('%d already returned (%s)' % (id, self._jobs[id].status))
204                 ids.remove(id)
205                 jobs[id] = self._jobs.pop(id)
206         while len(ids) > 0:  # wait for outstanding jobs
207             job = self._receive_job()
208             self._handle_received_job(job)
209             if job.id in ids and job.id in self._jobs:
210                 jobs[job.id] = self._jobs.pop(job.id)
211                 ids.remove(job.id)
212         return jobs
213
214     def _handle_received_job(self, job):
215         log().debug('receive job %s (%s)' % (job, job.status))
216         job.copy_onto(self._jobs[job.id])
217
218     def _receive_job(self):
219         raise NotImplementedError
220
221
222 class IsSubclass (object):
223     """A safe subclass comparator.
224
225     Examples
226     --------
227
228     >>> class A (object):
229     ...     pass
230     >>> class B (A):
231     ...     pass
232     >>> C = 5
233     >>> is_subclass = IsSubclass(A)
234     >>> is_subclass(A)
235     True
236     >>> is_subclass = IsSubclass(A, blacklist=[A])
237     >>> is_subclass(A)
238     False
239     >>> is_subclass(B)
240     True
241     >>> is_subclass(C)
242     False
243     """
244     def __init__(self, base_class, blacklist=None):
245         self.base_class = base_class
246         if blacklist == None:
247             blacklist = []
248         self.blacklist = blacklist
249     def __call__(self, other):
250         try:
251             subclass = issubclass(other, self.base_class)
252         except TypeError:
253             return False
254         if other in self.blacklist:
255             return False
256         return subclass
257
258
259 def get_manager(submod=None, defaults=['subproc', 'thread']):
260     """
261     >>> get_manager('thread')
262     <class 'pysawsim.manager.thread.ThreadManager'>
263     >>> get_manager('wookie')
264     Traceback (most recent call last):
265       ...
266     AttributeError: 'module' object has no attribute 'wookie'
267     >>> m = get_manager()
268     >>> issubclass(m, JobManager)
269     True
270     """
271     if submod == None:
272         for submod in defaults:
273             try:
274                 m = get_manager(submod)
275             except ImportError:
276                 continue
277             if len(m._bugs) > 0:
278                 continue
279             return m
280         raise Exception('none of the managers in %s were enabled' % defaults)
281     this_mod = __import__(__name__, fromlist=[submod])
282     sub_mod = getattr(this_mod, submod)
283     if sub_mod._ENABLED == False:
284         raise sub_mod._DISABLING_ERROR
285     class_selector = IsSubclass(base_class=JobManager, blacklist=[JobManager])
286     for x_name in dir(sub_mod):
287         x = getattr(sub_mod, x_name)
288         if class_selector(x) == True:
289             x._bugs = [a for a in dir(sub_mod) if a.startswith('_HAS_BUG_')]
290             return x
291     raise ValueError('no JobManager found in %s' % sub_mod.__name__)