When dblink is looping over files for merge/unmerge, temporarily yield to the
authorZac Medico <zmedico@gentoo.org>
Thu, 17 Jul 2008 22:41:04 +0000 (22:41 -0000)
committerZac Medico <zmedico@gentoo.org>
Thu, 17 Jul 2008 22:41:04 +0000 (22:41 -0000)
scheduler each time a fixed number of files are processed (currently 20).
This gives the scheduler an opportunity to service pending poll events. This
is implemented with a new PollScheduler._schedule_yield() method which calls
poll() exactly once, without blocking, and services any resulting poll
events.

svn path=/main/trunk/; revision=11116

pym/_emerge/__init__.py
pym/portage/dbapi/vartree.py

index 94218e25abf87885fcd2b4517423c01844b3557f..b6e9429634244dd48b08b3728e7fdb45bfe1cc47 100644 (file)
@@ -7958,7 +7958,19 @@ class PollSelectAdapter(PollConstants):
 
                if timeout is not None:
                        select_args = select_args[:]
-                       select_args.append(timeout)
+                       # Translate poll() timeout args to select() timeout args:
+                       #
+                       #          | units        | value(s) for indefinite block
+                       # ---------|--------------|------------------------------
+                       #   poll   | milliseconds | omitted, negative, or None
+                       # ---------|--------------|------------------------------
+                       #   select | seconds      | omitted
+                       # ---------|--------------|------------------------------
+
+                       if timeout is not None and timeout < 0:
+                               timeout = None
+                       if timeout is not None:
+                               select_args.append(timeout / 1000)
 
                select_events = select.select(*select_args)
                poll_events = []
@@ -8145,7 +8157,7 @@ class PollScheduler(object):
 
                return True
 
-       def _next_poll_event(self):
+       def _next_poll_event(self, timeout=None):
                """
                Since the _schedule_wait() loop is called by event
                handlers from _poll_loop(), maintain a central event
@@ -8153,7 +8165,7 @@ class PollScheduler(object):
                poll() call.
                """
                if not self._poll_event_queue:
-                       self._poll_event_queue.extend(self._poll.poll())
+                       self._poll_event_queue.extend(self._poll.poll(timeout))
                return self._poll_event_queue.pop()
 
        def _poll_loop(self):
@@ -8170,6 +8182,31 @@ class PollScheduler(object):
                if not event_handled:
                        raise AssertionError("tight loop")
 
+       def _schedule_yield(self):
+               """
+               Give the scheduler one chance to service pending poll events.
+               Synchronous tasks should call this periodically. If no events are
+               already queued, poll() is called once, without blocking. Queued
+               events are then dispatched to their handlers while any handlers
+               remain registered. Returns True if any event was handled.
+               """
+               event_handlers = self._poll_event_handlers
+               events_handled = 0
+
+               if not event_handlers:
+                       return bool(events_handled)
+
+               if not self._poll_event_queue:
+                       self._poll_event_queue.extend(self._poll.poll(0))
+
+               while event_handlers and self._poll_event_queue:
+                       f, event = self._next_poll_event()
+                       handler, reg_id = event_handlers[f]
+                       handler(f, event)
+                       events_handled += 1
+
+               return bool(events_handled)
+
        def _register(self, f, eventmask, handler):
                """
                @rtype: Integer
@@ -8429,7 +8466,7 @@ class Scheduler(PollScheduler):
 
        class _iface_class(SlotObject):
                __slots__ = ("dblinkEbuildPhase", "dblinkDisplayMerge", "fetch",
-                       "register", "schedule", "unregister")
+                       "register", "schedule", "scheduleYield", "unregister")
 
        class _fetch_iface_class(SlotObject):
                __slots__ = ("log_file", "schedule")
@@ -8498,7 +8535,8 @@ class Scheduler(PollScheduler):
                        dblinkEbuildPhase=self._dblink_ebuild_phase,
                        dblinkDisplayMerge=self._dblink_display_merge,
                        fetch=fetch_iface, register=self._register,
-                       schedule=self._schedule_wait, unregister=self._unregister)
+                       schedule=self._schedule_wait, scheduleYield=self._schedule_yield,
+                       unregister=self._unregister)
 
                self._task_queues = self._task_queues_class()
                for k in self._task_queues.allowed_keys:
index 32d83e25aa18ad89c9a1eddb451325bf987a772c..a6cebaab4127a955394e705b42acce6c86e7740a 100644 (file)
@@ -1283,6 +1283,10 @@ class dblink(object):
                "sym": 5
        }
 
+       # When looping over files for merge/unmerge, temporarily yield to the
+       # scheduler each time this many files are processed.
+       _file_merge_yield_interval = 20
+
        def __init__(self, cat, pkg, myroot, mysettings, treetype=None,
                vartree=None, blockers=None, scheduler=None):
                """
