1 # Copyright (C) 2005-2011 Aaron Bentley <abentley@panoramicfeedback.com>
2 # Alexander Belchenko <bialix@ukr.net>
3 # Ben Finney <benf@cybersource.com.au>
4 # Chris Ball <cjb@laptop.org>
5 # Gianluca Montecchi <gian@grys.it>
6 # W. Trevor King <wking@drexel.edu>
8 # This file is part of Bugs Everywhere.
10 # Bugs Everywhere is free software; you can redistribute it and/or modify it
11 # under the terms of the GNU General Public License as published by the
12 # Free Software Foundation, either version 2 of the License, or (at your
13 # option) any later version.
15 # Bugs Everywhere is distributed in the hope that it will be useful, but
16 # WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 # General Public License for more details.
20 # You should have received a copy of the GNU General Public License
21 # along with Bugs Everywhere. If not, see <http://www.gnu.org/licenses/>.
23 """Define the base :class:`VCS` (Version Control System) class, which
24 should be subclassed by other Version Control System backends. The
25 base class implements a "do not version" VCS.
39 import libbe.storage.base
40 import libbe.util.encoding
41 from libbe.storage.base import EmptyCommit, InvalidRevision, InvalidID
42 from libbe.util.utility import Dir, search_parent_directories
43 from libbe.util.subproc import CommandError, invoke
44 from libbe.util.plugin import import_by_name
45 import libbe.storage.util.upgrade as upgrade
47 if libbe.TESTING == True:
51 import libbe.ui.util.user
53 VCS_ORDER = ['arch', 'bzr', 'darcs', 'git', 'hg', 'monotone']
54 """List VCS modules in order of preference.
56 Don't list this module, it is implicitly last.
59 def set_preferred_vcs(name):
60 """Manipulate :data:`VCS_ORDER` to place `name` first.
62 This is primarily indended for testing purposes.
65 assert name in VCS_ORDER, \
66 'unrecognized VCS %s not in\n %s' % (name, VCS_ORDER)
67 VCS_ORDER.remove(name)
68 VCS_ORDER.insert(0, name)
70 def _get_matching_vcs(matchfn):
71 """Return the first module for which matchfn(VCS_instance) is True.
73 Searches in :data:`VCS_ORDER`.
75 for submodname in VCS_ORDER:
76 module = import_by_name('libbe.storage.vcs.%s' % submodname)
78 if matchfn(vcs) == True:
82 def vcs_by_name(vcs_name):
83 """Return the module for the VCS with the given name.
85 Searches in :data:`VCS_ORDER`.
87 if vcs_name == VCS.name:
89 return _get_matching_vcs(lambda vcs: vcs.name == vcs_name)
92 """Return an VCS instance for the vcs being used in this directory.
94 Searches in :data:`VCS_ORDER`.
96 return _get_matching_vcs(lambda vcs: vcs._detect(dir))
99 """Return an instance of an installed VCS.
101 Searches in :data:`VCS_ORDER`.
103 return _get_matching_vcs(lambda vcs: vcs.installed())
106 class VCSNotRooted (libbe.storage.base.ConnectionError):
107 def __init__(self, vcs):
108 msg = 'VCS not rooted'
109 libbe.storage.base.ConnectionError.__init__(self, msg)
112 class VCSUnableToRoot (libbe.storage.base.ConnectionError):
113 def __init__(self, vcs):
114 msg = 'VCS unable to root'
115 libbe.storage.base.ConnectionError.__init__(self, msg)
118 class InvalidPath (InvalidID):
119 def __init__(self, path, root, msg=None, **kwargs):
121 msg = 'Path "%s" not in root "%s"' % (path, root)
122 InvalidID.__init__(self, msg=msg, **kwargs)
126 class SpacerCollision (InvalidPath):
127 def __init__(self, path, spacer):
128 msg = 'Path "%s" collides with spacer directory "%s"' % (path, spacer)
129 InvalidPath.__init__(self, path, root=None, msg=msg)
132 class NoSuchFile (InvalidID):
133 def __init__(self, pathname, root='.'):
134 path = os.path.abspath(os.path.join(root, pathname))
135 InvalidID.__init__(self, 'No such file: %s' % path)
138 class CachedPathID (object):
139 """Cache Storage ID <-> path policy.
141 Paths generated following::
143 .../.be/BUGDIR/bugs/BUG/comments/COMMENT
146 See :mod:`libbe.util.id` for a discussion of ID formats.
152 >>> os.mkdir(os.path.join(dir.path, '.be'))
153 >>> os.mkdir(os.path.join(dir.path, '.be', 'abc'))
154 >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs'))
155 >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123'))
156 >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments'))
157 >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def'))
158 >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '456'))
159 >>> open(os.path.join(dir.path, '.be', 'abc', 'values'),
161 >>> open(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'values'),
163 >>> open(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values'),
165 >>> c = CachedPathID()
167 >>> c.id(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values'))
170 >>> sorted(os.listdir(os.path.join(c._root, '.be')))
173 >>> c.path('123/values') # doctest: +ELLIPSIS
174 u'.../.be/abc/bugs/123/values'
177 >>> sorted(os.listdir(os.path.join(c._root, '.be')))
179 >>> c.connect() # demonstrate auto init
180 >>> sorted(os.listdir(os.path.join(c._root, '.be')))
182 >>> c.add_id(u'xyz', parent=None) # doctest: +ELLIPSIS
184 >>> c.add_id('xyz/def', parent='xyz') # doctest: +ELLIPSIS
186 >>> c.add_id('qrs', parent='123') # doctest: +ELLIPSIS
187 u'.../.be/abc/bugs/123/comments/qrs'
190 >>> c.path('qrs') # doctest: +ELLIPSIS
191 u'.../.be/abc/bugs/123/comments/qrs'
192 >>> c.remove_id('qrs')
194 Traceback (most recent call last):
196 InvalidID: qrs in revision None
201 def __init__(self, encoding=None):
202 self.encoding = libbe.util.encoding.get_text_file_encoding()
203 self._spacer_dirs = ['.be', 'bugs', 'comments']
205 def root(self, path):
206 self._root = os.path.abspath(path).rstrip(os.path.sep)
207 self._cache_path = os.path.join(
208 self._root, self._spacer_dirs[0], 'id-cache')
210 def init(self, verbose=True, cache=None):
211 """Create cache file for an existing .be directory.
213 The file contains multiple lines of the form::
221 spaced_root = os.path.join(self._root, self._spacer_dirs[0])
222 for dirpath, dirnames, filenames in os.walk(spaced_root):
223 if dirpath == spaced_root:
226 id = self.id(dirpath)
227 relpath = dirpath[len(self._root + os.path.sep):]
228 if id.count('/') == 0:
229 if verbose == True and id in self._cache:
230 print >> sys.stderr, 'Multiple paths for %s: \n %s\n %s' % (id, self._cache[id], relpath)
231 self._cache[id] = relpath
234 if self._cache != cache:
240 if os.path.exists(self._cache_path):
241 os.remove(self._cache_path)
244 if not os.path.exists(self._cache_path):
248 raise libbe.storage.base.ConnectionError
249 self._cache = {} # key: uuid, value: path
250 self._changed = False
251 f = codecs.open(self._cache_path, 'r', self.encoding)
253 fields = line.rstrip('\n').split('\t')
254 self._cache[fields[0]] = fields[1]
257 def disconnect(self):
258 if self._changed == True:
259 f = codecs.open(self._cache_path, 'w', self.encoding)
260 for uuid,path in self._cache.items():
261 f.write('%s\t%s\n' % (uuid, path))
265 def path(self, id, relpath=False):
266 fields = id.split('/', 1)
272 if uuid not in self._cache:
273 self.init(verbose=False, cache=self._cache)
274 if uuid not in self._cache:
275 raise InvalidID(uuid)
277 return os.path.join(self._cache[uuid], *extra)
278 return os.path.join(self._root, self._cache[uuid], *extra)
280 def add_id(self, id, parent=None):
281 if id.count('/') > 0:
282 # not a UUID-level path
283 assert id.startswith(parent), \
284 'Strange ID: "%s" should start with "%s"' % (id, parent)
286 elif id in self._cache:
292 spacer = self._spacer_dirs[0]
294 assert parent.count('/') == 0, \
295 'Strange parent ID: "%s" should be UUID' % parent
296 parent_path = self.path(parent, relpath=True)
297 parent_spacer = parent_path.split(os.path.sep)[-2]
298 i = self._spacer_dirs.index(parent_spacer)
299 spacer = self._spacer_dirs[i+1]
300 path = os.path.join(parent_path, spacer, id)
301 self._cache[id] = path
303 path = os.path.join(self._root, path)
306 def remove_id(self, id):
307 if id.count('/') > 0:
308 return # not a UUID-level path
313 path = os.path.join(self._root, path)
314 if not path.startswith(self._root + os.path.sep):
315 raise InvalidPath(path, self._root)
316 path = path[len(self._root + os.path.sep):]
318 if not path.startswith(self._spacer_dirs[0] + os.path.sep):
319 raise InvalidPath(path, self._spacer_dirs[0])
320 for spacer in self._spacer_dirs:
321 if not path.startswith(spacer + os.path.sep):
323 id = path[len(spacer + os.path.sep):]
324 fields = path[len(spacer + os.path.sep):].split(os.path.sep,1)
328 for spacer in self._spacer_dirs:
329 if id.endswith(os.path.sep + spacer):
330 raise SpacerCollision(orig_path, spacer)
331 if os.path.sep != '/':
332 id = id.replace(os.path.sep, '/')
339 class VCS (libbe.storage.base.VersionedStorage):
340 """Implement a 'no-VCS' interface.
342 Support for other VCSs can be added by subclassing this class, and
343 overriding methods `_vcs_*()` with code appropriate for your VCS.
345 The methods `_u_*()` are utility methods available to the `_vcs_*()`
349 client = 'false' # command-line tool for _u_invoke_client
351 def __init__(self, *args, **kwargs):
352 if 'encoding' not in kwargs:
353 kwargs['encoding'] = libbe.util.encoding.get_text_file_encoding()
354 libbe.storage.base.VersionedStorage.__init__(self, *args, **kwargs)
355 self.versioned = False
356 self.interspersed_vcs_files = False
357 self.verbose_invoke = False
358 self._cached_path_id = CachedPathID()
361 def _vcs_version(self):
363 Return the VCS version string.
367 def _vcs_get_user_id(self):
369 Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
370 If the VCS has not been configured with a username, return None.
374 def _vcs_detect(self, path=None):
376 Detect whether a directory is revision controlled with this VCS.
380 def _vcs_root(self, path):
382 Get the VCS root. This is the default working directory for
383 future invocations. You would normally set this to the root
384 directory for your VCS.
386 if os.path.isdir(path) == False:
387 path = os.path.dirname(path)
389 path = os.path.abspath('.')
392 def _vcs_init(self, path):
394 Begin versioning the tree based at path.
398 def _vcs_destroy(self):
400 Remove any files used in versioning (e.g. whatever _vcs_init()
405 def _vcs_add(self, path):
407 Add the already created file at path to version control.
411 def _vcs_exists(self, path, revision=None):
413 Does the path exist in a given revision? (True/False)
415 raise NotImplementedError('Lazy BE developers')
417 def _vcs_remove(self, path):
419 Remove the file at path from version control. Optionally
420 remove the file from the filesystem as well.
424 def _vcs_update(self, path):
426 Notify the versioning system of changes to the versioned file
431 def _vcs_is_versioned(self, path):
433 Return true if a path is under version control, False
434 otherwise. You only need to set this if the VCS goes about
435 dumping VCS-specific files into the .be directory.
437 If you do need to implement this method (e.g. Arch), set
438 self.interspersed_vcs_files = True
440 assert self.interspersed_vcs_files == False
441 raise NotImplementedError
443 def _vcs_get_file_contents(self, path, revision=None):
445 Get the file contents as they were in a given revision.
446 Revision==None specifies the current revision.
449 raise libbe.storage.base.InvalidRevision(
450 'The %s VCS does not support revision specifiers' % self.name)
451 path = os.path.join(self.repo, path)
452 if not os.path.exists(path):
453 return libbe.util.InvalidObject
454 if os.path.isdir(path):
455 return libbe.storage.base.InvalidDirectory
461 def _vcs_path(self, id, revision):
463 Return the relative path to object id as of revision.
465 Revision will not be None.
467 raise NotImplementedError
469 def _vcs_isdir(self, path, revision):
471 Return True if path (as returned by _vcs_path) was a directory
472 as of revision, False otherwise.
474 Revision will not be None.
476 raise NotImplementedError
478 def _vcs_listdir(self, path, revision):
480 Return a list of the contents of the directory path (as
481 returned by _vcs_path) as of revision.
483 Revision will not be None, and ._vcs_isdir(path, revision)
486 raise NotImplementedError
488 def _vcs_commit(self, commitfile, allow_empty=False):
490 Commit the current working directory, using the contents of
491 commitfile as the comment. Return the name of the old
492 revision (or None if commits are not supported).
494 If allow_empty == False, raise EmptyCommit if there are no
499 def _vcs_revision_id(self, index):
501 Return the name of the <index>th revision. Index will be an
502 integer (possibly <= 0). The choice of which branch to follow
503 when crossing branches/merges is not defined.
505 Return None if revision IDs are not supported, or if the
506 specified revision does not exist.
510 def _vcs_changed(self, revision):
512 Return a tuple of lists of ids
513 (new, modified, removed)
514 from the specified revision to the current situation.
519 # Cache version string for efficiency.
520 if not hasattr(self, '_version'):
521 self._version = self._vcs_version()
524 def version_cmp(self, *args):
525 """Compare the installed VCS version `V_i` with another version
526 `V_o` (given in `*args`). Returns
537 >>> v = VCS(repo='.')
538 >>> v._version = '2.3.1 (release)'
539 >>> v.version_cmp(2,3,1)
541 >>> v.version_cmp(2,3,2)
543 >>> v.version_cmp(2,3,'a',5)
545 >>> v.version_cmp(2,3,0)
547 >>> v.version_cmp(2,3,1,'a',5)
549 >>> v.version_cmp(2,3,1,1)
553 >>> v._version = '2.0.0pre2'
554 >>> v._parsed_version = None
557 >>> v.version_cmp(2,0,1)
559 >>> v.version_cmp(2,0,0,'pre',1)
561 >>> v.version_cmp(2,0,0,'pre',2)
563 >>> v.version_cmp(2,0,0,'pre',3)
565 >>> v.version_cmp(2,0,0,'a',3)
567 >>> v.version_cmp(2,0,0,'rc',1)
570 if not hasattr(self, '_parsed_version') \
571 or self._parsed_version == None:
572 num_part = self.version().split(' ')[0]
573 self._parsed_version = []
574 for num in num_part.split('.'):
576 self._parsed_version.append(int(num))
577 except ValueError, e:
578 # bzr version number might contain non-numerical tags
579 splitter = re.compile(r'[\D]') # Match non-digits
580 splits = splitter.split(num)
581 # if len(tag) > 1 some splits will be empty; remove
582 splits = filter(lambda s: s != '', splits)
583 tag_starti = len(splits[0])
584 num_starti = num.find(splits[1], tag_starti)
585 tag = num[tag_starti:num_starti]
586 self._parsed_version.append(int(splits[0]))
587 self._parsed_version.append(tag)
588 self._parsed_version.append(int(splits[1]))
589 for current,other in zip(self._parsed_version, args):
590 if type(current) != type (other):
591 # one of them is a pre-release string
592 if type(current) != types.IntType:
596 c = cmp(current,other)
599 # see if one is longer than the other
600 verlen = len(self._parsed_version)
604 elif verlen > arglen:
605 if type(self._parsed_version[arglen]) != types.IntType:
606 return -1 # self is a prerelease
610 if type(args[verlen]) != types.IntType:
611 return 1 # args is a prerelease
616 if self.version() != None:
620 def get_user_id(self):
622 Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
623 If the VCS has not been configured with a username, return None.
624 You can override the automatic lookup procedure by setting the
625 VCS.user_id attribute to a string of your choice.
627 if not hasattr(self, 'user_id'):
628 self.user_id = self._vcs_get_user_id()
629 if self.user_id == None:
631 name = libbe.ui.util.user.get_fallback_fullname()
632 email = libbe.ui.util.user.get_fallback_email()
633 self.user_id = libbe.ui.util.user.create_user_id(name, email)
636 def _detect(self, path='.'):
638 Detect whether a directory is revision controlled with this VCS.
640 return self._vcs_detect(path)
643 """Set the root directory to the path's VCS root.
645 This is the default working directory for future invocations.
646 Consider the following usage case:
648 You have a project rooted in::
652 by which I mean the VCS repository is in, for example::
656 However, you're of in some subdirectory like::
658 /path/to/source/ui/testing
660 and you want to comment on a bug. `root` will locate your VCS
661 root (``/path/to/source/``) and set the repo there. This
662 means that it doesn't matter where you are in your project
663 tree when you call "be COMMAND", it always acts as if you called
664 it from the VCS root.
666 if self._detect(self.repo) == False:
667 raise VCSUnableToRoot(self)
668 root = self._vcs_root(self.repo)
669 self.repo = os.path.realpath(root)
670 if os.path.isdir(self.repo) == False:
671 self.repo = os.path.dirname(self.repo)
672 self.be_dir = os.path.join(
673 self.repo, self._cached_path_id._spacer_dirs[0])
674 self._cached_path_id.root(self.repo)
679 Begin versioning the tree based at self.repo.
680 Also roots the vcs at path.
684 root : called if the VCS has already been initialized.
686 if not os.path.exists(self.repo) or not os.path.isdir(self.repo):
687 raise VCSUnableToRoot(self)
688 if self._vcs_detect(self.repo) == False:
689 self._vcs_init(self.repo)
690 if self._rooted == False:
692 os.mkdir(self.be_dir)
693 self._vcs_add(self._u_rel_path(self.be_dir))
694 self._setup_storage_version()
695 self._cached_path_id.init()
699 self._cached_path_id.destroy()
700 if os.path.exists(self.be_dir):
701 shutil.rmtree(self.be_dir)
704 if self._rooted == False:
706 if not os.path.isdir(self.be_dir):
707 raise libbe.storage.base.ConnectionError(self)
708 self._cached_path_id.connect()
709 self.check_storage_version()
711 def _disconnect(self):
712 self._cached_path_id.disconnect()
714 def path(self, id, revision=None, relpath=True):
716 path = self._cached_path_id.path(id)
718 return self._u_rel_path(path)
720 path = self._vcs_path(id, revision)
723 return os.path.join(self.repo, path)
725 def _add_path(self, path, directory=False):
726 relpath = self._u_rel_path(path)
727 reldirs = relpath.split(os.path.sep)
728 if directory == False:
729 reldirs = reldirs[:-1]
731 for reldir in reldirs:
732 dir = os.path.join(dir, reldir)
733 if not os.path.exists(dir):
735 self._vcs_add(self._u_rel_path(dir))
736 elif not os.path.isdir(dir):
737 raise libbe.storage.base.InvalidDirectory
738 if directory == False:
739 if not os.path.exists(path):
740 open(path, 'w').close()
741 self._vcs_add(self._u_rel_path(path))
743 def _add(self, id, parent=None, **kwargs):
744 path = self._cached_path_id.add_id(id, parent)
745 self._add_path(path, **kwargs)
747 def _exists(self, id, revision=None):
750 path = self.path(id, revision, relpath=False)
753 return os.path.exists(path)
754 path = self.path(id, revision, relpath=True)
755 return self._vcs_exists(relpath, revision)
757 def _remove(self, id):
758 path = self._cached_path_id.path(id)
759 if os.path.exists(path):
760 if os.path.isdir(path) and len(self.children(id)) > 0:
761 raise libbe.storage.base.DirectoryNotEmpty(id)
762 self._vcs_remove(self._u_rel_path(path))
763 if os.path.exists(path):
764 if os.path.isdir(path):
768 self._cached_path_id.remove_id(id)
770 def _recursive_remove(self, id):
771 path = self._cached_path_id.path(id)
772 for dirpath,dirnames,filenames in os.walk(path, topdown=False):
773 filenames.extend(dirnames)
775 fullpath = os.path.join(dirpath, f)
776 if os.path.exists(fullpath) == False:
778 self._vcs_remove(self._u_rel_path(fullpath))
779 if os.path.exists(path):
781 path = self._cached_path_id.path(id, relpath=True)
782 for id,p in self._cached_path_id._cache.items():
783 if p.startswith(path):
784 self._cached_path_id.remove_id(id)
786 def _ancestors(self, id=None, revision=None):
790 path = self.path(id, revision, relpath=False)
793 if not path.startswith(self.repo + os.path.sep):
795 path = os.path.dirname(path)
797 id = self._u_path_to_id(path)
799 except (SpacerCollision, InvalidPath):
803 def _children(self, id=None, revision=None):
805 isdir = os.path.isdir
808 isdir = lambda path : self._vcs_isdir(
809 self._u_rel_path(path), revision)
810 listdir = lambda path : self._vcs_listdir(
811 self._u_rel_path(path), revision)
815 path = self.path(id, revision, relpath=False)
816 if isdir(path) == False:
818 children = listdir(path)
819 for i,c in enumerate(children):
820 if c in self._cached_path_id._spacer_dirs:
822 children.extend([os.path.join(c, c2) for c2 in
823 listdir(os.path.join(path, c))])
824 elif c in ['id-cache', 'version']:
826 elif self.interspersed_vcs_files \
827 and self._vcs_is_versioned(c) == False:
829 for i,c in enumerate(children):
830 if c == None: continue
831 cpath = os.path.join(path, c)
832 if self.interspersed_vcs_files == True \
833 and revision != None \
834 and self._vcs_is_versioned(cpath) == False:
837 children[i] = self._u_path_to_id(cpath)
838 return [c for c in children if c != None]
840 def _get(self, id, default=libbe.util.InvalidObject, revision=None):
842 relpath = self.path(id, revision, relpath=True)
843 contents = self._vcs_get_file_contents(relpath, revision)
845 if default == libbe.util.InvalidObject:
848 if contents in [libbe.storage.base.InvalidDirectory,
849 libbe.util.InvalidObject] \
850 or len(contents) == 0:
851 if default == libbe.util.InvalidObject:
852 raise InvalidID(id, revision)
856 def _set(self, id, value):
858 path = self._cached_path_id.path(id)
861 if not os.path.exists(path):
863 if os.path.isdir(path):
864 raise libbe.storage.base.InvalidDirectory(id)
868 self._vcs_update(self._u_rel_path(path))
870 def _commit(self, summary, body=None, allow_empty=False):
871 summary = summary.strip()+'\n'
873 summary += '\n' + body.strip() + '\n'
874 descriptor, filename = tempfile.mkstemp()
877 temp_file = os.fdopen(descriptor, 'wb')
878 temp_file.write(summary)
880 revision = self._vcs_commit(filename, allow_empty=allow_empty)
886 def revision_id(self, index=None):
890 if int(index) != index:
891 raise InvalidRevision(index)
893 raise InvalidRevision(index)
894 revid = self._vcs_revision_id(index)
896 raise libbe.storage.base.InvalidRevision(index)
899 def changed(self, revision):
900 new,mod,rem = self._vcs_changed(revision)
901 def paths_to_ids(paths):
904 id = self._u_path_to_id(p)
906 except (SpacerCollision, InvalidPath):
908 new_id = list(paths_to_ids(new))
909 mod_id = list(paths_to_ids(mod))
910 rem_id = list(paths_to_ids(rem))
911 return (new_id, mod_id, rem_id)
913 def _u_any_in_string(self, list, string):
914 """Return True if any of the strings in list are in string.
915 Otherwise return False.
917 for list_string in list:
918 if list_string in string:
922 def _u_invoke(self, *args, **kwargs):
923 if 'cwd' not in kwargs:
924 kwargs['cwd'] = self.repo
925 if 'verbose' not in kwargs:
926 kwargs['verbose'] = self.verbose_invoke
927 if 'encoding' not in kwargs:
928 kwargs['encoding'] = self.encoding
929 return invoke(*args, **kwargs)
931 def _u_invoke_client(self, *args, **kwargs):
932 cl_args = [self.client]
934 return self._u_invoke(cl_args, **kwargs)
936 def _u_search_parent_directories(self, path, filename):
937 """Find the file (or directory) named filename in path or in any of
941 search_parent_directories("/a/b/c", ".be")
942 will return the path to the first existing file from
947 or None if none of those files exist.
950 ret = search_parent_directories(path, filename)
951 except AssertionError, e:
955 def _u_find_id_from_manifest(self, id, manifest, revision=None):
956 """Search for the relative path to id using manifest, a list of all
959 Returns None if the id is not found.
961 be_dir = self._cached_path_id._spacer_dirs[0]
962 be_dir_sep = self._cached_path_id._spacer_dirs[0] + os.path.sep
963 files = [f for f in manifest if f.startswith(be_dir_sep)]
965 if not file.startswith(be_dir+os.path.sep):
967 parts = file.split(os.path.sep)
968 dir = parts.pop(0) # don't add the first spacer dir
969 for part in parts[:-1]:
970 dir = os.path.join(dir, part)
975 p_id = self._u_path_to_id(file)
978 except (SpacerCollision, InvalidPath):
980 raise InvalidID(id, revision=revision)
982 def _u_find_id(self, id, revision):
983 """Search for the relative path to id as of revision.
985 Returns None if the id is not found.
987 assert self._rooted == True
988 be_dir = self._cached_path_id._spacer_dirs[0]
989 stack = [(be_dir, be_dir)]
990 while len(stack) > 0:
991 path,long_id = stack.pop()
992 if long_id.endswith('/'+id):
994 if self._vcs_isdir(path, revision) == False:
996 for child in self._vcs_listdir(path, revision):
997 stack.append((os.path.join(path, child),
998 '/'.join([long_id, child])))
999 raise InvalidID(id, revision=revision)
1001 def _u_path_to_id(self, path):
1002 return self._cached_path_id.id(path)
1004 def _u_rel_path(self, path, root=None):
1005 """Return the relative path to path from root.
1010 >>> vcs._u_rel_path("/a.b/c/.be", "/a.b/c")
1012 >>> vcs._u_rel_path("/a.b/c/", "/a.b/c")
1014 >>> vcs._u_rel_path("/a.b/c/", "/a.b/c/")
1016 >>> vcs._u_rel_path("./a", ".")
1020 if self.repo == None:
1021 raise VCSNotRooted(self)
1023 path = os.path.abspath(path)
1024 absRoot = os.path.abspath(root)
1025 absRootSlashedDir = os.path.join(absRoot,"")
1026 if path in [absRoot, absRootSlashedDir]:
1028 if not path.startswith(absRootSlashedDir):
1029 raise InvalidPath(path, absRootSlashedDir)
1030 relpath = path[len(absRootSlashedDir):]
1033 def _u_abspath(self, path, root=None):
1034 """Return the absolute path from a path realtive to root.
1040 >>> vcs._u_abspath(".be", "/a.b/c")
1044 assert self.repo != None, "VCS not rooted"
1046 return os.path.abspath(os.path.join(root, path))
1048 def _u_parse_commitfile(self, commitfile):
1049 """Split the commitfile created in self.commit() back into summary and
1052 f = codecs.open(commitfile, 'r', self.encoding)
1053 summary = f.readline()
1059 return (summary, body)
1061 def check_storage_version(self):
1062 version = self.storage_version()
1063 if version != libbe.storage.STORAGE_VERSION:
1064 upgrade.upgrade(self.repo, version)
1066 def storage_version(self, revision=None, path=None):
1067 """Return the storage version of the on-disk files.
1071 :mod:`libbe.storage.util.upgrade`
1074 path = os.path.join(self.repo, '.be', 'version')
1075 if not os.path.exists(path):
1076 raise libbe.storage.InvalidStorageVersion(None)
1077 if revision == None: # don't require connection
1078 return libbe.util.encoding.get_file_contents(
1079 path, decode=True).rstrip()
1080 relpath = self._u_rel_path(path)
1081 contents = self._vcs_get_file_contents(relpath, revision=revision)
1082 if type(contents) != types.UnicodeType:
1083 contents = unicode(contents, self.encoding)
1084 return contents.strip()
1086 def _setup_storage_version(self):
1088 Requires disk access.
1090 assert self._rooted == True
1091 path = os.path.join(self.be_dir, 'version')
1092 if not os.path.exists(path):
1093 libbe.util.encoding.set_file_contents(path,
1094 libbe.storage.STORAGE_VERSION+'\n')
1095 self._vcs_add(self._u_rel_path(path))
1098 if libbe.TESTING == True:
1099 class VCSTestCase (unittest.TestCase):
1101 Test cases for base VCS class (in addition to the Storage test
1107 def __init__(self, *args, **kwargs):
1108 super(VCSTestCase, self).__init__(*args, **kwargs)
1112 """Set up test fixtures for Storage test case."""
1113 super(VCSTestCase, self).setUp()
1115 self.dirname = self.dir.path
1116 self.s = self.Class(repo=self.dirname)
1117 if self.s.installed() == True:
1122 super(VCSTestCase, self).tearDown()
1123 if self.s.installed() == True:
1128 class VCS_installed_TestCase (VCSTestCase):
1129 def test_installed(self):
1130 """See if the VCS is installed.
1132 self.failUnless(self.s.installed() == True,
1133 '%(name)s VCS not found' % vars(self.Class))
1136 class VCS_detection_TestCase (VCSTestCase):
1137 def test_detection(self):
1138 """See if the VCS detects its installed repository
1140 if self.s.installed():
1142 self.failUnless(self.s._detect(self.dirname) == True,
1143 'Did not detected %(name)s VCS after initialising'
1147 def test_no_detection(self):
1148 """See if the VCS detects its installed repository
1150 if self.s.installed() and self.Class.name != 'None':
1153 self.failUnless(self.s._detect(self.dirname) == False,
1154 'Detected %(name)s VCS before initialising'
1159 def test_vcs_repo_in_specified_root_path(self):
1160 """VCS root directory should be in specified root path."""
1161 rp = os.path.realpath(self.s.repo)
1162 dp = os.path.realpath(self.dirname)
1163 vcs_name = self.Class.name
1165 dp == rp or rp == None,
1166 "%(vcs_name)s VCS root in wrong dir (%(dp)s %(rp)s)" % vars())
1168 class VCS_get_user_id_TestCase(VCSTestCase):
1169 """Test cases for VCS.get_user_id method."""
1171 def test_gets_existing_user_id(self):
1172 """Should get the existing user ID."""
1173 if self.s.installed():
1174 user_id = self.s.get_user_id()
1177 name,email = libbe.ui.util.user.parse_user_id(user_id)
1179 self.failUnless('@' in email, email)
1181 def make_vcs_testcase_subclasses(vcs_class, namespace):
1184 if c.versioned == True:
1185 libbe.storage.base.make_versioned_storage_testcase_subclasses(
1186 vcs_class, namespace)
1188 libbe.storage.base.make_storage_testcase_subclasses(
1189 vcs_class, namespace)
1191 if namespace != sys.modules[__name__]:
1192 # Make VCSTestCase subclasses for vcs_class in the namespace.
1193 vcs_testcase_classes = [
1195 ob for ob in globals().values() if isinstance(ob, type))
1196 if issubclass(c, VCSTestCase) \
1199 for base_class in vcs_testcase_classes:
1200 testcase_class_name = vcs_class.__name__ + base_class.__name__
1201 testcase_class_bases = (base_class,)
1202 testcase_class_dict = dict(base_class.__dict__)
1203 testcase_class_dict['Class'] = vcs_class
1204 testcase_class = type(
1205 testcase_class_name, testcase_class_bases, testcase_class_dict)
1206 setattr(namespace, testcase_class_name, testcase_class)
1208 make_vcs_testcase_subclasses(VCS, sys.modules[__name__])
1210 unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
1211 suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])