import copy
import os
import pickle
+import types
from libbe.error import NotSupported
from libbe.util.tree import Tree
class InvalidRevision (KeyError):
pass
+class NotWriteable (NotSupported):
+ def __init__(self, msg):
+ NotSupported.__init__('write', msg)
+
+class NotReadable (NotSupported):
+ def __init__(self, msg):
+ NotSupported.__init__('read', msg)
+
class EmptyCommit(Exception):
def __init__(self):
Exception.__init__(self, 'No changes to commit')
"""
name = 'Storage'
- def __init__(self, repo, options=None):
+ def __init__(self, repo, encoding='utf-8', options=None):
self.repo = repo
+ self.encoding = encoding
self.options = options
- self.read_only = False
+ self.readable = True # soft limit (user choice)
+ self._readable = True # hard limit (backend choice)
+ self.writeable = True # soft limit (user choice)
+ self._writeable = True # hard limit (backend choice)
self.versioned = False
self.can_init = True
"""Return a version string for this backend."""
return '0'
+ def is_readable(self):
+ return self.readable and self._readable
+
+ def is_writeable(self):
+ return self.writeable and self._writeable
+
def init(self):
"""Create a new storage repository."""
if self.can_init == False:
raise NotSupported('init',
'Cannot initialize this repository format.')
- if self.read_only == True:
- raise NotSupported('init', 'Cannot initialize read only storage.')
+ if self.is_writeable() == False:
+ raise NotWriteable('Cannot initialize unwriteable storage.')
return self._init()
def _init(self):
def destroy(self):
"""Remove the storage repository."""
- if self.read_only == True:
- raise NotSupported('destroy', 'Cannot destroy read only storage.')
+ if self.is_writeable() == False:
+ raise NotWriteable('Cannot destroy unwriteable storage.')
return self._destroy()
def _destroy(self):
def connect(self):
"""Open a connection to the repository."""
+ if self.is_readable() == False:
+ raise NotReadable('Cannot connect to unreadable storage.')
+ self._connect()
+
+ def _connect(self):
try:
f = open(self.repo, 'rb')
except IOError:
def disconnect(self):
"""Close the connection to the repository."""
- if self.read_only == True:
+ if self.is_writeable() == False:
return
f = open(self.repo, 'wb')
pickle.dump(dict((k,v._objects_to_ids())
def add(self, *args, **kwargs):
"""Add an entry"""
- if self.read_only == True:
- raise NotSupported('add', 'Cannot add entry to read only storage.')
+ if self.is_writeable() == False:
+ raise NotWriteable('Cannot add entry to unwriteable storage.')
self._add(*args, **kwargs)
def _add(self, id, parent=None):
def remove(self, *args, **kwargs):
"""Remove an entry."""
- if self.read_only == True:
- raise NotSupported('remove',
- 'Cannot remove entry from read only storage.')
+ if self.is_writeable() == False:
+ raise NotSupported('write',
+ 'Cannot remove entry from unwriteable storage.')
self._remove(*args, **kwargs)
def _remove(self, id):
def recursive_remove(self, *args, **kwargs):
"""Remove an entry and all its decendents."""
- if self.read_only == True:
- raise NotSupported('recursive_remove',
- 'Cannot remove entries from read only storage.')
+ if self.is_writeable() == False:
+ raise NotSupported('write',
+ 'Cannot remove entries from unwriteable storage.')
self._recursive_remove(*args, **kwargs)
def _recursive_remove(self, id):
for entry in self._data[id].traverse():
self._remove(entry.id)
- def children(self, id=None, revision=None):
+ def children(self, *args, **kwargs):
"""Return a list of specified entry's children's ids."""
+ if self.is_readable() == False:
+ raise NotReadable('Cannot list children with unreadable storage.')
+ return self._children(*args, **kwargs)
+
+ def _children(self, id=None, revision=None):
if id == None:
id = '__ROOT__'
return [c.id for c in self._data[id] if not c.id.startswith('__')]
- def get(self, id, default=InvalidObject, revision=None):
+ def get(self, *args, **kwargs):
"""
Get contents of and entry as they were in a given revision.
revision==None specifies the current revision.
If there is no id, return default, unless default is not
given, in which case raise InvalidID.
"""
+ if self.is_readable() == False:
+ raise NotReadable('Cannot get entry with unreadable storage.')
+ if 'decode' in kwargs:
+ decode = kwargs.pop('decode')
+ else:
+ decode = False
+ value = self._get(*args, **kwargs)
+ if decode == True:
+ return unicode(value, self.encoding)
+ return value
+
+ def _get(self, id, default=InvalidObject, revision=None):
if id in self._data:
return self._data[id].value
elif default == InvalidObject:
raise InvalidID(id)
return default
- def set(self, *args, **kwargs):
+ def set(self, id, value, *args, **kwargs):
"""
Set the entry contents.
"""
- if self.read_only == True:
- raise NotSupported('set', 'Cannot set entry in read only storage.')
- self._set(*args, **kwargs)
+ if self.is_writeable() == False:
+ raise NotWriteable('Cannot set entry in unwriteable storage.')
+ if type(value) == types.UnicodeType:
+ value = value.encode(self.encoding)
+ self._set(id, value, *args, **kwargs)
def _set(self, id, value):
if id not in self._data:
pickle.dump([d, copy.deepcopy(d)], f, -1) # [inital tree, working tree]
f.close()
- def connect(self):
- """Open a connection to the repository."""
+ def _connect(self):
try:
f = open(self.repo, 'rb')
except IOError:
def disconnect(self):
"""Close the connection to the repository."""
- if self.read_only == True:
+ if self.is_writeable() == False:
return
f = open(self.repo, 'wb')
pickle.dump([dict((k,v._objects_to_ids())
for entry in self._data[-1][id].traverse():
self._remove(entry.id)
- def children(self, id=None, revision=None):
- """Return a list of specified entry's children's ids."""
+ def _children(self, id=None, revision=None):
if id == None:
id = '__ROOT__'
if revision == None:
return [c.id for c in self._data[revision][id]
if not c.id.startswith('__')]
- def get(self, id, default=InvalidObject, revision=None):
- """
- Get contents of and entry as they were in a given revision.
- revision==None specifies the current revision.
-
- If there is no id, return default, unless default is not
- given, in which case raise InvalidID.
- """
+ def _get(self, id, default=InvalidObject, revision=None):
if revision == None:
revision = -1
if id in self._data[revision]:
If allow_empty == False (the default), raise EmptyCommit if
there are no changes to commit.
"""
- if self.read_only == True:
- raise NotSupported('commit', 'Cannot commit to read only storage.')
+ if self.is_writeable() == False:
+ raise NotWriteable('Cannot commit to unwriteable storage.')
return self._commit(*args, **kwargs)
def _commit(self, summary, body=None, allow_empty=False):
"%s.get() returned %s not %s"
% (vars(self.Class)['name'], ret, self.val))
+ def test_unicode_set(self):
+ """
+ Set should define the value returned by get.
+ """
+ val = u'Fran\xe7ois'
+ self.s.add(self.id)
+ self.s.set(self.id, val)
+ ret = self.s.get(self.id, decode=True)
+ self.failUnless(type(ret) == types.UnicodeType,
+ "%s.get() returned %s not UnicodeType"
+ % (vars(self.Class)['name'], type(ret)))
+ self.failUnless(ret == val,
+ "%s.get() returned %s not %s"
+ % (vars(self.Class)['name'], ret, self.val))
+ ret = self.s.get(self.id)
+ self.failUnless(type(ret) == types.StringType,
+ "%s.get() returned %s not StringType"
+ % (vars(self.Class)['name'], type(ret)))
+ s = unicode(ret, self.s.encoding)
+ self.failUnless(s == val,
+ "%s.get() returned %s not %s"
+ % (vars(self.Class)['name'], s, self.val))
+
+
class Storage_persistence_TestCase (StorageTestCase):
"""Test cases for Storage.disconnect and .connect methods."""