Broke subprocess handling out into its own submodule libbe.subproc.
authorW. Trevor King <wking@drexel.edu>
Fri, 20 Nov 2009 15:48:36 +0000 (10:48 -0500)
committerW. Trevor King <wking@drexel.edu>
Fri, 20 Nov 2009 15:48:36 +0000 (10:48 -0500)
libbe/bzr.py
libbe/git.py
libbe/subproc.py [new file with mode: 0644]
libbe/vcs.py
release.py
update_copyright.py

index 2cf1cba19709055c791c8d02cbf74f0a619df0b5..281493dff847bf12b9b0d811478a043bd465989d 100644 (file)
@@ -90,7 +90,7 @@ class Bzr(vcs.VCS):
                 if self._u_any_in_string(strings, error) == True:
                     raise vcs.EmptyCommit()
                 else:
-                    raise vcs.CommandError(args, status, stdout="", stderr=error)
+                    raise vcs.CommandError(args, status, stderr=error)
         revision = None
         revline = re.compile("Committed revision (.*)[.]")
         match = revline.search(error)
index cb4436a0d908e683aa9e4cdcf04e39850967b8cc..55556defd7704d56143a7c0ff8a25b327a5ab340 100644 (file)
@@ -134,7 +134,7 @@ class Git(vcs.VCS):
         if status == 128:
             if error.startswith("fatal: ambiguous argument 'HEAD': unknown "):
                 return None
-            raise vcs.CommandError(args, status, stdout="", stderr=error)
+            raise vcs.CommandError(args, status, stderr=error)
         commits = output.splitlines()
         try:
             return commits[index]
