Add --manager option to velocity_dependant_scan.py.
[sawsim.git] / pysawsim / manager / __init__.py
1 # Copyright (C) 2010  W. Trevor King <wking@drexel.edu>
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU General Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License
14 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
15 #
16 # The author may be contacted at <wking@drexel.edu> on the Internet, or
17 # write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
18 # Philadelphia PA 19104, USA.
19
20 """Functions for running external commands on other hosts.
21 """
22
23 from .. import invoke as invoke
24 from .. import log
25
26
27 MANAGERS = ['thread', 'pbs']
28 """Submodules with JobManager subclasses."""
29
30
31 class Job (object):
32     """Job request structure for `JobManager`.
33
34     >>> import copy
35
36     Jobs execute a Python function (through an interface similar to
37     threading.Thread).
38
39     >>> def job(*args, **kwargs):
40     ...     print 'executing job with %s and %s.' % (args, kwargs)
41     ...     return [4, 5]
42     >>> j = Job(id=3, target=job, args=[1,2], kwargs={'kwargA':3})
43     >>> j2 = copy.deepcopy(j)
44
45     Job status is initially `None`.
46
47     >>> print j.status
48     None
49
50     >>> j.run()
51     executing job with (1, 2) and {'kwargA': 3}.
52
53     After execution, `status` and `data` are set.
54
55     >>> j.status
56     0
57     >>> j.data
58     [4, 5]
59
60     You can copy these attributes over to another job with `copy_onto()`.
61
62     >>> j.copy_onto(j2)
63     >>> j2.status
64     0
65     >>> j2.data
66     [4, 5]
67
68     String representations are nice and simple.
69
70     >>> str(j)
71     '<Job 3>'
72     >>> repr(j)
73     '<Job 3>'
74
75     If `target` raises an exception, it is stored in `status` (for
76     successful runs, the status value is `0`).
77
78     >>> def job():
79     ...     raise Exception('error running job')
80     >>> j = Job(id=3, target=job)
81     >>> j.run()
82     >>> j.status
83     Exception('error running job',)
84     >>> print j.data
85     None
86     """
87     def __init__(self, id=None, target=None, args=None, kwargs=None,
88                  blocks_on=None):
89         if args == None:
90             args = []
91         if kwargs == None:
92             kwargs = {}
93         if blocks_on == None:
94             blocks_on = []
95         self.id = id
96         self.target = target
97         self.blocks_on = blocks_on
98         self.args = args
99         self.kwargs = kwargs
100         self.status = None
101         self.data = None
102
103     def __str__(self):
104         return '<%s %d>' % (self.__class__.__name__, self.id)
105
106     def __repr__(self):
107         return self.__str__()
108
109     def run(self):
110         try:
111             self.data = self.target(*self.args, **self.kwargs)
112         except Exception, e:
113             self.status = e
114         else:
115             self.status = 0
116
117     def copy_onto(self, other):
118         """Merge results of run onto initial job-request instance."""
119         for attr in ['status', 'data']:
120             setattr(other, attr, getattr(self, attr))
121
122
123 class InvokeJob (Job):
124     """`Job` subclass which `invoke()`\s a command line function.
125
126     >>> q = 'What is the meaning of life, the universe, and everything?'
127     >>> j = InvokeJob(id=3, target='echo "%s"' % q)
128     >>> j.run()
129     >>> j.status
130     0
131     >>> j.data['stdout']
132     'What is the meaning of life, the universe, and everything?\\n'
133     >>> j.data['stderr']
134     ''
135
136     >>> j = InvokeJob(id=3, target='missing_command')
137     >>> j.run()
138     >>> print j.status
139     Command failed (127):
140       /bin/sh: missing_command: command not found
141     <BLANKLINE>
142     <BLANKLINE>
143     while executing
144       missing_command
145     >>> j.data['stdout']
146     ''
147     >>> j.data['stderr']
148     '/bin/sh: missing_command: command not found\\n'
149     """
150     def run(self):
151         try:
152             self.status,stdout,stderr = invoke.invoke(
153                 self.target, *self.args, **self.kwargs)
154             self.data = {'stdout':stdout, 'stderr':stderr}
155         except invoke.CommandError, e:
156             self.status = e
157             self.data = {'stdout':e.stdout, 'stderr':e.stderr}
158
159
160 class JobManager (object):
161     """Base class for managing asynchronous `Job` execution."""
162     def __init__(self):
163         log().debug('setup %s' % self)
164         self._jobs = {}
165         self._next_id = 0
166
167     def __str__(self):
168         return '<%s %#x>' % (self.__class__.__name__, id(self))
169
170     def __repr__(self):
171         return self.__str__()
172
173     def teardown(self):
174         log().debug('teardown %s' % self)
175
176     def async_invoke(self, job):
177         id = self._next_id
178         self._next_id += 1
179         job.id = id
180         self._jobs[id] = job
181         log().debug('spawn job %s' % job)
182         self._spawn_job(job)
183         return job
184
185     def _spawn_job(self, job):
186         raise NotImplementedError
187
188     def wait(self, ids=None):
189         if ids == None:
190             ids = self._jobs.keys()
191         jobs = {}
192         log().debug('wait on %s' % ids)
193         log().debug('jobs: %s' % self._jobs)
194         for id in list(ids):  # get already completed jobs
195             if id not in self._jobs:
196                 log().debug('%d already gone' % id)
197                 ids.remove(id)
198             elif self._jobs[id].status != None:
199                 log().debug('%d already returned (%s)' % (id, self._jobs[id].status))
200                 ids.remove(id)
201                 jobs[id] = self._jobs.pop(id)
202         while len(ids) > 0:  # wait for outstanding jobs
203             job = self._receive_job()
204             log().debug('receive job %s (%d)' % (job, job.status))
205             job.copy_onto(self._jobs[job.id])
206             if job.id in ids and job.id in self._jobs:
207                 jobs[job.id] = self._jobs.pop(job.id)
208                 ids.remove(job.id)
209         return jobs
210
211     def _receive_job(self):
212         raise NotImplementedError
213
214
215 class IsSubclass (object):
216     """A safe subclass comparator.
217
218     Examples
219     --------
220
221     >>> class A (object):
222     ...     pass
223     >>> class B (A):
224     ...     pass
225     >>> C = 5
226     >>> is_subclass = IsSubclass(A)
227     >>> is_subclass(A)
228     True
229     >>> is_subclass = IsSubclass(A, blacklist=[A])
230     >>> is_subclass(A)
231     False
232     >>> is_subclass(B)
233     True
234     >>> is_subclass(C)
235     False
236     """
237     def __init__(self, base_class, blacklist=None):
238         self.base_class = base_class
239         if blacklist == None:
240             blacklist = []
241         self.blacklist = blacklist
242     def __call__(self, other):
243         try:
244             subclass = issubclass(other, self.base_class)
245         except TypeError:
246             return False
247         if other in self.blacklist:
248             return False
249         return subclass
250
251
252 def get_manager(submod):
253     """
254     >>> get_manager('thread')
255     <class 'pysawsim.manager.thread.ThreadManager'>
256     >>> get_manager('wookie')
257     Traceback (most recent call last):
258       ...
259     AttributeError: 'module' object has no attribute 'wookie'
260     """
261     this_mod = __import__(__name__, fromlist=[submod])
262     sub_mod = getattr(this_mod, submod)
263     class_selector = IsSubclass(base_class=JobManager, blacklist=[JobManager])
264     for x_name in dir(sub_mod):
265         x = getattr(sub_mod, x_name)
266         if class_selector(x) == True:
267             return x
268     raise ValueError('no JobManager found in %s' % sub_mod.__name__)