X-Git-Url: http://git.tremily.us/?a=blobdiff_plain;f=libbe%2Fstorage%2Fvcs%2Fbase.py;h=845336daf86dad8587e706678ca38bb718ae33f3;hb=e186cfedadf43deb4b265d06be560bf5e5d81dc6;hp=a45f1fe308bc25cefe0c59513c1352eedcd66f0e;hpb=214c4317bb90684dcfdab4d2402daa66fbad2e77;p=be.git diff --git a/libbe/storage/vcs/base.py b/libbe/storage/vcs/base.py index a45f1fe..845336d 100644 --- a/libbe/storage/vcs/base.py +++ b/libbe/storage/vcs/base.py @@ -1,28 +1,28 @@ -# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc. +# Copyright (C) 2005-2012 Aaron Bentley # Alexander Belchenko # Ben Finney # Chris Ball # Gianluca Montecchi -# W. Trevor King +# W. Trevor King # -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. +# This file is part of Bugs Everywhere. # -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. +# Bugs Everywhere is free software: you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 2 of the License, or (at your option) any +# later version. # -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# Bugs Everywhere is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# Bugs Everywhere. If not, see . -""" -Define the base VCS (Version Control System) class, which should be -subclassed by other Version Control System backends. The base class -implements a "do not version" VCS. +"""Define the base :py:class:`VCS` (Version Control System) class, which +should be subclassed by other Version Control System backends. The +base class implements a "do not version" VCS. """ import codecs @@ -32,11 +32,13 @@ import re import shutil import sys import tempfile +import types import libbe +import libbe.storage import libbe.storage.base import libbe.util.encoding -from libbe.storage.base import EmptyCommit, InvalidRevision +from libbe.storage.base import EmptyCommit, InvalidRevision, InvalidID from libbe.util.utility import Dir, search_parent_directories from libbe.util.subproc import CommandError, invoke from libbe.util.plugin import import_by_name @@ -48,11 +50,17 @@ if libbe.TESTING == True: import libbe.ui.util.user -# List VCS modules in order of preference. -# Don't list this module, it is implicitly last. -VCS_ORDER = ['arch', 'bzr', 'darcs', 'git', 'hg'] +VCS_ORDER = ['arch', 'bzr', 'darcs', 'git', 'hg', 'monotone'] +"""List VCS modules in order of preference. + +Don't list this module, it is implicitly last. +""" def set_preferred_vcs(name): + """Manipulate :py:data:`VCS_ORDER` to place `name` first. + + This is primarily indended for testing purposes. + """ global VCS_ORDER assert name in VCS_ORDER, \ 'unrecognized VCS %s not in\n %s' % (name, VCS_ORDER) @@ -60,7 +68,10 @@ def set_preferred_vcs(name): VCS_ORDER.insert(0, name) def _get_matching_vcs(matchfn): - """Return the first module for which matchfn(VCS_instance) is true""" + """Return the first module for which matchfn(VCS_instance) is True. + + Searches in :py:data:`VCS_ORDER`. + """ for submodname in VCS_ORDER: module = import_by_name('libbe.storage.vcs.%s' % submodname) vcs = module.new() @@ -69,17 +80,26 @@ def _get_matching_vcs(matchfn): return VCS() def vcs_by_name(vcs_name): - """Return the module for the VCS with the given name""" + """Return the module for the VCS with the given name. + + Searches in :py:data:`VCS_ORDER`. + """ if vcs_name == VCS.name: return new() return _get_matching_vcs(lambda vcs: vcs.name == vcs_name) def detect_vcs(dir): - """Return an VCS instance for the vcs being used in this directory""" + """Return an VCS instance for the vcs being used in this directory. + + Searches in :py:data:`VCS_ORDER`. + """ return _get_matching_vcs(lambda vcs: vcs._detect(dir)) def installed_vcs(): - """Return an instance of an installed VCS""" + """Return an instance of an installed VCS. + + Searches in :py:data:`VCS_ORDER`. + """ return _get_matching_vcs(lambda vcs: vcs.installed()) @@ -95,11 +115,11 @@ class VCSUnableToRoot (libbe.storage.base.ConnectionError): libbe.storage.base.ConnectionError.__init__(self, msg) self.vcs = vcs -class InvalidPath (libbe.storage.base.InvalidID): - def __init__(self, path, root, msg=None): +class InvalidPath (InvalidID): + def __init__(self, path, root, msg=None, **kwargs): if msg == None: msg = 'Path "%s" not in root "%s"' % (path, root) - libbe.storage.base.InvalidID.__init__(self, msg) + InvalidID.__init__(self, msg=msg, **kwargs) self.path = path self.root = root @@ -109,17 +129,24 @@ class SpacerCollision (InvalidPath): InvalidPath.__init__(self, path, root=None, msg=msg) self.spacer = spacer -class NoSuchFile (libbe.storage.base.InvalidID): +class NoSuchFile (InvalidID): def __init__(self, pathname, root='.'): path = os.path.abspath(os.path.join(root, pathname)) - libbe.storage.base.InvalidID.__init__(self, 'No such file: %s' % path) + InvalidID.__init__(self, 'No such file: %s' % path) class CachedPathID (object): - """ - Storage ID <-> path policy. - .../.be/BUGDIR/bugs/BUG/comments/COMMENT - ^-- root path + """Cache Storage ID <-> path policy. + + Paths generated following:: + + .../.be/BUGDIR/bugs/BUG/comments/COMMENT + ^-- root path + + See :py:mod:`libbe.util.id` for a discussion of ID formats. + + Examples + -------- >>> dir = Dir() >>> os.mkdir(os.path.join(dir.path, '.be')) @@ -129,11 +156,11 @@ class CachedPathID (object): >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments')) >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def')) >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '456')) - >>> file(os.path.join(dir.path, '.be', 'abc', 'values'), + >>> open(os.path.join(dir.path, '.be', 'abc', 'values'), ... 'w').close() - >>> file(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'values'), + >>> open(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'values'), ... 'w').close() - >>> file(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values'), + >>> open(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values'), ... 'w').close() >>> c = CachedPathID() >>> c.root(dir.path) @@ -166,13 +193,13 @@ class CachedPathID (object): >>> c.path('qrs') Traceback (most recent call last): ... - InvalidID: 'qrs' + InvalidID: qrs in revision None >>> c.disconnect() >>> c.destroy() >>> dir.cleanup() """ def __init__(self, encoding=None): - self.encoding = libbe.util.encoding.get_filesystem_encoding() + self.encoding = libbe.util.encoding.get_text_file_encoding() self._spacer_dirs = ['.be', 'bugs', 'comments'] def root(self, path): @@ -180,29 +207,35 @@ class CachedPathID (object): self._cache_path = os.path.join( self._root, self._spacer_dirs[0], 'id-cache') - def init(self): - """ - Create cache file for an existing .be directory. - File if multiple lines of the form: - UUID\tPATH + def init(self, verbose=True, cache=None): + """Create cache file for an existing .be directory. + + The file contains multiple lines of the form:: + + UUID\tPATH """ - self._cache = {} + if cache == None: + self._cache = {} + else: + self._cache = cache spaced_root = os.path.join(self._root, self._spacer_dirs[0]) - for dirpath, dirnames, filenames in os.walk(spaced_root): + for dirpath, dirnames, filenames in os.walk(spaced_root, + followlinks=True): if dirpath == spaced_root: continue try: id = self.id(dirpath) - relpath = dirpath[len(self._root)+1:] + relpath = dirpath[len(self._root + os.path.sep):] if id.count('/') == 0: - if id in self._cache: - import sys + if verbose == True and id in self._cache: print >> sys.stderr, 'Multiple paths for %s: \n %s\n %s' % (id, self._cache[id], relpath) self._cache[id] = relpath except InvalidPath: pass - self._changed = True - self.disconnect() + if self._cache != cache: + self._changed = True + if cache == None: + self.disconnect() def destroy(self): if os.path.exists(self._cache_path): @@ -238,7 +271,9 @@ class CachedPathID (object): else: extra = fields[1:] if uuid not in self._cache: - raise libbe.storage.base.InvalidID(uuid) + self.init(verbose=False, cache=self._cache) + if uuid not in self._cache: + raise InvalidID(uuid) if relpath == True: return os.path.join(self._cache[uuid], *extra) return os.path.join(self._root, self._cache[uuid], *extra) @@ -276,18 +311,18 @@ class CachedPathID (object): self._changed = True def id(self, path): - path = os.path.abspath(path) + path = os.path.join(self._root, path) if not path.startswith(self._root + os.path.sep): - raise InvalidPath('Path %s not in root %s' % (path, self._root)) - path = path[len(self._root)+1:] + raise InvalidPath(path, self._root) + path = path[len(self._root + os.path.sep):] orig_path = path if not path.startswith(self._spacer_dirs[0] + os.path.sep): raise InvalidPath(path, self._spacer_dirs[0]) for spacer in self._spacer_dirs: if not path.startswith(spacer + os.path.sep): break - id = path[len(spacer)+1:] - fields = path[len(spacer)+1:].split(os.path.sep,1) + id = path[len(spacer + os.path.sep):] + fields = path[len(spacer + os.path.sep):].split(os.path.sep,1) if len(fields) == 1: break path = fields[1] @@ -303,149 +338,20 @@ def new(): return VCS() class VCS (libbe.storage.base.VersionedStorage): - """ - This class implements a 'no-vcs' interface. + """Implement a 'no-VCS' interface. Support for other VCSs can be added by subclassing this class, and - overriding methods _vcs_*() with code appropriate for your VCS. + overriding methods `_vcs_*()` with code appropriate for your VCS. - The methods _u_*() are utility methods available to the _vcs_*() + The methods `_u_*()` are utility methods available to the `_vcs_*()` methods. - - Sink to existing root - ====================== - - Consider the following usage case: - You have a bug directory rooted in - /path/to/source - by which I mean the '.be' directory is at - /path/to/source/.be - However, you're of in some subdirectory like - /path/to/source/GUI/testing - and you want to comment on a bug. Setting sink_to_root=True when - you initialize your BugDir will cause it to search for the '.be' - file in the ancestors of the path you passed in as 'root'. - /path/to/source/GUI/testing/.be miss - /path/to/source/GUI/.be miss - /path/to/source/.be hit! - So it still roots itself appropriately without much work for you. - - File-system access - ================== - - BugDirs live completely in memory when .sync_with_disk is False. - This is the default configuration setup by BugDir(from_disk=False). - If .sync_with_disk == True (e.g. BugDir(from_disk=True)), then - any changes to the BugDir will be immediately written to disk. - - If you want to change .sync_with_disk, we suggest you use - .set_sync_with_disk(), which propogates the new setting through to - all bugs/comments/etc. that have been loaded into memory. If - you've been living in memory and want to move to - .sync_with_disk==True, but you're not sure if anything has been - changed in memory, a call to .save() immediately before the - .set_sync_with_disk(True) call is a safe move. - - Regardless of .sync_with_disk, a call to .save() will write out - all the contents that the BugDir instance has loaded into memory. - If sync_with_disk has been True over the course of all interesting - changes, this .save() call will be a waste of time. - - The BugDir will only load information from the file system when it - loads new settings/bugs/comments that it doesn't already have in - memory and .sync_with_disk == True. - - Allow storage initialization - ======================== - - This one is for testing purposes. Setting it to True allows the - BugDir to search for an installed Storage backend and initialize - it in the root directory. This is a convenience option for - supporting tests of versioning functionality - (e.g. .duplicate_bugdir). - - Disable encoding manipulation - ============================= - - This one is for testing purposed. You might have non-ASCII - Unicode in your bugs, comments, files, etc. BugDir instances try - and support your preferred encoding scheme (e.g. "utf-8") when - dealing with stream and file input/output. For stream output, - this involves replacing sys.stdout and sys.stderr - (libbe.encode.set_IO_stream_encodings). However this messes up - doctest's output catching. In order to support doctest tests - using BugDirs, set manipulate_encodings=False, and stick to ASCII - in your tests. - - if root == None: - root = os.getcwd() - if sink_to_existing_root == True: - self.root = self._find_root(root) - else: - if not os.path.exists(root): - self.root = None - raise NoRootEntry(root) - self.root = root - # get a temporary storage until we've loaded settings - self.sync_with_disk = False - self.storage = self._guess_storage() - - if assert_new_BugDir == True: - if os.path.exists(self.get_path()): - raise AlreadyInitialized, self.get_path() - if storage == None: - storage = self._guess_storage(allow_storage_init) - self.storage = storage - self._setup_user_id(self.user_id) - - - # methods for getting the BugDir situated in the filesystem - - def _find_root(self, path): - ''' - Search for an existing bug database dir and it's ancestors and - return a BugDir rooted there. Only called by __init__, and - then only if sink_to_existing_root == True. - ''' - if not os.path.exists(path): - self.root = None - raise NoRootEntry(path) - versionfile=utility.search_parent_directories(path, - os.path.join(".be", "version")) - if versionfile != None: - beroot = os.path.dirname(versionfile) - root = os.path.dirname(beroot) - return root - else: - beroot = utility.search_parent_directories(path, ".be") - if beroot == None: - self.root = None - raise NoBugDir(path) - return beroot - - def _guess_storage(self, allow_storage_init=False): - ''' - Only called by __init__. - ''' - deepdir = self.get_path() - if not os.path.exists(deepdir): - deepdir = os.path.dirname(deepdir) - new_storage = storage.detect_storage(deepdir) - install = False - if new_storage.name == "None": - if allow_storage_init == True: - new_storage = storage.installed_storage() - new_storage.init(self.root) - return new_storage - -os.listdir(self.get_path("bugs")): """ name = 'None' client = 'false' # command-line tool for _u_invoke_client def __init__(self, *args, **kwargs): if 'encoding' not in kwargs: - kwargs['encoding'] = libbe.util.encoding.get_filesystem_encoding() + kwargs['encoding'] = libbe.util.encoding.get_text_file_encoding() libbe.storage.base.VersionedStorage.__init__(self, *args, **kwargs) self.versioned = False self.interspersed_vcs_files = False @@ -503,6 +409,12 @@ os.listdir(self.get_path("bugs")): """ pass + def _vcs_exists(self, path, revision=None): + """ + Does the path exist in a given revision? (True/False) + """ + raise NotImplementedError('Lazy BE developers') + def _vcs_remove(self, path): """ Remove the file at path from version control. Optionally @@ -547,6 +459,33 @@ os.listdir(self.get_path("bugs")): f.close() return contents + def _vcs_path(self, id, revision): + """ + Return the relative path to object id as of revision. + + Revision will not be None. + """ + raise NotImplementedError + + def _vcs_isdir(self, path, revision): + """ + Return True if path (as returned by _vcs_path) was a directory + as of revision, False otherwise. + + Revision will not be None. + """ + raise NotImplementedError + + def _vcs_listdir(self, path, revision): + """ + Return a list of the contents of the directory path (as + returned by _vcs_path) as of revision. + + Revision will not be None, and ._vcs_isdir(path, revision) + will be True. + """ + raise NotImplementedError + def _vcs_commit(self, commitfile, allow_empty=False): """ Commit the current working directory, using the contents of @@ -569,23 +508,110 @@ os.listdir(self.get_path("bugs")): """ return None + def _vcs_changed(self, revision): + """ + Return a tuple of lists of ids + (new, modified, removed) + from the specified revision to the current situation. + """ + return ([], [], []) + def version(self): # Cache version string for efficiency. if not hasattr(self, '_version'): - self._version = self._get_version() + self._version = self._vcs_version() return self._version - def _get_version(self): - try: - ret = self._vcs_version() - return ret - except OSError, e: - if e.errno == errno.ENOENT: - return None + def version_cmp(self, *args): + """Compare the installed VCS version `V_i` with another version + `V_o` (given in `*args`). Returns + + === =============== + 1 if `V_i > V_o` + 0 if `V_i == V_o` + -1 if `V_i < V_o` + === =============== + + Examples + -------- + + >>> v = VCS(repo='.') + >>> v._version = '2.3.1 (release)' + >>> v.version_cmp(2,3,1) + 0 + >>> v.version_cmp(2,3,2) + -1 + >>> v.version_cmp(2,3,'a',5) + 1 + >>> v.version_cmp(2,3,0) + 1 + >>> v.version_cmp(2,3,1,'a',5) + 1 + >>> v.version_cmp(2,3,1,1) + -1 + >>> v.version_cmp(3) + -1 + >>> v._version = '2.0.0pre2' + >>> v._parsed_version = None + >>> v.version_cmp(3) + -1 + >>> v.version_cmp(2,0,1) + -1 + >>> v.version_cmp(2,0,0,'pre',1) + 1 + >>> v.version_cmp(2,0,0,'pre',2) + 0 + >>> v.version_cmp(2,0,0,'pre',3) + -1 + >>> v.version_cmp(2,0,0,'a',3) + 1 + >>> v.version_cmp(2,0,0,'rc',1) + -1 + """ + if not hasattr(self, '_parsed_version') \ + or self._parsed_version == None: + num_part = self.version().split(' ')[0] + self._parsed_version = [] + for num in num_part.split('.'): + try: + self._parsed_version.append(int(num)) + except ValueError, e: + # bzr version number might contain non-numerical tags + splitter = re.compile(r'[\D]') # Match non-digits + splits = splitter.split(num) + # if len(tag) > 1 some splits will be empty; remove + splits = filter(lambda s: s != '', splits) + tag_starti = len(splits[0]) + num_starti = num.find(splits[1], tag_starti) + tag = num[tag_starti:num_starti] + self._parsed_version.append(int(splits[0])) + self._parsed_version.append(tag) + self._parsed_version.append(int(splits[1])) + for current,other in zip(self._parsed_version, args): + if type(current) != type (other): + # one of them is a pre-release string + if type(current) != types.IntType: + return -1 + else: + return 1 + c = cmp(current,other) + if c != 0: + return c + # see if one is longer than the other + verlen = len(self._parsed_version) + arglen = len(args) + if verlen == arglen: + return 0 + elif verlen > arglen: + if type(self._parsed_version[arglen]) != types.IntType: + return -1 # self is a prerelease else: - raise OSError, e - except CommandError: - return None + return 1 + else: + if type(args[verlen]) != types.IntType: + return 1 # args is a prerelease + else: + return -1 def installed(self): if self.version() != None: @@ -601,6 +627,11 @@ os.listdir(self.get_path("bugs")): """ if not hasattr(self, 'user_id'): self.user_id = self._vcs_get_user_id() + if self.user_id == None: + # guess missing info + name = libbe.ui.util.user.get_fallback_fullname() + email = libbe.ui.util.user.get_fallback_email() + self.user_id = libbe.ui.util.user.create_user_id(name, email) return self.user_id def _detect(self, path='.'): @@ -610,14 +641,33 @@ os.listdir(self.get_path("bugs")): return self._vcs_detect(path) def root(self): - """ - Set the root directory to the path's VCS root. This is the - default working directory for future invocations. + """Set the root directory to the path's VCS root. + + This is the default working directory for future invocations. + Consider the following usage case: + + You have a project rooted in:: + + /path/to/source/ + + by which I mean the VCS repository is in, for example:: + + /path/to/source/.bzr + + However, you're of in some subdirectory like:: + + /path/to/source/ui/testing + + and you want to comment on a bug. `root` will locate your VCS + root (``/path/to/source/``) and set the repo there. This + means that it doesn't matter where you are in your project + tree when you call "be COMMAND", it always acts as if you called + it from the VCS root. """ if self._detect(self.repo) == False: raise VCSUnableToRoot(self) root = self._vcs_root(self.repo) - self.repo = os.path.abspath(root) + self.repo = os.path.realpath(root) if os.path.isdir(self.repo) == False: self.repo = os.path.dirname(self.repo) self.be_dir = os.path.join( @@ -629,6 +679,10 @@ os.listdir(self.get_path("bugs")): """ Begin versioning the tree based at self.repo. Also roots the vcs at path. + + See Also + -------- + root : called if the VCS has already been initialized. """ if not os.path.exists(self.repo) or not os.path.isdir(self.repo): raise VCSUnableToRoot(self) @@ -638,6 +692,7 @@ os.listdir(self.get_path("bugs")): self.root() os.mkdir(self.be_dir) self._vcs_add(self._u_rel_path(self.be_dir)) + self._setup_storage_version() self._cached_path_id.init() def _destroy(self): @@ -652,11 +707,22 @@ os.listdir(self.get_path("bugs")): if not os.path.isdir(self.be_dir): raise libbe.storage.base.ConnectionError(self) self._cached_path_id.connect() - self.check_disk_version() + self.check_storage_version() - def disconnect(self): + def _disconnect(self): self._cached_path_id.disconnect() + def path(self, id, revision=None, relpath=True): + if revision == None: + path = self._cached_path_id.path(id) + if relpath == True: + return self._u_rel_path(path) + return path + path = self._vcs_path(id, revision) + if relpath == True: + return path + return os.path.join(self.repo, path) + def _add_path(self, path, directory=False): relpath = self._u_rel_path(path) reldirs = relpath.split(os.path.sep) @@ -679,6 +745,16 @@ os.listdir(self.get_path("bugs")): path = self._cached_path_id.add_id(id, parent) self._add_path(path, **kwargs) + def _exists(self, id, revision=None): + if revision == None: + try: + path = self.path(id, revision, relpath=False) + except InvalidID, e: + return False + return os.path.exists(path) + path = self.path(id, revision, relpath=True) + return self._vcs_exists(relpath, revision) + def _remove(self, id): path = self._cached_path_id.path(id) if os.path.exists(path): @@ -708,54 +784,83 @@ os.listdir(self.get_path("bugs")): if p.startswith(path): self._cached_path_id.remove_id(id) + def _ancestors(self, id=None, revision=None): + if id==None: + path = self.be_dir + else: + path = self.path(id, revision, relpath=False) + ancestors = [] + while True: + if not path.startswith(self.repo + os.path.sep): + break + path = os.path.dirname(path) + try: + id = self._u_path_to_id(path) + ancestors.append(id) + except (SpacerCollision, InvalidPath): + pass + return ancestors + def _children(self, id=None, revision=None): + if revision == None: + isdir = os.path.isdir + listdir = os.listdir + else: + isdir = lambda path : self._vcs_isdir( + self._u_rel_path(path), revision) + listdir = lambda path : self._vcs_listdir( + self._u_rel_path(path), revision) if id==None: path = self.be_dir else: - path = self._cached_path_id.path(id) - if os.path.isdir(path) == False: + path = self.path(id, revision, relpath=False) + if isdir(path) == False: return [] - children = os.listdir(path) + children = listdir(path) for i,c in enumerate(children): if c in self._cached_path_id._spacer_dirs: children[i] = None children.extend([os.path.join(c, c2) for c2 in - os.listdir(os.path.join(path, c))]) - elif c == 'id-cache': + listdir(os.path.join(path, c))]) + elif c in ['id-cache', 'version']: + children[i] = None + elif self.interspersed_vcs_files \ + and self._vcs_is_versioned(c) == False: children[i] = None for i,c in enumerate(children): if c == None: continue cpath = os.path.join(path, c) if self.interspersed_vcs_files == True \ + and revision != None \ and self._vcs_is_versioned(cpath) == False: children[i] = None else: - children[i] = self._cached_path_id.id(cpath) + children[i] = self._u_path_to_id(cpath) return [c for c in children if c != None] def _get(self, id, default=libbe.util.InvalidObject, revision=None): try: - path = self._cached_path_id.path(id) - except libbe.storage.base.InvalidID, e: + relpath = self.path(id, revision, relpath=True) + contents = self._vcs_get_file_contents(relpath, revision) + except InvalidID, e: if default == libbe.util.InvalidObject: raise e return default - relpath = self._u_rel_path(path) - contents = self._vcs_get_file_contents(relpath,revision) if contents in [libbe.storage.base.InvalidDirectory, - libbe.util.InvalidObject]: - raise libbe.storage.base.InvalidID(id) - elif len(contents) == 0: - return None + libbe.util.InvalidObject] \ + or len(contents) == 0: + if default == libbe.util.InvalidObject: + raise InvalidID(id, revision) + return default return contents def _set(self, id, value): try: path = self._cached_path_id.path(id) - except libbe.storage.base.InvalidID, e: - raise e + except InvalidID, e: + raise if not os.path.exists(path): - raise libbe.storage.base.InvalidID(id) + raise InvalidID(id) if os.path.isdir(path): raise libbe.storage.base.InvalidDirectory(id) f = open(path, "wb") @@ -792,9 +897,22 @@ os.listdir(self.get_path("bugs")): raise libbe.storage.base.InvalidRevision(index) return revid + def changed(self, revision): + new,mod,rem = self._vcs_changed(revision) + def paths_to_ids(paths): + for p in paths: + try: + id = self._u_path_to_id(p) + yield id + except (SpacerCollision, InvalidPath): + pass + new_id = list(paths_to_ids(new)) + mod_id = list(paths_to_ids(mod)) + rem_id = list(paths_to_ids(rem)) + return (new_id, mod_id, rem_id) + def _u_any_in_string(self, list, string): - """ - Return True if any of the strings in list are in string. + """Return True if any of the strings in list are in string. Otherwise return False. """ for list_string in list: @@ -817,9 +935,8 @@ os.listdir(self.get_path("bugs")): return self._u_invoke(cl_args, **kwargs) def _u_search_parent_directories(self, path, filename): - """ - Find the file (or directory) named filename in path or in any - of path's parents. + """Find the file (or directory) named filename in path or in any of + path's parents. e.g. search_parent_directories("/a/b/c", ".be") @@ -830,14 +947,75 @@ os.listdir(self.get_path("bugs")): /.be or None if none of those files exist. """ - return search_parent_directories(path, filename) + try: + ret = search_parent_directories(path, filename) + except AssertionError, e: + return None + return ret + + def _u_find_id_from_manifest(self, id, manifest, revision=None): + """Search for the relative path to id using manifest, a list of all + files. + + Returns None if the id is not found. + """ + be_dir = self._cached_path_id._spacer_dirs[0] + be_dir_sep = self._cached_path_id._spacer_dirs[0] + os.path.sep + files = [f for f in manifest if f.startswith(be_dir_sep)] + for file in files: + if not file.startswith(be_dir+os.path.sep): + continue + parts = file.split(os.path.sep) + dir = parts.pop(0) # don't add the first spacer dir + for part in parts[:-1]: + dir = os.path.join(dir, part) + if not dir in files: + files.append(dir) + for file in files: + try: + p_id = self._u_path_to_id(file) + if p_id == id: + return file + except (SpacerCollision, InvalidPath): + pass + raise InvalidID(id, revision=revision) - def _u_rel_path(self, path, root=None): + def _u_find_id(self, id, revision): + """Search for the relative path to id as of revision. + + Returns None if the id is not found. """ - Return the relative path to path from root. + assert self._rooted == True + be_dir = self._cached_path_id._spacer_dirs[0] + stack = [(be_dir, be_dir)] + while len(stack) > 0: + path,long_id = stack.pop() + if long_id.endswith('/'+id): + return path + if self._vcs_isdir(path, revision) == False: + continue + for child in self._vcs_listdir(path, revision): + stack.append((os.path.join(path, child), + '/'.join([long_id, child]))) + raise InvalidID(id, revision=revision) + + def _u_path_to_id(self, path): + return self._cached_path_id.id(path) + + def _u_rel_path(self, path, root=None): + """Return the relative path to path from root. + + Examples: + >>> vcs = new() >>> vcs._u_rel_path("/a.b/c/.be", "/a.b/c") '.be' + >>> vcs._u_rel_path("/a.b/c/", "/a.b/c") + '.' + >>> vcs._u_rel_path("/a.b/c/", "/a.b/c/") + '.' + >>> vcs._u_rel_path("./a", ".") + 'a' """ if root == None: if self.repo == None: @@ -846,16 +1024,19 @@ os.listdir(self.get_path("bugs")): path = os.path.abspath(path) absRoot = os.path.abspath(root) absRootSlashedDir = os.path.join(absRoot,"") + if path in [absRoot, absRootSlashedDir]: + return '.' if not path.startswith(absRootSlashedDir): raise InvalidPath(path, absRootSlashedDir) - assert path != absRootSlashedDir, \ - "file %s == root directory %s" % (path, absRootSlashedDir) relpath = path[len(absRootSlashedDir):] return relpath def _u_abspath(self, path, root=None): - """ - Return the absolute path from a path realtive to root. + """Return the absolute path from a path relative to root. + + Examples + -------- + >>> vcs = new() >>> vcs._u_abspath(".be", "/a.b/c") '/a.b/c/.be' @@ -866,9 +1047,8 @@ os.listdir(self.get_path("bugs")): return os.path.abspath(os.path.join(root, path)) def _u_parse_commitfile(self, commitfile): - """ - Split the commitfile created in self.commit() back into - summary and header lines. + """Split the commitfile created in self.commit() back into summary and + header lines. """ f = codecs.open(commitfile, 'r', self.encoding) summary = f.readline() @@ -879,21 +1059,43 @@ os.listdir(self.get_path("bugs")): f.close() return (summary, body) - def check_disk_version(self): - version = self.disk_version() - if version != upgrade.BUGDIR_DISK_VERSION: + def check_storage_version(self): + version = self.storage_version() + if version != libbe.storage.STORAGE_VERSION: upgrade.upgrade(self.repo, version) - def disk_version(self, path=None): - """ - Requires disk access. + def storage_version(self, revision=None, path=None): + """Return the storage version of the on-disk files. + + See Also + -------- + libbe.storage.util.upgrade """ if path == None: path = os.path.join(self.repo, '.be', 'version') - return libbe.util.encoding.get_file_contents(path).rstrip('\n') + if not os.path.exists(path): + raise libbe.storage.InvalidStorageVersion(None) + if revision == None: # don't require connection + return libbe.util.encoding.get_file_contents( + path, decode=True).rstrip() + relpath = self._u_rel_path(path) + contents = self._vcs_get_file_contents(relpath, revision=revision) + if type(contents) != types.UnicodeType: + contents = unicode(contents, self.encoding) + return contents.strip() + + def _setup_storage_version(self): + """ + Requires disk access. + """ + assert self._rooted == True + path = os.path.join(self.be_dir, 'version') + if not os.path.exists(path): + libbe.util.encoding.set_file_contents(path, + libbe.storage.STORAGE_VERSION+'\n') + self._vcs_add(self._u_rel_path(path)) - if libbe.TESTING == True: class VCSTestCase (unittest.TestCase): """ @@ -926,8 +1128,7 @@ if libbe.TESTING == True: class VCS_installed_TestCase (VCSTestCase): def test_installed(self): - """ - See if the VCS is installed. + """See if the VCS is installed. """ self.failUnless(self.s.installed() == True, '%(name)s VCS not found' % vars(self.Class)) @@ -935,8 +1136,7 @@ if libbe.TESTING == True: class VCS_detection_TestCase (VCSTestCase): def test_detection(self): - """ - See if the VCS detects its installed repository + """See if the VCS detects its installed repository """ if self.s.installed(): self.s.disconnect() @@ -946,8 +1146,7 @@ if libbe.TESTING == True: self.s.connect() def test_no_detection(self): - """ - See if the VCS detects its installed repository + """See if the VCS detects its installed repository """ if self.s.installed() and self.Class.name != 'None': self.s.disconnect() @@ -970,7 +1169,7 @@ if libbe.TESTING == True: class VCS_get_user_id_TestCase(VCSTestCase): """Test cases for VCS.get_user_id method.""" - def test_gets_existing_user_id(self): + def test_get_existing_user_id(self): """Should get the existing user ID.""" if self.s.installed(): user_id = self.s.get_user_id() @@ -995,7 +1194,8 @@ if libbe.TESTING == True: vcs_testcase_classes = [ c for c in ( ob for ob in globals().values() if isinstance(ob, type)) - if issubclass(c, VCSTestCase)] + if issubclass(c, VCSTestCase) \ + and c.Class == VCS] for base_class in vcs_testcase_classes: testcase_class_name = vcs_class.__name__ + base_class.__name__