import doctest
from beuuid import uuid_gen
+from properties import Property, doc_property, local_property, \
+ defaulting_property, checked_property, cached_property, \
+ primed_property, change_hook_property, settings_property
import mapfile
import comment
import utility
status_index[status_values[i]] = i
-def checked_property(name, valid):
+# Define an invalid value for our properties, distinct from None,
+# which shows that a property has been initialized but has no value.
+EMPTY = -1
+
+
+class Bug(object):
"""
- Provide access to an attribute name, testing for valid values.
+ >>> b = Bug()
+ >>> print b.status
+ open
+ >>> print b.severity
+ minor
+
+ There are two formats for time, int and string. Setting either
+ one will adjust the other appropriately. The string form is the
+ one stored in the bug's settings file on disk.
+ >>> print type(b.time)
+ <type 'int'>
+ >>> print type(b.time_string)
+ <type 'str'>
+ >>> b.time = 0
+ >>> print b.time_string
+ Thu, 01 Jan 1970 00:00:00 +0000
+ >>> b.time_string="Thu, 01 Jan 1970 00:01:00 +0000"
+ >>> b.time
+ 60
+ >>> print b.settings["time"]
+ Thu, 01 Jan 1970 00:01:00 +0000
"""
- def getter(self):
- value = getattr(self, "_"+name)
- if value not in valid:
- raise InvalidValue(name, value)
- return value
+ def _save_settings(self, old, new):
+ if self.sync_with_disk==True:
+ self.save_settings()
+ def _load_settings(self):
+ if self.sync_with_disk==True and self._settings_loaded==False:
+ self.load_settings()
+ else:
+ for property in self.settings_properties:
+ if property not in self.settings:
+ self.settings[property] = EMPTY
+
+ settings_properties = []
+ required_saved_properties = ['status','severity'] # to protect against future changes in default values
+
+ def _versioned_property(name, doc, default=None, save=_save_settings, load=_load_settings, setprops=settings_properties, allowed=None):
+ "Combine the common decorators in a single function"
+ setprops.append(name)
+ def decorator(funcs):
+ if allowed != None:
+ checked = checked_property(allowed=allowed)
+ defaulting = defaulting_property(default=default, null=EMPTY)
+ change_hook = change_hook_property(hook=save)
+ primed = primed_property(primer=load)
+ settings = settings_property(name=name)
+ docp = doc_property(doc=doc)
+ deco = defaulting(change_hook(primed(settings(docp(funcs)))))
+ if allowed != None:
+ deco = checked(deco)
+ return Property(deco)
+ return decorator
+
+ @_versioned_property(name="severity",
+ doc="A measure of the bug's importance",
+ default="minor",
+ allowed=severity_values)
+ def severity(): return {}
+
+ @_versioned_property(name="status",
+ doc="The bug's current status",
+ default="open",
+ allowed=status_values)
+ def status(): return {}
+
+ @property
+ def active(self):
+ return self.status in active_status_values
- def setter(self, value):
- if value not in valid:
- raise InvalidValue(name, value)
- return setattr(self, "_"+name, value)
- return property(getter, setter)
+ @_versioned_property(name="target",
+ doc="The deadline for fixing this bug")
+ def target(): return {}
+ @_versioned_property(name="creator",
+ doc="The user who entered the bug into the system")
+ def creator(): return {}
-class Bug(object):
- severity = checked_property("severity", severity_values)
- status = checked_property("status", status_values)
+ @_versioned_property(name="reporter",
+ doc="The user who reported the bug")
+ def reporter(): return {}
- def _get_active(self):
- return self.status in active_status_values
+ @_versioned_property(name="assigned",
+ doc="The developer in charge of the bug")
+ def assigned(): return {}
+
+ @_versioned_property(name="time",
+ doc="An RFC 2822 timestamp for bug creation")
+ def time_string(): return {}
- active = property(_get_active)
+ def _get_time(self):
+ if self.time_string == None:
+ return None
+ return utility.str_to_time(self.time_string)
+ def _set_time(self, value):
+ self.time_string = utility.time_to_str(value)
+ time = property(fget=_get_time,
+ fset=_set_time,
+ doc="An integere version of .time_string")
+
+ @_versioned_property(name="summary",
+ doc="A one-line bug description")
+ def summary(): return {}
def _get_comment_root(self):
- if self._comment_root == None:
- if self._comments_loaded == True:
- self._comment_root = comment.loadComments(self)
- else:
- self._comment_root = comment.Comment(self,
- uuid=comment.INVALID_UUID)
- return self._comment_root
+ if self.sync_with_disk:
+ return comment.loadComments(self)
+ else:
+ return comment.Comment(self, uuid=comment.INVALID_UUID)
+
+ @Property
+ @cached_property(generator=_get_comment_root)
+ @local_property("comment_root")
+ @doc_property(doc="The trunk of the comment tree")
+ def comment_root(): return {}
- def _set_comment_root(self, comment_root):
- self._comment_root = comment_root
+ def _get_rcs(self):
+ if hasattr(self.bugdir, "rcs"):
+ return self.bugdir.rcs
- _comment_root = None
- comment_root = property(_get_comment_root, _set_comment_root,
- doc="The trunk of the comment tree")
+ @Property
+ @cached_property(generator=_get_rcs)
+ @local_property("rcs")
+ @doc_property(doc="A revision control system instance.")
+ def rcs(): return {}
def __init__(self, bugdir=None, uuid=None, from_disk=False,
load_comments=False, summary=None):
self.bugdir = bugdir
- if bugdir != None:
- self.rcs = bugdir.rcs
- else:
- self.rcs = None
+ self.uuid = uuid
+ self._settings_loaded = False
+ self.settings = {}
if from_disk == True:
- self._comments_loaded = False
- self.uuid = uuid
- self.load(load_comments=load_comments)
+ self.sync_with_disk = True
+ #self.load(load_comments=load_comments)
else:
- # Note: defaults should match those in Bug.load()
- self._comments_loaded = True
- if uuid != None:
- self.uuid = uuid
- else:
+ self.sync_with_disk = False
+ if uuid == None:
self.uuid = uuid_gen()
- self.summary = summary
+ self.time = int(time.time()) # only save to second precision
if self.rcs != None:
self.creator = self.rcs.get_user_id()
- else:
- self.creator = None
- self.target = None
- self.status = "open"
- self.severity = "minor"
- self.assigned = None
- self.time = int(time.time()) # only save to second precision
+ self.summary = summary
def __repr__(self):
return "Bug(uuid=%r)" % self.uuid
else:
shortname = self.bugdir.bug_shortname(self)
if shortlist == False:
- if self.time == None:
- timestring = ""
+ if self.time_string == "":
+ timestring = self.time_string
else:
htime = utility.handy_time(self.time)
- ftime = utility.time_to_str(self.time)
- timestring = "%s (%s)" % (htime, ftime)
+ timestring = "%s (%s)" % (htime, self.time_string)
info = [("ID", self.uuid),
("Short name", shortname),
("Severity", self.severity),
("Status", self.status),
- ("Assigned", self.assigned),
- ("Target", self.target),
- ("Creator", self.creator),
+ ("Assigned", self.assigned or ""),
+ ("Target", self.target or ""),
+ ("Creator", self.creator or ""),
("Created", timestring)]
- newinfo = []
- for k,v in info:
- if v == None:
- newinfo.append((k,""))
- else:
- newinfo.append((k,v))
- info = newinfo
longest_key_len = max([len(k) for k,v in info])
infolines = [" %*s : %s\n" %(longest_key_len,k,v) for k,v in info]
bugout = "".join(infolines) + "%s" % self.summary.rstrip('\n')
bugout = "%s:%s: %s" % (shortname,chars,self.summary.rstrip('\n'))
if show_comments == True:
- if self._comments_loaded == False:
- self.load_comments()
# take advantage of the string_thread(auto_name_map=True)
# SIDE-EFFECT of sorting by comment time.
comout = self.comment_root.string_thread(flatten=False,
assert name in ["values", "comments"]
return os.path.join(my_dir, name)
- def load(self, load_comments=False):
- map = mapfile.map_load(self.rcs, self.get_path("values"))
- self.summary = map.get("summary")
- self.creator = map.get("creator")
- self.target = map.get("target")
- self.status = map.get("status", "open")
- self.severity = map.get("severity", "minor")
- self.assigned = map.get("assigned")
- self.time = map.get("time")
- if self.time is not None:
- self.time = utility.str_to_time(self.time)
-
- if load_comments == True:
- self.load_comments()
+ def load_settings(self):
+ self.settings = mapfile.map_load(self.rcs, self.get_path("values"))
+ for property in self.settings_properties:
+ if property not in self.settings:
+ self.settings[property] = EMPTY
+ elif self.settings[property] == None:
+ self.settings[property] = EMPTY
+ self._settings_loaded = True
def load_comments(self):
- # clear _comment_root, so _get_comment_root returns a fresh version
- self._comment_root = None
- self._comments_loaded = True
-
- def comments(self):
- if self._comments_loaded == False:
- self.load_comments()
- for comment in self.comment_root.traverse():
- yield comment
-
- def _add_attr(self, map, name):
- value = getattr(self, name)
- if value is not None:
- map[name] = value
-
- def save(self):
+ # Clear _comment_root, so _get_comment_root returns a fresh
+ # version. Turn of syncing temporarily so we don't write our
+ # blank comment tree to disk.
+ self.sync_with_disk = False
+ self._comment_root = None
+ self.sync_with_disk = True
+
+ def save_settings(self):
assert self.summary != None, "Can't save blank bug"
map = {}
- self._add_attr(map, "assigned")
- self._add_attr(map, "summary")
- self._add_attr(map, "creator")
- self._add_attr(map, "target")
- self._add_attr(map, "status")
- self._add_attr(map, "severity")
- if self.time is not None:
- map["time"] = utility.time_to_str(self.time)
+ for k,v in self.settings.items():
+ if (v != None and v != EMPTY):
+ map[k] = v
+ for k in self.required_saved_properties:
+ map[k] = getattr(self, k)
self.rcs.mkdir(self.get_path())
path = self.get_path("values")
mapfile.map_save(self.rcs, path, map)
+
+ def save(self):
+ self.save_settings()
if len(self.comment_root) > 0:
self.rcs.mkdir(self.get_path("comments"))
path = self.get_path()
self.rcs.recursive_remove(path)
+ def comments(self):
+ for comment in self.comment_root.traverse():
+ yield comment
+
def new_comment(self, body=None):
comm = self.comment_root.new_reply(body=body)
return comm
for id, comment in self.comment_root.comment_shortnames(shortname):
yield (id, comment)
-# the general rule for bug sorting is that "more important" bugs are
+
+# The general rule for bug sorting is that "more important" bugs are
# less than "less important" bugs. This way sorting a list of bugs
# will put the most important bugs first in the list. When relative
# importance is unclear, the sorting follows some arbitrary convention
--- /dev/null
+# Bugs Everywhere - a distributed bugtracker
+# Copyright (C) 2008 W. Trevor King <wking@drexel.edu>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+This module provides a series of useful decorators for defining
+various types of properties. For example usage, consider the
+unittests at the end of the module.
+
+See
+ http://www.python.org/dev/peps/pep-0318/
+and
+ http://www.phyast.pitt.edu/~micheles/python/documentation.html
+for more information on decorators.
+"""
+
+import unittest
+
+class ValueCheckError (ValueError):
+ def __init__(self, name, value, allowed):
+ msg = "%s not in %s for %s" % (value, allowed, name)
+ ValueError.__init__(self, msg)
+ self.name = name
+ self.value = value
+ self.allowed = allowed
+
+def Property(funcs):
+ """
+ End a chain of property decorators, returning a property.
+ """
+ args = {}
+ args["fget"] = funcs.get("fget", None)
+ args["fset"] = funcs.get("fset", None)
+ args["fdel"] = funcs.get("fdel", None)
+ args["doc"] = funcs.get("doc", None)
+
+ #print "Creating a property with"
+ #for key, val in args.items(): print key, value
+ return property(**args)
+
+def doc_property(doc=None):
+ """
+ Add a docstring to a chain of property decorators.
+ """
+ def decorator(funcs=None):
+ """
+ Takes either a dict of funcs {"fget":fnX, "fset":fnY, ...}
+ or a function fn() returning such a dict.
+ """
+ if hasattr(funcs, "__call__"):
+ funcs = funcs() # convert from function-arg to dict
+ funcs["doc"] = doc
+ return funcs
+ return decorator
+
+def local_property(name):
+ """
+ Define get/set access to per-parent-instance local storage. Uses
+ ._<name>_value to store the value for a particular owner instance.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget", None)
+ fset = funcs.get("fset", None)
+ def _fget(self):
+ if fget is not None:
+ fget(self)
+ value = getattr(self, "_%s_value" % name, None)
+ return value
+ def _fset(self, value):
+ setattr(self, "_%s_value" % name, value)
+ if fset is not None:
+ fset(self, value)
+ funcs["fget"] = _fget
+ funcs["fset"] = _fset
+ funcs["name"] = name
+ return funcs
+ return decorator
+
+def settings_property(name):
+ """
+ Similar to local_property, except where local_property stores the
+ value in instance._<name>_value, settings_property stores the
+ value in instance.settings[name].
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget", None)
+ fset = funcs.get("fset", None)
+ def _fget(self):
+ if fget is not None:
+ fget(self)
+ value = self.settings.get(name, None)
+ return value
+ def _fset(self, value):
+ self.settings[name] = value
+ if fset is not None:
+ fset(self, value)
+ funcs["fget"] = _fget
+ funcs["fset"] = _fset
+ funcs["name"] = name
+ return funcs
+ return decorator
+
+def defaulting_property(default=None, null=None):
+ """
+ Define a default value for get access to a property.
+ If the stored value is null, then default is returned.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget")
+ def _fget(self):
+ value = fget(self)
+ if value == null:
+ return default
+ return value
+ funcs["fget"] = _fget
+ return funcs
+ return decorator
+
+def checked_property(allowed=[]):
+ """
+ Define allowed values for get/set access to a property.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget")
+ fset = funcs.get("fset")
+ name = funcs.get("name", "<unknown>")
+ def _fget(self):
+ value = fget(self)
+ if value not in allowed:
+ raise ValueCheckError(name, value, allowed)
+ return value
+ def _fset(self, value):
+ if value not in allowed:
+ raise ValueCheckError(name, value, allowed)
+ fset(self, value)
+ funcs["fget"] = _fget
+ funcs["fset"] = _fset
+ return funcs
+ return decorator
+
+def cached_property(generator, initVal=None):
+ """
+ Allow caching of values generated by generator(instance), where
+ instance is the instance to which this property belongs. Uses
+ ._<name>_cache to store a cache flag for a particular owner
+ instance. When the cache flag is True (or missing), the normal
+ value is returned. Otherwise the generator is called (and it's
+ output stored) for every get. The cache flag is missing on
+ initialization. Particular instances may override by setting
+ their own flag.
+
+ If caching is True, but the stored value == initVal, the parameter
+ is considered 'uninitialized', and the generator is called anyway.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget")
+ fset = funcs.get("fset")
+ name = funcs.get("name", "<unknown>")
+ def _fget(self):
+ cache = getattr(self, "_%s_cache" % name, True)
+ if cache == True:
+ value = fget(self)
+ if cache == False or (cache == True and value == initVal):
+ value = generator(self)
+ fset(self, value)
+ return value
+ funcs["fget"] = _fget
+ return funcs
+ return decorator
+
+def primed_property(primer, initVal=None):
+ """
+ Just like a generator_property, except that instead of returning a
+ new value and running fset to cache it, the primer performs some
+ background manipulation (e.g. loads data into instance.settings)
+ such that a _second_ pass through fget succeeds.
+
+ The 'cache' flag becomes a 'prime' flag, with priming taking place
+ whenever ._<name>_prime is True, or is False or missing and
+ value == initVal.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget")
+ name = funcs.get("name", "<unknown>")
+ def _fget(self):
+ prime = getattr(self, "_%s_prime" % name, False)
+ if prime == False:
+ value = fget(self)
+ if prime == True or (prime == False and value == initVal):
+ primer(self)
+ value = fget(self)
+ return value
+ funcs["fget"] = _fget
+ return funcs
+ return decorator
+
+def change_hook_property(hook):
+ """
+ Call the function hook(instance, old_value, new_value) whenever a
+ value different from the current value is set (instance is a a
+ reference to the class instance to which this property belongs).
+ This is useful for saving changes to disk, etc.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget")
+ fset = funcs.get("fset")
+ name = funcs.get("name", "<unknown>")
+ def _fset(self, value):
+ old_value = fget(self)
+ if value != old_value:
+ hook(self, old_value, value)
+ fset(self, value)
+ funcs["fset"] = _fset
+ return funcs
+ return decorator
+
+
+class DecoratorTests(unittest.TestCase):
+ def testLocalDoc(self):
+ class Test(object):
+ @Property
+ @doc_property("A fancy property")
+ def x():
+ return {}
+ self.failUnless(Test.x.__doc__ == "A fancy property",
+ Test.x.__doc__)
+ def testLocalProperty(self):
+ class Test(object):
+ @Property
+ @local_property(name="LOCAL")
+ def x():
+ return {}
+ t = Test()
+ self.failUnless(t.x == None, str(t.x))
+ t.x = 'z' # the first set initializes ._LOCAL_value
+ self.failUnless(t.x == 'z', str(t.x))
+ self.failUnless("_LOCAL_value" in dir(t), dir(t))
+ self.failUnless(t._LOCAL_value == 'z', t._LOCAL_value)
+ def testSettingsProperty(self):
+ class Test(object):
+ @Property
+ @settings_property(name="attr")
+ def x():
+ return {}
+ def __init__(self):
+ self.settings = {}
+ t = Test()
+ self.failUnless(t.x == None, str(t.x))
+ t.x = 'z' # the first set initializes ._LOCAL_value
+ self.failUnless(t.x == 'z', str(t.x))
+ self.failUnless("attr" in t.settings, t.settings)
+ self.failUnless(t.settings["attr"] == 'z', t.settings["attr"])
+ def testDefaultingLocalProperty(self):
+ class Test(object):
+ @Property
+ @defaulting_property(default='y', null='x')
+ @local_property(name="DEFAULT")
+ def x(): return {}
+ t = Test()
+ self.failUnless(t.x == None, str(t.x))
+ t.x = 'x'
+ self.failUnless(t.x == 'y', str(t.x))
+ t.x = 'y'
+ self.failUnless(t.x == 'y', str(t.x))
+ t.x = 'z'
+ self.failUnless(t.x == 'z', str(t.x))
+ def testCheckedLocalProperty(self):
+ class Test(object):
+ @Property
+ @checked_property(allowed=['x', 'y', 'z'])
+ @local_property(name="CHECKED")
+ def x(): return {}
+ def __init__(self):
+ self._CHECKED_value = 'x'
+ t = Test()
+ self.failUnless(t.x == 'x', str(t.x))
+ try:
+ t.x = None
+ e = None
+ except ValueCheckError, e:
+ pass
+ self.failUnless(type(e) == ValueCheckError, type(e))
+ def testTwoCheckedLocalProperties(self):
+ class Test(object):
+ @Property
+ @checked_property(allowed=['x', 'y', 'z'])
+ @local_property(name="X")
+ def x(): return {}
+
+ @Property
+ @checked_property(allowed=['a', 'b', 'c'])
+ @local_property(name="A")
+ def a(): return {}
+ def __init__(self):
+ self._A_value = 'a'
+ self._X_value = 'x'
+ t = Test()
+ try:
+ t.x = 'a'
+ e = None
+ except ValueCheckError, e:
+ pass
+ self.failUnless(type(e) == ValueCheckError, type(e))
+ t.x = 'x'
+ t.x = 'y'
+ t.x = 'z'
+ try:
+ t.a = 'x'
+ e = None
+ except ValueCheckError, e:
+ pass
+ self.failUnless(type(e) == ValueCheckError, type(e))
+ t.a = 'a'
+ t.a = 'b'
+ t.a = 'c'
+ def testCachedLocalProperty(self):
+ class Gen(object):
+ def __init__(self):
+ self.i = 0
+ def __call__(self, owner):
+ self.i += 1
+ return self.i
+ class Test(object):
+ @Property
+ @cached_property(generator=Gen(), initVal=None)
+ @local_property(name="CACHED")
+ def x(): return {}
+ t = Test()
+ self.failIf("_CACHED_cache" in dir(t), getattr(t, "_CACHED_cache", None))
+ self.failUnless(t.x == 1, t.x)
+ self.failUnless(t.x == 1, t.x)
+ self.failUnless(t.x == 1, t.x)
+ t.x = 8
+ self.failUnless(t.x == 8, t.x)
+ self.failUnless(t.x == 8, t.x)
+ t._CACHED_cache = False
+ val = t.x
+ self.failUnless(val == 2, val)
+ val = t.x
+ self.failUnless(val == 3, val)
+ val = t.x
+ self.failUnless(val == 4, val)
+ t._CACHED_cache = True
+ self.failUnless(t.x == 4, str(t.x))
+ self.failUnless(t.x == 4, str(t.x))
+ self.failUnless(t.x == 4, str(t.x))
+ def testPrimedLocalProperty(self):
+ class Test(object):
+ def prime(self):
+ self.settings["PRIMED"] = "initialized"
+ @Property
+ @primed_property(primer=prime, initVal=None)
+ @settings_property(name="PRIMED")
+ def x(): return {}
+ def __init__(self):
+ self.settings={}
+ t = Test()
+ self.failIf("_PRIMED_prime" in dir(t), getattr(t, "_PRIMED_prime", None))
+ self.failUnless(t.x == "initialized", t.x)
+ t.x = 1
+ self.failUnless(t.x == 1, t.x)
+ t.x = None
+ self.failUnless(t.x == "initialized", t.x)
+ t._PRIMED_prime = True
+ t.x = 3
+ self.failUnless(t.x == "initialized", t.x)
+ t._PRIMED_prime = False
+ t.x = 3
+ self.failUnless(t.x == 3, t.x)
+ def testChangeHookLocalProperty(self):
+ class Test(object):
+ def _hook(self, old, new):
+ self.old = old
+ self.new = new
+
+ @Property
+ @change_hook_property(_hook)
+ @local_property(name="HOOKED")
+ def x(): return {}
+ t = Test()
+ t.x = 1
+ self.failUnless(t.old == None, t.old)
+ self.failUnless(t.new == 1, t.new)
+ t.x = 1
+ self.failUnless(t.old == None, t.old)
+ self.failUnless(t.new == 1, t.new)
+ t.x = 2
+ self.failUnless(t.old == 1, t.old)
+ self.failUnless(t.new == 2, t.new)
+
+suite = unittest.TestLoader().loadTestsFromTestCase(DecoratorTests)
+