Add emirrordist, a tool for mirroring distfiles.
author: Zac Medico <zmedico@gentoo.org>
        Wed, 9 Jan 2013 14:38:16 +0000 (06:38 -0800)
committer: Zac Medico <zmedico@gentoo.org>
        Wed, 9 Jan 2013 17:09:44 +0000 (09:09 -0800)
Special thanks to Brian Harring, author of the mirror-dist program from
which emirrordist is derived.

12 files changed:
bin/emirrordist [new file with mode: 0755]
man/emirrordist.1 [new file with mode: 0644]
pym/portage/_emirrordist/Config.py [new file with mode: 0644]
pym/portage/_emirrordist/DeletionIterator.py [new file with mode: 0644]
pym/portage/_emirrordist/DeletionTask.py [new file with mode: 0644]
pym/portage/_emirrordist/FetchIterator.py [new file with mode: 0644]
pym/portage/_emirrordist/FetchTask.py [new file with mode: 0644]
pym/portage/_emirrordist/MirrorDistTask.py [new file with mode: 0644]
pym/portage/_emirrordist/__init__.py [new file with mode: 0644]
pym/portage/_emirrordist/main.py [new file with mode: 0644]
pym/portage/util/_ShelveUnicodeWrapper.py [new file with mode: 0644]
pym/portage/util/_async/FileCopier.py [new file with mode: 0644]

diff --git a/bin/emirrordist b/bin/emirrordist
new file mode 100755 (executable)
index 0000000..8d93de9
--- /dev/null
@@ -0,0 +1,13 @@
#!/usr/bin/python
# Copyright 2013 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2

# Thin launcher for emirrordist: all real work is delegated to
# portage._emirrordist.main.emirrordist_main.

import sys

import portage
# Mark this process as an internal portage caller and skip legacy
# global config initialization; emirrordist_main sets up its own
# configuration (presumably to reduce startup cost — TODO confirm).
portage._internal_caller = True
portage._disable_legacy_globals()
from portage._emirrordist.main import emirrordist_main

if __name__ == "__main__":
	# Exit with the status returned by the main function.
	sys.exit(emirrordist_main(sys.argv[1:]))
diff --git a/man/emirrordist.1 b/man/emirrordist.1
new file mode 100644 (file)
index 0000000..6d02077
--- /dev/null
@@ -0,0 +1,143 @@
+.TH "EMIRRORDIST" "1" "Jan 2013" "Portage VERSION" "Portage"
+.SH "NAME"
+emirrordist \- a fetch tool for mirroring of package distfiles
+.SH SYNOPSIS
+.B emirrordist
+[\fIoptions\fR] \fI<action>\fR
+.SH ACTIONS
+.TP
+\fB\-h\fR, \fB\-\-help\fR
+Show a help message and exit.
+.TP
+\fB\-\-version\fR
+Display portage version and exit.
+.TP
+\fB\-\-mirror\fR
+Mirror distfiles for the selected repository.
+.SH OPTIONS
+.TP
+\fB\-\-dry\-run\fR
+Perform a trial run with no changes made (typically combined
+with \fI\-v\fR or \fI\-vv\fR).
+.TP
+\fB\-v\fR, \fB\-\-verbose\fR
+Display extra information on stderr (multiple occurrences
+increase verbosity).
+.TP
+\fB\-\-ignore\-default\-opts\fR
+Do not use the \fIEMIRRORDIST_DEFAULT_OPTS\fR environment
+variable.
+.TP
+\fB\-\-distfiles\fR=\fIDIR\fR
+Distfiles directory to use (required).
+.TP
+\fB\-j\fR JOBS, \fB\-\-jobs\fR=\fIJOBS\fR
+Number of concurrent jobs to run.
+.TP
+\fB\-l\fR LOAD, \fB\-\-load\-average\fR=\fILOAD\fR
+Load average limit for spawning of new concurrent jobs.
+.TP
+\fB\-\-tries\fR=\fITRIES\fR
+Maximum number of tries per file, 0 means unlimited
+(default is 10).
+.TP
+\fB\-\-repo\fR=\fIREPO\fR
+Name of repo to operate on (default repo is located at
+$PORTDIR).
+.TP
+\fB\-\-config\-root\fR=\fIDIR\fR
+Location of portage config files.
+.TP
+\fB\-\-portdir\fR=\fIDIR\fR
+Override the portage tree location.
+.TP
+\fB\-\-portdir\-overlay\fR=\fIPORTDIR_OVERLAY\fR
+Override the PORTDIR_OVERLAY variable (requires that
+\fI\-\-repo\fR is also specified).
+.TP
+\fB\-\-strict\-manifests=\fR<y|n>
+Manually override "strict" FEATURES setting.
+.TP
+\fB\-\-failure\-log\fR=\fIFILE\fR
+Log file for fetch failures, with tab\-delimited output, for
+reporting purposes. Opened in append mode.
+.TP
+\fB\-\-success\-log\fR=\fIFILE\fR
+Log file for fetch successes, with tab\-delimited output, for
+reporting purposes. Opened in append mode.
+.TP
+\fB\-\-scheduled\-deletion\-log\fR=\fIFILE\fR
+Log file for scheduled deletions, with tab\-delimited output, for
+reporting purposes. Overwritten with each run.
+.TP
+\fB\-\-delete\fR
+Enable deletion of unused distfiles.
+.TP
+\fB\-\-deletion\-db\fR=\fIFILE\fR
+Database file used to track lifetime of files scheduled for
+delayed deletion.
+.TP
+\fB\-\-deletion\-delay\fR=\fISECONDS\fR
+Delay time for deletion of unused distfiles, measured in seconds.
+.TP
+\fB\-\-temp\-dir\fR=\fIDIR\fR
+Temporary directory for downloads.
+.TP
+\fB\-\-mirror\-overrides\fR=\fIFILE\fR
+File holding a list of mirror overrides.
+.TP
+\fB\-\-mirror\-skip\fR=\fIMIRROR_SKIP\fR
+Comma delimited list of mirror targets to skip when
+fetching.
+.TP
+\fB\-\-restrict\-mirror\-exemptions\fR=\fIRESTRICT_MIRROR_EXEMPTIONS\fR
+Comma delimited list of mirror targets for which to ignore
+RESTRICT="mirror" (see \fBebuild\fR(5)).
+.TP
+\fB\-\-verify\-existing\-digest\fR
+Use digest as a verification of whether existing
+distfiles are valid.
+.TP
+\fB\-\-distfiles\-local\fR=\fIDIR\fR
+The distfiles\-local directory to use.
+.TP
+\fB\-\-distfiles\-db\fR=\fIFILE\fR
+Database file used to track which ebuilds a distfile belongs to.
+.TP
+\fB\-\-recycle\-dir\fR=\fIDIR\fR
+Directory for extended retention of files that are removed from
+distdir with the \-\-delete option. These files may be recycled if
+they are needed again, instead of downloading them again.
+.TP
+\fB\-\-recycle\-db\fR=\fIFILE\fR
+Database file used to track lifetime of files in recycle dir.
+.TP
+\fB\-\-recycle\-deletion\-delay\fR=\fISECONDS\fR
+Delay time for deletion of unused files from recycle dir,
+measured in seconds (defaults to the equivalent of 60 days).
+.TP
+\fB\-\-fetch\-log\-dir\fR=\fIDIR\fR
+Directory for individual fetch logs.
+.TP
+\fB\-\-whitelist\-from\fR=\fIFILE\fR
+Specifies a file containing a list of files to whitelist, one per line,
+# prefixed lines ignored. Use this option multiple times in order to
+specify multiple whitelists.
+.SH "REPORTING BUGS"
+Please report bugs via http://bugs.gentoo.org/
+.SH "THANKS"
+Special thanks to Brian Harring, author of the mirror\-dist program from
+which emirrordist is derived.
+.SH "AUTHORS"
+.nf
+Zac Medico <zmedico@gentoo.org>
+.fi
+.SH "FILES"
+.TP
+.B /etc/portage/make.conf
+Contains variables.
+.SH "SEE ALSO"
+.BR ebuild (5),
+.BR egencache (1),
+.BR make.conf (5),
+.BR portage (5)
diff --git a/pym/portage/_emirrordist/Config.py b/pym/portage/_emirrordist/Config.py
new file mode 100644 (file)
index 0000000..db4bfeb
--- /dev/null
@@ -0,0 +1,132 @@
+# Copyright 2013 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import copy
+import io
+import logging
+import shelve
+import sys
+import time
+
+import portage
+from portage import os
+from portage.util import grabdict, grablines
+from portage.util._ShelveUnicodeWrapper import ShelveUnicodeWrapper
+
class Config(object):
	"""Run-time state for an emirrordist run: command-line options,
	statistics counters, success/failure loggers, mirror configuration,
	whitelist, and the optional recycle/distfiles/deletion databases.

	Usable as a context manager which closes all files and databases
	opened during the run.
	"""

	def __init__(self, options, portdb, event_loop):
		"""
		@param options: parsed command-line options (argparse namespace)
		@param portdb: portdbapi instance for the selected repository
		@param event_loop: the main event loop
		"""
		self.options = options
		self.portdb = portdb
		self.event_loop = event_loop
		# Statistics accumulated over the life of the run.
		self.added_byte_count = 0
		self.added_file_count = 0
		self.scheduled_deletion_count = 0
		self.delete_count = 0
		# Maps distfile name -> cpv of the first owner encountered.
		self.file_owners = {}
		# Maps distfile name -> cpv for files that failed to fetch.
		self.file_failures = {}
		self.start_time = time.time()
		self._open_files = []

		self.log_success = self._open_log('success', options.success_log, 'a')
		self.log_failure = self._open_log('failure', options.failure_log, 'a')

		self.distfiles = options.distfiles

		self.mirrors = copy.copy(portdb.settings.thirdpartymirrors())

		if options.mirror_overrides is not None:
			self.mirrors.update(grabdict(options.mirror_overrides))

		if options.mirror_skip is not None:
			# An empty URI list effectively disables the named mirrors.
			for x in options.mirror_skip.split(","):
				self.mirrors[x] = []

		self.whitelist = None
		if options.whitelist_from is not None:
			self.whitelist = set()
			for filename in options.whitelist_from:
				for line in grablines(filename):
					line = line.strip()
					# Skip blank lines and '#' comments.
					if line and not line.startswith("#"):
						self.whitelist.add(line)

		self.restrict_mirror_exemptions = None
		if options.restrict_mirror_exemptions is not None:
			self.restrict_mirror_exemptions = frozenset(
				options.restrict_mirror_exemptions.split(","))

		self.recycle_db = None
		if options.recycle_db is not None:
			self.recycle_db = self._open_shelve(
				options.recycle_db, 'recycle')

		self.distfiles_db = None
		if options.distfiles_db is not None:
			self.distfiles_db = self._open_shelve(
				options.distfiles_db, 'distfiles')

		self.deletion_db = None
		if options.deletion_db is not None:
			self.deletion_db = self._open_shelve(
				options.deletion_db, 'deletion')

	def _open_log(self, log_desc, log_path, mode):
		"""Return a callable which logs one message per call.

		When log_path is None or --dry-run is in effect, messages are
		redirected to logging.info instead of being written to a file.
		"""
		if log_path is None or self.options.dry_run:
			log_func = logging.info
			line_format = "%s: %%s" % log_desc
			if log_path is not None:
				# logging.warn is a deprecated alias of warning
				logging.warning(("dry-run: %s log "
					"redirected to logging.info") % log_desc)
		else:
			self._open_files.append(io.open(log_path, mode=mode,
				encoding='utf_8'))
			line_format = "%s\n"
			log_func = self._open_files[-1].write

		return self._LogFormatter(line_format, log_func)

	class _LogFormatter(object):
		"""Bind a printf-style line format to a log function."""

		__slots__ = ('_line_format', '_log_func')

		def __init__(self, line_format, log_func):
			self._line_format = line_format
			self._log_func = log_func

		def __call__(self, msg):
			self._log_func(self._line_format % (msg,))

	def _open_shelve(self, db_file, db_desc):
		"""Open a shelve database.

		In --dry-run mode the database is opened read-only, and copied
		into a volatile dict so that no changes can be persisted.
		"""
		if self.options.dry_run:
			open_flag = "r"
		else:
			open_flag = "c"

		if self.options.dry_run and not os.path.exists(db_file):
			db = {}
		else:
			db = shelve.open(db_file, flag=open_flag)
			if sys.hexversion < 0x3000000:
				# Python 2: transparently encode/decode unicode keys.
				db = ShelveUnicodeWrapper(db)

		if self.options.dry_run:
			logging.warning("dry-run: %s db opened in readonly mode" % db_desc)
			if not isinstance(db, dict):
				# Copy into a volatile dict so accidental writes
				# during the dry run cannot reach the real database.
				volatile_db = dict((k, db[k]) for k in db)
				db.close()
				db = volatile_db
		else:
			self._open_files.append(db)

		return db

	def __enter__(self):
		return self

	def __exit__(self, exc_type, exc_value, traceback):
		# Close all log files and databases opened during the run.
		while self._open_files:
			self._open_files.pop().close()
