Fixed libbe.bugdir.BugDirTestCase.testComments(sync_with_disk=False).
[be.git] / libbe / bugdir.py
1 # Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
2 #                         Alexander Belchenko <bialix@ukr.net>
3 #                         Chris Ball <cjb@laptop.org>
4 #                         Oleg Romanyshyn <oromanyshyn@panoramicfeedback.com>
5 #                         W. Trevor King <wking@drexel.edu>
6 #
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 2 of the License, or
10 # (at your option) any later version.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License along
18 # with this program; if not, write to the Free Software Foundation, Inc.,
19 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 import copy
21 import errno
22 import os
23 import os.path
24 import sys
25 import time
26 import unittest
27 import doctest
28
29 from properties import Property, doc_property, local_property, \
30     defaulting_property, checked_property, fn_checked_property, \
31     cached_property, primed_property, change_hook_property, \
32     settings_property
33 import settings_object
34 import mapfile
35 import bug
36 import rcs
37 import encoding
38 import utility
39
40
41 class NoBugDir(Exception):
42     def __init__(self, path):
43         msg = "The directory \"%s\" has no bug directory." % path
44         Exception.__init__(self, msg)
45         self.path = path
46
47 class NoRootEntry(Exception):
48     def __init__(self, path):
49         self.path = path
50         Exception.__init__(self, "Specified root does not exist: %s" % path)
51
52 class AlreadyInitialized(Exception):
53     def __init__(self, path):
54         self.path = path
55         Exception.__init__(self,
56                            "Specified root is already initialized: %s" % path)
57
58 class MultipleBugMatches(ValueError):
59     def __init__(self, shortname, matches):
60         msg = ("More than one bug matches %s.  "
61                "Please be more specific.\n%s" % (shortname, matches))
62         ValueError.__init__(self, msg)
63         self.shortname = shortname
64         self.matches = matches
65
66 class NoBugMatches(KeyError):
67     def __init__(self, shortname):
68         msg = "No bug matches %s" % shortname
69         KeyError.__init__(self, msg)
70         self.shortname = shortname
71
72 class DiskAccessRequired (Exception):
73     def __init__(self, goal):
74         msg = "Cannot %s without accessing the disk" % goal
75         Exception.__init__(self, msg)
76
77
78 TREE_VERSION_STRING = "Bugs Everywhere Tree 1 0\n"
79
80
81 class BugDir (list, settings_object.SavedSettingsObject):
82     """
83     Sink to existing root
84     ======================
85
86     Consider the following usage case:
87     You have a bug directory rooted in
88       /path/to/source
89     by which I mean the '.be' directory is at
90       /path/to/source/.be
91     However, you're of in some subdirectory like
92       /path/to/source/GUI/testing
93     and you want to comment on a bug.  Setting sink_to_root=True wen
94     you initialize your BugDir will cause it to search for the '.be'
95     file in the ancestors of the path you passed in as 'root'.
96       /path/to/source/GUI/testing/.be     miss
97       /path/to/source/GUI/.be             miss
98       /path/to/source/.be                 hit!
99     So it still roots itself appropriately without much work for you.
100
101     File-system access
102     ==================
103
104     BugDirs live completely in memory when .sync_with_disk is False.
105     This is the default configuration setup by BugDir(from_disk=False).
106     If .sync_with_disk == True (e.g. BugDir(from_disk=True)), then
107     any changes to the BugDir will be immediately written to disk.
108
109     If you want to change .sync_with_disk, we suggest you use
110     .set_sync_with_disk(), which propogates the new setting through to
111     all bugs/comments/etc. that have been loaded into memory.  If
112     you've been living in memory and want to move to
113     .sync_with_disk==True, but you're not sure if anything has been
114     changed in memory, a call to save() immediately before the
115     .set_sync_with_disk(True) call is a safe move.
116
117     Regardless of .sync_with_disk, a call to .save() will write out
118     all the contents that the BugDir instance has loaded into memory.
119     If sync_with_disk has been True over the course of all interesting
120     changes, this .save() call will be a waste of time.
121
122     The BugDir will only load information from the file system when it
123     loads new settings/bugs/comments that it doesn't already have in
124     memory and .sync_with_disk == True.
125
126     Allow RCS initialization
127     ========================
128
129     This one is for testing purposes.  Setting it to True allows the
130     BugDir to search for an installed RCS backend and initialize it in
131     the root directory.  This is a convenience option for supporting
132     tests of versioning functionality (e.g. .duplicate_bugdir).
133
134     Disable encoding manipulation
135     =============================
136
137     This one is for testing purposed.  You might have non-ASCII
138     Unicode in your bugs, comments, files, etc.  BugDir instances try
139     and support your preferred encoding scheme (e.g. "utf-8") when
140     dealing with stream and file input/output.  For stream output,
141     this involves replacing sys.stdout and sys.stderr
142     (libbe.encode.set_IO_stream_encodings).  However this messes up
143     doctest's output catching.  In order to support doctest tests
144     using BugDirs, set manipulate_encodings=False, and stick to ASCII
145     in your tests.
146     """
147
148     settings_properties = []
149     required_saved_properties = []
150     _prop_save_settings = settings_object.prop_save_settings
151     _prop_load_settings = settings_object.prop_load_settings
152     def _versioned_property(settings_properties=settings_properties,
153                             required_saved_properties=required_saved_properties,
154                             **kwargs):
155         if "settings_properties" not in kwargs:
156             kwargs["settings_properties"] = settings_properties
157         if "required_saved_properties" not in kwargs:
158             kwargs["required_saved_properties"]=required_saved_properties
159         return settings_object.versioned_property(**kwargs)
160
161     @_versioned_property(name="target",
162                          doc="The current project development target.")
163     def target(): return {}
164
165     def _guess_encoding(self):
166         return encoding.get_encoding()
167     def _check_encoding(value):
168         if value != None:
169             return encoding.known_encoding(value)
170     def _setup_encoding(self, new_encoding):
171         # change hook called before generator.
172         if new_encoding not in [None, settings_object.EMPTY]:
173             if self._manipulate_encodings == True:
174                 encoding.set_IO_stream_encodings(new_encoding)
175     def _set_encoding(self, old_encoding, new_encoding):
176         self._setup_encoding(new_encoding)
177         self._prop_save_settings(old_encoding, new_encoding)
178
179     @_versioned_property(name="encoding",
180                          doc="""The default input/output encoding to use (e.g. "utf-8").""",
181                          change_hook=_set_encoding,
182                          generator=_guess_encoding,
183                          check_fn=_check_encoding)
184     def encoding(): return {}
185
186     def _setup_user_id(self, user_id):
187         self.rcs.user_id = user_id
188     def _guess_user_id(self):
189         return self.rcs.get_user_id()
190     def _set_user_id(self, old_user_id, new_user_id):
191         self._setup_user_id(new_user_id)
192         self._prop_save_settings(old_user_id, new_user_id)
193
194     @_versioned_property(name="user_id",
195                          doc=
196 """The user's prefered name, e.g. 'John Doe <jdoe@example.com>'.  Note
197 that the Arch RCS backend *enforces* ids with this format.""",
198                          change_hook=_set_user_id,
199                          generator=_guess_user_id)
200     def user_id(): return {}
201
202     @_versioned_property(name="default_assignee",
203                          doc=
204 """The default assignee for new bugs e.g. 'John Doe <jdoe@example.com>'.""")
205     def default_assignee(): return {}
206
207     @_versioned_property(name="rcs_name",
208                          doc="""The name of the current RCS.  Kept seperate to make saving/loading
209 settings easy.  Don't set this attribute.  Set .rcs instead, and
210 .rcs_name will be automatically adjusted.""",
211                          default="None",
212                          allowed=["None", "Arch", "bzr", "darcs", "git", "hg"])
213     def rcs_name(): return {}
214
215     def _get_rcs(self, rcs_name=None):
216         """Get and root a new revision control system"""
217         if rcs_name == None:
218             rcs_name = self.rcs_name
219         new_rcs = rcs.rcs_by_name(rcs_name)
220         self._change_rcs(None, new_rcs)
221         return new_rcs
222     def _change_rcs(self, old_rcs, new_rcs):
223         new_rcs.encoding = self.encoding
224         new_rcs.root(self.root)
225         self.rcs_name = new_rcs.name
226
227     @Property
228     @change_hook_property(hook=_change_rcs)
229     @cached_property(generator=_get_rcs)
230     @local_property("rcs")
231     @doc_property(doc="A revision control system instance.")
232     def rcs(): return {}
233
234     def _bug_map_gen(self):
235         map = {}
236         for bug in self:
237             map[bug.uuid] = bug
238         for uuid in self.list_uuids():
239             if uuid not in map:
240                 map[uuid] = None
241         self._bug_map_value = map # ._bug_map_value used by @local_property
242
243     def _extra_strings_check_fn(value):
244         return utility.iterable_full_of_strings(value, \
245                          alternative=settings_object.EMPTY)
246     def _extra_strings_change_hook(self, old, new):
247         self.extra_strings.sort() # to make merging easier
248         self._prop_save_settings(old, new)
249     @_versioned_property(name="extra_strings",
250                          doc="Space for an array of extra strings.  Useful for storing state for functionality implemented purely in becommands/<some_function>.py.",
251                          default=[],
252                          check_fn=_extra_strings_check_fn,
253                          change_hook=_extra_strings_change_hook,
254                          mutable=True)
255     def extra_strings(): return {}
256
257     @Property
258     @primed_property(primer=_bug_map_gen)
259     @local_property("bug_map")
260     @doc_property(doc="A dict of (bug-uuid, bug-instance) pairs.")
261     def _bug_map(): return {}
262
263     def _setup_severities(self, severities):
264         if severities not in [None, settings_object.EMPTY]:
265             bug.load_severities(severities)
266     def _set_severities(self, old_severities, new_severities):
267         self._setup_severities(new_severities)
268         self._prop_save_settings(old_severities, new_severities)
269     @_versioned_property(name="severities",
270                          doc="The allowed bug severities and their descriptions.",
271                          change_hook=_set_severities)
272     def severities(): return {}
273
274     def _setup_status(self, active_status, inactive_status):
275         bug.load_status(active_status, inactive_status)
276     def _set_active_status(self, old_active_status, new_active_status):
277         self._setup_status(new_active_status, self.inactive_status)
278         self._prop_save_settings(old_active_status, new_active_status)
279     @_versioned_property(name="active_status",
280                          doc="The allowed active bug states and their descriptions.",
281                          change_hook=_set_active_status)
282     def active_status(): return {}
283
284     def _set_inactive_status(self, old_inactive_status, new_inactive_status):
285         self._setup_status(self.active_status, new_inactive_status)
286         self._prop_save_settings(old_inactive_status, new_inactive_status)
287     @_versioned_property(name="inactive_status",
288                          doc="The allowed inactive bug states and their descriptions.",
289                          change_hook=_set_inactive_status)
290     def inactive_status(): return {}
291
292
293     def __init__(self, root=None, sink_to_existing_root=True,
294                  assert_new_BugDir=False, allow_rcs_init=False,
295                  manipulate_encodings=True, from_disk=False, rcs=None):
296         list.__init__(self)
297         settings_object.SavedSettingsObject.__init__(self)
298         self._manipulate_encodings = manipulate_encodings
299         if root == None:
300             root = os.getcwd()
301         if sink_to_existing_root == True:
302             self.root = self._find_root(root)
303         else:
304             if not os.path.exists(root):
305                 raise NoRootEntry(root)
306             self.root = root
307         # get a temporary rcs until we've loaded settings
308         self.sync_with_disk = False
309         self.rcs = self._guess_rcs()
310
311         if from_disk == True:
312             self.sync_with_disk = True
313             self.load()
314         else:
315             self.sync_with_disk = False
316             if assert_new_BugDir == True:
317                 if os.path.exists(self.get_path()):
318                     raise AlreadyInitialized, self.get_path()
319             if rcs == None:
320                 rcs = self._guess_rcs(allow_rcs_init)
321             self.rcs = rcs
322             self._setup_user_id(self.user_id)
323
324     # methods for getting the BugDir situated in the filesystem
325
326     def _find_root(self, path):
327         """
328         Search for an existing bug database dir and it's ancestors and
329         return a BugDir rooted there.  Only called by __init__, and
330         then only if sink_to_existing_root == True.
331         """
332         if not os.path.exists(path):
333             raise NoRootEntry(path)
334         versionfile=utility.search_parent_directories(path,
335                                                       os.path.join(".be", "version"))
336         if versionfile != None:
337             beroot = os.path.dirname(versionfile)
338             root = os.path.dirname(beroot)
339             return root
340         else:
341             beroot = utility.search_parent_directories(path, ".be")
342             if beroot == None:
343                 raise NoBugDir(path)
344             return beroot
345
346     def _guess_rcs(self, allow_rcs_init=False):
347         """
348         Only called by __init__.
349         """
350         deepdir = self.get_path()
351         if not os.path.exists(deepdir):
352             deepdir = os.path.dirname(deepdir)
353         new_rcs = rcs.detect_rcs(deepdir)
354         install = False
355         if new_rcs.name == "None":
356             if allow_rcs_init == True:
357                 new_rcs = rcs.installed_rcs()
358                 new_rcs.init(self.root)
359         return new_rcs
360
361     # methods for saving/loading/accessing settings and properties.
362
363     def get_path(self, *args):
364         """
365         Return a path relative to .root.
366         """
367         my_dir = os.path.join(self.root, ".be")
368         if len(args) == 0:
369             return my_dir
370         assert args[0] in ["version", "settings", "bugs"], str(args)
371         return os.path.join(my_dir, *args)
372
373     def _get_settings(self, settings_path, for_duplicate_bugdir=False):
374         allow_no_rcs = not self.rcs.path_in_root(settings_path)
375         if allow_no_rcs == True:
376             assert for_duplicate_bugdir == True
377         if self.sync_with_disk == False and for_duplicate_bugdir == False:
378             # duplicates can ignore this bugdir's .sync_with_disk status
379             raise DiskAccessRequired("_get settings")
380         try:
381             settings = mapfile.map_load(self.rcs, settings_path, allow_no_rcs)
382         except rcs.NoSuchFile:
383             settings = {"rcs_name": "None"}
384         return settings
385
386     def _save_settings(self, settings_path, settings,
387                        for_duplicate_bugdir=False):
388         allow_no_rcs = not self.rcs.path_in_root(settings_path)
389         if allow_no_rcs == True:
390             assert for_duplicate_bugdir == True
391         if self.sync_with_disk == False and for_duplicate_bugdir == False:
392             # duplicates can ignore this bugdir's .sync_with_disk status
393             raise DiskAccessRequired("_save settings")
394         self.rcs.mkdir(self.get_path(), allow_no_rcs)
395         mapfile.map_save(self.rcs, settings_path, settings, allow_no_rcs)
396
397     def load_settings(self):
398         self.settings = self._get_settings(self.get_path("settings"))
399         self._setup_saved_settings()
400         self._setup_user_id(self.user_id)
401         self._setup_encoding(self.encoding)
402         self._setup_severities(self.severities)
403         self._setup_status(self.active_status, self.inactive_status)
404         self.rcs = rcs.rcs_by_name(self.rcs_name)
405         self._setup_user_id(self.user_id)
406
407     def save_settings(self):
408         settings = self._get_saved_settings()
409         self._save_settings(self.get_path("settings"), settings)
410
411     def get_version(self, path=None, use_none_rcs=False):
412         """
413         Requires disk access.
414         """
415         if self.sync_with_disk == False:
416             raise DiskAccessRequired("get version")
417         if use_none_rcs == True:
418             RCS = rcs.rcs_by_name("None")
419             RCS.root(self.root)
420             RCS.encoding = encoding.get_encoding()
421         else:
422             RCS = self.rcs
423
424         if path == None:
425             path = self.get_path("version")
426         tree_version = RCS.get_file_contents(path)
427         return tree_version
428
429     def set_version(self):
430         """
431         Requires disk access.
432         """
433         if self.sync_with_disk == False:
434             raise DiskAccessRequired("set version")
435         self.rcs.mkdir(self.get_path())
436         self.rcs.set_file_contents(self.get_path("version"),
437                                    TREE_VERSION_STRING)
438
439     # methods controlling disk access
440
441     def set_sync_with_disk(self, value):
442         """
443         Adjust .sync_with_disk for the BugDir and all it's children.
444         See the BugDir docstring for a description of the role of
445         .sync_with_disk.
446         """
447         self.sync_with_disk = value
448         for bug in self:
449             bug.set_sync_with_disk(value)
450
451     def load(self):
452         """
453         Reqires disk access
454         """
455         version = self.get_version(use_none_rcs=True)
456         if version != TREE_VERSION_STRING:
457             raise NotImplementedError, \
458                 "BugDir cannot handle version '%s' yet." % version
459         else:
460             if not os.path.exists(self.get_path()):
461                 raise NoBugDir(self.get_path())
462             self.load_settings()
463
464     def load_all_bugs(self):
465         """
466         Requires disk access.
467         Warning: this could take a while.
468         """
469         if self.sync_with_disk == False:
470             raise DiskAccessRequired("load all bugs")
471         self._clear_bugs()
472         for uuid in self.list_uuids():
473             self._load_bug(uuid)
474
475     def save(self):
476         """
477         Note that this command writes to disk _regardless_ of the
478         status of .sync_with_disk.
479
480         Save any loaded contents to disk.  Because of lazy loading of
481         bugs and comments, this is actually not too inefficient.
482
483         However, if .sync_with_disk = True, then any changes are
484         automatically written to disk as soon as they happen, so
485         calling this method will just waste time (unless something
486         else has been messing with your on-disk files).
487
488         Requires disk access.
489         """
490         sync_with_disk = self.sync_with_disk
491         if sync_with_disk == False:
492             self.set_sync_with_disk(True)
493         self.set_version()
494         self.save_settings()
495         for bug in self:
496             bug.save()
497         if sync_with_disk == False:
498             self.set_sync_with_disk(sync_with_disk)
499
500     # methods for managing duplicate BugDirs
501
502     def duplicate_bugdir(self, revision):
503         duplicate_path = self.rcs.duplicate_repo(revision)
504
505         # setup revision RCS as None, since the duplicate may not be
506         # initialized for versioning
507         duplicate_settings_path = os.path.join(duplicate_path,
508                                                ".be", "settings")
509         duplicate_settings = self._get_settings(duplicate_settings_path,
510                                                 for_duplicate_bugdir=True)
511         if "rcs_name" in duplicate_settings:
512             duplicate_settings["rcs_name"] = "None"
513             duplicate_settings["user_id"] = self.user_id
514         if "disabled" in bug.status_values:
515             # Hack to support old versions of BE bugs
516             duplicate_settings["inactive_status"] = self.inactive_status
517         self._save_settings(duplicate_settings_path, duplicate_settings,
518                             for_duplicate_bugdir=True)
519
520         return BugDir(duplicate_path, from_disk=True, manipulate_encodings=self._manipulate_encodings)
521
522     def remove_duplicate_bugdir(self):
523         self.rcs.remove_duplicate_repo()
524
525     # methods for managing bugs
526
527     def list_uuids(self):
528         uuids = []
529         if self.sync_with_disk == True and os.path.exists(self.get_path()):
530             # list the uuids on disk
531             for uuid in os.listdir(self.get_path("bugs")):
532                 if not (uuid.startswith('.')):
533                     uuids.append(uuid)
534                     yield uuid
535         # and the ones that are still just in memory
536         for bug in self:
537             if bug.uuid not in uuids:
538                 uuids.append(bug.uuid)
539                 yield bug.uuid
540
541     def _clear_bugs(self):
542         while len(self) > 0:
543             self.pop()
544         self._bug_map_gen()
545
546     def _load_bug(self, uuid):
547         if self.sync_with_disk == False:
548             raise DiskAccessRequired("_load bug")
549         bg = bug.Bug(bugdir=self, uuid=uuid, from_disk=True)
550         self.append(bg)
551         self._bug_map_gen()
552         return bg
553
554     def new_bug(self, uuid=None, summary=None):
555         bg = bug.Bug(bugdir=self, uuid=uuid, summary=summary)
556         bg.set_sync_with_disk(self.sync_with_disk)
557         if bg.sync_with_disk == True:
558             bg.save()
559         self.append(bg)
560         self._bug_map_gen()
561         return bg
562
563     def remove_bug(self, bug):
564         self.remove(bug)
565         if bug.sync_with_disk == True:
566             bug.remove()
567
568     def bug_shortname(self, bug):
569         """
570         Generate short names from uuids.  Picks the minimum number of
571         characters (>=3) from the beginning of the uuid such that the
572         short names are unique.
573
574         Obviously, as the number of bugs in the database grows, these
575         short names will cease to be unique.  The complete uuid should be
576         used for long term reference.
577         """
578         chars = 3
579         for uuid in self._bug_map.keys():
580             if bug.uuid == uuid:
581                 continue
582             while (bug.uuid[:chars] == uuid[:chars]):
583                 chars+=1
584         return bug.uuid[:chars]
585
586     def bug_from_shortname(self, shortname):
587         """
588         >>> bd = simple_bug_dir(sync_with_disk=False)
589         >>> bug_a = bd.bug_from_shortname('a')
590         >>> print type(bug_a)
591         <class 'libbe.bug.Bug'>
592         >>> print bug_a
593         a:om: Bug A
594         """
595         matches = []
596         self._bug_map_gen()
597         for uuid in self._bug_map.keys():
598             if uuid.startswith(shortname):
599                 matches.append(uuid)
600         if len(matches) > 1:
601             raise MultipleBugMatches(shortname, matches)
602         if len(matches) == 1:
603             return self.bug_from_uuid(matches[0])
604         raise NoBugMatches(shortname)
605
606     def bug_from_uuid(self, uuid):
607         if not self.has_bug(uuid):
608             raise KeyError("No bug matches %s\n  bug map: %s\n  root: %s" \
609                                % (uuid, self._bug_map, self.root))
610         if self._bug_map[uuid] == None:
611             self._load_bug(uuid)
612         return self._bug_map[uuid]
613
614     def has_bug(self, bug_uuid):
615         if bug_uuid not in self._bug_map:
616             self._bug_map_gen()
617             if bug_uuid not in self._bug_map:
618                 return False
619         return True
620
621
622 def simple_bug_dir(sync_with_disk=True):
623     """
624     For testing.  Set sync_with_disk==False for a memory-only bugdir.
625     >>> bugdir = simple_bug_dir()
626     >>> uuids = list(bugdir.list_uuids())
627     >>> uuids.sort()
628     >>> print uuids
629     ['a', 'b']
630     """
631     if sync_with_disk == True:
632         dir = utility.Dir()
633         assert os.path.exists(dir.path)
634         root = dir.path
635         assert_new_BugDir = True
636         rcs_init = True
637     else:
638         root = "/"
639         assert_new_BugDir = False
640         rcs_init = False
641     bugdir = BugDir(root, sink_to_existing_root=False,
642                     assert_new_BugDir=assert_new_BugDir,
643                     allow_rcs_init=rcs_init,
644                     manipulate_encodings=False)
645     if sync_with_disk == True: # postpone cleanup since dir.__del__() removes dir.
646         bugdir._dir_ref = dir
647     bug_a = bugdir.new_bug("a", summary="Bug A")
648     bug_a.creator = "John Doe <jdoe@example.com>"
649     bug_a.time = 0
650     bug_b = bugdir.new_bug("b", summary="Bug B")
651     bug_b.creator = "Jane Doe <jdoe@example.com>"
652     bug_b.time = 0
653     bug_b.status = "closed"
654     if sync_with_disk == True:
655         bugdir.save()
656         bugdir.set_sync_with_disk(True)
657     return bugdir
658
659
660 class BugDirTestCase(unittest.TestCase):
661     def setUp(self):
662         self.dir = utility.Dir()
663         self.bugdir = BugDir(self.dir.path, sink_to_existing_root=False,
664                              allow_rcs_init=True)
665         self.rcs = self.bugdir.rcs
666     def tearDown(self):
667         self.rcs.cleanup()
668         self.dir.cleanup()
669     def fullPath(self, path):
670         return os.path.join(self.dir.path, path)
671     def assertPathExists(self, path):
672         fullpath = self.fullPath(path)
673         self.failUnless(os.path.exists(fullpath)==True,
674                         "path %s does not exist" % fullpath)
675         self.assertRaises(AlreadyInitialized, BugDir,
676                           self.dir.path, assertNewBugDir=True)
677     def versionTest(self):
678         if self.rcs.versioned == False:
679             return
680         original = self.bugdir.rcs.commit("Began versioning")
681         bugA = self.bugdir.bug_from_uuid("a")
682         bugA.status = "fixed"
683         self.bugdir.save()
684         new = self.rcs.commit("Fixed bug a")
685         dupdir = self.bugdir.duplicate_bugdir(original)
686         self.failUnless(dupdir.root != self.bugdir.root,
687                         "%s, %s" % (dupdir.root, self.bugdir.root))
688         bugAorig = dupdir.bug_from_uuid("a")
689         self.failUnless(bugA != bugAorig,
690                         "\n%s\n%s" % (bugA.string(), bugAorig.string()))
691         bugAorig.status = "fixed"
692         self.failUnless(bug.cmp_status(bugA, bugAorig)==0,
693                         "%s, %s" % (bugA.status, bugAorig.status))
694         self.failUnless(bug.cmp_severity(bugA, bugAorig)==0,
695                         "%s, %s" % (bugA.severity, bugAorig.severity))
696         self.failUnless(bug.cmp_assigned(bugA, bugAorig)==0,
697                         "%s, %s" % (bugA.assigned, bugAorig.assigned))
698         self.failUnless(bug.cmp_time(bugA, bugAorig)==0,
699                         "%s, %s" % (bugA.time, bugAorig.time))
700         self.failUnless(bug.cmp_creator(bugA, bugAorig)==0,
701                         "%s, %s" % (bugA.creator, bugAorig.creator))
702         self.failUnless(bugA == bugAorig,
703                         "\n%s\n%s" % (bugA.string(), bugAorig.string()))
704         self.bugdir.remove_duplicate_bugdir()
705         self.failUnless(os.path.exists(dupdir.root)==False, str(dupdir.root))
706     def testRun(self):
707         self.bugdir.new_bug(uuid="a", summary="Ant")
708         self.bugdir.new_bug(uuid="b", summary="Cockroach")
709         self.bugdir.new_bug(uuid="c", summary="Praying mantis")
710         length = len(self.bugdir)
711         self.failUnless(length == 3, "%d != 3 bugs" % length)
712         uuids = list(self.bugdir.list_uuids())
713         self.failUnless(len(uuids) == 3, "%d != 3 uuids" % len(uuids))
714         self.failUnless(uuids == ["a","b","c"], str(uuids))
715         bugA = self.bugdir.bug_from_uuid("a")
716         bugAprime = self.bugdir.bug_from_shortname("a")
717         self.failUnless(bugA == bugAprime, "%s != %s" % (bugA, bugAprime))
718         self.bugdir.save()
719         self.versionTest()
720     def testComments(self, sync_with_disk=False):
721         if sync_with_disk == True:
722             self.bugdir.set_sync_with_disk(True)
723         self.bugdir.new_bug(uuid="a", summary="Ant")
724         bug = self.bugdir.bug_from_uuid("a")
725         comm = bug.comment_root
726         rep = comm.new_reply("Ants are small.")
727         rep.new_reply("And they have six legs.")
728         if sync_with_disk == False:
729             self.bugdir.save()
730             self.bugdir.set_sync_with_disk(True)
731         self.bugdir._clear_bugs()
732         bug = self.bugdir.bug_from_uuid("a")
733         bug.load_comments()
734         if sync_with_disk == False:
735             self.bugdir.set_sync_with_disk(False)
736         self.failUnless(len(bug.comment_root)==1, len(bug.comment_root))
737         for index,comment in enumerate(bug.comments()):
738             if index == 0:
739                 repLoaded = comment
740                 self.failUnless(repLoaded.uuid == rep.uuid, repLoaded.uuid)
741                 self.failUnless(comment.sync_with_disk == sync_with_disk,
742                                 comment.sync_with_disk)
743                 self.failUnless(comment.content_type == "text/plain",
744                                 comment.content_type)
745                 self.failUnless(repLoaded.settings["Content-type"]=="text/plain",
746                                 repLoaded.settings)
747                 self.failUnless(repLoaded.body == "Ants are small.",
748                                 repLoaded.body)
749             elif index == 1:
750                 self.failUnless(comment.in_reply_to == repLoaded.uuid,
751                                 repLoaded.uuid)
752                 self.failUnless(comment.body == "And they have six legs.",
753                                 comment.body)
754             else:
755                 self.failIf(True, "Invalid comment: %d\n%s" % (index, comment))
756     def testSyncedComments(self):
757         self.testComments(sync_with_disk=True)
758
759 class SimpleBugDirTestCase (unittest.TestCase):
760     def setUp(self):
761         # create a pre-existing bugdir in a temporary directory
762         self.dir = utility.Dir()
763         os.chdir(self.dir.path)
764         self.bugdir = BugDir(self.dir.path, sink_to_existing_root=False,
765                              allow_rcs_init=True)
766         self.bugdir.new_bug("preexisting", summary="Hopefully not imported")
767         self.bugdir.save()
768     def tearDown(self):
769         self.dir.cleanup()
770     def testOnDiskCleanLoad(self):
771         """simple_bug_dir(sync_with_disk==True) should not import preexisting bugs."""
772         bugdir = simple_bug_dir(sync_with_disk=True)
773         self.failUnless(bugdir.sync_with_disk==True, bugdir.sync_with_disk)
774         uuids = sorted([bug.uuid for bug in bugdir])
775         self.failUnless(uuids == ['a', 'b'], uuids)
776         bugdir._clear_bugs()
777         uuids = sorted([bug.uuid for bug in bugdir])
778         self.failUnless(uuids == [], uuids)
779         bugdir.load_all_bugs()
780         uuids = sorted([bug.uuid for bug in bugdir])
781         self.failUnless(uuids == ['a', 'b'], uuids)
782     def testInMemoryCleanLoad(self):
783         """simple_bug_dir(sync_with_disk==False) should not import preexisting bugs."""
784         bugdir = simple_bug_dir(sync_with_disk=False)
785         self.failUnless(bugdir.sync_with_disk==False, bugdir.sync_with_disk)
786         uuids = sorted([bug.uuid for bug in bugdir])
787         self.failUnless(uuids == ['a', 'b'], uuids)
788         self.failUnlessRaises(DiskAccessRequired, bugdir.load_all_bugs)
789         uuids = sorted([bug.uuid for bug in bugdir])
790         self.failUnless(uuids == ['a', 'b'], uuids)
791         bugdir._clear_bugs()
792         uuids = sorted([bug.uuid for bug in bugdir])
793         self.failUnless(uuids == [], uuids)
794
795
796 unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
797 suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])