From 252fa9a50c264e9827c42c631291749ad62d0f4d Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Thu, 2 Sep 2010 00:05:06 -0700 Subject: [PATCH] Adjust EbuildIpcDaemon pickle read and write code in order to ensure atomc reading and writing of whole pickles. This should be the least error-prone approach, given the non-blocking nature of the streams. --- bin/ebuild-ipc.py | 30 ++++++++++++++++++++++++++---- pym/_emerge/EbuildIpcDaemon.py | 29 ++++++++++++++++++++++++++--- pym/_emerge/FifoIpcDaemon.py | 5 ++++- 3 files changed, 56 insertions(+), 8 deletions(-) diff --git a/bin/ebuild-ipc.py b/bin/ebuild-ipc.py index cbb8c5570..52fb08226 100755 --- a/bin/ebuild-ipc.py +++ b/bin/ebuild-ipc.py @@ -48,13 +48,35 @@ class EbuildIpc(object): def _communicate(self, args): input_fd = os.open(self.ipc_out_fifo, os.O_RDONLY|os.O_NONBLOCK) - input_file = os.fdopen(input_fd, 'rb') - output_file = open(self.ipc_in_fifo, 'wb') - pickle.dump(args, output_file) + + # File streams are in unbuffered mode since we do atomic + # read and write of whole pickles. + input_file = os.fdopen(input_fd, 'rb', 0) + output_file = open(self.ipc_in_fifo, 'wb', 0) + + # Write the whole pickle in a single atomic write() call, + # since the reader is in non-blocking mode and we want + # it to get the whole pickle at once. + output_file.write(pickle.dumps(args)) output_file.flush() events = select.select([input_file], [], []) - reply = pickle.load(input_file) + + # Read the whole pickle in a single read() call since + # this stream is in non-blocking mode and pickle.load() + # has been known to raise the following exception when + # reading from a non-blocking stream: + # + # File "/usr/lib64/python2.6/pickle.py", line 1370, in load + # return Unpickler(file).load() + # File "/usr/lib64/python2.6/pickle.py", line 858, in load + # dispatch[key](self) + # File "/usr/lib64/python2.6/pickle.py", line 1195, in load_setitem + # value = stack.pop() + # IndexError: pop from empty list + + pickle_str = input_file.read() + reply = pickle.loads(pickle_str) output_file.close() input_file.close() diff --git a/pym/_emerge/EbuildIpcDaemon.py b/pym/_emerge/EbuildIpcDaemon.py index f87ffd2e0..93333a6b1 100644 --- a/pym/_emerge/EbuildIpcDaemon.py +++ b/pym/_emerge/EbuildIpcDaemon.py @@ -32,8 +32,23 @@ class EbuildIpcDaemon(FifoIpcDaemon): if event & PollConstants.POLLIN: + # Read the whole pickle in a single read() call since + # this stream is in non-blocking mode and pickle.load() + # has been known to raise the following exception when + # reading from a non-blocking stream: + # + # File "/usr/lib64/python2.6/pickle.py", line 1370, in load + # return Unpickler(file).load() + # File "/usr/lib64/python2.6/pickle.py", line 858, in load + # dispatch[key](self) + # File "/usr/lib64/python2.6/pickle.py", line 1195, in load_setitem + # value = stack.pop() + # IndexError: pop from empty list + + pickle_str = self._files.pipe_in.read() + try: - obj = pickle.load(self._files.pipe_in) + obj = pickle.loads(pickle_str) except (EnvironmentError, EOFError, ValueError, pickle.UnpicklingError): pass @@ -63,6 +78,14 @@ class EbuildIpcDaemon(FifoIpcDaemon): def _send_reply(self, reply): output_fd = os.open(self.output_fifo, os.O_WRONLY|os.O_NONBLOCK) - output_file = os.fdopen(output_fd, 'wb') - pickle.dump(reply, output_file) + + # File streams are in unbuffered mode since we do atomic + # read and write of whole pickles. + output_file = os.fdopen(output_fd, 'wb', 0) + + # Write the whole pickle in a single atomic write() call, + # since the reader is in non-blocking mode and we want + # it to get the whole pickle at once. + output_file.write(pickle.dumps(reply)) + output_file.flush() output_file.close() diff --git a/pym/_emerge/FifoIpcDaemon.py b/pym/_emerge/FifoIpcDaemon.py index 60a5096a5..b879fda95 100644 --- a/pym/_emerge/FifoIpcDaemon.py +++ b/pym/_emerge/FifoIpcDaemon.py @@ -16,7 +16,10 @@ class FifoIpcDaemon(AbstractPollTask): def _start(self): self._files = self._files_dict() input_fd = os.open(self.input_fifo, os.O_RDONLY|os.O_NONBLOCK) - self._files.pipe_in = os.fdopen(input_fd, 'rb') + + # File streams are in unbuffered mode since we do atomic + # read and write of whole pickles. + self._files.pipe_in = os.fdopen(input_fd, 'rb', 0) self._reg_id = self.scheduler.register( self._files.pipe_in.fileno(), -- 2.26.2