diff --git a/pym/portage/_emirrordist/DeletionIterator.py b/pym/portage/_emirrordist/DeletionIterator.py
new file mode 100644 (file)
index 0000000..dff52c0
--- /dev/null
@@ -0,0 +1,83 @@
+# Copyright 2013 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import logging
+import stat
+
+from portage import os
+from .DeletionTask import DeletionTask
+
class DeletionIterator(object):
	"""Iterate over files in the distfiles directory, yielding a
	DeletionTask for each file that is no longer referenced by any
	ebuild and is not otherwise protected (whitelist, distfiles-local,
	or a deletion delay that has not yet expired)."""

	def __init__(self, config):
		"""
		@param config: emirrordist Config (options, dbs, counters)
		"""
		self._config = config

	@staticmethod
	def _db_discard(deletion_db, filename):
		# Unschedule a previously scheduled deletion, if any. A None
		# deletion_db means delayed-deletion tracking is disabled.
		if deletion_db is not None:
			try:
				del deletion_db[filename]
			except KeyError:
				pass

	def __iter__(self):
		distdir = self._config.options.distfiles
		file_owners = self._config.file_owners
		whitelist = self._config.whitelist
		distfiles_local = self._config.options.distfiles_local
		deletion_db = self._config.deletion_db
		deletion_delay = self._config.options.deletion_delay
		start_time = self._config.start_time
		distfiles_set = set(os.listdir(distdir))
		for filename in distfiles_set:
			try:
				st = os.stat(os.path.join(distdir, filename))
			except OSError as e:
				logging.error("stat failed on '%s' in distfiles: %s\n" %
					(filename, e))
				continue
			if not stat.S_ISREG(st.st_mode):
				# Ignore directories and other non-regular files.
				continue
			elif filename in file_owners:
				# Still referenced by at least one ebuild.
				self._db_discard(deletion_db, filename)
			elif whitelist is not None and filename in whitelist:
				self._db_discard(deletion_db, filename)
			elif distfiles_local is not None and \
				os.path.exists(os.path.join(distfiles_local, filename)):
				self._db_discard(deletion_db, filename)
			else:
				self._config.scheduled_deletion_count += 1

				if deletion_db is None or deletion_delay is None:
					# No delay configured: delete immediately.
					yield DeletionTask(background=True,
						distfile=filename,
						config=self._config)

				else:
					deletion_entry = deletion_db.get(filename)

					if deletion_entry is None:
						# First sighting as unused: start the clock.
						logging.debug("add '%s' to deletion db" % filename)
						deletion_db[filename] = start_time

					elif deletion_entry + deletion_delay <= start_time:
						# Grace period expired: delete now.
						yield DeletionTask(background=True,
							distfile=filename,
							config=self._config)

		# Drop deletion-db entries for files that no longer exist.
		if deletion_db is not None:
			for filename in list(deletion_db):
				if filename not in distfiles_set:
					try:
						del deletion_db[filename]
					except KeyError:
						pass
					else:
						logging.debug("drop '%s' from deletion db" %
							filename)
diff --git a/pym/portage/_emirrordist/DeletionTask.py b/pym/portage/_emirrordist/DeletionTask.py
new file mode 100644 (file)
index 0000000..7d10957
--- /dev/null
@@ -0,0 +1,129 @@
+# Copyright 2013 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import errno
+import logging
+
+from portage import os
+from portage.util._async.FileCopier import FileCopier
+from _emerge.CompositeTask import CompositeTask
+
class DeletionTask(CompositeTask):
	"""Delete one unused distfile. If a recycle directory is configured,
	the file is moved there (copy + unlink across filesystems) instead
	of being unlinked, so that it can be reused later."""

	__slots__ = ('distfile', 'config')

	def _start(self):

		distfile_path = os.path.join(
			self.config.options.distfiles, self.distfile)

		if self.config.options.recycle_dir is not None:
			recycle_path = os.path.join(
				self.config.options.recycle_dir, self.distfile)
			if self.config.options.dry_run:
				logging.info(("dry-run: move '%s' from "
					"distfiles to recycle") % self.distfile)
			else:
				logging.debug(("move '%s' from "
					"distfiles to recycle") % self.distfile)
				try:
					os.rename(distfile_path, recycle_path)
				except OSError as e:
					if e.errno != errno.EXDEV:
						logging.error(("rename %s from distfiles to "
							"recycle failed: %s") % (self.distfile, e))
				else:
					# Rename succeeded: record the removal just like the
					# unlink and copy paths do (previously this path
					# skipped _success(), leaving stale db entries and
					# an undercounted delete_count).
					self._success()
					self.returncode = os.EX_OK
					self._async_wait()
					return

				# rename failed (e.g. EXDEV when the recycle dir is on
				# another filesystem): fall back to copy, then unlink
				# the source in _recycle_copier_exit.
				self._start_task(
					FileCopier(src_path=distfile_path,
						dest_path=recycle_path,
						background=False),
					self._recycle_copier_exit)
				return

		success = True

		if self.config.options.dry_run:
			logging.info(("dry-run: delete '%s' from "
				"distfiles") % self.distfile)
		else:
			logging.debug(("delete '%s' from "
				"distfiles") % self.distfile)
			try:
				os.unlink(distfile_path)
			except OSError as e:
				# A file that is already gone counts as success.
				if e.errno not in (errno.ENOENT, errno.ESTALE):
					logging.error("%s unlink failed in distfiles: %s" %
						(self.distfile, e))
					success = False

		if success:
			self._success()
			self.returncode = os.EX_OK
		else:
			self.returncode = 1

		self._async_wait()

	def _recycle_copier_exit(self, copier):
		"""Handle completion of the FileCopier used for cross-device
		moves into the recycle directory: unlink the source on success."""
		self._assert_current(copier)
		if self._was_cancelled():
			self.wait()
			return

		success = True
		if copier.returncode == os.EX_OK:

			try:
				os.unlink(copier.src_path)
			except OSError as e:
				if e.errno not in (errno.ENOENT, errno.ESTALE):
					logging.error("%s unlink failed in distfiles: %s" %
						(self.distfile, e))
					success = False

		else:
			# NOTE(review): the original referenced an undefined name
			# 'e' here (NameError when the copier fails); report the
			# copier's returncode instead.
			logging.error(("%s copy from distfiles "
				"to recycle failed: %s") % (self.distfile,
				copier.returncode))
			success = False

		if success:
			self._success()
			self.returncode = os.EX_OK
		else:
			self.returncode = 1

		self._current_task = None
		self.wait()

	def _success(self):
		"""Log the removal, bump delete_count, and drop this distfile
		from the distfiles and deletion databases."""
		cpv = "unknown"
		if self.config.distfiles_db is not None:
			cpv = self.config.distfiles_db.get(self.distfile, cpv)

		self.config.delete_count += 1
		self.config.log_success("%s\t%s\tremoved" % (cpv, self.distfile))

		if self.config.distfiles_db is not None:
			try:
				del self.config.distfiles_db[self.distfile]
			except KeyError:
				pass
			else:
				logging.debug(("drop '%s' from "
					"distfiles db") % self.distfile)

		if self.config.deletion_db is not None:
			try:
				del self.config.deletion_db[self.distfile]
			except KeyError:
				pass
			else:
				logging.debug(("drop '%s' from "
					"deletion db") % self.distfile)
