X-Git-Url: http://git.tremily.us/?a=blobdiff_plain;f=libbe%2Fstorage%2Fvcs%2Fbase.py;h=845336daf86dad8587e706678ca38bb718ae33f3;hb=e186cfedadf43deb4b265d06be560bf5e5d81dc6;hp=8390cbce4cd90a79b68f898457fc81800745ef0a;hpb=97eabcc3657bdc6511baebd79b059ae1589c7e87;p=be.git diff --git a/libbe/storage/vcs/base.py b/libbe/storage/vcs/base.py index 8390cbc..845336d 100644 --- a/libbe/storage/vcs/base.py +++ b/libbe/storage/vcs/base.py @@ -1,28 +1,28 @@ -# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc. +# Copyright (C) 2005-2012 Aaron Bentley # Alexander Belchenko # Ben Finney # Chris Ball # Gianluca Montecchi -# W. Trevor King +# W. Trevor King # -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. +# This file is part of Bugs Everywhere. # -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. +# Bugs Everywhere is free software: you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 2 of the License, or (at your option) any +# later version. # -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# Bugs Everywhere is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# Bugs Everywhere. If not, see . -""" -Define the base VCS (Version Control System) class, which should be -subclassed by other Version Control System backends. The base class -implements a "do not version" VCS. +"""Define the base :py:class:`VCS` (Version Control System) class, which +should be subclassed by other Version Control System backends. The +base class implements a "do not version" VCS. """ import codecs @@ -50,11 +50,17 @@ if libbe.TESTING == True: import libbe.ui.util.user -# List VCS modules in order of preference. -# Don't list this module, it is implicitly last. -VCS_ORDER = ['arch', 'bzr', 'darcs', 'git', 'hg'] +VCS_ORDER = ['arch', 'bzr', 'darcs', 'git', 'hg', 'monotone'] +"""List VCS modules in order of preference. + +Don't list this module, it is implicitly last. +""" def set_preferred_vcs(name): + """Manipulate :py:data:`VCS_ORDER` to place `name` first. + + This is primarily indended for testing purposes. + """ global VCS_ORDER assert name in VCS_ORDER, \ 'unrecognized VCS %s not in\n %s' % (name, VCS_ORDER) @@ -62,7 +68,10 @@ def set_preferred_vcs(name): VCS_ORDER.insert(0, name) def _get_matching_vcs(matchfn): - """Return the first module for which matchfn(VCS_instance) is true""" + """Return the first module for which matchfn(VCS_instance) is True. + + Searches in :py:data:`VCS_ORDER`. + """ for submodname in VCS_ORDER: module = import_by_name('libbe.storage.vcs.%s' % submodname) vcs = module.new() @@ -71,17 +80,26 @@ def _get_matching_vcs(matchfn): return VCS() def vcs_by_name(vcs_name): - """Return the module for the VCS with the given name""" + """Return the module for the VCS with the given name. + + Searches in :py:data:`VCS_ORDER`. + """ if vcs_name == VCS.name: return new() return _get_matching_vcs(lambda vcs: vcs.name == vcs_name) def detect_vcs(dir): - """Return an VCS instance for the vcs being used in this directory""" + """Return an VCS instance for the vcs being used in this directory. + + Searches in :py:data:`VCS_ORDER`. + """ return _get_matching_vcs(lambda vcs: vcs._detect(dir)) def installed_vcs(): - """Return an instance of an installed VCS""" + """Return an instance of an installed VCS. + + Searches in :py:data:`VCS_ORDER`. + """ return _get_matching_vcs(lambda vcs: vcs.installed()) @@ -118,10 +136,17 @@ class NoSuchFile (InvalidID): class CachedPathID (object): - """ - Storage ID <-> path policy. - .../.be/BUGDIR/bugs/BUG/comments/COMMENT - ^-- root path + """Cache Storage ID <-> path policy. + + Paths generated following:: + + .../.be/BUGDIR/bugs/BUG/comments/COMMENT + ^-- root path + + See :py:mod:`libbe.util.id` for a discussion of ID formats. + + Examples + -------- >>> dir = Dir() >>> os.mkdir(os.path.join(dir.path, '.be')) @@ -131,11 +156,11 @@ class CachedPathID (object): >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments')) >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def')) >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '456')) - >>> file(os.path.join(dir.path, '.be', 'abc', 'values'), + >>> open(os.path.join(dir.path, '.be', 'abc', 'values'), ... 'w').close() - >>> file(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'values'), + >>> open(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'values'), ... 'w').close() - >>> file(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values'), + >>> open(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values'), ... 'w').close() >>> c = CachedPathID() >>> c.root(dir.path) @@ -168,13 +193,13 @@ class CachedPathID (object): >>> c.path('qrs') Traceback (most recent call last): ... - InvalidID: 'qrs' + InvalidID: qrs in revision None >>> c.disconnect() >>> c.destroy() >>> dir.cleanup() """ def __init__(self, encoding=None): - self.encoding = libbe.util.encoding.get_filesystem_encoding() + self.encoding = libbe.util.encoding.get_text_file_encoding() self._spacer_dirs = ['.be', 'bugs', 'comments'] def root(self, path): @@ -182,28 +207,35 @@ class CachedPathID (object): self._cache_path = os.path.join( self._root, self._spacer_dirs[0], 'id-cache') - def init(self): - """ - Create cache file for an existing .be directory. - File if multiple lines of the form: - UUID\tPATH + def init(self, verbose=True, cache=None): + """Create cache file for an existing .be directory. + + The file contains multiple lines of the form:: + + UUID\tPATH """ - self._cache = {} + if cache == None: + self._cache = {} + else: + self._cache = cache spaced_root = os.path.join(self._root, self._spacer_dirs[0]) - for dirpath, dirnames, filenames in os.walk(spaced_root): + for dirpath, dirnames, filenames in os.walk(spaced_root, + followlinks=True): if dirpath == spaced_root: continue try: id = self.id(dirpath) - relpath = dirpath[len(self._root)+1:] + relpath = dirpath[len(self._root + os.path.sep):] if id.count('/') == 0: - if id in self._cache: + if verbose == True and id in self._cache: print >> sys.stderr, 'Multiple paths for %s: \n %s\n %s' % (id, self._cache[id], relpath) self._cache[id] = relpath except InvalidPath: pass - self._changed = True - self.disconnect() + if self._cache != cache: + self._changed = True + if cache == None: + self.disconnect() def destroy(self): if os.path.exists(self._cache_path): @@ -239,7 +271,9 @@ class CachedPathID (object): else: extra = fields[1:] if uuid not in self._cache: - raise InvalidID(uuid) + self.init(verbose=False, cache=self._cache) + if uuid not in self._cache: + raise InvalidID(uuid) if relpath == True: return os.path.join(self._cache[uuid], *extra) return os.path.join(self._root, self._cache[uuid], *extra) @@ -280,15 +314,15 @@ class CachedPathID (object): path = os.path.join(self._root, path) if not path.startswith(self._root + os.path.sep): raise InvalidPath(path, self._root) - path = path[len(self._root)+1:] + path = path[len(self._root + os.path.sep):] orig_path = path if not path.startswith(self._spacer_dirs[0] + os.path.sep): raise InvalidPath(path, self._spacer_dirs[0]) for spacer in self._spacer_dirs: if not path.startswith(spacer + os.path.sep): break - id = path[len(spacer)+1:] - fields = path[len(spacer)+1:].split(os.path.sep,1) + id = path[len(spacer + os.path.sep):] + fields = path[len(spacer + os.path.sep):].split(os.path.sep,1) if len(fields) == 1: break path = fields[1] @@ -304,149 +338,20 @@ def new(): return VCS() class VCS (libbe.storage.base.VersionedStorage): - """ - This class implements a 'no-vcs' interface. + """Implement a 'no-VCS' interface. Support for other VCSs can be added by subclassing this class, and - overriding methods _vcs_*() with code appropriate for your VCS. + overriding methods `_vcs_*()` with code appropriate for your VCS. - The methods _u_*() are utility methods available to the _vcs_*() + The methods `_u_*()` are utility methods available to the `_vcs_*()` methods. - - Sink to existing root - ====================== - - Consider the following usage case: - You have a bug directory rooted in - /path/to/source - by which I mean the '.be' directory is at - /path/to/source/.be - However, you're of in some subdirectory like - /path/to/source/GUI/testing - and you want to comment on a bug. Setting sink_to_root=True when - you initialize your BugDir will cause it to search for the '.be' - file in the ancestors of the path you passed in as 'root'. - /path/to/source/GUI/testing/.be miss - /path/to/source/GUI/.be miss - /path/to/source/.be hit! - So it still roots itself appropriately without much work for you. - - File-system access - ================== - - BugDirs live completely in memory when .sync_with_disk is False. - This is the default configuration setup by BugDir(from_disk=False). - If .sync_with_disk == True (e.g. BugDir(from_disk=True)), then - any changes to the BugDir will be immediately written to disk. - - If you want to change .sync_with_disk, we suggest you use - .set_sync_with_disk(), which propogates the new setting through to - all bugs/comments/etc. that have been loaded into memory. If - you've been living in memory and want to move to - .sync_with_disk==True, but you're not sure if anything has been - changed in memory, a call to .save() immediately before the - .set_sync_with_disk(True) call is a safe move. - - Regardless of .sync_with_disk, a call to .save() will write out - all the contents that the BugDir instance has loaded into memory. - If sync_with_disk has been True over the course of all interesting - changes, this .save() call will be a waste of time. - - The BugDir will only load information from the file system when it - loads new settings/bugs/comments that it doesn't already have in - memory and .sync_with_disk == True. - - Allow storage initialization - ======================== - - This one is for testing purposes. Setting it to True allows the - BugDir to search for an installed Storage backend and initialize - it in the root directory. This is a convenience option for - supporting tests of versioning functionality - (e.g. .duplicate_bugdir). - - Disable encoding manipulation - ============================= - - This one is for testing purposed. You might have non-ASCII - Unicode in your bugs, comments, files, etc. BugDir instances try - and support your preferred encoding scheme (e.g. "utf-8") when - dealing with stream and file input/output. For stream output, - this involves replacing sys.stdout and sys.stderr - (libbe.encode.set_IO_stream_encodings). However this messes up - doctest's output catching. In order to support doctest tests - using BugDirs, set manipulate_encodings=False, and stick to ASCII - in your tests. - - if root == None: - root = os.getcwd() - if sink_to_existing_root == True: - self.root = self._find_root(root) - else: - if not os.path.exists(root): - self.root = None - raise NoRootEntry(root) - self.root = root - # get a temporary storage until we've loaded settings - self.sync_with_disk = False - self.storage = self._guess_storage() - - if assert_new_BugDir == True: - if os.path.exists(self.get_path()): - raise AlreadyInitialized, self.get_path() - if storage == None: - storage = self._guess_storage(allow_storage_init) - self.storage = storage - self._setup_user_id(self.user_id) - - - # methods for getting the BugDir situated in the filesystem - - def _find_root(self, path): - ''' - Search for an existing bug database dir and it's ancestors and - return a BugDir rooted there. Only called by __init__, and - then only if sink_to_existing_root == True. - ''' - if not os.path.exists(path): - self.root = None - raise NoRootEntry(path) - versionfile=utility.search_parent_directories(path, - os.path.join(".be", "version")) - if versionfile != None: - beroot = os.path.dirname(versionfile) - root = os.path.dirname(beroot) - return root - else: - beroot = utility.search_parent_directories(path, ".be") - if beroot == None: - self.root = None - raise NoBugDir(path) - return beroot - - def _guess_storage(self, allow_storage_init=False): - ''' - Only called by __init__. - ''' - deepdir = self.get_path() - if not os.path.exists(deepdir): - deepdir = os.path.dirname(deepdir) - new_storage = storage.detect_storage(deepdir) - install = False - if new_storage.name == "None": - if allow_storage_init == True: - new_storage = storage.installed_storage() - new_storage.init(self.root) - return new_storage - -os.listdir(self.get_path("bugs")): """ name = 'None' client = 'false' # command-line tool for _u_invoke_client def __init__(self, *args, **kwargs): if 'encoding' not in kwargs: - kwargs['encoding'] = libbe.util.encoding.get_filesystem_encoding() + kwargs['encoding'] = libbe.util.encoding.get_text_file_encoding() libbe.storage.base.VersionedStorage.__init__(self, *args, **kwargs) self.versioned = False self.interspersed_vcs_files = False @@ -504,6 +409,12 @@ os.listdir(self.get_path("bugs")): """ pass + def _vcs_exists(self, path, revision=None): + """ + Does the path exist in a given revision? (True/False) + """ + raise NotImplementedError('Lazy BE developers') + def _vcs_remove(self, path): """ Remove the file at path from version control. Optionally @@ -550,7 +461,7 @@ os.listdir(self.get_path("bugs")): def _vcs_path(self, id, revision): """ - Return the path to object id as of revision. + Return the relative path to object id as of revision. Revision will not be None. """ @@ -597,23 +508,110 @@ os.listdir(self.get_path("bugs")): """ return None + def _vcs_changed(self, revision): + """ + Return a tuple of lists of ids + (new, modified, removed) + from the specified revision to the current situation. + """ + return ([], [], []) + def version(self): # Cache version string for efficiency. if not hasattr(self, '_version'): - self._version = self._get_version() + self._version = self._vcs_version() return self._version - def _get_version(self): - try: - ret = self._vcs_version() - return ret - except OSError, e: - if e.errno == errno.ENOENT: - return None + def version_cmp(self, *args): + """Compare the installed VCS version `V_i` with another version + `V_o` (given in `*args`). Returns + + === =============== + 1 if `V_i > V_o` + 0 if `V_i == V_o` + -1 if `V_i < V_o` + === =============== + + Examples + -------- + + >>> v = VCS(repo='.') + >>> v._version = '2.3.1 (release)' + >>> v.version_cmp(2,3,1) + 0 + >>> v.version_cmp(2,3,2) + -1 + >>> v.version_cmp(2,3,'a',5) + 1 + >>> v.version_cmp(2,3,0) + 1 + >>> v.version_cmp(2,3,1,'a',5) + 1 + >>> v.version_cmp(2,3,1,1) + -1 + >>> v.version_cmp(3) + -1 + >>> v._version = '2.0.0pre2' + >>> v._parsed_version = None + >>> v.version_cmp(3) + -1 + >>> v.version_cmp(2,0,1) + -1 + >>> v.version_cmp(2,0,0,'pre',1) + 1 + >>> v.version_cmp(2,0,0,'pre',2) + 0 + >>> v.version_cmp(2,0,0,'pre',3) + -1 + >>> v.version_cmp(2,0,0,'a',3) + 1 + >>> v.version_cmp(2,0,0,'rc',1) + -1 + """ + if not hasattr(self, '_parsed_version') \ + or self._parsed_version == None: + num_part = self.version().split(' ')[0] + self._parsed_version = [] + for num in num_part.split('.'): + try: + self._parsed_version.append(int(num)) + except ValueError, e: + # bzr version number might contain non-numerical tags + splitter = re.compile(r'[\D]') # Match non-digits + splits = splitter.split(num) + # if len(tag) > 1 some splits will be empty; remove + splits = filter(lambda s: s != '', splits) + tag_starti = len(splits[0]) + num_starti = num.find(splits[1], tag_starti) + tag = num[tag_starti:num_starti] + self._parsed_version.append(int(splits[0])) + self._parsed_version.append(tag) + self._parsed_version.append(int(splits[1])) + for current,other in zip(self._parsed_version, args): + if type(current) != type (other): + # one of them is a pre-release string + if type(current) != types.IntType: + return -1 + else: + return 1 + c = cmp(current,other) + if c != 0: + return c + # see if one is longer than the other + verlen = len(self._parsed_version) + arglen = len(args) + if verlen == arglen: + return 0 + elif verlen > arglen: + if type(self._parsed_version[arglen]) != types.IntType: + return -1 # self is a prerelease else: - raise OSError, e - except CommandError: - return None + return 1 + else: + if type(args[verlen]) != types.IntType: + return 1 # args is a prerelease + else: + return -1 def installed(self): if self.version() != None: @@ -629,6 +627,11 @@ os.listdir(self.get_path("bugs")): """ if not hasattr(self, 'user_id'): self.user_id = self._vcs_get_user_id() + if self.user_id == None: + # guess missing info + name = libbe.ui.util.user.get_fallback_fullname() + email = libbe.ui.util.user.get_fallback_email() + self.user_id = libbe.ui.util.user.create_user_id(name, email) return self.user_id def _detect(self, path='.'): @@ -638,14 +641,33 @@ os.listdir(self.get_path("bugs")): return self._vcs_detect(path) def root(self): - """ - Set the root directory to the path's VCS root. This is the - default working directory for future invocations. + """Set the root directory to the path's VCS root. + + This is the default working directory for future invocations. + Consider the following usage case: + + You have a project rooted in:: + + /path/to/source/ + + by which I mean the VCS repository is in, for example:: + + /path/to/source/.bzr + + However, you're of in some subdirectory like:: + + /path/to/source/ui/testing + + and you want to comment on a bug. `root` will locate your VCS + root (``/path/to/source/``) and set the repo there. This + means that it doesn't matter where you are in your project + tree when you call "be COMMAND", it always acts as if you called + it from the VCS root. """ if self._detect(self.repo) == False: raise VCSUnableToRoot(self) root = self._vcs_root(self.repo) - self.repo = os.path.abspath(root) + self.repo = os.path.realpath(root) if os.path.isdir(self.repo) == False: self.repo = os.path.dirname(self.repo) self.be_dir = os.path.join( @@ -657,6 +679,10 @@ os.listdir(self.get_path("bugs")): """ Begin versioning the tree based at self.repo. Also roots the vcs at path. + + See Also + -------- + root : called if the VCS has already been initialized. """ if not os.path.exists(self.repo) or not os.path.isdir(self.repo): raise VCSUnableToRoot(self) @@ -686,6 +712,17 @@ os.listdir(self.get_path("bugs")): def _disconnect(self): self._cached_path_id.disconnect() + def path(self, id, revision=None, relpath=True): + if revision == None: + path = self._cached_path_id.path(id) + if relpath == True: + return self._u_rel_path(path) + return path + path = self._vcs_path(id, revision) + if relpath == True: + return path + return os.path.join(self.repo, path) + def _add_path(self, path, directory=False): relpath = self._u_rel_path(path) reldirs = relpath.split(os.path.sep) @@ -708,6 +745,16 @@ os.listdir(self.get_path("bugs")): path = self._cached_path_id.add_id(id, parent) self._add_path(path, **kwargs) + def _exists(self, id, revision=None): + if revision == None: + try: + path = self.path(id, revision, relpath=False) + except InvalidID, e: + return False + return os.path.exists(path) + path = self.path(id, revision, relpath=True) + return self._vcs_exists(relpath, revision) + def _remove(self, id): path = self._cached_path_id.path(id) if os.path.exists(path): @@ -737,19 +784,36 @@ os.listdir(self.get_path("bugs")): if p.startswith(path): self._cached_path_id.remove_id(id) + def _ancestors(self, id=None, revision=None): + if id==None: + path = self.be_dir + else: + path = self.path(id, revision, relpath=False) + ancestors = [] + while True: + if not path.startswith(self.repo + os.path.sep): + break + path = os.path.dirname(path) + try: + id = self._u_path_to_id(path) + ancestors.append(id) + except (SpacerCollision, InvalidPath): + pass + return ancestors + def _children(self, id=None, revision=None): if revision == None: - id_to_path = self._cached_path_id.path isdir = os.path.isdir listdir = os.listdir else: - id_to_path = lambda id : self._vcs_path(id, revision) - isdir = lambda path : self._vcs_isdir(path, revision) - listdir = lambda path : self._vcs_listdir(path, revision) + isdir = lambda path : self._vcs_isdir( + self._u_rel_path(path), revision) + listdir = lambda path : self._vcs_listdir( + self._u_rel_path(path), revision) if id==None: path = self.be_dir else: - path = id_to_path(id) + path = self.path(id, revision, relpath=False) if isdir(path) == False: return [] children = listdir(path) @@ -772,28 +836,22 @@ os.listdir(self.get_path("bugs")): children[i] = None else: children[i] = self._u_path_to_id(cpath) - children[i] return [c for c in children if c != None] def _get(self, id, default=libbe.util.InvalidObject, revision=None): try: - path = self._cached_path_id.path(id) + relpath = self.path(id, revision, relpath=True) + contents = self._vcs_get_file_contents(relpath, revision) except InvalidID, e: if default == libbe.util.InvalidObject: raise e return default - relpath = self._u_rel_path(path) - try: - contents = self._vcs_get_file_contents(relpath, revision) - except InvalidID, e: - if InvalidID == None: - e.id = InvalidID - raise if contents in [libbe.storage.base.InvalidDirectory, - libbe.util.InvalidObject]: - raise InvalidID(id) - elif len(contents) == 0: - return None + libbe.util.InvalidObject] \ + or len(contents) == 0: + if default == libbe.util.InvalidObject: + raise InvalidID(id, revision) + return default return contents def _set(self, id, value): @@ -839,9 +897,22 @@ os.listdir(self.get_path("bugs")): raise libbe.storage.base.InvalidRevision(index) return revid + def changed(self, revision): + new,mod,rem = self._vcs_changed(revision) + def paths_to_ids(paths): + for p in paths: + try: + id = self._u_path_to_id(p) + yield id + except (SpacerCollision, InvalidPath): + pass + new_id = list(paths_to_ids(new)) + mod_id = list(paths_to_ids(mod)) + rem_id = list(paths_to_ids(rem)) + return (new_id, mod_id, rem_id) + def _u_any_in_string(self, list, string): - """ - Return True if any of the strings in list are in string. + """Return True if any of the strings in list are in string. Otherwise return False. """ for list_string in list: @@ -864,9 +935,8 @@ os.listdir(self.get_path("bugs")): return self._u_invoke(cl_args, **kwargs) def _u_search_parent_directories(self, path, filename): - """ - Find the file (or directory) named filename in path or in any - of path's parents. + """Find the file (or directory) named filename in path or in any of + path's parents. e.g. search_parent_directories("/a/b/c", ".be") @@ -883,9 +953,36 @@ os.listdir(self.get_path("bugs")): return None return ret - def _u_find_id(self, id, revision): + def _u_find_id_from_manifest(self, id, manifest, revision=None): + """Search for the relative path to id using manifest, a list of all + files. + + Returns None if the id is not found. """ - Search for the relative path to id as of revision. + be_dir = self._cached_path_id._spacer_dirs[0] + be_dir_sep = self._cached_path_id._spacer_dirs[0] + os.path.sep + files = [f for f in manifest if f.startswith(be_dir_sep)] + for file in files: + if not file.startswith(be_dir+os.path.sep): + continue + parts = file.split(os.path.sep) + dir = parts.pop(0) # don't add the first spacer dir + for part in parts[:-1]: + dir = os.path.join(dir, part) + if not dir in files: + files.append(dir) + for file in files: + try: + p_id = self._u_path_to_id(file) + if p_id == id: + return file + except (SpacerCollision, InvalidPath): + pass + raise InvalidID(id, revision=revision) + + def _u_find_id(self, id, revision): + """Search for the relative path to id as of revision. + Returns None if the id is not found. """ assert self._rooted == True @@ -906,8 +1003,10 @@ os.listdir(self.get_path("bugs")): return self._cached_path_id.id(path) def _u_rel_path(self, path, root=None): - """ - Return the relative path to path from root. + """Return the relative path to path from root. + + Examples: + >>> vcs = new() >>> vcs._u_rel_path("/a.b/c/.be", "/a.b/c") '.be' @@ -933,8 +1032,11 @@ os.listdir(self.get_path("bugs")): return relpath def _u_abspath(self, path, root=None): - """ - Return the absolute path from a path realtive to root. + """Return the absolute path from a path relative to root. + + Examples + -------- + >>> vcs = new() >>> vcs._u_abspath(".be", "/a.b/c") '/a.b/c/.be' @@ -945,9 +1047,8 @@ os.listdir(self.get_path("bugs")): return os.path.abspath(os.path.join(root, path)) def _u_parse_commitfile(self, commitfile): - """ - Split the commitfile created in self.commit() back into - summary and header lines. + """Split the commitfile created in self.commit() back into summary and + header lines. """ f = codecs.open(commitfile, 'r', self.encoding) summary = f.readline() @@ -964,8 +1065,11 @@ os.listdir(self.get_path("bugs")): upgrade.upgrade(self.repo, version) def storage_version(self, revision=None, path=None): - """ - Requires disk access. + """Return the storage version of the on-disk files. + + See Also + -------- + libbe.storage.util.upgrade """ if path == None: path = os.path.join(self.repo, '.be', 'version') @@ -973,8 +1077,9 @@ os.listdir(self.get_path("bugs")): raise libbe.storage.InvalidStorageVersion(None) if revision == None: # don't require connection return libbe.util.encoding.get_file_contents( - path, decode=True).rstrip('\n') - contents = self._vcs_get_file_contents(path, revision=revision) + path, decode=True).rstrip() + relpath = self._u_rel_path(path) + contents = self._vcs_get_file_contents(relpath, revision=revision) if type(contents) != types.UnicodeType: contents = unicode(contents, self.encoding) return contents.strip() @@ -990,7 +1095,7 @@ os.listdir(self.get_path("bugs")): libbe.storage.STORAGE_VERSION+'\n') self._vcs_add(self._u_rel_path(path)) - + if libbe.TESTING == True: class VCSTestCase (unittest.TestCase): """ @@ -1023,8 +1128,7 @@ if libbe.TESTING == True: class VCS_installed_TestCase (VCSTestCase): def test_installed(self): - """ - See if the VCS is installed. + """See if the VCS is installed. """ self.failUnless(self.s.installed() == True, '%(name)s VCS not found' % vars(self.Class)) @@ -1032,8 +1136,7 @@ if libbe.TESTING == True: class VCS_detection_TestCase (VCSTestCase): def test_detection(self): - """ - See if the VCS detects its installed repository + """See if the VCS detects its installed repository """ if self.s.installed(): self.s.disconnect() @@ -1043,8 +1146,7 @@ if libbe.TESTING == True: self.s.connect() def test_no_detection(self): - """ - See if the VCS detects its installed repository + """See if the VCS detects its installed repository """ if self.s.installed() and self.Class.name != 'None': self.s.disconnect() @@ -1067,7 +1169,7 @@ if libbe.TESTING == True: class VCS_get_user_id_TestCase(VCSTestCase): """Test cases for VCS.get_user_id method.""" - def test_gets_existing_user_id(self): + def test_get_existing_user_id(self): """Should get the existing user ID.""" if self.s.installed(): user_id = self.s.get_user_id()