self.vcs = vcs
class InvalidPath (InvalidID):
- def __init__(self, path, root, msg=None):
+ def __init__(self, path, root, msg=None, **kwargs):
if msg == None:
msg = 'Path "%s" not in root "%s"' % (path, root)
- InvalidID.__init__(self, msg)
+ InvalidID.__init__(self, msg=msg, **kwargs)
self.path = path
self.root = root
self._changed = True
def id(self, path):
- path = os.path.abspath(path)
+ path = os.path.join(self._root, path)
if not path.startswith(self._root + os.path.sep):
- raise InvalidPath('Path %s not in root %s' % (path, self._root))
+ raise InvalidPath(path, self._root)
path = path[len(self._root)+1:]
orig_path = path
if not path.startswith(self._spacer_dirs[0] + os.path.sep):
f.close()
return contents
+ def _vcs_path(self, id, revision):
+ """
+ Return the path to object id as of revision.
+
+ Revision will not be None.
+ """
+ raise NotImplementedError
+
+ def _vcs_isdir(self, path, revision):
+ """
+ Return True if path (as returned by _vcs_path) was a directory
+ as of revision, False otherwise.
+
+ Revision will not be None.
+ """
+ raise NotImplementedError
+
+ def _vcs_listdir(self, path, revision):
+ """
+ Return a list of the contents of the directory path (as
+ returned by _vcs_path) as of revision.
+
+ Revision will not be None, and ._vcs_isdir(path, revision)
+ will be True.
+ """
+ raise NotImplementedError
+
def _vcs_commit(self, commitfile, allow_empty=False):
"""
Commit the current working directory, using the contents of
self._cached_path_id.remove_id(id)
def _children(self, id=None, revision=None):
+ if revision == None:
+ id_to_path = self._cached_path_id.path
+ path_to_id = self._cached_path_id.id
+ isdir = os.path.isdir
+ listdir = os.listdir
+ else:
+ id_to_path = lambda id : self._vcs_path(id, revision)
+ path_to_id = self._cached_path_id.id
+ isdir = lambda path : self._vcs_isdir(path, revision)
+ listdir = lambda path : self._vcs_listdir(path, revision)
if id==None:
path = self.be_dir
else:
- path = self._cached_path_id.path(id)
- if os.path.isdir(path) == False:
+ path = id_to_path(id)
+ if isdir(path) == False:
return []
- children = os.listdir(path)
+ children = listdir(path)
for i,c in enumerate(children):
if c in self._cached_path_id._spacer_dirs:
children[i] = None
children.extend([os.path.join(c, c2) for c2 in
- os.listdir(os.path.join(path, c))])
+ listdir(os.path.join(path, c))])
elif c in ['id-cache', 'version']:
children[i] = None
for i,c in enumerate(children):
if c == None: continue
cpath = os.path.join(path, c)
if self.interspersed_vcs_files == True \
+ and revision != None \
and self._vcs_is_versioned(cpath) == False:
children[i] = None
else:
- children[i] = self._cached_path_id.id(cpath)
+ children[i] = path_to_id(cpath)
+ children[i]
return [c for c in children if c != None]
def _get(self, id, default=libbe.util.InvalidObject, revision=None):
try:
contents = self._vcs_get_file_contents(relpath,revision)
except InvalidID, e:
- raise InvalidID(id)
+ raise InvalidPath(path=path, root=self.repo, id=id)
if contents in [libbe.storage.base.InvalidDirectory,
libbe.util.InvalidObject]:
raise InvalidID(id)
"""
return search_parent_directories(path, filename)
+ def _u_find_id(self, id, revision):
+ """
+ Search for the relative path to id as of revision.
+ Returns None if the id is not found.
+ """
+ assert self._rooted == True
+ be_dir = self._cached_path_id._spacer_dirs[0]
+ stack = [(be_dir, be_dir)]
+ while len(stack) > 0:
+ path,long_id = stack.pop()
+ if long_id.endswith('/'+id):
+ return path
+ if self._vcs_isdir(path, revision) == False:
+ continue
+ for child in self._vcs_listdir(path, revision):
+ stack.append((os.path.join(path, child),
+ '/'.join([long_id, child])))
+ return None
+
def _u_rel_path(self, path, root=None):
"""
Return the relative path to path from root.
cmd.run(filename=path, revision=revision)
except bzrlib.errors.BzrCommandError, e:
if 'not present in revision' in str(e):
- raise base.InvalidID(path)
+ raise base.InvalidID(path, revision)
raise
return cmd.outf.getvalue()
+ def _vcs_path(self, id, revision):
+ return self._u_find_id(id, revision)
+
+ def _vcs_isdir(self, path, revision):
+ try:
+ self._vcs_listdir(path, revision)
+ except AttributeError, e:
+ if 'children' in str(e):
+ return False
+ raise
+ return True
+
+ def _vcs_listdir(self, path, revision):
+ path = os.path.join(self.repo, path)
+ revision = self._parse_revision_string(revision)
+ cmd = bzrlib.builtins.cmd_ls()
+ cmd.outf = StringIO.StringIO()
+ try:
+ cmd.run(revision=revision, path=path)
+ except bzrlib.errors.BzrCommandError, e:
+ if 'not present in revision' in str(e):
+ raise base.InvalidID(path, revision)
+ raise
+ children = cmd.outf.getvalue().rstrip('\n').splitlines()
+ children = [self._u_rel_path(c, path) for c in children]
+ return children
+
def _vcs_commit(self, commitfile, allow_empty=False):
cmd = bzrlib.builtins.cmd_commit()
cmd.outf = StringIO.StringIO()