diff --git a/pym/portage/_emirrordist/FetchIterator.py b/pym/portage/_emirrordist/FetchIterator.py
new file mode 100644 (file)
index 0000000..cc5d4be
--- /dev/null
@@ -0,0 +1,132 @@
+# Copyright 2013 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+from portage import os
+from portage.dep import use_reduce
+from portage.exception import PortageException
+from portage.manifest import Manifest
+from .FetchTask import FetchTask
+
class FetchIterator(object):
	"""Yield a FetchTask for each distfile of each ebuild in the
	configured repository, skipping files that are fetch-restricted,
	mirror-restricted (without exemption), or lack digest entries."""

	def __init__(self, config):
		"""
		@param config: emirrordist Config (portdb, counters, logs)
		"""
		self._config = config
		self._log_failure = config.log_failure

	def _iter_every_cp(self):
		"""Yield every category/package name in the repository."""
		# List categories individually, in order to start yielding quicker,
		# and in order to reduce latency in case of a signal interrupt.
		cp_all = self._config.portdb.cp_all
		for category in sorted(self._config.portdb.categories):
			for cp in cp_all(categories=(category,)):
				yield cp

	def __iter__(self):

		portdb = self._config.portdb
		file_owners = self._config.file_owners
		file_failures = self._config.file_failures
		restrict_mirror_exemptions = self._config.restrict_mirror_exemptions

		for cp in self._iter_every_cp():

			for tree in portdb.porttrees:

				# Reset state so the Manifest is pulled once
				# for this cp / tree combination.
				digests = None

				for cpv in portdb.cp_list(cp, mytree=tree):

					try:
						restrict, = portdb.aux_get(cpv, ("RESTRICT",),
							mytree=tree)
					except (KeyError, PortageException) as e:
						self._log_failure("%s\t\taux_get exception %s" %
							(cpv, e))
						continue

					# Here we use matchnone=True to ignore conditional parts
					# of RESTRICT since they don't apply unconditionally.
					# Assume such conditionals only apply on the client side.
					try:
						restrict = frozenset(use_reduce(restrict,
							flat=True, matchnone=True))
					except PortageException as e:
						self._log_failure("%s\t\tuse_reduce exception %s" %
							(cpv, e))
						continue

					# RESTRICT="fetch" files cannot be fetched at all.
					if "fetch" in restrict:
						continue

					try:
						uri_map = portdb.getFetchMap(cpv)
					except PortageException as e:
						self._log_failure("%s\t\tgetFetchMap exception %s" %
							(cpv, e))
						continue

					if not uri_map:
						continue

					# RESTRICT="mirror" is honored unless at least one URI
					# refers to a mirror in the exemption list; in that
					# case only the exempted files are kept.
					if "mirror" in restrict:
						skip = False
						if restrict_mirror_exemptions is not None:
							new_uri_map = {}
							for filename, uri_tuple in uri_map.items():
								for uri in uri_tuple:
									if uri[:9] == "mirror://":
										i = uri.find("/", 9)
										if i != -1 and uri[9:i].strip("/") in \
											restrict_mirror_exemptions:
											new_uri_map[filename] = uri_tuple
											break
							if new_uri_map:
								uri_map = new_uri_map
							else:
								skip = True
						else:
							skip = True

						if skip:
							continue

					# Parse Manifest for this cp if we haven't yet.
					if digests is None:
						try:
							digests = Manifest(os.path.join(
								tree, cp)).getTypeDigests("DIST")
						except (EnvironmentError, PortageException) as e:
							# Record a failure for every file of this cpv,
							# then move on to the next cpv.
							for filename in uri_map:
								self._log_failure(
									"%s\t%s\tManifest exception %s" %
									(cpv, filename, e))
								file_failures[filename] = cpv
							continue

					if not digests:
						for filename in uri_map:
							self._log_failure("%s\t%s\tdigest entry missing" %
								(cpv, filename))
							file_failures[filename] = cpv
						continue

					for filename, uri_tuple in uri_map.items():
						file_digests = digests.get(filename)
						if file_digests is None:
							self._log_failure("%s\t%s\tdigest entry missing" %
								(cpv, filename))
							file_failures[filename] = cpv
							continue
						# First owner encountered wins; later cpvs that
						# reference the same distfile are skipped.
						if filename in file_owners:
							continue
						file_owners[filename] = cpv

						yield FetchTask(cpv=cpv,
							background=True,
							digests=file_digests,
							distfile=filename,
							restrict=restrict,
							uri_tuple=uri_tuple,
							config=self._config)
diff --git a/pym/portage/_emirrordist/FetchTask.py b/pym/portage/_emirrordist/FetchTask.py
new file mode 100644 (file)
index 0000000..65fd672
--- /dev/null
@@ -0,0 +1,615 @@
+# Copyright 2013 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import collections
+import errno
+import logging
+import stat
+import subprocess
+import sys
+
+import portage
+from portage import _encodings, _unicode_encode
+from portage import os
+from portage.exception import PortageException
+from portage.util._async.FileCopier import FileCopier
+from portage.util._async.FileDigester import FileDigester
+from portage.util._async.PipeLogger import PipeLogger
+from portage.util._async.PopenProcess import PopenProcess
+from _emerge.CompositeTask import CompositeTask
+
# Preferred digest algorithm for verification (from the Manifest2 spec).
default_hash_name = portage.const.MANIFEST2_REQUIRED_HASH

# Fallback fetch command; ${DISTDIR}, ${FILE} and ${URI} are expanded
# with portage.util.varexpand before the command is executed.
default_fetchcommand = "wget -c -v -t 1 --passive-ftp --timeout=60 -O \"${DISTDIR}/${FILE}\" \"${URI}\""
+
class FetchTask(CompositeTask):
    """Fetch a single distfile: reuse a local copy when its size (and
    optionally digest) checks out, otherwise try filesystem mirrors,
    then remote mirrors and primary URIs, verifying digests throughout.
    """

    # NOTE(review): slots appear to default to None via the SlotObject
    # base-class constructor (e.g. _log_path is read before any explicit
    # assignment in _start) -- confirm against CompositeTask's hierarchy.
    __slots__ = ('distfile', 'digests', 'config', 'cpv',
        'restrict', 'uri_tuple', '_current_mirror',
        '_current_stat', '_fetch_tmp_dir_info', '_fetch_tmp_file',
        '_fs_mirror_stack', '_mirror_stack',
        '_previously_added',
        '_primaryuri_stack', '_log_path', '_tried_uris')
