From: Zac Medico Date: Mon, 22 Jun 2009 18:47:26 +0000 (-0000) Subject: Bug #275047 - Split _emerge/__init__.py into smaller pieces (part 3). X-Git-Tag: v2.2_rc34~170 X-Git-Url: http://git.tremily.us/?a=commitdiff_plain;h=bf9282b6782ad433b2ca905a5131bd0c424a2d94;p=portage.git Bug #275047 - Split _emerge/__init__.py into smaller pieces (part 3). Thanks to Sebastian Mingramm (few) for this patch. svn path=/main/trunk/; revision=13668 --- diff --git a/pym/_emerge/PollScheduler.py b/pym/_emerge/PollScheduler.py new file mode 100644 index 000000000..3a23877f9 --- /dev/null +++ b/pym/_emerge/PollScheduler.py @@ -0,0 +1,251 @@ +import logging +import select + +from portage.util import writemsg_level + +from _emerge.SlotObject import SlotObject +from _emerge.getloadavg import getloadavg +from _emerge.PollConstants import PollConstants +from _emerge.PollSelectAdapter import PollSelectAdapter + +class PollScheduler(object): + + class _sched_iface_class(SlotObject): + __slots__ = ("register", "schedule", "unregister") + + def __init__(self): + self._max_jobs = 1 + self._max_load = None + self._jobs = 0 + self._poll_event_queue = [] + self._poll_event_handlers = {} + self._poll_event_handler_ids = {} + # Increment id for each new handler. + self._event_handler_id = 0 + self._poll_obj = create_poll_instance() + self._scheduling = False + + def _schedule(self): + """ + Calls _schedule_tasks() and automatically returns early from + any recursive calls to this method that the _schedule_tasks() + call might trigger. This makes _schedule() safe to call from + inside exit listeners. + """ + if self._scheduling: + return False + self._scheduling = True + try: + return self._schedule_tasks() + finally: + self._scheduling = False + + def _running_job_count(self): + return self._jobs + + def _can_add_job(self): + max_jobs = self._max_jobs + max_load = self._max_load + + if self._max_jobs is not True and \ + self._running_job_count() >= self._max_jobs: + return False + + if max_load is not None and \ + (max_jobs is True or max_jobs > 1) and \ + self._running_job_count() >= 1: + try: + avg1, avg5, avg15 = getloadavg() + except OSError: + return False + + if avg1 >= max_load: + return False + + return True + + def _poll(self, timeout=None): + """ + All poll() calls pass through here. The poll events + are added directly to self._poll_event_queue. + In order to avoid endless blocking, this raises + StopIteration if timeout is None and there are + no file descriptors to poll. + """ + if not self._poll_event_handlers: + self._schedule() + if timeout is None and \ + not self._poll_event_handlers: + raise StopIteration( + "timeout is None and there are no poll() event handlers") + + # The following error is known to occur with Linux kernel versions + # less than 2.6.24: + # + # select.error: (4, 'Interrupted system call') + # + # This error has been observed after a SIGSTOP, followed by SIGCONT. + # Treat it similar to EAGAIN if timeout is None, otherwise just return + # without any events. + while True: + try: + self._poll_event_queue.extend(self._poll_obj.poll(timeout)) + break + except select.error, e: + writemsg_level("\n!!! select error: %s\n" % (e,), + level=logging.ERROR, noiselevel=-1) + del e + if timeout is not None: + break + + def _next_poll_event(self, timeout=None): + """ + Since the _schedule_wait() loop is called by event + handlers from _poll_loop(), maintain a central event + queue for both of them to share events from a single + poll() call. 
In order to avoid endless blocking, this
+		raises StopIteration if timeout is None and there are
+		no file descriptors to poll.
+		"""
+		if not self._poll_event_queue:
+			self._poll(timeout)
+		return self._poll_event_queue.pop()
+
+	def _poll_loop(self):
+
+		event_handlers = self._poll_event_handlers
+		event_handled = False
+
+		try:
+			while event_handlers:
+				f, event = self._next_poll_event()
+				handler, reg_id = event_handlers[f]
+				handler(f, event)
+				event_handled = True
+		except StopIteration:
+			event_handled = True
+
+		if not event_handled:
+			raise AssertionError("tight loop")
+
+	def _schedule_yield(self):
+		"""
+		Schedule for a short period of time chosen by the scheduler based
+		on internal state. Synchronous tasks should call this periodically
+		in order to allow the scheduler to service pending poll events. The
+		scheduler will call poll() exactly once, without blocking, and any
+		resulting poll events will be serviced.
+		"""
+		event_handlers = self._poll_event_handlers
+		events_handled = 0
+
+		if not event_handlers:
+			return bool(events_handled)
+
+		if not self._poll_event_queue:
+			self._poll(0)
+
+		try:
+			while event_handlers and self._poll_event_queue:
+				f, event = self._next_poll_event()
+				handler, reg_id = event_handlers[f]
+				handler(f, event)
+				events_handled += 1
+		except StopIteration:
+			events_handled += 1
+
+		return bool(events_handled)
+
+	def _register(self, f, eventmask, handler):
+		"""
+		@rtype: Integer
+		@return: A unique registration id, for use in schedule() or
+			unregister() calls.
+		"""
+		if f in self._poll_event_handlers:
+			raise AssertionError("fd %d is already registered" % f)
+		self._event_handler_id += 1
+		reg_id = self._event_handler_id
+		self._poll_event_handler_ids[reg_id] = f
+		self._poll_event_handlers[f] = (handler, reg_id)
+		self._poll_obj.register(f, eventmask)
+		return reg_id
+
+	def _unregister(self, reg_id):
+		f = self._poll_event_handler_ids[reg_id]
+		self._poll_obj.unregister(f)
+		del self._poll_event_handlers[f]
+		del self._poll_event_handler_ids[reg_id]
+
+	def _schedule_wait(self, wait_ids):
+		"""
+		Schedule until wait_ids is no longer registered
+		for poll() events.
+		@type wait_ids: int or collection of ints
+		@param wait_ids: a task id or collection of task ids to wait for
+		"""
+		event_handlers = self._poll_event_handlers
+		handler_ids = self._poll_event_handler_ids
+		event_handled = False
+
+		if isinstance(wait_ids, int):
+			wait_ids = frozenset([wait_ids])
+
+		try:
+			while wait_ids.intersection(handler_ids):
+				f, event = self._next_poll_event()
+				handler, reg_id = event_handlers[f]
+				handler(f, event)
+				event_handled = True
+		except StopIteration:
+			event_handled = True
+
+		return event_handled
+
+
+_can_poll_device = None
+
+def can_poll_device():
+	"""
+	Test if it's possible to use poll() on a device such as a pty. This
+	is known to fail on Darwin.
+	@rtype: bool
+	@returns: True if poll() on a device succeeds, False otherwise.
+	"""
+
+	global _can_poll_device
+	if _can_poll_device is not None:
+		return _can_poll_device
+
+	if not hasattr(select, "poll"):
+		_can_poll_device = False
+		return _can_poll_device
+
+	try:
+		dev_null = open('/dev/null', 'rb')
+	except IOError:
+		_can_poll_device = False
+		return _can_poll_device
+
+	p = select.poll()
+	p.register(dev_null.fileno(), PollConstants.POLLIN)
+
+	invalid_request = False
+	for f, event in p.poll():
+		if event & PollConstants.POLLNVAL:
+			invalid_request = True
+			break
+	dev_null.close()
+
+	_can_poll_device = not invalid_request
+	return _can_poll_device
+
+def create_poll_instance():
+	"""
+	Create an instance of select.poll, or an instance of
+	PollSelectAdapter if there is no poll() implementation or
+	it is broken somehow.
+	"""
+	if can_poll_device():
+		return select.poll()
+	return PollSelectAdapter()
diff --git a/pym/_emerge/QueueScheduler.py b/pym/_emerge/QueueScheduler.py
new file mode 100644
index 000000000..f88c13caa
--- /dev/null
+++ b/pym/_emerge/QueueScheduler.py
@@ -0,0 +1,77 @@
+from _emerge.PollScheduler import PollScheduler
+
+class QueueScheduler(PollScheduler):
+
+	"""
+	Add instances of SequentialTaskQueue and then call run(). The
+	run() method returns when no tasks remain.
+	"""
+
+	def __init__(self, max_jobs=None, max_load=None):
+		PollScheduler.__init__(self)
+
+		if max_jobs is None:
+			max_jobs = 1
+
+		self._max_jobs = max_jobs
+		self._max_load = max_load
+		self.sched_iface = self._sched_iface_class(
+			register=self._register,
+			schedule=self._schedule_wait,
+			unregister=self._unregister)
+
+		self._queues = []
+		self._schedule_listeners = []
+
+	def add(self, q):
+		self._queues.append(q)
+
+	def remove(self, q):
+		self._queues.remove(q)
+
+	def run(self):
+
+		while self._schedule():
+			self._poll_loop()
+
+		while self._running_job_count():
+			self._poll_loop()
+
+	def _schedule_tasks(self):
+		"""
+		@rtype: bool
+		@returns: True if there may be remaining tasks to schedule,
+			False otherwise.
+		"""
+		while self._can_add_job():
+			n = self._max_jobs - self._running_job_count()
+			if n < 1:
+				break
+
+			if not self._start_next_job(n):
+				return False
+
+		for q in self._queues:
+			if q:
+				return True
+		return False
+
+	def _running_job_count(self):
+		job_count = 0
+		for q in self._queues:
+			job_count += len(q.running_tasks)
+		self._jobs = job_count
+		return job_count
+
+	def _start_next_job(self, n=1):
+		started_count = 0
+		for q in self._queues:
+			initial_job_count = len(q.running_tasks)
+			q.schedule()
+			final_job_count = len(q.running_tasks)
+			if final_job_count > initial_job_count:
+				started_count += (final_job_count - initial_job_count)
+			if started_count >= n:
+				break
+		return started_count
+
diff --git a/pym/_emerge/TaskScheduler.py b/pym/_emerge/TaskScheduler.py
new file mode 100644
index 000000000..564f130aa
--- /dev/null
+++ b/pym/_emerge/TaskScheduler.py
@@ -0,0 +1,21 @@
+from _emerge.QueueScheduler import QueueScheduler
+from _emerge.SequentialTaskQueue import SequentialTaskQueue
+
+class TaskScheduler(object):
+
+	"""
+	A simple way to handle scheduling of AsynchronousTask instances. Simply
+	add tasks and call run(). The run() method returns when no tasks remain.
+ """ + + def __init__(self, max_jobs=None, max_load=None): + self._queue = SequentialTaskQueue(max_jobs=max_jobs) + self._scheduler = QueueScheduler( + max_jobs=max_jobs, max_load=max_load) + self.sched_iface = self._scheduler.sched_iface + self.run = self._scheduler.run + self._scheduler.add(self._queue) + + def add(self, task): + self._queue.add(task) + diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py index 3a3dac25e..de5642da9 100644 --- a/pym/_emerge/__init__.py +++ b/pym/_emerge/__init__.py @@ -3,10 +3,8 @@ # Distributed under the terms of the GNU General Public License v2 # $Id$ -import formatter import logging import pwd -import select import shlex import signal import sys @@ -61,7 +59,6 @@ from _emerge.DepPriorityNormalRange import DepPriorityNormalRange from _emerge.DepPrioritySatisfiedRange import DepPrioritySatisfiedRange from _emerge.Task import Task from _emerge.Blocker import Blocker -from _emerge.PollConstants import PollConstants from _emerge.AsynchronousTask import AsynchronousTask from _emerge.CompositeTask import CompositeTask from _emerge.EbuildFetcher import EbuildFetcher @@ -80,13 +77,12 @@ from _emerge.BlockerCache import BlockerCache from _emerge.PackageVirtualDbapi import PackageVirtualDbapi from _emerge.RepoDisplay import RepoDisplay from _emerge.UseFlagDisplay import UseFlagDisplay -from _emerge.PollSelectAdapter import PollSelectAdapter from _emerge.SequentialTaskQueue import SequentialTaskQueue from _emerge.ProgressHandler import ProgressHandler from _emerge.stdout_spinner import stdout_spinner from _emerge.UninstallFailure import UninstallFailure from _emerge.JobStatusDisplay import JobStatusDisplay -from _emerge.getloadavg import getloadavg +from _emerge.PollScheduler import PollScheduler def userquery(prompt, responses=None, colours=None): """Displays a prompt and a set of responses, then waits for a response @@ -6445,341 +6441,6 @@ class PackageCounters(object): (self.blocks - self.blocks_satisfied)) return "".join(myoutput) - -_can_poll_device = None - -def can_poll_device(): - """ - Test if it's possible to use poll() on a device such as a pty. This - is known to fail on Darwin. - @rtype: bool - @returns: True if poll() on a device succeeds, False otherwise. - """ - - global _can_poll_device - if _can_poll_device is not None: - return _can_poll_device - - if not hasattr(select, "poll"): - _can_poll_device = False - return _can_poll_device - - try: - dev_null = open('/dev/null', 'rb') - except IOError: - _can_poll_device = False - return _can_poll_device - - p = select.poll() - p.register(dev_null.fileno(), PollConstants.POLLIN) - - invalid_request = False - for f, event in p.poll(): - if event & PollConstants.POLLNVAL: - invalid_request = True - break - dev_null.close() - - _can_poll_device = not invalid_request - return _can_poll_device - -def create_poll_instance(): - """ - Create an instance of select.poll, or an instance of - PollSelectAdapter there is no poll() implementation or - it is broken somehow. - """ - if can_poll_device(): - return select.poll() - return PollSelectAdapter() - -class PollScheduler(object): - - class _sched_iface_class(SlotObject): - __slots__ = ("register", "schedule", "unregister") - - def __init__(self): - self._max_jobs = 1 - self._max_load = None - self._jobs = 0 - self._poll_event_queue = [] - self._poll_event_handlers = {} - self._poll_event_handler_ids = {} - # Increment id for each new handler. 
- self._event_handler_id = 0 - self._poll_obj = create_poll_instance() - self._scheduling = False - - def _schedule(self): - """ - Calls _schedule_tasks() and automatically returns early from - any recursive calls to this method that the _schedule_tasks() - call might trigger. This makes _schedule() safe to call from - inside exit listeners. - """ - if self._scheduling: - return False - self._scheduling = True - try: - return self._schedule_tasks() - finally: - self._scheduling = False - - def _running_job_count(self): - return self._jobs - - def _can_add_job(self): - max_jobs = self._max_jobs - max_load = self._max_load - - if self._max_jobs is not True and \ - self._running_job_count() >= self._max_jobs: - return False - - if max_load is not None and \ - (max_jobs is True or max_jobs > 1) and \ - self._running_job_count() >= 1: - try: - avg1, avg5, avg15 = getloadavg() - except OSError: - return False - - if avg1 >= max_load: - return False - - return True - - def _poll(self, timeout=None): - """ - All poll() calls pass through here. The poll events - are added directly to self._poll_event_queue. - In order to avoid endless blocking, this raises - StopIteration if timeout is None and there are - no file descriptors to poll. - """ - if not self._poll_event_handlers: - self._schedule() - if timeout is None and \ - not self._poll_event_handlers: - raise StopIteration( - "timeout is None and there are no poll() event handlers") - - # The following error is known to occur with Linux kernel versions - # less than 2.6.24: - # - # select.error: (4, 'Interrupted system call') - # - # This error has been observed after a SIGSTOP, followed by SIGCONT. - # Treat it similar to EAGAIN if timeout is None, otherwise just return - # without any events. - while True: - try: - self._poll_event_queue.extend(self._poll_obj.poll(timeout)) - break - except select.error, e: - writemsg_level("\n!!! select error: %s\n" % (e,), - level=logging.ERROR, noiselevel=-1) - del e - if timeout is not None: - break - - def _next_poll_event(self, timeout=None): - """ - Since the _schedule_wait() loop is called by event - handlers from _poll_loop(), maintain a central event - queue for both of them to share events from a single - poll() call. In order to avoid endless blocking, this - raises StopIteration if timeout is None and there are - no file descriptors to poll. - """ - if not self._poll_event_queue: - self._poll(timeout) - return self._poll_event_queue.pop() - - def _poll_loop(self): - - event_handlers = self._poll_event_handlers - event_handled = False - - try: - while event_handlers: - f, event = self._next_poll_event() - handler, reg_id = event_handlers[f] - handler(f, event) - event_handled = True - except StopIteration: - event_handled = True - - if not event_handled: - raise AssertionError("tight loop") - - def _schedule_yield(self): - """ - Schedule for a short period of time chosen by the scheduler based - on internal state. Synchronous tasks should call this periodically - in order to allow the scheduler to service pending poll events. The - scheduler will call poll() exactly once, without blocking, and any - resulting poll events will be serviced. 
- """ - event_handlers = self._poll_event_handlers - events_handled = 0 - - if not event_handlers: - return bool(events_handled) - - if not self._poll_event_queue: - self._poll(0) - - try: - while event_handlers and self._poll_event_queue: - f, event = self._next_poll_event() - handler, reg_id = event_handlers[f] - handler(f, event) - events_handled += 1 - except StopIteration: - events_handled += 1 - - return bool(events_handled) - - def _register(self, f, eventmask, handler): - """ - @rtype: Integer - @return: A unique registration id, for use in schedule() or - unregister() calls. - """ - if f in self._poll_event_handlers: - raise AssertionError("fd %d is already registered" % f) - self._event_handler_id += 1 - reg_id = self._event_handler_id - self._poll_event_handler_ids[reg_id] = f - self._poll_event_handlers[f] = (handler, reg_id) - self._poll_obj.register(f, eventmask) - return reg_id - - def _unregister(self, reg_id): - f = self._poll_event_handler_ids[reg_id] - self._poll_obj.unregister(f) - del self._poll_event_handlers[f] - del self._poll_event_handler_ids[reg_id] - - def _schedule_wait(self, wait_ids): - """ - Schedule until wait_id is not longer registered - for poll() events. - @type wait_id: int - @param wait_id: a task id to wait for - """ - event_handlers = self._poll_event_handlers - handler_ids = self._poll_event_handler_ids - event_handled = False - - if isinstance(wait_ids, int): - wait_ids = frozenset([wait_ids]) - - try: - while wait_ids.intersection(handler_ids): - f, event = self._next_poll_event() - handler, reg_id = event_handlers[f] - handler(f, event) - event_handled = True - except StopIteration: - event_handled = True - - return event_handled - -class QueueScheduler(PollScheduler): - - """ - Add instances of SequentialTaskQueue and then call run(). The - run() method returns when no tasks remain. - """ - - def __init__(self, max_jobs=None, max_load=None): - PollScheduler.__init__(self) - - if max_jobs is None: - max_jobs = 1 - - self._max_jobs = max_jobs - self._max_load = max_load - self.sched_iface = self._sched_iface_class( - register=self._register, - schedule=self._schedule_wait, - unregister=self._unregister) - - self._queues = [] - self._schedule_listeners = [] - - def add(self, q): - self._queues.append(q) - - def remove(self, q): - self._queues.remove(q) - - def run(self): - - while self._schedule(): - self._poll_loop() - - while self._running_job_count(): - self._poll_loop() - - def _schedule_tasks(self): - """ - @rtype: bool - @returns: True if there may be remaining tasks to schedule, - False otherwise. - """ - while self._can_add_job(): - n = self._max_jobs - self._running_job_count() - if n < 1: - break - - if not self._start_next_job(n): - return False - - for q in self._queues: - if q: - return True - return False - - def _running_job_count(self): - job_count = 0 - for q in self._queues: - job_count += len(q.running_tasks) - self._jobs = job_count - return job_count - - def _start_next_job(self, n=1): - started_count = 0 - for q in self._queues: - initial_job_count = len(q.running_tasks) - q.schedule() - final_job_count = len(q.running_tasks) - if final_job_count > initial_job_count: - started_count += (final_job_count - initial_job_count) - if started_count >= n: - break - return started_count - -class TaskScheduler(object): - - """ - A simple way to handle scheduling of AsynchrousTask instances. Simply - add tasks and call run(). The run() method returns when no tasks remain. 
- """ - - def __init__(self, max_jobs=None, max_load=None): - self._queue = SequentialTaskQueue(max_jobs=max_jobs) - self._scheduler = QueueScheduler( - max_jobs=max_jobs, max_load=max_load) - self.sched_iface = self._scheduler.sched_iface - self.run = self._scheduler.run - self._scheduler.add(self._queue) - - def add(self, task): - self._queue.add(task) - class Scheduler(PollScheduler): _opts_ignore_blockers = \ diff --git a/pym/portage/tests/process/test_poll.py b/pym/portage/tests/process/test_poll.py index f5669780f..3f4a597b4 100644 --- a/pym/portage/tests/process/test_poll.py +++ b/pym/portage/tests/process/test_poll.py @@ -8,7 +8,7 @@ import termios import portage from portage.output import get_term_size, set_term_size from portage.tests import TestCase -from _emerge import TaskScheduler +from _emerge.TaskScheduler import TaskScheduler from _emerge.PipeReader import PipeReader from _emerge.SpawnProcess import SpawnProcess
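A minimal usage sketch of the extracted classes (not part of this commit), following the pattern in pym/portage/tests/process/test_poll.py above: a TaskScheduler owns a single SequentialTaskQueue plus a QueueScheduler, and run() drives the poll() loop inherited from PollScheduler until no tasks remain. The SpawnProcess keyword arguments (args, env, scheduler) and the shell commands below are assumptions adapted from that test, not something defined by this patch, and may differ between portage versions.

import os

from _emerge.TaskScheduler import TaskScheduler
from _emerge.SpawnProcess import SpawnProcess

# Up to two jobs run concurrently; a max_load value, if given, would
# additionally throttle job starts via getloadavg() in
# PollScheduler._can_add_job().
task_scheduler = TaskScheduler(max_jobs=2, max_load=None)

for i in (1, 2, 3):
	# Each SpawnProcess registers the read end of its output pipe with
	# task_scheduler.sched_iface (the register/schedule/unregister
	# bundle), so the poll() loop can service its output asynchronously.
	proc = SpawnProcess(
		args=["bash", "-c", "echo child %d; sleep 1" % i],
		env=os.environ,
		scheduler=task_scheduler.sched_iface)
	task_scheduler.add(proc)

# run() returns once the queue is empty and no poll() handlers remain.
task_scheduler.run()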