JobManager.async_invoke() should accept Job instances.
[sawsim.git] / pysawsim / manager / __init__.py
1 # Copyright (C) 2010  W. Trevor King <wking@drexel.edu>
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU General Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License
14 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
15 #
16 # The author may be contacted at <wking@drexel.edu> on the Internet, or
17 # write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
18 # Philadelphia PA 19104, USA.
19
20 """Functions for running external commands on other hosts.
21 """
22
23 from .. import invoke as invoke
24
25
26 class Job (object):
27     """Job request structure for `JobManager`.
28
29     >>> import copy
30
31     Jobs execute a Python function (through an interface similar to
32     threading.Thread).
33
34     >>> def job(*args, **kwargs):
35     ...     print 'executing job with %s and %s.' % (args, kwargs)
36     ...     return [4, 5]
37     >>> j = Job(id=3, target=job, args=[1,2], kwargs={'kwargA':3})
38     >>> j2 = copy.deepcopy(j)
39
40     Job status is initially `None`.
41
42     >>> print j.status
43     None
44
45     >>> j.run()
46     executing job with (1, 2) and {'kwargA': 3}.
47
48     After execution, `status` and `data` are set.
49
50     >>> j.status
51     0
52     >>> j.data
53     [4, 5]
54
55     You can copy these attributes over to another job with `copy_onto()`.
56
57     >>> j.copy_onto(j2)
58     >>> j2.status
59     0
60     >>> j2.data
61     [4, 5]
62
63     String representations are nice and simple.
64
65     >>> str(j)
66     '<Job 3>'
67     >>> repr(j)
68     '<Job 3>'
69
70     If `target` raises an exception, it is stored in `status` (for
71     successful runs, the status value is `0`).
72
73     >>> def job():
74     ...     raise Exception('error running job')
75     >>> j = Job(id=3, target=job)
76     >>> j.run()
77     >>> j.status
78     Exception('error running job',)
79     >>> print j.data
80     None
81     """
82     def __init__(self, id=None, target=None, args=None, kwargs=None,
83                  blocks_on=None):
84         if args == None:
85             args = []
86         if kwargs == None:
87             kwargs = {}
88         if blocks_on == None:
89             blocks_on = []
90         self.id = id
91         self.target = target
92         self.blocks_on = blocks_on
93         self.args = args
94         self.kwargs = kwargs
95         self.status = None
96         self.data = None
97
98     def __str__(self):
99         return '<%s %d>' % (self.__class__.__name__, self.id)
100
101     def __repr__(self):
102         return self.__str__()
103
104     def run(self):
105         try:
106             self.data = self.target(*self.args, **self.kwargs)
107         except Exception, e:
108             self.status = e
109         else:
110             self.status = 0
111
112     def copy_onto(self, other):
113         """Merge results of run onto initial job-request instance."""
114         for attr in ['status', 'data']:
115             setattr(other, attr, getattr(self, attr))
116
117
118 class InvokeJob (Job):
119     """`Job` subclass which `invoke()`\s a command line function.
120
121     >>> q = 'What is the meaning of life, the universe, and everything?'
122     >>> j = InvokeJob(id=3, target='echo "%s"' % q)
123     >>> j.run()
124     >>> j.status
125     0
126     >>> j.data['stdout']
127     'What is the meaning of life, the universe, and everything?\\n'
128     >>> j.data['stderr']
129     ''
130
131     >>> j = InvokeJob(id=3, target='missing_command')
132     >>> j.run()
133     >>> print j.status
134     Command failed (127):
135       /bin/sh: missing_command: command not found
136     <BLANKLINE>
137     <BLANKLINE>
138     while executing
139       missing_command
140     >>> j.data['stdout']
141     ''
142     >>> j.data['stderr']
143     '/bin/sh: missing_command: command not found\\n'
144     """
145     def run(self):
146         try:
147             self.status,stdout,stderr = invoke.invoke(
148                 self.target, *self.args, **self.kwargs)
149             self.data = {'stdout':stdout, 'stderr':stderr}
150         except invoke.CommandError, e:
151             self.status = e
152             self.data = {'stdout':e.stdout, 'stderr':e.stderr}
153
154
155 class JobManager (object):
156     """Base class for managing asynchronous `Job` execution."""
157     def __init__(self):
158         self._jobs = {}
159         self.next_id = 0
160
161     def teardown(self):
162         pass
163
164     def async_invoke(self, job):
165         id = self.next_id
166         self.next_id += 1
167         job.id = id
168         self._jobs[id] = job
169         self._spawn_job(job)
170         return job
171
172     def _spawn_job(self, job):
173         raise NotImplementedError
174
175     def wait(self, ids=None):
176         if ids == None:
177             ids = self._jobs.keys()
178         jobs = {}
179         for id in list(ids):  # get already completed jobs
180             if self._jobs[id] != None:
181                 jobs[id] = self._jobs.pop(id)
182                 ids.remove(id)
183         while len(ids) > 0:  # wait for outstanding jobs
184             job = self._receive_job()
185             job.copy_onto(self._jobs[job.id])
186             if job.id in ids:
187                 jobs[job.id] = self._jobs.pop(job.id)
188                 ids.remove(id)
189         return jobs
190
191     def _receive_job(self):
192         raise NotImplementedError