+
    def _start(self):
        """Entry point: check whether the distfile already exists in
        distfiles with the correct size (optionally verifying a digest),
        and otherwise start the fetch sequence."""

        # Per-file fetch log, unless log dir is unset or this is a dry run.
        if self.config.options.fetch_log_dir is not None and \
            not self.config.options.dry_run:
            self._log_path = os.path.join(
                self.config.options.fetch_log_dir,
                self.distfile + '.log')

        # Record whether the file is new to the distfiles db so that
        # _success() only counts genuinely added files.
        self._previously_added = True
        if self.config.distfiles_db is not None and \
            self.distfile not in self.config.distfiles_db:
            self._previously_added = False
            self.config.distfiles_db[self.distfile] = self.cpv

        if not self._have_needed_digests():
            msg = "incomplete digests: %s" % " ".join(self.digests)
            self.scheduler.output(msg, background=self.background,
                log_path=self._log_path)
            self.config.log_failure("%s\t%s\t%s" %
                (self.cpv, self.distfile, msg))
            self.config.file_failures[self.distfile] = self.cpv
            # Logged as a failure but does not abort the whole run.
            self.returncode = os.EX_OK
            self._async_wait()
            return

        distfile_path = os.path.join(
            self.config.options.distfiles, self.distfile)

        st = None
        size_ok = False
        try:
            st = os.stat(distfile_path)
        except OSError as e:
            # Missing file is the normal case; report anything else.
            if e.errno not in (errno.ENOENT, errno.ESTALE):
                msg = "%s stat failed in %s: %s" % \
                    (self.distfile, "distfiles", e)
                self.scheduler.output(msg + '\n', background=True,
                    log_path=self._log_path)
                logging.error(msg)
        else:
            size_ok = st.st_size == self.digests["size"]

        if not size_ok:
            if self.config.options.dry_run:
                if st is not None:
                    logging.info(("dry-run: delete '%s' with "
                        "wrong size from distfiles") % (self.distfile,))
            else:
                # Do the unlink in order to ensure that the path is clear,
                # even if stat raised ENOENT, since a broken symlink can
                # trigger ENOENT.
                if self._unlink_file(distfile_path, "distfiles"):
                    if st is not None:
                        logging.debug(("delete '%s' with "
                            "wrong size from distfiles") % (self.distfile,))
                else:
                    self.config.log_failure("%s\t%s\t%s" %
                        (self.cpv, self.distfile, "unlink failed in distfiles"))
                    self.returncode = os.EX_OK
                    self._async_wait()
                    return

        if size_ok:
            if self.config.options.verify_existing_digest:
                # Asynchronously verify the existing file's digest before
                # declaring success.
                self._start_task(
                    FileDigester(file_path=distfile_path,
                        hash_names=(self._select_hash(),),
                        background=self.background,
                        logfile=self._log_path), self._distfiles_digester_exit)
                return

            self._success()
            self.returncode = os.EX_OK
            self._async_wait()
            return

        self._start_fetch()
+
+       def _success(self):
+               if not self._previously_added:
+                       size = self.digests["size"]
+                       self.config.added_byte_count += size
+                       self.config.added_file_count += 1
+                       self.config.log_success("%s\t%s\tadded %i bytes" %
+                               (self.cpv, self.distfile, size))
+
+               if self._log_path is not None:
+                       if not self.config.options.dry_run:
+                               try:
+                                       os.unlink(self._log_path)
+                               except OSError:
+                                       pass
+
+               if self.config.options.recycle_dir is not None:
+
+                       recycle_file = os.path.join(
+                               self.config.options.recycle_dir, self.distfile)
+
+                       if self.config.options.dry_run:
+                               if os.path.exists(recycle_file):
+                                       logging.info("dry-run: delete '%s' from recycle" %
+                                               (self.distfile,))
+                       else:
+                               try:
+                                       os.unlink(recycle_file)
+                               except OSError:
+                                       pass
+                               else:
+                                       logging.debug("delete '%s' from recycle" %
+                                               (self.distfile,))
+
+       def _distfiles_digester_exit(self, digester):
+
+               self._assert_current(digester)
+               if self._was_cancelled():
+                       self.wait()
+                       return
+
+               if self._default_exit(digester) != os.EX_OK:
+                       msg = "%s distfiles digester failed unexpectedly" % \
+                               (self.distfile,)
+                       self.scheduler.output(msg + '\n', background=True,
+                               log_path=self._log_path)
+                       logging.error(msg)
+                       self.wait()
+                       return
+
+               wrong_digest = self._find_bad_digest(digester.digests)
+               if wrong_digest is None:
+                       self._success()
+                       self.returncode = os.EX_OK
+                       self.wait()
+                       return
+
+               self._start_fetch()
+
+       _mirror_info = collections.namedtuple('_mirror_info',
+               'name location')
+
+       def _start_fetch(self):
+
+               self._previously_added = False
+               self._fs_mirror_stack = []
+               if self.config.options.distfiles_local is not None:
+                       self._fs_mirror_stack.append(self._mirror_info(
+                               'distfiles-local', self.config.options.distfiles_local))
+               if self.config.options.recycle_dir is not None:
+                       self._fs_mirror_stack.append(self._mirror_info(
+                               'recycle', self.config.options.recycle_dir))
+
+               self._primaryuri_stack = []
+               self._mirror_stack = []
+               for uri in reversed(self.uri_tuple):
+                       if uri.startswith('mirror://'):
+                               self._mirror_stack.append(
+                                       self._mirror_iterator(uri, self.config.mirrors))
+                       else:
+                               self._primaryuri_stack.append(uri)
+
+               self._tried_uris = set()
+               self._try_next_mirror()
+
+       @staticmethod
+       def _mirror_iterator(uri, mirrors_dict):
+
+               slash_index = uri.find("/", 9)
+               if slash_index != -1:
+                       mirror_name = uri[9:slash_index].strip("/")
+                       for mirror in mirrors_dict.get(mirror_name, []):
+                               yield mirror.rstrip("/") + "/" + uri[slash_index+1:]
+
+       def _try_next_mirror(self):
+               if self._fs_mirror_stack:
+                       self._fetch_fs(self._fs_mirror_stack.pop())
+                       return
+               else:
+                       uri = self._next_uri()
+                       if uri is not None:
+                               self._tried_uris.add(uri)
+                               self._fetch_uri(uri)
+                               return
+
+               if self._tried_uris:
+                       msg = "all uris failed"
+               else:
+                       msg = "no fetchable uris"
+
+               self.config.log_failure("%s\t%s\t%s" %
+                       (self.cpv, self.distfile, msg))
+               self.config.file_failures[self.distfile] = self.cpv
+               self.returncode = os.EX_OK
+               self.wait()
+
+       def _next_uri(self):
+               remaining_tries = self.config.options.tries - len(self._tried_uris)
+               if remaining_tries > 0:
+
+                       if remaining_tries <= self.config.options.tries / 2:
+                               while self._primaryuri_stack:
+                                       uri = self._primaryuri_stack.pop()
+                                       if uri not in self._tried_uris:
+                                               return uri
+
+                       while self._mirror_stack:
+                               uri = next(self._mirror_stack[-1], None)
+                               if uri is None:
+                                       self._mirror_stack.pop()
+                               else:
+                                       if uri not in self._tried_uris:
+                                               return uri
+
+                       if self._primaryuri_stack:
+                               return self._primaryuri_stack.pop()
+
+               return None
+
+       def _fetch_fs(self, mirror_info):
+               file_path = os.path.join(mirror_info.location, self.distfile)
+
+               st = None
+               size_ok = False
+               try:
+                       st = os.stat(file_path)
+               except OSError as e:
+                       if e.errno not in (errno.ENOENT, errno.ESTALE):
+                               msg = "%s stat failed in %s: %s" % \
+                                       (self.distfile, mirror_info.name, e)
+                               self.scheduler.output(msg + '\n', background=True,
+                                       log_path=self._log_path)
+                               logging.error(msg)
+               else:
+                       size_ok = st.st_size == self.digests["size"]
+                       self._current_stat = st
+
+               if size_ok:
+                       self._current_mirror = mirror_info
+                       self._start_task(
+                               FileDigester(file_path=file_path,
+                                       hash_names=(self._select_hash(),),
+                                       background=self.background,
+                                       logfile=self._log_path),
+                               self._fs_mirror_digester_exit)
+               else:
+                       self._try_next_mirror()
+
    def _fs_mirror_digester_exit(self, digester):
        """Callback for digest verification of a file found on a local
        filesystem mirror.  On a good digest the file is hardlinked
        (or copied, across devices) into distfiles; on any failure the
        next fetch source is tried."""

        self._assert_current(digester)
        if self._was_cancelled():
            self.wait()
            return

        current_mirror = self._current_mirror
        if digester.returncode != os.EX_OK:
            msg = "%s %s digester failed unexpectedly" % \
            (self.distfile, current_mirror.name)
            self.scheduler.output(msg + '\n', background=True,
                log_path=self._log_path)
            logging.error(msg)
        else:
            bad_digest = self._find_bad_digest(digester.digests)
            if bad_digest is not None:
                msg = "%s %s has bad %s digest: expected %s, got %s" % \
                    (self.distfile, current_mirror.name, bad_digest,
                    self.digests[bad_digest], digester.digests[bad_digest])
                self.scheduler.output(msg + '\n', background=True,
                    log_path=self._log_path)
                logging.error(msg)
            elif self.config.options.dry_run:
                # Report success without actually touching any files
                if self._same_device(current_mirror.location,
                    self.config.options.distfiles):
                    logging.info(("dry-run: hardlink '%s' from %s "
                        "to distfiles") % (self.distfile, current_mirror.name))
                else:
                    logging.info("dry-run: copy '%s' from %s to distfiles" %
                        (self.distfile, current_mirror.name))
                self._success()
                self.returncode = os.EX_OK
                self.wait()
                return
            else:
                src = os.path.join(current_mirror.location, self.distfile)
                dest = os.path.join(self.config.options.distfiles, self.distfile)
                # Prefer a cheap atomic hardlink; fall back to an async
                # copy when the mirror is on a different device.
                if self._hardlink_atomic(src, dest,
                    "%s to %s" % (current_mirror.name, "distfiles")):
                    logging.debug("hardlink '%s' from %s to distfiles" %
                        (self.distfile, current_mirror.name))
                    self._success()
                    self.returncode = os.EX_OK
                    self.wait()
                    return
                else:
                    self._start_task(
                        FileCopier(src_path=src, dest_path=dest,
                            background=(self.background and
                                self._log_path is not None),
                            logfile=self._log_path),
                        self._fs_mirror_copier_exit)
                    return

        self._try_next_mirror()
