Adjust EbuildIpcDaemon pickle read and write code in order to ensure
authorZac Medico <zmedico@gentoo.org>
Thu, 2 Sep 2010 07:05:06 +0000 (00:05 -0700)
committerZac Medico <zmedico@gentoo.org>
Thu, 2 Sep 2010 07:05:06 +0000 (00:05 -0700)
atomc reading and writing of whole pickles. This should be the least
error-prone approach, given the non-blocking nature of the streams.

bin/ebuild-ipc.py
pym/_emerge/EbuildIpcDaemon.py
pym/_emerge/FifoIpcDaemon.py

index cbb8c5570c2b4623a6c852f55cb90a7f77f2be34..52fb08226448dde211750ba43b2e01e4d6d502a4 100755 (executable)
@@ -48,13 +48,35 @@ class EbuildIpc(object):
 
        def _communicate(self, args):
                input_fd = os.open(self.ipc_out_fifo, os.O_RDONLY|os.O_NONBLOCK)
-               input_file = os.fdopen(input_fd, 'rb')
-               output_file = open(self.ipc_in_fifo, 'wb')
-               pickle.dump(args, output_file)
+
+               # File streams are in unbuffered mode since we do atomic
+               # read and write of whole pickles.
+               input_file = os.fdopen(input_fd, 'rb', 0)
+               output_file = open(self.ipc_in_fifo, 'wb', 0)
+
+               # Write the whole pickle in a single atomic write() call,
+               # since the reader is in non-blocking mode and we want
+               # it to get the whole pickle at once.
+               output_file.write(pickle.dumps(args))
                output_file.flush()
 
                events = select.select([input_file], [], [])
-               reply = pickle.load(input_file)
+
+               # Read the whole pickle in a single read() call since
+               # this stream is in non-blocking mode and pickle.load()
+               # has been known to raise the following exception when
+               # reading from a non-blocking stream:
+               #
+               #   File "/usr/lib64/python2.6/pickle.py", line 1370, in load
+               #     return Unpickler(file).load()
+               #   File "/usr/lib64/python2.6/pickle.py", line 858, in load
+               #     dispatch[key](self)
+               #   File "/usr/lib64/python2.6/pickle.py", line 1195, in load_setitem
+               #     value = stack.pop()
+               # IndexError: pop from empty list
+
+               pickle_str = input_file.read()
+               reply = pickle.loads(pickle_str)
                output_file.close()
                input_file.close()
 
index f87ffd2e0ca3e14b4352098e2e58a0adea201232..93333a6b170e72ae9f17d624ee89bc5dc6ddff43 100644 (file)
@@ -32,8 +32,23 @@ class EbuildIpcDaemon(FifoIpcDaemon):
 
                if event & PollConstants.POLLIN:
 
+                       # Read the whole pickle in a single read() call since
+                       # this stream is in non-blocking mode and pickle.load()
+                       # has been known to raise the following exception when
+                       # reading from a non-blocking stream:
+                       #
+                       #   File "/usr/lib64/python2.6/pickle.py", line 1370, in load
+                       #     return Unpickler(file).load()
+                       #   File "/usr/lib64/python2.6/pickle.py", line 858, in load
+                       #     dispatch[key](self)
+                       #   File "/usr/lib64/python2.6/pickle.py", line 1195, in load_setitem
+                       #     value = stack.pop()
+                       # IndexError: pop from empty list
+
+                       pickle_str = self._files.pipe_in.read()
+
                        try:
-                               obj = pickle.load(self._files.pipe_in)
+                               obj = pickle.loads(pickle_str)
                        except (EnvironmentError, EOFError, ValueError,
                                pickle.UnpicklingError):
                                pass
@@ -63,6 +78,14 @@ class EbuildIpcDaemon(FifoIpcDaemon):
 
        def _send_reply(self, reply):
                output_fd = os.open(self.output_fifo, os.O_WRONLY|os.O_NONBLOCK)
-               output_file = os.fdopen(output_fd, 'wb')
-               pickle.dump(reply, output_file)
+
+               # File streams are in unbuffered mode since we do atomic
+               # read and write of whole pickles.
+               output_file = os.fdopen(output_fd, 'wb', 0)
+
+               # Write the whole pickle in a single atomic write() call,
+               # since the reader is in non-blocking mode and we want
+               # it to get the whole pickle at once.
+               output_file.write(pickle.dumps(reply))
+               output_file.flush()
                output_file.close()
index 60a5096a5443be08656f0abaa54254aa07793525..b879fda95ab71b3995dc29fb98190562fa32fe71 100644 (file)
@@ -16,7 +16,10 @@ class FifoIpcDaemon(AbstractPollTask):
        def _start(self):
                self._files = self._files_dict()
                input_fd = os.open(self.input_fifo, os.O_RDONLY|os.O_NONBLOCK)
-               self._files.pipe_in = os.fdopen(input_fd, 'rb')
+
+               # File streams are in unbuffered mode since we do atomic
+               # read and write of whole pickles.
+               self._files.pipe_in = os.fdopen(input_fd, 'rb', 0)
 
                self._reg_id = self.scheduler.register(
                        self._files.pipe_in.fileno(),