1 # Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc.
2 # Alexander Belchenko <bialix@ukr.net>
3 # Ben Finney <benf@cybersource.com.au>
4 # Chris Ball <cjb@laptop.org>
5 # Gianluca Montecchi <gian@grys.it>
6 # W. Trevor King <wking@drexel.edu>
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 2 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License along
19 # with this program; if not, write to the Free Software Foundation, Inc.,
20 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
23 Define the base VCS (Version Control System) class, which should be
24 subclassed by other Version Control System backends. The base class
25 implements a "do not version" VCS.
39 import libbe.storage.base
40 import libbe.util.encoding
41 from libbe.storage.base import EmptyCommit, InvalidRevision, InvalidID
42 from libbe.util.utility import Dir, search_parent_directories
43 from libbe.util.subproc import CommandError, invoke
44 from libbe.util.plugin import import_by_name
45 import libbe.storage.util.upgrade as upgrade
47 if libbe.TESTING == True:
51 import libbe.ui.util.user
53 # List VCS modules in order of preference.
54 # Don't list this module, it is implicitly last.
55 VCS_ORDER = ['arch', 'bzr', 'darcs', 'git', 'hg']
def set_preferred_vcs(name):
    """Move the VCS called `name` to the front of VCS_ORDER.

    VCS_ORDER is modified in place, so `name` becomes the first
    backend tried by anything that iterates over VCS_ORDER.

    Raises AssertionError if `name` is not listed in VCS_ORDER.
    """
    if name not in VCS_ORDER:
        # Raise explicitly instead of using `assert`, so the check is
        # not stripped under `python -O`.  The exception type is kept
        # for callers that already catch AssertionError.
        raise AssertionError(
            'unrecognized VCS %s not in\n %s' % (name, VCS_ORDER))
    VCS_ORDER.remove(name)
    VCS_ORDER.insert(0, name)
64 def _get_matching_vcs(matchfn):
65 """Return the first module for which matchfn(VCS_instance) is true"""
66 for submodname in VCS_ORDER:
67 module = import_by_name('libbe.storage.vcs.%s' % submodname)
69 if matchfn(vcs) == True:
73 def vcs_by_name(vcs_name):
74 """Return the module for the VCS with the given name"""
75 if vcs_name == VCS.name:
77 return _get_matching_vcs(lambda vcs: vcs.name == vcs_name)
80 """Return an VCS instance for the vcs being used in this directory"""
81 return _get_matching_vcs(lambda vcs: vcs._detect(dir))
84 """Return an instance of an installed VCS"""
85 return _get_matching_vcs(lambda vcs: vcs.installed())
88 class VCSNotRooted (libbe.storage.base.ConnectionError):
89 def __init__(self, vcs):
90 msg = 'VCS not rooted'
91 libbe.storage.base.ConnectionError.__init__(self, msg)
94 class VCSUnableToRoot (libbe.storage.base.ConnectionError):
95 def __init__(self, vcs):
96 msg = 'VCS unable to root'
97 libbe.storage.base.ConnectionError.__init__(self, msg)
100 class InvalidPath (InvalidID):
101 def __init__(self, path, root, msg=None, **kwargs):
103 msg = 'Path "%s" not in root "%s"' % (path, root)
104 InvalidID.__init__(self, msg=msg, **kwargs)
108 class SpacerCollision (InvalidPath):
109 def __init__(self, path, spacer):
110 msg = 'Path "%s" collides with spacer directory "%s"' % (path, spacer)
111 InvalidPath.__init__(self, path, root=None, msg=msg)
class NoSuchFile (InvalidID):
    """InvalidID whose message names a file by its absolute path.

    pathname -- path of the file, taken relative to `root`
    root     -- directory `pathname` is joined onto (default '.')
    """
    def __init__(self, pathname, root='.'):
        joined = os.path.join(root, pathname)
        message = 'No such file: %s' % os.path.abspath(joined)
        InvalidID.__init__(self, message)
