1 # Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc.
2 # Alexander Belchenko <bialix@ukr.net>
3 # Ben Finney <benf@cybersource.com.au>
4 # Chris Ball <cjb@laptop.org>
5 # Gianluca Montecchi <gian@grys.it>
6 # W. Trevor King <wking@drexel.edu>
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 2 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License along
19 # with this program; if not, write to the Free Software Foundation, Inc.,
20 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
22 """Define the base :class:`VCS` (Version Control System) class, which
23 should be subclassed by other Version Control System backends. The
24 base class implements a "do not version" VCS.
38 import libbe.storage.base
39 import libbe.util.encoding
40 from libbe.storage.base import EmptyCommit, InvalidRevision, InvalidID
41 from libbe.util.utility import Dir, search_parent_directories
42 from libbe.util.subproc import CommandError, invoke
43 from libbe.util.plugin import import_by_name
44 import libbe.storage.util.upgrade as upgrade
46 if libbe.TESTING == True:
50 import libbe.ui.util.user
52 VCS_ORDER = ['arch', 'bzr', 'darcs', 'git', 'hg']
53 """List VCS modules in order of preference.
55 Don't list this module, it is implicitly last.
58 def set_preferred_vcs(name):
59 """Manipulate :data:`VCS_ORDER` to place `name` first.
61 This is primarily intended for testing purposes.
64 assert name in VCS_ORDER, \
65 'unrecognized VCS %s not in\n %s' % (name, VCS_ORDER)
66 VCS_ORDER.remove(name)
67 VCS_ORDER.insert(0, name)
69 def _get_matching_vcs(matchfn):
70 """Return the first module for which matchfn(VCS_instance) is True.
72 Searches in :data:`VCS_ORDER`.
74 for submodname in VCS_ORDER:
75 module = import_by_name('libbe.storage.vcs.%s' % submodname)
77 if matchfn(vcs) == True:
81 def vcs_by_name(vcs_name):
82 """Return the module for the VCS with the given name.
84 Searches in :data:`VCS_ORDER`.
86 if vcs_name == VCS.name:
88 return _get_matching_vcs(lambda vcs: vcs.name == vcs_name)
91 """Return a VCS instance for the vcs being used in this directory.
93 Searches in :data:`VCS_ORDER`.
95 return _get_matching_vcs(lambda vcs: vcs._detect(dir))
98 """Return an instance of an installed VCS.
100 Searches in :data:`VCS_ORDER`.
102 return _get_matching_vcs(lambda vcs: vcs.installed())
105 class VCSNotRooted (libbe.storage.base.ConnectionError):
106 def __init__(self, vcs):
107 msg = 'VCS not rooted'
108 libbe.storage.base.ConnectionError.__init__(self, msg)
111 class VCSUnableToRoot (libbe.storage.base.ConnectionError):
112 def __init__(self, vcs):
113 msg = 'VCS unable to root'
114 libbe.storage.base.ConnectionError.__init__(self, msg)
117 class InvalidPath (InvalidID):
118 def __init__(self, path, root, msg=None, **kwargs):
120 msg = 'Path "%s" not in root "%s"' % (path, root)
121 InvalidID.__init__(self, msg=msg, **kwargs)
125 class SpacerCollision (InvalidPath):
126 def __init__(self, path, spacer):
127 msg = 'Path "%s" collides with spacer directory "%s"' % (path, spacer)
128 InvalidPath.__init__(self, path, root=None, msg=msg)
class NoSuchFile (InvalidID):
    """Invalid-ID error whose message names a file that was not found.

    The message contains the absolute form of `pathname`, resolved
    against `root` (the current directory by default).
    """
    def __init__(self, pathname, root='.'):
        joined = os.path.join(root, pathname)
        message = 'No such file: %s' % os.path.abspath(joined)
        InvalidID.__init__(self, message)
137 class CachedPathID (object):
138 """Cache Storage ID <-> path policy.
140 Paths generated following::
142 .../.be/BUGDIR/bugs/BUG/comments/COMMENT
145 See :mod:`libbe.util.id` for a discussion of ID formats.
151 >>> os.mkdir(os.path.join(dir.path, '.be'))
152 >>> os.mkdir(os.path.join(dir.path, '.be', 'abc'))
153 >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs'))
154 >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123'))
155 >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments'))
156 >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def'))
157 >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '456'))
158 >>> file(os.path.join(dir.path, '.be', 'abc', 'values'),
160 >>> file(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'values'),
162 >>> file(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values'),
164 >>> c = CachedPathID()
166 >>> c.id(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values'))
169 >>> sorted(os.listdir(os.path.join(c._root, '.be')))
172 >>> c.path('123/values') # doctest: +ELLIPSIS
173 u'.../.be/abc/bugs/123/values'
176 >>> sorted(os.listdir(os.path.join(c._root, '.be')))
178 >>> c.connect() # demonstrate auto init
179 >>> sorted(os.listdir(os.path.join(c._root, '.be')))
181 >>> c.add_id(u'xyz', parent=None) # doctest: +ELLIPSIS
183 >>> c.add_id('xyz/def', parent='xyz') # doctest: +ELLIPSIS
185 >>> c.add_id('qrs', parent='123') # doctest: +ELLIPSIS
186 u'.../.be/abc/bugs/123/comments/qrs'
189 >>> c.path('qrs') # doctest: +ELLIPSIS
190 u'.../.be/abc/bugs/123/comments/qrs'
191 >>> c.remove_id('qrs')
193 Traceback (most recent call last):
195 InvalidID: qrs in revision None
200 def __init__(self, encoding=None):
201 self.encoding = libbe.util.encoding.get_filesystem_encoding()
202 self._spacer_dirs = ['.be', 'bugs', 'comments']
204 def root(self, path):
205 self._root = os.path.abspath(path).rstrip(os.path.sep)
206 self._cache_path = os.path.join(
207 self._root, self._spacer_dirs[0], 'id-cache')
209 def init(self, verbose=True, cache=None):
210 """Create cache file for an existing .be directory.
212 The file contains multiple lines of the form::
220 spaced_root = os.path.join(self._root, self._spacer_dirs[0])
221 for dirpath, dirnames, filenames in os.walk(spaced_root):
222 if dirpath == spaced_root:
225 id = self.id(dirpath)
226 relpath = dirpath[len(self._root)+1:]
227 if id.count('/') == 0:
228 if verbose == True and id in self._cache:
229 print >> sys.stderr, 'Multiple paths for %s: \n %s\n %s' % (id, self._cache[id], relpath)
230 self._cache[id] = relpath
233 if self._cache != cache:
239 if os.path.exists(self._cache_path):
240 os.remove(self._cache_path)
243 if not os.path.exists(self._cache_path):
247 raise libbe.storage.base.ConnectionError
248 self._cache = {} # key: uuid, value: path
249 self._changed = False
250 f = codecs.open(self._cache_path, 'r', self.encoding)
252 fields = line.rstrip('\n').split('\t')
253 self._cache[fields[0]] = fields[1]
256 def disconnect(self):
257 if self._changed == True:
258 f = codecs.open(self._cache_path, 'w', self.encoding)
259 for uuid,path in self._cache.items():
260 f.write('%s\t%s\n' % (uuid, path))
264 def path(self, id, relpath=False):
265 fields = id.split('/', 1)
271 if uuid not in self._cache:
272 self.init(verbose=False, cache=self._cache)
273 if uuid not in self._cache:
274 raise InvalidID(uuid)
276 return os.path.join(self._cache[uuid], *extra)
277 return os.path.join(self._root, self._cache[uuid], *extra)
279 def add_id(self, id, parent=None):
280 if id.count('/') > 0:
281 # not a UUID-level path
282 assert id.startswith(parent), \
283 'Strange ID: "%s" should start with "%s"' % (id, parent)
285 elif id in self._cache:
291 spacer = self._spacer_dirs[0]
293 assert parent.count('/') == 0, \
294 'Strange parent ID: "%s" should be UUID' % parent
295 parent_path = self.path(parent, relpath=True)
296 parent_spacer = parent_path.split(os.path.sep)[-2]
297 i = self._spacer_dirs.index(parent_spacer)
298 spacer = self._spacer_dirs[i+1]
299 path = os.path.join(parent_path, spacer, id)
300 self._cache[id] = path
302 path = os.path.join(self._root, path)
305 def remove_id(self, id):
306 if id.count('/') > 0:
307 return # not a UUID-level path
312 path = os.path.join(self._root, path)
313 if not path.startswith(self._root + os.path.sep):
314 raise InvalidPath(path, self._root)
315 path = path[len(self._root)+1:]
317 if not path.startswith(self._spacer_dirs[0] + os.path.sep):
318 raise InvalidPath(path, self._spacer_dirs[0])
319 for spacer in self._spacer_dirs:
320 if not path.startswith(spacer + os.path.sep):
322 id = path[len(spacer)+1:]
323 fields = path[len(spacer)+1:].split(os.path.sep,1)
327 for spacer in self._spacer_dirs:
328 if id.endswith(os.path.sep + spacer):
329 raise SpacerCollision(orig_path, spacer)
330 if os.path.sep != '/':
331 id = id.replace(os.path.sep, '/')
338 class VCS (libbe.storage.base.VersionedStorage):
339 """Implement a 'no-VCS' interface.
341 Support for other VCSs can be added by subclassing this class, and
342 overriding methods `_vcs_*()` with code appropriate for your VCS.
344 The methods `_u_*()` are utility methods available to the `_vcs_*()`
348 client = 'false' # command-line tool for _u_invoke_client
350 def __init__(self, *args, **kwargs):
351 if 'encoding' not in kwargs:
352 kwargs['encoding'] = libbe.util.encoding.get_filesystem_encoding()
353 libbe.storage.base.VersionedStorage.__init__(self, *args, **kwargs)
354 self.versioned = False
355 self.interspersed_vcs_files = False
356 self.verbose_invoke = False
357 self._cached_path_id = CachedPathID()
360 def _vcs_version(self):
362 Return the VCS version string.
366 def _vcs_get_user_id(self):
368 Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
369 If the VCS has not been configured with a username, return None.
373 def _vcs_detect(self, path=None):
375 Detect whether a directory is revision controlled with this VCS.
379 def _vcs_root(self, path):
381 Get the VCS root. This is the default working directory for
382 future invocations. You would normally set this to the root
383 directory for your VCS.
385 if os.path.isdir(path) == False:
386 path = os.path.dirname(path)
388 path = os.path.abspath('.')
391 def _vcs_init(self, path):
393 Begin versioning the tree based at path.
397 def _vcs_destroy(self):
399 Remove any files used in versioning (e.g. whatever _vcs_init()
404 def _vcs_add(self, path):
406 Add the already created file at path to version control.
410 def _vcs_exists(self, path, revision=None):
412 Does the path exist in a given revision? (True/False)
414 raise NotImplementedError('Lazy BE developers')
416 def _vcs_remove(self, path):
418 Remove the file at path from version control. Optionally
419 remove the file from the filesystem as well.
423 def _vcs_update(self, path):
425 Notify the versioning system of changes to the versioned file
def _vcs_is_versioned(self, path):
    """
    Tell whether path is under version control (true) or not (False).

    Only a VCS that dumps VCS-specific files into the .be directory
    (e.g. Arch) needs to implement this method.  Such a VCS should
    also set
      self.interspersed_vcs_files = True
    """
    interspersed = self.interspersed_vcs_files
    # The base implementation is only valid for a VCS that does not
    # intersperse its own files.
    assert interspersed == False
    raise NotImplementedError()
442 def _vcs_get_file_contents(self, path, revision=None):
444 Get the file contents as they were in a given revision.
445 Revision==None specifies the current revision.
448 raise libbe.storage.base.InvalidRevision(
449 'The %s VCS does not support revision specifiers' % self.name)
450 path = os.path.join(self.repo, path)
451 if not os.path.exists(path):
452 return libbe.util.InvalidObject
453 if os.path.isdir(path):
454 return libbe.storage.base.InvalidDirectory
460 def _vcs_path(self, id, revision):
462 Return the relative path to object id as of revision.
464 Revision will not be None.
466 raise NotImplementedError
468 def _vcs_isdir(self, path, revision):
470 Return True if path (as returned by _vcs_path) was a directory
471 as of revision, False otherwise.
473 Revision will not be None.
475 raise NotImplementedError
477 def _vcs_listdir(self, path, revision):
479 Return a list of the contents of the directory path (as
480 returned by _vcs_path) as of revision.
482 Revision will not be None, and ._vcs_isdir(path, revision)
485 raise NotImplementedError
487 def _vcs_commit(self, commitfile, allow_empty=False):
489 Commit the current working directory, using the contents of
490 commitfile as the comment. Return the name of the old
491 revision (or None if commits are not supported).
493 If allow_empty == False, raise EmptyCommit if there are no
498 def _vcs_revision_id(self, index):
500 Return the name of the <index>th revision. Index will be an
501 integer (possibly <= 0). The choice of which branch to follow
502 when crossing branches/merges is not defined.
504 Return None if revision IDs are not supported, or if the
505 specified revision does not exist.
509 def _vcs_changed(self, revision):
511 Return a tuple of lists of ids
512 (new, modified, removed)
513 from the specified revision to the current situation.
518 # Cache version string for efficiency.
519 if not hasattr(self, '_version'):
520 self._version = self._get_version()
523 def _get_version(self):
525 ret = self._vcs_version()
528 if e.errno == errno.ENOENT:
536 if self.version() != None:
540 def get_user_id(self):
542 Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
543 If the VCS has not been configured with a username, return None.
544 You can override the automatic lookup procedure by setting the
545 VCS.user_id attribute to a string of your choice.
547 if not hasattr(self, 'user_id'):
548 self.user_id = self._vcs_get_user_id()
def _detect(self, path='.'):
    """
    Report whether the directory at path is revision controlled
    with this VCS, as judged by _vcs_detect().
    """
    detected = self._vcs_detect(path)
    return detected
558 """Set the root directory to the path's VCS root.
560 This is the default working directory for future invocations.
561 Consider the following usage case:
563 You have a project rooted in::
567 by which I mean the VCS repository is in, for example::
571 However, you're off in some subdirectory like::
573 /path/to/source/ui/testing
575 and you want to comment on a bug. `root` will locate your VCS
576 root (``/path/to/source/``) and set the repo there. This
577 means that it doesn't matter where you are in your project
578 tree when you call "be COMMAND", it always acts as if you called
579 it from the VCS root.
581 if self._detect(self.repo) == False:
582 raise VCSUnableToRoot(self)
583 root = self._vcs_root(self.repo)
584 self.repo = os.path.abspath(root)
585 if os.path.isdir(self.repo) == False:
586 self.repo = os.path.dirname(self.repo)
587 self.be_dir = os.path.join(
588 self.repo, self._cached_path_id._spacer_dirs[0])
589 self._cached_path_id.root(self.repo)
594 Begin versioning the tree based at self.repo.
595 Also roots the vcs at path.
599 root : called if the VCS has already been initialized.
601 if not os.path.exists(self.repo) or not os.path.isdir(self.repo):
602 raise VCSUnableToRoot(self)
603 if self._vcs_detect(self.repo) == False:
604 self._vcs_init(self.repo)
605 if self._rooted == False:
607 os.mkdir(self.be_dir)
608 self._vcs_add(self._u_rel_path(self.be_dir))
609 self._setup_storage_version()
610 self._cached_path_id.init()
614 self._cached_path_id.destroy()
615 if os.path.exists(self.be_dir):
616 shutil.rmtree(self.be_dir)
619 if self._rooted == False:
621 if not os.path.isdir(self.be_dir):
622 raise libbe.storage.base.ConnectionError(self)
623 self._cached_path_id.connect()
624 self.check_storage_version()
def _disconnect(self):
    """Disconnect the cached path/ID mapping."""
    id_cache = self._cached_path_id
    id_cache.disconnect()
629 def path(self, id, revision=None, relpath=True):
631 path = self._cached_path_id.path(id)
633 return self._u_rel_path(path)
635 path = self._vcs_path(id, revision)
638 return os.path.join(self.repo, path)
640 def _add_path(self, path, directory=False):
641 relpath = self._u_rel_path(path)
642 reldirs = relpath.split(os.path.sep)
643 if directory == False:
644 reldirs = reldirs[:-1]
646 for reldir in reldirs:
647 dir = os.path.join(dir, reldir)
648 if not os.path.exists(dir):
650 self._vcs_add(self._u_rel_path(dir))
651 elif not os.path.isdir(dir):
652 raise libbe.storage.base.InvalidDirectory
653 if directory == False:
654 if not os.path.exists(path):
655 open(path, 'w').close()
656 self._vcs_add(self._u_rel_path(path))
def _add(self, id, parent=None, **kwargs):
    """Register id (under parent) in the path cache, then add its path.

    Any extra keyword arguments are forwarded to _add_path().
    """
    new_path = self._cached_path_id.add_id(id, parent)
    self._add_path(new_path, **kwargs)
def _exists(self, id, revision=None):
    """Return True if id exists as of revision, False otherwise.

    With revision None the working tree on disk is checked, and an
    id for which no path can be found counts as missing.  For any
    other revision the question is delegated to _vcs_exists() using
    the relative path of id in that revision.
    """
    if revision is None:
        try:
            path = self.path(id, revision, relpath=False)
        except InvalidID:
            return False
        return os.path.exists(path)
    relpath = self.path(id, revision, relpath=True)
    # Pass the path that was just looked up.  The previous code bound
    # it to `path` but handed the undefined name `relpath` to
    # _vcs_exists(), raising NameError for every non-None revision.
    return self._vcs_exists(relpath, revision)
672 def _remove(self, id):
673 path = self._cached_path_id.path(id)
674 if os.path.exists(path):
675 if os.path.isdir(path) and len(self.children(id)) > 0:
676 raise libbe.storage.base.DirectoryNotEmpty(id)
677 self._vcs_remove(self._u_rel_path(path))
678 if os.path.exists(path):
679 if os.path.isdir(path):
683 self._cached_path_id.remove_id(id)
685 def _recursive_remove(self, id):
686 path = self._cached_path_id.path(id)
687 for dirpath,dirnames,filenames in os.walk(path, topdown=False):
688 filenames.extend(dirnames)
690 fullpath = os.path.join(dirpath, f)
691 if os.path.exists(fullpath) == False:
693 self._vcs_remove(self._u_rel_path(fullpath))
694 if os.path.exists(path):
696 path = self._cached_path_id.path(id, relpath=True)
697 for id,p in self._cached_path_id._cache.items():
698 if p.startswith(path):
699 self._cached_path_id.remove_id(id)
701 def _ancestors(self, id=None, revision=None):
705 path = self.path(id, revision, relpath=False)
708 if not path.startswith(self.repo + os.path.sep):
710 path = os.path.dirname(path)
712 id = self._u_path_to_id(path)
714 except (SpacerCollision, InvalidPath):
718 def _children(self, id=None, revision=None):
720 isdir = os.path.isdir
723 isdir = lambda path : self._vcs_isdir(
724 self._u_rel_path(path), revision)
725 listdir = lambda path : self._vcs_listdir(
726 self._u_rel_path(path), revision)
730 path = self.path(id, revision, relpath=False)
731 if isdir(path) == False:
733 children = listdir(path)
734 for i,c in enumerate(children):
735 if c in self._cached_path_id._spacer_dirs:
737 children.extend([os.path.join(c, c2) for c2 in
738 listdir(os.path.join(path, c))])
739 elif c in ['id-cache', 'version']:
741 elif self.interspersed_vcs_files \
742 and self._vcs_is_versioned(c) == False:
744 for i,c in enumerate(children):
745 if c == None: continue
746 cpath = os.path.join(path, c)
747 if self.interspersed_vcs_files == True \
748 and revision != None \
749 and self._vcs_is_versioned(cpath) == False:
752 children[i] = self._u_path_to_id(cpath)
754 return [c for c in children if c != None]
756 def _get(self, id, default=libbe.util.InvalidObject, revision=None):
758 relpath = self.path(id, revision, relpath=True)
759 contents = self._vcs_get_file_contents(relpath, revision)
761 if default == libbe.util.InvalidObject:
764 if contents in [libbe.storage.base.InvalidDirectory,
765 libbe.util.InvalidObject] \
766 or len(contents) == 0:
767 if default == libbe.util.InvalidObject:
768 raise InvalidID(id, revision)
772 def _set(self, id, value):
774 path = self._cached_path_id.path(id)
777 if not os.path.exists(path):
779 if os.path.isdir(path):
780 raise libbe.storage.base.InvalidDirectory(id)
784 self._vcs_update(self._u_rel_path(path))
786 def _commit(self, summary, body=None, allow_empty=False):
787 summary = summary.strip()+'\n'
789 summary += '\n' + body.strip() + '\n'
790 descriptor, filename = tempfile.mkstemp()
793 temp_file = os.fdopen(descriptor, 'wb')
794 temp_file.write(summary)
796 revision = self._vcs_commit(filename, allow_empty=allow_empty)
802 def revision_id(self, index=None):
806 if int(index) != index:
807 raise InvalidRevision(index)
809 raise InvalidRevision(index)
810 revid = self._vcs_revision_id(index)
812 raise libbe.storage.base.InvalidRevision(index)
815 def changed(self, revision):
816 new,mod,rem = self._vcs_changed(revision)
817 def paths_to_ids(paths):
820 id = self._u_path_to_id(p)
822 except (SpacerCollision, InvalidPath):
824 new_id = list(paths_to_ids(new))
825 mod_id = list(paths_to_ids(mod))
826 rem_id = list(paths_to_ids(rem))
827 return (new_id, mod_id, rem_id)
829 def _u_any_in_string(self, list, string):
830 """Return True if any of the strings in list are in string.
831 Otherwise return False.
833 for list_string in list:
834 if list_string in string:
def _u_invoke(self, *args, **kwargs):
    """Run invoke() with this VCS's defaults filled in.

    Unless the caller supplies them, 'cwd' defaults to self.repo,
    'verbose' to self.verbose_invoke and 'encoding' to
    self.encoding.  Returns whatever invoke() returns.
    """
    fallbacks = (
        ('cwd', 'repo'),
        ('verbose', 'verbose_invoke'),
        ('encoding', 'encoding'),
        )
    for option, attribute in fallbacks:
        # Only read the attribute when the caller left the option out.
        if option not in kwargs:
            kwargs[option] = getattr(self, attribute)
    return invoke(*args, **kwargs)
847 def _u_invoke_client(self, *args, **kwargs):
848 cl_args = [self.client]
850 return self._u_invoke(cl_args, **kwargs)
852 def _u_search_parent_directories(self, path, filename):
853 """Find the file (or directory) named filename in path or in any of
857 search_parent_directories("/a/b/c", ".be")
858 will return the path to the first existing file from
863 or None if none of those files exist.
866 ret = search_parent_directories(path, filename)
867 except AssertionError, e:
871 def _u_find_id_from_manifest(self, id, manifest, revision=None):
872 """Search for the relative path to id using manifest, a list of all
875 Returns None if the id is not found.
877 be_dir = self._cached_path_id._spacer_dirs[0]
878 be_dir_sep = self._cached_path_id._spacer_dirs[0] + os.path.sep
879 files = [f for f in manifest if f.startswith(be_dir_sep)]
881 if not file.startswith(be_dir+os.path.sep):
883 parts = file.split(os.path.sep)
884 dir = parts.pop(0) # don't add the first spacer dir
885 for part in parts[:-1]:
886 dir = os.path.join(dir, part)
891 p_id = self._u_path_to_id(file)
894 except (SpacerCollision, InvalidPath):
896 raise InvalidID(id, revision=revision)
898 def _u_find_id(self, id, revision):
899 """Search for the relative path to id as of revision.
901 Returns None if the id is not found.
903 assert self._rooted == True
904 be_dir = self._cached_path_id._spacer_dirs[0]
905 stack = [(be_dir, be_dir)]
906 while len(stack) > 0:
907 path,long_id = stack.pop()
908 if long_id.endswith('/'+id):
910 if self._vcs_isdir(path, revision) == False:
912 for child in self._vcs_listdir(path, revision):
913 stack.append((os.path.join(path, child),
914 '/'.join([long_id, child])))
915 raise InvalidID(id, revision=revision)
def _u_path_to_id(self, path):
    """Translate path into its ID using the cached path/ID mapping."""
    id_cache = self._cached_path_id
    return id_cache.id(path)
920 def _u_rel_path(self, path, root=None):
921 """Return the relative path to path from root.
926 >>> vcs._u_rel_path("/a.b/c/.be", "/a.b/c")
928 >>> vcs._u_rel_path("/a.b/c/", "/a.b/c")
930 >>> vcs._u_rel_path("/a.b/c/", "/a.b/c/")
932 >>> vcs._u_rel_path("./a", ".")
936 if self.repo == None:
937 raise VCSNotRooted(self)
939 path = os.path.abspath(path)
940 absRoot = os.path.abspath(root)
941 absRootSlashedDir = os.path.join(absRoot,"")
942 if path in [absRoot, absRootSlashedDir]:
944 if not path.startswith(absRootSlashedDir):
945 raise InvalidPath(path, absRootSlashedDir)
946 relpath = path[len(absRootSlashedDir):]
949 def _u_abspath(self, path, root=None):
950 """Return the absolute path from a path relative to root.
956 >>> vcs._u_abspath(".be", "/a.b/c")
960 assert self.repo != None, "VCS not rooted"
962 return os.path.abspath(os.path.join(root, path))
964 def _u_parse_commitfile(self, commitfile):
965 """Split the commitfile created in self.commit() back into summary and
968 f = codecs.open(commitfile, 'r', self.encoding)
969 summary = f.readline()
975 return (summary, body)
def check_storage_version(self):
    """Upgrade the on-disk storage when its version is not current.

    Compares storage_version() with libbe.storage.STORAGE_VERSION
    and, on a mismatch, calls upgrade.upgrade() with the repo path
    and the version that was found.
    """
    found = self.storage_version()
    expected = libbe.storage.STORAGE_VERSION
    needs_upgrade = found != expected
    if needs_upgrade:
        upgrade.upgrade(self.repo, found)
982 def storage_version(self, revision=None, path=None):
983 """Return the storage version of the on-disk files.
987 :mod:`libbe.storage.util.upgrade`
990 path = os.path.join(self.repo, '.be', 'version')
991 if not os.path.exists(path):
992 raise libbe.storage.InvalidStorageVersion(None)
993 if revision == None: # don't require connection
994 return libbe.util.encoding.get_file_contents(
995 path, decode=True).rstrip('\n')
996 relpath = self._u_rel_path(path)
997 contents = self._vcs_get_file_contents(relpath, revision=revision)
998 if type(contents) != types.UnicodeType:
999 contents = unicode(contents, self.encoding)
1000 return contents.strip()
1002 def _setup_storage_version(self):
1004 Requires disk access.
1006 assert self._rooted == True
1007 path = os.path.join(self.be_dir, 'version')
1008 if not os.path.exists(path):
1009 libbe.util.encoding.set_file_contents(path,
1010 libbe.storage.STORAGE_VERSION+'\n')
1011 self._vcs_add(self._u_rel_path(path))
1014 if libbe.TESTING == True:
1015 class VCSTestCase (unittest.TestCase):
1017 Test cases for base VCS class (in addition to the Storage test
1023 def __init__(self, *args, **kwargs):
1024 super(VCSTestCase, self).__init__(*args, **kwargs)
1028 """Set up test fixtures for Storage test case."""
1029 super(VCSTestCase, self).setUp()
1031 self.dirname = self.dir.path
1032 self.s = self.Class(repo=self.dirname)
1033 if self.s.installed() == True:
1038 super(VCSTestCase, self).tearDown()
1039 if self.s.installed() == True:
1044 class VCS_installed_TestCase (VCSTestCase):
def test_installed(self):
    """Check that the VCS under test is installed.
    """
    is_installed = self.s.installed() == True
    failure_msg = '%(name)s VCS not found' % vars(self.Class)
    self.failUnless(is_installed, failure_msg)
1052 class VCS_detection_TestCase (VCSTestCase):
1053 def test_detection(self):
1054 """See if the VCS detects its installed repository
1056 if self.s.installed():
1058 self.failUnless(self.s._detect(self.dirname) == True,
1059 'Did not detected %(name)s VCS after initialising'
1063 def test_no_detection(self):
1064 """See if the VCS detects its installed repository
1066 if self.s.installed() and self.Class.name != 'None':
1069 self.failUnless(self.s._detect(self.dirname) == False,
1070 'Detected %(name)s VCS before initialising'
1075 def test_vcs_repo_in_specified_root_path(self):
1076 """VCS root directory should be in specified root path."""
1077 rp = os.path.realpath(self.s.repo)
1078 dp = os.path.realpath(self.dirname)
1079 vcs_name = self.Class.name
1081 dp == rp or rp == None,
1082 "%(vcs_name)s VCS root in wrong dir (%(dp)s %(rp)s)" % vars())
1084 class VCS_get_user_id_TestCase(VCSTestCase):
1085 """Test cases for VCS.get_user_id method."""
1087 def test_gets_existing_user_id(self):
1088 """Should get the existing user ID."""
1089 if self.s.installed():
1090 user_id = self.s.get_user_id()
1093 name,email = libbe.ui.util.user.parse_user_id(user_id)
1095 self.failUnless('@' in email, email)
1097 def make_vcs_testcase_subclasses(vcs_class, namespace):
1100 if c.versioned == True:
1101 libbe.storage.base.make_versioned_storage_testcase_subclasses(
1102 vcs_class, namespace)
1104 libbe.storage.base.make_storage_testcase_subclasses(
1105 vcs_class, namespace)
1107 if namespace != sys.modules[__name__]:
1108 # Make VCSTestCase subclasses for vcs_class in the namespace.
1109 vcs_testcase_classes = [
1111 ob for ob in globals().values() if isinstance(ob, type))
1112 if issubclass(c, VCSTestCase) \
1115 for base_class in vcs_testcase_classes:
1116 testcase_class_name = vcs_class.__name__ + base_class.__name__
1117 testcase_class_bases = (base_class,)
1118 testcase_class_dict = dict(base_class.__dict__)
1119 testcase_class_dict['Class'] = vcs_class
1120 testcase_class = type(
1121 testcase_class_name, testcase_class_bases, testcase_class_dict)
1122 setattr(namespace, testcase_class_name, testcase_class)
1124 make_vcs_testcase_subclasses(VCS, sys.modules[__name__])
1126 unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
1127 suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])