self.shortname = shortname
self.matches = matches
+class DiskAccessRequired (Exception):
+ def __init__(self, goal):
+ msg = "Cannot %s without accessing the disk" % goal
+ Exception.__init__(self, msg)
+
TREE_VERSION_STRING = "Bugs Everywhere Tree 1 0\n"
all bugs/comments/etc. that have been loaded into memory. If
you've been living in memory and want to move to
.sync_with_disk==True, but you're not sure if anything has been
- changed in memory, a call to save() is a safe move.
+ changed in memory, a call to save() immediately before the
+ .set_sync_with_disk(True) call is a safe move.
Regardless of .sync_with_disk, a call to .save() will write out
all the contents that the BugDir instance has loaded into memory.
changes, this .save() call will be a waste of time.
The BugDir will only load information from the file system when it
- loads new bugs/comments that it doesn't already have in memory, or
- when it explicitly asked to do so (e.g. .load() or
- __init__(from_disk=True)). If you want BugDir to live entirely in
- memory (ignoring even explicit calls to .load(), .save(), etc.),
- set in_memory = True. The in_memory option is ignored if
- from_disk==True.
+ loads new settings/bugs/comments that it doesn't already have in
+ memory and .sync_with_disk == True.
Allow RCS initialization
========================
def __init__(self, root=None, sink_to_existing_root=True,
assert_new_BugDir=False, allow_rcs_init=False,
- manipulate_encodings=True,
- from_disk=False, in_memory=False, rcs=None):
+ manipulate_encodings=True, from_disk=False, rcs=None):
list.__init__(self)
settings_object.SavedSettingsObject.__init__(self)
self._manipulate_encodings = manipulate_encodings
- self._in_memory = in_memory
if root == None:
root = os.getcwd()
if sink_to_existing_root == True:
if from_disk == True:
self.sync_with_disk = True
- self._in_memory = False
self.load()
else:
self.sync_with_disk = False
self.rcs = rcs
self._setup_user_id(self.user_id)
- def set_sync_with_disk(self, value):
- self.sync_with_disk = value
- for bug in self:
- bug.set_sync_with_disk(value)
+ # methods for getting the BugDir situated in the filesystem
def _find_root(self, path):
"""
Search for an existing bug database dir and it's ancestors and
- return a BugDir rooted there.
+ return a BugDir rooted there. Only called by __init__, and
+ then only if sink_to_existing_root == True.
"""
if not os.path.exists(path):
raise NoRootEntry(path)
raise NoBugDir(path)
return beroot
+ def _guess_rcs(self, allow_rcs_init=False):
+ """
+ Only called by __init__.
+ """
+ deepdir = self.get_path()
+ if not os.path.exists(deepdir):
+ deepdir = os.path.dirname(deepdir)
+ new_rcs = rcs.detect_rcs(deepdir)
+ install = False
+ if new_rcs.name == "None":
+ if allow_rcs_init == True:
+ new_rcs = rcs.installed_rcs()
+ new_rcs.init(self.root)
+ return new_rcs
+
+ def get_path(self, *args):
+ """
+ Return a path relative to .root.
+ """
+ my_dir = os.path.join(self.root, ".be")
+ if len(args) == 0:
+ return my_dir
+ assert args[0] in ["version", "settings", "bugs"], str(args)
+ return os.path.join(my_dir, *args)
+
+ # methods for saving/loading/accessing settings and properties.
+
+ def _get_settings(self, settings_path, for_duplicate_bugdir=False):
+ allow_no_rcs = not self.rcs.path_in_root(settings_path)
+ if allow_no_rcs == True:
+ assert for_duplicate_bugdir == True
+ if self.sync_with_disk == False and for_duplicate_bugdir == False:
+ # duplicates can ignore this bugdir's .sync_with_disk status
+ raise DiskAccessRequired("_get settings")
+ try:
+ settings = mapfile.map_load(self.rcs, settings_path, allow_no_rcs)
+ except rcs.NoSuchFile:
+ settings = {"rcs_name": "None"}
+ return settings
+
+ def _save_settings(self, settings_path, settings,
+ for_duplicate_bugdir=False):
+ allow_no_rcs = not self.rcs.path_in_root(settings_path)
+ if allow_no_rcs == True:
+ assert for_duplicate_bugdir == True
+ if self.sync_with_disk == False and for_duplicate_bugdir == False:
+ # duplicates can ignore this bugdir's .sync_with_disk status
+ raise DiskAccessRequired("_save settings")
+ self.rcs.mkdir(self.get_path(), allow_no_rcs)
+ mapfile.map_save(self.rcs, settings_path, settings, allow_no_rcs)
+
+ def load_settings(self):
+ self.settings = self._get_settings(self.get_path("settings"))
+ self._setup_saved_settings()
+ self._setup_user_id(self.user_id)
+ self._setup_encoding(self.encoding)
+ self._setup_severities(self.severities)
+ self._setup_status(self.active_status, self.inactive_status)
+ self.rcs = rcs.rcs_by_name(self.rcs_name)
+ self._setup_user_id(self.user_id)
+
+ def save_settings(self):
+ settings = self._get_saved_settings()
+ self._save_settings(self.get_path("settings"), settings)
+
def get_version(self, path=None, use_none_rcs=False):
- if self._in_memory == True:
- return TREE_VERSION_STRING
+ """
+ Requires disk access.
+ """
+ if self.sync_with_disk == False:
+ raise DiskAccessRequired("get version")
if use_none_rcs == True:
RCS = rcs.rcs_by_name("None")
RCS.root(self.root)
return tree_version
def set_version(self):
- if self._in_memory == True:
- return
+ """
+ Requires disk access.
+ """
+ if self.sync_with_disk == False:
+ raise DiskAccessRequired("set version")
self.rcs.mkdir(self.get_path())
self.rcs.set_file_contents(self.get_path("version"),
TREE_VERSION_STRING)
- def get_path(self, *args):
- my_dir = os.path.join(self.root, ".be")
- if len(args) == 0:
- return my_dir
- assert args[0] in ["version", "settings", "bugs"], str(args)
- return os.path.join(my_dir, *args)
+ # methods controlling disk access
- def _guess_rcs(self, allow_rcs_init=False):
- deepdir = self.get_path()
- if not os.path.exists(deepdir):
- deepdir = os.path.dirname(deepdir)
- new_rcs = rcs.detect_rcs(deepdir)
- install = False
- if new_rcs.name == "None":
- if allow_rcs_init == True:
- new_rcs = rcs.installed_rcs()
- new_rcs.init(self.root)
- return new_rcs
+ def set_sync_with_disk(self, value):
+ """
+ Adjust .sync_with_disk for the BugDir and all it's children.
+ See the BugDir docstring for a description of the role of
+ .sync_with_disk.
+ """
+ self.sync_with_disk = value
+ for bug in self:
+ bug.set_sync_with_disk(value)
def load(self):
+ """
+ Reqires disk access
+ """
version = self.get_version(use_none_rcs=True)
if version != TREE_VERSION_STRING:
raise NotImplementedError, \
"BugDir cannot handle version '%s' yet." % version
else:
- if not os.path.exists(self.get_path()) and self._in_memory == False:
+ if not os.path.exists(self.get_path()):
raise NoBugDir(self.get_path())
self.load_settings()
- self.rcs = rcs.rcs_by_name(self.rcs_name)
- self._setup_user_id(self.user_id)
-
def load_all_bugs(self):
- "Warning: this could take a while."
- if self._in_memory == True:
- return
+ """
+ Requires disk access.
+ Warning: this could take a while.
+ """
+ if self.sync_with_disk == False:
+ raise DiskAccessRequired("load all bugs")
self._clear_bugs()
for uuid in self.list_uuids():
self._load_bug(uuid)
def save(self):
"""
+ Note that this command writes to disk _regardless_ of the
+ status of .sync_with_disk.
+
Save any loaded contents to disk. Because of lazy loading of
bugs and comments, this is actually not too inefficient.
- However, if self.sync_with_disk = True, then any changes are
+ However, if .sync_with_disk = True, then any changes are
automatically written to disk as soon as they happen, so
calling this method will just waste time (unless something
else has been messing with your on-disk files).
+
+ Requires disk access.
"""
+ sync_with_disk = self.sync_with_disk
+ if sync_with_disk == False:
+ self.set_sync_with_disk(True)
self.set_version()
self.save_settings()
for bug in self:
- if self._in_memory == True:
- return
bug.save()
+ if sync_with_disk == False:
+ self.set_sync_with_disk(sync_with_disk)
- def load_settings(self):
- self.settings = self._get_settings(self.get_path("settings"))
- self._setup_saved_settings()
- self._setup_user_id(self.user_id)
- self._setup_encoding(self.encoding)
- self._setup_severities(self.severities)
- self._setup_status(self.active_status, self.inactive_status)
-
- def _get_settings(self, settings_path):
- if self._in_memory == True:
- return {}
- allow_no_rcs = not self.rcs.path_in_root(settings_path)
- # allow_no_rcs=True should only be for the special case of
- # configuring duplicate bugdir settings
-
- try:
- settings = mapfile.map_load(self.rcs, settings_path, allow_no_rcs)
- except rcs.NoSuchFile:
- settings = {"rcs_name": "None"}
- return settings
-
- def save_settings(self):
- settings = self._get_saved_settings()
- self._save_settings(self.get_path("settings"), settings)
-
- def _save_settings(self, settings_path, settings):
- if self._in_memory == True:
- return
- allow_no_rcs = not self.rcs.path_in_root(settings_path)
- # allow_no_rcs=True should only be for the special case of
- # configuring duplicate bugdir settings
- self.rcs.mkdir(self.get_path(), allow_no_rcs)
- mapfile.map_save(self.rcs, settings_path, settings, allow_no_rcs)
+ # methods for managing duplicate BugDirs
def duplicate_bugdir(self, revision):
duplicate_path = self.rcs.duplicate_repo(revision)
# initialized for versioning
duplicate_settings_path = os.path.join(duplicate_path,
".be", "settings")
- duplicate_settings = self._get_settings(duplicate_settings_path)
+ duplicate_settings = self._get_settings(duplicate_settings_path,
+ for_duplicate_bugdir=True)
if "rcs_name" in duplicate_settings:
duplicate_settings["rcs_name"] = "None"
duplicate_settings["user_id"] = self.user_id
if "disabled" in bug.status_values:
# Hack to support old versions of BE bugs
duplicate_settings["inactive_status"] = self.inactive_status
- self._save_settings(duplicate_settings_path, duplicate_settings)
+ self._save_settings(duplicate_settings_path, duplicate_settings,
+ for_duplicate_bugdir=True)
return BugDir(duplicate_path, from_disk=True, manipulate_encodings=self._manipulate_encodings)
def remove_duplicate_bugdir(self):
self.rcs.remove_duplicate_repo()
+ # methods for managing bugs
+
def list_uuids(self):
uuids = []
- if self._in_memory == False and os.path.exists(self.get_path()):
+ if self.sync_with_disk == True and os.path.exists(self.get_path()):
# list the uuids on disk
for uuid in os.listdir(self.get_path("bugs")):
if not (uuid.startswith('.')):
self._bug_map_gen()
def _load_bug(self, uuid):
+ if self.sync_with_disk == False:
+ raise DiskAccessRequired("_load bug")
bg = bug.Bug(bugdir=self, uuid=uuid, from_disk=True)
self.append(bg)
self._bug_map_gen()
def bug_from_shortname(self, shortname):
"""
- >>> bd = simple_bug_dir()
+ >>> bd = simple_bug_dir(sync_with_disk=False)
>>> bug_a = bd.bug_from_shortname('a')
>>> print type(bug_a)
<class 'libbe.bug.Bug'>
return True
-def simple_bug_dir(on_disk=True):
+def simple_bug_dir(sync_with_disk=True):
"""
- For testing. Set on_disk==False for a memory-only bugdir.
+ For testing. Set sync_with_disk==False for a memory-only bugdir.
>>> bugdir = simple_bug_dir()
>>> uuids = list(bugdir.list_uuids())
>>> uuids.sort()
>>> print uuids
['a', 'b']
"""
- if on_disk == True:
+ if sync_with_disk == True:
dir = utility.Dir()
assert os.path.exists(dir.path)
root = dir.path
- in_memory = False
+ assert_new_BugDir = True
rcs_init = True
else:
root = "/"
- in_memory = True
+ assert_new_BugDir = False
rcs_init = False
bugdir = BugDir(root, sink_to_existing_root=False,
- in_memory=in_memory,
+ assert_new_BugDir=assert_new_BugDir,
allow_rcs_init=rcs_init,
manipulate_encodings=False)
- if on_disk == True: # postpone cleanup since dir.__del__() removes dir.
+ if sync_with_disk == True: # postpone cleanup since dir.__del__() removes dir.
bugdir._dir_ref = dir
bug_a = bugdir.new_bug("a", summary="Bug A")
bug_a.creator = "John Doe <jdoe@example.com>"
bug_b.creator = "Jane Doe <jdoe@example.com>"
bug_b.time = 0
bug_b.status = "closed"
- if on_disk == True:
+ if sync_with_disk == True:
bugdir.save()
+ bugdir.set_sync_with_disk(True)
return bugdir
rep.new_reply("And they have six legs.")
if sync_with_disk == False:
self.bugdir.save()
+ self.bugdir.set_sync_with_disk(True)
self.bugdir._clear_bugs()
bug = self.bugdir.bug_from_uuid("a")
bug.load_comments()
+ if sync_with_disk == False:
+ self.bugdir.set_sync_with_disk(False)
self.failUnless(len(bug.comment_root)==1, len(bug.comment_root))
for index,comment in enumerate(bug.comments()):
if index == 0:
self.failUnless(repLoaded.uuid == rep.uuid, repLoaded.uuid)
self.failUnless(comment.sync_with_disk == True,
comment.sync_with_disk)
- #load_settings()
self.failUnless(comment.content_type == "text/plain",
comment.content_type)
self.failUnless(repLoaded.settings["Content-type"]=="text/plain",
def tearDown(self):
self.dir.cleanup()
def testOnDiskCleanLoad(self):
- """simple_bug_dir(on_disk==True) should not import preexisting bugs."""
- bugdir = simple_bug_dir(on_disk=True)
- self.failUnless(bugdir._in_memory == False, bugdir._in_memory)
+ """simple_bug_dir(sync_with_disk==True) should not import preexisting bugs."""
+ bugdir = simple_bug_dir(sync_with_disk=True)
uuids = sorted([bug.uuid for bug in bugdir])
self.failUnless(uuids == ['a', 'b'], uuids)
bugdir._clear_bugs()
uuids = sorted([bug.uuid for bug in bugdir])
self.failUnless(uuids == ['a', 'b'], uuids)
def testInMemoryCleanLoad(self):
- """simple_bug_dir(on_disk==False) should not import preexisting bugs."""
- bugdir = simple_bug_dir(on_disk=False)
- self.failUnless(bugdir._in_memory == True, bugdir._in_memory)
+ """simple_bug_dir(sync_with_disk==False) should not import preexisting bugs."""
+ bugdir = simple_bug_dir(sync_with_disk=False)
uuids = sorted([bug.uuid for bug in bugdir])
self.failUnless(uuids == ['a', 'b'], uuids)
- bugdir.load_all_bugs()
+ self.failUnlessRaises(DiskAccessRequired, bugdir.load_all_bugs)
uuids = sorted([bug.uuid for bug in bugdir])
self.failUnless(uuids == ['a', 'b'], uuids)
bugdir._clear_bugs()
uuids = sorted([bug.uuid for bug in bugdir])
self.failUnless(uuids == [], uuids)
- bugdir.load_all_bugs()
- uuids = sorted([bug.uuid for bug in bugdir])
- self.failUnless(uuids == [], uuids)
unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])