+
    def _fs_mirror_copier_exit(self, copier):
        """Callback for the cross-device copy from a filesystem mirror.
        On success, the copy's permissions and mtime are aligned with
        the source file (best effort); on failure, the next source is
        tried."""

        self._assert_current(copier)
        if self._was_cancelled():
            self.wait()
            return

        current_mirror = self._current_mirror
        if copier.returncode != os.EX_OK:
            msg = "%s %s copy failed unexpectedly" % \
                (self.distfile, current_mirror.name)
            self.scheduler.output(msg + '\n', background=True,
                log_path=self._log_path)
            logging.error(msg)
        else:

            logging.debug("copy '%s' from %s to distfiles" %
                (self.distfile, current_mirror.name))

            # Preserve the source file's permissions; failure here is
            # logged but does not fail the fetch.
            try:
                portage.util.apply_stat_permissions(
                    copier.dest_path, self._current_stat)
            except (OSError, PortageException) as e:
                msg = ("%s %s apply_stat_permissions "
                    "failed unexpectedly: %s") % \
                    (self.distfile, current_mirror.name, e)
                self.scheduler.output(msg + '\n', background=True,
                    log_path=self._log_path)
                logging.error(msg)

            # Preserve the source mtime (nanosecond precision requires
            # the os.utime ns keyword, available from Python 3.3).
            try:
                if sys.hexversion >= 0x3030000:
                    os.utime(copier.dest_path,
                        ns=(self._current_stat.st_mtime_ns,
                        self._current_stat.st_mtime_ns))
                else:
                    os.utime(copier.dest_path,
                        (self._current_stat[stat.ST_MTIME],
                        self._current_stat[stat.ST_MTIME]))
            except OSError as e:
                msg = "%s %s utime failed unexpectedly: %s" % \
                    (self.distfile, current_mirror.name, e)
                self.scheduler.output(msg + '\n', background=True,
                    log_path=self._log_path)
                logging.error(msg)

            self._success()
            self.returncode = os.EX_OK
            self.wait()
            return

        self._try_next_mirror()
+
    def _fetch_uri(self, uri):
        """Launch the external fetch command for the given URI, writing
        the download to a uniquely named temporary file in either the
        temp dir or the distfiles dir."""

        if self.config.options.dry_run:
            # Simply report success.
            logging.info("dry-run: fetch '%s' from '%s'" %
                (self.distfile, uri))
            self._success()
            self.returncode = os.EX_OK
            self.wait()
            return

        if self.config.options.temp_dir:
            self._fetch_tmp_dir_info = 'temp-dir'
            distdir = self.config.options.temp_dir
        else:
            self._fetch_tmp_dir_info = 'distfiles'
            distdir = self.config.options.distfiles

        # PID suffix keeps concurrent emirrordist instances from
        # clobbering each other's temporary files.
        tmp_basename = self.distfile + '._emirrordist_fetch_.%s' % os.getpid()

        variables = {
            "DISTDIR": distdir,
            "URI":     uri,
            "FILE":    tmp_basename
        }

        self._fetch_tmp_file = os.path.join(distdir, tmp_basename)

        # Clear any stale temporary file from a previous run.
        try:
            os.unlink(self._fetch_tmp_file)
        except OSError:
            pass

        args = portage.util.shlex_split(default_fetchcommand)
        args = [portage.util.varexpand(x, mydict=variables)
            for x in args]

        if sys.hexversion < 0x3000000 or sys.hexversion >= 0x3020000:
            # Python 3.1 does not support bytes in Popen args.
            args = [_unicode_encode(x,
                encoding=_encodings['fs'], errors='strict') for x in args]

        # Give the child a harmless stdin; stdout/stderr are combined
        # and piped into the per-file log via PipeLogger.
        null_fd = os.open(os.devnull, os.O_RDONLY)
        fetcher = PopenProcess(background=self.background,
            proc=subprocess.Popen(args, stdin=null_fd,
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT),
            scheduler=self.scheduler)
        os.close(null_fd)

        fetcher.pipe_reader = PipeLogger(background=self.background,
            input_fd=fetcher.proc.stdout, log_file_path=self._log_path,
            scheduler=self.scheduler)

        self._start_task(fetcher, self._fetcher_exit)
+
+       def _fetcher_exit(self, fetcher):
+
+               self._assert_current(fetcher)
+               if self._was_cancelled():
+                       self.wait()
+                       return
+               
+               if os.path.exists(self._fetch_tmp_file):
+                       self._start_task(
+                               FileDigester(file_path=self._fetch_tmp_file,
+                                       hash_names=(self._select_hash(),),
+                                       background=self.background,
+                                       logfile=self._log_path),
+                                       self._fetch_digester_exit)
+               else:
+                       self._try_next_mirror()
+
+       def _fetch_digester_exit(self, digester):
+
+               self._assert_current(digester)
+               if self._was_cancelled():
+                       self.wait()
+                       return
+
+               if digester.returncode != os.EX_OK:
+                       msg = "%s %s digester failed unexpectedly" % \
+                       (self.distfile, self._fetch_tmp_dir_info)
+                       self.scheduler.output(msg + '\n', background=True,
+                               log_path=self._log_path)
+                       logging.error(msg)
+               else:
+                       bad_digest = self._find_bad_digest(digester.digests)
+                       if bad_digest is not None:
+                               msg = "%s %s has bad %s digest: expected %s, got %s" % \
+                                       (self.distfile, self._current_mirror.name, bad_digest,
+                                       self.digests[bad_digest], digester.digests[bad_digest])
+                               self.scheduler.output(msg + '\n', background=True,
+                                       log_path=self._log_path)
+                               try:
+                                       os.unlink(self._fetch_tmp_file)
+                               except OSError:
+                                       pass
+                       else:
+                               dest = os.path.join(self.config.options.distfiles, self.distfile)
+                               try:
+                                       os.rename(self._fetch_tmp_file, dest)
+                               except OSError:
+                                       self._start_task(
+                                               FileCopier(src_path=self._fetch_tmp_file,
+                                                       dest_path=dest,
+                                                       background=(self.background and
+                                                               self._log_path is not None),
+                                                       logfile=self._log_path),
+                                               self._fetch_copier_exit)
+                                       return
+                               else:
+                                       self._success()
+                                       self.returncode = os.EX_OK
+                                       self.wait()
+                                       return
+
+               self._try_next_mirror()
+
+       def _fetch_copier_exit(self, copier):
+
+               self._assert_current(copier)
+
+               try:
+                       os.unlink(self._fetch_tmp_file)
+               except OSError:
+                       pass
+
+               if self._was_cancelled():
+                       self.wait()
+                       return
+
+               if copier.returncode == os.EX_OK:
+                       self._success()
+                       self.returncode = os.EX_OK
+                       self.wait()
+               else:
+                       # out of space?
+                       msg = "%s %s copy failed unexpectedly" % \
+                               (self.distfile, self._fetch_tmp_dir_info)
+                       self.scheduler.output(msg + '\n', background=True,
+                               log_path=self._log_path)
+                       logging.error(msg)
+                       self.config.log_failure("%s\t%s\t%s" %
+                               (self.cpv, self.distfile, msg))
+                       self.config.file_failures[self.distfile] = self.cpv
+                       self.returncode = 1
+                       self.wait()
+
+       def _unlink_file(self, file_path, dir_info):
+               try:
+                       os.unlink(file_path)
+               except OSError as e:
+                       if e.errno not in (errno.ENOENT, errno.ESTALE):
+                               msg = "unlink '%s' failed in %s: %s" % \
+                                       (self.distfile, dir_info, e)
+                               self.scheduler.output(msg + '\n', background=True,
+                                       log_path=self._log_path)
+                               logging.error(msg)
+                               return False
+               return True
+
+       def _have_needed_digests(self):
+               return "size" in self.digests and \
+                       self._select_hash() is not None
+
+       def _select_hash(self):
+               if default_hash_name in self.digests:
+                       return default_hash_name
+               else:
+                       for hash_name in self.digests:
+                               if hash_name != "size" and \
+                                       hash_name in portage.checksum.hashfunc_map:
+                                       return hash_name
+
+               return None
+
+       def _find_bad_digest(self, digests):
+               for hash_name, hash_value in digests.items():
+                       if self.digests[hash_name] != hash_value:
+                               return hash_name
+               return None
+
+       @staticmethod
+       def _same_device(path1, path2):
+               try:
+                       st1 = os.stat(path1)
+                       st2 = os.stat(path2)
+               except OSError:
+                       return False
+               else:
+                       return st1.st_dev == st2.st_dev
+
    def _hardlink_atomic(self, src, dest, dir_info):
        """Atomically hardlink src to dest by linking to a temporary
        name first and renaming it into place.  Return True on success;
        return False on cross-device links (EXDEV, silently, so the
        caller can fall back to a copy) or on any other error (logged).
        """

        head, tail = os.path.split(dest)
        # Hidden, PID-suffixed name so concurrent instances don't collide.
        hardlink_tmp = os.path.join(head, ".%s._mirrordist_hardlink_.%s" % \
            (tail, os.getpid()))

        try:
            try:
                os.link(src, hardlink_tmp)
            except OSError as e:
                if e.errno != errno.EXDEV:
                    msg = "hardlink %s from %s failed: %s" % \
                        (self.distfile, dir_info, e)
                    self.scheduler.output(msg + '\n', background=True,
                        log_path=self._log_path)
                    logging.error(msg)
                return False

            try:
                os.rename(hardlink_tmp, dest)
            except OSError as e:
                msg = "hardlink rename '%s' from %s failed: %s" % \
                    (self.distfile, dir_info, e)
                self.scheduler.output(msg + '\n', background=True,
                    log_path=self._log_path)
                logging.error(msg)
                return False
        finally:
            # After a successful rename the temp name is already gone,
            # so this unlink harmlessly raises (and suppresses) ENOENT.
            try:
                os.unlink(hardlink_tmp)
            except OSError:
                pass

        return True