@@ -1775,6 +1779,7 @@ class dblink(object):
                """
 
                showMessage = self._display_merge
+               scheduler = self._scheduler
 
                if not pkgfiles:
                        showMessage("No package files given... Grabbing a set.\n")
@@ -1843,7 +1848,12 @@ class dblink(object):
                        def show_unmerge(zing, desc, file_type, file_name):
                                        showMessage("%s %s %s %s\n" % \
                                                (zing, desc.ljust(8), file_type, file_name))
-                       for objkey in mykeys:
+                       for i, objkey in enumerate(mykeys):
+
+                               if scheduler is not None and \
+                                       0 == i % self._file_merge_yield_interval:
+                                       scheduler.scheduleYield()
+
                                obj = normalize_path(objkey)
                                file_data = pkgfiles[objkey]
                                file_type = file_data[0]
@@ -2217,17 +2227,21 @@ class dblink(object):
                                self.settings.get("COLLISION_IGNORE", "").split()])
 
                        showMessage = self._display_merge
+                       scheduler = self._scheduler
                        stopmerge = False
-                       i=0
                        collisions = []
                        destroot = normalize_path(destroot).rstrip(os.path.sep) + \
                                os.path.sep
                        showMessage("%s checking %d files for package collisions\n" % \
                                (green("*"), len(mycontents)))
-                       for f in mycontents:
-                               i = i + 1
+                       for i, f in enumerate(mycontents):
                                if i % 1000 == 0:
                                        showMessage("%d files checked ...\n" % i)
+
+                               if scheduler is not None and \
+                                       0 == i % self._file_merge_yield_interval:
+                                       scheduler.scheduleYield()
+
                                dest_path = normalize_path(
                                        os.path.join(destroot, f.lstrip(os.path.sep)))
                                try:
@@ -2287,13 +2301,19 @@ class dblink(object):
                        return 0
 
                showMessage = self._display_merge
+               scheduler = self._scheduler
 
                file_paths = set()
                for dblnk in installed_instances:
                        file_paths.update(dblnk.getcontents())
                inode_map = {}
                real_paths = set()
-               for path in file_paths:
+               for i, path in enumerate(file_paths):
+
+                       if scheduler is not None and \
+                               0 == i % self._file_merge_yield_interval:
+                               scheduler.scheduleYield()
+
                        try:
                                s = os.lstat(path)
                        except OSError, e:
@@ -2842,6 +2862,7 @@ class dblink(object):
                """
 
                showMessage = self._display_merge
+               scheduler = self._scheduler
 
                from os.path import sep, join
                srcroot = normalize_path(srcroot).rstrip(sep) + sep
@@ -2855,7 +2876,13 @@ class dblink(object):
                else:
                        mergelist = stufftomerge
                        offset = ""
-               for x in mergelist:
+
+               for i, x in enumerate(mergelist):
+
+                       if scheduler is not None and \
+                               0 == i % self._file_merge_yield_interval:
+                               scheduler.scheduleYield()
+
                        mysrc = join(srcroot, offset, x)
                        mydest = join(destroot, offset, x)
                        # myrealdest is mydest without the $ROOT prefix (makes a difference if ROOT!="/")