# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+import copy
+import errno
import os
import os.path
-import errno
+import sys
import time
-import copy
import unittest
import doctest
all bugs/comments/etc. that have been loaded into memory. If
you've been living in memory and want to move to
.sync_with_disk==True, but you're not sure if anything has been
- changed in memoryy, a call to save() is a safe move.
+ changed in memory, a call to save() is a safe move.
Regardless of .sync_with_disk, a call to .save() will write out
all the contents that the BugDir instance has loaded into memory.
The BugDir will only load information from the file system when it
loads new bugs/comments that it doesn't already have in memory, or
when it explicitly asked to do so (e.g. .load() or
- __init__(from_disk=True)).
+ __init__(from_disk=True)). If you want BugDir to live entirely in
+ memory (ignoring even explicit calls to .load(), .save(), etc.),
+ set in_memory = True. The in_memory option is ignored if
+ from_disk==True.
Allow RCS initialization
========================
def __init__(self, root=None, sink_to_existing_root=True,
assert_new_BugDir=False, allow_rcs_init=False,
manipulate_encodings=True,
- from_disk=False, rcs=None):
+ from_disk=False, in_memory=False, rcs=None):
list.__init__(self)
settings_object.SavedSettingsObject.__init__(self)
self._manipulate_encodings = manipulate_encodings
+ self._in_memory = in_memory
if root == None:
root = os.getcwd()
if sink_to_existing_root == True:
if from_disk == True:
self.sync_with_disk = True
+ self._in_memory = False
self.load()
else:
self.sync_with_disk = False
return beroot
def get_version(self, path=None, use_none_rcs=False):
+ if self._in_memory == True:
+ return TREE_VERSION_STRING
if use_none_rcs == True:
RCS = rcs.rcs_by_name("None")
RCS.root(self.root)
return tree_version
def set_version(self):
+ if self._in_memory == True:
+ return
self.rcs.mkdir(self.get_path())
self.rcs.set_file_contents(self.get_path("version"),
TREE_VERSION_STRING)
raise NotImplementedError, \
"BugDir cannot handle version '%s' yet." % version
else:
- if not os.path.exists(self.get_path()):
+ if not os.path.exists(self.get_path()) and self._in_memory == False:
raise NoBugDir(self.get_path())
self.load_settings()
def load_all_bugs(self):
"Warning: this could take a while."
+ if self._in_memory == True:
+ return
self._clear_bugs()
for uuid in self.list_uuids():
self._load_bug(uuid)
self.set_version()
self.save_settings()
for bug in self:
+ if self._in_memory == True:
+ return
bug.save()
def load_settings(self):
self._setup_status(self.active_status, self.inactive_status)
def _get_settings(self, settings_path):
+ if self._in_memory == True:
+ return {}
allow_no_rcs = not self.rcs.path_in_root(settings_path)
# allow_no_rcs=True should only be for the special case of
# configuring duplicate bugdir settings
self._save_settings(self.get_path("settings"), settings)
def _save_settings(self, settings_path, settings):
+ if self._in_memory == True:
+ return
allow_no_rcs = not self.rcs.path_in_root(settings_path)
# allow_no_rcs=True should only be for the special case of
# configuring duplicate bugdir settings
def list_uuids(self):
uuids = []
- if os.path.exists(self.get_path()):
+ if self._in_memory == False and os.path.exists(self.get_path()):
# list the uuids on disk
for uuid in os.listdir(self.get_path("bugs")):
if not (uuid.startswith('.')):
def simple_bug_dir(on_disk=True):
"""
- For testing
+ For testing. Set on_disk==False for a memory-only bugdir.
>>> bugdir = simple_bug_dir()
- >>> ls = list(bugdir.list_uuids())
- >>> ls.sort()
- >>> print ls
+ >>> uuids = list(bugdir.list_uuids())
+ >>> uuids.sort()
+ >>> print uuids
['a', 'b']
"""
if on_disk == True:
dir = utility.Dir()
assert os.path.exists(dir.path)
root = dir.path
+ in_memory = False
rcs_init = True
else:
- root = None
+ root = "/"
+ in_memory = True
rcs_init = False
- bugdir = BugDir(root, sink_to_existing_root=False, allow_rcs_init=rcs_init,
+ bugdir = BugDir(root, sink_to_existing_root=False,
+ in_memory=in_memory,
+ allow_rcs_init=rcs_init,
manipulate_encodings=False)
if on_disk == True: # postpone cleanup since dir.__del__() removes dir.
bugdir._dir_ref = dir
class BugDirTestCase(unittest.TestCase):
- def __init__(self, *args, **kwargs):
- unittest.TestCase.__init__(self, *args, **kwargs)
def setUp(self):
self.dir = utility.Dir()
self.bugdir = BugDir(self.dir.path, sink_to_existing_root=False,
def testSyncedComments(self):
self.testComments(sync_with_disk=True)
-unitsuite = unittest.TestLoader().loadTestsFromTestCase(BugDirTestCase)
-suite = unittest.TestSuite([unitsuite])#, doctest.DocTestSuite()])
+class SimpleBugDirTestCase (unittest.TestCase):
+ def setUp(self):
+ # create a pre-existing bugdir in a temporary directory
+ self.dir = utility.Dir()
+ os.chdir(self.dir.path)
+ self.bugdir = BugDir(self.dir.path, sink_to_existing_root=False,
+ allow_rcs_init=True)
+ self.bugdir.new_bug("preexisting", summary="Hopefully not imported")
+ self.bugdir.save()
+ def tearDown(self):
+ self.dir.cleanup()
+ def testOnDiskCleanLoad(self):
+ """simple_bug_dir(on_disk==True) should not import preexisting bugs."""
+ bugdir = simple_bug_dir(on_disk=True)
+ self.failUnless(bugdir._in_memory == False, bugdir._in_memory)
+ uuids = sorted([bug.uuid for bug in bugdir])
+ self.failUnless(uuids == ['a', 'b'], uuids)
+ bugdir._clear_bugs()
+ uuids = sorted([bug.uuid for bug in bugdir])
+ self.failUnless(uuids == [], uuids)
+ bugdir.load_all_bugs()
+ uuids = sorted([bug.uuid for bug in bugdir])
+ self.failUnless(uuids == ['a', 'b'], uuids)
+ def testInMemoryCleanLoad(self):
+ """simple_bug_dir(on_disk==False) should not import preexisting bugs."""
+ bugdir = simple_bug_dir(on_disk=False)
+ self.failUnless(bugdir._in_memory == True, bugdir._in_memory)
+ uuids = sorted([bug.uuid for bug in bugdir])
+ self.failUnless(uuids == ['a', 'b'], uuids)
+ bugdir.load_all_bugs()
+ uuids = sorted([bug.uuid for bug in bugdir])
+ self.failUnless(uuids == ['a', 'b'], uuids)
+ bugdir._clear_bugs()
+ uuids = sorted([bug.uuid for bug in bugdir])
+ self.failUnless(uuids == [], uuids)
+ bugdir.load_all_bugs()
+ uuids = sorted([bug.uuid for bug in bugdir])
+ self.failUnless(uuids == [], uuids)
+
+
+unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
+suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])