diff --git a/libbe/subproc.py b/libbe/subproc.py
new file mode 100644 (file)
index 0000000..13afcf8
--- /dev/null
@@ -0,0 +1,216 @@
+# Copyright (C) 2009 W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Functions for running external commands in subprocesses.
+"""
+
+from subprocess import Popen, PIPE
+import sys
+import doctest
+
+from encoding import get_encoding
+
+_MSWINDOWS = sys.platform == 'win32'
+_POSIX = not _MSWINDOWS
+
+class CommandError(Exception):
+    def __init__(self, command, status, stdout=None, stderr=None):
+        strerror = ['Command failed (%d):\n  %s\n' % (status, stderr),
+                    'while executing\n  %s' % command]
+        Exception.__init__(self, '\n'.join(strerror))
+        self.command = command
+        self.status = status
+        self.stdout = stdout
+        self.stderr = stderr
+
+def invoke(args, stdin=None, stdout=PIPE, stderr=PIPE, expect=(0,),
+           cwd=None, unicode_output=True, verbose=False, encoding=None):
+    """
+    expect should be a tuple of allowed exit codes.  cwd should be
+    the directory from which the command will be executed.  When
+    unicode_output == True, convert stdout and stdin strings to
+    unicode before returing them.
+    """
+    if cwd == None:
+        cwd = '.'
+    if verbose == True:
+        print >> sys.stderr, '%s$ %s' % (cwd, ' '.join(args))
+    try :
+        if _POSIX:
+            q = Popen(args, stdin=PIPE, stdout=stdout, stderr=stderr, cwd=cwd)
+        else:
+            assert _MSWINDOWS==True, 'invalid platform'
+            # win32 don't have os.execvp() so have to run command in a shell
+            q = Popen(args, stdin=PIPE, stdout=stdout, stderr=stderr, 
+                      shell=True, cwd=cwd)
+    except OSError, e:
+        raise CommandError(args, status=e.args[0], stderr=e)
+    stdout,stderr = q.communicate(input=stdin)
+    status = q.wait()
+    if unicode_output == True:
+        if encoding == None:
+            encoding = get_encoding()
+        if stdout != None:
+            stdout = unicode(stdout, encoding)
+        if stderr != None:
+            stderr = unicode(stderr, encoding)
+    if verbose == True:
+        print >> sys.stderr, '%d\n%s%s' % (status, stdout, stderr)
+    if status not in expect:
+        raise CommandError(args, status, stdout, stderr)
+    return status, stdout, stderr
+
+class Pipe (object):
+    """
+    Simple interface for executing POSIX-style pipes based on the
+    subprocess module.  The only complication is the adaptation of
+    subprocess.Popen._comminucate to listen to the stderrs of all
+    processes involved in the pipe, as well as the terminal process'
+    stdout.  There are two implementations of Pipe._communicate, one
+    for MS Windows, and one for POSIX systems.  The MS Windows
+    implementation is currently untested.
+
+    >>> p = Pipe([['find', '/etc/'], ['grep', '^/etc/ssh$']])
+    >>> p.stdout
+    '/etc/ssh\\n'
+    >>> p.status
+    1
+    >>> p.statuses
+    [1, 0]
+    >>> p.stderrs # doctest: +ELLIPSIS
+    ["find: `...': Permission denied\\n...", '']
+    """
+    def __init__(self, cmds, stdin=None):
+        # spawn processes
+        self._procs = []
+        for cmd in cmds:
+            if len(self._procs) != 0:
+                stdin = self._procs[-1].stdout
+            self._procs.append(Popen(cmd, stdin=stdin, stdout=PIPE, stderr=PIPE))
+
+        self.stdout,self.stderrs = self._communicate(input=None)
+
+        # collect process statuses
+        self.statuses = []
+        self.status = 0
+        for proc in self._procs:
+            self.statuses.append(proc.wait())
+            if self.statuses[-1] != 0:
+                self.status = self.statuses[-1]
+
+    # Code excerpted from subprocess.Popen._communicate()
+    if _MSWINDOWS == True:
+        def _communicate(self, input=None):
+            assert input == None, 'stdin != None not yet supported'
+            # listen to each process' stderr
+            threads = []
+            std_X_arrays = []
+            for proc in self._procs:
+                stderr_array = []
+                thread = Thread(target=proc._readerthread,
+                                args=(proc.stderr, stderr_array))
+                thread.setDaemon(True)
+                thread.start()
+                threads.append(thread)
+                std_X_arrays.append(stderr_array)
+    
+            # also listen to the last processes stdout
+            stdout_array = []
+            thread = Thread(target=proc._readerthread,
+                            args=(proc.stdout, stdout_array))
+            thread.setDaemon(True)
+            thread.start()
+            threads.append(thread)
+            std_X_arrays.append(stdout_array)
+    
+            # join threads as they die
+            for thread in threads:
+                thread.join()
+    
+            # read output from reader threads
+            std_X_strings = []
+            for std_X_array in std_X_arrays:
+                std_X_strings.append(std_X_array[0])
+
+            stdout = std_X_strings.pop(-1)
+            stderrs = std_X_strings
+            return (stdout, stderrs)
+    else:
+        assert _POSIX==True, 'invalid platform'
+        def _communicate(self, input=None):
+            read_set = []
+            write_set = []
+            read_arrays = []
+            stdout = None # Return
+            stderr = None # Return
+
+            if self._procs[0].stdin:
+                # Flush stdio buffer.  This might block, if the user has
+                # been writing to .stdin in an uncontrolled fashion.
+                self._procs[0].stdin.flush()
+                if input:
+                    write_set.append(self._procs[0].stdin)
+                else:
+                    self._procs[0].stdin.close()
+            for proc in self._procs:
+                read_set.append(proc.stderr)
+                read_arrays.append([])
+            read_set.append(self._procs[-1].stdout)
+            read_arrays.append([])
+
+            input_offset = 0
+            while read_set or write_set:
+                try:
+                    rlist, wlist, xlist = select.select(read_set, write_set, [])
+                except select.error, e:
+                    if e.args[0] == errno.EINTR:
+                        continue
+                    raise
+                if self._procs[0].stdin in wlist:
+                    # When select has indicated that the file is writable,
+                    # we can write up to PIPE_BUF bytes without risk
+                    # blocking.  POSIX defines PIPE_BUF >= 512
+                    chunk = input[input_offset : input_offset + 512]
+                    bytes_written = os.write(self.stdin.fileno(), chunk)
+                    input_offset += bytes_written
+                    if input_offset >= len(input):
+                        self._procs[0].stdin.close()
+                        write_set.remove(self._procs[0].stdin)
+                if self._procs[-1].stdout in rlist:
+                    data = os.read(self._procs[-1].stdout.fileno(), 1024)
+                    if data == '':
+                        self._procs[-1].stdout.close()
+                        read_set.remove(self._procs[-1].stdout)
+                    read_arrays[-1].append(data)
+                for i,proc in enumerate(self._procs):
+                    if proc.stderr in rlist:
+                        data = os.read(proc.stderr.fileno(), 1024)
+                        if data == '':
+                            proc.stderr.close()
+                            read_set.remove(proc.stderr)
+                        read_arrays[i].append(data)
+
+            # All data exchanged.  Translate lists into strings.
+            read_strings = []
+            for read_array in read_arrays:
+                read_strings.append(''.join(read_array))
+
+            stdout = read_strings.pop(-1)
+            stderrs = read_strings
+            return (stdout, stderrs)
+
+suite = doctest.DocTestSuite()
index 1ac5dd9898a3dc9e2de0ce32b955da71a9da25bf..be2884684fe2755095d1f76e892fb5c2ff657f1c 100644 (file)
@@ -25,7 +25,6 @@ subclassed by other Version Control System backends.  The base class
 implements a "do not version" VCS.
 """
 
