From f202826c41db708712a1847c11f2298b1314b2d9 Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Thu, 17 Jul 2008 22:41:04 +0000 Subject: [PATCH] When dblink is looping over files for merge/unmerge, temporarily yield to the scheduler each time a fixed number of files are processed (currently 20). This gives the scheduler an opportunity to service pending poll events. This is implemented with a new PollScheduler._schedule_yield() method which calls poll() exactly once, without blocking, and any services any resulting poll events. svn path=/main/trunk/; revision=11116 --- pym/_emerge/__init__.py | 48 ++++++++++++++++++++++++++++++++---- pym/portage/dbapi/vartree.py | 39 ++++++++++++++++++++++++----- 2 files changed, 76 insertions(+), 11 deletions(-) diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py index 94218e25a..b6e942963 100644 --- a/pym/_emerge/__init__.py +++ b/pym/_emerge/__init__.py @@ -7958,7 +7958,19 @@ class PollSelectAdapter(PollConstants): if timeout is not None: select_args = select_args[:] - select_args.append(timeout) + # Translate poll() timeout args to select() timeout args: + # + # | units | value(s) for indefinite block + # ---------|--------------|------------------------------ + # poll | milliseconds | omitted, negative, or None + # ---------|--------------|------------------------------ + # select | seconds | omitted + # ---------|--------------|------------------------------ + + if timeout is not None and timeout < 0: + timeout = None + if timeout is not None: + select_args.append(timeout / 1000) select_events = select.select(*select_args) poll_events = [] @@ -8145,7 +8157,7 @@ class PollScheduler(object): return True - def _next_poll_event(self): + def _next_poll_event(self, timeout=None): """ Since the _schedule_wait() loop is called by event handlers from _poll_loop(), maintain a central event @@ -8153,7 +8165,7 @@ class PollScheduler(object): poll() call. """ if not self._poll_event_queue: - self._poll_event_queue.extend(self._poll.poll()) + self._poll_event_queue.extend(self._poll.poll(timeout)) return self._poll_event_queue.pop() def _poll_loop(self): @@ -8170,6 +8182,31 @@ class PollScheduler(object): if not event_handled: raise AssertionError("tight loop") + def _schedule_yield(self): + """ + Schedule for a short period of time chosen by the scheduler based + on internal state. Synchronous tasks should call this periodically + in order to allow the scheduler to service pending poll events. The + scheduler will call poll() exactly once, without blocking, and any + resulting poll events will be serviced. + """ + event_handlers = self._poll_event_handlers + events_handled = 0 + + if not event_handlers: + return bool(events_handled) + + if not self._poll_event_queue: + self._poll_event_queue.extend(self._poll.poll(0)) + + while event_handlers and self._poll_event_queue: + f, event = self._next_poll_event() + handler, reg_id = event_handlers[f] + handler(f, event) + events_handled += 1 + + return bool(events_handled) + def _register(self, f, eventmask, handler): """ @rtype: Integer @@ -8429,7 +8466,7 @@ class Scheduler(PollScheduler): class _iface_class(SlotObject): __slots__ = ("dblinkEbuildPhase", "dblinkDisplayMerge", "fetch", - "register", "schedule", "unregister") + "register", "schedule", "scheduleYield", "unregister") class _fetch_iface_class(SlotObject): __slots__ = ("log_file", "schedule") @@ -8498,7 +8535,8 @@ class Scheduler(PollScheduler): dblinkEbuildPhase=self._dblink_ebuild_phase, dblinkDisplayMerge=self._dblink_display_merge, fetch=fetch_iface, register=self._register, - schedule=self._schedule_wait, unregister=self._unregister) + schedule=self._schedule_wait, scheduleYield=self._schedule_yield, + unregister=self._unregister) self._task_queues = self._task_queues_class() for k in self._task_queues.allowed_keys: diff --git a/pym/portage/dbapi/vartree.py b/pym/portage/dbapi/vartree.py index 32d83e25a..a6cebaab4 100644 --- a/pym/portage/dbapi/vartree.py +++ b/pym/portage/dbapi/vartree.py @@ -1283,6 +1283,10 @@ class dblink(object): "sym": 5 } + # When looping over files for merge/unmerge, temporarily yield to the + # scheduler each time this many files are processed. + _file_merge_yield_interval = 20 + def __init__(self, cat, pkg, myroot, mysettings, treetype=None, vartree=None, blockers=None, scheduler=None): """ @@ -1775,6 +1779,7 @@ class dblink(object): """ showMessage = self._display_merge + scheduler = self._scheduler if not pkgfiles: showMessage("No package files given... Grabbing a set.\n") @@ -1843,7 +1848,12 @@ class dblink(object): def show_unmerge(zing, desc, file_type, file_name): showMessage("%s %s %s %s\n" % \ (zing, desc.ljust(8), file_type, file_name)) - for objkey in mykeys: + for i, objkey in enumerate(mykeys): + + if scheduler is not None and \ + 0 == i % self._file_merge_yield_interval: + scheduler.scheduleYield() + obj = normalize_path(objkey) file_data = pkgfiles[objkey] file_type = file_data[0] @@ -2217,17 +2227,21 @@ class dblink(object): self.settings.get("COLLISION_IGNORE", "").split()]) showMessage = self._display_merge + scheduler = self._scheduler stopmerge = False - i=0 collisions = [] destroot = normalize_path(destroot).rstrip(os.path.sep) + \ os.path.sep showMessage("%s checking %d files for package collisions\n" % \ (green("*"), len(mycontents))) - for f in mycontents: - i = i + 1 + for i, f in enumerate(mycontents): if i % 1000 == 0: showMessage("%d files checked ...\n" % i) + + if scheduler is not None and \ + 0 == i % self._file_merge_yield_interval: + scheduler.scheduleYield() + dest_path = normalize_path( os.path.join(destroot, f.lstrip(os.path.sep))) try: @@ -2287,13 +2301,19 @@ class dblink(object): return 0 showMessage = self._display_merge + scheduler = self._scheduler file_paths = set() for dblnk in installed_instances: file_paths.update(dblnk.getcontents()) inode_map = {} real_paths = set() - for path in file_paths: + for i, path in enumerate(file_paths): + + if scheduler is not None and \ + 0 == i % self._file_merge_yield_interval: + scheduler.scheduleYield() + try: s = os.lstat(path) except OSError, e: @@ -2842,6 +2862,7 @@ class dblink(object): """ showMessage = self._display_merge + scheduler = self._scheduler from os.path import sep, join srcroot = normalize_path(srcroot).rstrip(sep) + sep @@ -2855,7 +2876,13 @@ class dblink(object): else: mergelist = stufftomerge offset = "" - for x in mergelist: + + for i, x in enumerate(mergelist): + + if scheduler is not None and \ + 0 == i % self._file_merge_yield_interval: + scheduler.scheduleYield() + mysrc = join(srcroot, offset, x) mydest = join(destroot, offset, x) # myrealdest is mydest without the $ROOT prefix (makes a difference if ROOT!="/") -- 2.26.2