self._regen = MetadataRegen(portdb, cp_iter=cp_iter,
consumer=self._metadata_callback,
max_jobs=max_jobs, max_load=max_load,
- write_auxdb=write_auxdb)
+ write_auxdb=write_auxdb, main=True)
self.returncode = os.EX_OK
conf = portdb.repositories.get_repo_for_location(tree)
self._trg_caches = tuple(conf.iter_pregenerated_caches(
earlier_sigterm_handler = signal.signal(signal.SIGTERM, sighandler)
try:
- self._regen.run()
+ self._regen.start()
+ self._regen.wait()
finally:
# Restore previous handlers
if earlier_sigint_handler is not None:
from portage import os
from portage.dep import _repo_separator
from _emerge.EbuildMetadataPhase import EbuildMetadataPhase
-from _emerge.PollScheduler import PollScheduler
+from portage.cache.cache_errors import CacheError
+from portage.util._async.AsyncScheduler import AsyncScheduler
-class MetadataRegen(PollScheduler):
+class MetadataRegen(AsyncScheduler):
def __init__(self, portdb, cp_iter=None, consumer=None,
- max_jobs=None, max_load=None, write_auxdb=True):
- PollScheduler.__init__(self, main=True)
+ write_auxdb=True, **kwargs):
+ AsyncScheduler.__init__(self, **kwargs)
self._portdb = portdb
self._write_auxdb = write_auxdb
self._global_cleanse = False
self._cp_iter = cp_iter
self._consumer = consumer
- if max_jobs is None:
- max_jobs = 1
-
- self._max_jobs = max_jobs
- self._max_load = max_load
-
self._valid_pkgs = set()
self._cp_set = set()
self._process_iter = self._iter_metadata_processes()
- self.returncode = os.EX_OK
- self._error_count = 0
self._running_tasks = set()
- self._remaining_tasks = True
- def _terminate_tasks(self):
- for task in list(self._running_tasks):
- task.cancel()
+ def _next_task(self):
+ return next(self._process_iter)
def _iter_every_cp(self):
portage.writemsg_stdout("Listing available packages...\n")
settings=portdb.doebuild_settings,
write_auxdb=self._write_auxdb)
- def _keep_scheduling(self):
- return self._remaining_tasks and not self._terminated_tasks
+ def _wait(self):
- def _running_job_count(self):
- return len(self._running_tasks)
-
- def run(self):
+ AsyncScheduler._wait(self)
portdb = self._portdb
- from portage.cache.cache_errors import CacheError
dead_nodes = {}
- self._main_loop()
-
if self._terminated_tasks:
- self.returncode = 1
- return
+ self.returncode = self._cancelled_returncode
+ return self.returncode
if self._global_cleanse:
for mytree in portdb.porttrees:
except (KeyError, CacheError):
pass
- def _schedule_tasks(self):
- if self._terminated_tasks:
- return
-
- while self._can_add_job():
- try:
- metadata_process = next(self._process_iter)
- except StopIteration:
- self._remaining_tasks = False
- return
-
- self._running_tasks.add(metadata_process)
- metadata_process.scheduler = self.sched_iface
- metadata_process.addExitListener(self._metadata_exit)
- metadata_process.start()
-
- def _metadata_exit(self, metadata_process):
- self._running_tasks.discard(metadata_process)
+ return self.returncode
+
+ def _task_exit(self, metadata_process):
+
if metadata_process.returncode != os.EX_OK:
- self.returncode = 1
- self._error_count += 1
self._valid_pkgs.discard(metadata_process.cpv)
if not self._terminated_tasks:
portage.writemsg("Error processing %s, continuing...\n" % \
metadata_process.ebuild_hash,
metadata_process.eapi_supported)
- self._schedule()
-
+ AsyncScheduler._task_exit(self, metadata_process)