120 class CachedPathID (object):
122 Storage ID <-> path policy.
123 .../.be/BUGDIR/bugs/BUG/comments/COMMENT
127 >>> os.mkdir(os.path.join(dir.path, '.be'))
128 >>> os.mkdir(os.path.join(dir.path, '.be', 'abc'))
129 >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs'))
130 >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123'))
131 >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments'))
132 >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def'))
133 >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '456'))
134 >>> file(os.path.join(dir.path, '.be', 'abc', 'values'),
136 >>> file(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'values'),
138 >>> file(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values'),
140 >>> c = CachedPathID()
142 >>> c.id(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values'))
145 >>> sorted(os.listdir(os.path.join(c._root, '.be')))
148 >>> c.path('123/values') # doctest: +ELLIPSIS
149 u'.../.be/abc/bugs/123/values'
152 >>> sorted(os.listdir(os.path.join(c._root, '.be')))
154 >>> c.connect() # demonstrate auto init
155 >>> sorted(os.listdir(os.path.join(c._root, '.be')))
157 >>> c.add_id(u'xyz', parent=None) # doctest: +ELLIPSIS
159 >>> c.add_id('xyz/def', parent='xyz') # doctest: +ELLIPSIS
161 >>> c.add_id('qrs', parent='123') # doctest: +ELLIPSIS
162 u'.../.be/abc/bugs/123/comments/qrs'
165 >>> c.path('qrs') # doctest: +ELLIPSIS
166 u'.../.be/abc/bugs/123/comments/qrs'
167 >>> c.remove_id('qrs')
169 Traceback (most recent call last):
171 InvalidID: qrs in revision None
176 def __init__(self, encoding=None):
177 self.encoding = libbe.util.encoding.get_filesystem_encoding()
178 self._spacer_dirs = ['.be', 'bugs', 'comments']
180 def root(self, path):
181 self._root = os.path.abspath(path).rstrip(os.path.sep)
182 self._cache_path = os.path.join(
183 self._root, self._spacer_dirs[0], 'id-cache')
187 Create cache file for an existing .be directory.
188 File is multiple lines of the form:
192 spaced_root = os.path.join(self._root, self._spacer_dirs[0])
193 for dirpath, dirnames, filenames in os.walk(spaced_root):
194 if dirpath == spaced_root:
197 id = self.id(dirpath)
198 relpath = dirpath[len(self._root)+1:]
199 if id.count('/') == 0:
200 if id in self._cache:
201 print >> sys.stderr, 'Multiple paths for %s: \n %s\n %s' % (id, self._cache[id], relpath)
202 self._cache[id] = relpath
209 if os.path.exists(self._cache_path):
210 os.remove(self._cache_path)
213 if not os.path.exists(self._cache_path):
217 raise libbe.storage.base.ConnectionError
218 self._cache = {} # key: uuid, value: path
219 self._changed = False
220 f = codecs.open(self._cache_path, 'r', self.encoding)
222 fields = line.rstrip('\n').split('\t')
223 self._cache[fields[0]] = fields[1]
226 def disconnect(self):
227 if self._changed == True:
228 f = codecs.open(self._cache_path, 'w', self.encoding)
229 for uuid,path in self._cache.items():
230 f.write('%s\t%s\n' % (uuid, path))
234 def path(self, id, relpath=False):
235 fields = id.split('/', 1)
241 if uuid not in self._cache:
242 raise InvalidID(uuid)
244 return os.path.join(self._cache[uuid], *extra)
245 return os.path.join(self._root, self._cache[uuid], *extra)
247 def add_id(self, id, parent=None):
248 if id.count('/') > 0:
249 # not a UUID-level path
250 assert id.startswith(parent), \
251 'Strange ID: "%s" should start with "%s"' % (id, parent)
253 elif id in self._cache:
259 spacer = self._spacer_dirs[0]
261 assert parent.count('/') == 0, \
262 'Strange parent ID: "%s" should be UUID' % parent
263 parent_path = self.path(parent, relpath=True)
264 parent_spacer = parent_path.split(os.path.sep)[-2]
265 i = self._spacer_dirs.index(parent_spacer)
266 spacer = self._spacer_dirs[i+1]
267 path = os.path.join(parent_path, spacer, id)
268 self._cache[id] = path
270 path = os.path.join(self._root, path)
273 def remove_id(self, id):
274 if id.count('/') > 0:
275 return # not a UUID-level path
280 path = os.path.join(self._root, path)
281 if not path.startswith(self._root + os.path.sep):
282 raise InvalidPath(path, self._root)
283 path = path[len(self._root)+1:]
285 if not path.startswith(self._spacer_dirs[0] + os.path.sep):
286 raise InvalidPath(path, self._spacer_dirs[0])
287 for spacer in self._spacer_dirs:
288 if not path.startswith(spacer + os.path.sep):
290 id = path[len(spacer)+1:]
291 fields = path[len(spacer)+1:].split(os.path.sep,1)
295 for spacer in self._spacer_dirs:
296 if id.endswith(os.path.sep + spacer):
297 raise SpacerCollision(orig_path, spacer)
298 if os.path.sep != '/':
299 id = id.replace(os.path.sep, '/')
306 class VCS (libbe.storage.base.VersionedStorage):
308 This class implements a 'no-vcs' interface.
310 Support for other VCSs can be added by subclassing this class, and
311 overriding methods _vcs_*() with code appropriate for your VCS.
313 The methods _u_*() are utility methods available to the _vcs_*()
316 Sink to existing root
317 ======================
319 Consider the following usage case:
320 You have a bug directory rooted in
322 by which I mean the '.be' directory is at
324 However, you're off in some subdirectory like
325 /path/to/source/GUI/testing
326 and you want to comment on a bug. Setting sink_to_root=True when
327 you initialize your BugDir will cause it to search for the '.be'
328 directory in the ancestors of the path you passed in as 'root'.
329 /path/to/source/GUI/testing/.be miss
330 /path/to/source/GUI/.be miss
331 /path/to/source/.be hit!
332 So it still roots itself appropriately without much work for you.
337 BugDirs live completely in memory when .sync_with_disk is False.
338 This is the default configuration set up by BugDir(from_disk=False).
339 If .sync_with_disk == True (e.g. BugDir(from_disk=True)), then
340 any changes to the BugDir will be immediately written to disk.
342 If you want to change .sync_with_disk, we suggest you use
343 .set_sync_with_disk(), which propagates the new setting through to
344 all bugs/comments/etc. that have been loaded into memory. If
345 you've been living in memory and want to move to
346 .sync_with_disk==True, but you're not sure if anything has been
347 changed in memory, a call to .save() immediately before the
348 .set_sync_with_disk(True) call is a safe move.
350 Regardless of .sync_with_disk, a call to .save() will write out
351 all the contents that the BugDir instance has loaded into memory.
352 If sync_with_disk has been True over the course of all interesting
353 changes, this .save() call will be a waste of time.
355 The BugDir will only load information from the file system when it
356 loads new settings/bugs/comments that it doesn't already have in
357 memory and .sync_with_disk == True.
359 Allow storage initialization
360 ============================
362 This one is for testing purposes. Setting it to True allows the
363 BugDir to search for an installed Storage backend and initialize
364 it in the root directory. This is a convenience option for
365 supporting tests of versioning functionality
366 (e.g. RevisionedBugDir).
368 Disable encoding manipulation
369 =============================
371 This one is for testing purposes. You might have non-ASCII
372 Unicode in your bugs, comments, files, etc. BugDir instances try
373 and support your preferred encoding scheme (e.g. "utf-8") when
374 dealing with stream and file input/output. For stream output,
375 this involves replacing sys.stdout and sys.stderr
376 (libbe.encode.set_IO_stream_encodings). However this messes up
377 doctest's output catching. In order to support doctest tests
378 using BugDirs, set manipulate_encodings=False, and stick to ASCII
383 if sink_to_existing_root == True:
384 self.root = self._find_root(root)
386 if not os.path.exists(root):
388 raise NoRootEntry(root)
390 # get a temporary storage until we've loaded settings
391 self.sync_with_disk = False
392 self.storage = self._guess_storage()
394 if assert_new_BugDir == True:
395 if os.path.exists(self.get_path()):
396 raise AlreadyInitialized, self.get_path()
398 storage = self._guess_storage(allow_storage_init)
399 self.storage = storage
400 self._setup_user_id(self.user_id)
403 # methods for getting the BugDir situated in the filesystem
405 def _find_root(self, path):
407 Search for an existing bug database dir and its ancestors and
408 return a BugDir rooted there. Only called by __init__, and
409 then only if sink_to_existing_root == True.
411 if not os.path.exists(path):
413 raise NoRootEntry(path)
414 versionfile=utility.search_parent_directories(path,
415 os.path.join(".be", "version"))
416 if versionfile != None:
417 beroot = os.path.dirname(versionfile)
418 root = os.path.dirname(beroot)
421 beroot = utility.search_parent_directories(path, ".be")
427 def _guess_storage(self, allow_storage_init=False):
429 Only called by __init__.
431 deepdir = self.get_path()
432 if not os.path.exists(deepdir):
433 deepdir = os.path.dirname(deepdir)
434 new_storage = storage.detect_storage(deepdir)
436 if new_storage.name == "None":
437 if allow_storage_init == True:
438 new_storage = storage.installed_storage()
439 new_storage.init(self.root)
442 os.listdir(self.get_path("bugs")):
445 client = 'false' # command-line tool for _u_invoke_client
447 def __init__(self, *args, **kwargs):
448 if 'encoding' not in kwargs:
449 kwargs['encoding'] = libbe.util.encoding.get_filesystem_encoding()
450 libbe.storage.base.VersionedStorage.__init__(self, *args, **kwargs)
451 self.versioned = False
452 self.interspersed_vcs_files = False
453 self.verbose_invoke = False
454 self._cached_path_id = CachedPathID()
457 def _vcs_version(self):
459 Return the VCS version string.
463 def _vcs_get_user_id(self):
465 Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
466 If the VCS has not been configured with a username, return None.
470 def _vcs_detect(self, path=None):
472 Detect whether a directory is revision controlled with this VCS.
476 def _vcs_root(self, path):
478 Get the VCS root. This is the default working directory for
479 future invocations. You would normally set this to the root
480 directory for your VCS.
482 if os.path.isdir(path) == False:
483 path = os.path.dirname(path)
485 path = os.path.abspath('.')
488 def _vcs_init(self, path):
490 Begin versioning the tree based at path.
494 def _vcs_destroy(self):
496 Remove any files used in versioning (e.g. whatever _vcs_init()
501 def _vcs_add(self, path):
503 Add the already created file at path to version control.
507 def _vcs_remove(self, path):
509 Remove the file at path from version control. Optionally
510 remove the file from the filesystem as well.
514 def _vcs_update(self, path):
516 Notify the versioning system of changes to the versioned file
521 def _vcs_is_versioned(self, path):
523 Return true if a path is under version control, False
524 otherwise. You only need to set this if the VCS goes about
525 dumping VCS-specific files into the .be directory.
527 If you do need to implement this method (e.g. Arch), set
528 self.interspersed_vcs_files = True
530 assert self.interspersed_vcs_files == False
531 raise NotImplementedError
533 def _vcs_get_file_contents(self, path, revision=None):
535 Get the file contents as they were in a given revision.
536 Revision==None specifies the current revision.
539 raise libbe.storage.base.InvalidRevision(
540 'The %s VCS does not support revision specifiers' % self.name)
541 path = os.path.join(self.repo, path)
542 if not os.path.exists(path):
543 return libbe.util.InvalidObject
544 if os.path.isdir(path):
545 return libbe.storage.base.InvalidDirectory
551 def _vcs_path(self, id, revision):
553 Return the path to object id as of revision.
555 Revision will not be None.
557 raise NotImplementedError
559 def _vcs_isdir(self, path, revision):
561 Return True if path (as returned by _vcs_path) was a directory
562 as of revision, False otherwise.
564 Revision will not be None.
566 raise NotImplementedError
568 def _vcs_listdir(self, path, revision):
570 Return a list of the contents of the directory path (as
571 returned by _vcs_path) as of revision.
573 Revision will not be None, and ._vcs_isdir(path, revision)
576 raise NotImplementedError
578 def _vcs_commit(self, commitfile, allow_empty=False):
580 Commit the current working directory, using the contents of
581 commitfile as the comment. Return the name of the old
582 revision (or None if commits are not supported).
584 If allow_empty == False, raise EmptyCommit if there are no
589 def _vcs_revision_id(self, index):
591 Return the name of the <index>th revision. Index will be an
592 integer (possibly <= 0). The choice of which branch to follow
593 when crossing branches/merges is not defined.
595 Return None if revision IDs are not supported, or if the
596 specified revision does not exist.
600 def _vcs_changed(self, revision):
602 Return a tuple of lists of ids
603 (new, modified, removed)
604 from the specified revision to the current situation.
609 # Cache version string for efficiency.
610 if not hasattr(self, '_version'):
611 self._version = self._get_version()
614 def _get_version(self):
616 ret = self._vcs_version()
619 if e.errno == errno.ENOENT:
627 if self.version() != None:
631 def get_user_id(self):
633 Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
634 If the VCS has not been configured with a username, return None.
635 You can override the automatic lookup procedure by setting the
636 VCS.user_id attribute to a string of your choice.
638 if not hasattr(self, 'user_id'):
639 self.user_id = self._vcs_get_user_id()
642 def _detect(self, path='.'):
644 Detect whether a directory is revision controlled with this VCS.
646 return self._vcs_detect(path)
650 Set the root directory to the path's VCS root. This is the
651 default working directory for future invocations.
653 if self._detect(self.repo) == False:
654 raise VCSUnableToRoot(self)
655 root = self._vcs_root(self.repo)
656 self.repo = os.path.abspath(root)
657 if os.path.isdir(self.repo) == False:
658 self.repo = os.path.dirname(self.repo)
659 self.be_dir = os.path.join(
660 self.repo, self._cached_path_id._spacer_dirs[0])
661 self._cached_path_id.root(self.repo)
666 Begin versioning the tree based at self.repo.
667 Also roots the vcs at path.
669 if not os.path.exists(self.repo) or not os.path.isdir(self.repo):
670 raise VCSUnableToRoot(self)
671 if self._vcs_detect(self.repo) == False:
672 self._vcs_init(self.repo)
673 if self._rooted == False:
675 os.mkdir(self.be_dir)
676 self._vcs_add(self._u_rel_path(self.be_dir))
677 self._setup_storage_version()
678 self._cached_path_id.init()
682 self._cached_path_id.destroy()
683 if os.path.exists(self.be_dir):
684 shutil.rmtree(self.be_dir)
687 if self._rooted == False:
689 if not os.path.isdir(self.be_dir):
690 raise libbe.storage.base.ConnectionError(self)
691 self._cached_path_id.connect()
692 self.check_storage_version()
694 def _disconnect(self):
695 self._cached_path_id.disconnect()
697 def _add_path(self, path, directory=False):
698 relpath = self._u_rel_path(path)
699 reldirs = relpath.split(os.path.sep)
700 if directory == False:
701 reldirs = reldirs[:-1]
703 for reldir in reldirs:
704 dir = os.path.join(dir, reldir)
705 if not os.path.exists(dir):
707 self._vcs_add(self._u_rel_path(dir))
708 elif not os.path.isdir(dir):
709 raise libbe.storage.base.InvalidDirectory
710 if directory == False:
711 if not os.path.exists(path):
712 open(path, 'w').close()
713 self._vcs_add(self._u_rel_path(path))
715 def _add(self, id, parent=None, **kwargs):
716 path = self._cached_path_id.add_id(id, parent)
717 self._add_path(path, **kwargs)
719 def _remove(self, id):
720 path = self._cached_path_id.path(id)
721 if os.path.exists(path):
722 if os.path.isdir(path) and len(self.children(id)) > 0:
723 raise libbe.storage.base.DirectoryNotEmpty(id)
724 self._vcs_remove(self._u_rel_path(path))
725 if os.path.exists(path):
726 if os.path.isdir(path):
730 self._cached_path_id.remove_id(id)
732 def _recursive_remove(self, id):
733 path = self._cached_path_id.path(id)
734 for dirpath,dirnames,filenames in os.walk(path, topdown=False):
735 filenames.extend(dirnames)
737 fullpath = os.path.join(dirpath, f)
738 if os.path.exists(fullpath) == False:
740 self._vcs_remove(self._u_rel_path(fullpath))
741 if os.path.exists(path):
743 path = self._cached_path_id.path(id, relpath=True)
744 for id,p in self._cached_path_id._cache.items():
745 if p.startswith(path):
746 self._cached_path_id.remove_id(id)
748 def _ancestors(self, id=None, revision=None):
750 id_to_path = self._cached_path_id.path
752 id_to_path = lambda id : self._vcs_path(id, revision)
756 path = id_to_path(id)
759 if not path.startswith(self.repo + os.path.sep):
761 path = os.path.dirname(path)
763 id = self._u_path_to_id(path)
765 except (SpacerCollision, InvalidPath):
769 def _children(self, id=None, revision=None):
771 id_to_path = self._cached_path_id.path
772 isdir = os.path.isdir
775 id_to_path = lambda id : self._vcs_path(id, revision)
776 isdir = lambda path : self._vcs_isdir(
777 self._u_rel_path(path), revision)
778 listdir = lambda path : self._vcs_listdir(
779 self._u_rel_path(path), revision)
783 path = id_to_path(id)
784 if isdir(path) == False:
786 children = listdir(path)
787 for i,c in enumerate(children):
788 if c in self._cached_path_id._spacer_dirs:
790 children.extend([os.path.join(c, c2) for c2 in
791 listdir(os.path.join(path, c))])
792 elif c in ['id-cache', 'version']:
794 elif self.interspersed_vcs_files \
795 and self._vcs_is_versioned(c) == False:
797 for i,c in enumerate(children):
798 if c == None: continue
799 cpath = os.path.join(path, c)
800 if self.interspersed_vcs_files == True \
801 and revision != None \
802 and self._vcs_is_versioned(cpath) == False:
805 children[i] = self._u_path_to_id(cpath)
807 return [c for c in children if c != None]
809 def _get(self, id, default=libbe.util.InvalidObject, revision=None):
811 path = self._cached_path_id.path(id)
813 if default == libbe.util.InvalidObject:
816 relpath = self._u_rel_path(path)
818 contents = self._vcs_get_file_contents(relpath, revision)
822 if e.revision == None:
823 e.revision = revision
825 if contents in [libbe.storage.base.InvalidDirectory,
826 libbe.util.InvalidObject]:
827 raise InvalidID(id, revision)
828 elif len(contents) == 0:
832 def _set(self, id, value):
834 path = self._cached_path_id.path(id)
837 if not os.path.exists(path):
839 if os.path.isdir(path):
840 raise libbe.storage.base.InvalidDirectory(id)
844 self._vcs_update(self._u_rel_path(path))
846 def _commit(self, summary, body=None, allow_empty=False):
847 summary = summary.strip()+'\n'
849 summary += '\n' + body.strip() + '\n'
850 descriptor, filename = tempfile.mkstemp()
853 temp_file = os.fdopen(descriptor, 'wb')
854 temp_file.write(summary)
856 revision = self._vcs_commit(filename, allow_empty=allow_empty)
862 def revision_id(self, index=None):
866 if int(index) != index:
867 raise InvalidRevision(index)
869 raise InvalidRevision(index)
870 revid = self._vcs_revision_id(index)
872 raise libbe.storage.base.InvalidRevision(index)
875 def changed(self, revision):
876 new,mod,rem = self._vcs_changed(revision)
877 def paths_to_ids(paths):
880 id = self._u_path_to_id(p)
882 except (SpacerCollision, InvalidPath):
884 new_id = list(paths_to_ids(new))
885 mod_id = list(paths_to_ids(mod))
886 rem_id = list(paths_to_ids(rem))
887 return (new_id, mod_id, rem_id)
889 def _u_any_in_string(self, list, string):
891 Return True if any of the strings in list are in string.
892 Otherwise return False.
894 for list_string in list:
895 if list_string in string:
def _u_invoke(self, *args, **kwargs):
    """Call invoke() with this instance's defaults filled in.

    Unless the caller supplies them, `cwd` defaults to self.repo,
    `verbose` to self.verbose_invoke and `encoding` to self.encoding.
    Returns whatever invoke() returns.
    """
    defaults = (('cwd', 'repo'),
                ('verbose', 'verbose_invoke'),
                ('encoding', 'encoding'))
    for option, attribute in defaults:
        if option not in kwargs:
            # Read the attribute only when the default is needed.
            kwargs[option] = getattr(self, attribute)
    return invoke(*args, **kwargs)
908 def _u_invoke_client(self, *args, **kwargs):
909 cl_args = [self.client]
911 return self._u_invoke(cl_args, **kwargs)
913 def _u_search_parent_directories(self, path, filename):
915 Find the file (or directory) named filename in path or in any
919 search_parent_directories("/a/b/c", ".be")
920 will return the path to the first existing file from
925 or None if none of those files exist.
928 ret = search_parent_directories(path, filename)
929 except AssertionError, e:
933 def _u_find_id_from_manifest(self, id, manifest, revision=None):
935 Search for the relative path to id using manifest, a list of all files.
937 Returns None if the id is not found.
939 be_dir = self._cached_path_id._spacer_dirs[0]
940 be_dir_sep = self._cached_path_id._spacer_dirs[0] + os.path.sep
941 files = [f for f in manifest if f.startswith(be_dir_sep)]
943 if not file.startswith(be_dir+os.path.sep):
945 parts = file.split(os.path.sep)
946 dir = parts.pop(0) # don't add the first spacer dir
947 for part in parts[:-1]:
948 dir = os.path.join(dir, part)
953 p_id = self._u_path_to_id(file)
956 except (SpacerCollision, InvalidPath):
958 raise InvalidID(id, revision=revision)
960 def _u_find_id(self, id, revision):
962 Search for the relative path to id as of revision.
963 Returns None if the id is not found.
965 assert self._rooted == True
966 be_dir = self._cached_path_id._spacer_dirs[0]
967 stack = [(be_dir, be_dir)]
968 while len(stack) > 0:
969 path,long_id = stack.pop()
970 if long_id.endswith('/'+id):
972 if self._vcs_isdir(path, revision) == False:
974 for child in self._vcs_listdir(path, revision):
975 stack.append((os.path.join(path, child),
976 '/'.join([long_id, child])))
977 raise InvalidID(id, revision=revision)
979 def _u_path_to_id(self, path):
980 return self._cached_path_id.id(path)
982 def _u_rel_path(self, path, root=None):
984 Return the relative path to path from root.
986 >>> vcs._u_rel_path("/a.b/c/.be", "/a.b/c")
988 >>> vcs._u_rel_path("/a.b/c/", "/a.b/c")
990 >>> vcs._u_rel_path("/a.b/c/", "/a.b/c/")
992 >>> vcs._u_rel_path("./a", ".")
996 if self.repo == None:
997 raise VCSNotRooted(self)
999 path = os.path.abspath(path)
1000 absRoot = os.path.abspath(root)
1001 absRootSlashedDir = os.path.join(absRoot,"")
1002 if path in [absRoot, absRootSlashedDir]:
1004 if not path.startswith(absRootSlashedDir):
1005 raise InvalidPath(path, absRootSlashedDir)
1006 relpath = path[len(absRootSlashedDir):]
1009 def _u_abspath(self, path, root=None):
1011 Return the absolute path from a path relative to root.
1013 >>> vcs._u_abspath(".be", "/a.b/c")
1017 assert self.repo != None, "VCS not rooted"
1019 return os.path.abspath(os.path.join(root, path))
1021 def _u_parse_commitfile(self, commitfile):
1023 Split the commitfile created in self.commit() back into
1024 summary and body lines.
1026 f = codecs.open(commitfile, 'r', self.encoding)
1027 summary = f.readline()
1033 return (summary, body)
def check_storage_version(self):
    """Upgrade the repository if its storage version is not current.

    Compares self.storage_version() with libbe.storage.STORAGE_VERSION
    and, when they differ, calls upgrade.upgrade() on self.repo.
    """
    on_disk = self.storage_version()
    if on_disk == libbe.storage.STORAGE_VERSION:
        return
    upgrade.upgrade(self.repo, on_disk)
1040 def storage_version(self, revision=None, path=None):
1042 Requires disk access.
1045 path = os.path.join(self.repo, '.be', 'version')
1046 if not os.path.exists(path):
1047 raise libbe.storage.InvalidStorageVersion(None)
1048 if revision == None: # don't require connection
1049 return libbe.util.encoding.get_file_contents(
1050 path, decode=True).rstrip('\n')
1051 relpath = self._u_rel_path(path)
1052 contents = self._vcs_get_file_contents(relpath, revision=revision)
1053 if type(contents) != types.UnicodeType:
1054 contents = unicode(contents, self.encoding)
1055 return contents.strip()
1057 def _setup_storage_version(self):
1059 Requires disk access.
1061 assert self._rooted == True
1062 path = os.path.join(self.be_dir, 'version')
1063 if not os.path.exists(path):
1064 libbe.util.encoding.set_file_contents(path,
1065 libbe.storage.STORAGE_VERSION+'\n')
1066 self._vcs_add(self._u_rel_path(path))
1069 if libbe.TESTING == True:
1070 class VCSTestCase (unittest.TestCase):
1072 Test cases for base VCS class (in addition to the Storage test
1078 def __init__(self, *args, **kwargs):
1079 super(VCSTestCase, self).__init__(*args, **kwargs)
1083 """Set up test fixtures for Storage test case."""
1084 super(VCSTestCase, self).setUp()
1086 self.dirname = self.dir.path
1087 self.s = self.Class(repo=self.dirname)
1088 if self.s.installed() == True:
1093 super(VCSTestCase, self).tearDown()
1094 if self.s.installed() == True:
1099 class VCS_installed_TestCase (VCSTestCase):
1100 def test_installed(self):
1101 """See if the VCS is installed.
1103 self.failUnless(self.s.installed() == True,
1104 '%(name)s VCS not found' % vars(self.Class))
1107 class VCS_detection_TestCase (VCSTestCase):
1108 def test_detection(self):
1109 """See if the VCS detects its installed repository
1111 if self.s.installed():
1113 self.failUnless(self.s._detect(self.dirname) == True,
1114 'Did not detected %(name)s VCS after initialising'
1118 def test_no_detection(self):
1119 """See if the VCS detects its installed repository
1121 if self.s.installed() and self.Class.name != 'None':
1124 self.failUnless(self.s._detect(self.dirname) == False,
1125 'Detected %(name)s VCS before initialising'
1130 def test_vcs_repo_in_specified_root_path(self):
1131 """VCS root directory should be in specified root path."""
1132 rp = os.path.realpath(self.s.repo)
1133 dp = os.path.realpath(self.dirname)
1134 vcs_name = self.Class.name
1136 dp == rp or rp == None,
1137 "%(vcs_name)s VCS root in wrong dir (%(dp)s %(rp)s)" % vars())
1139 class VCS_get_user_id_TestCase(VCSTestCase):
1140 """Test cases for VCS.get_user_id method."""
1142 def test_gets_existing_user_id(self):
1143 """Should get the existing user ID."""
1144 if self.s.installed():
1145 user_id = self.s.get_user_id()
1148 name,email = libbe.ui.util.user.parse_user_id(user_id)
1150 self.failUnless('@' in email, email)
1152 def make_vcs_testcase_subclasses(vcs_class, namespace):
1155 if c.versioned == True:
1156 libbe.storage.base.make_versioned_storage_testcase_subclasses(
1157 vcs_class, namespace)
1159 libbe.storage.base.make_storage_testcase_subclasses(
1160 vcs_class, namespace)
1162 if namespace != sys.modules[__name__]:
1163 # Make VCSTestCase subclasses for vcs_class in the namespace.
1164 vcs_testcase_classes = [
1166 ob for ob in globals().values() if isinstance(ob, type))
1167 if issubclass(c, VCSTestCase) \
1170 for base_class in vcs_testcase_classes:
1171 testcase_class_name = vcs_class.__name__ + base_class.__name__
1172 testcase_class_bases = (base_class,)
1173 testcase_class_dict = dict(base_class.__dict__)
1174 testcase_class_dict['Class'] = vcs_class
1175 testcase_class = type(
1176 testcase_class_name, testcase_class_bases, testcase_class_dict)
1177 setattr(namespace, testcase_class_name, testcase_class)
1179 make_vcs_testcase_subclasses(VCS, sys.modules[__name__])
1181 unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
1182 suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])