Broke subprocess handling out into its own submodule libbe.subproc.
authorW. Trevor King <wking@drexel.edu>
Fri, 20 Nov 2009 15:48:36 +0000 (10:48 -0500)
committerW. Trevor King <wking@drexel.edu>
Thu, 9 Feb 2012 18:04:00 +0000 (13:04 -0500)
update_copyright.py

index 0a3a281bd878cd543a1fa0834628d9dd5c491cd9..5270ac521faef194a5b32eef0b208685519ecc86 100755 (executable)
@@ -24,9 +24,10 @@ import time
 import os
 import sys
 import select
-from subprocess import Popen, PIPE, mswindows
 from threading import Thread
 
+from libbe.subproc import Pipe
+
 COPYRIGHT_TEXT="""#
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -71,144 +72,6 @@ EXCLUDES = [
 IGNORED_PATHS = ['./.be/', './.bzr/', './build/']
 IGNORED_FILES = ['COPYING', 'update_copyright.py', 'catmutt']
 
-class Pipe (object):
-    """
-    Simple interface for executing POSIX-style pipes based on the
-    subprocess module.  The only complication is the adaptation of
-    subprocess.Popen._comminucate to listen to the stderrs of all
-    processes involved in the pipe, as well as the terminal process'
-    stdout.  There are two implementations of Pipe._communicate, one
-    for MS Windows, and one for POSIX systems.  The MS Windows
-    implementation is currently untested.
-
-    >>> p = Pipe([['find', '/etc/'], ['grep', '^/etc/ssh$']])
-    >>> p.stdout
-    '/etc/ssh\\n'
-    >>> p.status
-    1
-    >>> p.statuses
-    [1, 0]
-    >>> p.stderrs # doctest: +ELLIPSIS
-    ["find: `...': Permission denied\\n...", '']
-    """
-    def __init__(self, cmds, stdin=None):
-        # spawn processes
-        self._procs = []
-        for cmd in cmds:
-            if len(self._procs) != 0:
-                stdin = self._procs[-1].stdout
-            self._procs.append(Popen(cmd, stdin=stdin, stdout=PIPE, stderr=PIPE))
-
-        self.stdout,self.stderrs = self._communicate(input=None)
-
-        # collect process statuses
-        self.statuses = []
-        self.status = 0
-        for proc in self._procs:
-            self.statuses.append(proc.wait())
-            if self.statuses[-1] != 0:
-                self.status = self.statuses[-1]
-
-    # Code excerpted from subprocess.Popen._communicate()
-    if mswindows == True:
-        def _communicate(self, input=None):
-            assert input == None, "stdin != None not yet supported"
-            # listen to each process' stderr
-            threads = []
-            std_X_arrays = []
-            for proc in self._procs:
-                stderr_array = []
-                thread = Thread(target=proc._readerthread,
-                                args=(proc.stderr, stderr_array))
-                thread.setDaemon(True)
-                thread.start()
-                threads.append(thread)
-                std_X_arrays.append(stderr_array)
-
-            # also listen to the last processes stdout
-            stdout_array = []
-            thread = Thread(target=proc._readerthread,
-                            args=(proc.stdout, stdout_array))
-            thread.setDaemon(True)
-            thread.start()
-            threads.append(thread)
-            std_X_arrays.append(stdout_array)
-
-            # join threads as they die
-            for thread in threads:
-                thread.join()
-
-            # read output from reader threads
-            std_X_strings = []
-            for std_X_array in std_X_arrays:
-                std_X_strings.append(std_X_array[0])
-
-            stdout = std_X_strings.pop(-1)
-            stderrs = std_X_strings
-            return (stdout, stderrs)
-    else: # POSIX
-        def _communicate(self, input=None):
-            read_set = []
-            write_set = []
-            read_arrays = []
-            stdout = None # Return
-            stderr = None # Return
-
-            if self._procs[0].stdin:
-                # Flush stdio buffer.  This might block, if the user has
-                # been writing to .stdin in an uncontrolled fashion.
-                self._procs[0].stdin.flush()
-                if input:
-                    write_set.append(self._procs[0].stdin)
-                else:
-                    self._procs[0].stdin.close()
-            for proc in self._procs:
-                read_set.append(proc.stderr)
-                read_arrays.append([])
-            read_set.append(self._procs[-1].stdout)
-            read_arrays.append([])
-
-            input_offset = 0
-            while read_set or write_set:
-                try:
-                    rlist, wlist, xlist = select.select(read_set, write_set, [])
-                except select.error, e:
-                    if e.args[0] == errno.EINTR:
-                        continue
-                    raise
-                if self._procs[0].stdin in wlist:
-                    # When select has indicated that the file is writable,
-                    # we can write up to PIPE_BUF bytes without risk
-                    # blocking.  POSIX defines PIPE_BUF >= 512
-                    chunk = input[input_offset : input_offset + 512]
-                    bytes_written = os.write(self.stdin.fileno(), chunk)
-                    input_offset += bytes_written
-                    if input_offset >= len(input):
-                        self._procs[0].stdin.close()
-                        write_set.remove(self._procs[0].stdin)
-                if self._procs[-1].stdout in rlist:
-                    data = os.read(self._procs[-1].stdout.fileno(), 1024)
-                    if data == "":
-                        self._procs[-1].stdout.close()
-                        read_set.remove(self._procs[-1].stdout)
-                    read_arrays[-1].append(data)
-                for i,proc in enumerate(self._procs):
-                    if proc.stderr in rlist:
-                        data = os.read(proc.stderr.fileno(), 1024)
-                        if data == "":
-                            proc.stderr.close()
-                            read_set.remove(proc.stderr)
-                        read_arrays[i].append(data)
-
-            # All data exchanged.  Translate lists into strings.
-            read_strings = []
-            for read_array in read_arrays:
-                read_strings.append(''.join(read_array))
-
-            stdout = read_strings.pop(-1)
-            stderrs = read_strings
-            return (stdout, stderrs)
-
 def _strip_email(*args):
     """
     >>> _strip_email('J Doe <jdoe@a.com>')