Bug #275047 - Split _emerge/__init__.py into smaller pieces (part 3).
authorZac Medico <zmedico@gentoo.org>
Mon, 22 Jun 2009 18:47:26 +0000 (18:47 -0000)
committerZac Medico <zmedico@gentoo.org>
Mon, 22 Jun 2009 18:47:26 +0000 (18:47 -0000)
Thanks to Sebastian Mingramm (few) <s.mingramm@gmx.de> for this patch.

svn path=/main/trunk/; revision=13668

pym/_emerge/PollScheduler.py [new file with mode: 0644]
pym/_emerge/QueueScheduler.py [new file with mode: 0644]
pym/_emerge/TaskScheduler.py [new file with mode: 0644]
pym/_emerge/__init__.py
pym/portage/tests/process/test_poll.py

diff --git a/pym/_emerge/PollScheduler.py b/pym/_emerge/PollScheduler.py
new file mode 100644 (file)
index 0000000..3a23877
--- /dev/null
@@ -0,0 +1,251 @@
+import logging
+import select
+
+from portage.util import writemsg_level
+
+from _emerge.SlotObject import SlotObject
+from _emerge.getloadavg import getloadavg
+from _emerge.PollConstants import PollConstants
+from _emerge.PollSelectAdapter import PollSelectAdapter
+
+class PollScheduler(object):
+
+       class _sched_iface_class(SlotObject):
+               __slots__ = ("register", "schedule", "unregister")
+
+       def __init__(self):
+               self._max_jobs = 1
+               self._max_load = None
+               self._jobs = 0
+               self._poll_event_queue = []
+               self._poll_event_handlers = {}
+               self._poll_event_handler_ids = {}
+               # Increment id for each new handler.
+               self._event_handler_id = 0
+               self._poll_obj = create_poll_instance()
+               self._scheduling = False
+
+       def _schedule(self):
+               """
+               Calls _schedule_tasks() and automatically returns early from
+               any recursive calls to this method that the _schedule_tasks()
+               call might trigger. This makes _schedule() safe to call from
+               inside exit listeners.
+               """
+               if self._scheduling:
+                       return False
+               self._scheduling = True
+               try:
+                       return self._schedule_tasks()
+               finally:
+                       self._scheduling = False
+
+       def _running_job_count(self):
+               return self._jobs
+
+       def _can_add_job(self):
+               max_jobs = self._max_jobs
+               max_load = self._max_load
+
+               if self._max_jobs is not True and \
+                       self._running_job_count() >= self._max_jobs:
+                       return False
+
+               if max_load is not None and \
+                       (max_jobs is True or max_jobs > 1) and \
+                       self._running_job_count() >= 1:
+                       try:
+                               avg1, avg5, avg15 = getloadavg()
+                       except OSError:
+                               return False
+
+                       if avg1 >= max_load:
+                               return False
+
+               return True
+
+       def _poll(self, timeout=None):
+               """
+               All poll() calls pass through here. The poll events
+               are added directly to self._poll_event_queue.
+               In order to avoid endless blocking, this raises
+               StopIteration if timeout is None and there are
+               no file descriptors to poll.
+               """
+               if not self._poll_event_handlers:
+                       self._schedule()
+                       if timeout is None and \
+                               not self._poll_event_handlers:
+                               raise StopIteration(
+                                       "timeout is None and there are no poll() event handlers")
+
+               # The following error is known to occur with Linux kernel versions
+               # less than 2.6.24:
+               #
+               #   select.error: (4, 'Interrupted system call')
+               #
+               # This error has been observed after a SIGSTOP, followed by SIGCONT.
+               # Treat it similar to EAGAIN if timeout is None, otherwise just return
+               # without any events.
+               while True:
+                       try:
+                               self._poll_event_queue.extend(self._poll_obj.poll(timeout))
+                               break
+                       except select.error, e:
+                               writemsg_level("\n!!! select error: %s\n" % (e,),
+                                       level=logging.ERROR, noiselevel=-1)
+                               del e
+                               if timeout is not None:
+                                       break
+
+       def _next_poll_event(self, timeout=None):
+               """
+               Since the _schedule_wait() loop is called by event
+               handlers from _poll_loop(), maintain a central event
+               queue for both of them to share events from a single
+               poll() call. In order to avoid endless blocking, this
+               raises StopIteration if timeout is None and there are
+               no file descriptors to poll.
+               """
+               if not self._poll_event_queue:
+                       self._poll(timeout)
+               return self._poll_event_queue.pop()
+
+       def _poll_loop(self):
+
+               event_handlers = self._poll_event_handlers
+               event_handled = False
+
+               try:
+                       while event_handlers:
+                               f, event = self._next_poll_event()
+                               handler, reg_id = event_handlers[f]
+                               handler(f, event)
+                               event_handled = True
+               except StopIteration:
+                       event_handled = True
+
+               if not event_handled:
+                       raise AssertionError("tight loop")
+
+       def _schedule_yield(self):
+               """
+               Schedule for a short period of time chosen by the scheduler based
+               on internal state. Synchronous tasks should call this periodically
+               in order to allow the scheduler to service pending poll events. The
+               scheduler will call poll() exactly once, without blocking, and any
+               resulting poll events will be serviced.
+               """
+               event_handlers = self._poll_event_handlers
+               events_handled = 0
+
+               if not event_handlers:
+                       return bool(events_handled)
+
+               if not self._poll_event_queue:
+                       self._poll(0)
+
+               try:
+                       while event_handlers and self._poll_event_queue:
+                               f, event = self._next_poll_event()
+                               handler, reg_id = event_handlers[f]
+                               handler(f, event)
+                               events_handled += 1
+               except StopIteration:
+                       events_handled += 1
+
+               return bool(events_handled)
+
+       def _register(self, f, eventmask, handler):
+               """
+               @rtype: Integer
+               @return: A unique registration id, for use in schedule() or
+                       unregister() calls.
+               """
+               if f in self._poll_event_handlers:
+                       raise AssertionError("fd %d is already registered" % f)
+               self._event_handler_id += 1
+               reg_id = self._event_handler_id
+               self._poll_event_handler_ids[reg_id] = f
+               self._poll_event_handlers[f] = (handler, reg_id)
+               self._poll_obj.register(f, eventmask)
+               return reg_id
+
+       def _unregister(self, reg_id):
+               f = self._poll_event_handler_ids[reg_id]
+               self._poll_obj.unregister(f)
+               del self._poll_event_handlers[f]
+               del self._poll_event_handler_ids[reg_id]
+
+       def _schedule_wait(self, wait_ids):
+               """
+               Schedule until wait_id is not longer registered
+               for poll() events.
+               @type wait_id: int
+               @param wait_id: a task id to wait for
+               """
+               event_handlers = self._poll_event_handlers
+               handler_ids = self._poll_event_handler_ids
+               event_handled = False
+
+               if isinstance(wait_ids, int):
+                       wait_ids = frozenset([wait_ids])
+
+               try:
+                       while wait_ids.intersection(handler_ids):
+                               f, event = self._next_poll_event()
+                               handler, reg_id = event_handlers[f]
+                               handler(f, event)
+                               event_handled = True
+               except StopIteration:
+                       event_handled = True
+
+               return event_handled
+
+
+_can_poll_device = None
+
+def can_poll_device():
+       """
+       Test if it's possible to use poll() on a device such as a pty. This
+       is known to fail on Darwin.
+       @rtype: bool
+       @returns: True if poll() on a device succeeds, False otherwise.
+       """
+
+       global _can_poll_device
+       if _can_poll_device is not None:
+               return _can_poll_device
+
+       if not hasattr(select, "poll"):
+               _can_poll_device = False
+               return _can_poll_device
+
+       try:
+               dev_null = open('/dev/null', 'rb')
+       except IOError:
+               _can_poll_device = False
+               return _can_poll_device
+
+       p = select.poll()
+       p.register(dev_null.fileno(), PollConstants.POLLIN)
+
+       invalid_request = False
+       for f, event in p.poll():
+               if event & PollConstants.POLLNVAL:
+                       invalid_request = True
+                       break
+       dev_null.close()
+
+       _can_poll_device = not invalid_request
+       return _can_poll_device
+
+def create_poll_instance():
+       """
+       Create an instance of select.poll, or an instance of
+       PollSelectAdapter there is no poll() implementation or
+       it is broken somehow.
+       """
+       if can_poll_device():
+               return select.poll()
+       return PollSelectAdapter()
diff --git a/pym/_emerge/QueueScheduler.py b/pym/_emerge/QueueScheduler.py
new file mode 100644 (file)
index 0000000..f88c13c
--- /dev/null
@@ -0,0 +1,77 @@
+from _emerge.PollScheduler import PollScheduler
+
+class QueueScheduler(PollScheduler):
+
+       """
+       Add instances of SequentialTaskQueue and then call run(). The
+       run() method returns when no tasks remain.
+       """
+
+       def __init__(self, max_jobs=None, max_load=None):
+               PollScheduler.__init__(self)
+
+               if max_jobs is None:
+                       max_jobs = 1
+
+               self._max_jobs = max_jobs
+               self._max_load = max_load
+               self.sched_iface = self._sched_iface_class(
+                       register=self._register,
+                       schedule=self._schedule_wait,
+                       unregister=self._unregister)
+
+               self._queues = []
+               self._schedule_listeners = []
+
+       def add(self, q):
+               self._queues.append(q)
+
+       def remove(self, q):
+               self._queues.remove(q)
+
+       def run(self):
+
+               while self._schedule():
+                       self._poll_loop()
+
+               while self._running_job_count():
+                       self._poll_loop()
+
+       def _schedule_tasks(self):
+               """
+               @rtype: bool
+               @returns: True if there may be remaining tasks to schedule,
+                       False otherwise.
+               """
+               while self._can_add_job():
+                       n = self._max_jobs - self._running_job_count()
+                       if n < 1:
+                               break
+
+                       if not self._start_next_job(n):
+                               return False
+
+               for q in self._queues:
+                       if q:
+                               return True
+               return False
+
+       def _running_job_count(self):
+               job_count = 0
+               for q in self._queues:
+                       job_count += len(q.running_tasks)
+               self._jobs = job_count
+               return job_count
+
+       def _start_next_job(self, n=1):
+               started_count = 0
+               for q in self._queues:
+                       initial_job_count = len(q.running_tasks)
+                       q.schedule()
+                       final_job_count = len(q.running_tasks)
+                       if final_job_count > initial_job_count:
+                               started_count += (final_job_count - initial_job_count)
+                       if started_count >= n:
+                               break
+               return started_count
+
diff --git a/pym/_emerge/TaskScheduler.py b/pym/_emerge/TaskScheduler.py
new file mode 100644 (file)
index 0000000..564f130
--- /dev/null
@@ -0,0 +1,21 @@
+from _emerge.QueueScheduler import QueueScheduler
+from _emerge.SequentialTaskQueue import SequentialTaskQueue
+
+class TaskScheduler(object):
+
+       """
+       A simple way to handle scheduling of AsynchrousTask instances. Simply
+       add tasks and call run(). The run() method returns when no tasks remain.
+       """
+
+       def __init__(self, max_jobs=None, max_load=None):
+               self._queue = SequentialTaskQueue(max_jobs=max_jobs)
+               self._scheduler = QueueScheduler(
+                       max_jobs=max_jobs, max_load=max_load)
+               self.sched_iface = self._scheduler.sched_iface
+               self.run = self._scheduler.run
+               self._scheduler.add(self._queue)
+
+       def add(self, task):
+               self._queue.add(task)
+
index 3a3dac25e93d40abfb0bf02301cd2e987c5b8f8c..de5642da9b958244698c817f7e8eda061a918b62 100644 (file)
@@ -3,10 +3,8 @@
 # Distributed under the terms of the GNU General Public License v2
 # $Id$
 
