Improved unittest cleanup by adding BugDir.cleanup().
[be.git] / libbe / bugdir.py
1 # Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
2 #                         Alexander Belchenko <bialix@ukr.net>
3 #                         Chris Ball <cjb@laptop.org>
4 #                         Oleg Romanyshyn <oromanyshyn@panoramicfeedback.com>
5 #                         W. Trevor King <wking@drexel.edu>
6 #
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 2 of the License, or
10 # (at your option) any later version.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License along
18 # with this program; if not, write to the Free Software Foundation, Inc.,
19 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 import copy
21 import errno
22 import os
23 import os.path
24 import sys
25 import time
26 import unittest
27 import doctest
28
29 from properties import Property, doc_property, local_property, \
30     defaulting_property, checked_property, fn_checked_property, \
31     cached_property, primed_property, change_hook_property, \
32     settings_property
33 import settings_object
34 import mapfile
35 import bug
36 import rcs
37 import encoding
38 import utility
39
40
41 class NoBugDir(Exception):
42     def __init__(self, path):
43         msg = "The directory \"%s\" has no bug directory." % path
44         Exception.__init__(self, msg)
45         self.path = path
46
47 class NoRootEntry(Exception):
48     def __init__(self, path):
49         self.path = path
50         Exception.__init__(self, "Specified root does not exist: %s" % path)
51
52 class AlreadyInitialized(Exception):
53     def __init__(self, path):
54         self.path = path
55         Exception.__init__(self,
56                            "Specified root is already initialized: %s" % path)
57
58 class MultipleBugMatches(ValueError):
59     def __init__(self, shortname, matches):
60         msg = ("More than one bug matches %s.  "
61                "Please be more specific.\n%s" % (shortname, matches))
62         ValueError.__init__(self, msg)
63         self.shortname = shortname
64         self.matches = matches
65
66 class NoBugMatches(KeyError):
67     def __init__(self, shortname):
68         msg = "No bug matches %s" % shortname
69         KeyError.__init__(self, msg)
70         self.shortname = shortname
71
72 class DiskAccessRequired (Exception):
73     def __init__(self, goal):
74         msg = "Cannot %s without accessing the disk" % goal
75         Exception.__init__(self, msg)
76
77
78 TREE_VERSION_STRING = "Bugs Everywhere Tree 1 0\n"
79
80
81 class BugDir (list, settings_object.SavedSettingsObject):
82     """
83     Sink to existing root
84     ======================
85
86     Consider the following usage case:
87     You have a bug directory rooted in
88       /path/to/source
89     by which I mean the '.be' directory is at
90       /path/to/source/.be
91     However, you're of in some subdirectory like
92       /path/to/source/GUI/testing
93     and you want to comment on a bug.  Setting sink_to_root=True wen
94     you initialize your BugDir will cause it to search for the '.be'
95     file in the ancestors of the path you passed in as 'root'.
96       /path/to/source/GUI/testing/.be     miss
97       /path/to/source/GUI/.be             miss
98       /path/to/source/.be                 hit!
99     So it still roots itself appropriately without much work for you.
100
101     File-system access
102     ==================
103
104     BugDirs live completely in memory when .sync_with_disk is False.
105     This is the default configuration setup by BugDir(from_disk=False).
106     If .sync_with_disk == True (e.g. BugDir(from_disk=True)), then
107     any changes to the BugDir will be immediately written to disk.
108
109     If you want to change .sync_with_disk, we suggest you use
110     .set_sync_with_disk(), which propogates the new setting through to
111     all bugs/comments/etc. that have been loaded into memory.  If
112     you've been living in memory and want to move to
113     .sync_with_disk==True, but you're not sure if anything has been
114     changed in memory, a call to save() immediately before the
115     .set_sync_with_disk(True) call is a safe move.
116
117     Regardless of .sync_with_disk, a call to .save() will write out
118     all the contents that the BugDir instance has loaded into memory.
119     If sync_with_disk has been True over the course of all interesting
120     changes, this .save() call will be a waste of time.
121
122     The BugDir will only load information from the file system when it
123     loads new settings/bugs/comments that it doesn't already have in
124     memory and .sync_with_disk == True.
125
126     Allow RCS initialization
127     ========================
128
129     This one is for testing purposes.  Setting it to True allows the
130     BugDir to search for an installed RCS backend and initialize it in
131     the root directory.  This is a convenience option for supporting
132     tests of versioning functionality (e.g. .duplicate_bugdir).
133
134     Disable encoding manipulation
135     =============================
136
137     This one is for testing purposed.  You might have non-ASCII
138     Unicode in your bugs, comments, files, etc.  BugDir instances try
139     and support your preferred encoding scheme (e.g. "utf-8") when
140     dealing with stream and file input/output.  For stream output,
141     this involves replacing sys.stdout and sys.stderr
142     (libbe.encode.set_IO_stream_encodings).  However this messes up
143     doctest's output catching.  In order to support doctest tests
144     using BugDirs, set manipulate_encodings=False, and stick to ASCII
145     in your tests.
146     """
147
148     settings_properties = []
149     required_saved_properties = []
150     _prop_save_settings = settings_object.prop_save_settings
151     _prop_load_settings = settings_object.prop_load_settings
152     def _versioned_property(settings_properties=settings_properties,
153                             required_saved_properties=required_saved_properties,
154                             **kwargs):
155         if "settings_properties" not in kwargs:
156             kwargs["settings_properties"] = settings_properties
157         if "required_saved_properties" not in kwargs:
158             kwargs["required_saved_properties"]=required_saved_properties
159         return settings_object.versioned_property(**kwargs)
160
161     @_versioned_property(name="target",
162                          doc="The current project development target.")
163     def target(): return {}
164
165     def _guess_encoding(self):
166         return encoding.get_encoding()
167     def _check_encoding(value):
168         if value != None:
169             return encoding.known_encoding(value)
170     def _setup_encoding(self, new_encoding):
171         # change hook called before generator.
172         if new_encoding not in [None, settings_object.EMPTY]:
173             if self._manipulate_encodings == True:
174                 encoding.set_IO_stream_encodings(new_encoding)
175     def _set_encoding(self, old_encoding, new_encoding):
176         self._setup_encoding(new_encoding)
177         self._prop_save_settings(old_encoding, new_encoding)
178
179     @_versioned_property(name="encoding",
180                          doc="""The default input/output encoding to use (e.g. "utf-8").""",
181                          change_hook=_set_encoding,
182                          generator=_guess_encoding,
183                          check_fn=_check_encoding)
184     def encoding(): return {}
185
186     def _setup_user_id(self, user_id):
187         self.rcs.user_id = user_id
188     def _guess_user_id(self):
189         return self.rcs.get_user_id()
190     def _set_user_id(self, old_user_id, new_user_id):
191         self._setup_user_id(new_user_id)
192         self._prop_save_settings(old_user_id, new_user_id)
193
194     @_versioned_property(name="user_id",
195                          doc=
196 """The user's prefered name, e.g. 'John Doe <jdoe@example.com>'.  Note
197 that the Arch RCS backend *enforces* ids with this format.""",
198                          change_hook=_set_user_id,
199                          generator=_guess_user_id)
200     def user_id(): return {}
201
202     @_versioned_property(name="default_assignee",
203                          doc=
204 """The default assignee for new bugs e.g. 'John Doe <jdoe@example.com>'.""")
205     def default_assignee(): return {}
206
207     @_versioned_property(name="rcs_name",
208                          doc="""The name of the current RCS.  Kept seperate to make saving/loading
209 settings easy.  Don't set this attribute.  Set .rcs instead, and
210 .rcs_name will be automatically adjusted.""",
211                          default="None",
212                          allowed=["None", "Arch", "bzr", "darcs", "git", "hg"])
213     def rcs_name(): return {}
214
215     def _get_rcs(self, rcs_name=None):
216         """Get and root a new revision control system"""
217         if rcs_name == None:
218             rcs_name = self.rcs_name
219         new_rcs = rcs.rcs_by_name(rcs_name)
220         self._change_rcs(None, new_rcs)
221         return new_rcs
222     def _change_rcs(self, old_rcs, new_rcs):
223         new_rcs.encoding = self.encoding
224         new_rcs.root(self.root)
225         self.rcs_name = new_rcs.name
226
227     @Property
228     @change_hook_property(hook=_change_rcs)
229     @cached_property(generator=_get_rcs)
230     @local_property("rcs")
231     @doc_property(doc="A revision control system instance.")
232     def rcs(): return {}
233
234     def _bug_map_gen(self):
235         map = {}
236         for bug in self:
237             map[bug.uuid] = bug
238         for uuid in self.list_uuids():
239             if uuid not in map:
240                 map[uuid] = None
241         self._bug_map_value = map # ._bug_map_value used by @local_property
242
243     def _extra_strings_check_fn(value):
244         return utility.iterable_full_of_strings(value, \
245                          alternative=settings_object.EMPTY)
246     def _extra_strings_change_hook(self, old, new):
247         self.extra_strings.sort() # to make merging easier
248         self._prop_save_settings(old, new)
249     @_versioned_property(name="extra_strings",
250                          doc="Space for an array of extra strings.  Useful for storing state for functionality implemented purely in becommands/<some_function>.py.",
251                          default=[],
252                          check_fn=_extra_strings_check_fn,
253                          change_hook=_extra_strings_change_hook,
254                          mutable=True)
255     def extra_strings(): return {}
256
257     @Property
258     @primed_property(primer=_bug_map_gen)
259     @local_property("bug_map")
260     @doc_property(doc="A dict of (bug-uuid, bug-instance) pairs.")
261     def _bug_map(): return {}
262
263     def _setup_severities(self, severities):
264         if severities not in [None, settings_object.EMPTY]:
265             bug.load_severities(severities)
266     def _set_severities(self, old_severities, new_severities):
267         self._setup_severities(new_severities)
268         self._prop_save_settings(old_severities, new_severities)
269     @_versioned_property(name="severities",
270                          doc="The allowed bug severities and their descriptions.",
271                          change_hook=_set_severities)
272     def severities(): return {}
273
274     def _setup_status(self, active_status, inactive_status):
275         bug.load_status(active_status, inactive_status)
276     def _set_active_status(self, old_active_status, new_active_status):
277         self._setup_status(new_active_status, self.inactive_status)
278         self._prop_save_settings(old_active_status, new_active_status)
279     @_versioned_property(name="active_status",
280                          doc="The allowed active bug states and their descriptions.",
281                          change_hook=_set_active_status)
282     def active_status(): return {}
283
284     def _set_inactive_status(self, old_inactive_status, new_inactive_status):
285         self._setup_status(self.active_status, new_inactive_status)
286         self._prop_save_settings(old_inactive_status, new_inactive_status)
287     @_versioned_property(name="inactive_status",
288                          doc="The allowed inactive bug states and their descriptions.",
289                          change_hook=_set_inactive_status)
290     def inactive_status(): return {}
291
292
293     def __init__(self, root=None, sink_to_existing_root=True,
294                  assert_new_BugDir=False, allow_rcs_init=False,
295                  manipulate_encodings=True, from_disk=False, rcs=None):
296         list.__init__(self)
297         settings_object.SavedSettingsObject.__init__(self)
298         self._manipulate_encodings = manipulate_encodings
299         if root == None:
300             root = os.getcwd()
301         if sink_to_existing_root == True:
302             self.root = self._find_root(root)
303         else:
304             if not os.path.exists(root):
305                 raise NoRootEntry(root)
306             self.root = root
307         # get a temporary rcs until we've loaded settings
308         self.sync_with_disk = False
309         self.rcs = self._guess_rcs()
310
311         if from_disk == True:
312             self.sync_with_disk = True
313             self.load()
314         else:
315             self.sync_with_disk = False
316             if assert_new_BugDir == True:
317                 if os.path.exists(self.get_path()):
318                     raise AlreadyInitialized, self.get_path()
319             if rcs == None:
320                 rcs = self._guess_rcs(allow_rcs_init)
321             self.rcs = rcs
322             self._setup_user_id(self.user_id)
323
324     def __del__(self):
325         self.cleanup()
326
327     def cleanup(self):
328         self.rcs.cleanup()
329
330     # methods for getting the BugDir situated in the filesystem
331
332     def _find_root(self, path):
333         """
334         Search for an existing bug database dir and it's ancestors and
335         return a BugDir rooted there.  Only called by __init__, and
336         then only if sink_to_existing_root == True.
337         """
338         if not os.path.exists(path):
339             raise NoRootEntry(path)
340         versionfile=utility.search_parent_directories(path,
341                                                       os.path.join(".be", "version"))
342         if versionfile != None:
343             beroot = os.path.dirname(versionfile)
344             root = os.path.dirname(beroot)
345             return root
346         else:
347             beroot = utility.search_parent_directories(path, ".be")
348             if beroot == None:
349                 raise NoBugDir(path)
350             return beroot
351
352     def _guess_rcs(self, allow_rcs_init=False):
353         """
354         Only called by __init__.
355         """
356         deepdir = self.get_path()
357         if not os.path.exists(deepdir):
358             deepdir = os.path.dirname(deepdir)
359         new_rcs = rcs.detect_rcs(deepdir)
360         install = False
361         if new_rcs.name == "None":
362             if allow_rcs_init == True:
363                 new_rcs = rcs.installed_rcs()
364                 new_rcs.init(self.root)
365         return new_rcs
366
367     # methods for saving/loading/accessing settings and properties.
368
369     def get_path(self, *args):
370         """
371         Return a path relative to .root.
372         """
373         my_dir = os.path.join(self.root, ".be")
374         if len(args) == 0:
375             return my_dir
376         assert args[0] in ["version", "settings", "bugs"], str(args)
377         return os.path.join(my_dir, *args)
378
379     def _get_settings(self, settings_path, for_duplicate_bugdir=False):
380         allow_no_rcs = not self.rcs.path_in_root(settings_path)
381         if allow_no_rcs == True:
382             assert for_duplicate_bugdir == True
383         if self.sync_with_disk == False and for_duplicate_bugdir == False:
384             # duplicates can ignore this bugdir's .sync_with_disk status
385             raise DiskAccessRequired("_get settings")
386         try:
387             settings = mapfile.map_load(self.rcs, settings_path, allow_no_rcs)
388         except rcs.NoSuchFile:
389             settings = {"rcs_name": "None"}
390         return settings
391
392     def _save_settings(self, settings_path, settings,
393                        for_duplicate_bugdir=False):
394         allow_no_rcs = not self.rcs.path_in_root(settings_path)
395         if allow_no_rcs == True:
396             assert for_duplicate_bugdir == True
397         if self.sync_with_disk == False and for_duplicate_bugdir == False:
398             # duplicates can ignore this bugdir's .sync_with_disk status
399             raise DiskAccessRequired("_save settings")
400         self.rcs.mkdir(self.get_path(), allow_no_rcs)
401         mapfile.map_save(self.rcs, settings_path, settings, allow_no_rcs)
402
403     def load_settings(self):
404         self.settings = self._get_settings(self.get_path("settings"))
405         self._setup_saved_settings()
406         self._setup_user_id(self.user_id)
407         self._setup_encoding(self.encoding)
408         self._setup_severities(self.severities)
409         self._setup_status(self.active_status, self.inactive_status)
410         self.rcs = rcs.rcs_by_name(self.rcs_name)
411         self._setup_user_id(self.user_id)
412
413     def save_settings(self):
414         settings = self._get_saved_settings()
415         self._save_settings(self.get_path("settings"), settings)
416
417     def get_version(self, path=None, use_none_rcs=False):
418         """
419         Requires disk access.
420         """
421         if self.sync_with_disk == False:
422             raise DiskAccessRequired("get version")
423         if use_none_rcs == True:
424             RCS = rcs.rcs_by_name("None")
425             RCS.root(self.root)
426             RCS.encoding = encoding.get_encoding()
427         else:
428             RCS = self.rcs
429
430         if path == None:
431             path = self.get_path("version")
432         tree_version = RCS.get_file_contents(path)
433         return tree_version
434
435     def set_version(self):
436         """
437         Requires disk access.
438         """
439         if self.sync_with_disk == False:
440             raise DiskAccessRequired("set version")
441         self.rcs.mkdir(self.get_path())
442         self.rcs.set_file_contents(self.get_path("version"),
443                                    TREE_VERSION_STRING)
444
445     # methods controlling disk access
446
447     def set_sync_with_disk(self, value):
448         """
449         Adjust .sync_with_disk for the BugDir and all it's children.
450         See the BugDir docstring for a description of the role of
451         .sync_with_disk.
452         """
453         self.sync_with_disk = value
454         for bug in self:
455             bug.set_sync_with_disk(value)
456
457     def load(self):
458         """
459         Reqires disk access
460         """
461         version = self.get_version(use_none_rcs=True)
462         if version != TREE_VERSION_STRING:
463             raise NotImplementedError, \
464                 "BugDir cannot handle version '%s' yet." % version
465         else:
466             if not os.path.exists(self.get_path()):
467                 raise NoBugDir(self.get_path())
468             self.load_settings()
469
470     def load_all_bugs(self):
471         """
472         Requires disk access.
473         Warning: this could take a while.
474         """
475         if self.sync_with_disk == False:
476             raise DiskAccessRequired("load all bugs")
477         self._clear_bugs()
478         for uuid in self.list_uuids():
479             self._load_bug(uuid)
480
481     def save(self):
482         """
483         Note that this command writes to disk _regardless_ of the
484         status of .sync_with_disk.
485
486         Save any loaded contents to disk.  Because of lazy loading of
487         bugs and comments, this is actually not too inefficient.
488
489         However, if .sync_with_disk = True, then any changes are
490         automatically written to disk as soon as they happen, so
491         calling this method will just waste time (unless something
492         else has been messing with your on-disk files).
493
494         Requires disk access.
495         """
496         sync_with_disk = self.sync_with_disk
497         if sync_with_disk == False:
498             self.set_sync_with_disk(True)
499         self.set_version()
500         self.save_settings()
501         for bug in self:
502             bug.save()
503         if sync_with_disk == False:
504             self.set_sync_with_disk(sync_with_disk)
505
506     # methods for managing duplicate BugDirs
507
508     def duplicate_bugdir(self, revision):
509         duplicate_path = self.rcs.duplicate_repo(revision)
510
511         # setup revision RCS as None, since the duplicate may not be
512         # initialized for versioning
513         duplicate_settings_path = os.path.join(duplicate_path,
514                                                ".be", "settings")
515         duplicate_settings = self._get_settings(duplicate_settings_path,
516                                                 for_duplicate_bugdir=True)
517         if "rcs_name" in duplicate_settings:
518             duplicate_settings["rcs_name"] = "None"
519             duplicate_settings["user_id"] = self.user_id
520         if "disabled" in bug.status_values:
521             # Hack to support old versions of BE bugs
522             duplicate_settings["inactive_status"] = self.inactive_status
523         self._save_settings(duplicate_settings_path, duplicate_settings,
524                             for_duplicate_bugdir=True)
525
526         return BugDir(duplicate_path, from_disk=True, manipulate_encodings=self._manipulate_encodings)
527
528     def remove_duplicate_bugdir(self):
529         self.rcs.remove_duplicate_repo()
530
531     # methods for managing bugs
532
533     def list_uuids(self):
534         uuids = []
535         if self.sync_with_disk == True and os.path.exists(self.get_path()):
536             # list the uuids on disk
537             for uuid in os.listdir(self.get_path("bugs")):
538                 if not (uuid.startswith('.')):
539                     uuids.append(uuid)
540                     yield uuid
541         # and the ones that are still just in memory
542         for bug in self:
543             if bug.uuid not in uuids:
544                 uuids.append(bug.uuid)
545                 yield bug.uuid
546
547     def _clear_bugs(self):
548         while len(self) > 0:
549             self.pop()
550         self._bug_map_gen()
551
552     def _load_bug(self, uuid):
553         if self.sync_with_disk == False:
554             raise DiskAccessRequired("_load bug")
555         bg = bug.Bug(bugdir=self, uuid=uuid, from_disk=True)
556         self.append(bg)
557         self._bug_map_gen()
558         return bg
559
560     def new_bug(self, uuid=None, summary=None):
561         bg = bug.Bug(bugdir=self, uuid=uuid, summary=summary)
562         bg.set_sync_with_disk(self.sync_with_disk)
563         if bg.sync_with_disk == True:
564             bg.save()
565         self.append(bg)
566         self._bug_map_gen()
567         return bg
568
569     def remove_bug(self, bug):
570         self.remove(bug)
571         if bug.sync_with_disk == True:
572             bug.remove()
573
574     def bug_shortname(self, bug):
575         """
576         Generate short names from uuids.  Picks the minimum number of
577         characters (>=3) from the beginning of the uuid such that the
578         short names are unique.
579
580         Obviously, as the number of bugs in the database grows, these
581         short names will cease to be unique.  The complete uuid should be
582         used for long term reference.
583         """
584         chars = 3
585         for uuid in self._bug_map.keys():
586             if bug.uuid == uuid:
587                 continue
588             while (bug.uuid[:chars] == uuid[:chars]):
589                 chars+=1
590         return bug.uuid[:chars]
591
592     def bug_from_shortname(self, shortname):
593         """
594         >>> bd = SimpleBugDir(sync_with_disk=False)
595         >>> bug_a = bd.bug_from_shortname('a')
596         >>> print type(bug_a)
597         <class 'libbe.bug.Bug'>
598         >>> print bug_a
599         a:om: Bug A
600         >>> bd.cleanup()
601         """
602         matches = []
603         self._bug_map_gen()
604         for uuid in self._bug_map.keys():
605             if uuid.startswith(shortname):
606                 matches.append(uuid)
607         if len(matches) > 1:
608             raise MultipleBugMatches(shortname, matches)
609         if len(matches) == 1:
610             return self.bug_from_uuid(matches[0])
611         raise NoBugMatches(shortname)
612
613     def bug_from_uuid(self, uuid):
614         if not self.has_bug(uuid):
615             raise KeyError("No bug matches %s\n  bug map: %s\n  root: %s" \
616                                % (uuid, self._bug_map, self.root))
617         if self._bug_map[uuid] == None:
618             self._load_bug(uuid)
619         return self._bug_map[uuid]
620
621     def has_bug(self, bug_uuid):
622         if bug_uuid not in self._bug_map:
623             self._bug_map_gen()
624             if bug_uuid not in self._bug_map:
625                 return False
626         return True
627
628
629 class SimpleBugDir (BugDir):
630     """
631     For testing.  Set sync_with_disk==False for a memory-only bugdir.
632     >>> bugdir = SimpleBugDir()
633     >>> uuids = list(bugdir.list_uuids())
634     >>> uuids.sort()
635     >>> print uuids
636     ['a', 'b']
637     >>> bugdir.cleanup()
638     """
639     def __init__(self, sync_with_disk=True):
640         if sync_with_disk == True:
641             dir = utility.Dir()
642             assert os.path.exists(dir.path)
643             root = dir.path
644             assert_new_BugDir = True
645             rcs_init = True
646         else:
647             root = "/"
648             assert_new_BugDir = False
649             rcs_init = False
650         BugDir.__init__(self, root, sink_to_existing_root=False,
651                     assert_new_BugDir=assert_new_BugDir,
652                     allow_rcs_init=rcs_init,
653                     manipulate_encodings=False)
654         if sync_with_disk == True: # postpone cleanup since dir.__del__() removes dir.
655             self._dir_ref = dir
656         bug_a = self.new_bug("a", summary="Bug A")
657         bug_a.creator = "John Doe <jdoe@example.com>"
658         bug_a.time = 0
659         bug_b = self.new_bug("b", summary="Bug B")
660         bug_b.creator = "Jane Doe <jdoe@example.com>"
661         bug_b.time = 0
662         bug_b.status = "closed"
663         if sync_with_disk == True:
664             self.save()
665             self.set_sync_with_disk(True)
666     def cleanup(self):
667         if hasattr(self, "_dir_ref"):
668             self._dir_ref.cleanup()
669         BugDir.cleanup(self)
670
671 class BugDirTestCase(unittest.TestCase):
672     def setUp(self):
673         self.dir = utility.Dir()
674         self.bugdir = BugDir(self.dir.path, sink_to_existing_root=False,
675                              allow_rcs_init=True)
676         self.rcs = self.bugdir.rcs
677     def tearDown(self):
678         self.bugdir.cleanup()
679         self.dir.cleanup()
680     def fullPath(self, path):
681         return os.path.join(self.dir.path, path)
682     def assertPathExists(self, path):
683         fullpath = self.fullPath(path)
684         self.failUnless(os.path.exists(fullpath)==True,
685                         "path %s does not exist" % fullpath)
686         self.assertRaises(AlreadyInitialized, BugDir,
687                           self.dir.path, assertNewBugDir=True)
688     def versionTest(self):
689         if self.rcs.versioned == False:
690             return
691         original = self.bugdir.rcs.commit("Began versioning")
692         bugA = self.bugdir.bug_from_uuid("a")
693         bugA.status = "fixed"
694         self.bugdir.save()
695         new = self.rcs.commit("Fixed bug a")
696         dupdir = self.bugdir.duplicate_bugdir(original)
697         self.failUnless(dupdir.root != self.bugdir.root,
698                         "%s, %s" % (dupdir.root, self.bugdir.root))
699         bugAorig = dupdir.bug_from_uuid("a")
700         self.failUnless(bugA != bugAorig,
701                         "\n%s\n%s" % (bugA.string(), bugAorig.string()))
702         bugAorig.status = "fixed"
703         self.failUnless(bug.cmp_status(bugA, bugAorig)==0,
704                         "%s, %s" % (bugA.status, bugAorig.status))
705         self.failUnless(bug.cmp_severity(bugA, bugAorig)==0,
706                         "%s, %s" % (bugA.severity, bugAorig.severity))
707         self.failUnless(bug.cmp_assigned(bugA, bugAorig)==0,
708                         "%s, %s" % (bugA.assigned, bugAorig.assigned))
709         self.failUnless(bug.cmp_time(bugA, bugAorig)==0,
710                         "%s, %s" % (bugA.time, bugAorig.time))
711         self.failUnless(bug.cmp_creator(bugA, bugAorig)==0,
712                         "%s, %s" % (bugA.creator, bugAorig.creator))
713         self.failUnless(bugA == bugAorig,
714                         "\n%s\n%s" % (bugA.string(), bugAorig.string()))
715         self.bugdir.remove_duplicate_bugdir()
716         self.failUnless(os.path.exists(dupdir.root)==False, str(dupdir.root))
717     def testRun(self):
718         self.bugdir.new_bug(uuid="a", summary="Ant")
719         self.bugdir.new_bug(uuid="b", summary="Cockroach")
720         self.bugdir.new_bug(uuid="c", summary="Praying mantis")
721         length = len(self.bugdir)
722         self.failUnless(length == 3, "%d != 3 bugs" % length)
723         uuids = list(self.bugdir.list_uuids())
724         self.failUnless(len(uuids) == 3, "%d != 3 uuids" % len(uuids))
725         self.failUnless(uuids == ["a","b","c"], str(uuids))
726         bugA = self.bugdir.bug_from_uuid("a")
727         bugAprime = self.bugdir.bug_from_shortname("a")
728         self.failUnless(bugA == bugAprime, "%s != %s" % (bugA, bugAprime))
729         self.bugdir.save()
730         self.versionTest()
731     def testComments(self, sync_with_disk=False):
732         if sync_with_disk == True:
733             self.bugdir.set_sync_with_disk(True)
734         self.bugdir.new_bug(uuid="a", summary="Ant")
735         bug = self.bugdir.bug_from_uuid("a")
736         comm = bug.comment_root
737         rep = comm.new_reply("Ants are small.")
738         rep.new_reply("And they have six legs.")
739         if sync_with_disk == False:
740             self.bugdir.save()
741             self.bugdir.set_sync_with_disk(True)
742         self.bugdir._clear_bugs()
743         bug = self.bugdir.bug_from_uuid("a")
744         bug.load_comments()
745         if sync_with_disk == False:
746             self.bugdir.set_sync_with_disk(False)
747         self.failUnless(len(bug.comment_root)==1, len(bug.comment_root))
748         for index,comment in enumerate(bug.comments()):
749             if index == 0:
750                 repLoaded = comment
751                 self.failUnless(repLoaded.uuid == rep.uuid, repLoaded.uuid)
752                 self.failUnless(comment.sync_with_disk == sync_with_disk,
753                                 comment.sync_with_disk)
754                 self.failUnless(comment.content_type == "text/plain",
755                                 comment.content_type)
756                 self.failUnless(repLoaded.settings["Content-type"]=="text/plain",
757                                 repLoaded.settings)
758                 self.failUnless(repLoaded.body == "Ants are small.",
759                                 repLoaded.body)
760             elif index == 1:
761                 self.failUnless(comment.in_reply_to == repLoaded.uuid,
762                                 repLoaded.uuid)
763                 self.failUnless(comment.body == "And they have six legs.",
764                                 comment.body)
765             else:
766                 self.failIf(True, "Invalid comment: %d\n%s" % (index, comment))
767     def testSyncedComments(self):
768         self.testComments(sync_with_disk=True)
769
770 class SimpleBugDirTestCase (unittest.TestCase):
771     def setUp(self):
772         # create a pre-existing bugdir in a temporary directory
773         self.dir = utility.Dir()
774         self.original_working_dir = os.getcwd()
775         os.chdir(self.dir.path)
776         self.bugdir = BugDir(self.dir.path, sink_to_existing_root=False,
777                              allow_rcs_init=True)
778         self.bugdir.new_bug("preexisting", summary="Hopefully not imported")
779         self.bugdir.save()
780     def tearDown(self):
781         os.chdir(self.original_working_dir)
782         self.bugdir.cleanup()
783         self.dir.cleanup()
784     def testOnDiskCleanLoad(self):
785         """SimpleBugDir(sync_with_disk==True) should not import preexisting bugs."""
786         bugdir = SimpleBugDir(sync_with_disk=True)
787         self.failUnless(bugdir.sync_with_disk==True, bugdir.sync_with_disk)
788         uuids = sorted([bug.uuid for bug in bugdir])
789         self.failUnless(uuids == ['a', 'b'], uuids)
790         bugdir._clear_bugs()
791         uuids = sorted([bug.uuid for bug in bugdir])
792         self.failUnless(uuids == [], uuids)
793         bugdir.load_all_bugs()
794         uuids = sorted([bug.uuid for bug in bugdir])
795         self.failUnless(uuids == ['a', 'b'], uuids)
796         bugdir.cleanup()
797     def testInMemoryCleanLoad(self):
798         """SimpleBugDir(sync_with_disk==False) should not import preexisting bugs."""
799         bugdir = SimpleBugDir(sync_with_disk=False)
800         self.failUnless(bugdir.sync_with_disk==False, bugdir.sync_with_disk)
801         uuids = sorted([bug.uuid for bug in bugdir])
802         self.failUnless(uuids == ['a', 'b'], uuids)
803         self.failUnlessRaises(DiskAccessRequired, bugdir.load_all_bugs)
804         uuids = sorted([bug.uuid for bug in bugdir])
805         self.failUnless(uuids == ['a', 'b'], uuids)
806         bugdir._clear_bugs()
807         uuids = sorted([bug.uuid for bug in bugdir])
808         self.failUnless(uuids == [], uuids)
809         bugdir.cleanup()
810
811 unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
812 suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])