Moved bug.new_bug code into bugdir.BugDir.new_bug.
[be.git] / libbe / bugdir.py
1 # Copyright (C) 2005 Aaron Bentley and Panometrics, Inc.
2 # <abentley@panoramicfeedback.com>
3 #
4 #    This program is free software; you can redistribute it and/or modify
5 #    it under the terms of the GNU General Public License as published by
6 #    the Free Software Foundation; either version 2 of the License, or
7 #    (at your option) any later version.
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program; if not, write to the Free Software
16 #    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17 import os
18 import os.path
19 import cmdutil
20 import errno
21 import unittest
22 import doctest
23 import names
24 import mapfile
25 import time
26 import utility
27 from rcs import rcs_by_name, installed_rcs
28 from bug import Bug
29
30 class NoBugDir(Exception):
31     def __init__(self, path):
32         msg = "The directory \"%s\" has no bug directory." % path
33         Exception.__init__(self, msg)
34         self.path = path
35
36  
37 def iter_parent_dirs(cur_dir):
38     cur_dir = os.path.realpath(cur_dir)
39     old_dir = None
40     while True:
41         yield cur_dir
42         old_dir = cur_dir
43         cur_dir = os.path.normpath(os.path.join(cur_dir, '..'))
44         if old_dir == cur_dir:
45             break;
46
47
48 def tree_root(dir, old_version=False):
49     for rootdir in iter_parent_dirs(dir):
50         versionfile=os.path.join(rootdir, ".be", "version")
51         if os.path.exists(versionfile):
52             if not old_version:
53                 test_version(versionfile)
54             return BugDir(os.path.join(rootdir, ".be"))
55         elif not os.path.exists(rootdir):
56             raise NoRootEntry(rootdir)
57         old_rootdir = rootdir
58         rootdir=os.path.join('..', rootdir)
59     
60     raise NoBugDir(dir)
61
62 class BadTreeVersion(Exception):
63     def __init__(self, version):
64         Exception.__init__(self, "Unsupported tree version: %s" % version)
65         self.version = version
66
67 def test_version(path):
68     tree_version = file(path, "rb").read()
69     if tree_version != TREE_VERSION_STRING:
70         raise BadTreeVersion(tree_version)
71
72 def set_version(path, rcs):
73     rcs.set_file_contents(os.path.join(path, "version"), TREE_VERSION_STRING)
74     
75
76 TREE_VERSION_STRING = "Bugs Everywhere Tree 1 0\n"
77
78 class NoRootEntry(Exception):
79     def __init__(self, path):
80         self.path = path
81         Exception.__init__(self, "Specified root does not exist: %s" % path)
82
83 class AlreadyInitialized(Exception):
84     def __init__(self, path):
85         self.path = path
86         Exception.__init__(self, 
87                            "Specified root is already initialized: %s" % path)
88
89 def bugdir_root(versioning_root):
90     return os.path.join(versioning_root, ".be")
91
92 def create_bug_dir(path, rcs):
93     """
94     >>> import tests
95     >>> rcs = rcs_by_name("None")
96     >>> create_bug_dir('/highly-unlikely-to-exist', rcs)
97     Traceback (most recent call last):
98     NoRootEntry: Specified root does not exist: /highly-unlikely-to-exist
99     """
100     root = os.path.join(path, ".be")
101     try:
102         rcs.mkdir(root)
103     except OSError, e:
104         if e.errno == errno.ENOENT:
105             raise NoRootEntry(path)
106         elif e.errno == errno.EEXIST:
107             raise AlreadyInitialized(path)
108         else:
109             raise
110     rcs.mkdir(os.path.join(root, "bugs"))
111     set_version(root, rcs)
112     mapfile.map_save(rcs,
113                      os.path.join(root, "settings"), {"rcs_name": rcs.name})
114     return BugDir(bugdir_root(path))
115
116
117 def setting_property(name, valid=None):
118     def getter(self):
119         value = self.settings.get(name) 
120         if valid is not None:
121             if value not in valid:
122                 raise InvalidValue(name, value)
123         return value
124
125     def setter(self, value):
126         if valid is not None:
127             if value not in valid and value is not None:
128                 raise InvalidValue(name, value)
129         if value is None:
130             del self.settings[name]
131         else:
132             self.settings[name] = value
133         self.save_settings()
134     return property(getter, setter)
135
136
137 class BugDir:
138     def __init__(self, dir):
139         self.dir = dir
140         self.bugs_path = os.path.join(self.dir, "bugs")
141         try:
142             self.settings = mapfile.map_load(os.path.join(self.dir,"settings"))
143         except mapfile.NoSuchFile:
144             self.settings = {"rcs_name": "None"}
145
146     rcs_name = setting_property("rcs_name",
147                                 ("None", "bzr", "git", "Arch", "hg"))
148     _rcs = None
149
150     target = setting_property("target")
151     
152     def save_settings(self):
153         mapfile.map_save(self.rcs,
154                          os.path.join(self.dir, "settings"), self.settings)
155
156     def _get_rcs(self):
157         if self._rcs is not None:
158             if self.rcs_name == self._rcs.name:
159                 return self._rcs
160         self._rcs = rcs_by_name(self.rcs_name)
161         self._rcs.root(self.dir)
162         return self._rcs
163
164     rcs = property(_get_rcs)
165
166     def duplicate_bugdir(self, revision):
167         return BugDir(bugdir_root(self.rcs.duplicate_repo(revision)))
168
169     def remove_duplicate_bugdir(self):
170         self.rcs.remove_duplicate_repo()
171
172     def list(self):
173         for uuid in self.list_uuids():
174             yield self.get_bug(uuid)
175
176     def bug_map(self):
177         bugs = {}
178         for bug in self.list():
179             bugs[bug.uuid] = bug
180         return bugs
181
182     def get_bug(self, uuid):
183         return Bug(self.bugs_path, uuid, self.rcs, self)
184
185     def list_uuids(self):
186         for uuid in os.listdir(self.bugs_path):
187             if (uuid.startswith('.')):
188                 continue
189             yield uuid
190
191     def new_bug(self, uuid=None):
192         if uuid is None:
193             uuid = names.uuid()
194         path = os.path.join(self.bugs_path, uuid)
195         self.rcs.mkdir(path)
196         bug = Bug(self.bugs_path, None, self.rcs, self)
197         bug.uuid = uuid
198         bug.creator = self.rcs.get_user_id()
199         bug.severity = "minor"
200         bug.status = "open"
201         bug.time = time.time()
202         return bug
203
204 class InvalidValue(ValueError):
205     def __init__(self, name, value):
206         msg = "Cannot assign value %s to %s" % (value, name)
207         Exception.__init__(self, msg)
208         self.name = name
209         self.value = value
210
211 def simple_bug_dir():
212     """
213     For testing
214     >>> bugdir = simple_bug_dir()
215     >>> ls = list(bugdir.list_uuids())
216     >>> ls.sort()
217     >>> print ls
218     ['a', 'b']
219     """
220     dir = utility.Dir()
221     rcs = installed_rcs()
222     rcs.init(dir.path)
223     assert os.path.exists(dir.path)
224     bugdir = create_bug_dir(dir.path, rcs)
225     bugdir._dir_ref = dir # postpone cleanup since dir.__del__() removes dir.
226     bug_a = bugdir.new_bug("a")
227     bug_a.summary = "Bug A"
228     bug_a.save()
229     bug_b = bugdir.new_bug("b")
230     bug_b.status = "closed"
231     bug_b.summary = "Bug B"
232     bug_b.save()
233     return bugdir
234
235
236 class BugDirTestCase(unittest.TestCase):
237     def __init__(self, *args, **kwargs):
238         unittest.TestCase.__init__(self, *args, **kwargs)
239     def setUp(self):
240         self.dir = utility.Dir()
241         self.rcs = installed_rcs()
242         self.rcs.init(self.dir.path)
243         self.bugdir = create_bug_dir(self.dir.path, self.rcs)
244     def tearDown(self):
245         del(self.rcs)
246         del(self.dir)
247     def fullPath(self, path):
248         return os.path.join(self.dir.path, path)
249     def assertPathExists(self, path):
250         fullpath = self.fullPath(path)
251         self.failUnless(os.path.exists(fullpath)==True,
252                         "path %s does not exist" % fullpath)
253     def testBugDirDuplicate(self):
254         self.assertRaises(AlreadyInitialized, create_bug_dir,
255                           self.dir.path, self.rcs)
256
257 unitsuite = unittest.TestLoader().loadTestsFromTestCase(BugDirTestCase)
258 suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])