Rewrite ebuild-ipc timeout handling to use forks.
authorZac Medico <zmedico@gentoo.org>
Tue, 21 Sep 2010 01:06:56 +0000 (18:06 -0700)
committerZac Medico <zmedico@gentoo.org>
Tue, 21 Sep 2010 01:06:56 +0000 (18:06 -0700)
Use forks so that the child process can handle blocking IO
un-interrupted, while the parent handles all timeout
considerations. This helps to avoid possible race conditions
from interference between timeouts and blocking IO operations.

bin/ebuild-ipc.py

index e353dc4dff2049332f42ddd35ceda7c7b14ad25b..ca206e9597b94709cc005824be19c2330c480c76 100755 (executable)
@@ -80,84 +80,67 @@ class EbuildIpc(object):
                        'ebuild-ipc: daemon process not detected\n'),
                        level=logging.ERROR, noiselevel=-1)
 
-       def _communicate(self, args):
-
-               if not self._daemon_is_alive():
-                       self._no_daemon_msg()
-                       return 2
+       def _wait(self, pid, msg):
+               """
+               Wait on pid and return an appropriate exit code. This
+               may return unsuccessfully due to timeout if the daemon
+               process does not appear to be alive.
+               """
 
                start_time = time.time()
-
-               # File streams are in unbuffered mode since we do atomic
-               # read and write of whole pickles.
-               output_file = None
+               wait_retval = None
 
                while True:
                        try:
                                try:
                                        portage.exception.AlarmSignal.register(
                                                self._COMMUNICATE_RETRY_TIMEOUT_SECONDS)
-
-                                       if output_file is not None:
-                                               output_file.close()
-                                               output_file = None
-
-                                       output_file = open(self.ipc_in_fifo, 'wb', 0)
-
-                                       # Write the whole pickle in a single atomic write() call,
-                                       # since the reader is in non-blocking mode and we want
-                                       # it to get the whole pickle at once.
-                                       output_file.write(pickle.dumps(args))
-                                       output_file.close()
+                                       wait_retval = os.waitpid(pid, 0)
                                        break
                                finally:
                                        portage.exception.AlarmSignal.unregister()
+                       except OSError as e:
+                               # waitpid() raised an exception
+                               portage.util.writemsg_level(
+                                       "ebuild-ipc: %s: %s\n" % (msg, e),
+                                       level=logging.ERROR, noiselevel=-1)
+                               return 2
                        except portage.exception.AlarmSignal:
-                               if self._daemon_is_alive():
-                                       self._timeout_retry_msg(start_time,
-                                               portage.localization._('during write'))
+                               if wait_retval is not None:
+                                       break
+                               elif self._daemon_is_alive():
+                                       self._timeout_retry_msg(start_time, msg)
                                else:
                                        self._no_daemon_msg()
                                        return 2
 
-               input_file = None
-               buf = array.array('B')
+               if not (os.WIFEXITED(wait_retval[1]) and \
+                       os.WEXITSTATUS(wait_retval[1]) == os.EX_OK):
+                       portage.util.writemsg_level(
+                               "ebuild-ipc: %s: %s\n" % (msg,
+                               portage.localization._('subprocess failure: %s') % \
+                               wait_retval[1]),
+                               level=logging.ERROR, noiselevel=-1)
+                       return 2
 
-               start_time = time.time()
-               while True:
-                       try:
-                               try:
-                                       portage.exception.AlarmSignal.register(
-                                               self._COMMUNICATE_RETRY_TIMEOUT_SECONDS)
+               return os.EX_OK
 
-                                       if input_file is None:
-                                               input_file = open(self.ipc_out_fifo, 'rb', 0)
-
-                                       # Read the whole pickle in a single atomic read() call.
-                                       try:
-                                               buf.fromfile(input_file, self._BUFSIZE)
-                                       except (EOFError, IOError) as e:
-                                               if not buf:
-                                                       portage.util.writemsg_level(
-                                                               "ebuild-ipc: %s\n" % (e,),
-                                                               level=logging.ERROR, noiselevel=-1)
-                                       break
-                               finally:
-                                       portage.exception.AlarmSignal.unregister()
-                       except portage.exception.AlarmSignal:
-                               if buf:
-                                       break
-                               elif self._daemon_is_alive():
-                                       self._timeout_retry_msg(start_time,
-                                               portage.localization._('during read'))
-                               else:
-                                       self._no_daemon_msg()
-                                       return 2
+       def _receive_reply(self):
 
-               if input_file is not None:
-                       input_file.close()
+               # File streams are in unbuffered mode since we do atomic
+               # read and write of whole pickles.
+               input_file = open(self.ipc_out_fifo, 'rb', 0)
+               buf = array.array('B')
 
-               rval = 2
+               try:
+                       buf.fromfile(input_file, self._BUFSIZE)
+               except (EOFError, IOError) as e:
+                       if not buf:
+                               portage.util.writemsg_level(
+                                       "ebuild-ipc: %s\n" % (e,),
+                                       level=logging.ERROR, noiselevel=-1)
+
+               retval = 2
 
                if not buf:
 
@@ -170,15 +153,18 @@ class EbuildIpc(object):
 
                        try:
                                reply = pickle.loads(buf.tostring())
-                       except (EnvironmentError, EOFError, ValueError,
-                               pickle.UnpicklingError) as e:
+                       except SystemExit:
+                               raise
+                       except Exception as e:
+                               # The pickle module can raise practically
+                               # any exception when given corrupt data.
                                portage.util.writemsg_level(
                                        "ebuild-ipc: %s\n" % (e,),
                                        level=logging.ERROR, noiselevel=-1)
 
                        else:
 
-                               (out, err, rval) = reply
+                               (out, err, retval) = reply
 
                                if out:
                                        portage.util.writemsg_stdout(out, noiselevel=-1)
@@ -186,7 +172,44 @@ class EbuildIpc(object):
                                if err:
                                        portage.util.writemsg(err, noiselevel=-1)
 
-               return rval
+               return retval
+
+       def _communicate(self, args):
+
+               if not self._daemon_is_alive():
+                       self._no_daemon_msg()
+                       return 2
+
+               # Use forks so that the child process can handle blocking IO
+               # un-interrupted, while the parent handles all timeout
+               # considerations. This helps to avoid possible race conditions
+               # from interference between timeouts and blocking IO operations.
+               pid = os.fork()
+
+               if pid == 0:
+
+                       # File streams are in unbuffered mode since we do atomic
+                       # read and write of whole pickles.
+                       output_file = open(self.ipc_in_fifo, 'wb', 0)
+                       output_file.write(pickle.dumps(args))
+                       output_file.close()
+                       os._exit(os.EX_OK)
+
+               retval = self._wait(pid, portage.localization._('during write'))
+               if retval != os.EX_OK:
+                       return retval
+
+               if not self._daemon_is_alive():
+                       self._no_daemon_msg()
+                       return 2
+
+               pid = os.fork()
+
+               if pid == 0:
+                       retval = self._receive_reply()
+                       os._exit(retval)
+
+               return self._wait(pid, portage.localization._('during read'))
 
 def ebuild_ipc_main(args):
        ebuild_ipc = EbuildIpc()