X-Git-Url: http://git.tremily.us/?a=blobdiff_plain;f=libbe%2Fstorage%2Fvcs%2Fbase.py;h=d85c94d4226ad66d05cb9eca1b100fb9f76e9829;hb=50444209eee408dde7d240fdf59bfc9e82b714ce;hp=99f43f3a40230bded839a55e054b6daacea24b5b;hpb=b0b5341c4045dd27cfbb3e2585cb2614ed9ad903;p=be.git diff --git a/libbe/storage/vcs/base.py b/libbe/storage/vcs/base.py index 99f43f3..d85c94d 100644 --- a/libbe/storage/vcs/base.py +++ b/libbe/storage/vcs/base.py @@ -1,4 +1,4 @@ -# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc. +# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc. # Alexander Belchenko # Ben Finney # Chris Ball @@ -19,10 +19,9 @@ # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -""" -Define the base VCS (Version Control System) class, which should be -subclassed by other Version Control System backends. The base class -implements a "do not version" VCS. +"""Define the base :class:`VCS` (Version Control System) class, which +should be subclassed by other Version Control System backends. The +base class implements a "do not version" VCS. """ import codecs @@ -50,11 +49,17 @@ if libbe.TESTING == True: import libbe.ui.util.user -# List VCS modules in order of preference. -# Don't list this module, it is implicitly last. VCS_ORDER = ['arch', 'bzr', 'darcs', 'git', 'hg'] +"""List VCS modules in order of preference. + +Don't list this module, it is implicitly last. +""" def set_preferred_vcs(name): + """Manipulate :data:`VCS_ORDER` to place `name` first. + + This is primarily indended for testing purposes. + """ global VCS_ORDER assert name in VCS_ORDER, \ 'unrecognized VCS %s not in\n %s' % (name, VCS_ORDER) @@ -62,7 +67,10 @@ def set_preferred_vcs(name): VCS_ORDER.insert(0, name) def _get_matching_vcs(matchfn): - """Return the first module for which matchfn(VCS_instance) is true""" + """Return the first module for which matchfn(VCS_instance) is True. + + Searches in :data:`VCS_ORDER`. + """ for submodname in VCS_ORDER: module = import_by_name('libbe.storage.vcs.%s' % submodname) vcs = module.new() @@ -71,17 +79,26 @@ def _get_matching_vcs(matchfn): return VCS() def vcs_by_name(vcs_name): - """Return the module for the VCS with the given name""" + """Return the module for the VCS with the given name. + + Searches in :data:`VCS_ORDER`. + """ if vcs_name == VCS.name: return new() return _get_matching_vcs(lambda vcs: vcs.name == vcs_name) def detect_vcs(dir): - """Return an VCS instance for the vcs being used in this directory""" + """Return an VCS instance for the vcs being used in this directory. + + Searches in :data:`VCS_ORDER`. + """ return _get_matching_vcs(lambda vcs: vcs._detect(dir)) def installed_vcs(): - """Return an instance of an installed VCS""" + """Return an instance of an installed VCS. + + Searches in :data:`VCS_ORDER`. + """ return _get_matching_vcs(lambda vcs: vcs.installed()) @@ -118,10 +135,17 @@ class NoSuchFile (InvalidID): class CachedPathID (object): - """ - Storage ID <-> path policy. - .../.be/BUGDIR/bugs/BUG/comments/COMMENT - ^-- root path + """Cache Storage ID <-> path policy. + + Paths generated following:: + + .../.be/BUGDIR/bugs/BUG/comments/COMMENT + ^-- root path + + See :mod:`libbe.util.id` for a discussion of ID formats. + + Examples + -------- >>> dir = Dir() >>> os.mkdir(os.path.join(dir.path, '.be')) @@ -168,7 +192,7 @@ class CachedPathID (object): >>> c.path('qrs') Traceback (most recent call last): ... - InvalidID: 'qrs' + InvalidID: qrs in revision None >>> c.disconnect() >>> c.destroy() >>> dir.cleanup() @@ -182,13 +206,17 @@ class CachedPathID (object): self._cache_path = os.path.join( self._root, self._spacer_dirs[0], 'id-cache') - def init(self): - """ - Create cache file for an existing .be directory. - File if multiple lines of the form: - UUID\tPATH + def init(self, verbose=True, cache=None): + """Create cache file for an existing .be directory. + + The file contains multiple lines of the form:: + + UUID\tPATH """ - self._cache = {} + if cache == None: + self._cache = {} + else: + self._cache = cache spaced_root = os.path.join(self._root, self._spacer_dirs[0]) for dirpath, dirnames, filenames in os.walk(spaced_root): if dirpath == spaced_root: @@ -197,13 +225,15 @@ class CachedPathID (object): id = self.id(dirpath) relpath = dirpath[len(self._root)+1:] if id.count('/') == 0: - if id in self._cache: + if verbose == True and id in self._cache: print >> sys.stderr, 'Multiple paths for %s: \n %s\n %s' % (id, self._cache[id], relpath) self._cache[id] = relpath except InvalidPath: pass - self._changed = True - self.disconnect() + if self._cache != cache: + self._changed = True + if cache == None: + self.disconnect() def destroy(self): if os.path.exists(self._cache_path): @@ -239,7 +269,9 @@ class CachedPathID (object): else: extra = fields[1:] if uuid not in self._cache: - raise InvalidID(uuid) + self.init(verbose=False, cache=self._cache) + if uuid not in self._cache: + raise InvalidID(uuid) if relpath == True: return os.path.join(self._cache[uuid], *extra) return os.path.join(self._root, self._cache[uuid], *extra) @@ -304,142 +336,13 @@ def new(): return VCS() class VCS (libbe.storage.base.VersionedStorage): - """ - This class implements a 'no-vcs' interface. + """Implement a 'no-VCS' interface. Support for other VCSs can be added by subclassing this class, and - overriding methods _vcs_*() with code appropriate for your VCS. + overriding methods `_vcs_*()` with code appropriate for your VCS. - The methods _u_*() are utility methods available to the _vcs_*() + The methods `_u_*()` are utility methods available to the `_vcs_*()` methods. - - Sink to existing root - ====================== - - Consider the following usage case: - You have a bug directory rooted in - /path/to/source - by which I mean the '.be' directory is at - /path/to/source/.be - However, you're of in some subdirectory like - /path/to/source/GUI/testing - and you want to comment on a bug. Setting sink_to_root=True when - you initialize your BugDir will cause it to search for the '.be' - file in the ancestors of the path you passed in as 'root'. - /path/to/source/GUI/testing/.be miss - /path/to/source/GUI/.be miss - /path/to/source/.be hit! - So it still roots itself appropriately without much work for you. - - File-system access - ================== - - BugDirs live completely in memory when .sync_with_disk is False. - This is the default configuration setup by BugDir(from_disk=False). - If .sync_with_disk == True (e.g. BugDir(from_disk=True)), then - any changes to the BugDir will be immediately written to disk. - - If you want to change .sync_with_disk, we suggest you use - .set_sync_with_disk(), which propogates the new setting through to - all bugs/comments/etc. that have been loaded into memory. If - you've been living in memory and want to move to - .sync_with_disk==True, but you're not sure if anything has been - changed in memory, a call to .save() immediately before the - .set_sync_with_disk(True) call is a safe move. - - Regardless of .sync_with_disk, a call to .save() will write out - all the contents that the BugDir instance has loaded into memory. - If sync_with_disk has been True over the course of all interesting - changes, this .save() call will be a waste of time. - - The BugDir will only load information from the file system when it - loads new settings/bugs/comments that it doesn't already have in - memory and .sync_with_disk == True. - - Allow storage initialization - ======================== - - This one is for testing purposes. Setting it to True allows the - BugDir to search for an installed Storage backend and initialize - it in the root directory. This is a convenience option for - supporting tests of versioning functionality - (e.g. .duplicate_bugdir). - - Disable encoding manipulation - ============================= - - This one is for testing purposed. You might have non-ASCII - Unicode in your bugs, comments, files, etc. BugDir instances try - and support your preferred encoding scheme (e.g. "utf-8") when - dealing with stream and file input/output. For stream output, - this involves replacing sys.stdout and sys.stderr - (libbe.encode.set_IO_stream_encodings). However this messes up - doctest's output catching. In order to support doctest tests - using BugDirs, set manipulate_encodings=False, and stick to ASCII - in your tests. - - if root == None: - root = os.getcwd() - if sink_to_existing_root == True: - self.root = self._find_root(root) - else: - if not os.path.exists(root): - self.root = None - raise NoRootEntry(root) - self.root = root - # get a temporary storage until we've loaded settings - self.sync_with_disk = False - self.storage = self._guess_storage() - - if assert_new_BugDir == True: - if os.path.exists(self.get_path()): - raise AlreadyInitialized, self.get_path() - if storage == None: - storage = self._guess_storage(allow_storage_init) - self.storage = storage - self._setup_user_id(self.user_id) - - - # methods for getting the BugDir situated in the filesystem - - def _find_root(self, path): - ''' - Search for an existing bug database dir and it's ancestors and - return a BugDir rooted there. Only called by __init__, and - then only if sink_to_existing_root == True. - ''' - if not os.path.exists(path): - self.root = None - raise NoRootEntry(path) - versionfile=utility.search_parent_directories(path, - os.path.join(".be", "version")) - if versionfile != None: - beroot = os.path.dirname(versionfile) - root = os.path.dirname(beroot) - return root - else: - beroot = utility.search_parent_directories(path, ".be") - if beroot == None: - self.root = None - raise NoBugDir(path) - return beroot - - def _guess_storage(self, allow_storage_init=False): - ''' - Only called by __init__. - ''' - deepdir = self.get_path() - if not os.path.exists(deepdir): - deepdir = os.path.dirname(deepdir) - new_storage = storage.detect_storage(deepdir) - install = False - if new_storage.name == "None": - if allow_storage_init == True: - new_storage = storage.installed_storage() - new_storage.init(self.root) - return new_storage - -os.listdir(self.get_path("bugs")): """ name = 'None' client = 'false' # command-line tool for _u_invoke_client @@ -504,6 +407,12 @@ os.listdir(self.get_path("bugs")): """ pass + def _vcs_exists(self, path, revision=None): + """ + Does the path exist in a given revision? (True/False) + """ + raise NotImplementedError('Lazy BE developers') + def _vcs_remove(self, path): """ Remove the file at path from version control. Optionally @@ -550,7 +459,7 @@ os.listdir(self.get_path("bugs")): def _vcs_path(self, id, revision): """ - Return the path to object id as of revision. + Return the relative path to object id as of revision. Revision will not be None. """ @@ -597,6 +506,14 @@ os.listdir(self.get_path("bugs")): """ return None + def _vcs_changed(self, revision): + """ + Return a tuple of lists of ids + (new, modified, removed) + from the specified revision to the current situation. + """ + return ([], [], []) + def version(self): # Cache version string for efficiency. if not hasattr(self, '_version'): @@ -638,9 +555,28 @@ os.listdir(self.get_path("bugs")): return self._vcs_detect(path) def root(self): - """ - Set the root directory to the path's VCS root. This is the - default working directory for future invocations. + """Set the root directory to the path's VCS root. + + This is the default working directory for future invocations. + Consider the following usage case: + + You have a project rooted in:: + + /path/to/source/ + + by which I mean the VCS repository is in, for example:: + + /path/to/source/.bzr + + However, you're of in some subdirectory like:: + + /path/to/source/ui/testing + + and you want to comment on a bug. `root` will locate your VCS + root (``/path/to/source/``) and set the repo there. This + means that it doesn't matter where you are in your project + tree when you call "be COMMAND", it always acts as if you called + it from the VCS root. """ if self._detect(self.repo) == False: raise VCSUnableToRoot(self) @@ -657,6 +593,10 @@ os.listdir(self.get_path("bugs")): """ Begin versioning the tree based at self.repo. Also roots the vcs at path. + + See Also + -------- + root : called if the VCS has already been initialized. """ if not os.path.exists(self.repo) or not os.path.isdir(self.repo): raise VCSUnableToRoot(self) @@ -686,6 +626,17 @@ os.listdir(self.get_path("bugs")): def _disconnect(self): self._cached_path_id.disconnect() + def path(self, id, revision=None, relpath=True): + if revision == None: + path = self._cached_path_id.path(id) + if relpath == True: + return self._u_rel_path(path) + return path + path = self._vcs_path(id, revision) + if relpath == True: + return path + return os.path.join(self.repo, path) + def _add_path(self, path, directory=False): relpath = self._u_rel_path(path) reldirs = relpath.split(os.path.sep) @@ -708,6 +659,16 @@ os.listdir(self.get_path("bugs")): path = self._cached_path_id.add_id(id, parent) self._add_path(path, **kwargs) + def _exists(self, id, revision=None): + if revision == None: + try: + path = self.path(id, revision, relpath=False) + except InvalidID, e: + return False + return os.path.exists(path) + path = self.path(id, revision, relpath=True) + return self._vcs_exists(relpath, revision) + def _remove(self, id): path = self._cached_path_id.path(id) if os.path.exists(path): @@ -737,19 +698,36 @@ os.listdir(self.get_path("bugs")): if p.startswith(path): self._cached_path_id.remove_id(id) + def _ancestors(self, id=None, revision=None): + if id==None: + path = self.be_dir + else: + path = self.path(id, revision, relpath=False) + ancestors = [] + while True: + if not path.startswith(self.repo + os.path.sep): + break + path = os.path.dirname(path) + try: + id = self._u_path_to_id(path) + ancestors.append(id) + except (SpacerCollision, InvalidPath): + pass + return ancestors + def _children(self, id=None, revision=None): if revision == None: - id_to_path = self._cached_path_id.path isdir = os.path.isdir listdir = os.listdir else: - id_to_path = lambda id : self._vcs_path(id, revision) - isdir = lambda path : self._vcs_isdir(path, revision) - listdir = lambda path : self._vcs_listdir(path, revision) + isdir = lambda path : self._vcs_isdir( + self._u_rel_path(path), revision) + listdir = lambda path : self._vcs_listdir( + self._u_rel_path(path), revision) if id==None: path = self.be_dir else: - path = id_to_path(id) + path = self.path(id, revision, relpath=False) if isdir(path) == False: return [] children = listdir(path) @@ -777,30 +755,25 @@ os.listdir(self.get_path("bugs")): def _get(self, id, default=libbe.util.InvalidObject, revision=None): try: - path = self._cached_path_id.path(id) + relpath = self.path(id, revision, relpath=True) + contents = self._vcs_get_file_contents(relpath, revision) except InvalidID, e: if default == libbe.util.InvalidObject: raise e return default - relpath = self._u_rel_path(path) - try: - contents = self._vcs_get_file_contents(relpath, revision) - except InvalidID, e: - if InvalidID == None: - e.id = InvalidID - raise if contents in [libbe.storage.base.InvalidDirectory, - libbe.util.InvalidObject]: - raise InvalidID(id) - elif len(contents) == 0: - return None + libbe.util.InvalidObject] \ + or len(contents) == 0: + if default == libbe.util.InvalidObject: + raise InvalidID(id, revision) + return default return contents def _set(self, id, value): try: path = self._cached_path_id.path(id) except InvalidID, e: - raise e + raise if not os.path.exists(path): raise InvalidID(id) if os.path.isdir(path): @@ -839,9 +812,22 @@ os.listdir(self.get_path("bugs")): raise libbe.storage.base.InvalidRevision(index) return revid + def changed(self, revision): + new,mod,rem = self._vcs_changed(revision) + def paths_to_ids(paths): + for p in paths: + try: + id = self._u_path_to_id(p) + yield id + except (SpacerCollision, InvalidPath): + pass + new_id = list(paths_to_ids(new)) + mod_id = list(paths_to_ids(mod)) + rem_id = list(paths_to_ids(rem)) + return (new_id, mod_id, rem_id) + def _u_any_in_string(self, list, string): - """ - Return True if any of the strings in list are in string. + """Return True if any of the strings in list are in string. Otherwise return False. """ for list_string in list: @@ -864,9 +850,8 @@ os.listdir(self.get_path("bugs")): return self._u_invoke(cl_args, **kwargs) def _u_search_parent_directories(self, path, filename): - """ - Find the file (or directory) named filename in path or in any - of path's parents. + """Find the file (or directory) named filename in path or in any of + path's parents. e.g. search_parent_directories("/a/b/c", ".be") @@ -877,11 +862,42 @@ os.listdir(self.get_path("bugs")): /.be or None if none of those files exist. """ - return search_parent_directories(path, filename) + try: + ret = search_parent_directories(path, filename) + except AssertionError, e: + return None + return ret - def _u_find_id(self, id, revision): + def _u_find_id_from_manifest(self, id, manifest, revision=None): + """Search for the relative path to id using manifest, a list of all + files. + + Returns None if the id is not found. """ - Search for the relative path to id as of revision. + be_dir = self._cached_path_id._spacer_dirs[0] + be_dir_sep = self._cached_path_id._spacer_dirs[0] + os.path.sep + files = [f for f in manifest if f.startswith(be_dir_sep)] + for file in files: + if not file.startswith(be_dir+os.path.sep): + continue + parts = file.split(os.path.sep) + dir = parts.pop(0) # don't add the first spacer dir + for part in parts[:-1]: + dir = os.path.join(dir, part) + if not dir in files: + files.append(dir) + for file in files: + try: + p_id = self._u_path_to_id(file) + if p_id == id: + return file + except (SpacerCollision, InvalidPath): + pass + raise InvalidID(id, revision=revision) + + def _u_find_id(self, id, revision): + """Search for the relative path to id as of revision. + Returns None if the id is not found. """ assert self._rooted == True @@ -902,8 +918,10 @@ os.listdir(self.get_path("bugs")): return self._cached_path_id.id(path) def _u_rel_path(self, path, root=None): - """ - Return the relative path to path from root. + """Return the relative path to path from root. + + Examples: + >>> vcs = new() >>> vcs._u_rel_path("/a.b/c/.be", "/a.b/c") '.be' @@ -929,8 +947,11 @@ os.listdir(self.get_path("bugs")): return relpath def _u_abspath(self, path, root=None): - """ - Return the absolute path from a path realtive to root. + """Return the absolute path from a path realtive to root. + + Examples + -------- + >>> vcs = new() >>> vcs._u_abspath(".be", "/a.b/c") '/a.b/c/.be' @@ -941,9 +962,8 @@ os.listdir(self.get_path("bugs")): return os.path.abspath(os.path.join(root, path)) def _u_parse_commitfile(self, commitfile): - """ - Split the commitfile created in self.commit() back into - summary and header lines. + """Split the commitfile created in self.commit() back into summary and + header lines. """ f = codecs.open(commitfile, 'r', self.encoding) summary = f.readline() @@ -960,8 +980,11 @@ os.listdir(self.get_path("bugs")): upgrade.upgrade(self.repo, version) def storage_version(self, revision=None, path=None): - """ - Requires disk access. + """Return the storage version of the on-disk files. + + See Also + -------- + :mod:`libbe.storage.util.upgrade` """ if path == None: path = os.path.join(self.repo, '.be', 'version') @@ -970,7 +993,8 @@ os.listdir(self.get_path("bugs")): if revision == None: # don't require connection return libbe.util.encoding.get_file_contents( path, decode=True).rstrip('\n') - contents = self._vcs_get_file_contents(path, revision=revision) + relpath = self._u_rel_path(path) + contents = self._vcs_get_file_contents(relpath, revision=revision) if type(contents) != types.UnicodeType: contents = unicode(contents, self.encoding) return contents.strip() @@ -1019,8 +1043,7 @@ if libbe.TESTING == True: class VCS_installed_TestCase (VCSTestCase): def test_installed(self): - """ - See if the VCS is installed. + """See if the VCS is installed. """ self.failUnless(self.s.installed() == True, '%(name)s VCS not found' % vars(self.Class)) @@ -1028,8 +1051,7 @@ if libbe.TESTING == True: class VCS_detection_TestCase (VCSTestCase): def test_detection(self): - """ - See if the VCS detects its installed repository + """See if the VCS detects its installed repository """ if self.s.installed(): self.s.disconnect() @@ -1039,8 +1061,7 @@ if libbe.TESTING == True: self.s.connect() def test_no_detection(self): - """ - See if the VCS detects its installed repository + """See if the VCS detects its installed repository """ if self.s.installed() and self.Class.name != 'None': self.s.disconnect()