ebuild-ipc: add FifoWriter class
authorZac Medico <zmedico@gentoo.org>
Fri, 4 Jan 2013 13:22:48 +0000 (05:22 -0800)
committerZac Medico <zmedico@gentoo.org>
Fri, 4 Jan 2013 13:22:48 +0000 (05:22 -0800)
bin/ebuild-ipc.py

index 4046d8d256888f288ccb7d5f256ba39ad0335bab..d351e9454442c1a1c6f5aeb11656dd4509e00356 100755 (executable)
@@ -12,7 +12,6 @@ import platform
 import signal
 import sys
 import time
-import traceback
 
 def debug_signal(signum, frame):
        import pdb
@@ -39,9 +38,20 @@ import portage
 portage._internal_caller = True
 portage._disable_legacy_globals()
 
+from portage.util._async.ForkProcess import ForkProcess
 from portage.util._eventloop.global_event_loop import global_event_loop
 from _emerge.PipeReader import PipeReader
 
+class FifoWriter(ForkProcess):
+
+       __slots__ = ('buf', 'fifo',)
+
+       def _run(self):
+               # Atomically write the whole buffer into the fifo.
+               with open(self.fifo, 'wb', 0) as f:
+                       f.write(self.buf)
+               return os.EX_OK
+
 class EbuildIpc(object):
 
        # Timeout for each individual communication attempt (we retry
@@ -90,7 +100,7 @@ class EbuildIpc(object):
                        'ebuild-ipc: daemon process not detected\n'),
                        level=logging.ERROR, noiselevel=-1)
 
-       def _wait(self, pid, pr, msg):
+       def _run_writer(self, fifo_writer, msg):
                """
                Wait on pid and return an appropriate exit code. This
                may return unsuccessfully due to timeout if the daemon
@@ -99,49 +109,24 @@ class EbuildIpc(object):
 
                start_time = time.time()
 
-               pipe_reader = PipeReader(input_files={"pipe_read":pr},
-                       scheduler=global_event_loop())
-               pipe_reader.start()
-
-               eof = pipe_reader.poll() is not None
+               fifo_writer.start()
+               eof = fifo_writer.poll() is not None
 
                while not eof:
-                       pipe_reader._wait_loop(timeout=self._COMMUNICATE_RETRY_TIMEOUT_MS)
+                       fifo_writer._wait_loop(timeout=self._COMMUNICATE_RETRY_TIMEOUT_MS)
 
-                       eof = pipe_reader.poll() is not None
+                       eof = fifo_writer.poll() is not None
                        if eof:
                                break
                        elif self._daemon_is_alive():
                                self._timeout_retry_msg(start_time, msg)
                        else:
-                               pipe_reader.cancel()
+                               fifo_writer.cancel()
                                self._no_daemon_msg()
-                               try:
-                                       os.kill(pid, signal.SIGKILL)
-                                       os.waitpid(pid, 0)
-                               except OSError as e:
-                                       portage.util.writemsg_level(
-                                               "ebuild-ipc: %s\n" % (e,),
-                                               level=logging.ERROR, noiselevel=-1)
+                               fifo_writer.wait()
                                return 2
 
-               try:
-                       wait_retval = os.waitpid(pid, 0)
-               except OSError as e:
-                       portage.util.writemsg_level(
-                               "ebuild-ipc: %s: %s\n" % (msg, e),
-                               level=logging.ERROR, noiselevel=-1)
-                       return 2
-
-               if not os.WIFEXITED(wait_retval[1]):
-                       portage.util.writemsg_level(
-                               "ebuild-ipc: %s: %s\n" % (msg,
-                               portage.localization._('subprocess failure: %s') % \
-                               wait_retval[1]),
-                               level=logging.ERROR, noiselevel=-1)
-                       return 2
-
-               return os.WEXITSTATUS(wait_retval[1])
+               return fifo_writer.wait()
 
        def _receive_reply(self, input_fd):
 
@@ -218,31 +203,9 @@ class EbuildIpc(object):
                # un-interrupted, while the parent handles all timeout
                # considerations. This helps to avoid possible race conditions
                # from interference between timeouts and blocking IO operations.
-               pr, pw = os.pipe()
-               pid = os.fork()
-
-               if pid == 0:
-                       retval = 2
-                       try:
-                               os.close(pr)
-
-                               # File streams are in unbuffered mode since we do atomic
-                               # read and write of whole pickles.
-                               output_file = open(self.ipc_in_fifo, 'wb', 0)
-                               output_file.write(pickle.dumps(args))
-                               output_file.close()
-                               retval = os.EX_OK
-                       except SystemExit:
-                               raise
-                       except:
-                               traceback.print_exc()
-                       finally:
-                               os._exit(retval)
-
-               os.close(pw)
-
                msg = portage.localization._('during write')
-               retval = self._wait(pid, pr, msg)
+               retval = self._run_writer(FifoWriter(buf=pickle.dumps(args),
+                       fifo=self.ipc_in_fifo, scheduler=global_event_loop()), msg)
 
                if retval != os.EX_OK:
                        portage.util.writemsg_level(