-from subprocess import Popen, PIPE
 import codecs
 import os
 import os.path
@@ -38,6 +37,7 @@ import unittest
 import doctest
 
 from utility import Dir, search_parent_directories
+from subproc import CommandError, invoke
 
 
 def _get_matching_vcs(matchfn):
@@ -67,15 +67,6 @@ def installed_vcs():
     return _get_matching_vcs(lambda vcs: vcs.installed())
 
 
-class CommandError(Exception):
-    def __init__(self, command, status, stdout, stderr):
-        strerror = ["Command failed (%d):\n  %s\n" % (status, stderr),
-                    "while executing\n  %s" % command]
-        Exception.__init__(self, "\n".join(strerror))
-        self.command = command
-        self.status = status
-        self.stdout = stdout
-        self.stderr = stderr
 
 class SettingIDnotSupported(NotImplementedError):
     pass
@@ -457,37 +448,14 @@ class VCS(object):
             if list_string in string:
                 return True
         return False
-    def _u_invoke(self, args, stdin=None, expect=(0,), cwd=None,
-                  unicode_output=True):
-        """
-        expect should be a tuple of allowed exit codes.  cwd should be
-        the directory from which the command will be executed.  When
-        unicode_output == True, convert stdout and stdin strings to
-        unicode before returing them.
-        """
-        if cwd == None:
-            cwd = self.rootdir
-        if self.verboseInvoke == True:
-            print >> sys.stderr, "%s$ %s" % (cwd, " ".join(args))
-        try :
-            if sys.platform != "win32":
-                q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, cwd=cwd)
-            else:
-                # win32 don't have os.execvp() so have to run command in a shell
-                q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, 
-                          shell=True, cwd=cwd)
-        except OSError, e :
-            raise CommandError(args, status=e.args[0], stdout="", stderr=e)
-        stdout,stderr = q.communicate(input=stdin)
-        status = q.wait()
-        if unicode_output == True:
-            stdout = unicode(stdout, self.encoding)
-            stderr = unicode(stderr, self.encoding)
-        if self.verboseInvoke == True:
-            print >> sys.stderr, "%d\n%s%s" % (status, stdout, stderr)
-        if status not in expect:
-            raise CommandError(args, status, stdout, stderr)
-        return status, stdout, stderr
+    def _u_invoke(self, *args, **kwargs):
+        if 'cwd' not in kwargs:
+            kwargs['cwd'] = self.rootdir
+        if 'verbose' not in kwargs:
+            kwargs['verbose'] = self.verboseInvoke
+        if 'encoding' not in kwargs:
+            kwargs['encoding'] = self.encoding
+        return invoke(*args, **kwargs)
     def _u_invoke_client(self, *args, **kwargs):
         cl_args = [self.client]
         cl_args.extend(args)
index 996e3631fa0d95680dc884cd4a31a6948953851e..2e756874760e3d239a6b48e1972a10f021ae5058 100755 (executable)
@@ -20,10 +20,10 @@ import os
 import os.path
 import shutil
 import string
-from subprocess import Popen
 import sys
 