diff --git a/pym/portage/_emirrordist/MirrorDistTask.py b/pym/portage/_emirrordist/MirrorDistTask.py
new file mode 100644 (file)
index 0000000..b6f875d
--- /dev/null
@@ -0,0 +1,218 @@
+# Copyright 2013 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import errno
+import logging
+import sys
+import time
+
+try:
+       import threading
+except ImportError:
+       import dummy_threading as threading
+
+import portage
+from portage import os
+from portage.util._async.TaskScheduler import TaskScheduler
+from _emerge.CompositeTask import CompositeTask
+from .FetchIterator import FetchIterator
+from .DeletionIterator import DeletionIterator
+
if sys.hexversion >= 0x3000000:
	# Python 3 has no separate long type; alias it to int so the
	# long() call in _update_recycle_db works on both versions.
	long = int
+
class MirrorDistTask(CompositeTask):
	"""
	Main composite task for emirrordist: fetch distfiles for the
	configured repository, optionally delete unused distfiles, then
	update the recycle db, write the scheduled-deletion log, and log
	a summary of the whole run.
	"""

	__slots__ = ('_config', '_terminated', '_term_check_id')

	def __init__(self, config):
		"""
		@param config: portage._emirrordist.Config instance holding
			the parsed options, databases and counters for this run
		"""
		CompositeTask.__init__(self, scheduler=config.event_loop)
		self._config = config
		self._terminated = threading.Event()

	def _start(self):
		# Poll for termination requests; terminate() may be invoked
		# from a signal handler or another thread.
		self._term_check_id = self.scheduler.idle_add(self._termination_check)
		fetch = TaskScheduler(iter(FetchIterator(self._config)),
			max_jobs=self._config.options.jobs,
			max_load=self._config.options.load_average,
			event_loop=self._config.event_loop)
		self._start_task(fetch, self._fetch_exit)

	def _fetch_exit(self, fetch):
		# Fetch phase finished; start the deletion phase if --delete
		# was requested, otherwise go straight to post-processing.
		self._assert_current(fetch)
		if self._was_cancelled():
			self.wait()
			return

		if self._config.options.delete:
			deletion = TaskScheduler(iter(DeletionIterator(self._config)),
				max_jobs=self._config.options.jobs,
				max_load=self._config.options.load_average,
				event_loop=self._config.event_loop)
			self._start_task(deletion, self._deletion_exit)
			return

		self._post_deletion()

	def _deletion_exit(self, deletion):
		# Deletion phase finished; continue with post-processing.
		self._assert_current(deletion)
		if self._was_cancelled():
			self.wait()
			return

		self._post_deletion()

	def _post_deletion(self):
		# Bookkeeping that runs after fetching (and optional
		# deletion) completes successfully.
		if self._config.options.recycle_db is not None:
			self._update_recycle_db()

		if self._config.options.scheduled_deletion_log is not None:
			self._scheduled_deletion_log()

		self._summary()

		self.returncode = os.EX_OK
		self._current_task = None
		self.wait()

	def _update_recycle_db(self):
		"""
		Synchronize recycle_db with the contents of recycle_dir:
		register files that are new to the db, refresh entries whose
		size changed, unlink files whose recycle_deletion_delay has
		expired, and drop db entries for files that no longer exist.
		"""
		start_time = self._config.start_time
		recycle_dir = self._config.options.recycle_dir
		recycle_db = self._config.recycle_db
		r_deletion_delay = self._config.options.recycle_deletion_delay

		# Use a dict to optimize access.
		recycle_db_cache = dict(recycle_db.items())

		for filename in os.listdir(recycle_dir):

			recycle_file = os.path.join(recycle_dir, filename)

			try:
				st = os.stat(recycle_file)
			except OSError as e:
				# ENOENT/ESTALE mean the file vanished between
				# listdir and stat; that is harmless.
				if e.errno not in (errno.ENOENT, errno.ESTALE):
					logging.error(("stat failed for '%s' in "
						"recycle: %s") % (filename, e))
				continue

			value = recycle_db_cache.pop(filename, None)
			if value is None:
				logging.debug(("add '%s' to "
					"recycle db") % filename)
				recycle_db[filename] = (st.st_size, start_time)
			else:
				r_size, r_time = value
				if long(r_size) != st.st_size:
					# Size changed, so the file was replaced;
					# restart its retention period.
					recycle_db[filename] = (st.st_size, start_time)
				elif r_time + r_deletion_delay < start_time:
					if self._config.options.dry_run:
						logging.info(("dry-run: delete '%s' from "
							"recycle") % filename)
						logging.info(("drop '%s' from "
							"recycle db") % filename)
					else:
						try:
							os.unlink(recycle_file)
						except OSError as e:
							if e.errno not in (errno.ENOENT, errno.ESTALE):
								logging.error(("delete '%s' from "
									"recycle failed: %s") % (filename, e))
						else:
							logging.debug(("delete '%s' from "
								"recycle") % filename)
							try:
								del recycle_db[filename]
							except KeyError:
								pass
							else:
								logging.debug(("drop '%s' from "
									"recycle db") % filename)

		# Existing files were popped from recycle_db_cache,
		# so any remaining entries are for files that no
		# longer exist.
		# NOTE(review): this cleanup writes to recycle_db even in
		# --dry-run mode; confirm that is intended.
		for filename in recycle_db_cache:
			try:
				del recycle_db[filename]
			except KeyError:
				pass
			else:
				logging.debug(("drop non-existent '%s' from "
					"recycle db") % filename)

	def _scheduled_deletion_log(self):
		"""
		Write the scheduled-deletions log: filenames grouped under
		the date on which they become eligible for deletion, each
		tagged with the cpv it belongs to when distfiles_db is
		available ("unknown" otherwise).
		"""
		start_time = self._config.start_time
		dry_run = self._config.options.dry_run
		deletion_delay = self._config.options.deletion_delay
		distfiles_db = self._config.distfiles_db

		date_map = {}
		for filename, timestamp in self._config.deletion_db.items():
			date = timestamp + deletion_delay
			# Deletions already overdue are reported for today.
			if date < start_time:
				date = start_time
			date = time.strftime("%Y-%m-%d", time.gmtime(date))
			date_files = date_map.get(date)
			if date_files is None:
				date_files = []
				date_map[date] = date_files
			date_files.append(filename)

		if dry_run:
			# logging.warn is a deprecated alias of logging.warning.
			logging.warning(("dry-run: scheduled-deletions log "
				"will be summarized via logging.info"))

		lines = []
		for date in sorted(date_map):
			date_files = date_map[date]
			if dry_run:
				logging.info(("dry-run: scheduled deletions for %s: %s files") %
					(date, len(date_files)))
			lines.append("%s\n" % date)
			for filename in date_files:
				cpv = "unknown"
				if distfiles_db is not None:
					cpv = distfiles_db.get(filename, cpv)
				lines.append("\t%s\t%s\n" % (filename, cpv))

		if not dry_run:
			portage.util.write_atomic(
				self._config.options.scheduled_deletion_log,
				"".join(lines))

	def _summary(self):
		# Log summary statistics for the run. Files deleted in this
		# run are excluded from the scheduled-deletion count.
		elapsed_time = time.time() - self._config.start_time
		fail_count = len(self._config.file_failures)
		delete_count = self._config.delete_count
		scheduled_deletion_count = self._config.scheduled_deletion_count - delete_count
		added_file_count = self._config.added_file_count
		added_byte_count = self._config.added_byte_count

		logging.info("finished in %i seconds" % elapsed_time)
		logging.info("failed to fetch %i files" % fail_count)
		logging.info("deleted %i files" % delete_count)
		logging.info("deletion of %i files scheduled" %
			scheduled_deletion_count)
		logging.info("added %i files" % added_file_count)
		logging.info("added %i bytes total" % added_byte_count)

	def terminate(self):
		# Thread-safe termination request; the idle callback
		# installed in _start() performs the actual cancellation.
		self._terminated.set()

	def _termination_check(self):
		if self._terminated.is_set():
			self.cancel()
			self.wait()
		# Return True to keep this idle callback registered.
		return True

	def _wait(self):
		CompositeTask._wait(self)
		if self._term_check_id is not None:
			self.scheduler.source_remove(self._term_check_id)
			self._term_check_id = None
