import libbe.storage
import libbe.storage.base
import libbe.util.encoding
-from libbe.storage.base import EmptyCommit, InvalidRevision
+from libbe.storage.base import EmptyCommit, InvalidRevision, InvalidID
from libbe.util.utility import Dir, search_parent_directories
from libbe.util.subproc import CommandError, invoke
from libbe.util.plugin import import_by_name
libbe.storage.base.ConnectionError.__init__(self, msg)
self.vcs = vcs
-class InvalidPath (libbe.storage.base.InvalidID):
+class InvalidPath (InvalidID):
def __init__(self, path, root, msg=None):
if msg == None:
msg = 'Path "%s" not in root "%s"' % (path, root)
- libbe.storage.base.InvalidID.__init__(self, msg)
+ InvalidID.__init__(self, msg)
self.path = path
self.root = root
InvalidPath.__init__(self, path, root=None, msg=msg)
self.spacer = spacer
-class NoSuchFile (libbe.storage.base.InvalidID):
+class NoSuchFile (InvalidID):
def __init__(self, pathname, root='.'):
path = os.path.abspath(os.path.join(root, pathname))
- libbe.storage.base.InvalidID.__init__(self, 'No such file: %s' % path)
+ InvalidID.__init__(self, 'No such file: %s' % path)
class CachedPathID (object):
relpath = dirpath[len(self._root)+1:]
if id.count('/') == 0:
if id in self._cache:
- import sys
print >> sys.stderr, 'Multiple paths for %s: \n %s\n %s' % (id, self._cache[id], relpath)
self._cache[id] = relpath
except InvalidPath:
else:
extra = fields[1:]
if uuid not in self._cache:
- raise libbe.storage.base.InvalidID(uuid)
+ raise InvalidID(uuid)
if relpath == True:
return os.path.join(self._cache[uuid], *extra)
return os.path.join(self._root, self._cache[uuid], *extra)
self.root()
os.mkdir(self.be_dir)
self._vcs_add(self._u_rel_path(self.be_dir))
+ self._setup_storage_version()
self._cached_path_id.init()
def _destroy(self):
children[i] = None
children.extend([os.path.join(c, c2) for c2 in
os.listdir(os.path.join(path, c))])
- elif c == 'id-cache':
+ elif c in ['id-cache', 'version']:
children[i] = None
for i,c in enumerate(children):
if c == None: continue
def _get(self, id, default=libbe.util.InvalidObject, revision=None):
try:
path = self._cached_path_id.path(id)
- except libbe.storage.base.InvalidID, e:
+ except InvalidID, e:
if default == libbe.util.InvalidObject:
raise e
return default
relpath = self._u_rel_path(path)
- contents = self._vcs_get_file_contents(relpath,revision)
+ try:
+ contents = self._vcs_get_file_contents(relpath,revision)
+ except InvalidID, e:
+ raise InvalidID(id)
if contents in [libbe.storage.base.InvalidDirectory,
libbe.util.InvalidObject]:
- raise libbe.storage.base.InvalidID(id)
+ raise InvalidID(id)
elif len(contents) == 0:
return None
return contents
def _set(self, id, value):
try:
path = self._cached_path_id.path(id)
- except libbe.storage.base.InvalidID, e:
+ except InvalidID, e:
raise e
if not os.path.exists(path):
- raise libbe.storage.base.InvalidID(id)
+ raise InvalidID(id)
if os.path.isdir(path):
raise libbe.storage.base.InvalidDirectory(id)
f = open(path, "wb")
"""
if path == None:
path = os.path.join(self.repo, '.be', 'version')
+ if not os.path.exists(path):
+ raise libbe.storage.InvalidStorageVersion(None)
if revision == None: # don't require connection
return libbe.util.encoding.get_file_contents(
path, decode=True).rstrip('\n')
contents = unicode(contents, self.encoding)
return contents.strip()
+ def _setup_storage_version(self):
+ """
+ Requires disk access.
+ """
+ assert self._rooted == True
+ path = os.path.join(self.be_dir, 'version')
+ if not os.path.exists(path):
+ libbe.util.encoding.set_file_contents(path,
+ libbe.storage.STORAGE_VERSION+'\n')
+ self._vcs_add(self._u_rel_path(path))
+
\f
if libbe.TESTING == True:
class VCSTestCase (unittest.TestCase):