-from update_copyright import Pipe, update_authors, update_files
+from libbe.subproc import Pipe, invoke
+from update_copyright import update_authors, update_files
 
 def validate_tag(tag):
     """
@@ -93,8 +93,8 @@ def make_version():
 
 def make_changelog(filename, tag):
     print 'generate ChangeLog file', filename, 'up to tag', tag
-    p = Popen(['bzr', 'log', '--gnu-changelog', '-n1', '-r',
-               '..tag:%s' % tag], stdout=file(filename, 'w'))
+    p = invoke(['bzr', 'log', '--gnu-changelog', '-n1', '-r',
+                '..tag:%s' % tag], stdout=file(filename, 'w'))
     status = p.wait()
     assert status == 0, status
 
index 6cdaa2fb88bfebdadd70af023e288a17a38cfa21..5ca54552042c23dd425331b92051312068522e86 100755 (executable)
@@ -24,9 +24,10 @@ import time
 import os
 import sys
 import select
-from subprocess import Popen, PIPE, mswindows
 from threading import Thread
 
+from libbe.subproc import Pipe
+
 COPYRIGHT_TEXT="""#
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -71,144 +72,6 @@ EXCLUDES = [
 IGNORED_PATHS = ['./.be/', './.bzr/', './build/']
 IGNORED_FILES = ['COPYING', 'update_copyright.py', 'catmutt']
 
-class Pipe (object):
-    """
-    Simple interface for executing POSIX-style pipes based on the
-    subprocess module.  The only complication is the adaptation of
-    subprocess.Popen._comminucate to listen to the stderrs of all
-    processes involved in the pipe, as well as the terminal process'
-    stdout.  There are two implementations of Pipe._communicate, one
-    for MS Windows, and one for POSIX systems.  The MS Windows
-    implementation is currently untested.
-
-    >>> p = Pipe([['find', '/etc/'], ['grep', '^/etc/ssh$']])
-    >>> p.stdout
-    '/etc/ssh\\n'
-    >>> p.status
-    1
-    >>> p.statuses
-    [1, 0]
-    >>> p.stderrs # doctest: +ELLIPSIS
-    ["find: `...': Permission denied\\n...", '']
-    """
-    def __init__(self, cmds, stdin=None):
-        # spawn processes
-        self._procs = []
-        for cmd in cmds:
-            if len(self._procs) != 0:
-                stdin = self._procs[-1].stdout
-            self._procs.append(Popen(cmd, stdin=stdin, stdout=PIPE, stderr=PIPE))
-
-        self.stdout,self.stderrs = self._communicate(input=None)
-
-        # collect process statuses
-        self.statuses = []
-        self.status = 0
-        for proc in self._procs:
-            self.statuses.append(proc.wait())
-            if self.statuses[-1] != 0:
-                self.status = self.statuses[-1]
-
-    # Code excerpted from subprocess.Popen._communicate()
-    if mswindows == True:
-        def _communicate(self, input=None):
-            assert input == None, "stdin != None not yet supported"
-            # listen to each process' stderr
-            threads = []
-            std_X_arrays = []
-            for proc in self._procs:
-                stderr_array = []
-                thread = Thread(target=proc._readerthread,
-                                args=(proc.stderr, stderr_array))
-                thread.setDaemon(True)
-                thread.start()
-                threads.append(thread)
-                std_X_arrays.append(stderr_array)
-    
-            # also listen to the last processes stdout
-            stdout_array = []
-            thread = Thread(target=proc._readerthread,
-                            args=(proc.stdout, stdout_array))
-            thread.setDaemon(True)
-            thread.start()
-            threads.append(thread)
-            std_X_arrays.append(stdout_array)
-    
-            # join threads as they die
-            for thread in threads:
-                thread.join()
-    
-            # read output from reader threads
-            std_X_strings = []
-            for std_X_array in std_X_arrays:
-                std_X_strings.append(std_X_array[0])
-
-            stdout = std_X_strings.pop(-1)
-            stderrs = std_X_strings
-            return (stdout, stderrs)
-    else: # POSIX
-        def _communicate(self, input=None):
-            read_set = []
-            write_set = []
-            read_arrays = []
-            stdout = None # Return
-            stderr = None # Return
-
-            if self._procs[0].stdin:
-                # Flush stdio buffer.  This might block, if the user has
-                # been writing to .stdin in an uncontrolled fashion.
-                self._procs[0].stdin.flush()
-                if input:
-                    write_set.append(self._procs[0].stdin)
-                else:
-                    self._procs[0].stdin.close()
-            for proc in self._procs:
-                read_set.append(proc.stderr)
-                read_arrays.append([])
-            read_set.append(self._procs[-1].stdout)
-            read_arrays.append([])
-
-            input_offset = 0
-            while read_set or write_set:
-                try:
-                    rlist, wlist, xlist = select.select(read_set, write_set, [])
-                except select.error, e:
-                    if e.args[0] == errno.EINTR:
-                        continue
-                    raise
-                if self._procs[0].stdin in wlist:
-                    # When select has indicated that the file is writable,
-                    # we can write up to PIPE_BUF bytes without risk
-                    # blocking.  POSIX defines PIPE_BUF >= 512
-                    chunk = input[input_offset : input_offset + 512]
-                    bytes_written = os.write(self.stdin.fileno(), chunk)
-                    input_offset += bytes_written
-                    if input_offset >= len(input):
-                        self._procs[0].stdin.close()
-                        write_set.remove(self._procs[0].stdin)
-                if self._procs[-1].stdout in rlist:
-                    data = os.read(self._procs[-1].stdout.fileno(), 1024)
-                    if data == "":
-                        self._procs[-1].stdout.close()
-                        read_set.remove(self._procs[-1].stdout)
-                    read_arrays[-1].append(data)
-                for i,proc in enumerate(self._procs):
-                    if proc.stderr in rlist:
-                        data = os.read(proc.stderr.fileno(), 1024)
-                        if data == "":
-                            proc.stderr.close()
-                            read_set.remove(proc.stderr)
-                        read_arrays[i].append(data)
-
-            # All data exchanged.  Translate lists into strings.
-            read_strings = []
-            for read_array in read_arrays:
-                read_strings.append(''.join(read_array))
-
-            stdout = read_strings.pop(-1)
-            stderrs = read_strings
-            return (stdout, stderrs)
-
 def _strip_email(*args):
     """
     >>> _strip_email('J Doe <jdoe@a.com>')