diff --git a/pym/portage/_emirrordist/__init__.py b/pym/portage/_emirrordist/__init__.py
new file mode 100644 (file)
index 0000000..6cde932
--- /dev/null
@@ -0,0 +1,2 @@
+# Copyright 2013 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
diff --git a/pym/portage/_emirrordist/main.py b/pym/portage/_emirrordist/main.py
new file mode 100644 (file)
index 0000000..6b7c22f
--- /dev/null
@@ -0,0 +1,438 @@
+# Copyright 2013 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import logging
+import optparse
+import sys
+
+import portage
+from portage import os
+from portage.util import normalize_path
+from portage.util._async.run_main_scheduler import run_main_scheduler
+from portage.util._async.SchedulerInterface import SchedulerInterface
+from portage.util._eventloop.global_event_loop import global_event_loop
+from portage._emirrordist.Config import Config
+from .Config import Config
+from .MirrorDistTask import MirrorDistTask
+
if sys.hexversion >= 0x3000000:
	# Python 3 has no separate long type; alias it to int so the
	# long() calls in emirrordist_main work on both versions.
	long = int

seconds_per_day = 24 * 60 * 60

# Option definitions consumed by parse_args(). Each entry supplies the
# arguments for an optparse add_option() call: "longopt" and "help"
# are required; "shortopt" and any of "action", "choices", "default",
# "metavar" and "type" are optional.
common_options = (
	{
		"longopt"  : "--dry-run",
		"help"     : "perform a trial run with no changes made (usually combined "
			"with --verbose)",
		"action"   : "store_true"
	},
	{
		"longopt"  : "--verbose",
		"shortopt" : "-v",
		# Fixed help-string typo: "occurences" -> "occurrences".
		"help"     : "display extra information on stderr "
			"(multiple occurrences increase verbosity)",
		"action"   : "count",
		"default"  : 0,
	},
	{
		"longopt"  : "--ignore-default-opts",
		"help"     : "do not use the EMIRRORDIST_DEFAULT_OPTS environment variable",
		"action"   : "store_true"
	},
	{
		"longopt"  : "--distfiles",
		"help"     : "distfiles directory to use (required)",
		"metavar"  : "DIR"
	},
	{
		"longopt"  : "--jobs",
		"shortopt" : "-j",
		"help"     : "number of concurrent jobs to run"
	},
	{
		"longopt"  : "--load-average",
		"shortopt" : "-l",
		"help"     : "load average limit for spawning of new concurrent jobs",
		"metavar"  : "LOAD"
	},
	{
		"longopt"  : "--tries",
		"help"     : "maximum number of tries per file, 0 means unlimited (default is 10)",
		"default"  : 10
	},
	{
		"longopt"  : "--repo",
		"help"     : "name of repo to operate on (default repo is located at $PORTDIR)"
	},
	{
		"longopt"  : "--config-root",
		"help"     : "location of portage config files",
		"metavar"  : "DIR"
	},
	{
		"longopt"  : "--portdir",
		"help"     : "override the portage tree location",
		"metavar"  : "DIR"
	},
	{
		"longopt"  : "--portdir-overlay",
		"help"     : "override the PORTDIR_OVERLAY variable (requires "
			"that --repo is also specified)"
	},
	{
		"longopt"  : "--strict-manifests",
		"help"     : "manually override \"strict\" FEATURES setting",
		"choices"  : ("y", "n"),
		"metavar"  : "<y|n>",
		"type"     : "choice"
	},
	{
		"longopt"  : "--failure-log",
		"help"     : "log file for fetch failures, with tab-delimited "
			"output, for reporting purposes",
		"metavar"  : "FILE"
	},
	{
		"longopt"  : "--success-log",
		"help"     : "log file for fetch successes, with tab-delimited "
			"output, for reporting purposes",
		"metavar"  : "FILE"
	},
	{
		"longopt"  : "--scheduled-deletion-log",
		"help"     : "log file for scheduled deletions, with tab-delimited "
			"output, for reporting purposes",
		"metavar"  : "FILE"
	},
	{
		"longopt"  : "--delete",
		"help"     : "enable deletion of unused distfiles",
		"action"   : "store_true"
	},
	{
		"longopt"  : "--deletion-db",
		"help"     : "database file used to track lifetime of files "
			"scheduled for delayed deletion",
		"metavar"  : "FILE"
	},
	{
		"longopt"  : "--deletion-delay",
		"help"     : "delay time for deletion, measured in seconds",
		"metavar"  : "SECONDS"
	},
	{
		"longopt"  : "--temp-dir",
		"help"     : "temporary directory for downloads",
		"metavar"  : "DIR"
	},
	{
		"longopt"  : "--mirror-overrides",
		"help"     : "file holding a list of mirror overrides",
		"metavar"  : "FILE"
	},
	{
		"longopt"  : "--mirror-skip",
		"help"     : "comma delimited list of mirror targets to skip "
			"when fetching"
	},
	{
		"longopt"  : "--restrict-mirror-exemptions",
		"help"     : "comma delimited list of mirror targets for which to "
			"ignore RESTRICT=\"mirror\""
	},
	{
		"longopt"  : "--verify-existing-digest",
		"help"     : "use digest as a verification of whether existing "
			"distfiles are valid",
		"action"   : "store_true"
	},
	{
		"longopt"  : "--distfiles-local",
		"help"     : "distfiles-local directory to use",
		"metavar"  : "DIR"
	},
	{
		"longopt"  : "--distfiles-db",
		"help"     : "database file used to track which ebuilds a "
			"distfile belongs to",
		"metavar"  : "FILE"
	},
	{
		"longopt"  : "--recycle-dir",
		"help"     : "directory for extended retention of files that "
			"are removed from distdir with the --delete option",
		"metavar"  : "DIR"
	},
	{
		"longopt"  : "--recycle-db",
		"help"     : "database file used to track lifetime of files "
			"in recycle dir",
		"metavar"  : "FILE"
	},
	{
		"longopt"  : "--recycle-deletion-delay",
		"help"     : "delay time for deletion of unused files from "
			"recycle dir, measured in seconds (defaults to "
			"the equivalent of 60 days)",
		"default"  : 60 * seconds_per_day,
		"metavar"  : "SECONDS"
	},
	{
		"longopt"  : "--fetch-log-dir",
		"help"     : "directory for individual fetch logs",
		"metavar"  : "DIR"
	},
	{
		"longopt"  : "--whitelist-from",
		"help"     : "specifies a file containing a list of files to "
			"whitelist, one per line, # prefixed lines ignored",
		"action"   : "append",
		"metavar"  : "FILE"
	},
)
+
def parse_args(args):
	"""
	Build the emirrordist option parser and parse *args*.

	Action options (--version, --mirror) go into their own option
	group; everything else is generated from common_options.

	@return: a (parser, options, args) tuple, where args holds the
		remaining positional arguments
	"""
	parser = optparse.OptionParser(
		description="emirrordist - a fetch tool for mirroring "
			"of package distfiles",
		usage="emirrordist [options] <action>")

	actions = optparse.OptionGroup(parser, 'Actions')
	actions.add_option("--version",
		action="store_true",
		help="display portage version and exit")
	actions.add_option("--mirror",
		action="store_true",
		help="mirror distfiles for the selected repository")
	parser.add_option_group(actions)

	common = optparse.OptionGroup(parser, 'Common options')
	for opt_info in common_options:
		names = [opt_info["longopt"]]
		if opt_info.get("shortopt"):
			names.append(opt_info["shortopt"])
		kwargs = dict(help=opt_info["help"])
		for key in ("action", "choices", "default", "metavar", "type"):
			if key in opt_info:
				kwargs[key] = opt_info[key]
		common.add_option(*names, **kwargs)
	parser.add_option_group(common)

	options, args = parser.parse_args(args)

	return (parser, options, args)