-import formatter
 import logging
 import pwd
-import select
 import shlex
 import signal
 import sys
@@ -61,7 +59,6 @@ from _emerge.DepPriorityNormalRange import DepPriorityNormalRange
 from _emerge.DepPrioritySatisfiedRange import DepPrioritySatisfiedRange
 from _emerge.Task import Task
 from _emerge.Blocker import Blocker
-from _emerge.PollConstants import PollConstants
 from _emerge.AsynchronousTask import AsynchronousTask
 from _emerge.CompositeTask import CompositeTask
 from _emerge.EbuildFetcher import EbuildFetcher
@@ -80,13 +77,12 @@ from _emerge.BlockerCache import BlockerCache
 from _emerge.PackageVirtualDbapi import PackageVirtualDbapi
 from _emerge.RepoDisplay import RepoDisplay
 from _emerge.UseFlagDisplay import UseFlagDisplay
-from _emerge.PollSelectAdapter import PollSelectAdapter
 from _emerge.SequentialTaskQueue import SequentialTaskQueue
 from _emerge.ProgressHandler import ProgressHandler
 from _emerge.stdout_spinner import stdout_spinner
 from _emerge.UninstallFailure import UninstallFailure
 from _emerge.JobStatusDisplay import JobStatusDisplay
-from _emerge.getloadavg import getloadavg
+from _emerge.PollScheduler import PollScheduler
 
 def userquery(prompt, responses=None, colours=None):
        """Displays a prompt and a set of responses, then waits for a response
@@ -6445,341 +6441,6 @@ class PackageCounters(object):
                                        (self.blocks - self.blocks_satisfied))
                return "".join(myoutput)
 
-
-_can_poll_device = None
-
-def can_poll_device():
-       """
-       Test if it's possible to use poll() on a device such as a pty. This
-       is known to fail on Darwin.
-       @rtype: bool
-       @returns: True if poll() on a device succeeds, False otherwise.
-       """
-
-       global _can_poll_device
-       if _can_poll_device is not None:
-               return _can_poll_device
-
-       if not hasattr(select, "poll"):
-               _can_poll_device = False
-               return _can_poll_device
-
-       try:
-               dev_null = open('/dev/null', 'rb')
-       except IOError:
-               _can_poll_device = False
-               return _can_poll_device
-
-       p = select.poll()
-       p.register(dev_null.fileno(), PollConstants.POLLIN)
-
-       invalid_request = False
-       for f, event in p.poll():
-               if event & PollConstants.POLLNVAL:
-                       invalid_request = True
-                       break
-       dev_null.close()
-
-       _can_poll_device = not invalid_request
-       return _can_poll_device
-
-def create_poll_instance():
-       """
-       Create an instance of select.poll, or an instance of
-       PollSelectAdapter there is no poll() implementation or
-       it is broken somehow.
-       """
-       if can_poll_device():
-               return select.poll()
-       return PollSelectAdapter()
-
-class PollScheduler(object):
-
-       class _sched_iface_class(SlotObject):
-               __slots__ = ("register", "schedule", "unregister")
-
-       def __init__(self):
-               self._max_jobs = 1
-               self._max_load = None
-               self._jobs = 0
-               self._poll_event_queue = []
-               self._poll_event_handlers = {}
-               self._poll_event_handler_ids = {}
-               # Increment id for each new handler.
-               self._event_handler_id = 0
-               self._poll_obj = create_poll_instance()
-               self._scheduling = False
-
-       def _schedule(self):
-               """
-               Calls _schedule_tasks() and automatically returns early from
-               any recursive calls to this method that the _schedule_tasks()
-               call might trigger. This makes _schedule() safe to call from
-               inside exit listeners.
-               """
-               if self._scheduling:
-                       return False
-               self._scheduling = True
-               try:
-                       return self._schedule_tasks()
-               finally:
-                       self._scheduling = False
-
-       def _running_job_count(self):
-               return self._jobs
-
-       def _can_add_job(self):
-               max_jobs = self._max_jobs
-               max_load = self._max_load
-
-               if self._max_jobs is not True and \
-                       self._running_job_count() >= self._max_jobs:
-                       return False
-
-               if max_load is not None and \
-                       (max_jobs is True or max_jobs > 1) and \
-                       self._running_job_count() >= 1:
-                       try:
-                               avg1, avg5, avg15 = getloadavg()
-                       except OSError:
-                               return False
-
-                       if avg1 >= max_load:
-                               return False
-
-               return True
-
-       def _poll(self, timeout=None):
-               """
-               All poll() calls pass through here. The poll events
-               are added directly to self._poll_event_queue.
-               In order to avoid endless blocking, this raises
-               StopIteration if timeout is None and there are
-               no file descriptors to poll.
-               """
-               if not self._poll_event_handlers:
-                       self._schedule()
-                       if timeout is None and \
-                               not self._poll_event_handlers:
-                               raise StopIteration(
-                                       "timeout is None and there are no poll() event handlers")
-
-               # The following error is known to occur with Linux kernel versions
-               # less than 2.6.24:
-               #
-               #   select.error: (4, 'Interrupted system call')
-               #
-               # This error has been observed after a SIGSTOP, followed by SIGCONT.
-               # Treat it similar to EAGAIN if timeout is None, otherwise just return
-               # without any events.
-               while True:
-                       try:
-                               self._poll_event_queue.extend(self._poll_obj.poll(timeout))
-                               break
-                       except select.error, e:
-                               writemsg_level("\n!!! select error: %s\n" % (e,),
-                                       level=logging.ERROR, noiselevel=-1)
-                               del e
-                               if timeout is not None:
-                                       break
-
-       def _next_poll_event(self, timeout=None):
-               """
-               Since the _schedule_wait() loop is called by event
-               handlers from _poll_loop(), maintain a central event
-               queue for both of them to share events from a single
-               poll() call. In order to avoid endless blocking, this
-               raises StopIteration if timeout is None and there are
-               no file descriptors to poll.
-               """
-               if not self._poll_event_queue:
-                       self._poll(timeout)
-               return self._poll_event_queue.pop()
-
-       def _poll_loop(self):
-
-               event_handlers = self._poll_event_handlers
-               event_handled = False
-
-               try:
-                       while event_handlers:
-                               f, event = self._next_poll_event()
-                               handler, reg_id = event_handlers[f]
-                               handler(f, event)
-                               event_handled = True
-               except StopIteration:
-                       event_handled = True
-
-               if not event_handled:
-                       raise AssertionError("tight loop")
-
-       def _schedule_yield(self):
-               """
-               Schedule for a short period of time chosen by the scheduler based
-               on internal state. Synchronous tasks should call this periodically
-               in order to allow the scheduler to service pending poll events. The
-               scheduler will call poll() exactly once, without blocking, and any
-               resulting poll events will be serviced.
-               """
-               event_handlers = self._poll_event_handlers
-               events_handled = 0
-
-               if not event_handlers:
-                       return bool(events_handled)
-
-               if not self._poll_event_queue:
-                       self._poll(0)
-
-               try:
-                       while event_handlers and self._poll_event_queue:
-                               f, event = self._next_poll_event()
-                               handler, reg_id = event_handlers[f]
-                               handler(f, event)
-                               events_handled += 1
-               except StopIteration:
-                       events_handled += 1
-
-               return bool(events_handled)
-
-       def _register(self, f, eventmask, handler):
-               """
-               @rtype: Integer
-               @return: A unique registration id, for use in schedule() or
-                       unregister() calls.
-               """
-               if f in self._poll_event_handlers:
-                       raise AssertionError("fd %d is already registered" % f)
-               self._event_handler_id += 1
-               reg_id = self._event_handler_id
-               self._poll_event_handler_ids[reg_id] = f
-               self._poll_event_handlers[f] = (handler, reg_id)
-               self._poll_obj.register(f, eventmask)
-               return reg_id
-
-       def _unregister(self, reg_id):
-               f = self._poll_event_handler_ids[reg_id]
-               self._poll_obj.unregister(f)
-               del self._poll_event_handlers[f]
-               del self._poll_event_handler_ids[reg_id]
-
-       def _schedule_wait(self, wait_ids):
-               """
-               Schedule until wait_id is not longer registered
-               for poll() events.
-               @type wait_id: int
-               @param wait_id: a task id to wait for
-               """
-               event_handlers = self._poll_event_handlers
-               handler_ids = self._poll_event_handler_ids
-               event_handled = False
-
-               if isinstance(wait_ids, int):
-                       wait_ids = frozenset([wait_ids])
-
-               try:
-                       while wait_ids.intersection(handler_ids):
-                               f, event = self._next_poll_event()
-                               handler, reg_id = event_handlers[f]
-                               handler(f, event)
-                               event_handled = True
-               except StopIteration:
-                       event_handled = True
-
-               return event_handled
-
-class QueueScheduler(PollScheduler):
-
-       """
-       Add instances of SequentialTaskQueue and then call run(). The
-       run() method returns when no tasks remain.
-       """
-
-       def __init__(self, max_jobs=None, max_load=None):
-               PollScheduler.__init__(self)
-
-               if max_jobs is None:
-                       max_jobs = 1
-
-               self._max_jobs = max_jobs
-               self._max_load = max_load
-               self.sched_iface = self._sched_iface_class(
-                       register=self._register,
-                       schedule=self._schedule_wait,
-                       unregister=self._unregister)
-
-               self._queues = []
-               self._schedule_listeners = []
-
-       def add(self, q):
-               self._queues.append(q)
-
-       def remove(self, q):
-               self._queues.remove(q)
-
-       def run(self):
-
-               while self._schedule():
-                       self._poll_loop()
-
-               while self._running_job_count():
-                       self._poll_loop()
-
-       def _schedule_tasks(self):
-               """
-               @rtype: bool
-               @returns: True if there may be remaining tasks to schedule,
-                       False otherwise.
-               """
-               while self._can_add_job():
-                       n = self._max_jobs - self._running_job_count()
-                       if n < 1:
-                               break
-
-                       if not self._start_next_job(n):
-                               return False
-
-               for q in self._queues:
-                       if q:
-                               return True
-               return False
-
-       def _running_job_count(self):
-               job_count = 0
-               for q in self._queues:
-                       job_count += len(q.running_tasks)
-               self._jobs = job_count
-               return job_count
-
-       def _start_next_job(self, n=1):
-               started_count = 0
-               for q in self._queues:
-                       initial_job_count = len(q.running_tasks)
-                       q.schedule()
-                       final_job_count = len(q.running_tasks)
-                       if final_job_count > initial_job_count:
-                               started_count += (final_job_count - initial_job_count)
-                       if started_count >= n:
-                               break
-               return started_count
-
-class TaskScheduler(object):
-
-       """
-       A simple way to handle scheduling of AsynchrousTask instances. Simply
-       add tasks and call run(). The run() method returns when no tasks remain.
-       """
-
-       def __init__(self, max_jobs=None, max_load=None):
-               self._queue = SequentialTaskQueue(max_jobs=max_jobs)
-               self._scheduler = QueueScheduler(
-                       max_jobs=max_jobs, max_load=max_load)
-               self.sched_iface = self._scheduler.sched_iface
-               self.run = self._scheduler.run
-               self._scheduler.add(self._queue)
-
-       def add(self, task):
-               self._queue.add(task)
-
 class Scheduler(PollScheduler):
 
        _opts_ignore_blockers = \
index f5669780fa745f3349da4c8e30f816ae578026bd..3f4a597b4463cfeae26de4c49aacc2d7bac6fc03 100644 (file)
@@ -8,7 +8,7 @@ import termios
 import portage
 from portage.output import get_term_size, set_term_size
 from portage.tests import TestCase
-from _emerge import TaskScheduler
+from _emerge.TaskScheduler import TaskScheduler
 from _emerge.PipeReader import PipeReader
 from _emerge.SpawnProcess import SpawnProcess