Add pysawsim.manager and pysawsim.manager.thread for running asynchronous jobs.
[sawsim.git] / pysawsim / manager / __init__.py
1 # Copyright (C) 2010  W. Trevor King <wking@drexel.edu>
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU General Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License
14 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
15 #
16 # The author may be contacted at <wking@drexel.edu> on the Internet, or
17 # write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
18 # Philadelphia PA 19104, USA.
19
20 """Functions for running external commands on other hosts.
21 """
22
23 from .. import invoke as invoke
24
25
26 class Job (object):
27     """
28     >>> import copy
29
30     Jobs execute a Python function (through an interface similar to
31     threading.Thread).
32
33     >>> def job(*args, **kwargs):
34     ...     print 'executing job with %s and %s.' % (args, kwargs)
35     ...     return [4, 5]
36     >>> j = Job(id=3, target=job, args=[1,2], kwargs={'kwargA':3})
37     >>> j2 = copy.deepcopy(j)
38
39     Job status is initially `None`.
40
41     >>> print j.status
42     None
43
44     >>> j.run()
45     executing job with (1, 2) and {'kwargA': 3}.
46
47     After execution, `status` and `data` are set.
48
49     >>> j.status
50     0
51     >>> j.data
52     [4, 5]
53
54     You can copy these attributes over to another job with `copy_onto()`.
55
56     >>> j.copy_onto(j2)
57     >>> j2.status
58     0
59     >>> j2.data
60     [4, 5]
61
62     String representations are nice and simple.
63
64     >>> str(j)
65     '<Job 3>'
66     >>> repr(j)
67     '<Job 3>'
68
69     If `target` raises an exception, it is stored in `status` (for
70     successful runs, the status value is `0`).
71
72     >>> def job():
73     ...     raise Exception('error running job')
74     >>> j = Job(id=3, target=job)
75     >>> j.run()
76     >>> j.status
77     Exception('error running job',)
78     >>> print j.data
79     None
80     """
81     def __init__(self, id, target, blocks_on=None, args=None, kwargs=None):
82         if blocks_on == None:
83             blocks_on = []
84         if args == None:
85             args = []
86         if kwargs == None:
87             kwargs = {}
88         self.id = id
89         self.target = target
90         self.blocks_on = blocks_on
91         self.args = args
92         self.kwargs = kwargs
93         self.status = None
94         self.data = None
95
96     def __str__(self):
97         return '<%s %d>' % (self.__class__.__name__, self.id)
98
99     def __repr__(self):
100         return self.__str__()
101
102     def run(self):
103         try:
104             self.data = self.target(*self.args, **self.kwargs)
105         except Exception, e:
106             self.status = e
107         else:
108             self.status = 0
109
110     def copy_onto(self, other):
111         """Merge results of run onto initial job-request instance."""
112         for attr in ['status', 'data']:
113             setattr(other, attr, getattr(self, attr))
114
115
116 class InvokeJob (Job):
117     """Job subclass which `invoke()`\s a command line function.
118
119     >>> q = 'What is the meaning of life, the universe, and everything?'
120     >>> j = InvokeJob(id=3, target='echo "%s"' % q)
121     >>> j.run()
122     >>> j.status
123     0
124     >>> j.data['stdout']
125     'What is the meaning of life, the universe, and everything?\\n'
126     >>> j.data['stderr']
127     ''
128
129     >>> j = InvokeJob(id=3, target='missing_command')
130     >>> j.run()
131     >>> print j.status
132     Command failed (127):
133       /bin/sh: missing_command: command not found
134     <BLANKLINE>
135     <BLANKLINE>
136     while executing
137       missing_command
138     >>> j.data['stdout']
139     ''
140     >>> j.data['stderr']
141     '/bin/sh: missing_command: command not found\\n'
142     """
143     def run(self):
144         try:
145             self.status,stdout,stderr = invoke.invoke(
146                 self.target, *self.args, **self.kwargs)
147             self.data = {'stdout':stdout, 'stderr':stderr}
148         except invoke.CommandError, e:
149             self.status = e
150             self.data = {'stdout':e.stdout, 'stderr':e.stderr}
151
152
153 class JobManager (object):
154     def __init__(self):
155         self._jobs = {}
156         self.next_id = 0
157
158     def teardown(self):
159         pass
160
161     def async_invoke(self, cmd_string, blocks_on=None, *args, **kwargs):
162         id = self.next_id
163         self.next_id += 1
164         job = Job(id, cmd_string, *args, **kwargs)
165         self._jobs[id] = job
166         self._spawn_job(job)
167         return job
168
169     def _spawn_job(self, job):
170         raise NotImplementedError
171
172     def wait(self, ids=None):
173         if ids == None:
174             ids = self._jobs.keys()
175         jobs = {}
176         for id in list(ids):  # get already completed jobs
177             if self._jobs[id] != None:
178                 jobs[id] = self._jobs.pop(id)
179                 ids.remove(id)
180         while len(ids) > 0:  # wait for outstanding jobs
181             job = self._receive_job()
182             job.copy_onto(self._jobs[job.id])
183             if job.id in ids:
184                 jobs[job.id] = self._jobs.pop(job.id)
185                 ids.remove(id)
186         return jobs
187
188     def _receive_job(self):
189         raise NotImplementedError