f7b2cb53bfc3dc181b0b91df7b3d168c94b0a085
[sawsim.git] / pysawsim / manager / __init__.py
1 # Copyright (C) 2010  W. Trevor King <wking@drexel.edu>
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU General Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License
14 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
15 #
16 # The author may be contacted at <wking@drexel.edu> on the Internet, or
17 # write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
18 # Philadelphia PA 19104, USA.
19
20 """Functions for running external commands on other hosts.
21 """
22
23 from .. import invoke as invoke
24 from .. import log
25
26
27 SUBMODS = ['thread', 'pbs']
28
29
30 class Job (object):
31     """Job request structure for `JobManager`.
32
33     >>> import copy
34
35     Jobs execute a Python function (through an interface similar to
36     threading.Thread).
37
38     >>> def job(*args, **kwargs):
39     ...     print 'executing job with %s and %s.' % (args, kwargs)
40     ...     return [4, 5]
41     >>> j = Job(id=3, target=job, args=[1,2], kwargs={'kwargA':3})
42     >>> j2 = copy.deepcopy(j)
43
44     Job status is initially `None`.
45
46     >>> print j.status
47     None
48
49     >>> j.run()
50     executing job with (1, 2) and {'kwargA': 3}.
51
52     After execution, `status` and `data` are set.
53
54     >>> j.status
55     0
56     >>> j.data
57     [4, 5]
58
59     You can copy these attributes over to another job with `copy_onto()`.
60
61     >>> j.copy_onto(j2)
62     >>> j2.status
63     0
64     >>> j2.data
65     [4, 5]
66
67     String representations are nice and simple.
68
69     >>> str(j)
70     '<Job 3>'
71     >>> repr(j)
72     '<Job 3>'
73
74     If `target` raises an exception, it is stored in `status` (for
75     successful runs, the status value is `0`).
76
77     >>> def job():
78     ...     raise Exception('error running job')
79     >>> j = Job(id=3, target=job)
80     >>> j.run()
81     >>> j.status
82     Exception('error running job',)
83     >>> print j.data
84     None
85     """
86     def __init__(self, id=None, target=None, args=None, kwargs=None,
87                  blocks_on=None):
88         if args == None:
89             args = []
90         if kwargs == None:
91             kwargs = {}
92         if blocks_on == None:
93             blocks_on = []
94         self.id = id
95         self.target = target
96         self.blocks_on = blocks_on
97         self.args = args
98         self.kwargs = kwargs
99         self.status = None
100         self.data = None
101
102     def __str__(self):
103         return '<%s %d>' % (self.__class__.__name__, self.id)
104
105     def __repr__(self):
106         return self.__str__()
107
108     def run(self):
109         try:
110             self.data = self.target(*self.args, **self.kwargs)
111         except Exception, e:
112             self.status = e
113         else:
114             self.status = 0
115
116     def copy_onto(self, other):
117         """Merge results of run onto initial job-request instance."""
118         for attr in ['status', 'data']:
119             setattr(other, attr, getattr(self, attr))
120
121
122 class InvokeJob (Job):
123     """`Job` subclass which `invoke()`\s a command line function.
124
125     >>> q = 'What is the meaning of life, the universe, and everything?'
126     >>> j = InvokeJob(id=3, target='echo "%s"' % q)
127     >>> j.run()
128     >>> j.status
129     0
130     >>> j.data['stdout']
131     'What is the meaning of life, the universe, and everything?\\n'
132     >>> j.data['stderr']
133     ''
134
135     >>> j = InvokeJob(id=3, target='missing_command')
136     >>> j.run()
137     >>> print j.status
138     Command failed (127):
139       /bin/sh: missing_command: command not found
140     <BLANKLINE>
141     <BLANKLINE>
142     while executing
143       missing_command
144     >>> j.data['stdout']
145     ''
146     >>> j.data['stderr']
147     '/bin/sh: missing_command: command not found\\n'
148     """
149     def run(self):
150         try:
151             self.status,stdout,stderr = invoke.invoke(
152                 self.target, *self.args, **self.kwargs)
153             self.data = {'stdout':stdout, 'stderr':stderr}
154         except invoke.CommandError, e:
155             self.status = e
156             self.data = {'stdout':e.stdout, 'stderr':e.stderr}
157
158
159 class JobManager (object):
160     """Base class for managing asynchronous `Job` execution."""
161     def __init__(self):
162         log().debug('setup %s' % self)
163         self._jobs = {}
164         self._next_id = 0
165
166     def __str__(self):
167         return '<%s %#x>' % (self.__class__.__name__, id(self))
168
169     def __repr__(self):
170         return self.__str__()
171
172     def teardown(self):
173         log().debug('teardown %s' % self)
174
175     def async_invoke(self, job):
176         id = self._next_id
177         self._next_id += 1
178         job.id = id
179         self._jobs[id] = job
180         log().debug('spawn job %s' % job)
181         self._spawn_job(job)
182         return job
183
184     def _spawn_job(self, job):
185         raise NotImplementedError
186
187     def wait(self, ids=None):
188         if ids == None:
189             ids = self._jobs.keys()
190         jobs = {}
191         log().debug('wait on %s' % ids)
192         log().debug('jobs: %s' % self._jobs)
193         for id in list(ids):  # get already completed jobs
194             if id not in self._jobs:
195                 log().debug('%d already gone' % id)
196                 ids.remove(id)
197             elif self._jobs[id].status != None:
198                 log().debug('%d already returned (%s)' % (id, self._jobs[id].status))
199                 ids.remove(id)
200                 jobs[id] = self._jobs.pop(id)
201         while len(ids) > 0:  # wait for outstanding jobs
202             job = self._receive_job()
203             log().debug('receive job %s (%d)' % (job, job.status))
204             job.copy_onto(self._jobs[job.id])
205             if job.id in ids and job.id in self._jobs:
206                 jobs[job.id] = self._jobs.pop(job.id)
207                 ids.remove(job.id)
208         return jobs
209
210     def _receive_job(self):
211         raise NotImplementedError
212
213
214 class IsSubclass (object):
215     """A safe subclass comparator.
216
217     Examples
218     --------
219
220     >>> class A (object):
221     ...     pass
222     >>> class B (A):
223     ...     pass
224     >>> C = 5
225     >>> is_subclass = IsSubclass(A)
226     >>> is_subclass(A)
227     True
228     >>> is_subclass = IsSubclass(A, blacklist=[A])
229     >>> is_subclass(A)
230     False
231     >>> is_subclass(B)
232     True
233     >>> is_subclass(C)
234     False
235     """
236     def __init__(self, base_class, blacklist=None):
237         self.base_class = base_class
238         if blacklist == None:
239             blacklist = []
240         self.blacklist = blacklist
241     def __call__(self, other):
242         try:
243             subclass = issubclass(other, self.base_class)
244         except TypeError:
245             return False
246         if other in self.blacklist:
247             return False
248         return subclass
249
250
251 def get_manager(submod):
252     """
253     >>> get_manager('thread')
254     <class 'pysawsim.manager.thread.ThreadManager'>
255     >>> get_manager('wookie')
256     Traceback (most recent call last):
257       ...
258     AttributeError: 'module' object has no attribute 'wookie'
259     """
260     this_mod = __import__(__name__, fromlist=[submod])
261     sub_mod = getattr(this_mod, submod)
262     class_selector = IsSubclass(base_class=JobManager, blacklist=[JobManager])
263     for x_name in dir(sub_mod):
264         x = getattr(sub_mod, x_name)
265         if class_selector(x) == True:
266             return x
267     raise ValueError('no JobManager found in %s' % sub_mod.__name__)