if timeout is not None:
select_args = select_args[:]
- select_args.append(timeout)
+ # Translate poll() timeout args to select() timeout args:
+ #
+ #          | units        | value(s) for indefinite block
+ # ---------|--------------|------------------------------
+ #   poll   | milliseconds | omitted, negative, or None
+ # ---------|--------------|------------------------------
+ #   select | seconds      | omitted
+ # ---------|--------------|------------------------------
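+ #
+ # For example, a poll() timeout of 1500 ms becomes a select()
+ # timeout of 1.5 seconds, while poll(None) or poll(-1) becomes a
+ # select() call with the timeout argument omitted.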
+
+ if timeout is not None and timeout < 0:
+ timeout = None
+ if timeout is not None:
+ # Use float division so that integer millisecond timeouts below
+ # 1000 are not truncated to a zero-second (non-blocking) select()
+ # timeout.
+ select_args.append(timeout / 1000.0)
select_events = select.select(*select_args)
poll_events = []
return True
- def _next_poll_event(self):
+ def _next_poll_event(self, timeout=None):
"""
Since the _schedule_wait() loop is called by event
handlers from _poll_loop(), maintain a central event
queue for both of them to share events from a single
poll() call.
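+
+ For example, if a single poll() call returns several events and
+ the handler for the first one enters a _schedule_wait() loop,
+ the remaining events stay in this queue and are consumed there
+ without another poll() call.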
"""
if not self._poll_event_queue:
- self._poll_event_queue.extend(self._poll.poll())
+ self._poll_event_queue.extend(self._poll.poll(timeout))
return self._poll_event_queue.pop()
def _poll_loop(self):
if not event_handled:
raise AssertionError("tight loop")
+ def _schedule_yield(self):
+ """
+ Schedule for a short period of time chosen by the scheduler based
+ on internal state. Synchronous tasks should call this periodically
+ in order to allow the scheduler to service pending poll events. The
+ scheduler will call poll() exactly once, without blocking, and any
+ resulting poll events will be serviced.
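+
+ Illustrative usage from a synchronous task (a hypothetical
+ sketch; "work_items" and "process" are placeholders, and the
+ interval of 20 mirrors dblink._file_merge_yield_interval below):
+
+     for i, item in enumerate(work_items):
+         if scheduler is not None and i % 20 == 0:
+             scheduler.scheduleYield()
+         process(item)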
+ """
+ event_handlers = self._poll_event_handlers
+ events_handled = 0
+
+ if not event_handlers:
+ return bool(events_handled)
+
+ if not self._poll_event_queue:
+ self._poll_event_queue.extend(self._poll.poll(0))
+
+ while event_handlers and self._poll_event_queue:
+ f, event = self._next_poll_event()
+ handler, reg_id = event_handlers[f]
+ handler(f, event)
+ events_handled += 1
+
+ return bool(events_handled)
+
def _register(self, f, eventmask, handler):
"""
@rtype: Integer
class _iface_class(SlotObject):
__slots__ = ("dblinkEbuildPhase", "dblinkDisplayMerge", "fetch",
- "register", "schedule", "unregister")
+ "register", "schedule", "scheduleYield", "unregister")
class _fetch_iface_class(SlotObject):
__slots__ = ("log_file", "schedule")
dblinkEbuildPhase=self._dblink_ebuild_phase,
dblinkDisplayMerge=self._dblink_display_merge,
fetch=fetch_iface, register=self._register,
- schedule=self._schedule_wait, unregister=self._unregister)
+ schedule=self._schedule_wait, scheduleYield=self._schedule_yield,
+ unregister=self._unregister)
self._task_queues = self._task_queues_class()
for k in self._task_queues.allowed_keys:
"sym": 5
}
+ # When looping over files for merge/unmerge, temporarily yield to the
+ # scheduler each time this many files are processed.
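+ # (The merge/unmerge loops below call scheduler.scheduleYield()
+ # whenever i % self._file_merge_yield_interval == 0.)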
+ _file_merge_yield_interval = 20
+
def __init__(self, cat, pkg, myroot, mysettings, treetype=None,
vartree=None, blockers=None, scheduler=None):
"""
"""
showMessage = self._display_merge
+ scheduler = self._scheduler
if not pkgfiles:
showMessage("No package files given... Grabbing a set.\n")
def show_unmerge(zing, desc, file_type, file_name):
showMessage("%s %s %s %s\n" % \
(zing, desc.ljust(8), file_type, file_name))
- for objkey in mykeys:
+ for i, objkey in enumerate(mykeys):
+
+ if scheduler is not None and \
+ 0 == i % self._file_merge_yield_interval:
+ scheduler.scheduleYield()
+
obj = normalize_path(objkey)
file_data = pkgfiles[objkey]
file_type = file_data[0]
self.settings.get("COLLISION_IGNORE", "").split()])
showMessage = self._display_merge
+ scheduler = self._scheduler
stopmerge = False
- i=0
collisions = []
destroot = normalize_path(destroot).rstrip(os.path.sep) + \
os.path.sep
showMessage("%s checking %d files for package collisions\n" % \
(green("*"), len(mycontents)))
- for f in mycontents:
- i = i + 1
+ for i, f in enumerate(mycontents):
- if i % 1000 == 0:
+ if i % 1000 == 0 and i != 0:
showMessage("%d files checked ...\n" % i)
+
+ if scheduler is not None and \
+ 0 == i % self._file_merge_yield_interval:
+ scheduler.scheduleYield()
+
dest_path = normalize_path(
os.path.join(destroot, f.lstrip(os.path.sep)))
try:
return 0
showMessage = self._display_merge
+ scheduler = self._scheduler
file_paths = set()
for dblnk in installed_instances:
file_paths.update(dblnk.getcontents())
inode_map = {}
real_paths = set()
- for path in file_paths:
+ for i, path in enumerate(file_paths):
+
+ if scheduler is not None and \
+ 0 == i % self._file_merge_yield_interval:
+ scheduler.scheduleYield()
+
try:
s = os.lstat(path)
except OSError, e:
"""
showMessage = self._display_merge
+ scheduler = self._scheduler
from os.path import sep, join
srcroot = normalize_path(srcroot).rstrip(sep) + sep
else:
mergelist = stufftomerge
offset = ""
- for x in mergelist:
+
+ for i, x in enumerate(mergelist):
+
+ if scheduler is not None and \
+ 0 == i % self._file_merge_yield_interval:
+ scheduler.scheduleYield()
+
mysrc = join(srcroot, offset, x)
mydest = join(destroot, offset, x)
# myrealdest is mydest without the $ROOT prefix (makes a difference if ROOT!="/")