1 # Copyright (C) 2009-2010 W. Trevor King <wking@drexel.edu>
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation; either version 2 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
13 # You should have received a copy of the GNU General Public License along
14 # with this program; if not, write to the Free Software Foundation, Inc.,
15 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 Functions for running external commands in subprocesses.
21 from subprocess import Popen, PIPE
25 from encoding import get_encoding
26 if libbe.TESTING == True:
29 _MSWINDOWS = sys.platform == 'win32'
30 _POSIX = not _MSWINDOWS
36 class CommandError(Exception):
37 def __init__(self, command, status, stdout=None, stderr=None):
38 strerror = ['Command failed (%d):\n %s\n' % (status, stderr),
39 'while executing\n %s' % str(command)]
40 Exception.__init__(self, '\n'.join(strerror))
41 self.command = command
46 def invoke(args, stdin=None, stdout=PIPE, stderr=PIPE, expect=(0,),
47 cwd=None, unicode_output=True, verbose=False, encoding=None):
49 expect should be a tuple of allowed exit codes. cwd should be
50 the directory from which the command will be executed. When
51 unicode_output == True, convert stdout and stderr strings to
52 unicode before returning them.
57 print >> sys.stderr, '%s$ %s' % (cwd, ' '.join(args))
60 q = Popen(args, stdin=PIPE, stdout=stdout, stderr=stderr, cwd=cwd)
62 assert _MSWINDOWS==True, 'invalid platform'
63 # win32 doesn't have os.execvp(), so we have to run the command in a shell
64 q = Popen(args, stdin=PIPE, stdout=stdout, stderr=stderr,
67 raise CommandError(args, status=e.args[0], stderr=e)
68 stdout,stderr = q.communicate(input=stdin)
70 if unicode_output == True:
72 encoding = get_encoding()
74 stdout = unicode(stdout, encoding)
76 stderr = unicode(stderr, encoding)
78 print >> sys.stderr, '%d\n%s%s' % (status, stdout, stderr)
79 if status not in expect:
80 raise CommandError(args, status, stdout, stderr)
81 return status, stdout, stderr
85 Simple interface for executing POSIX-style pipes based on the
86 subprocess module. The only complication is the adaptation of
87 subprocess.Popen._communicate to listen to the stderrs of all
88 processes involved in the pipe, as well as the terminal process'
89 stdout. There are two implementations of Pipe._communicate, one
90 for MS Windows, and one for POSIX systems. The MS Windows
91 implementation is currently untested.
93 >>> p = Pipe([['find', '/etc/'], ['grep', '^/etc/ssh$']])
100 >>> p.stderrs # doctest: +ELLIPSIS
101 [...find: ...: Permission denied..., '']
103 def __init__(self, cmds, stdin=None):
107 if len(self._procs) != 0:
108 stdin = self._procs[-1].stdout
109 self._procs.append(Popen(cmd, stdin=stdin, stdout=PIPE, stderr=PIPE))
111 self.stdout,self.stderrs = self._communicate(input=None)
113 # collect process statuses
116 for proc in self._procs:
117 self.statuses.append(proc.wait())
118 if self.statuses[-1] != 0:
119 self.status = self.statuses[-1]
121 # Code excerpted from subprocess.Popen._communicate()
122 if _MSWINDOWS == True:
123 def _communicate(self, input=None):
124 assert input == None, 'stdin != None not yet supported'
125 # listen to each process' stderr
128 for proc in self._procs:
130 thread = Thread(target=proc._readerthread,
131 args=(proc.stderr, stderr_array))
132 thread.setDaemon(True)
134 threads.append(thread)
135 std_X_arrays.append(stderr_array)
137 # also listen to the last process' stdout
139 thread = Thread(target=proc._readerthread,
140 args=(proc.stdout, stdout_array))
141 thread.setDaemon(True)
143 threads.append(thread)
144 std_X_arrays.append(stdout_array)
146 # join threads as they die
147 for thread in threads:
150 # read output from reader threads
152 for std_X_array in std_X_arrays:
153 std_X_strings.append(std_X_array[0])
155 stdout = std_X_strings.pop(-1)
156 stderrs = std_X_strings
157 return (stdout, stderrs)
159 assert _POSIX==True, 'invalid platform'
160 def _communicate(self, input=None):
164 stdout = None # Return
165 stderr = None # Return
167 if self._procs[0].stdin:
168 # Flush stdio buffer. This might block, if the user has
169 # been writing to .stdin in an uncontrolled fashion.
170 self._procs[0].stdin.flush()
172 write_set.append(self._procs[0].stdin)
174 self._procs[0].stdin.close()
175 for proc in self._procs:
176 read_set.append(proc.stderr)
177 read_arrays.append([])
178 read_set.append(self._procs[-1].stdout)
179 read_arrays.append([])
182 while read_set or write_set:
184 rlist, wlist, xlist = select.select(read_set, write_set, [])
185 except select.error, e:
186 if e.args[0] == errno.EINTR:
189 if self._procs[0].stdin in wlist:
190 # When select has indicated that the file is writable,
191 # we can write up to PIPE_BUF bytes without risk of
192 # blocking. POSIX defines PIPE_BUF >= 512
193 chunk = input[input_offset : input_offset + 512]
194 bytes_written = os.write(self.stdin.fileno(), chunk)
195 input_offset += bytes_written
196 if input_offset >= len(input):
197 self._procs[0].stdin.close()
198 write_set.remove(self._procs[0].stdin)
199 if self._procs[-1].stdout in rlist:
200 data = os.read(self._procs[-1].stdout.fileno(), 1024)
202 self._procs[-1].stdout.close()
203 read_set.remove(self._procs[-1].stdout)
204 read_arrays[-1].append(data)
205 for i,proc in enumerate(self._procs):
206 if proc.stderr in rlist:
207 data = os.read(proc.stderr.fileno(), 1024)
210 read_set.remove(proc.stderr)
211 read_arrays[i].append(data)
213 # All data exchanged. Translate lists into strings.
215 for read_array in read_arrays:
216 read_strings.append(''.join(read_array))
218 stdout = read_strings.pop(-1)
219 stderrs = read_strings
220 return (stdout, stderrs)
222 if libbe.TESTING == True:
223 suite = doctest.DocTestSuite()