1 # Copyright (C) 2005-2012 Aaron Bentley <abentley@panoramicfeedback.com>
2 # Alexander Belchenko <bialix@ukr.net>
3 # Ben Finney <benf@cybersource.com.au>
4 # Chris Ball <cjb@laptop.org>
5 # Gianluca Montecchi <gian@grys.it>
6 # W. Trevor King <wking@tremily.us>
8 # This file is part of Bugs Everywhere.
10 # Bugs Everywhere is free software: you can redistribute it and/or modify it
11 # under the terms of the GNU General Public License as published by the Free
12 # Software Foundation, either version 2 of the License, or (at your option) any
15 # Bugs Everywhere is distributed in the hope that it will be useful, but
16 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
17 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
20 # You should have received a copy of the GNU General Public License along with
21 # Bugs Everywhere. If not, see <http://www.gnu.org/licenses/>.
23 """Define the base :py:class:`VCS` (Version Control System) class, which
24 should be subclassed by other Version Control System backends. The
25 base class implements a "do not version" VCS.
39 import libbe.storage.base
40 import libbe.util.encoding
41 from libbe.storage.base import EmptyCommit, InvalidRevision, InvalidID
42 from libbe.util.utility import Dir, search_parent_directories
43 from libbe.util.subproc import CommandError, invoke
44 from libbe.util.plugin import import_by_name
45 import libbe.storage.util.upgrade as upgrade
47 if libbe.TESTING == True:
51 import libbe.ui.util.user
53 VCS_ORDER = ['arch', 'bzr', 'darcs', 'git', 'hg', 'monotone']
54 """List VCS modules in order of preference.
56 Don't list this module, it is implicitly last.
59 def set_preferred_vcs(name):
60 """Manipulate :py:data:`VCS_ORDER` to place `name` first.
62 This is primarily indended for testing purposes.
65 assert name in VCS_ORDER, \
66 'unrecognized VCS %s not in\n %s' % (name, VCS_ORDER)
67 VCS_ORDER.remove(name)
68 VCS_ORDER.insert(0, name)
70 def _get_matching_vcs(matchfn):
71 """Return the first module for which matchfn(VCS_instance) is True.
73 Searches in :py:data:`VCS_ORDER`.
75 for submodname in VCS_ORDER:
76 module = import_by_name('libbe.storage.vcs.%s' % submodname)
78 if matchfn(vcs) == True:
82 def vcs_by_name(vcs_name):
83 """Return the module for the VCS with the given name.
85 Searches in :py:data:`VCS_ORDER`.
87 if vcs_name == VCS.name:
89 return _get_matching_vcs(lambda vcs: vcs.name == vcs_name)
92 """Return an VCS instance for the vcs being used in this directory.
94 Searches in :py:data:`VCS_ORDER`.
96 return _get_matching_vcs(lambda vcs: vcs._detect(dir))
99 """Return an instance of an installed VCS.
101 Searches in :py:data:`VCS_ORDER`.
103 return _get_matching_vcs(lambda vcs: vcs.installed())
106 class VCSNotRooted (libbe.storage.base.ConnectionError):
107 def __init__(self, vcs):
108 msg = 'VCS not rooted'
109 libbe.storage.base.ConnectionError.__init__(self, msg)
112 class VCSUnableToRoot (libbe.storage.base.ConnectionError):
113 def __init__(self, vcs):
114 msg = 'VCS unable to root'
115 libbe.storage.base.ConnectionError.__init__(self, msg)
118 class InvalidPath (InvalidID):
119 def __init__(self, path, root, msg=None, **kwargs):
121 msg = 'Path "%s" not in root "%s"' % (path, root)
122 InvalidID.__init__(self, msg=msg, **kwargs)
126 class SpacerCollision (InvalidPath):
127 def __init__(self, path, spacer):
128 msg = 'Path "%s" collides with spacer directory "%s"' % (path, spacer)
129 InvalidPath.__init__(self, path, root=None, msg=msg)
132 class NoSuchFile (InvalidID):
133 def __init__(self, pathname, root='.'):
134 path = os.path.abspath(os.path.join(root, pathname))
135 InvalidID.__init__(self, 'No such file: %s' % path)
138 class CachedPathID (object):
139 """Cache Storage ID <-> path policy.
141 Paths generated following::
143 .../.be/BUGDIR/bugs/BUG/comments/COMMENT
146 See :py:mod:`libbe.util.id` for a discussion of ID formats.
152 >>> os.mkdir(os.path.join(dir.path, '.be'))
153 >>> os.mkdir(os.path.join(dir.path, '.be', 'abc'))
154 >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs'))
155 >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123'))
156 >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments'))
157 >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def'))
158 >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '456'))
159 >>> open(os.path.join(dir.path, '.be', 'abc', 'values'),
161 >>> open(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'values'),
163 >>> open(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values'),
165 >>> c = CachedPathID()
167 >>> c.id(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values'))
170 >>> sorted(os.listdir(os.path.join(c._root, '.be')))
173 >>> c.path('123/values') # doctest: +ELLIPSIS
174 u'.../.be/abc/bugs/123/values'
177 >>> sorted(os.listdir(os.path.join(c._root, '.be')))
179 >>> c.connect() # demonstrate auto init
180 >>> sorted(os.listdir(os.path.join(c._root, '.be')))
182 >>> c.add_id(u'xyz', parent=None) # doctest: +ELLIPSIS
184 >>> c.add_id('xyz/def', parent='xyz') # doctest: +ELLIPSIS
186 >>> c.add_id('qrs', parent='123') # doctest: +ELLIPSIS
187 u'.../.be/abc/bugs/123/comments/qrs'
190 >>> c.path('qrs') # doctest: +ELLIPSIS
191 u'.../.be/abc/bugs/123/comments/qrs'
192 >>> c.remove_id('qrs')
194 Traceback (most recent call last):
196 InvalidID: qrs in revision None
201 def __init__(self, encoding=None):
202 self.encoding = libbe.util.encoding.get_text_file_encoding()
203 self._spacer_dirs = ['.be', 'bugs', 'comments']
205 def root(self, path):
206 self._root = os.path.abspath(path).rstrip(os.path.sep)
207 self._cache_path = os.path.join(
208 self._root, self._spacer_dirs[0], 'id-cache')
210 def init(self, cache=None):
211 """Create cache file for an existing .be directory.
213 The file contains multiple lines of the form::
221 spaced_root = os.path.join(self._root, self._spacer_dirs[0])
222 for dirpath, dirnames, filenames in os.walk(spaced_root,
224 if dirpath == spaced_root:
227 id = self.id(dirpath)
228 relpath = dirpath[len(self._root + os.path.sep):]
229 if id.count('/') == 0:
230 if id in self._cache:
232 'multiple paths for {0}:\n {1}\n {2}'.format(
233 id, self._cache[id], relpath))
234 self._cache[id] = relpath
237 if self._cache != cache:
243 if os.path.exists(self._cache_path):
244 os.remove(self._cache_path)
247 if not os.path.exists(self._cache_path):
251 raise libbe.storage.base.ConnectionError
252 self._cache = {} # key: uuid, value: path
253 self._changed = False
254 f = codecs.open(self._cache_path, 'r', self.encoding)
256 fields = line.rstrip('\n').split('\t')
257 self._cache[fields[0]] = fields[1]
260 def disconnect(self):
261 if self._changed == True:
262 f = codecs.open(self._cache_path, 'w', self.encoding)
263 for uuid,path in self._cache.items():
264 f.write('%s\t%s\n' % (uuid, path))
268 def path(self, id, relpath=False):
269 fields = id.split('/', 1)
275 if uuid not in self._cache:
276 self.init(cache=self._cache)
277 if uuid not in self._cache:
278 raise InvalidID(uuid)
280 return os.path.join(self._cache[uuid], *extra)
281 return os.path.join(self._root, self._cache[uuid], *extra)
283 def add_id(self, id, parent=None):
284 if id.count('/') > 0:
285 # not a UUID-level path
286 assert id.startswith(parent), \
287 'Strange ID: "%s" should start with "%s"' % (id, parent)
289 elif id in self._cache:
295 spacer = self._spacer_dirs[0]
297 assert parent.count('/') == 0, \
298 'Strange parent ID: "%s" should be UUID' % parent
299 parent_path = self.path(parent, relpath=True)
300 parent_spacer = parent_path.split(os.path.sep)[-2]
301 i = self._spacer_dirs.index(parent_spacer)
302 spacer = self._spacer_dirs[i+1]
303 path = os.path.join(parent_path, spacer, id)
304 self._cache[id] = path
306 path = os.path.join(self._root, path)
309 def remove_id(self, id):
310 if id.count('/') > 0:
311 return # not a UUID-level path
316 path = os.path.join(self._root, path)
317 if not path.startswith(self._root + os.path.sep):
318 raise InvalidPath(path, self._root)
319 path = path[len(self._root + os.path.sep):]
321 if not path.startswith(self._spacer_dirs[0] + os.path.sep):
322 raise InvalidPath(path, self._spacer_dirs[0])
323 for spacer in self._spacer_dirs:
324 if not path.startswith(spacer + os.path.sep):
326 id = path[len(spacer + os.path.sep):]
327 fields = path[len(spacer + os.path.sep):].split(os.path.sep,1)
331 for spacer in self._spacer_dirs:
332 if id.endswith(os.path.sep + spacer):
333 raise SpacerCollision(orig_path, spacer)
334 if os.path.sep != '/':
335 id = id.replace(os.path.sep, '/')
342 class VCS (libbe.storage.base.VersionedStorage):
343 """Implement a 'no-VCS' interface.
345 Support for other VCSs can be added by subclassing this class, and
346 overriding methods `_vcs_*()` with code appropriate for your VCS.
348 The methods `_u_*()` are utility methods available to the `_vcs_*()`
352 client = 'false' # command-line tool for _u_invoke_client
354 def __init__(self, *args, **kwargs):
355 if 'encoding' not in kwargs:
356 kwargs['encoding'] = libbe.util.encoding.get_text_file_encoding()
357 libbe.storage.base.VersionedStorage.__init__(self, *args, **kwargs)
358 self.versioned = False
359 self.interspersed_vcs_files = False
360 self._cached_path_id = CachedPathID()
363 def _vcs_version(self):
365 Return the VCS version string.
369 def _vcs_get_user_id(self):
371 Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
372 If the VCS has not been configured with a username, return None.
376 def _vcs_detect(self, path=None):
378 Detect whether a directory is revision controlled with this VCS.
382 def _vcs_root(self, path):
384 Get the VCS root. This is the default working directory for
385 future invocations. You would normally set this to the root
386 directory for your VCS.
388 if os.path.isdir(path) == False:
389 path = os.path.dirname(path)
391 path = os.path.abspath('.')
394 def _vcs_init(self, path):
396 Begin versioning the tree based at path.
400 def _vcs_destroy(self):
402 Remove any files used in versioning (e.g. whatever _vcs_init()
407 def _vcs_add(self, path):
409 Add the already created file at path to version control.
413 def _vcs_exists(self, path, revision=None):
415 Does the path exist in a given revision? (True/False)
417 raise NotImplementedError('Lazy BE developers')
419 def _vcs_remove(self, path):
421 Remove the file at path from version control. Optionally
422 remove the file from the filesystem as well.
426 def _vcs_update(self, path):
428 Notify the versioning system of changes to the versioned file
433 def _vcs_is_versioned(self, path):
435 Return true if a path is under version control, False
436 otherwise. You only need to set this if the VCS goes about
437 dumping VCS-specific files into the .be directory.
439 If you do need to implement this method (e.g. Arch), set
440 self.interspersed_vcs_files = True
442 assert self.interspersed_vcs_files == False
443 raise NotImplementedError
445 def _vcs_get_file_contents(self, path, revision=None):
447 Get the file contents as they were in a given revision.
448 Revision==None specifies the current revision.
451 raise libbe.storage.base.InvalidRevision(
452 'The %s VCS does not support revision specifiers' % self.name)
453 path = os.path.join(self.repo, path)
454 if not os.path.exists(path):
455 return libbe.util.InvalidObject
456 if os.path.isdir(path):
457 return libbe.storage.base.InvalidDirectory
463 def _vcs_path(self, id, revision):
465 Return the relative path to object id as of revision.
467 Revision will not be None.
469 raise NotImplementedError
471 def _vcs_isdir(self, path, revision):
473 Return True if path (as returned by _vcs_path) was a directory
474 as of revision, False otherwise.
476 Revision will not be None.
478 raise NotImplementedError
480 def _vcs_listdir(self, path, revision):
482 Return a list of the contents of the directory path (as
483 returned by _vcs_path) as of revision.
485 Revision will not be None, and ._vcs_isdir(path, revision)
488 raise NotImplementedError
490 def _vcs_commit(self, commitfile, allow_empty=False):
492 Commit the current working directory, using the contents of
493 commitfile as the comment. Return the name of the old
494 revision (or None if commits are not supported).
496 If allow_empty == False, raise EmptyCommit if there are no
501 def _vcs_revision_id(self, index):
503 Return the name of the <index>th revision. Index will be an
504 integer (possibly <= 0). The choice of which branch to follow
505 when crossing branches/merges is not defined.
507 Return None if revision IDs are not supported, or if the
508 specified revision does not exist.
512 def _vcs_changed(self, revision):
514 Return a tuple of lists of ids
515 (new, modified, removed)
516 from the specified revision to the current situation.
521 # Cache version string for efficiency.
522 if not hasattr(self, '_version'):
523 self._version = self._vcs_version()
526 def version_cmp(self, *args):
527 """Compare the installed VCS version `V_i` with another version
528 `V_o` (given in `*args`). Returns
539 >>> v = VCS(repo='.')
540 >>> v._version = '2.3.1 (release)'
541 >>> v.version_cmp(2,3,1)
543 >>> v.version_cmp(2,3,2)
545 >>> v.version_cmp(2,3,'a',5)
547 >>> v.version_cmp(2,3,0)
549 >>> v.version_cmp(2,3,1,'a',5)
551 >>> v.version_cmp(2,3,1,1)
555 >>> v._version = '2.0.0pre2'
556 >>> v._parsed_version = None
559 >>> v.version_cmp(2,0,1)
561 >>> v.version_cmp(2,0,0,'pre',1)
563 >>> v.version_cmp(2,0,0,'pre',2)
565 >>> v.version_cmp(2,0,0,'pre',3)
567 >>> v.version_cmp(2,0,0,'a',3)
569 >>> v.version_cmp(2,0,0,'rc',1)
572 if not hasattr(self, '_parsed_version') \
573 or self._parsed_version == None:
574 num_part = self.version().split(' ')[0]
575 self._parsed_version = []
576 for num in num_part.split('.'):
578 self._parsed_version.append(int(num))
579 except ValueError, e:
580 # bzr version number might contain non-numerical tags
581 splitter = re.compile(r'[\D]') # Match non-digits
582 splits = splitter.split(num)
583 # if len(tag) > 1 some splits will be empty; remove
584 splits = filter(lambda s: s != '', splits)
585 tag_starti = len(splits[0])
586 num_starti = num.find(splits[1], tag_starti)
587 tag = num[tag_starti:num_starti]
588 self._parsed_version.append(int(splits[0]))
589 self._parsed_version.append(tag)
590 self._parsed_version.append(int(splits[1]))
591 for current,other in zip(self._parsed_version, args):
592 if type(current) != type (other):
593 # one of them is a pre-release string
594 if type(current) != types.IntType:
598 c = cmp(current,other)
601 # see if one is longer than the other
602 verlen = len(self._parsed_version)
606 elif verlen > arglen:
607 if type(self._parsed_version[arglen]) != types.IntType:
608 return -1 # self is a prerelease
612 if type(args[verlen]) != types.IntType:
613 return 1 # args is a prerelease
618 if self.version() != None:
622 def get_user_id(self):
624 Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
625 If the VCS has not been configured with a username, return None.
626 You can override the automatic lookup procedure by setting the
627 VCS.user_id attribute to a string of your choice.
629 if not hasattr(self, 'user_id'):
630 self.user_id = self._vcs_get_user_id()
631 if self.user_id == None:
633 name = libbe.ui.util.user.get_fallback_fullname()
634 email = libbe.ui.util.user.get_fallback_email()
635 self.user_id = libbe.ui.util.user.create_user_id(name, email)
638 def _detect(self, path='.'):
640 Detect whether a directory is revision controlled with this VCS.
642 return self._vcs_detect(path)
645 """Set the root directory to the path's VCS root.
647 This is the default working directory for future invocations.
648 Consider the following usage case:
650 You have a project rooted in::
654 by which I mean the VCS repository is in, for example::
658 However, you're of in some subdirectory like::
660 /path/to/source/ui/testing
662 and you want to comment on a bug. `root` will locate your VCS
663 root (``/path/to/source/``) and set the repo there. This
664 means that it doesn't matter where you are in your project
665 tree when you call "be COMMAND", it always acts as if you called
666 it from the VCS root.
668 if self._detect(self.repo) == False:
669 raise VCSUnableToRoot(self)
670 root = self._vcs_root(self.repo)
671 self.repo = os.path.realpath(root)
672 if os.path.isdir(self.repo) == False:
673 self.repo = os.path.dirname(self.repo)
674 self.be_dir = os.path.join(
675 self.repo, self._cached_path_id._spacer_dirs[0])
676 self._cached_path_id.root(self.repo)
681 Begin versioning the tree based at self.repo.
682 Also roots the vcs at path.
686 root : called if the VCS has already been initialized.
688 if not os.path.exists(self.repo) or not os.path.isdir(self.repo):
689 raise VCSUnableToRoot(self)
690 if self._vcs_detect(self.repo) == False:
691 self._vcs_init(self.repo)
692 if self._rooted == False:
694 os.mkdir(self.be_dir)
695 self._vcs_add(self._u_rel_path(self.be_dir))
696 self._setup_storage_version()
697 self._cached_path_id.init()
701 self._cached_path_id.destroy()
702 if os.path.exists(self.be_dir):
703 shutil.rmtree(self.be_dir)
706 if self._rooted == False:
708 if not os.path.isdir(self.be_dir):
709 raise libbe.storage.base.ConnectionError(self)
710 self._cached_path_id.connect()
711 self.check_storage_version()
713 def _disconnect(self):
714 self._cached_path_id.disconnect()
716 def path(self, id, revision=None, relpath=True):
718 path = self._cached_path_id.path(id)
720 return self._u_rel_path(path)
722 path = self._vcs_path(id, revision)
725 return os.path.join(self.repo, path)
727 def _add_path(self, path, directory=False):
728 relpath = self._u_rel_path(path)
729 reldirs = relpath.split(os.path.sep)
730 if directory == False:
731 reldirs = reldirs[:-1]
733 for reldir in reldirs:
734 dir = os.path.join(dir, reldir)
735 if not os.path.exists(dir):
737 self._vcs_add(self._u_rel_path(dir))
738 elif not os.path.isdir(dir):
739 raise libbe.storage.base.InvalidDirectory
740 if directory == False:
741 if not os.path.exists(path):
742 open(path, 'w').close()
743 self._vcs_add(self._u_rel_path(path))
745 def _add(self, id, parent=None, **kwargs):
746 path = self._cached_path_id.add_id(id, parent)
747 self._add_path(path, **kwargs)
749 def _exists(self, id, revision=None):
752 path = self.path(id, revision, relpath=False)
755 return os.path.exists(path)
756 path = self.path(id, revision, relpath=True)
757 return self._vcs_exists(relpath, revision)
759 def _remove(self, id):
760 path = self._cached_path_id.path(id)
761 if os.path.exists(path):
762 if os.path.isdir(path) and len(self.children(id)) > 0:
763 raise libbe.storage.base.DirectoryNotEmpty(id)
764 self._vcs_remove(self._u_rel_path(path))
765 if os.path.exists(path):
766 if os.path.isdir(path):
770 self._cached_path_id.remove_id(id)
772 def _recursive_remove(self, id):
773 path = self._cached_path_id.path(id)
774 for dirpath,dirnames,filenames in os.walk(path, topdown=False):
775 filenames.extend(dirnames)
777 fullpath = os.path.join(dirpath, f)
778 if os.path.exists(fullpath) == False:
780 self._vcs_remove(self._u_rel_path(fullpath))
781 if os.path.exists(path):
783 path = self._cached_path_id.path(id, relpath=True)
784 for id,p in self._cached_path_id._cache.items():
785 if p.startswith(path):
786 self._cached_path_id.remove_id(id)
788 def _ancestors(self, id=None, revision=None):
792 path = self.path(id, revision, relpath=False)
795 if not path.startswith(self.repo + os.path.sep):
797 path = os.path.dirname(path)
799 id = self._u_path_to_id(path)
801 except (SpacerCollision, InvalidPath):
805 def _children(self, id=None, revision=None):
807 isdir = os.path.isdir
810 isdir = lambda path : self._vcs_isdir(
811 self._u_rel_path(path), revision)
812 listdir = lambda path : self._vcs_listdir(
813 self._u_rel_path(path), revision)
817 path = self.path(id, revision, relpath=False)
818 if isdir(path) == False:
820 children = listdir(path)
821 for i,c in enumerate(children):
822 if c in self._cached_path_id._spacer_dirs:
824 children.extend([os.path.join(c, c2) for c2 in
825 listdir(os.path.join(path, c))])
826 elif c in ['id-cache', 'version']:
828 elif self.interspersed_vcs_files \
829 and self._vcs_is_versioned(c) == False:
831 for i,c in enumerate(children):
832 if c == None: continue
833 cpath = os.path.join(path, c)
834 if self.interspersed_vcs_files == True \
835 and revision != None \
836 and self._vcs_is_versioned(cpath) == False:
839 children[i] = self._u_path_to_id(cpath)
840 return [c for c in children if c != None]
842 def _get(self, id, default=libbe.util.InvalidObject, revision=None):
844 relpath = self.path(id, revision, relpath=True)
845 contents = self._vcs_get_file_contents(relpath, revision)
847 if default == libbe.util.InvalidObject:
850 if contents in [libbe.storage.base.InvalidDirectory,
851 libbe.util.InvalidObject] \
852 or len(contents) == 0:
853 if default == libbe.util.InvalidObject:
854 raise InvalidID(id, revision)
858 def _set(self, id, value):
860 path = self._cached_path_id.path(id)
863 if not os.path.exists(path):
865 if os.path.isdir(path):
866 raise libbe.storage.base.InvalidDirectory(id)
870 self._vcs_update(self._u_rel_path(path))
872 def _commit(self, summary, body=None, allow_empty=False):
873 summary = summary.strip()+'\n'
875 summary += '\n' + body.strip() + '\n'
876 descriptor, filename = tempfile.mkstemp()
879 temp_file = os.fdopen(descriptor, 'wb')
880 temp_file.write(summary)
882 revision = self._vcs_commit(filename, allow_empty=allow_empty)
888 def revision_id(self, index=None):
892 if int(index) != index:
893 raise InvalidRevision(index)
895 raise InvalidRevision(index)
896 revid = self._vcs_revision_id(index)
898 raise libbe.storage.base.InvalidRevision(index)
901 def changed(self, revision):
902 new,mod,rem = self._vcs_changed(revision)
903 def paths_to_ids(paths):
906 id = self._u_path_to_id(p)
908 except (SpacerCollision, InvalidPath):
910 new_id = list(paths_to_ids(new))
911 mod_id = list(paths_to_ids(mod))
912 rem_id = list(paths_to_ids(rem))
913 return (new_id, mod_id, rem_id)
915 def _u_any_in_string(self, list, string):
916 """Return True if any of the strings in list are in string.
917 Otherwise return False.
919 for list_string in list:
920 if list_string in string:
924 def _u_invoke(self, *args, **kwargs):
925 if 'cwd' not in kwargs:
926 kwargs['cwd'] = self.repo
927 if 'encoding' not in kwargs:
928 kwargs['encoding'] = self.encoding
929 return invoke(*args, **kwargs)
931 def _u_invoke_client(self, *args, **kwargs):
932 cl_args = [self.client]
934 return self._u_invoke(cl_args, **kwargs)
936 def _u_search_parent_directories(self, path, filename):
937 """Find the file (or directory) named filename in path or in any of
941 search_parent_directories("/a/b/c", ".be")
942 will return the path to the first existing file from
947 or None if none of those files exist.
950 ret = search_parent_directories(path, filename)
951 except AssertionError, e:
955 def _u_find_id_from_manifest(self, id, manifest, revision=None):
956 """Search for the relative path to id using manifest, a list of all
959 Returns None if the id is not found.
961 be_dir = self._cached_path_id._spacer_dirs[0]
962 be_dir_sep = self._cached_path_id._spacer_dirs[0] + os.path.sep
963 files = [f for f in manifest if f.startswith(be_dir_sep)]
965 if not file.startswith(be_dir+os.path.sep):
967 parts = file.split(os.path.sep)
968 dir = parts.pop(0) # don't add the first spacer dir
969 for part in parts[:-1]:
970 dir = os.path.join(dir, part)
975 p_id = self._u_path_to_id(file)
978 except (SpacerCollision, InvalidPath):
980 raise InvalidID(id, revision=revision)
982 def _u_find_id(self, id, revision):
983 """Search for the relative path to id as of revision.
985 Returns None if the id is not found.
987 assert self._rooted == True
988 be_dir = self._cached_path_id._spacer_dirs[0]
989 stack = [(be_dir, be_dir)]
990 while len(stack) > 0:
991 path,long_id = stack.pop()
992 if long_id.endswith('/'+id):
994 if self._vcs_isdir(path, revision) == False:
996 for child in self._vcs_listdir(path, revision):
997 stack.append((os.path.join(path, child),
998 '/'.join([long_id, child])))
999 raise InvalidID(id, revision=revision)
1001 def _u_path_to_id(self, path):
1002 return self._cached_path_id.id(path)
1004 def _u_rel_path(self, path, root=None):
1005 """Return the relative path to path from root.
1010 >>> vcs._u_rel_path("/a.b/c/.be", "/a.b/c")
1012 >>> vcs._u_rel_path("/a.b/c/", "/a.b/c")
1014 >>> vcs._u_rel_path("/a.b/c/", "/a.b/c/")
1016 >>> vcs._u_rel_path("./a", ".")
1020 if self.repo == None:
1021 raise VCSNotRooted(self)
1023 path = os.path.abspath(path)
1024 absRoot = os.path.abspath(root)
1025 absRootSlashedDir = os.path.join(absRoot,"")
1026 if path in [absRoot, absRootSlashedDir]:
1028 if not path.startswith(absRootSlashedDir):
1029 raise InvalidPath(path, absRootSlashedDir)
1030 relpath = path[len(absRootSlashedDir):]
1033 def _u_abspath(self, path, root=None):
1034 """Return the absolute path from a path relative to root.
1040 >>> vcs._u_abspath(".be", "/a.b/c")
1044 assert self.repo != None, "VCS not rooted"
1046 return os.path.abspath(os.path.join(root, path))
1048 def _u_parse_commitfile(self, commitfile):
1049 """Split the commitfile created in self.commit() back into summary and
1052 f = codecs.open(commitfile, 'r', self.encoding)
1053 summary = f.readline()
1059 return (summary, body)
1061 def check_storage_version(self):
1062 version = self.storage_version()
1063 if version != libbe.storage.STORAGE_VERSION:
1064 upgrade.upgrade(self.repo, version)
1066 def storage_version(self, revision=None, path=None):
1067 """Return the storage version of the on-disk files.
1071 libbe.storage.util.upgrade
1074 path = os.path.join(self.repo, '.be', 'version')
1075 if not os.path.exists(path):
1076 raise libbe.storage.InvalidStorageVersion(None)
1077 if revision == None: # don't require connection
1078 return libbe.util.encoding.get_file_contents(
1079 path, decode=True).rstrip()
1080 relpath = self._u_rel_path(path)
1081 contents = self._vcs_get_file_contents(relpath, revision=revision)
1082 if type(contents) != types.UnicodeType:
1083 contents = unicode(contents, self.encoding)
1084 return contents.strip()
1086 def _setup_storage_version(self):
1088 Requires disk access.
1090 assert self._rooted == True
1091 path = os.path.join(self.be_dir, 'version')
1092 if not os.path.exists(path):
1093 libbe.util.encoding.set_file_contents(path,
1094 libbe.storage.STORAGE_VERSION+'\n')
1095 self._vcs_add(self._u_rel_path(path))
1098 if libbe.TESTING == True:
1099 class VCSTestCase (unittest.TestCase):
1101 Test cases for base VCS class (in addition to the Storage test
1107 def __init__(self, *args, **kwargs):
1108 super(VCSTestCase, self).__init__(*args, **kwargs)
1112 """Set up test fixtures for Storage test case."""
1113 super(VCSTestCase, self).setUp()
1115 self.dirname = self.dir.path
1116 self.s = self.Class(repo=self.dirname)
1117 if self.s.installed() == True:
1122 super(VCSTestCase, self).tearDown()
1123 if self.s.installed() == True:
1128 class VCS_installed_TestCase (VCSTestCase):
1129 def test_installed(self):
1130 """See if the VCS is installed.
1132 self.failUnless(self.s.installed() == True,
1133 '%(name)s VCS not found' % vars(self.Class))
1136 class VCS_detection_TestCase (VCSTestCase):
1137 def test_detection(self):
1138 """See if the VCS detects its installed repository
1140 if self.s.installed():
1142 self.failUnless(self.s._detect(self.dirname) == True,
1143 'Did not detected %(name)s VCS after initialising'
1147 def test_no_detection(self):
1148 """See if the VCS detects its installed repository
1150 if self.s.installed() and self.Class.name != 'None':
1153 self.failUnless(self.s._detect(self.dirname) == False,
1154 'Detected %(name)s VCS before initialising'
1159 def test_vcs_repo_in_specified_root_path(self):
1160 """VCS root directory should be in specified root path."""
1161 rp = os.path.realpath(self.s.repo)
1162 dp = os.path.realpath(self.dirname)
1163 vcs_name = self.Class.name
1165 dp == rp or rp == None,
1166 "%(vcs_name)s VCS root in wrong dir (%(dp)s %(rp)s)" % vars())
1168 class VCS_get_user_id_TestCase(VCSTestCase):
1169 """Test cases for VCS.get_user_id method."""
1171 def test_get_existing_user_id(self):
1172 """Should get the existing user ID."""
1173 if self.s.installed():
1174 user_id = self.s.get_user_id()
1177 name,email = libbe.ui.util.user.parse_user_id(user_id)
1179 self.failUnless('@' in email, email)
1181 def make_vcs_testcase_subclasses(vcs_class, namespace):
1184 if c.versioned == True:
1185 libbe.storage.base.make_versioned_storage_testcase_subclasses(
1186 vcs_class, namespace)
1188 libbe.storage.base.make_storage_testcase_subclasses(
1189 vcs_class, namespace)
1191 if namespace != sys.modules[__name__]:
1192 # Make VCSTestCase subclasses for vcs_class in the namespace.
1193 vcs_testcase_classes = [
1195 ob for ob in globals().values() if isinstance(ob, type))
1196 if issubclass(c, VCSTestCase) \
1199 for base_class in vcs_testcase_classes:
1200 testcase_class_name = vcs_class.__name__ + base_class.__name__
1201 testcase_class_bases = (base_class,)
1202 testcase_class_dict = dict(base_class.__dict__)
1203 testcase_class_dict['Class'] = vcs_class
1204 testcase_class = type(
1205 testcase_class_name, testcase_class_bases, testcase_class_dict)
1206 setattr(namespace, testcase_class_name, testcase_class)
1208 make_vcs_testcase_subclasses(VCS, sys.modules[__name__])
1210 unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
1211 suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])