+
def emirrordist_main(args):
	"""
	Entry point for the emirrordist command. Parses arguments
	(honoring EMIRRORDIST_DEFAULT_OPTS unless --ignore-default-opts),
	validates and normalizes all option values, builds the portage
	config and portdbapi restricted to the selected repo, and runs
	the mirroring task under the global event loop.

	Invalid options are reported via parser.error(), which prints a
	usage message and exits the process.

	@param args: command-line arguments (without the program name)
	@rtype: int
	@return: an exit code suitable for sys.exit()
	"""

	parser, options, args = parse_args(args)

	if options.version:
		sys.stdout.write("Portage %s\n" % portage.VERSION)
		return os.EX_OK

	config_root = options.config_root

	# The calling environment is ignored, so the program is
	# completely controlled by commandline arguments.
	env = {}

	if options.repo is None:
		env['PORTDIR_OVERLAY'] = ''
	elif options.portdir_overlay:
		env['PORTDIR_OVERLAY'] = options.portdir_overlay

	if options.portdir is not None:
		env['PORTDIR'] = options.portdir

	settings = portage.config(config_root=config_root,
		local_config=False, env=env)

	default_opts = None
	if not options.ignore_default_opts:
		default_opts = settings.get('EMIRRORDIST_DEFAULT_OPTS', '').split()

	if default_opts:
		# Re-parse with EMIRRORDIST_DEFAULT_OPTS prepended, then
		# rebuild settings since the options may have changed.
		parser, options, args = parse_args(default_opts + args)

		settings = portage.config(config_root=config_root,
			local_config=False, env=env)

	# Resolve the repository to operate on: the named repo if --repo
	# was given, otherwise the main repo (PORTDIR).
	repo_path = None
	if options.repo is not None:
		repo_path = settings.repositories.treemap.get(options.repo)
		if repo_path is None:
			parser.error("Unable to locate repository named '%s'" % \
				(options.repo,))
	else:
		repo_path = settings.repositories.mainRepoLocation()
		if not repo_path:
			parser.error("PORTDIR is undefined")

	# Normalize and validate option values. String-valued numbers
	# are converted, paths are made absolute, and directory/file
	# permissions are checked up front.
	if options.jobs is not None:
		options.jobs = int(options.jobs)

	if options.load_average is not None:
		options.load_average = float(options.load_average)

	if options.failure_log is not None:
		options.failure_log = normalize_path(
			os.path.abspath(options.failure_log))

		parent_dir = os.path.dirname(options.failure_log)
		if not (os.path.isdir(parent_dir) and
			os.access(parent_dir, os.W_OK|os.X_OK)):
			parser.error(("--failure-log '%s' parent is not a "
				"writable directory") % options.failure_log)

	if options.success_log is not None:
		options.success_log = normalize_path(
			os.path.abspath(options.success_log))

		parent_dir = os.path.dirname(options.success_log)
		if not (os.path.isdir(parent_dir) and
			os.access(parent_dir, os.W_OK|os.X_OK)):
			parser.error(("--success-log '%s' parent is not a "
				"writable directory") % options.success_log)

	if options.scheduled_deletion_log is not None:
		options.scheduled_deletion_log = normalize_path(
			os.path.abspath(options.scheduled_deletion_log))

		parent_dir = os.path.dirname(options.scheduled_deletion_log)
		if not (os.path.isdir(parent_dir) and
			os.access(parent_dir, os.W_OK|os.X_OK)):
			parser.error(("--scheduled-deletion-log '%s' parent is not a "
				"writable directory") % options.scheduled_deletion_log)

		if options.deletion_db is None:
			parser.error("--scheduled-deletion-log requires --deletion-db")

	# --deletion-delay and --deletion-db must be used together.
	if options.deletion_delay is not None:
		options.deletion_delay = long(options.deletion_delay)
		if options.deletion_db is None:
			parser.error("--deletion-delay requires --deletion-db")

	if options.deletion_db is not None:
		if options.deletion_delay is None:
			parser.error("--deletion-db requires --deletion-delay")
		options.deletion_db = normalize_path(
			os.path.abspath(options.deletion_db))

	if options.temp_dir is not None:
		options.temp_dir = normalize_path(
			os.path.abspath(options.temp_dir))

		if not (os.path.isdir(options.temp_dir) and
			os.access(options.temp_dir, os.W_OK|os.X_OK)):
			parser.error(("--temp-dir '%s' is not a "
				"writable directory") % options.temp_dir)

	if options.distfiles is not None:
		options.distfiles = normalize_path(
			os.path.abspath(options.distfiles))

		if not (os.path.isdir(options.distfiles) and
			os.access(options.distfiles, os.W_OK|os.X_OK)):
			parser.error(("--distfiles '%s' is not a "
				"writable directory") % options.distfiles)
	else:
		parser.error("missing required --distfiles parameter")

	if options.mirror_overrides is not None:
		options.mirror_overrides = normalize_path(
			os.path.abspath(options.mirror_overrides))

		if not (os.access(options.mirror_overrides, os.R_OK) and
			os.path.isfile(options.mirror_overrides)):
			parser.error(
				"--mirror-overrides-file '%s' is not a readable file" %
				options.mirror_overrides)

	if options.distfiles_local is not None:
		options.distfiles_local = normalize_path(
			os.path.abspath(options.distfiles_local))

		if not (os.path.isdir(options.distfiles_local) and
			os.access(options.distfiles_local, os.W_OK|os.X_OK)):
			parser.error(("--distfiles-local '%s' is not a "
				"writable directory") % options.distfiles_local)

	if options.distfiles_db is not None:
		options.distfiles_db = normalize_path(
			os.path.abspath(options.distfiles_db))

	if options.tries is not None:
		options.tries = int(options.tries)

	if options.recycle_dir is not None:
		options.recycle_dir = normalize_path(
			os.path.abspath(options.recycle_dir))
		if not (os.path.isdir(options.recycle_dir) and
			os.access(options.recycle_dir, os.W_OK|os.X_OK)):
			parser.error(("--recycle-dir '%s' is not a "
				"writable directory") % options.recycle_dir)

	if options.recycle_db is not None:
		if options.recycle_dir is None:
			parser.error("--recycle-db requires "
				"--recycle-dir to be specified")
		options.recycle_db = normalize_path(
			os.path.abspath(options.recycle_db))

	if options.recycle_deletion_delay is not None:
		options.recycle_deletion_delay = \
			long(options.recycle_deletion_delay)

	if options.fetch_log_dir is not None:
		options.fetch_log_dir = normalize_path(
			os.path.abspath(options.fetch_log_dir))

		if not (os.path.isdir(options.fetch_log_dir) and
			os.access(options.fetch_log_dir, os.W_OK|os.X_OK)):
			parser.error(("--fetch-log-dir '%s' is not a "
				"writable directory") % options.fetch_log_dir)

	if options.whitelist_from:
		normalized_paths = []
		for x in options.whitelist_from:
			path = normalize_path(os.path.abspath(x))
			normalized_paths.append(path)
			if not (os.access(path, os.R_OK) and os.path.isfile(path)):
				parser.error(
					"--whitelist-from '%s' is not a readable file" % x)
		options.whitelist_from = normalized_paths

	# --strict-manifests=y/n overrides the "strict" FEATURES flag.
	if options.strict_manifests is not None:
		if options.strict_manifests == "y":
			settings.features.add("strict")
		else:
			settings.features.discard("strict")

	settings.lock()

	portdb = portage.portdbapi(mysettings=settings)

	# Limit ebuilds to the specified repo.
	portdb.porttrees = [repo_path]

	portage.util.initialize_logger()

	# Each -v lowers the root logger threshold by one level (10).
	if options.verbose > 0:
		l = logging.getLogger()
		l.setLevel(l.getEffectiveLevel() - 10 * options.verbose)

	# Config is a context manager; it closes its databases and log
	# files on exit.
	with Config(options, portdb,
		SchedulerInterface(global_event_loop())) as config:

		if not options.mirror:
			parser.error('No action specified')

		returncode = os.EX_OK

		if options.mirror:
			# run_main_scheduler returns the terminating signal
			# number, or None for a normal exit.
			signum = run_main_scheduler(MirrorDistTask(config))
			if signum is not None:
				sys.exit(128 + signum)

	return returncode
diff --git a/pym/portage/util/_ShelveUnicodeWrapper.py b/pym/portage/util/_ShelveUnicodeWrapper.py
new file mode 100644 (file)
index 0000000..adbd519
--- /dev/null
@@ -0,0 +1,45 @@
+# Copyright 2013 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
class ShelveUnicodeWrapper(object):
	"""
	Convert unicode to str and back again, since python-2.x shelve
	module doesn't support unicode.
	"""
	def __init__(self, shelve_instance):
		self._shelve = shelve_instance

	def _encode(self, value):
		# Keys/values must be str for python-2.x shelve; anything
		# else passes through untouched.
		return value.encode('utf_8') if isinstance(value, unicode) else value

	def __len__(self):
		return len(self._shelve)

	def __contains__(self, key):
		return self._encode(key) in self._shelve

	def __iter__(self):
		return iter(self._shelve)

	def items(self):
		return self._shelve.iteritems()

	def __getitem__(self, key):
		return self._shelve[self._encode(key)]

	def __setitem__(self, key, value):
		self._shelve[self._encode(key)] = self._encode(value)

	def __delitem__(self, key):
		del self._shelve[self._encode(key)]

	def get(self, key, *args):
		return self._shelve.get(self._encode(key), *args)

	def clear(self):
		self._shelve.clear()

	def close(self):
		self._shelve.close()
diff --git a/pym/portage/util/_async/FileCopier.py b/pym/portage/util/_async/FileCopier.py
new file mode 100644 (file)
index 0000000..27e5ab4
--- /dev/null
@@ -0,0 +1,17 @@
+# Copyright 2013 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+from portage import os
+from portage import shutil
+from portage.util._async.ForkProcess import ForkProcess
+
class FileCopier(ForkProcess):
	"""
	Asynchronously copy a file.
	"""

	__slots__ = ('src_path', 'dest_path')

	def _run(self):
		"""
		Entry point of the forked child: copy src_path to dest_path
		and return os.EX_OK so the parent observes a zero exit
		status on success.
		"""
		shutil.copy(self.src_path, self.dest_path)
		return os.EX_OK
+               return os.EX_OK