- Run Gentoolkit's test suite, make sure it passes:
Note: requires dev-python/snakeoil
+
+Note: If running from the gentoolkit repository, please grab the eclean test file from:
+http://code.google.com/p/genscripts/source/browse/trunk/gentoolkit/pym/gentoolkit/test/eclean/testdistfiles.tar.gz
+Please do not add/commit the test file to the gentoolkit repository
+
./setup.py test
- Create a tag for the release
Add more --debug stuff
Write tests for Dependencies._parser
Profile Dependencies._parser
- Tighten up CPV.split_cpv, it's slow and bad
Extend PackageFormatter usage to everything that outputs packages to
allow for purvasive use of -F, --format goodness
For Next Release:
- write NEWS file
- - make CPV.__init__ more strict, it allows some silly stuff
# This program is licensed under the GPL, version 2
import sys
+import os
import codecs
from functools import reduce
import portage
from portage.output import *
-from portage import os
from getopt import getopt, GetoptError
from getopt import getopt, GetoptError
import portage
-from portage import os
import gentoolkit as gen
from gentoolkit import errors
)
loaded_module.main(module_args)
except portage.exception.AmbiguousPackageName as err:
- raise errors.GentoolkitAmbiguousPackage(err)
+ raise errors.GentoolkitAmbiguousPackage(err.args[0])
except IOError as err:
if err.errno != errno.EPIPE:
raise
import time
from getopt import gnu_getopt, GetoptError
-from portage import os
-
import gentoolkit.pprinter as pp
from gentoolkit.formatters import format_options
from gentoolkit.base import mod_usage
from __future__ import print_function
+import os
import sys
import gentoolkit
from gentoolkit.analyse.output import RebuildPrinter
import portage
-from portage import os
def cpv_all_diff_use(
import errno
+import os
import sys
import time
from getopt import gnu_getopt, GetoptError
-from portage import os
-
import gentoolkit
from gentoolkit import errors
#from gentoolkit.textwrap_ import TextWrapper
from __future__ import print_function
+import os
import sys
-from portage import os
import gentoolkit.pprinter as pp
from gentoolkit.eclean.pkgindex import PkgIndex
__description__ = "A cleaning tool for Gentoo distfiles and binaries."
+import os
import sys
import re
import time
import getopt
import portage
-from portage import os
from portage.output import white, yellow, turquoise, green, teal, red
import gentoolkit.pprinter as pp
from __future__ import print_function
+import os
import sys
import re
import portage
-from portage import os
from gentoolkit.pprinter import warn
# Misc. shortcuts to some portage stuff:
import subprocess
+import os
import sys
import gentoolkit.pprinter as pp
import portage
-from portage import os
class PkgIndex(object):
from __future__ import print_function
+import os
import re
import stat
import sys
+from functools import partial
import portage
-from portage import os
import gentoolkit
import gentoolkit.pprinter as pp
from gentoolkit.eclean.exclude import (exclDictMatchCP, exclDictExpand,
exclDictExpandPkgname, exclMatchFilename)
-#from gentoolkit.package import Package
-from gentoolkit.helpers import walk
# Misc. shortcuts to some portage stuff:
err = sys.stderr
deprecated_message=""""Deprecation Warning: Installed package: %s
- Is no longer in the tree or an installed overlay"""
+ Is no longer in the tree or an installed overlay"""
DEPRECATED = pp.warn(deprecated_message)
debug_modules = []
time_limit=0,
size_limit=0,
_distdir=distdir,
- deprecate=False
+ deprecate=False,
+ extra_checks=()
):
"""Find all obsolete distfiles.
# gather the files to be cleaned
self.output("...checking limits for %d ebuild sources"
%len(pkgs))
- clean_me = self._check_limits(_distdir,
- size_limit, time_limit, exclude)
+
+ checks = self._get_default_checks(size_limit, time_limit, exclude)
+ checks.extend(extra_checks)
+ clean_me = self._check_limits(_distdir, checks, clean_me)
# remove any protected files from the list
self.output("...removing protected sources from %s candidates to clean"
%len(clean_me))
####################### begin _check_limits code block
- def _check_limits(self,
- _distdir,
- size_limit,
- time_limit,
- exclude,
- clean_me={}
- ):
- """Checks files if they exceed size and/or time_limits, etc.
- """
- checks = [self._isreg_limit_]
+ def _get_default_checks(self, size_limit, time_limit, excludes):
+ #checks =[(self._isreg_check_, "is_reg_check")]
+ checks =[self._isreg_check_]
+ if 'filenames' in excludes:
+ #checks.append((partial(self._filenames_check_, excludes), "Filenames_check"))
+ checks.append(partial(self._filenames_check_, excludes))
+ else:
+ self.output(" - skipping exclude filenames check")
if size_limit:
- checks.append(self._size_limit_)
- self.size_limit = size_limit
+ #checks.append((partial(self._size_check_, size_limit), "size_check"))
+ checks.append(partial(self._size_check_, size_limit))
else:
self.output(" - skipping size limit check")
if time_limit:
- checks.append(self._time_limit_)
- self.time_limit = time_limit
+ #print("time_limit = ", time_limit/1000000,"M sec")
+ #checks.append((partial(self._time_check_, time_limit), "time_check"))
+ checks.append(partial(self._time_check_, time_limit))
else:
self.output(" - skipping time limit check")
- if 'filenames' in exclude:
- checks.append(self._filenames_limit_)
- self.exclude = exclude
- else:
- self.output(" - skipping exclude filenames check")
- max_index = len(checks)
+ return checks
+
+
+ def _check_limits(self,
+ _distdir,
+ checks,
+ clean_me=None
+ ):
+ """Checks files if they exceed size and/or time_limits, etc.
+
+ To start with everything is considered dirty and is excluded
+ only if it matches some condition.
+ """
+ if clean_me is None:
+ clean_me = {}
for file in os.listdir(_distdir):
filepath = os.path.join(_distdir, file)
try:
- file_stat = os.stat(filepath)
- except:
- continue
- _index = 0
- next = True
- skip_file = False
- while _index<max_index and next:
- next, skip_file = checks[_index](file_stat, file)
- _index +=1
- if skip_file:
+ file_stat = os.lstat(filepath)
+ except EnvironmentError:
continue
- # this is a candidate for cleaning
- #print( "Adding file to clean_list:", file)
- clean_me[file]=[filepath]
+ is_dirty = False
+ #for check, check_name in checks:
+ for check in checks:
+ should_break, is_dirty = check(file_stat, file)
+ if should_break:
+ break
+
+ if is_dirty:
+ #print( "%s Adding file to clean_list:" %check_name, file)
+ clean_me[file]=[filepath]
return clean_me
- def _isreg_limit_(self, file_stat, file):
+ @staticmethod
+ def _isreg_check_(file_stat, file):
"""check if file is a regular file."""
is_reg_file = stat.S_ISREG(file_stat[stat.ST_MODE])
- return is_reg_file, not is_reg_file
+ return not is_reg_file, is_reg_file
- def _size_limit_(self, file_stat, file):
+ @staticmethod
+ def _size_check_(size_limit, file_stat, file):
"""checks if the file size exceeds the size_limit"""
- if (file_stat[stat.ST_SIZE] >= self.size_limit):
- #print( "size match ", file, file_stat[stat.ST_SIZE])
- return False, True
- return True, False
-
- def _time_limit_(self, file_stat, file):
- """checks if the file exceeds the time_limit"""
- if (file_stat[stat.ST_MTIME] >= self.time_limit):
- #print( "time match ", file, file_stat[stat.ST_MTIME])
- return False, True
- return True,False
-
- def _filenames_limit_(self, file_stat, file):
+ if (file_stat[stat.ST_SIZE] >= size_limit):
+ #print( "size mismatch ", file, file_stat[stat.ST_SIZE])
+ return True, False
+ return False, True
+
+ @staticmethod
+ def _time_check_(time_limit, file_stat, file):
+ """checks if the file exceeds the time_limit
+ (think forward, not back, time keeps increasing)"""
+ if (file_stat[stat.ST_MTIME] >= time_limit):
+ #print( "time match too young ", file, file_stat[stat.ST_MTIME]/1000000,"M sec.")
+ return True, False
+ #print( "time match too old", file, file_stat[stat.ST_MTIME]/1000000,"M sec.")
+ return False, True
+
+ @staticmethod
+ def _filenames_check_(exclude, file_stat, file):
"""checks if the file matches an exclusion file listing"""
# Try to match file name directly
- if file in self.exclude['filenames']:
- return False, True
+ if file in exclude['filenames']:
+ return True, False
# See if file matches via regular expression matching
else:
file_match = False
- for file_entry in self.exclude['filenames']:
- if self.exclude['filenames'][file_entry].match(file):
+ for file_entry in exclude['filenames']:
+ if exclude['filenames'][file_entry].match(file):
file_match = True
break
if file_match:
- return False, True
- return True, False
+ #print( "filename match ", file)
+ return True, False
+ return False, True
####################### end _check_limits code block
- def _remove_protected(self,
+ @staticmethod
+ def _remove_protected(
pkgs,
clean_me
):
def _non_destructive(self,
destructive,
fetch_restricted,
- pkgs_ = {},
- exclude={}
+ pkgs_ = None,
+ exclude=None
):
"""performs the non-destructive checks
@returns packages and thier SRC_URI's: {cpv: src_uri,}
@rtype: dictionary
"""
- pkgs = pkgs_.copy()
+ if pkgs_ is None:
+ pkgs = {}
+ else:
+ pkgs = pkgs_.copy()
+ if exclude is None:
+ exclude = {}
deprecated = {}
# the following code block was split to optimize for speed
# list all CPV from portree (yeah, that takes time...)
cpvs.difference_update(installed_cpvs)
self.output(" - getting fetch-restricted source file names " +
"for %d remaining ebuilds" %len(cpvs))
- pkgs, _deprecated = self._fetch_restricted(destructive, pkgs, cpvs)
+ pkgs, _deprecated = self._fetch_restricted(pkgs, cpvs)
deprecated.update(_deprecated)
else:
self.output(" - getting source file names " +
deprecated.update(_deprecated)
return pkgs, deprecated
- def _fetch_restricted(self, destructive, pkgs_, cpvs):
+ def _fetch_restricted(self, pkgs_, cpvs):
"""perform fetch restricted non-destructive source
filename lookups
- @param destructive: boolean
@param pkgs_: starting dictionary to add to
@param cpvs: set of (cat/pkg-ver, ...) identifiers
@return a new pkg dictionary
@rtype: dictionary
"""
- pkgs = pkgs_.copy()
+ if pkgs_ is None:
+ pkgs = {}
+ else:
+ pkgs = pkgs_.copy()
deprecated = {}
for cpv in cpvs:
# get SRC_URI and RESTRICT from aux_get
@return a new pkg dictionary
@rtype: dictionary
"""
- pkgs = pkgs_.copy()
+ if pkgs_ is None:
+ pkgs = {}
+ else:
+ pkgs = pkgs_.copy()
deprecated = {}
for cpv in cpvs:
# get SRC_URI from aux_get
def _destructive(self,
package_names,
exclude,
- pkgs_={},
+ pkgs_=None,
installed_included=False
):
"""Builds on pkgs according to input options
@returns pkgs: {cpv: src_uri,}
"""
- pkgs = pkgs_.copy()
+ if pkgs_ is None:
+ pkgs = {}
+ else:
+ pkgs = pkgs_.copy()
deprecated = {}
pkgset = set()
if not installed_included:
#self.output(" - done...")
return pkgs, deprecated
- def _get_excludes(self, exclude):
+ @staticmethod
+ def _get_excludes(exclude):
"""Expands the exclude dictionary into a set of
CPV's
def findPackages(
options,
- exclude={},
+ exclude=None,
destructive=False,
time_limit=0,
package_names=False,
@rtype: dict
@return clean_me i.e. {'cat/pkg-ver.tbz2': [filepath],}
"""
+ if exclude is None:
+ exclude = {}
clean_me = {}
# create a full package dictionary
print( pp.error("(Check your /etc/make.conf and environment)."), file=sys.stderr)
print( pp.error("Error: %s" %str(er)), file=sys.stderr)
exit(1)
- for root, dirs, files in walk(pkgdir):
+ for root, dirs, files in os.walk(pkgdir):
if root[-3:] == 'All':
continue
for file in files:
from __future__ import print_function
-# Move to Imports section after Python 2.6 is stable
-
-
__all__ = (
'format_options',
'format_package_names',
# =======
import errno
+import os
import sys
import time
from getopt import getopt, GetoptError
import portage
-from portage import os
import gentoolkit
from gentoolkit import CONFIG
)
loaded_module.main(module_args)
except portage.exception.AmbiguousPackageName as err:
- raise errors.GentoolkitAmbiguousPackage(err)
+ raise errors.GentoolkitAmbiguousPackage(err.args[0])
except IOError as err:
if err.errno != errno.EPIPE:
raise
# =======
QUERY_OPTS = {
- "fullRegex": False,
- "earlyOut": False,
- "nameOnly": False
+ "full_regex": False,
+ "early_out": False,
+ "name_only": False
}
# =======
name = pkg.cp
else:
name = str(pkg.cpv)
- print(name)
+ pp.uprint(name)
def print_verbose(self, pkg, cfile):
"Format for full output."
name = pkg.cp
else:
name = str(pkg.cpv)
- print(pp.cpv(name), "(" + file_str + ")")
-
+ pp.uprint(pp.cpv(name), "(" + file_str + ")")
# =========
# Functions
sys.stderr.write(pp.warn("Use of --earlyout is deprecated."))
sys.stderr.write(pp.warn("Please use --early-out."))
print()
- QUERY_OPTS['earlyOut'] = True
+ QUERY_OPTS['early_out'] = True
elif opt in ('-f', '--full-regex'):
- QUERY_OPTS['fullRegex'] = True
+ QUERY_OPTS['full_regex'] = True
elif opt in ('-n', '--name-only'):
- QUERY_OPTS['nameOnly'] = True
+ QUERY_OPTS['name_only'] = True
def print_help(with_description=True):
sys.exit(2)
if CONFIG['verbose']:
- print(" * Searching for %s ... " % (pp.regexpquery(",".join(queries))))
+ pp.uprint(" * Searching for %s ... " % (
+ pp.regexpquery(",".join(queries)))
+ )
printer_fn = BelongsPrinter(
- verbose=CONFIG['verbose'], name_only=QUERY_OPTS['nameOnly']
+ verbose=CONFIG['verbose'], name_only=QUERY_OPTS['name_only']
)
find_owner = FileOwner(
- is_regex=QUERY_OPTS['fullRegex'],
- early_out=QUERY_OPTS['earlyOut'],
+ is_regex=QUERY_OPTS['full_regex'],
+ early_out=QUERY_OPTS['early_out'],
printer_fn=printer_fn
)
from __future__ import print_function
-# Move to Imports sections when Python 2.6 is stable
-
-
__docformat__ = 'epytext'
# =======
# =======
import sys
+import os
from getopt import gnu_getopt, GetoptError
-from portage import os
-
import gentoolkit.pprinter as pp
from gentoolkit import errors
from gentoolkit.atom import Atom
# =======
QUERY_OPTS = {
- 'onlyLatest': False,
- 'showFullLog': False,
+ 'only_latest': False,
+ 'show_full_log': False,
'limit': None,
'from': None,
'to': None
print_help()
sys.exit(0)
elif opt in ('-f', '--full'):
- QUERY_OPTS['showFullLog'] = True
+ QUERY_OPTS['show_full_log'] = True
elif opt in ('-l', '--latest'):
- QUERY_OPTS['onlyLatest'] = True
+ QUERY_OPTS['only_latest'] = True
elif opt in ('--limit',):
set_limit(posarg)
elif opt in ('--from',):
len_entries = len(entries)
for i, entry in enumerate(entries, start=1):
if i < len_entries:
- print(entry)
+ pp.uprint(entry)
else:
- print(entry.strip())
+ pp.uprint(entry.strip())
def set_limit(posarg):
# Output
#
- if (QUERY_OPTS['onlyLatest'] or (
+ if (QUERY_OPTS['only_latest'] or (
changelog.entries and not changelog.indexed_entries
)):
- print(changelog.latest.strip())
+ pp.uprint(changelog.latest.strip())
else:
end = QUERY_OPTS['limit'] or len(changelog.indexed_entries)
if QUERY_OPTS['to'] or QUERY_OPTS['from']:
to_ver=QUERY_OPTS['to']
)[:end]
)
- elif QUERY_OPTS['showFullLog']:
+ elif QUERY_OPTS['show_full_log']:
print_entries(changelog.full[:end])
else:
# Raises GentoolkitInvalidAtom here if invalid
# Imports
# =======
+import os
import sys
from functools import partial
from getopt import gnu_getopt, GetoptError
import portage.checksum as checksum
-from portage import os
import gentoolkit.pprinter as pp
from gentoolkit import errors
else:
if verbose:
if not cpv in seen:
- print("* Checking %s ..." % (pp.emph(str(cpv))))
+ pp.uprint("* Checking %s ..." % (pp.emph(str(cpv))))
seen.append(cpv)
else:
- print("%s:" % cpv, end=' ')
+ pp.uprint("%s:" % cpv, end=' ')
if verbose:
for err in errs:
# =======
QUERY_OPTS = {
- "includeMasked": False,
- "onlyDirect": True,
- "maxDepth": -1,
+ "include_masked": False,
+ "only_direct": True,
+ "max_depth": -1,
}
# =======
"""Verbosely prints a set of dep strings."""
sep = ' ? ' if (depatom and use_conditional) else ''
- print(indent + pp.cpv(cpv), "(" + use_conditional + sep + depatom + ")")
+ pp.uprint(indent + pp.cpv(cpv), "(" + use_conditional +
+ sep + depatom + ")")
# W0613: *Unused argument %r*
# pylint: disable-msg=W0613
def print_quiet(indent, cpv, use_conditional, depatom):
"""Quietly prints a subset set of dep strings."""
- print(indent + pp.cpv(cpv))
+ pp.uprint(indent + pp.cpv(cpv))
def format_depend(self, dep, dep_is_displayed):
"""Format a dependency for printing.
print_help()
sys.exit(0)
elif opt in ('-a', '--all-packages'):
- QUERY_OPTS['includeMasked'] = True
+ QUERY_OPTS['include_masked'] = True
elif opt in ('-D', '--indirect'):
- QUERY_OPTS['onlyDirect'] = False
+ QUERY_OPTS['only_direct'] = False
elif opt in ('--depth'):
if posarg.isdigit():
depth = int(posarg)
print()
print_help(with_description=False)
sys.exit(2)
- QUERY_OPTS["maxDepth"] = depth
+ QUERY_OPTS["max_depth"] = depth
def main(input_args):
print()
pkg = Dependencies(query)
- if QUERY_OPTS['includeMasked']:
+ if QUERY_OPTS['include_masked']:
pkggetter = get_cpvs
else:
pkggetter = get_installed_cpvs
if CONFIG['verbose']:
print(" * These packages depend on %s:" % pp.emph(pkg.cpv))
pkg.graph_reverse_depends(
- pkgset=sorted(pkggetter(), key = CPV),
- max_depth=QUERY_OPTS["maxDepth"],
- only_direct=QUERY_OPTS["onlyDirect"],
+ pkgset=sorted(pkggetter(), key=CPV),
+ max_depth=QUERY_OPTS["max_depth"],
+ only_direct=QUERY_OPTS["only_direct"],
printer_fn=dep_print
)
pkg.environment('KEYWORDS'))]
mask = pp.masking(mask)
try:
- print(' '.join((indent, decorator, pp.cpv(str(pkg.cpv)), atom, mask, use)))
+ pp.uprint(' '.join(
+ (indent, decorator, pp.cpv(str(pkg.cpv)), atom, mask, use)
+ ))
except AttributeError:
# 'NoneType' object has no attribute 'cpv'
- print(''.join((indent, decorator, "(no match for %r)" % dep.atom)))
+ pp.uprint(''.join((indent, decorator, "(no match for %r)" % dep.atom)))
def make_depgraph(pkg, printer_fn):
"""Create and display depgraph for each package."""
+ print()
if CONFIG['verbose']:
- print() # blank line improves readability & package version separation
- print(" * " + pp.subsection("dependency graph for ") + pp.cpv(str(pkg.cpv)))
+ pp.uprint(" * " + pp.subsection("dependency graph for ") +
+ pp.cpv(str(pkg.cpv)))
else:
- print()
- print("%s:" % pkg.cpv)
+ pp.uprint("%s:" % pkg.cpv)
# Print out the first package
printer_fn(0, pkg, None, initial_pkg=True)
n_packages = pp.number(str(len(deps)))
max_seen = pp.number(str(max(x[0] for x in deps)))
info = "[ %s stats: packages (%s), max depth (%s) ]"
- print(info % (pkgname, n_packages, max_seen))
+ pp.uprint(info % (pkgname, n_packages, max_seen))
def main(input_args):
# Imports
# =======
+import os
import sys
from getopt import gnu_getopt, GetoptError
import portage
-from portage import os
import gentoolkit.pprinter as pp
from gentoolkit.equery import (format_filetype, format_options, mod_usage,
if contents[name][0] == "dir":
if len(last) == 0:
last = basename
- print(pp.path(indent + basename[0]))
+ pp.uprint(pp.path(indent + basename[0]))
continue
for i, directory in enumerate(basename):
try:
pass
last = basename
if len(last) == 1:
- print(pp.path(indent + last[0]))
+ pp.uprint(pp.path(indent + last[0]))
continue
- print(pp.path(indent + "> /" + last[-1]))
+ pp.uprint(pp.path(indent + "> /" + last[-1]))
elif contents[name][0] == "sym":
- print(pp.path(indent + "+"), end=' ')
- print(pp.path_symlink(basename[-1] + " -> " + contents[name][2]))
+ pp.uprint(pp.path(indent + "+"), end=' ')
+ pp.uprint(pp.path_symlink(basename[-1] + " -> " +
+ contents[name][2]))
else:
- print(pp.path(indent + "+ ") + basename[-1])
+ pp.uprint(pp.path(indent + "+ ") + basename[-1])
else:
- print(format_filetype(
+ pp.uprint(format_filetype(
name,
contents[name],
show_type=QUERY_OPTS["show_type"],
for pkg in matches:
if CONFIG['verbose']:
- print(" * Contents of %s:" % pp.cpv(str(pkg.cpv)))
+ pp.uprint(" * Contents of %s:" % pp.cpv(str(pkg.cpv)))
contents = pkg.parsed_contents()
display_files(filter_contents(contents))
not QUERY_OPTS["in_porttree"]):
if not 'O' in pkgstr.location:
return
- print(pkgstr)
+ pp.uprint(pkgstr)
print()
if CONFIG['verbose']:
- print(" * Searching for USE flag %s ... " % pp.emph(query))
+ pp.uprint(" * Searching for USE flag %s ... " % pp.emph(query))
for pkg in matches:
display_useflags(query, pkg)
not QUERY_OPTS["in_porttree"]):
if not 'O' in pkgstr.location:
continue
- print(pkgstr)
+ pp.uprint(pkgstr)
if QUERY_OPTS["include_mask_reason"]:
ms_int, ms_orig = pkgstr.format_mask_status()
status = ', '.join(ms_orig)
explanation = mask_reason[0]
mask_location = mask_reason[1]
- print(" * Masked by %r" % status)
- print(" * %s:" % mask_location)
- print('\n'.join(
+ pp.uprint(" * Masked by %r" % status)
+ pp.uprint(" * %s:" % mask_location)
+ pp.uprint('\n'.join(
[' * %s' % line.lstrip(' #')
for line in explanation.splitlines()]
))
# =======
import re
+import os
import sys
-import codecs
from getopt import gnu_getopt, GetoptError
from functools import partial
-from portage import os
-
import gentoolkit.pprinter as pp
from gentoolkit import errors
from gentoolkit import keyword
result = []
- for kw in sorted(keywords, cmp=keyword.compare_strs):
+ for kw in sorted(keywords, key=keyword.Keyword):
if kw.startswith('-'):
# arch masked
kw = pp.keyword(kw, stable=False, hard_masked=True)
def call_format_functions(best_match, matches):
"""Call information gathering functions and display the results."""
- if hasattr(sys.stdout, "buffer"):
- utf8_stdout = codecs.getwriter("utf-8")(sys.stdout.buffer)
- else:
- utf8_stdout = codecs.getwriter("utf-8")(sys.stdout)
- uprint = partial(print, file=utf8_stdout)
-
if CONFIG['verbose']:
repo = best_match.repo_name()
- uprint(" * %s [%s]" % (pp.cpv(best_match.cp), pp.section(repo)))
+ pp.uprint(" * %s [%s]" % (pp.cpv(best_match.cp), pp.section(repo)))
got_opts = False
if any(QUERY_OPTS.values()):
print_sequence(format_list(herds))
else:
for herd in herds:
- uprint(format_line(herd, "Herd: ", " " * 13))
+ pp.uprint(format_line(herd, "Herd: ", " " * 13))
if QUERY_OPTS["maintainer"] or not got_opts:
maints = format_maintainers(best_match.metadata.maintainers())
print_sequence(format_list(maints))
else:
if not maints:
- uprint(format_line([], "Maintainer: ", " " * 13))
+ pp.uprint(format_line([], "Maintainer: ", " " * 13))
else:
for maint in maints:
- uprint(format_line(maint, "Maintainer: ", " " * 13))
+ pp.uprint(format_line(maint, "Maintainer: ", " " * 13))
if QUERY_OPTS["upstream"] or not got_opts:
upstream = format_upstream(best_match.metadata.upstream())
upstream = format_list(upstream)
else:
upstream = format_list(upstream, "Upstream: ", " " * 13)
- print_sequence(upstream, file = utf8_stdout)
+ print_sequence(upstream)
if not got_opts:
pkg_loc = best_match.package_path()
- uprint(format_line(pkg_loc, "Location: ", " " * 13))
+ pp.uprint(format_line(pkg_loc, "Location: ", " " * 13))
if QUERY_OPTS["keywords"] or not got_opts:
# Get {<Package 'dev-libs/glib-2.20.5'>: [u'ia64', u'm68k', ...], ...}
match, fmtd_keywords, slot, verstr_len
)
if QUERY_OPTS["keywords"]:
- uprint(keywords_line)
+ pp.uprint(keywords_line)
else:
indent = " " * (16 + verstr_len)
- uprint(format_line(keywords_line, "Keywords: ", indent))
+ pp.uprint(format_line(keywords_line, "Keywords: ", indent))
if QUERY_OPTS["description"]:
desc = best_match.metadata.descriptions()
- print_sequence(format_list(desc), file = utf8_stdout)
+ print_sequence(format_list(desc))
if QUERY_OPTS["useflags"]:
useflags = format_useflags(best_match.metadata.use())
- print_sequence(format_list(useflags), file = utf8_stdout)
+ print_sequence(format_list(useflags))
if QUERY_OPTS["xml"]:
print_file(os.path.join(best_match.package_path(), 'metadata.xml'))
size, files, uncounted = pkg.size()
if CONFIG['verbose']:
- print(" * %s" % pp.cpv(str(pkg.cpv)))
+ pp.uprint(" * %s" % pp.cpv(str(pkg.cpv)))
print("Total files : %s".rjust(25) % pp.number(str(files)))
if uncounted:
print("Total size : %s".rjust(25) % size_str)
else:
info = "%s: total(%d), inaccessible(%d), size(%s)"
- print(info % (str(pkg.cpv), files, uncounted, size))
+ pp.uprint(info % (str(pkg.cpv), files, uncounted, size))
def format_bytes(bytes_, precision=2):
# Imports
# =======
+import os
import sys
from functools import partial
from getopt import gnu_getopt, GetoptError
from glob import glob
-from portage import os, settings
+from portage import settings
import gentoolkit.pprinter as pp
from gentoolkit import errors
restrict = "(%s %s)" % (pp.emph("Restricted to"),
pp.cpv(restrict))
twrap.initial_indent = flag_name
- print(twrap.fill(restrict))
+ pp.uprint(twrap.fill(restrict))
if desc:
twrap.initial_indent = twrap.subsequent_indent
- print(twrap.fill(desc))
+ pp.uprint(twrap.fill(desc))
else:
print(" : <unknown>")
else:
if desc:
twrap.initial_indent = flag_name
desc = twrap.fill(desc)
- print(desc)
+ pp.uprint(desc)
else:
twrap.initial_indent = flag_name
print(twrap.fill("<unknown>"))
else:
- print(markers[in_makeconf] + flag)
+ pp.uprint(markers[in_makeconf] + flag)
def get_global_useflags():
# Imports
# =======
+import os
import sys
from getopt import gnu_getopt, GetoptError
-from portage import os
import gentoolkit.pprinter as pp
from gentoolkit import errors
pkg = sorted(matches).pop()
ebuild_path = pkg.ebuild_path()
if ebuild_path:
- print(os.path.normpath(ebuild_path))
+ pp.uprint(os.path.normpath(ebuild_path))
else:
sys.stderr.write(
pp.warn("No ebuilds to satisfy %s" % pkg.cpv)
import sys
import time
-from portage import os
-
import gentoolkit
from gentoolkit.textwrap_ import TextWrapper
import gentoolkit.pprinter as pp
import sys
+import os
try:
from urllib import urlopen
except ImportError:
sys.path.insert(0, "/usr/lib/portage/pym")
import portage
-from portage import os
# Note: the space for rgt and rlt is important !!
opMapping = {"le": "<=", "lt": "<", "eq": "=", "gt": ">", "ge": ">=",
the query module, where they are called as: Query('portage').find_*().
"""
-from __future__ import print_function
-
__all__ = (
'ChangeLog',
'FileOwner',
'get_installed_cpvs',
'get_uninstalled_cpvs',
'uniqify',
- 'walk'
)
__docformat__ = 'epytext'
# Imports
# =======
+import os
import sys
import re
import codecs
from functools import partial
from itertools import chain
-from portage import os, _unicode_decode, _encodings
-
from gentoolkit import pprinter as pp
from gentoolkit import errors
from gentoolkit.atom import Atom
def print_file(path):
"""Display the contents of a file."""
- out = sys.stdout
- if hasattr(out, "buffer"):
- out = out.buffer
-
with open(path, "rb") as open_file:
lines = open_file.read()
- print(lines.strip(), file = out)
+ pp.uprint(lines.strip())
-def print_sequence(seq, file = sys.stdout):
+def print_sequence(seq):
"""Print every item of a sequence."""
for item in seq:
- print(item, file = file)
+ pp.uprint(item)
def uniqify(seq, preserve_order=True):
return result
-
-def walk(top, topdown = True, onerror = None, followlinks = False):
- """Variant of os.walk that always returns unicode filenames"""
- for root, dirs, files in os.walk(top, topdown, onerror, followlinks):
- root = _unicode_decode(root, _encodings["fs"], errors = "strict")
- dirs = [
- _unicode_decode(x, _encodings["fs"], errors = "strict")
- for x in dirs
- ]
- files = [
- _unicode_decode(x, _encodings["fs"], errors = "strict")
- for x in files
- ]
- # XXX: in contrast with os.walk we ignore modifications to dirs here
- yield root, dirs, files
-
# vim: set ts=4 sw=4 tw=79:
def __init__(self, keyword):
self.keyword = keyword
+ arch, sep, os = keyword.partition('-')
+ self.arch = arch
+ self.os = os
+
+ def __eq__(self, other):
+ if not isinstance(other, self.__class__):
+ return False
+ return self.keyword == other.keyword
+
+ def __ne__(self, other):
+ return not self == other
+
+ def __lt__(self, other):
+ if not isinstance(other, self.__class__):
+ raise TypeError("other isn't of %s type, is %s" % (
+ self.__class__, other.__class__)
+ )
+ if self.os < other.os:
+ return True
+ return self.arch < other.arch
+
+ def __le__(self, other):
+ return self == other or self < other
+
+ def __gt__(self, other):
+ return not self <= other
+
+ def __ge__(self, other):
+ return self == other or self > other
def __str__(self):
return self.keyword
kw2_arch, sep, kw2_os = kw2.partition('-')
if kw1_arch != kw2_arch:
if kw1_os != kw2_os:
- return cmp(kw1_os, kw2_os)
- return cmp(kw1_arch, kw2_arch)
- return cmp(kw1_os, kw2_os)
+ return -1 if kw1_os < kw2_os else 1
+ return -1 if kw1_arch < kw2_arch else 1
+ if kw1_os == kw2_os:
+ return 0
+ return -1 if kw1_os < kw2_os else 1
def reduce_keywords(keywords):
# Imports
# =======
+import os
import re
import xml.etree.cElementTree as etree
-from portage import os, settings
+from portage import settings
# =======
# Classes
# Imports
# =======
+import os
from string import Template
import portage
-from portage import os, settings
+from portage import settings
from portage.util import LazyItemsDict
import gentoolkit.pprinter as pp
# =======
import sys
+import locale
import portage.output as output
from portage import archlist
"""Returns a warning string."""
return "!!! " + string + "\n"
+try:
+ unicode
+except NameError:
+ unicode = str
+
+def uprint(*args, **kw):
+ """Replacement for the builtin print function.
+
+ This version gracefully handles characters not representable in the
+ user's current locale (through the errors='replace' handler).
+
+ @see: >>> help(print)
+ """
+
+ sep = kw.pop('sep', ' ')
+ end = kw.pop('end', '\n')
+ file = kw.pop("file", sys.stdout)
+ if kw:
+ raise TypeError("got invalid keyword arguments: {0}".format(list(kw)))
+ file = getattr(file, 'buffer', file)
+
+ encoding = locale.getpreferredencoding()
+
+ def encoded_args():
+ for arg in args:
+ if isinstance(arg, bytes):
+ yield arg
+ else:
+ yield unicode(arg).encode(encoding, 'replace')
+
+ sep = sep.encode(encoding, 'replace')
+ end = end.encode(encoding, 'replace')
+ text = sep.join(encoded_args())
+ file.write(text + end)
+
# vim: set ts=4 sw=4 tw=79:
def print_summary(self):
"""Print a summary of the query."""
- cpv = CPV(self.query)
- cat, pkg = cpv.category, cpv.name + cpv.fullversion
- if cat and not self.is_regex:
- cat_str = "in %s " % pp.emph(cat.lstrip('><=~!'))
- else:
+ if self.query_type == "set":
cat_str = ""
-
- if self.is_regex:
pkg_str = pp.emph(self.query)
else:
- pkg_str = pp.emph(pkg)
+ cpv = CPV(self.query)
+ cat, pkg = cpv.category, cpv.name + cpv.fullversion
+ if cat and not self.is_regex:
+ cat_str = "in %s " % pp.emph(cat.lstrip('><=~!'))
+ else:
+ cat_str = ""
+
+ if self.is_regex:
+ pkg_str = pp.emph(self.query)
+ else:
+ pkg_str = pp.emph(pkg)
repo = ''
if self.repo_filter is not None:
repo = ' %s' % pp.section(self.repo_filter)
- print(" * Searching%s for %s %s..." % (repo, pkg_str, cat_str))
+ pp.uprint(" * Searching%s for %s %s..." % (repo, pkg_str, cat_str))
def smart_find(
self,
# catch the ambiguous package Exception
except portage.exception.AmbiguousPackageName as err:
matches = []
- for pkgkey in err[0]:
+ for pkgkey in err.args[0]:
matches.extend(VARDB.match(pkgkey))
except portage.exception.InvalidAtom as err:
raise errors.GentoolkitInvalidAtom(err)
#
# $Header$
-__version__= "0.0.1"
-__author__ = "Brian Dolbec"
-__email__ = "brian.dolbec@gmail.com"
-
from __future__ import with_statement
from __future__ import print_function
import gentoolkit.pprinter as pp
+__version__= "0.0.1"
+__author__ = "Brian Dolbec"
+__email__ = "brian.dolbec@gmail.com"
-dir_mode = 0774
-file_mode = 0644
+dir_mode = int('0774', 8)
+file_mode = int('0644', 8)
def make_dir(path):
Will Error and exit if the target dir already exits"""
try:
os.makedirs(path, dir_mode)
- except EnvironmentError, er:
+ except EnvironmentError as er:
print( pp.error("Error creating path:%s" %path), file=sys.stderr)
- print( pp.error("Error: %s" %str(er), file=sys.stderr)
+ print( pp.error("Error: %s" %str(er), file=sys.stderr))
sys.exit(1)
if file_ not in clean_dict:
# it is included in a multifile target
continue
- elif clean_dict[file_] = []:
+ elif clean_dict[file_] == []:
clean_dict[file_] = filepath
else:
file_list = clean_dict[file_]
'xine-lib-1.1.15-textrel-fix.patch': [],
'xine-lib-1.1.16.3.tar.bz2': [],
'xorg-server-1.5.3.tar.bz2': ['xorg-server-1.5.3.tar.bz2',
- 'xorg-server-1.5.3-gentoo-patches-08.tar.bz2']
+ 'xorg-server-1.5.3-gentoo-patches-08.tar.bz2'],
'symlink-test-1.2.3.tar.bz2': distfile_symlink
}
# add some symlinks to it
path = os.path.join(self.options['target_path'], 'distfiles')
make_symlinks(path, distfile_symlink,
- dist_clean['symlink-test-1.2.3.tar.bz2']):
+ dist_clean['symlink-test-1.2.3.tar.bz2'])
# create the packages dir and populate it
path = os.path.join(self.options['target_path'], 'packages')
- make_pkgs(path, self.package_dict, self.pkg_clean):
+ make_pkgs(path, self.package_dict, self.pkg_clean)
self.targets_init = True
- def get_
+ #def get_
--- /dev/null
+# Copyright(c) 2009, Gentoo Foundation
+# Copyright 2010 Brian Dolbec <brian.dolbec@gmail.com>
+#
+# License: GPL2/BSD
+
+# $Header$
+
+
+from __future__ import print_function
+
+import os
+import unittest
+from tempfile import NamedTemporaryFile, mkdtemp
+import subprocess
+
+
+
+dir_mode = 0774
+
+CPVS = [u'sys-auth/consolekit-0.4.1', u'sys-apps/devicekit-power-014',
+ u'media-libs/sdl-pango-0.1.2', u'sys-apps/help2man-1.37.1',
+ u'app-emulation/emul-linux-x86-baselibs-20100220'
+]
+
+PROPS = {
+ u'sys-apps/devicekit-power-014': {u'SRC_URI':
+ u'http://hal.freedesktop.org/releases/DeviceKit-power-014.tar.gz',
+ u"RESTRICT": u''},
+ u'sys-apps/help2man-1.37.1': {u"SRC_URI": u'mirror://gnu/help2man/help2man-1.37.1.tar.gz',
+ u"RESTRICT": u''},
+ u'sys-auth/consolekit-0.4.1': { u"SRC_URI":
+ u'http://www.freedesktop.org/software/ConsoleKit/dist/ConsoleKit-0.4.1.tar.bz2',
+ u"RESTRICT": u''},
+ u'app-emulation/emul-linux-x86-baselibs-20100220': {
+ u"SRC_URI": u'mirror://gentoo/emul-linux-x86-baselibs-20100220.tar.gz',
+ u"RESTRICT": u'strip'},
+ u'media-libs/sdl-pango-0.1.2': {
+ u"SRC_URI": u'mirror://sourceforge/sdlpango/SDL_Pango-0.1.2.tar.gz http://zarb.org/~gc/t/SDL_Pango-0.1.2-API-adds.patch',
+ u"RESTRICT": u''}
+}
+
+FILES = [
+ u'DeviceKit-power-014.tar.gz',
+ u'help2man-1.37.1.tar.gz',
+ u'ConsoleKit-0.4.1.tar.bz2',
+ u'emul-linux-x86-baselibs-20100220.tar.gz',
+ u'SDL_Pango-0.1.2.tar.gz',
+ u'SDL_Pango-0.1.2-API-adds.patch'
+]
+
+
+class Dbapi(object):
+ """Fake portage dbapi class used to return
+ pre-determined test data in place of a live system
+
+ @param cp_all: list of cat/pkg's to use for testing
+ eg: ['app-portage/gentoolkit', 'app-portage/porthole',...]
+ @param cpv_all: list of cat/pkg-ver's to use for testing.
+ @param props: dictionary of ebuild properties to use for testing.
+ eg: {'cpv': {"SRC_URI": 'http://...', "RESTRICT": restriction},}
+ @param cp_list: ?????????
+ """
+
+ def __init__(self, cp_all=None, cpv_all=None, props=None,
+ cp_list=None, name=None):
+ self._cp_all = cp_all
+ self._cpv_all = cpv_all
+ self._props = props
+ self._cp_list = cp_list
+ self.name = name
+ #print(self.name, "DBAPI: cpv_all=")
+ #print(self._cpv_all)
+ #print(self.name, "DBAPI: props=")
+ #print(self._props)
+
+ def cp_all(self):
+ return self._cp_all[:]
+
+ def cp_list(self, package):
+ #need to determine the data to return
+ # and gather some from a live system to use for testing
+ pass
+
+ def cpv_all(self):
+ return self._cpv_all
+
+ def cpv_exists(self, cpv):
+ return cpv in self._cpv_all
+
+ def aux_get(self, cpv, prop_list):
+ """only need stubs for ["SRC_URI","RESTRICT"]
+ """
+ #print("FAKE DBAPI", cpv, prop_list)
+ props = []
+ for prop in prop_list:
+ if cpv in self._props:
+ props.append(self._props[cpv][prop])
+ else:
+ raise KeyError, self.name
+ return props
+
+
+class OutputSimulator(object):
+ """Simple output accumulator used for testing.
+ Simulates eclean.output.OutputControl class """
+
+ def __init__(self, callback):
+ self.callback = callback
+
+ def set_data(self, data):
+ """sets the data for the progress_controller to return
+ for the test being performed"""
+ self.data = data
+
+ def einfo(self, message=""):
+ self.callback('einfo', message)
+
+ def eprompt(self, message):
+ self.callback('eprompt', message)
+
+ def prettySize(self, size, justify=False, color=None):
+ self.callback('prettySize', size)
+
+ def yesNoAllPrompt(self, message="Dummy"):
+ self.callback('yesNoAllPrompt', message)
+
+ def progress_controller(self, size, key, clean_list, file_type):
+ self.callback('progress_controller', self.data)
+ return self.data
+
+ def total(self, mode, size, num_files, verb, action):
+ pass
+
+ def list_pkgs(self, pkgs):
+ self.callback('list_pkgs', pkgs)
+
+
+class TestDisfiles(object):
+
+ def __init__(self):
+ self.workdir = None
+ self.target_file = None
+ self.target_symlink = None
+ self.test_filepaths = None
+
+ def setUp(self):
+ # create the dist dir
+ self.tmpdir = mkdtemp()
+ #print("New tmpdir =", self.tmpdir)
+ os.chmod(self.tmpdir, dir_mode)
+ self.workdir = os.path.join(self.tmpdir, 'distfiles')
+ dir = os.path.dirname(os.path.abspath(__file__))
+ file = os.path.join(dir,"testdistfiles.tar.gz")
+ command = "tar -xpf %s -C %s" %(file, self.tmpdir)
+ retcode = subprocess.call(command, shell=True)
+ # create a symlink as part of the test files
+ #print()
+ self.target_symlink = "symlink-1.0.0.tar.gz"
+ os.symlink(file, os.path.join(self.workdir, self.target_symlink))
+ self.files = FILES[:]
+ self.files.append(self.target_symlink)
+ self.test_filepaths = []
+ for file in self.files:
+ self.test_filepaths.append(os.path.join(self.workdir, file))
+
+ def tearDown(self):
+ for file in self.test_filepaths:
+ os.unlink(file)
+ #print("deleting workdir =", self.workdir)
+ os.rmdir(self.workdir)
+ #print("deleting tmpdir =", self.tmpdir)
+ os.rmdir(self.tmpdir)
+
+
#
# $Header$
-from __future__ import with_statement
+from __future__ import print_function
__version__= "0.0.1"
__author__ = "Brian Dolbec"
import sys
import gentoolkit.pprinter as pp
-from test import test_support
+try:
+ from test import test_support
+except ImportError:
+ from test import support as test_support
from gentoolkit.eclean.clean import CleanUp
def useage():
"""output run options"""
- print "Useage: test_clean [OPTONS] path=test-dir"
- print " where test-dir is the location to create and populate"
- print "the testing distfiles and packages directories."
- print "All tests in this module test only the clean.py module functions"
- print
- print "OPTIONS:"
- print " -a, --all run all tests"
- print " -c, --clean clean up any previous test dirs & files"
- print " -D, --distfiles run the distfiles cleaning test"
- print " -k, --keep-dirs keep the test directories and files after the test"
- print " -p, --pretend run the test in pretend mode only"
- print " -P, --packages run the packages cleaning test"
- print " -S, --symlinks run the symlinks test"
- print " --path the location to create the temporary distfiles"
- print " and packages directories that will be test cleaned"
- print " --version test module version"
- print
+ print("Useage: test_clean [OPTONS] path=test-dir")
+ print(" where test-dir is the location to create and populate")
+ print("the testing distfiles and packages directories.")
+ print("All tests in this module test only the clean.py module functions")
+ print()
+ print("OPTIONS:")
+ print(" -a, --all run all tests")
+ print(" -c, --clean clean up any previous test dirs & files")
+ print(" -D, --distfiles run the distfiles cleaning test")
+ print(" -k, --keep-dirs keep the test directories and files after the test")
+ print(" -p, --pretend run the test in pretend mode only")
+ print(" -P, --packages run the packages cleaning test")
+ print(" -S, --symlinks run the symlinks test")
+ print(" --path the location to create the temporary distfiles")
+ print(" and packages directories that will be test cleaned")
+ print(" --version test module version")
+ print()
def parse_opts():
"pretend", "symlinks", "keep-dirs", "clean"])
#print opts
#print args
- except GetoptError, e:
- print >> sys.stderr, e.msg
+ except GetoptError as e:
+ print(e.msg, file=sys.stderr)
usage()
sys.exit(1)
try:
main(True)
except KeyboardInterrupt:
- print "Aborted."
+ print("Aborted.")
sys.exit(130)
sys.exit(0)
# Copyright(c) 2009, Gentoo Foundation
-# Copyright: 2006-2008 Brian Harring <ferringb@gmail.com>
# Copyright 2010 Brian Dolbec <brian.dolbec@gmail.com>
#
# License: GPL2/BSD
from __future__ import print_function
+from tempfile import NamedTemporaryFile, mkdtemp
import unittest
-from test import test_support
+import re
-from gentoolkit.eclean.search import *
+try:
+ from test import test_support
+except ImportError:
+ from test import support as test_support
-class Dbapi(object):
- """Fake portage dbapi class used to return
- pre-determined test data in place of a live system
+from gentoolkit.test.eclean.distsupport import *
+from gentoolkit.eclean.search import DistfilesSearch
- @param cp_all: list of cat/pkg's to use for testing
- eg: ['app-portage/gentoolkit', 'app-portage/porthole',...]
- @param cpv_all: list of cat/pkg-ver's to use for testing.
- @param props: dictionary of ebuild properties to use for testing.
- eg: {'cpv': {"SRC_URI": 'http://...', "RESTRICT": restriction},}
- @param cp_list: ?????????
+
+"""Tests for eclean's distfiles search functions."""
+
+
+class DistLimits(DistfilesSearch):
+ """subclass the DistfilesSearch class in order to override a number of
+ functions to isolate & test"""
+
+ def __init__(self,
+ output=lambda x: None,
+ portdb=None,
+ vardb=None,
+ ):
+ DistfilesSearch.__init__(self, output, portdb, vardb)
+ self.data = None
+
+ def set_data(self, data):
+ """sets the data for the functions to return for
+ the test being performed"""
+ self.data = data
+
+
+class TestCheckLimits(unittest.TestCase):
+ """Test the eclean.search.DistfilesSearch._check_limits() group.
+
+ it will test [ _get_default_checks(), _check_limits(),
+ _isreg_check_(), _size_check_(), _time_check_(), _filenames_check_()]
"""
- def __init__(self, cp_all=[], cpv_all=[], props={}, cp_list=[]):
- self._cp_all = cp_all
- self._cpv_all = cpv_all
- self._props = props
- self._cp_list = cp_list
+ test_excludes = {
+ 'blank': {},
+ 'filenames': {
+ 'filenames': {'help2man-1.37.1.tar.gz': re.compile('help2man-1.37.1.tar.gz')}
+ }
+ }
+
+ def setUp(self):
+ self.testdata = [
+ # test is_reg_limit alone, will give a base clean_me
+ { 'test': 'is_reg_limit',
+ 'params': (0, 0, self.test_excludes['blank']),
+ 'results': FILES[:],
+ 'output': [" - skipping size limit check",
+ " - skipping time limit check",
+ " - skipping exclude filenames check"
+ ]
+ },
+ # test size_limit trip
+ { 'test': 'size_limit',
+ 'params': (1024000, 0, self.test_excludes['blank']),
+ 'results': FILES[:3] + FILES[4:],
+ 'output': [
+ " - skipping time limit check",
+ " - skipping exclude filenames check"
+ ]
+ },
+ # test time_limit trip
+ { 'test': 'time_limit',
+ 'params': (0,1112671872, self.test_excludes['blank']),
+ 'results': [FILES[4]], # + FILES[5:],
+ 'output': [" - skipping size limit check",
+ " - skipping exclude filenames check"
+ ]
+ },
+ # test filenames_limit trip
+ { 'test': 'filenames_limit',
+ 'params': (0, 0, self.test_excludes['filenames']),
+ 'results': FILES[:1] + FILES[2:],
+ 'output': [" - skipping size limit check",
+ " - skipping time limit check",
+ ]
+ }
+ ]
+
+ self.testwork = TestDisfiles()
+ self.testwork.setUp()
+ self.workdir = self.testwork.workdir
+ self.target_class = DistLimits() #DistCheckLimits()
+ self.output = OutputSimulator(self.callback)
+ self.target_class.output = self.output
+ self.callback_data = []
+ self.test_index = 0
+
+ def tearDown(self):
+ self.testwork.tearDown()
+ #pass
+
+ def get_test(self, num):
+ return self.testdata[num]
+
+ def callback(self, id, data):
+ self.callback_data.append(data)
+
+ def set_limits(self, test):
+ limit = {}
+ #set is_reg always to testdata[0]
+ t1 = self.testdata[0]
+ limit[t1['test']] = {}
+ name = test['test']
+ limit[name] = {}
+ limits = test['limits']
+ for i in range(6):
+ file = self.testwork.files[i]
+ limits = test['limits']
+ limit[t1['test']][file] = t1['limits'][i]
+ if name != t1['test']:
+ limit[name][file] = limits[i]
+ return limit
- def cp_all(self):
- return self._cp_all[:]
- def cp_list(self, package):
- #need to determine the data to return
- # and gather some from a live system to use for testing
- pass
+ def test_check_limits(self):
+ """Testing DistfilesSearch._check_limits()"""
+ # pass in output=self.output.einfo
+ self.target_class.output = self.output.einfo
+ run_callbacks = []
+ run_results = []
+ print()
+ # run the tests
+ for i in range(4):
+ clean_me = {}
+ test = self.get_test(i)
+ #print("test =", test['test'])
+ if not test:
+ print("Error getting test data for index:", i)
+ #self.target_class.set_data(self.set_limits(test))
+ size_chk, time_chk, exclude = test["params"]
+ checks = self.target_class._get_default_checks(size_chk, time_chk, exclude)
+ clean_me = self.target_class._check_limits(self.workdir, checks, clean_me)
+ results = sorted(clean_me)
+ run_results.append(results)
+ self.callback_data.sort()
+ run_callbacks.append(self.callback_data)
+ self.callback_data = []
+ results = None
- def cpv_all(self):
- return self._cpv_all
+ # check results
+ for i in range(4):
+ test = self.get_test(i)
+ print("test =", test['test'])
+ if not test:
+ print("Error getting test data for index:", i)
+ test['results'].sort()
+ #print("actual=", run_results[i])
+ #print("should-be=", test['results'])
+ self.failUnlessEqual(run_results[i], test["results"],
+ "/ntest_check_limits, test# %d, test=%s, diff=%s"
+ %(i, test['test'], str(set(run_results[i]).difference(test['results'])))
+ )
+ test['output'].sort()
+ self.failUnlessEqual(run_callbacks[i], test['output'])
- def cpv_exists(self, cpv):
- return cpv in self._cpv_all
- def aux_get(self, cpv, prop_list):
- """only need stubs for ["SRC_URI","RESTRICT"]
- """
- props = []
- for prop in prop_list:
- props.append(self._props[cpv][prop])
+class TestFetchRestricted(unittest.TestCase):
+ """Tests eclean.search.DistfilesSearch._fetch_restricted and _unrestricted
+ functions
+ """
+
+ def setUp(self):
+ self.vardb = Dbapi(cp_all=[], cpv_all=CPVS,
+ props=PROPS, cp_list=[], name="FAKE VARDB")
+ self.portdb = Dbapi(cp_all=[], cpv_all=CPVS[:4],
+ props=self.get_props(CPVS[:4]), cp_list=[], name="FAKE PORTDB")
+ # set a fetch restricted pkg
+ self.portdb._props[CPVS[0]]["RESTRICT"] = u'fetch'
+ self.callback_data = []
+ self.output = self.output = OutputSimulator(self.callback)
+ self.target_class = DistfilesSearch(self.output.einfo, self.portdb, self.vardb)
+ self.target_class.portdb = self.portdb
+ self.target_class.portdb = self.portdb
+ self.testdata = {
+ 'fetch_restricted1':{
+ 'deprecated':
+ {u'app-emulation/emul-linux-x86-baselibs-20100220': u'mirror://gentoo/emul-linux-x86-baselibs-20100220.tar.gz'
+ },
+ 'pkgs':
+ {u'sys-auth/consolekit-0.4.1': u'http://www.freedesktop.org/software/ConsoleKit/dist/ConsoleKit-0.4.1.tar.bz2'
+ },
+ 'output': [
+ u'!!! "Deprecation Warning: Installed package: app-emulation/emul-linux-x86-baselibs-20100220\n\tIs no longer in the tree or an installed overlay\n'
+ ]
+ },
+ 'fetch_restricted2':{
+ 'deprecated':
+ {u'app-emulation/emul-linux-x86-baselibs-20100220': u'mirror://gentoo/emul-linux-x86-baselibs-20100220.tar.gz'
+ },
+ 'pkgs':
+ {u'sys-auth/consolekit-0.4.1': u'http://www.freedesktop.org/software/ConsoleKit/dist/ConsoleKit-0.4.1.tar.bz2'
+ },
+ 'output': [
+ u'!!! "Deprecation Warning: Installed package: app-emulation/emul-linux-x86-baselibs-20100220\n\tIs no longer in the tree or an installed overlay\n',
+ ' - Key Error looking up: app-portage/deprecated-pkg-1.0.0'
+ ]
+ },
+ 'unrestricted1':{
+ 'deprecated':{
+ u'app-emulation/emul-linux-x86-baselibs-20100220': u'mirror://gentoo/emul-linux-x86-baselibs-20100220.tar.gz'
+ },
+ 'pkgs': {
+ u'sys-apps/devicekit-power-014': u'http://hal.freedesktop.org/releases/DeviceKit-power-014.tar.gz',
+ u'sys-apps/help2man-1.37.1': u'mirror://gnu/help2man/help2man-1.37.1.tar.gz',
+ u'sys-auth/consolekit-0.4.1': u'http://www.freedesktop.org/software/ConsoleKit/dist/ConsoleKit-0.4.1.tar.bz2',
+ u'app-emulation/emul-linux-x86-baselibs-20100220': u'mirror://gentoo/emul-linux-x86-baselibs-20100220.tar.gz',
+ u'media-libs/sdl-pango-0.1.2': u'mirror://sourceforge/sdlpango/SDL_Pango-0.1.2.tar.gz http://zarb.org/~gc/t/SDL_Pango-0.1.2-API-adds.patch'
+ },
+ 'output': [
+ u'!!! "Deprecation Warning: Installed package: app-emulation/emul-linux-x86-baselibs-20100220\n\tIs no longer in the tree or an installed overlay\n',
+ ]
+ },
+ 'unrestricted2':{
+ 'deprecated':{
+ u'app-emulation/emul-linux-x86-baselibs-20100220': u'mirror://gentoo/emul-linux-x86-baselibs-20100220.tar.gz'
+ },
+ 'pkgs': {
+ u'sys-apps/devicekit-power-014': u'http://hal.freedesktop.org/releases/DeviceKit-power-014.tar.gz',
+ u'sys-apps/help2man-1.37.1': u'mirror://gnu/help2man/help2man-1.37.1.tar.gz',
+ u'sys-auth/consolekit-0.4.1': u'http://www.freedesktop.org/software/ConsoleKit/dist/ConsoleKit-0.4.1.tar.bz2',
+ u'app-emulation/emul-linux-x86-baselibs-20100220': u'mirror://gentoo/emul-linux-x86-baselibs-20100220.tar.gz',
+ u'media-libs/sdl-pango-0.1.2': u'mirror://sourceforge/sdlpango/SDL_Pango-0.1.2.tar.gz http://zarb.org/~gc/t/SDL_Pango-0.1.2-API-adds.patch'
+ },
+ 'output': [
+ u'!!! "Deprecation Warning: Installed package: app-emulation/emul-linux-x86-baselibs-20100220\n\tIs no longer in the tree or an installed overlay\n',
+ ' - Key Error looking up: app-portage/deprecated-pkg-1.0.0'
+ ]
+ }
+ }
+
+
+
+ def get_props(self, cpvs):
+ props = {}
+ for cpv in cpvs:
+ props[cpv] = PROPS[cpv]
return props
+ def callback(self, id, data):
+ self.callback_data.append(data)
+
+
+ def test__fetch_restricted(self):
+ pkgs, deprecated = self.target_class._fetch_restricted(None, CPVS)
+ self.results = {
+ 'fetch_restricted1': {
+ 'pkgs': pkgs,
+ 'deprecated': deprecated,
+ 'output': self.callback_data
+ }
+ }
+
+ self.callback_data = []
+ cpvs = CPVS[:]
+ cpvs.append('app-portage/deprecated-pkg-1.0.0')
+ pkgs, deprecated = self.target_class._fetch_restricted(None, cpvs)
+ self.results['fetch_restricted2'] = {
+ 'pkgs': pkgs,
+ 'deprecated': deprecated,
+ 'output': self.callback_data
+ }
+
+ for key in sorted(self.results):
+ testdata = self.testdata[key]
+ results = self.results[key]
+ for item in sorted(testdata):
+ self.failUnlessEqual(sorted(testdata[item]), sorted(results[item]),
+ "\ntest_fetch_restricted: %s %s data does not match\nresult=" %(key, item) +\
+ str(results[item]) + "\ntestdata=" + str(testdata[item]))
+
+
+
+ def test_unrestricted(self):
+ pkgs, deprecated = self.target_class._unrestricted(None, CPVS)
+ self.results = {
+ 'unrestricted1': {
+ 'pkgs': pkgs,
+ 'deprecated': deprecated,
+ 'output': self.callback_data
+ }
+ }
+ self.callback_data = []
+ cpvs = CPVS[:]
+ cpvs.append('app-portage/deprecated-pkg-1.0.0')
+ pkgs, deprecated = self.target_class._unrestricted(None, cpvs)
+ self.results['unrestricted2'] = {
+ 'pkgs': pkgs,
+ 'deprecated': deprecated,
+ 'output': self.callback_data
+ }
+ for key in sorted(self.results):
+ testdata = self.testdata[key]
+ results = self.results[key]
+ for item in sorted(testdata):
+ self.failUnlessEqual( sorted(testdata[item]), sorted(results[item]),
+ "\ntest_unrestricted: %s %s data does not match\nresult=" %(key, item) +\
+ str(results[item]) + "\ntestdata=" + str(testdata[item]))
-"""Tests for eclean's search modules."""
-
-class TestFindDistfiles(unittest.TestCase):
- uris = [
- u'/usr/portage/distfiles/xdg-utils-1.0.2.tgz',
- u'/usr/portage/distfiles/readline60-003',
- u'/usr/portage/distfiles/bash-completion-1.1.tar.bz2',
- u'/usr/portage/distfiles/libgweather-2.26.2.1.tar.bz2',
- u'/usr/portage/distfiles/libwnck-2.26.2.tar.bz2',
- u'/usr/portage/distfiles/gnome-cups-manager-0.33.tar.bz2',
- u'/usr/portage/distfiles/audiofile-0.2.6-constantise.patch.bz2',
- u'/usr/portage/distfiles/vixie-cron-4.1-gentoo-r4.patch.bz2',
- u'/usr/portage/distfiles/evince-2.26.2.tar.bz2',
- u'/usr/portage/distfiles/lxml-2.2.2.tgz'
- ]
- filenames = [
- u'audiofile-0.2.6-constantise.patch.bz2',
- u'bash-completion-1.1.tar.bz2',
- u'evince-2.26.2.tar.bz2',
- u'gnome-cups-manager-0.33.tar.bz2',
- u'libgweather-2.26.2.1.tar.bz2',
- u'libwnck-2.26.2.tar.bz2',
- u'lxml-2.2.2.tgz',
- u'readline60-003',
- u'vixie-cron-4.1-gentoo-r4.patch.bz2',
- u'xdg-utils-1.0.2.tgz'
- ]
-
- def test_get_filenames_from_uris(self):
- fns = sorted(get_filenames_from_uris(self.uris))
- print(fns)
- for fn, fn2 in zip(self.filenames, fns):
- self.failUnlessEqual(fn, fn2)
def test_main():
- test_support.run_unittest(TestFindDistfiles)
+
+ # Run tests
+ test_support.run_unittest(TestCheckLimits('test_check_limits'))
+ test_support.run_unittest( TestFetchRestricted('test__fetch_restricted'))
+ test_support.run_unittest( TestFetchRestricted('test_unrestricted'))
+
if __name__ == '__main__':
test_main()
+import os
import unittest
import warnings
from tempfile import NamedTemporaryFile, mktemp
except ImportError:
from test import support as test_support
-from portage import os
-
from gentoolkit import helpers
+import sys
import unittest
import warnings
from tempfile import NamedTemporaryFile
except ImportError:
from test import support as test_support
-from portage import os
-
from gentoolkit import keyword
+from gentoolkit.test import cmp
class TestGentoolkitKeyword(unittest.TestCase):
'~amd64', '~ppc', '~x86', '~amd64-linux', '~x86-linux',
'~ppc-macos', '~x86-macos', '~x86-solaris'
]
- self.failUnlessEqual(sorted(kwds_presort, cmp=compare_strs), kwds_postsort)
+ if sys.hexversion < 0x3000000:
+ self.failUnlessEqual(sorted(kwds_presort, cmp=compare_strs), kwds_postsort)
+ self.failUnlessEqual(sorted(kwds_presort, key = keyword.Keyword), kwds_postsort)
def test_main():
except ImportError:
from test import support as test_support
-from portage import os
-
from gentoolkit import query
from gentoolkit import errors
import unittest
import py_compile
-from portage import os
+import os
osp = os.path
-from gentoolkit.helpers import walk
-
"""Does a basic syntax check by compiling all modules. From Portage."""
-pym_dirs = walk(osp.dirname(osp.dirname(osp.dirname(__file__))))
+pym_dirs = os.walk(osp.dirname(osp.dirname(osp.dirname(__file__))))
blacklist_dirs = frozenset(('.svn', 'test'))
class TestForSyntaxErrors(unittest.TestCase):
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'pym'))
-from gentoolkit.helpers import walk
__version__ = os.getenv('VERSION', default='9999')
packages = [
str('.'.join(root.split(os.sep)[1:]))
- for root, dirs, files in walk('pym/gentoolkit')
+ for root, dirs, files in os.walk('pym/gentoolkit')
if '__init__.py' in files
]