-# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
+# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc.
# Alexander Belchenko <bialix@ukr.net>
# Ben Finney <benf@cybersource.com.au>
# Chris Ball <cjb@laptop.org>
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-"""
-Define the base VCS (Version Control System) class, which should be
-subclassed by other Version Control System backends. The base class
-implements a "do not version" VCS.
+"""Define the base :class:`VCS` (Version Control System) class, which
+should be subclassed by other Version Control System backends. The
+base class implements a "do not version" VCS.
"""
import codecs
import libbe.ui.util.user
-# List VCS modules in order of preference.
-# Don't list this module, it is implicitly last.
VCS_ORDER = ['arch', 'bzr', 'darcs', 'git', 'hg']
+"""List VCS modules in order of preference.
+
+Don't list this module, it is implicitly last.
+"""
def set_preferred_vcs(name):
+ """Manipulate :data:`VCS_ORDER` to place `name` first.
+
+ This is primarily indended for testing purposes.
+ """
global VCS_ORDER
assert name in VCS_ORDER, \
'unrecognized VCS %s not in\n %s' % (name, VCS_ORDER)
VCS_ORDER.insert(0, name)
def _get_matching_vcs(matchfn):
- """Return the first module for which matchfn(VCS_instance) is true"""
+ """Return the first module for which matchfn(VCS_instance) is True.
+
+ Searches in :data:`VCS_ORDER`.
+ """
for submodname in VCS_ORDER:
module = import_by_name('libbe.storage.vcs.%s' % submodname)
vcs = module.new()
return VCS()
def vcs_by_name(vcs_name):
- """Return the module for the VCS with the given name"""
+ """Return the module for the VCS with the given name.
+
+ Searches in :data:`VCS_ORDER`.
+ """
if vcs_name == VCS.name:
return new()
return _get_matching_vcs(lambda vcs: vcs.name == vcs_name)
def detect_vcs(dir):
- """Return an VCS instance for the vcs being used in this directory"""
+ """Return an VCS instance for the vcs being used in this directory.
+
+ Searches in :data:`VCS_ORDER`.
+ """
return _get_matching_vcs(lambda vcs: vcs._detect(dir))
def installed_vcs():
- """Return an instance of an installed VCS"""
+ """Return an instance of an installed VCS.
+
+ Searches in :data:`VCS_ORDER`.
+ """
return _get_matching_vcs(lambda vcs: vcs.installed())
class CachedPathID (object):
- """
- Storage ID <-> path policy.
- .../.be/BUGDIR/bugs/BUG/comments/COMMENT
- ^-- root path
+ """Cache Storage ID <-> path policy.
+
+ Paths generated following::
+
+ .../.be/BUGDIR/bugs/BUG/comments/COMMENT
+ ^-- root path
+
+ See :mod:`libbe.util.id` for a discussion of ID formats.
+
+ Examples
+ --------
>>> dir = Dir()
>>> os.mkdir(os.path.join(dir.path, '.be'))
>>> c.path('qrs')
Traceback (most recent call last):
...
- InvalidID: 'qrs'
+ InvalidID: qrs in revision None
>>> c.disconnect()
>>> c.destroy()
>>> dir.cleanup()
self._cache_path = os.path.join(
self._root, self._spacer_dirs[0], 'id-cache')
- def init(self):
- """
- Create cache file for an existing .be directory.
- File if multiple lines of the form:
- UUID\tPATH
+ def init(self, verbose=True, cache=None):
+ """Create cache file for an existing .be directory.
+
+ The file contains multiple lines of the form::
+
+ UUID\tPATH
"""
- self._cache = {}
+ if cache == None:
+ self._cache = {}
+ else:
+ self._cache = cache
spaced_root = os.path.join(self._root, self._spacer_dirs[0])
for dirpath, dirnames, filenames in os.walk(spaced_root):
if dirpath == spaced_root:
id = self.id(dirpath)
relpath = dirpath[len(self._root)+1:]
if id.count('/') == 0:
- if id in self._cache:
+ if verbose == True and id in self._cache:
print >> sys.stderr, 'Multiple paths for %s: \n %s\n %s' % (id, self._cache[id], relpath)
self._cache[id] = relpath
except InvalidPath:
pass
- self._changed = True
- self.disconnect()
+ if self._cache != cache:
+ self._changed = True
+ if cache == None:
+ self.disconnect()
def destroy(self):
if os.path.exists(self._cache_path):
else:
extra = fields[1:]
if uuid not in self._cache:
- raise InvalidID(uuid)
+ self.init(verbose=False, cache=self._cache)
+ if uuid not in self._cache:
+ raise InvalidID(uuid)
if relpath == True:
return os.path.join(self._cache[uuid], *extra)
return os.path.join(self._root, self._cache[uuid], *extra)
return VCS()
class VCS (libbe.storage.base.VersionedStorage):
- """
- This class implements a 'no-vcs' interface.
+ """Implement a 'no-VCS' interface.
Support for other VCSs can be added by subclassing this class, and
- overriding methods _vcs_*() with code appropriate for your VCS.
+ overriding methods `_vcs_*()` with code appropriate for your VCS.
- The methods _u_*() are utility methods available to the _vcs_*()
+ The methods `_u_*()` are utility methods available to the `_vcs_*()`
methods.
-
- Sink to existing root
- ======================
-
- Consider the following usage case:
- You have a bug directory rooted in
- /path/to/source
- by which I mean the '.be' directory is at
- /path/to/source/.be
- However, you're of in some subdirectory like
- /path/to/source/GUI/testing
- and you want to comment on a bug. Setting sink_to_root=True when
- you initialize your BugDir will cause it to search for the '.be'
- file in the ancestors of the path you passed in as 'root'.
- /path/to/source/GUI/testing/.be miss
- /path/to/source/GUI/.be miss
- /path/to/source/.be hit!
- So it still roots itself appropriately without much work for you.
-
- File-system access
- ==================
-
- BugDirs live completely in memory when .sync_with_disk is False.
- This is the default configuration setup by BugDir(from_disk=False).
- If .sync_with_disk == True (e.g. BugDir(from_disk=True)), then
- any changes to the BugDir will be immediately written to disk.
-
- If you want to change .sync_with_disk, we suggest you use
- .set_sync_with_disk(), which propogates the new setting through to
- all bugs/comments/etc. that have been loaded into memory. If
- you've been living in memory and want to move to
- .sync_with_disk==True, but you're not sure if anything has been
- changed in memory, a call to .save() immediately before the
- .set_sync_with_disk(True) call is a safe move.
-
- Regardless of .sync_with_disk, a call to .save() will write out
- all the contents that the BugDir instance has loaded into memory.
- If sync_with_disk has been True over the course of all interesting
- changes, this .save() call will be a waste of time.
-
- The BugDir will only load information from the file system when it
- loads new settings/bugs/comments that it doesn't already have in
- memory and .sync_with_disk == True.
-
- Allow storage initialization
- ========================
-
- This one is for testing purposes. Setting it to True allows the
- BugDir to search for an installed Storage backend and initialize
- it in the root directory. This is a convenience option for
- supporting tests of versioning functionality
- (e.g. .duplicate_bugdir).
-
- Disable encoding manipulation
- =============================
-
- This one is for testing purposed. You might have non-ASCII
- Unicode in your bugs, comments, files, etc. BugDir instances try
- and support your preferred encoding scheme (e.g. "utf-8") when
- dealing with stream and file input/output. For stream output,
- this involves replacing sys.stdout and sys.stderr
- (libbe.encode.set_IO_stream_encodings). However this messes up
- doctest's output catching. In order to support doctest tests
- using BugDirs, set manipulate_encodings=False, and stick to ASCII
- in your tests.
-
- if root == None:
- root = os.getcwd()
- if sink_to_existing_root == True:
- self.root = self._find_root(root)
- else:
- if not os.path.exists(root):
- self.root = None
- raise NoRootEntry(root)
- self.root = root
- # get a temporary storage until we've loaded settings
- self.sync_with_disk = False
- self.storage = self._guess_storage()
-
- if assert_new_BugDir == True:
- if os.path.exists(self.get_path()):
- raise AlreadyInitialized, self.get_path()
- if storage == None:
- storage = self._guess_storage(allow_storage_init)
- self.storage = storage
- self._setup_user_id(self.user_id)
-
-
- # methods for getting the BugDir situated in the filesystem
-
- def _find_root(self, path):
- '''
- Search for an existing bug database dir and it's ancestors and
- return a BugDir rooted there. Only called by __init__, and
- then only if sink_to_existing_root == True.
- '''
- if not os.path.exists(path):
- self.root = None
- raise NoRootEntry(path)
- versionfile=utility.search_parent_directories(path,
- os.path.join(".be", "version"))
- if versionfile != None:
- beroot = os.path.dirname(versionfile)
- root = os.path.dirname(beroot)
- return root
- else:
- beroot = utility.search_parent_directories(path, ".be")
- if beroot == None:
- self.root = None
- raise NoBugDir(path)
- return beroot
-
- def _guess_storage(self, allow_storage_init=False):
- '''
- Only called by __init__.
- '''
- deepdir = self.get_path()
- if not os.path.exists(deepdir):
- deepdir = os.path.dirname(deepdir)
- new_storage = storage.detect_storage(deepdir)
- install = False
- if new_storage.name == "None":
- if allow_storage_init == True:
- new_storage = storage.installed_storage()
- new_storage.init(self.root)
- return new_storage
-
-os.listdir(self.get_path("bugs")):
"""
name = 'None'
client = 'false' # command-line tool for _u_invoke_client
"""
pass
+ def _vcs_exists(self, path, revision=None):
+ """
+ Does the path exist in a given revision? (True/False)
+ """
+ raise NotImplementedError('Lazy BE developers')
+
def _vcs_remove(self, path):
"""
Remove the file at path from version control. Optionally
def _vcs_path(self, id, revision):
"""
- Return the path to object id as of revision.
+ Return the relative path to object id as of revision.
Revision will not be None.
"""
"""
return None
+ def _vcs_changed(self, revision):
+ """
+ Return a tuple of lists of ids
+ (new, modified, removed)
+ from the specified revision to the current situation.
+ """
+ return ([], [], [])
+
def version(self):
# Cache version string for efficiency.
if not hasattr(self, '_version'):
return self._vcs_detect(path)
def root(self):
- """
- Set the root directory to the path's VCS root. This is the
- default working directory for future invocations.
+ """Set the root directory to the path's VCS root.
+
+ This is the default working directory for future invocations.
+ Consider the following usage case:
+
+ You have a project rooted in::
+
+ /path/to/source/
+
+ by which I mean the VCS repository is in, for example::
+
+ /path/to/source/.bzr
+
+ However, you're of in some subdirectory like::
+
+ /path/to/source/ui/testing
+
+ and you want to comment on a bug. `root` will locate your VCS
+ root (``/path/to/source/``) and set the repo there. This
+ means that it doesn't matter where you are in your project
+ tree when you call "be COMMAND", it always acts as if you called
+ it from the VCS root.
"""
if self._detect(self.repo) == False:
raise VCSUnableToRoot(self)
"""
Begin versioning the tree based at self.repo.
Also roots the vcs at path.
+
+ See Also
+ --------
+ root : called if the VCS has already been initialized.
"""
if not os.path.exists(self.repo) or not os.path.isdir(self.repo):
raise VCSUnableToRoot(self)
self._cached_path_id.connect()
self.check_storage_version()
- def disconnect(self):
+ def _disconnect(self):
self._cached_path_id.disconnect()
+ def path(self, id, revision=None, relpath=True):
+ if revision == None:
+ path = self._cached_path_id.path(id)
+ if relpath == True:
+ return self._u_rel_path(path)
+ return path
+ path = self._vcs_path(id, revision)
+ if relpath == True:
+ return path
+ return os.path.join(self.repo, path)
+
def _add_path(self, path, directory=False):
relpath = self._u_rel_path(path)
reldirs = relpath.split(os.path.sep)
path = self._cached_path_id.add_id(id, parent)
self._add_path(path, **kwargs)
+ def _exists(self, id, revision=None):
+ if revision == None:
+ try:
+ path = self.path(id, revision, relpath=False)
+ except InvalidID, e:
+ return False
+ return os.path.exists(path)
+ path = self.path(id, revision, relpath=True)
+ return self._vcs_exists(relpath, revision)
+
def _remove(self, id):
path = self._cached_path_id.path(id)
if os.path.exists(path):
if p.startswith(path):
self._cached_path_id.remove_id(id)
+ def _ancestors(self, id=None, revision=None):
+ if id==None:
+ path = self.be_dir
+ else:
+ path = self.path(id, revision, relpath=False)
+ ancestors = []
+ while True:
+ if not path.startswith(self.repo + os.path.sep):
+ break
+ path = os.path.dirname(path)
+ try:
+ id = self._u_path_to_id(path)
+ ancestors.append(id)
+ except (SpacerCollision, InvalidPath):
+ pass
+ return ancestors
+
def _children(self, id=None, revision=None):
if revision == None:
- id_to_path = self._cached_path_id.path
isdir = os.path.isdir
listdir = os.listdir
else:
- id_to_path = lambda id : self._vcs_path(id, revision)
- isdir = lambda path : self._vcs_isdir(path, revision)
- listdir = lambda path : self._vcs_listdir(path, revision)
+ isdir = lambda path : self._vcs_isdir(
+ self._u_rel_path(path), revision)
+ listdir = lambda path : self._vcs_listdir(
+ self._u_rel_path(path), revision)
if id==None:
path = self.be_dir
else:
- path = id_to_path(id)
+ path = self.path(id, revision, relpath=False)
if isdir(path) == False:
return []
children = listdir(path)
listdir(os.path.join(path, c))])
elif c in ['id-cache', 'version']:
children[i] = None
+ elif self.interspersed_vcs_files \
+ and self._vcs_is_versioned(c) == False:
+ children[i] = None
for i,c in enumerate(children):
if c == None: continue
cpath = os.path.join(path, c)
def _get(self, id, default=libbe.util.InvalidObject, revision=None):
try:
- path = self._cached_path_id.path(id)
+ relpath = self.path(id, revision, relpath=True)
+ contents = self._vcs_get_file_contents(relpath, revision)
except InvalidID, e:
if default == libbe.util.InvalidObject:
raise e
return default
- relpath = self._u_rel_path(path)
- try:
- contents = self._vcs_get_file_contents(relpath, revision)
- except InvalidID, e:
- if InvalidID == None:
- e.id = InvalidID
- raise
if contents in [libbe.storage.base.InvalidDirectory,
- libbe.util.InvalidObject]:
- raise InvalidID(id)
- elif len(contents) == 0:
- return None
+ libbe.util.InvalidObject] \
+ or len(contents) == 0:
+ if default == libbe.util.InvalidObject:
+ raise InvalidID(id, revision)
+ return default
return contents
def _set(self, id, value):
try:
path = self._cached_path_id.path(id)
except InvalidID, e:
- raise e
+ raise
if not os.path.exists(path):
raise InvalidID(id)
if os.path.isdir(path):
raise libbe.storage.base.InvalidRevision(index)
return revid
+ def changed(self, revision):
+ new,mod,rem = self._vcs_changed(revision)
+ def paths_to_ids(paths):
+ for p in paths:
+ try:
+ id = self._u_path_to_id(p)
+ yield id
+ except (SpacerCollision, InvalidPath):
+ pass
+ new_id = list(paths_to_ids(new))
+ mod_id = list(paths_to_ids(mod))
+ rem_id = list(paths_to_ids(rem))
+ return (new_id, mod_id, rem_id)
+
def _u_any_in_string(self, list, string):
- """
- Return True if any of the strings in list are in string.
+ """Return True if any of the strings in list are in string.
Otherwise return False.
"""
for list_string in list:
return self._u_invoke(cl_args, **kwargs)
def _u_search_parent_directories(self, path, filename):
- """
- Find the file (or directory) named filename in path or in any
- of path's parents.
+ """Find the file (or directory) named filename in path or in any of
+ path's parents.
e.g.
search_parent_directories("/a/b/c", ".be")
/.be
or None if none of those files exist.
"""
- return search_parent_directories(path, filename)
+ try:
+ ret = search_parent_directories(path, filename)
+ except AssertionError, e:
+ return None
+ return ret
+
+ def _u_find_id_from_manifest(self, id, manifest, revision=None):
+ """Search for the relative path to id using manifest, a list of all
+ files.
+
+ Returns None if the id is not found.
+ """
+ be_dir = self._cached_path_id._spacer_dirs[0]
+ be_dir_sep = self._cached_path_id._spacer_dirs[0] + os.path.sep
+ files = [f for f in manifest if f.startswith(be_dir_sep)]
+ for file in files:
+ if not file.startswith(be_dir+os.path.sep):
+ continue
+ parts = file.split(os.path.sep)
+ dir = parts.pop(0) # don't add the first spacer dir
+ for part in parts[:-1]:
+ dir = os.path.join(dir, part)
+ if not dir in files:
+ files.append(dir)
+ for file in files:
+ try:
+ p_id = self._u_path_to_id(file)
+ if p_id == id:
+ return file
+ except (SpacerCollision, InvalidPath):
+ pass
+ raise InvalidID(id, revision=revision)
def _u_find_id(self, id, revision):
- """
- Search for the relative path to id as of revision.
+ """Search for the relative path to id as of revision.
+
Returns None if the id is not found.
"""
assert self._rooted == True
return self._cached_path_id.id(path)
def _u_rel_path(self, path, root=None):
- """
- Return the relative path to path from root.
+ """Return the relative path to path from root.
+
+ Examples:
+
>>> vcs = new()
>>> vcs._u_rel_path("/a.b/c/.be", "/a.b/c")
'.be'
return relpath
def _u_abspath(self, path, root=None):
- """
- Return the absolute path from a path realtive to root.
+ """Return the absolute path from a path realtive to root.
+
+ Examples
+ --------
+
>>> vcs = new()
>>> vcs._u_abspath(".be", "/a.b/c")
'/a.b/c/.be'
return os.path.abspath(os.path.join(root, path))
def _u_parse_commitfile(self, commitfile):
- """
- Split the commitfile created in self.commit() back into
- summary and header lines.
+ """Split the commitfile created in self.commit() back into summary and
+ header lines.
"""
f = codecs.open(commitfile, 'r', self.encoding)
summary = f.readline()
upgrade.upgrade(self.repo, version)
def storage_version(self, revision=None, path=None):
- """
- Requires disk access.
+ """Return the storage version of the on-disk files.
+
+ See Also
+ --------
+ :mod:`libbe.storage.util.upgrade`
"""
if path == None:
path = os.path.join(self.repo, '.be', 'version')
if revision == None: # don't require connection
return libbe.util.encoding.get_file_contents(
path, decode=True).rstrip('\n')
- contents = self._vcs_get_file_contents(path, revision=revision)
+ relpath = self._u_rel_path(path)
+ contents = self._vcs_get_file_contents(relpath, revision=revision)
if type(contents) != types.UnicodeType:
contents = unicode(contents, self.encoding)
return contents.strip()
class VCS_installed_TestCase (VCSTestCase):
def test_installed(self):
- """
- See if the VCS is installed.
+ """See if the VCS is installed.
"""
self.failUnless(self.s.installed() == True,
'%(name)s VCS not found' % vars(self.Class))
class VCS_detection_TestCase (VCSTestCase):
def test_detection(self):
- """
- See if the VCS detects its installed repository
+ """See if the VCS detects its installed repository
"""
if self.s.installed():
self.s.disconnect()
self.s.connect()
def test_no_detection(self):
- """
- See if the VCS detects its installed repository
+ """See if the VCS detects its installed repository
"""
if self.s.installed() and self.Class.name != 'None':
self.s.disconnect()