raise CommandError(list_args, status, stdout, stderr)
return status, stdout, stderr
-class Pipe (object):
- """Simple interface for executing POSIX-style pipes.
-
- Based on the `subprocess` module. The only complication is the
- adaptation of `subprocess.Popen._communicate` to listen to the
- stderrs of all processes involved in the pipe, as well as the
- terminal process' stdout. There are two implementations of
- `Pipe._communicate`, one for MS Windows, and one for POSIX
- systems. The MS Windows implementation is currently untested.
-
- >>> p = Pipe([['find', '/etc/'], ['grep', '^/etc/ssh$']])
- >>> p.stdout
- '/etc/ssh\\n'
- >>> p.status
- 1
- >>> p.statuses
- [1, 0]
- >>> p.stderrs # doctest: +ELLIPSIS
- [...find: ...: Permission denied..., '']
- """
- def __init__(self, cmds, stdin=None):
- # spawn processes
- self._procs = []
- for cmd in cmds:
- if len(self._procs) != 0:
- stdin = self._procs[-1].stdout
- self._procs.append(Popen(cmd, stdin=stdin, stdout=PIPE, stderr=PIPE))
-
- self.stdout,self.stderrs = self._communicate(input=None)
-
- # collect process statuses
- self.statuses = []
- self.status = 0
- for proc in self._procs:
- self.statuses.append(proc.wait())
- if self.statuses[-1] != 0:
- self.status = self.statuses[-1]
-
- # Code excerpted from subprocess.Popen._communicate()
- if _MSWINDOWS == True:
- def _communicate(self, input=None):
- assert input == None, 'stdin != None not yet supported'
- # listen to each process' stderr
- threads = []
- std_X_arrays = []
- for proc in self._procs:
- stderr_array = []
- thread = Thread(target=proc._readerthread,
- args=(proc.stderr, stderr_array))
- thread.setDaemon(True)
- thread.start()
- threads.append(thread)
- std_X_arrays.append(stderr_array)
-
- # also listen to the last processes stdout
- stdout_array = []
- thread = Thread(target=proc._readerthread,
- args=(proc.stdout, stdout_array))
- thread.setDaemon(True)
- thread.start()
- threads.append(thread)
- std_X_arrays.append(stdout_array)
-
- # join threads as they die
- for thread in threads:
- thread.join()
-
- # read output from reader threads
- std_X_strings = []
- for std_X_array in std_X_arrays:
- std_X_strings.append(std_X_array[0])
-
- stdout = std_X_strings.pop(-1)
- stderrs = std_X_strings
- return (stdout, stderrs)
- else:
- assert _POSIX==True, 'invalid platform'
- def _communicate(self, input=None):
- read_set = []
- write_set = []
- read_arrays = []
- stdout = None # Return
- stderr = None # Return
-
- if self._procs[0].stdin:
- # Flush stdio buffer. This might block, if the user has
- # been writing to .stdin in an uncontrolled fashion.
- self._procs[0].stdin.flush()
- if input:
- write_set.append(self._procs[0].stdin)
- else:
- self._procs[0].stdin.close()
- for proc in self._procs:
- read_set.append(proc.stderr)
- read_arrays.append([])
- read_set.append(self._procs[-1].stdout)
- read_arrays.append([])
-
- input_offset = 0
- while read_set or write_set:
- try:
- rlist, wlist, xlist = select.select(read_set, write_set, [])
- except select.error, e:
- if e.args[0] == errno.EINTR:
- continue
- raise
- if self._procs[0].stdin in wlist:
- # When select has indicated that the file is writable,
- # we can write up to PIPE_BUF bytes without risk
- # blocking. POSIX defines PIPE_BUF >= 512
- chunk = input[input_offset : input_offset + 512]
- bytes_written = os.write(self.stdin.fileno(), chunk)
- input_offset += bytes_written
- if input_offset >= len(input):
- self._procs[0].stdin.close()
- write_set.remove(self._procs[0].stdin)
- if self._procs[-1].stdout in rlist:
- data = os.read(self._procs[-1].stdout.fileno(), 1024)
- if data == '':
- self._procs[-1].stdout.close()
- read_set.remove(self._procs[-1].stdout)
- read_arrays[-1].append(data)
- for i,proc in enumerate(self._procs):
- if proc.stderr in rlist:
- data = os.read(proc.stderr.fileno(), 1024)
- if data == '':
- proc.stderr.close()
- read_set.remove(proc.stderr)
- read_arrays[i].append(data)
-
- # All data exchanged. Translate lists into strings.
- read_strings = []
- for read_array in read_arrays:
- read_strings.append(''.join(read_array))
-
- stdout = read_strings.pop(-1)
- stderrs = read_strings
- return (stdout, stderrs)
-
if libbe.TESTING == True:
suite = doctest.DocTestSuite()