-# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc.
+# Copyright (C) 2005-2012 Aaron Bentley <abentley@panoramicfeedback.com>
# Alexander Belchenko <bialix@ukr.net>
# Ben Finney <benf@cybersource.com.au>
# Chris Ball <cjb@laptop.org>
# Gianluca Montecchi <gian@grys.it>
-# W. Trevor King <wking@drexel.edu>
+# W. Trevor King <wking@tremily.us>
#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
+# This file is part of Bugs Everywhere.
#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
+# Bugs Everywhere is free software: you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 2 of the License, or (at your option) any
+# later version.
#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+# Bugs Everywhere is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# Bugs Everywhere. If not, see <http://www.gnu.org/licenses/>.
-"""
-Define the base VCS (Version Control System) class, which should be
-subclassed by other Version Control System backends. The base class
-implements a "do not version" VCS.
+"""Define the base :py:class:`VCS` (Version Control System) class, which
+should be subclassed by other Version Control System backends. The
+base class implements a "do not version" VCS.
"""
import codecs
import libbe.ui.util.user
-# List VCS modules in order of preference.
-# Don't list this module, it is implicitly last.
-VCS_ORDER = ['arch', 'bzr', 'darcs', 'git', 'hg']
+VCS_ORDER = ['arch', 'bzr', 'darcs', 'git', 'hg', 'monotone']
+"""List VCS modules in order of preference.
+
+Don't list this module, it is implicitly last.
+"""
def set_preferred_vcs(name):
+ """Manipulate :py:data:`VCS_ORDER` to place `name` first.
+
+ This is primarily indended for testing purposes.
+ """
global VCS_ORDER
assert name in VCS_ORDER, \
'unrecognized VCS %s not in\n %s' % (name, VCS_ORDER)
VCS_ORDER.insert(0, name)
def _get_matching_vcs(matchfn):
- """Return the first module for which matchfn(VCS_instance) is true"""
+ """Return the first module for which matchfn(VCS_instance) is True.
+
+ Searches in :py:data:`VCS_ORDER`.
+ """
for submodname in VCS_ORDER:
module = import_by_name('libbe.storage.vcs.%s' % submodname)
vcs = module.new()
return VCS()
def vcs_by_name(vcs_name):
- """Return the module for the VCS with the given name"""
+ """Return the module for the VCS with the given name.
+
+ Searches in :py:data:`VCS_ORDER`.
+ """
if vcs_name == VCS.name:
return new()
return _get_matching_vcs(lambda vcs: vcs.name == vcs_name)
def detect_vcs(dir):
- """Return an VCS instance for the vcs being used in this directory"""
+ """Return an VCS instance for the vcs being used in this directory.
+
+ Searches in :py:data:`VCS_ORDER`.
+ """
return _get_matching_vcs(lambda vcs: vcs._detect(dir))
def installed_vcs():
- """Return an instance of an installed VCS"""
+ """Return an instance of an installed VCS.
+
+ Searches in :py:data:`VCS_ORDER`.
+ """
return _get_matching_vcs(lambda vcs: vcs.installed())
class CachedPathID (object):
- """
- Storage ID <-> path policy.
- .../.be/BUGDIR/bugs/BUG/comments/COMMENT
- ^-- root path
+ """Cache Storage ID <-> path policy.
+
+ Paths generated following::
+
+ .../.be/BUGDIR/bugs/BUG/comments/COMMENT
+ ^-- root path
+
+ See :py:mod:`libbe.util.id` for a discussion of ID formats.
+
+ Examples
+ --------
>>> dir = Dir()
>>> os.mkdir(os.path.join(dir.path, '.be'))
>>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments'))
>>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def'))
>>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '456'))
- >>> file(os.path.join(dir.path, '.be', 'abc', 'values'),
+ >>> open(os.path.join(dir.path, '.be', 'abc', 'values'),
... 'w').close()
- >>> file(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'values'),
+ >>> open(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'values'),
... 'w').close()
- >>> file(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values'),
+ >>> open(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values'),
... 'w').close()
>>> c = CachedPathID()
>>> c.root(dir.path)
>>> c.path('qrs')
Traceback (most recent call last):
...
- InvalidID: 'qrs'
+ InvalidID: qrs in revision None
>>> c.disconnect()
>>> c.destroy()
>>> dir.cleanup()
"""
def __init__(self, encoding=None):
- self.encoding = libbe.util.encoding.get_filesystem_encoding()
+ self.encoding = libbe.util.encoding.get_text_file_encoding()
self._spacer_dirs = ['.be', 'bugs', 'comments']
def root(self, path):
self._cache_path = os.path.join(
self._root, self._spacer_dirs[0], 'id-cache')
- def init(self):
- """
- Create cache file for an existing .be directory.
- File if multiple lines of the form:
- UUID\tPATH
+ def init(self, verbose=True, cache=None):
+ """Create cache file for an existing .be directory.
+
+ The file contains multiple lines of the form::
+
+ UUID\tPATH
"""
- self._cache = {}
+ if cache == None:
+ self._cache = {}
+ else:
+ self._cache = cache
spaced_root = os.path.join(self._root, self._spacer_dirs[0])
- for dirpath, dirnames, filenames in os.walk(spaced_root):
+ for dirpath, dirnames, filenames in os.walk(spaced_root,
+ followlinks=True):
if dirpath == spaced_root:
continue
try:
id = self.id(dirpath)
- relpath = dirpath[len(self._root)+1:]
+ relpath = dirpath[len(self._root + os.path.sep):]
if id.count('/') == 0:
- if id in self._cache:
+ if verbose == True and id in self._cache:
print >> sys.stderr, 'Multiple paths for %s: \n %s\n %s' % (id, self._cache[id], relpath)
self._cache[id] = relpath
except InvalidPath:
pass
- self._changed = True
- self.disconnect()
+ if self._cache != cache:
+ self._changed = True
+ if cache == None:
+ self.disconnect()
def destroy(self):
if os.path.exists(self._cache_path):
else:
extra = fields[1:]
if uuid not in self._cache:
- raise InvalidID(uuid)
+ self.init(verbose=False, cache=self._cache)
+ if uuid not in self._cache:
+ raise InvalidID(uuid)
if relpath == True:
return os.path.join(self._cache[uuid], *extra)
return os.path.join(self._root, self._cache[uuid], *extra)
path = os.path.join(self._root, path)
if not path.startswith(self._root + os.path.sep):
raise InvalidPath(path, self._root)
- path = path[len(self._root)+1:]
+ path = path[len(self._root + os.path.sep):]
orig_path = path
if not path.startswith(self._spacer_dirs[0] + os.path.sep):
raise InvalidPath(path, self._spacer_dirs[0])
for spacer in self._spacer_dirs:
if not path.startswith(spacer + os.path.sep):
break
- id = path[len(spacer)+1:]
- fields = path[len(spacer)+1:].split(os.path.sep,1)
+ id = path[len(spacer + os.path.sep):]
+ fields = path[len(spacer + os.path.sep):].split(os.path.sep,1)
if len(fields) == 1:
break
path = fields[1]
return VCS()
class VCS (libbe.storage.base.VersionedStorage):
- """
- This class implements a 'no-vcs' interface.
+ """Implement a 'no-VCS' interface.
Support for other VCSs can be added by subclassing this class, and
- overriding methods _vcs_*() with code appropriate for your VCS.
+ overriding methods `_vcs_*()` with code appropriate for your VCS.
- The methods _u_*() are utility methods available to the _vcs_*()
+ The methods `_u_*()` are utility methods available to the `_vcs_*()`
methods.
-
- Sink to existing root
- ======================
-
- Consider the following usage case:
- You have a bug directory rooted in
- /path/to/source
- by which I mean the '.be' directory is at
- /path/to/source/.be
- However, you're of in some subdirectory like
- /path/to/source/GUI/testing
- and you want to comment on a bug. Setting sink_to_root=True when
- you initialize your BugDir will cause it to search for the '.be'
- file in the ancestors of the path you passed in as 'root'.
- /path/to/source/GUI/testing/.be miss
- /path/to/source/GUI/.be miss
- /path/to/source/.be hit!
- So it still roots itself appropriately without much work for you.
-
- File-system access
- ==================
-
- BugDirs live completely in memory when .sync_with_disk is False.
- This is the default configuration setup by BugDir(from_disk=False).
- If .sync_with_disk == True (e.g. BugDir(from_disk=True)), then
- any changes to the BugDir will be immediately written to disk.
-
- If you want to change .sync_with_disk, we suggest you use
- .set_sync_with_disk(), which propogates the new setting through to
- all bugs/comments/etc. that have been loaded into memory. If
- you've been living in memory and want to move to
- .sync_with_disk==True, but you're not sure if anything has been
- changed in memory, a call to .save() immediately before the
- .set_sync_with_disk(True) call is a safe move.
-
- Regardless of .sync_with_disk, a call to .save() will write out
- all the contents that the BugDir instance has loaded into memory.
- If sync_with_disk has been True over the course of all interesting
- changes, this .save() call will be a waste of time.
-
- The BugDir will only load information from the file system when it
- loads new settings/bugs/comments that it doesn't already have in
- memory and .sync_with_disk == True.
-
- Allow storage initialization
- ========================
-
- This one is for testing purposes. Setting it to True allows the
- BugDir to search for an installed Storage backend and initialize
- it in the root directory. This is a convenience option for
- supporting tests of versioning functionality
- (e.g. .duplicate_bugdir).
-
- Disable encoding manipulation
- =============================
-
- This one is for testing purposed. You might have non-ASCII
- Unicode in your bugs, comments, files, etc. BugDir instances try
- and support your preferred encoding scheme (e.g. "utf-8") when
- dealing with stream and file input/output. For stream output,
- this involves replacing sys.stdout and sys.stderr
- (libbe.encode.set_IO_stream_encodings). However this messes up
- doctest's output catching. In order to support doctest tests
- using BugDirs, set manipulate_encodings=False, and stick to ASCII
- in your tests.
-
- if root == None:
- root = os.getcwd()
- if sink_to_existing_root == True:
- self.root = self._find_root(root)
- else:
- if not os.path.exists(root):
- self.root = None
- raise NoRootEntry(root)
- self.root = root
- # get a temporary storage until we've loaded settings
- self.sync_with_disk = False
- self.storage = self._guess_storage()
-
- if assert_new_BugDir == True:
- if os.path.exists(self.get_path()):
- raise AlreadyInitialized, self.get_path()
- if storage == None:
- storage = self._guess_storage(allow_storage_init)
- self.storage = storage
- self._setup_user_id(self.user_id)
-
-
- # methods for getting the BugDir situated in the filesystem
-
- def _find_root(self, path):
- '''
- Search for an existing bug database dir and it's ancestors and
- return a BugDir rooted there. Only called by __init__, and
- then only if sink_to_existing_root == True.
- '''
- if not os.path.exists(path):
- self.root = None
- raise NoRootEntry(path)
- versionfile=utility.search_parent_directories(path,
- os.path.join(".be", "version"))
- if versionfile != None:
- beroot = os.path.dirname(versionfile)
- root = os.path.dirname(beroot)
- return root
- else:
- beroot = utility.search_parent_directories(path, ".be")
- if beroot == None:
- self.root = None
- raise NoBugDir(path)
- return beroot
-
- def _guess_storage(self, allow_storage_init=False):
- '''
- Only called by __init__.
- '''
- deepdir = self.get_path()
- if not os.path.exists(deepdir):
- deepdir = os.path.dirname(deepdir)
- new_storage = storage.detect_storage(deepdir)
- install = False
- if new_storage.name == "None":
- if allow_storage_init == True:
- new_storage = storage.installed_storage()
- new_storage.init(self.root)
- return new_storage
-
-os.listdir(self.get_path("bugs")):
"""
name = 'None'
client = 'false' # command-line tool for _u_invoke_client
def __init__(self, *args, **kwargs):
if 'encoding' not in kwargs:
- kwargs['encoding'] = libbe.util.encoding.get_filesystem_encoding()
+ kwargs['encoding'] = libbe.util.encoding.get_text_file_encoding()
libbe.storage.base.VersionedStorage.__init__(self, *args, **kwargs)
self.versioned = False
self.interspersed_vcs_files = False
"""
pass
+ def _vcs_exists(self, path, revision=None):
+ """
+ Does the path exist in a given revision? (True/False)
+ """
+ raise NotImplementedError('Lazy BE developers')
+
def _vcs_remove(self, path):
"""
Remove the file at path from version control. Optionally
def _vcs_path(self, id, revision):
"""
- Return the path to object id as of revision.
+ Return the relative path to object id as of revision.
Revision will not be None.
"""
"""
return None
+ def _vcs_changed(self, revision):
+ """
+ Return a tuple of lists of ids
+ (new, modified, removed)
+ from the specified revision to the current situation.
+ """
+ return ([], [], [])
+
def version(self):
# Cache version string for efficiency.
if not hasattr(self, '_version'):
- self._version = self._get_version()
+ self._version = self._vcs_version()
return self._version
- def _get_version(self):
- try:
- ret = self._vcs_version()
- return ret
- except OSError, e:
- if e.errno == errno.ENOENT:
- return None
+ def version_cmp(self, *args):
+ """Compare the installed VCS version `V_i` with another version
+ `V_o` (given in `*args`). Returns
+
+ === ===============
+ 1 if `V_i > V_o`
+ 0 if `V_i == V_o`
+ -1 if `V_i < V_o`
+ === ===============
+
+ Examples
+ --------
+
+ >>> v = VCS(repo='.')
+ >>> v._version = '2.3.1 (release)'
+ >>> v.version_cmp(2,3,1)
+ 0
+ >>> v.version_cmp(2,3,2)
+ -1
+ >>> v.version_cmp(2,3,'a',5)
+ 1
+ >>> v.version_cmp(2,3,0)
+ 1
+ >>> v.version_cmp(2,3,1,'a',5)
+ 1
+ >>> v.version_cmp(2,3,1,1)
+ -1
+ >>> v.version_cmp(3)
+ -1
+ >>> v._version = '2.0.0pre2'
+ >>> v._parsed_version = None
+ >>> v.version_cmp(3)
+ -1
+ >>> v.version_cmp(2,0,1)
+ -1
+ >>> v.version_cmp(2,0,0,'pre',1)
+ 1
+ >>> v.version_cmp(2,0,0,'pre',2)
+ 0
+ >>> v.version_cmp(2,0,0,'pre',3)
+ -1
+ >>> v.version_cmp(2,0,0,'a',3)
+ 1
+ >>> v.version_cmp(2,0,0,'rc',1)
+ -1
+ """
+ if not hasattr(self, '_parsed_version') \
+ or self._parsed_version == None:
+ num_part = self.version().split(' ')[0]
+ self._parsed_version = []
+ for num in num_part.split('.'):
+ try:
+ self._parsed_version.append(int(num))
+ except ValueError, e:
+ # bzr version number might contain non-numerical tags
+ splitter = re.compile(r'[\D]') # Match non-digits
+ splits = splitter.split(num)
+ # if len(tag) > 1 some splits will be empty; remove
+ splits = filter(lambda s: s != '', splits)
+ tag_starti = len(splits[0])
+ num_starti = num.find(splits[1], tag_starti)
+ tag = num[tag_starti:num_starti]
+ self._parsed_version.append(int(splits[0]))
+ self._parsed_version.append(tag)
+ self._parsed_version.append(int(splits[1]))
+ for current,other in zip(self._parsed_version, args):
+ if type(current) != type (other):
+ # one of them is a pre-release string
+ if type(current) != types.IntType:
+ return -1
+ else:
+ return 1
+ c = cmp(current,other)
+ if c != 0:
+ return c
+ # see if one is longer than the other
+ verlen = len(self._parsed_version)
+ arglen = len(args)
+ if verlen == arglen:
+ return 0
+ elif verlen > arglen:
+ if type(self._parsed_version[arglen]) != types.IntType:
+ return -1 # self is a prerelease
else:
- raise OSError, e
- except CommandError:
- return None
+ return 1
+ else:
+ if type(args[verlen]) != types.IntType:
+ return 1 # args is a prerelease
+ else:
+ return -1
def installed(self):
if self.version() != None:
"""
if not hasattr(self, 'user_id'):
self.user_id = self._vcs_get_user_id()
+ if self.user_id == None:
+ # guess missing info
+ name = libbe.ui.util.user.get_fallback_fullname()
+ email = libbe.ui.util.user.get_fallback_email()
+ self.user_id = libbe.ui.util.user.create_user_id(name, email)
return self.user_id
def _detect(self, path='.'):
return self._vcs_detect(path)
def root(self):
- """
- Set the root directory to the path's VCS root. This is the
- default working directory for future invocations.
+ """Set the root directory to the path's VCS root.
+
+ This is the default working directory for future invocations.
+ Consider the following usage case:
+
+ You have a project rooted in::
+
+ /path/to/source/
+
+ by which I mean the VCS repository is in, for example::
+
+ /path/to/source/.bzr
+
+ However, you're of in some subdirectory like::
+
+ /path/to/source/ui/testing
+
+ and you want to comment on a bug. `root` will locate your VCS
+ root (``/path/to/source/``) and set the repo there. This
+ means that it doesn't matter where you are in your project
+ tree when you call "be COMMAND", it always acts as if you called
+ it from the VCS root.
"""
if self._detect(self.repo) == False:
raise VCSUnableToRoot(self)
root = self._vcs_root(self.repo)
- self.repo = os.path.abspath(root)
+ self.repo = os.path.realpath(root)
if os.path.isdir(self.repo) == False:
self.repo = os.path.dirname(self.repo)
self.be_dir = os.path.join(
"""
Begin versioning the tree based at self.repo.
Also roots the vcs at path.
+
+ See Also
+ --------
+ root : called if the VCS has already been initialized.
"""
if not os.path.exists(self.repo) or not os.path.isdir(self.repo):
raise VCSUnableToRoot(self)
def _disconnect(self):
self._cached_path_id.disconnect()
+ def path(self, id, revision=None, relpath=True):
+ if revision == None:
+ path = self._cached_path_id.path(id)
+ if relpath == True:
+ return self._u_rel_path(path)
+ return path
+ path = self._vcs_path(id, revision)
+ if relpath == True:
+ return path
+ return os.path.join(self.repo, path)
+
def _add_path(self, path, directory=False):
relpath = self._u_rel_path(path)
reldirs = relpath.split(os.path.sep)
path = self._cached_path_id.add_id(id, parent)
self._add_path(path, **kwargs)
+ def _exists(self, id, revision=None):
+ if revision == None:
+ try:
+ path = self.path(id, revision, relpath=False)
+ except InvalidID, e:
+ return False
+ return os.path.exists(path)
+ path = self.path(id, revision, relpath=True)
+ return self._vcs_exists(relpath, revision)
+
def _remove(self, id):
path = self._cached_path_id.path(id)
if os.path.exists(path):
if p.startswith(path):
self._cached_path_id.remove_id(id)
+ def _ancestors(self, id=None, revision=None):
+ if id==None:
+ path = self.be_dir
+ else:
+ path = self.path(id, revision, relpath=False)
+ ancestors = []
+ while True:
+ if not path.startswith(self.repo + os.path.sep):
+ break
+ path = os.path.dirname(path)
+ try:
+ id = self._u_path_to_id(path)
+ ancestors.append(id)
+ except (SpacerCollision, InvalidPath):
+ pass
+ return ancestors
+
def _children(self, id=None, revision=None):
if revision == None:
- id_to_path = self._cached_path_id.path
isdir = os.path.isdir
listdir = os.listdir
else:
- id_to_path = lambda id : self._vcs_path(id, revision)
- isdir = lambda path : self._vcs_isdir(path, revision)
- listdir = lambda path : self._vcs_listdir(path, revision)
+ isdir = lambda path : self._vcs_isdir(
+ self._u_rel_path(path), revision)
+ listdir = lambda path : self._vcs_listdir(
+ self._u_rel_path(path), revision)
if id==None:
path = self.be_dir
else:
- path = id_to_path(id)
+ path = self.path(id, revision, relpath=False)
if isdir(path) == False:
return []
children = listdir(path)
children[i] = None
else:
children[i] = self._u_path_to_id(cpath)
- children[i]
return [c for c in children if c != None]
def _get(self, id, default=libbe.util.InvalidObject, revision=None):
try:
- path = self._cached_path_id.path(id)
+ relpath = self.path(id, revision, relpath=True)
+ contents = self._vcs_get_file_contents(relpath, revision)
except InvalidID, e:
if default == libbe.util.InvalidObject:
raise e
return default
- relpath = self._u_rel_path(path)
- try:
- contents = self._vcs_get_file_contents(relpath, revision)
- except InvalidID, e:
- if InvalidID == None:
- e.id = InvalidID
- raise
if contents in [libbe.storage.base.InvalidDirectory,
- libbe.util.InvalidObject]:
- raise InvalidID(id)
- elif len(contents) == 0:
- return None
+ libbe.util.InvalidObject] \
+ or len(contents) == 0:
+ if default == libbe.util.InvalidObject:
+ raise InvalidID(id, revision)
+ return default
return contents
def _set(self, id, value):
raise libbe.storage.base.InvalidRevision(index)
return revid
+ def changed(self, revision):
+ new,mod,rem = self._vcs_changed(revision)
+ def paths_to_ids(paths):
+ for p in paths:
+ try:
+ id = self._u_path_to_id(p)
+ yield id
+ except (SpacerCollision, InvalidPath):
+ pass
+ new_id = list(paths_to_ids(new))
+ mod_id = list(paths_to_ids(mod))
+ rem_id = list(paths_to_ids(rem))
+ return (new_id, mod_id, rem_id)
+
def _u_any_in_string(self, list, string):
- """
- Return True if any of the strings in list are in string.
+ """Return True if any of the strings in list are in string.
Otherwise return False.
"""
for list_string in list:
return self._u_invoke(cl_args, **kwargs)
def _u_search_parent_directories(self, path, filename):
- """
- Find the file (or directory) named filename in path or in any
- of path's parents.
+ """Find the file (or directory) named filename in path or in any of
+ path's parents.
e.g.
search_parent_directories("/a/b/c", ".be")
return None
return ret
- def _u_find_id(self, id, revision):
+ def _u_find_id_from_manifest(self, id, manifest, revision=None):
+ """Search for the relative path to id using manifest, a list of all
+ files.
+
+ Returns None if the id is not found.
"""
- Search for the relative path to id as of revision.
+ be_dir = self._cached_path_id._spacer_dirs[0]
+ be_dir_sep = self._cached_path_id._spacer_dirs[0] + os.path.sep
+ files = [f for f in manifest if f.startswith(be_dir_sep)]
+ for file in files:
+ if not file.startswith(be_dir+os.path.sep):
+ continue
+ parts = file.split(os.path.sep)
+ dir = parts.pop(0) # don't add the first spacer dir
+ for part in parts[:-1]:
+ dir = os.path.join(dir, part)
+ if not dir in files:
+ files.append(dir)
+ for file in files:
+ try:
+ p_id = self._u_path_to_id(file)
+ if p_id == id:
+ return file
+ except (SpacerCollision, InvalidPath):
+ pass
+ raise InvalidID(id, revision=revision)
+
+ def _u_find_id(self, id, revision):
+ """Search for the relative path to id as of revision.
+
Returns None if the id is not found.
"""
assert self._rooted == True
return self._cached_path_id.id(path)
def _u_rel_path(self, path, root=None):
- """
- Return the relative path to path from root.
+ """Return the relative path to path from root.
+
+ Examples:
+
>>> vcs = new()
>>> vcs._u_rel_path("/a.b/c/.be", "/a.b/c")
'.be'
return relpath
def _u_abspath(self, path, root=None):
- """
- Return the absolute path from a path realtive to root.
+ """Return the absolute path from a path relative to root.
+
+ Examples
+ --------
+
>>> vcs = new()
>>> vcs._u_abspath(".be", "/a.b/c")
'/a.b/c/.be'
return os.path.abspath(os.path.join(root, path))
def _u_parse_commitfile(self, commitfile):
- """
- Split the commitfile created in self.commit() back into
- summary and header lines.
+ """Split the commitfile created in self.commit() back into summary and
+ header lines.
"""
f = codecs.open(commitfile, 'r', self.encoding)
summary = f.readline()
upgrade.upgrade(self.repo, version)
def storage_version(self, revision=None, path=None):
- """
- Requires disk access.
+ """Return the storage version of the on-disk files.
+
+ See Also
+ --------
+ libbe.storage.util.upgrade
"""
if path == None:
path = os.path.join(self.repo, '.be', 'version')
raise libbe.storage.InvalidStorageVersion(None)
if revision == None: # don't require connection
return libbe.util.encoding.get_file_contents(
- path, decode=True).rstrip('\n')
- contents = self._vcs_get_file_contents(path, revision=revision)
+ path, decode=True).rstrip()
+ relpath = self._u_rel_path(path)
+ contents = self._vcs_get_file_contents(relpath, revision=revision)
if type(contents) != types.UnicodeType:
contents = unicode(contents, self.encoding)
return contents.strip()
libbe.storage.STORAGE_VERSION+'\n')
self._vcs_add(self._u_rel_path(path))
-\f
+
if libbe.TESTING == True:
class VCSTestCase (unittest.TestCase):
"""
class VCS_installed_TestCase (VCSTestCase):
def test_installed(self):
- """
- See if the VCS is installed.
+ """See if the VCS is installed.
"""
self.failUnless(self.s.installed() == True,
'%(name)s VCS not found' % vars(self.Class))
class VCS_detection_TestCase (VCSTestCase):
def test_detection(self):
- """
- See if the VCS detects its installed repository
+ """See if the VCS detects its installed repository
"""
if self.s.installed():
self.s.disconnect()
self.s.connect()
def test_no_detection(self):
- """
- See if the VCS detects its installed repository
+ """See if the VCS detects its installed repository
"""
if self.s.installed() and self.Class.name != 'None':
self.s.disconnect()
class VCS_get_user_id_TestCase(VCSTestCase):
"""Test cases for VCS.get_user_id method."""
- def test_gets_existing_user_id(self):
+ def test_get_existing_user_id(self):
"""Should get the existing user ID."""
if self.s.installed():
user_id = self.s.get_user_id()