Merged Anton Batenev's report of Nicolas Alvarez' unicode-in-be-new bug
[be.git] / libbe / util / subproc.py
1 # Copyright (C) 2009-2010 W. Trevor King <wking@drexel.edu>
2 #
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation; either version 2 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU General Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License along
14 # with this program; if not, write to the Free Software Foundation, Inc.,
15 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16
17 """
18 Functions for running external commands in subprocesses.
19 """
20
21 from subprocess import Popen, PIPE
22 import sys
23
24 import libbe
25 from encoding import get_encoding
26 if libbe.TESTING == True:
27     import doctest
28
29 _MSWINDOWS = sys.platform == 'win32'
30 _POSIX = not _MSWINDOWS
31
32 if _POSIX == True:
33     import os
34     import select
35
36 class CommandError(Exception):
37     def __init__(self, command, status, stdout=None, stderr=None):
38         strerror = ['Command failed (%d):\n  %s\n' % (status, stderr),
39                     'while executing\n  %s' % str(command)]
40         Exception.__init__(self, '\n'.join(strerror))
41         self.command = command
42         self.status = status
43         self.stdout = stdout
44         self.stderr = stderr
45
46 def invoke(args, stdin=None, stdout=PIPE, stderr=PIPE, expect=(0,),
47            cwd=None, unicode_output=True, verbose=False, encoding=None):
48     """
49     expect should be a tuple of allowed exit codes.  cwd should be
50     the directory from which the command will be executed.  When
51     unicode_output == True, convert stdout and stdin strings to
52     unicode before returing them.
53     """
54     if cwd == None:
55         cwd = '.'
56     if verbose == True:
57         print >> sys.stderr, '%s$ %s' % (cwd, ' '.join(args))
58     try :
59         if _POSIX:
60             q = Popen(args, stdin=PIPE, stdout=stdout, stderr=stderr, cwd=cwd)
61         else:
62             assert _MSWINDOWS==True, 'invalid platform'
63             # win32 don't have os.execvp() so have to run command in a shell
64             q = Popen(args, stdin=PIPE, stdout=stdout, stderr=stderr,
65                       shell=True, cwd=cwd)
66     except OSError, e:
67         raise CommandError(args, status=e.args[0], stderr=e)
68     stdout,stderr = q.communicate(input=stdin)
69     status = q.wait()
70     if unicode_output == True:
71         if encoding == None:
72             encoding = get_encoding()
73         if stdout != None:
74             stdout = unicode(stdout, encoding)
75         if stderr != None:
76             stderr = unicode(stderr, encoding)
77     if verbose == True:
78         print >> sys.stderr, '%d\n%s%s' % (status, stdout, stderr)
79     if status not in expect:
80         raise CommandError(args, status, stdout, stderr)
81     return status, stdout, stderr
82
83 class Pipe (object):
84     """
85     Simple interface for executing POSIX-style pipes based on the
86     subprocess module.  The only complication is the adaptation of
87     subprocess.Popen._comminucate to listen to the stderrs of all
88     processes involved in the pipe, as well as the terminal process'
89     stdout.  There are two implementations of Pipe._communicate, one
90     for MS Windows, and one for POSIX systems.  The MS Windows
91     implementation is currently untested.
92
93     >>> p = Pipe([['find', '/etc/'], ['grep', '^/etc/ssh$']])
94     >>> p.stdout
95     '/etc/ssh\\n'
96     >>> p.status
97     1
98     >>> p.statuses
99     [1, 0]
100     >>> p.stderrs # doctest: +ELLIPSIS
101     [...find: ...: Permission denied..., '']
102     """
103     def __init__(self, cmds, stdin=None):
104         # spawn processes
105         self._procs = []
106         for cmd in cmds:
107             if len(self._procs) != 0:
108                 stdin = self._procs[-1].stdout
109             self._procs.append(Popen(cmd, stdin=stdin, stdout=PIPE, stderr=PIPE))
110
111         self.stdout,self.stderrs = self._communicate(input=None)
112
113         # collect process statuses
114         self.statuses = []
115         self.status = 0
116         for proc in self._procs:
117             self.statuses.append(proc.wait())
118             if self.statuses[-1] != 0:
119                 self.status = self.statuses[-1]
120
121     # Code excerpted from subprocess.Popen._communicate()
122     if _MSWINDOWS == True:
123         def _communicate(self, input=None):
124             assert input == None, 'stdin != None not yet supported'
125             # listen to each process' stderr
126             threads = []
127             std_X_arrays = []
128             for proc in self._procs:
129                 stderr_array = []
130                 thread = Thread(target=proc._readerthread,
131                                 args=(proc.stderr, stderr_array))
132                 thread.setDaemon(True)
133                 thread.start()
134                 threads.append(thread)
135                 std_X_arrays.append(stderr_array)
136
137             # also listen to the last processes stdout
138             stdout_array = []
139             thread = Thread(target=proc._readerthread,
140                             args=(proc.stdout, stdout_array))
141             thread.setDaemon(True)
142             thread.start()
143             threads.append(thread)
144             std_X_arrays.append(stdout_array)
145
146             # join threads as they die
147             for thread in threads:
148                 thread.join()
149
150             # read output from reader threads
151             std_X_strings = []
152             for std_X_array in std_X_arrays:
153                 std_X_strings.append(std_X_array[0])
154
155             stdout = std_X_strings.pop(-1)
156             stderrs = std_X_strings
157             return (stdout, stderrs)
158     else:
159         assert _POSIX==True, 'invalid platform'
160         def _communicate(self, input=None):
161             read_set = []
162             write_set = []
163             read_arrays = []
164             stdout = None # Return
165             stderr = None # Return
166
167             if self._procs[0].stdin:
168                 # Flush stdio buffer.  This might block, if the user has
169                 # been writing to .stdin in an uncontrolled fashion.
170                 self._procs[0].stdin.flush()
171                 if input:
172                     write_set.append(self._procs[0].stdin)
173                 else:
174                     self._procs[0].stdin.close()
175             for proc in self._procs:
176                 read_set.append(proc.stderr)
177                 read_arrays.append([])
178             read_set.append(self._procs[-1].stdout)
179             read_arrays.append([])
180
181             input_offset = 0
182             while read_set or write_set:
183                 try:
184                     rlist, wlist, xlist = select.select(read_set, write_set, [])
185                 except select.error, e:
186                     if e.args[0] == errno.EINTR:
187                         continue
188                     raise
189                 if self._procs[0].stdin in wlist:
190                     # When select has indicated that the file is writable,
191                     # we can write up to PIPE_BUF bytes without risk
192                     # blocking.  POSIX defines PIPE_BUF >= 512
193                     chunk = input[input_offset : input_offset + 512]
194                     bytes_written = os.write(self.stdin.fileno(), chunk)
195                     input_offset += bytes_written
196                     if input_offset >= len(input):
197                         self._procs[0].stdin.close()
198                         write_set.remove(self._procs[0].stdin)
199                 if self._procs[-1].stdout in rlist:
200                     data = os.read(self._procs[-1].stdout.fileno(), 1024)
201                     if data == '':
202                         self._procs[-1].stdout.close()
203                         read_set.remove(self._procs[-1].stdout)
204                     read_arrays[-1].append(data)
205                 for i,proc in enumerate(self._procs):
206                     if proc.stderr in rlist:
207                         data = os.read(proc.stderr.fileno(), 1024)
208                         if data == '':
209                             proc.stderr.close()
210                             read_set.remove(proc.stderr)
211                         read_arrays[i].append(data)
212
213             # All data exchanged.  Translate lists into strings.
214             read_strings = []
215             for read_array in read_arrays:
216                 read_strings.append(''.join(read_array))
217
218             stdout = read_strings.pop(-1)
219             stderrs = read_strings
220             return (stdout, stderrs)
221
222 if libbe.TESTING == True:
223     suite = doctest.DocTestSuite()