From 63a7726eba738fe2ed340027039ba655ff91898a Mon Sep 17 00:00:00 2001 From: "W. Trevor King" Date: Mon, 24 Nov 2008 07:09:03 -0500 Subject: [PATCH] Added 'allow_no_rcs' flag to RCS file system access methods. Now mapfile access has fewer special cases, and there is less redundant rcs.add/update code. --- libbe/bug.py | 2 +- libbe/bugdir.py | 132 +++++++++++++++++++++++++++++------------------ libbe/comment.py | 2 +- libbe/mapfile.py | 84 ++++++++++++------------------ libbe/rcs.py | 109 +++++++++++++++++++++++++++++--------- 5 files changed, 204 insertions(+), 125 deletions(-) diff --git a/libbe/bug.py b/libbe/bug.py index ef1629c..09a2c1d 100644 --- a/libbe/bug.py +++ b/libbe/bug.py @@ -188,7 +188,7 @@ class Bug(object): return os.path.join(my_dir, name) def load(self, load_comments=False): - map = mapfile.map_load(self.get_path("values")) + map = mapfile.map_load(self.rcs, self.get_path("values")) self.summary = map.get("summary") self.creator = map.get("creator") self.target = map.get("target") diff --git a/libbe/bugdir.py b/libbe/bugdir.py index e134a2c..175f518 100644 --- a/libbe/bugdir.py +++ b/libbe/bugdir.py @@ -25,7 +25,8 @@ import doctest import mapfile import bug import utility -from rcs import rcs_by_name, detect_rcs, installed_rcs, PathNotInRoot +import rcs + class NoBugDir(Exception): def __init__(self, path): @@ -67,18 +68,16 @@ def setting_property(name, valid=None, doc=None): def getter(self): value = self.settings.get(name) if valid is not None: - if value not in valid: + if value not in valid and value != None: raise InvalidValue(name, value) return value def setter(self, value): - if valid is not None: - if value not in valid and value is not None: - raise InvalidValue(name, value) - if value is None: - del self.settings[name] - else: - self.settings[name] = value + if value != getter(self): + if value is None: + del self.settings[name] + else: + self.settings[name] = value self._save_settings(self.get_path("settings"), self.settings) return property(getter, setter, doc=doc) @@ -151,27 +150,36 @@ class BugDir (list): return tree_version def set_version(self): - self.rcs.set_file_contents(self.get_path("version"), TREE_VERSION_STRING) + self.rcs.set_file_contents(self.get_path("version"), + TREE_VERSION_STRING) rcs_name = setting_property("rcs_name", ("None", "bzr", "git", "Arch", "hg"), - doc="The name of the current RCS. Kept seperate to make saving/loading settings easy. Don't set this attribute. Set .rcs instead, and .rcs_name will be automatically adjusted.") + doc= +"""The name of the current RCS. Kept seperate to make saving/loading +settings easy. Don't set this attribute. Set .rcs instead, and +.rcs_name will be automatically adjusted.""") _rcs = None def _get_rcs(self): return self._rcs - def _set_rcs(self, rcs): - if rcs == None: - rcs = rcs_by_name("None") - self._rcs = rcs - rcs.root(self.root) - self.rcs_name = rcs.name + def _set_rcs(self, new_rcs): + if new_rcs == None: + new_rcs = rcs.rcs_by_name("None") + self._rcs = new_rcs + new_rcs.root(self.root) + self.rcs_name = new_rcs.name - rcs = property(_get_rcs, _set_rcs, doc="A revision control system (RCS) instance") + rcs = property(_get_rcs, _set_rcs, + doc="A revision control system (RCS) instance") - _user_id = setting_property("user-id", doc="The user's prefered name. Kept seperate to make saving/loading settings easy. Don't set this attribute. Set .user_id instead, and ._user_id will be automatically adjusted. This setting is only saved if ._save_user_id == True") + _user_id = setting_property("user-id", doc= +"""The user's prefered name. Kept seperate to make saving/loading +settings easy. Don't set this attribute. Set .user_id instead, +and ._user_id will be automatically adjusted. This setting is +only saved if ._save_user_id == True""") def _get_user_id(self): if self._user_id == None and self.rcs != None: @@ -183,9 +191,12 @@ class BugDir (list): self.rcs.user_id = user_id self._user_id = user_id - user_id = property(_get_user_id, _set_user_id, doc="The user's prefered name, e.g 'John Doe '. Not that the Arch RCS backend *enforces* ids with this format.") + user_id = property(_get_user_id, _set_user_id, doc= +"""The user's prefered name, e.g 'John Doe '. Note +that the Arch RCS backend *enforces* ids with this format.""") - target = setting_property("target", doc="The current project development target") + target = setting_property("target", + doc="The current project development target") def save_user_id(self, user_id=None): if user_id == None: @@ -204,25 +215,26 @@ class BugDir (list): deepdir = self.get_path() if not os.path.exists(deepdir): deepdir = os.path.dirname(deepdir) - rcs = detect_rcs(deepdir) + new_rcs = rcs.detect_rcs(deepdir) install = False - if rcs.name == "None": + if new_rcs.name == "None": if allow_rcs_init == True: - rcs = installed_rcs() - rcs.init(self.root) - self.rcs = rcs - return rcs + new_rcs = rcs.installed_rcs() + new_rcs.init(self.root) + self.rcs = new_rcs + return new_rcs def load(self): version = self.get_version() if version != TREE_VERSION_STRING: - raise NotImplementedError, "BugDir cannot handle version '%s' yet." % version + raise NotImplementedError, \ + "BugDir cannot handle version '%s' yet." % version else: if not os.path.exists(self.get_path()): raise NoBugDir(self.get_path()) self.settings = self._get_settings(self.get_path("settings")) - self.rcs = rcs_by_name(self.rcs_name) + self.rcs = rcs.rcs_by_name(self.rcs_name) # set real RCS if self._user_id != None: # was a user name in the settings file self.save_user_id() @@ -243,9 +255,20 @@ class BugDir (list): bug.save() def _get_settings(self, settings_path): + if self.rcs_name == None: + # Use a temporary RCS to loading settings the first time + RCS = rcs.rcs_by_name("None") + RCS.root(self.root) + else: + RCS = self.rcs + + allow_no_rcs = not RCS.path_in_root(settings_path) + # allow_no_rcs=True should only be for the special case of + # configuring duplicate bugdir settings + try: - settings = mapfile.map_load(settings_path) - except mapfile.NoSuchFile: + settings = mapfile.map_load(RCS, settings_path, allow_no_rcs) + except rcs.NoSuchFile: settings = {"rcs_name": "None"} return settings @@ -263,19 +286,18 @@ class BugDir (list): if "user-id" in settings: settings = copy.copy(settings) del settings["user-id"] - try: - mapfile.map_save(self.rcs, settings_path, settings) - except PathNotInRoot, e: - # Special case for configuring duplicate bugdir settings - none_rcs = rcs_by_name("None") - none_rcs.root(settings_path) - mapfile.map_save(none_rcs, settings_path, settings) + allow_no_rcs = not self.rcs.path_in_root(settings_path) + # allow_no_rcs=True should only be for the special case of + # configuring duplicate bugdir settings + mapfile.map_save(self.rcs, settings_path, settings, allow_no_rcs) def duplicate_bugdir(self, revision): duplicate_path = self.rcs.duplicate_repo(revision) - # setup revision RCS as None, since the duplicate may not be initialized for versioning - duplicate_settings_path = os.path.join(duplicate_path, ".be", "settings") + # setup revision RCS as None, since the duplicate may not be + # initialized for versioning + duplicate_settings_path = os.path.join(duplicate_path, + ".be", "settings") duplicate_settings = self._get_settings(duplicate_settings_path) if "rcs_name" in duplicate_settings: duplicate_settings["rcs_name"] = "None" @@ -372,7 +394,8 @@ class BugDir (list): if uuid not in self.bug_map: self._bug_map_gen() if uuid not in self.bug_map: - raise KeyError("No bug matches %s" % uuid +str(self.bug_map)+str(self)) + raise KeyError("No bug matches %s\n bug map: %s\n root: %s" \ + % (uuid, self.bug_map, self.root)) if self.bug_map[uuid] == None: self._load_bug(uuid) return self.bug_map[uuid] @@ -407,7 +430,8 @@ class BugDirTestCase(unittest.TestCase): unittest.TestCase.__init__(self, *args, **kwargs) def setUp(self): self.dir = utility.Dir() - self.bugdir = BugDir(self.dir.path, sink_to_existing_root=False, allow_rcs_init=True) + self.bugdir = BugDir(self.dir.path, sink_to_existing_root=False, + allow_rcs_init=True) self.rcs = self.bugdir.rcs def tearDown(self): self.rcs.cleanup() @@ -429,16 +453,24 @@ class BugDirTestCase(unittest.TestCase): self.bugdir.save() new = self.rcs.commit("Fixed bug a") dupdir = self.bugdir.duplicate_bugdir(original) - self.failUnless(dupdir.root != self.bugdir.root, "%s, %s" % (dupdir.root, self.bugdir.root)) + self.failUnless(dupdir.root != self.bugdir.root, + "%s, %s" % (dupdir.root, self.bugdir.root)) bugAorig = dupdir.bug_from_uuid("a") - self.failUnless(bugA != bugAorig, "\n%s\n%s" % (bugA.string(), bugAorig.string())) + self.failUnless(bugA != bugAorig, + "\n%s\n%s" % (bugA.string(), bugAorig.string())) bugAorig.status = "fixed" - self.failUnless(bug.cmp_status(bugA, bugAorig)==0, "%s, %s" % (bugA.status, bugAorig.status)) - self.failUnless(bug.cmp_severity(bugA, bugAorig)==0, "%s, %s" % (bugA.severity, bugAorig.severity)) - self.failUnless(bug.cmp_assigned(bugA, bugAorig)==0, "%s, %s" % (bugA.assigned, bugAorig.assigned)) - self.failUnless(bug.cmp_time(bugA, bugAorig)==0, "%s, %s" % (bugA.time, bugAorig.time)) - self.failUnless(bug.cmp_creator(bugA, bugAorig)==0, "%s, %s" % (bugA.creator, bugAorig.creator)) - self.failUnless(bugA == bugAorig, "\n%s\n%s" % (bugA.string(), bugAorig.string())) + self.failUnless(bug.cmp_status(bugA, bugAorig)==0, + "%s, %s" % (bugA.status, bugAorig.status)) + self.failUnless(bug.cmp_severity(bugA, bugAorig)==0, + "%s, %s" % (bugA.severity, bugAorig.severity)) + self.failUnless(bug.cmp_assigned(bugA, bugAorig)==0, + "%s, %s" % (bugA.assigned, bugAorig.assigned)) + self.failUnless(bug.cmp_time(bugA, bugAorig)==0, + "%s, %s" % (bugA.time, bugAorig.time)) + self.failUnless(bug.cmp_creator(bugA, bugAorig)==0, + "%s, %s" % (bugA.creator, bugAorig.creator)) + self.failUnless(bugA == bugAorig, + "\n%s\n%s" % (bugA.string(), bugAorig.string())) self.bugdir.remove_duplicate_bugdir() self.failUnless(os.path.exists(dupdir.root)==False, str(dupdir.root)) def testRun(self): diff --git a/libbe/comment.py b/libbe/comment.py index 9b87f18..710c773 100644 --- a/libbe/comment.py +++ b/libbe/comment.py @@ -181,7 +181,7 @@ class Comment(Tree): return os.path.join(my_dir, name) def load(self): - map = mapfile.map_load(self.get_path("values")) + map = mapfile.map_load(self.rcs, self.get_path("values")) self.time = utility.str_to_time(map["Date"]) self.From = map["From"] self.in_reply_to = map.get("In-reply-to") diff --git a/libbe/mapfile.py b/libbe/mapfile.py index 9a7fa8b..559d713 100644 --- a/libbe/mapfile.py +++ b/libbe/mapfile.py @@ -29,28 +29,27 @@ class IllegalValue(Exception): Exception.__init__(self, 'Illegal value "%s"' % value) self.value = value -def generate(f, map, context=3): - """Generate a format-2 mapfile. This is a simpler format, but should merge - better, because there's no chance of confusion for appends, and lines - are unique for both key and value. +def generate(map, context=3): + """Generate a format-2 mapfile content string. This is a simpler + format, but should merge better, because there's no chance of + confusion for appends, and lines are unique for both key and + value. - >>> f = utility.FileString() - >>> generate(f, {"q":"p"}) - >>> f.str + >>> generate({"q":"p"}) '\\n\\n\\nq=p\\n\\n\\n\\n' - >>> generate(f, {"q=":"p"}) + >>> generate({"q=":"p"}) Traceback (most recent call last): IllegalKey: Illegal key "q=" - >>> generate(f, {"q\\n":"p"}) + >>> generate({"q\\n":"p"}) Traceback (most recent call last): IllegalKey: Illegal key "q\\n" - >>> generate(f, {"":"p"}) + >>> generate({"":"p"}) Traceback (most recent call last): IllegalKey: Illegal key "" - >>> generate(f, {">q":"p"}) + >>> generate({">q":"p"}) Traceback (most recent call last): IllegalKey: Illegal key ">q" - >>> generate(f, {"q":"p\\n"}) + >>> generate({"q":"p\\n"}) Traceback (most recent call last): IllegalValue: Illegal value "p\\n" """ @@ -68,63 +67,48 @@ def generate(f, map, context=3): if "\n" in map[key]: raise IllegalValue(map[key].encode('string_escape')) + lines = [] for key in keys: for i in range(context): - f.write("\n") - f.write("%s=%s\n" % (key.encode("utf-8"), map[key].encode("utf-8"))) + lines.append("") + lines.append("%s=%s" % (key, map[key])) for i in range(context): - f.write("\n") + lines.append("") + return '\n'.join(lines) + '\n' -def parse(f): +def parse(contents): """ - Parse a format-2 mapfile. + Parse a format-2 mapfile string. >>> parse('\\n\\n\\nq=p\\n\\n\\n\\n')['q'] - u'p' + 'p' >>> parse('\\n\\nq=\\'p\\'\\n\\n\\n\\n')['q'] - u"\'p\'" - >>> f = utility.FileString() - >>> generate(f, {"a":"b", "c":"d", "e":"f"}) - >>> dict = parse(f) + "\'p\'" + >>> contents = generate({"a":"b", "c":"d", "e":"f"}) + >>> dict = parse(contents) >>> dict["a"] - u'b' + 'b' >>> dict["c"] - u'd' + 'd' >>> dict["e"] - u'f' + 'f' """ - f = utility.get_file(f) result = {} - for line in f: - line = line.decode("utf-8").rstrip('\n') + for line in contents.splitlines(): + line = line.rstrip('\n') if len(line) == 0: continue - name,value = [f for f in line.split('=', 1)] + name,value = [field for field in line.split('=', 1)] assert not result.has_key(name) result[name] = value return result -def map_save(rcs, path, map): +def map_save(rcs, path, map, allow_no_rcs=False): """Save the map as a mapfile to the specified path""" - add = not os.path.exists(path) - output = file(path, "wb") - generate(output, map) - output.close() - if add: - rcs.add(path) - else: - rcs.update(path) + contents = generate(map) + rcs.set_file_contents(path, contents, allow_no_rcs) -class NoSuchFile(Exception): - def __init__(self, pathname): - Exception.__init__(self, "No such file: %s" % pathname) - - -def map_load(path): - try: - return parse(file(path, "rb")) - except IOError, e: - if e.errno != errno.ENOENT: - raise e - raise NoSuchFile(path) +def map_load(rcs, path, allow_no_rcs=False): + contents = rcs.get_file_contents(path, allow_no_rcs=allow_no_rcs) + return parse(contents) suite = doctest.DocTestSuite() diff --git a/libbe/rcs.py b/libbe/rcs.py index f222795..3519c3d 100644 --- a/libbe/rcs.py +++ b/libbe/rcs.py @@ -64,8 +64,22 @@ class CommandError(Exception): class SettingIDnotSupported(NotImplementedError): pass +class RCSnotRooted(Exception): + def __init__(self): + msg = "RCS not rooted" + Exception.__init__(self, msg) + class PathNotInRoot(Exception): - pass + def __init__(self, path, root): + msg = "Path '%s' not in root '%s'" % (path, root) + Exception.__init__(self, msg) + self.path = path + self.root = root + +class NoSuchFile(Exception): + def __init__(self, pathname): + Exception.__init__(self, "No such file: %s" % pathname) + def new(): return RCS() @@ -255,6 +269,8 @@ class RCS(object): Remove a file/directory and all its decendents from both version control and the filesystem. """ + if not os.path.exists(dirname): + raise NoSuchFile(dirname) for dirpath,dirnames,filenames in os.walk(dirname, topdown=False): filenames.extend(dirnames) for path in filenames: @@ -270,34 +286,44 @@ class RCS(object): at path. """ self._rcs_update(self._u_rel_path(path)) - def get_file_contents(self, path, revision=None): + def get_file_contents(self, path, revision=None, allow_no_rcs=False): """ Get the file as it was in a given revision. Revision==None specifies the current revision. """ - relpath = self._u_rel_path(path) - return self._rcs_get_file_contents(relpath, revision).decode("utf-8") - def set_file_contents(self, path, contents): + if not os.path.exists(path): + raise NoSuchFile(path) + if self._use_rcs(path, allow_no_rcs): + relpath = self._u_rel_path(path) + contents = self._rcs_get_file_contents(relpath,revision) + else: + contents = file(path, "rb").read() + return contents.decode("utf-8") + def set_file_contents(self, path, contents, allow_no_rcs=False): """ Set the file contents under version control. """ add = not os.path.exists(path) file(path, "wb").write(contents.encode("utf-8")) - if add: - self.add(path) - else: - self.update(path) - def mkdir(self, path): + + if self._use_rcs(path, allow_no_rcs): + if add: + self.add(path) + else: + self.update(path) + def mkdir(self, path, allow_no_rcs=False): """ Create (if neccessary) a directory at path under version control. """ if not os.path.exists(path): os.mkdir(path) - self.add(path) + if self._use_rcs(path, allow_no_rcs): + self.add(path) else: assert os.path.isdir(path) - self.update(path) + if self._use_rcs(path, allow_no_rcs): + self.update(path) def duplicate_repo(self, revision=None): """ Get the repository as it was in a given revision. @@ -387,6 +413,43 @@ class RCS(object): or None if none of those files exist. """ return search_parent_directories(path, filename) + def _use_rcs(self, path, allow_no_rcs): + """ + Try and decide if _rcs_add/update/mkdir/etc calls will + succeed. Returns True is we think the rcs_call would + succeeed, and False otherwise. + """ + use_rcs = True + exception = None + if self.rootdir != None: + if self.path_in_root(path) == False: + use_rcs = False + exception = PathNotInRoot(path, self.rootdir) + else: + use_rcs = False + exception = RCSnotRooted + if use_rcs == False and allow_no_rcs==False: + raise exception + return use_rcs + def path_in_root(self, path, root=None): + """ + Return the relative path to path from root. + >>> rcs = new() + >>> rcs.path_in_root("/a.b/c/.be", "/a.b/c") + True + >>> rcs.path_in_root("/a.b/.be", "/a.b/c") + False + """ + if root == None: + if self.rootdir == None: + raise RCSnotRooted + root = self.rootdir + path = os.path.abspath(path) + absRoot = os.path.abspath(root) + absRootSlashedDir = os.path.join(absRoot,"") + if not path.startswith(absRootSlashedDir): + return False + return True def _u_rel_path(self, path, root=None): """ Return the relative path to path from root. @@ -395,18 +458,18 @@ class RCS(object): '.be' """ if root == None: - assert self.rootdir != None, "RCS not rooted" + if self.rootdir == None: + raise RCSnotRooted root = self.rootdir - if os.path.isabs(path): - absRoot = os.path.abspath(root) - absRootSlashedDir = os.path.join(absRoot,"") - if not path.startswith(absRootSlashedDir): - raise PathNotInRoot, \ - "file %s not in root %s" % (path, absRootSlashedDir) - assert path != absRootSlashedDir, \ - "file %s == root directory %s" % (path, absRootSlashedDir) - path = path[len(absRootSlashedDir):] - return path + path = os.path.abspath(path) + absRoot = os.path.abspath(root) + absRootSlashedDir = os.path.join(absRoot,"") + if not path.startswith(absRootSlashedDir): + raise PathNotInRoot(path, absRootSlashedDir) + assert path != absRootSlashedDir, \ + "file %s == root directory %s" % (path, absRootSlashedDir) + relpath = path[len(absRootSlashedDir):] + return relpath def _u_abspath(self, path, root=None): """ Return the absolute path from a path realtive to root. -- 2.26.2