From 32cef15206e24c81da75a87626c8ae5192800a03 Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Mon, 20 Sep 2010 18:06:56 -0700 Subject: [PATCH] Rewrite ebuild-ipc timeout handling to use forks. Use forks so that the child process can handle blocking IO un-interrupted, while the parent handles all timeout considerations. This helps to avoid possible race conditions from interference between timeouts and blocking IO operations. --- bin/ebuild-ipc.py | 147 +++++++++++++++++++++++++++------------------- 1 file changed, 85 insertions(+), 62 deletions(-) diff --git a/bin/ebuild-ipc.py b/bin/ebuild-ipc.py index e353dc4df..ca206e959 100755 --- a/bin/ebuild-ipc.py +++ b/bin/ebuild-ipc.py @@ -80,84 +80,67 @@ class EbuildIpc(object): 'ebuild-ipc: daemon process not detected\n'), level=logging.ERROR, noiselevel=-1) - def _communicate(self, args): - - if not self._daemon_is_alive(): - self._no_daemon_msg() - return 2 + def _wait(self, pid, msg): + """ + Wait on pid and return an appropriate exit code. This + may return unsuccessfully due to timeout if the daemon + process does not appear to be alive. + """ start_time = time.time() - - # File streams are in unbuffered mode since we do atomic - # read and write of whole pickles. - output_file = None + wait_retval = None while True: try: try: portage.exception.AlarmSignal.register( self._COMMUNICATE_RETRY_TIMEOUT_SECONDS) - - if output_file is not None: - output_file.close() - output_file = None - - output_file = open(self.ipc_in_fifo, 'wb', 0) - - # Write the whole pickle in a single atomic write() call, - # since the reader is in non-blocking mode and we want - # it to get the whole pickle at once. - output_file.write(pickle.dumps(args)) - output_file.close() + wait_retval = os.waitpid(pid, 0) break finally: portage.exception.AlarmSignal.unregister() + except OSError as e: + # waitpid() raised an exception + portage.util.writemsg_level( + "ebuild-ipc: %s: %s\n" % (msg, e), + level=logging.ERROR, noiselevel=-1) + return 2 except portage.exception.AlarmSignal: - if self._daemon_is_alive(): - self._timeout_retry_msg(start_time, - portage.localization._('during write')) + if wait_retval is not None: + break + elif self._daemon_is_alive(): + self._timeout_retry_msg(start_time, msg) else: self._no_daemon_msg() return 2 - input_file = None - buf = array.array('B') + if not (os.WIFEXITED(wait_retval[1]) and \ + os.WEXITSTATUS(wait_retval[1]) == os.EX_OK): + portage.util.writemsg_level( + "ebuild-ipc: %s: %s\n" % (msg, + portage.localization._('subprocess failure: %s') % \ + wait_retval[1]), + level=logging.ERROR, noiselevel=-1) + return 2 - start_time = time.time() - while True: - try: - try: - portage.exception.AlarmSignal.register( - self._COMMUNICATE_RETRY_TIMEOUT_SECONDS) + return os.EX_OK - if input_file is None: - input_file = open(self.ipc_out_fifo, 'rb', 0) - - # Read the whole pickle in a single atomic read() call. - try: - buf.fromfile(input_file, self._BUFSIZE) - except (EOFError, IOError) as e: - if not buf: - portage.util.writemsg_level( - "ebuild-ipc: %s\n" % (e,), - level=logging.ERROR, noiselevel=-1) - break - finally: - portage.exception.AlarmSignal.unregister() - except portage.exception.AlarmSignal: - if buf: - break - elif self._daemon_is_alive(): - self._timeout_retry_msg(start_time, - portage.localization._('during read')) - else: - self._no_daemon_msg() - return 2 + def _receive_reply(self): - if input_file is not None: - input_file.close() + # File streams are in unbuffered mode since we do atomic + # read and write of whole pickles. + input_file = open(self.ipc_out_fifo, 'rb', 0) + buf = array.array('B') - rval = 2 + try: + buf.fromfile(input_file, self._BUFSIZE) + except (EOFError, IOError) as e: + if not buf: + portage.util.writemsg_level( + "ebuild-ipc: %s\n" % (e,), + level=logging.ERROR, noiselevel=-1) + + retval = 2 if not buf: @@ -170,15 +153,18 @@ class EbuildIpc(object): try: reply = pickle.loads(buf.tostring()) - except (EnvironmentError, EOFError, ValueError, - pickle.UnpicklingError) as e: + except SystemExit: + raise + except Exception as e: + # The pickle module can raise practically + # any exception when given corrupt data. portage.util.writemsg_level( "ebuild-ipc: %s\n" % (e,), level=logging.ERROR, noiselevel=-1) else: - (out, err, rval) = reply + (out, err, retval) = reply if out: portage.util.writemsg_stdout(out, noiselevel=-1) @@ -186,7 +172,44 @@ class EbuildIpc(object): if err: portage.util.writemsg(err, noiselevel=-1) - return rval + return retval + + def _communicate(self, args): + + if not self._daemon_is_alive(): + self._no_daemon_msg() + return 2 + + # Use forks so that the child process can handle blocking IO + # un-interrupted, while the parent handles all timeout + # considerations. This helps to avoid possible race conditions + # from interference between timeouts and blocking IO operations. + pid = os.fork() + + if pid == 0: + + # File streams are in unbuffered mode since we do atomic + # read and write of whole pickles. + output_file = open(self.ipc_in_fifo, 'wb', 0) + output_file.write(pickle.dumps(args)) + output_file.close() + os._exit(os.EX_OK) + + retval = self._wait(pid, portage.localization._('during write')) + if retval != os.EX_OK: + return retval + + if not self._daemon_is_alive(): + self._no_daemon_msg() + return 2 + + pid = os.fork() + + if pid == 0: + retval = self._receive_reply() + os._exit(retval) + + return self._wait(pid, portage.localization._('during read')) def ebuild_ipc_main(args): ebuild_ipc = EbuildIpc() -- 2.26.2