storage:vcs:base: fix _gets_ -> _get_ typo in test_gets_existing_user_id.
[be.git] / libbe / storage / vcs / base.py
1 # Copyright (C) 2005-2012 Aaron Bentley <abentley@panoramicfeedback.com>
2 #                         Alexander Belchenko <bialix@ukr.net>
3 #                         Ben Finney <benf@cybersource.com.au>
4 #                         Chris Ball <cjb@laptop.org>
5 #                         Gianluca Montecchi <gian@grys.it>
6 #                         W. Trevor King <wking@drexel.edu>
7 #
8 # This file is part of Bugs Everywhere.
9 #
10 # Bugs Everywhere is free software: you can redistribute it and/or modify it
11 # under the terms of the GNU General Public License as published by the Free
12 # Software Foundation, either version 2 of the License, or (at your option) any
13 # later version.
14 #
15 # Bugs Everywhere is distributed in the hope that it will be useful, but
16 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
17 # FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
18 # more details.
19 #
20 # You should have received a copy of the GNU General Public License along with
21 # Bugs Everywhere.  If not, see <http://www.gnu.org/licenses/>.
22
23 """Define the base :class:`VCS` (Version Control System) class, which
24 should be subclassed by other Version Control System backends.  The
25 base class implements a "do not version" VCS.
26 """
27
28 import codecs
29 import os
30 import os.path
31 import re
32 import shutil
33 import sys
34 import tempfile
35 import types
36
37 import libbe
38 import libbe.storage
39 import libbe.storage.base
40 import libbe.util.encoding
41 from libbe.storage.base import EmptyCommit, InvalidRevision, InvalidID
42 from libbe.util.utility import Dir, search_parent_directories
43 from libbe.util.subproc import CommandError, invoke
44 from libbe.util.plugin import import_by_name
45 import libbe.storage.util.upgrade as upgrade
46
47 if libbe.TESTING == True:
48     import unittest
49     import doctest
50
51     import libbe.ui.util.user
52
53 VCS_ORDER = ['arch', 'bzr', 'darcs', 'git', 'hg', 'monotone']
54 """List VCS modules in order of preference.
55
56 Don't list this module, it is implicitly last.
57 """
58
59 def set_preferred_vcs(name):
60     """Manipulate :data:`VCS_ORDER` to place `name` first.
61
62     This is primarily indended for testing purposes.
63     """
64     global VCS_ORDER
65     assert name in VCS_ORDER, \
66         'unrecognized VCS %s not in\n  %s' % (name, VCS_ORDER)
67     VCS_ORDER.remove(name)
68     VCS_ORDER.insert(0, name)
69
70 def _get_matching_vcs(matchfn):
71     """Return the first module for which matchfn(VCS_instance) is True.
72
73     Searches in :data:`VCS_ORDER`.
74     """
75     for submodname in VCS_ORDER:
76         module = import_by_name('libbe.storage.vcs.%s' % submodname)
77         vcs = module.new()
78         if matchfn(vcs) == True:
79             return vcs
80     return VCS()
81
82 def vcs_by_name(vcs_name):
83     """Return the module for the VCS with the given name.
84
85     Searches in :data:`VCS_ORDER`.
86     """
87     if vcs_name == VCS.name:
88         return new()
89     return _get_matching_vcs(lambda vcs: vcs.name == vcs_name)
90
91 def detect_vcs(dir):
92     """Return an VCS instance for the vcs being used in this directory.
93
94     Searches in :data:`VCS_ORDER`.
95     """
96     return _get_matching_vcs(lambda vcs: vcs._detect(dir))
97
98 def installed_vcs():
99     """Return an instance of an installed VCS.
100
101     Searches in :data:`VCS_ORDER`.
102     """
103     return _get_matching_vcs(lambda vcs: vcs.installed())
104
105
106 class VCSNotRooted (libbe.storage.base.ConnectionError):
107     def __init__(self, vcs):
108         msg = 'VCS not rooted'
109         libbe.storage.base.ConnectionError.__init__(self, msg)
110         self.vcs = vcs
111
112 class VCSUnableToRoot (libbe.storage.base.ConnectionError):
113     def __init__(self, vcs):
114         msg = 'VCS unable to root'
115         libbe.storage.base.ConnectionError.__init__(self, msg)
116         self.vcs = vcs
117
118 class InvalidPath (InvalidID):
119     def __init__(self, path, root, msg=None, **kwargs):
120         if msg == None:
121             msg = 'Path "%s" not in root "%s"' % (path, root)
122         InvalidID.__init__(self, msg=msg, **kwargs)
123         self.path = path
124         self.root = root
125
126 class SpacerCollision (InvalidPath):
127     def __init__(self, path, spacer):
128         msg = 'Path "%s" collides with spacer directory "%s"' % (path, spacer)
129         InvalidPath.__init__(self, path, root=None, msg=msg)
130         self.spacer = spacer
131
132 class NoSuchFile (InvalidID):
133     def __init__(self, pathname, root='.'):
134         path = os.path.abspath(os.path.join(root, pathname))
135         InvalidID.__init__(self, 'No such file: %s' % path)
136
137
138 class CachedPathID (object):
139     """Cache Storage ID <-> path policy.
140  
141     Paths generated following::
142
143        .../.be/BUGDIR/bugs/BUG/comments/COMMENT
144           ^-- root path
145
146     See :mod:`libbe.util.id` for a discussion of ID formats.
147
148     Examples
149     --------
150
151     >>> dir = Dir()
152     >>> os.mkdir(os.path.join(dir.path, '.be'))
153     >>> os.mkdir(os.path.join(dir.path, '.be', 'abc'))
154     >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs'))
155     >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123'))
156     >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments'))
157     >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def'))
158     >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '456'))
159     >>> open(os.path.join(dir.path, '.be', 'abc', 'values'),
160     ...      'w').close()
161     >>> open(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'values'),
162     ...      'w').close()
163     >>> open(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values'),
164     ...      'w').close()
165     >>> c = CachedPathID()
166     >>> c.root(dir.path)
167     >>> c.id(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values'))
168     'def/values'
169     >>> c.init()
170     >>> sorted(os.listdir(os.path.join(c._root, '.be')))
171     ['abc', 'id-cache']
172     >>> c.connect()
173     >>> c.path('123/values') # doctest: +ELLIPSIS
174     u'.../.be/abc/bugs/123/values'
175     >>> c.disconnect()
176     >>> c.destroy()
177     >>> sorted(os.listdir(os.path.join(c._root, '.be')))
178     ['abc']
179     >>> c.connect() # demonstrate auto init
180     >>> sorted(os.listdir(os.path.join(c._root, '.be')))
181     ['abc', 'id-cache']
182     >>> c.add_id(u'xyz', parent=None) # doctest: +ELLIPSIS
183     u'.../.be/xyz'
184     >>> c.add_id('xyz/def', parent='xyz') # doctest: +ELLIPSIS
185     u'.../.be/xyz/def'
186     >>> c.add_id('qrs', parent='123') # doctest: +ELLIPSIS
187     u'.../.be/abc/bugs/123/comments/qrs'
188     >>> c.disconnect()
189     >>> c.connect()
190     >>> c.path('qrs') # doctest: +ELLIPSIS
191     u'.../.be/abc/bugs/123/comments/qrs'
192     >>> c.remove_id('qrs')
193     >>> c.path('qrs')
194     Traceback (most recent call last):
195       ...
196     InvalidID: qrs in revision None
197     >>> c.disconnect()
198     >>> c.destroy()
199     >>> dir.cleanup()
200     """
201     def __init__(self, encoding=None):
202         self.encoding = libbe.util.encoding.get_text_file_encoding()
203         self._spacer_dirs = ['.be', 'bugs', 'comments']
204
205     def root(self, path):
206         self._root = os.path.abspath(path).rstrip(os.path.sep)
207         self._cache_path = os.path.join(
208             self._root, self._spacer_dirs[0], 'id-cache')
209
210     def init(self, verbose=True, cache=None):
211         """Create cache file for an existing .be directory.
212
213         The file contains multiple lines of the form::
214
215             UUID\tPATH
216         """
217         if cache == None:
218             self._cache = {}
219         else:
220             self._cache = cache
221         spaced_root = os.path.join(self._root, self._spacer_dirs[0])
222         for dirpath, dirnames, filenames in os.walk(spaced_root,
223                                                     followlinks=True):
224             if dirpath == spaced_root:
225                 continue
226             try:
227                 id = self.id(dirpath)
228                 relpath = dirpath[len(self._root + os.path.sep):]
229                 if id.count('/') == 0:
230                     if verbose == True and id in self._cache:
231                         print >> sys.stderr, 'Multiple paths for %s: \n  %s\n  %s' % (id, self._cache[id], relpath)
232                     self._cache[id] = relpath
233             except InvalidPath:
234                 pass
235         if self._cache != cache:
236             self._changed = True
237         if cache == None:
238             self.disconnect()
239
240     def destroy(self):
241         if os.path.exists(self._cache_path):
242             os.remove(self._cache_path)
243
244     def connect(self):
245         if not os.path.exists(self._cache_path):
246             try:
247                 self.init()
248             except IOError:
249                 raise libbe.storage.base.ConnectionError
250         self._cache = {} # key: uuid, value: path
251         self._changed = False
252         f = codecs.open(self._cache_path, 'r', self.encoding)
253         for line in f:
254             fields = line.rstrip('\n').split('\t')
255             self._cache[fields[0]] = fields[1]
256         f.close()
257
258     def disconnect(self):
259         if self._changed == True:
260             f = codecs.open(self._cache_path, 'w', self.encoding)
261             for uuid,path in self._cache.items():
262                 f.write('%s\t%s\n' % (uuid, path))
263             f.close()
264         self._cache = {}
265
266     def path(self, id, relpath=False):
267         fields = id.split('/', 1)
268         uuid = fields[0]
269         if len(fields) == 1:
270             extra = []
271         else:
272             extra = fields[1:]
273         if uuid not in self._cache:
274             self.init(verbose=False, cache=self._cache)
275             if uuid not in self._cache:
276                 raise InvalidID(uuid)
277         if relpath == True:
278             return os.path.join(self._cache[uuid], *extra)
279         return os.path.join(self._root, self._cache[uuid], *extra)
280
281     def add_id(self, id, parent=None):
282         if id.count('/') > 0:
283             # not a UUID-level path
284             assert id.startswith(parent), \
285                 'Strange ID: "%s" should start with "%s"' % (id, parent)
286             path = self.path(id)
287         elif id in self._cache:
288             # already added
289             path = self.path(id)
290         else:
291             if parent == None:
292                 parent_path = ''
293                 spacer = self._spacer_dirs[0]
294             else:
295                 assert parent.count('/') == 0, \
296                     'Strange parent ID: "%s" should be UUID' % parent
297                 parent_path = self.path(parent, relpath=True)
298                 parent_spacer = parent_path.split(os.path.sep)[-2]
299                 i = self._spacer_dirs.index(parent_spacer)
300                 spacer = self._spacer_dirs[i+1]
301             path = os.path.join(parent_path, spacer, id)
302             self._cache[id] = path
303             self._changed = True
304             path = os.path.join(self._root, path)
305         return path
306
307     def remove_id(self, id):
308         if id.count('/') > 0:
309             return # not a UUID-level path
310         self._cache.pop(id)
311         self._changed = True
312
313     def id(self, path):
314         path = os.path.join(self._root, path)
315         if not path.startswith(self._root + os.path.sep):
316             raise InvalidPath(path, self._root)
317         path = path[len(self._root + os.path.sep):]
318         orig_path = path
319         if not path.startswith(self._spacer_dirs[0] + os.path.sep):
320             raise InvalidPath(path, self._spacer_dirs[0])
321         for spacer in self._spacer_dirs:
322             if not path.startswith(spacer + os.path.sep):
323                 break
324             id = path[len(spacer + os.path.sep):]
325             fields = path[len(spacer + os.path.sep):].split(os.path.sep,1)
326             if len(fields) == 1:
327                 break
328             path = fields[1]
329         for spacer in self._spacer_dirs:
330             if id.endswith(os.path.sep + spacer):
331                 raise SpacerCollision(orig_path, spacer)
332         if os.path.sep != '/':
333             id = id.replace(os.path.sep, '/')
334         return id
335
336
337 def new():
338     return VCS()
339
340 class VCS (libbe.storage.base.VersionedStorage):
341     """Implement a 'no-VCS' interface.
342
343     Support for other VCSs can be added by subclassing this class, and
344     overriding methods `_vcs_*()` with code appropriate for your VCS.
345
346     The methods `_u_*()` are utility methods available to the `_vcs_*()`
347     methods.
348     """
349     name = 'None'
350     client = 'false' # command-line tool for _u_invoke_client
351
352     def __init__(self, *args, **kwargs):
353         if 'encoding' not in kwargs:
354             kwargs['encoding'] = libbe.util.encoding.get_text_file_encoding()
355         libbe.storage.base.VersionedStorage.__init__(self, *args, **kwargs)
356         self.versioned = False
357         self.interspersed_vcs_files = False
358         self.verbose_invoke = False
359         self._cached_path_id = CachedPathID()
360         self._rooted = False
361
362     def _vcs_version(self):
363         """
364         Return the VCS version string.
365         """
366         return '0'
367
368     def _vcs_get_user_id(self):
369         """
370         Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
371         If the VCS has not been configured with a username, return None.
372         """
373         return None
374
375     def _vcs_detect(self, path=None):
376         """
377         Detect whether a directory is revision controlled with this VCS.
378         """
379         return True
380
381     def _vcs_root(self, path):
382         """
383         Get the VCS root.  This is the default working directory for
384         future invocations.  You would normally set this to the root
385         directory for your VCS.
386         """
387         if os.path.isdir(path) == False:
388             path = os.path.dirname(path)
389             if path == '':
390                 path = os.path.abspath('.')
391         return path
392
393     def _vcs_init(self, path):
394         """
395         Begin versioning the tree based at path.
396         """
397         pass
398
399     def _vcs_destroy(self):
400         """
401         Remove any files used in versioning (e.g. whatever _vcs_init()
402         created).
403         """
404         pass
405
406     def _vcs_add(self, path):
407         """
408         Add the already created file at path to version control.
409         """
410         pass
411
412     def _vcs_exists(self, path, revision=None):
413         """
414         Does the path exist in a given revision? (True/False)
415         """
416         raise NotImplementedError('Lazy BE developers')
417
418     def _vcs_remove(self, path):
419         """
420         Remove the file at path from version control.  Optionally
421         remove the file from the filesystem as well.
422         """
423         pass
424
425     def _vcs_update(self, path):
426         """
427         Notify the versioning system of changes to the versioned file
428         at path.
429         """
430         pass
431
432     def _vcs_is_versioned(self, path):
433         """
434         Return true if a path is under version control, False
435         otherwise.  You only need to set this if the VCS goes about
436         dumping VCS-specific files into the .be directory.
437
438         If you do need to implement this method (e.g. Arch), set
439           self.interspersed_vcs_files = True
440         """
441         assert self.interspersed_vcs_files == False
442         raise NotImplementedError
443
444     def _vcs_get_file_contents(self, path, revision=None):
445         """
446         Get the file contents as they were in a given revision.
447         Revision==None specifies the current revision.
448         """
449         if revision != None:
450             raise libbe.storage.base.InvalidRevision(
451                 'The %s VCS does not support revision specifiers' % self.name)
452         path = os.path.join(self.repo, path)
453         if not os.path.exists(path):
454             return libbe.util.InvalidObject
455         if os.path.isdir(path):
456             return libbe.storage.base.InvalidDirectory
457         f = open(path, 'rb')
458         contents = f.read()
459         f.close()
460         return contents
461
462     def _vcs_path(self, id, revision):
463         """
464         Return the relative path to object id as of revision.
465         
466         Revision will not be None.
467         """
468         raise NotImplementedError
469
470     def _vcs_isdir(self, path, revision):
471         """
472         Return True if path (as returned by _vcs_path) was a directory
473         as of revision, False otherwise.
474         
475         Revision will not be None.
476         """
477         raise NotImplementedError
478
479     def _vcs_listdir(self, path, revision):
480         """
481         Return a list of the contents of the directory path (as
482         returned by _vcs_path) as of revision.
483         
484         Revision will not be None, and ._vcs_isdir(path, revision)
485         will be True.
486         """
487         raise NotImplementedError
488
489     def _vcs_commit(self, commitfile, allow_empty=False):
490         """
491         Commit the current working directory, using the contents of
492         commitfile as the comment.  Return the name of the old
493         revision (or None if commits are not supported).
494
495         If allow_empty == False, raise EmptyCommit if there are no
496         changes to commit.
497         """
498         return None
499
500     def _vcs_revision_id(self, index):
501         """
502         Return the name of the <index>th revision.  Index will be an
503         integer (possibly <= 0).  The choice of which branch to follow
504         when crossing branches/merges is not defined.
505
506         Return None if revision IDs are not supported, or if the
507         specified revision does not exist.
508         """
509         return None
510
511     def _vcs_changed(self, revision):
512         """
513         Return a tuple of lists of ids
514           (new, modified, removed)
515         from the specified revision to the current situation.
516         """
517         return ([], [], [])
518
519     def version(self):
520         # Cache version string for efficiency.
521         if not hasattr(self, '_version'):
522             self._version = self._vcs_version()
523         return self._version
524
525     def version_cmp(self, *args):
526         """Compare the installed VCS version `V_i` with another version
527         `V_o` (given in `*args`).  Returns
528
529            === ===============
530             1  if `V_i > V_o`
531             0  if `V_i == V_o`
532            -1  if `V_i < V_o`
533            === ===============
534
535         Examples
536         --------
537
538         >>> v = VCS(repo='.')
539         >>> v._version = '2.3.1 (release)'
540         >>> v.version_cmp(2,3,1)
541         0
542         >>> v.version_cmp(2,3,2)
543         -1
544         >>> v.version_cmp(2,3,'a',5)
545         1
546         >>> v.version_cmp(2,3,0)
547         1
548         >>> v.version_cmp(2,3,1,'a',5)
549         1
550         >>> v.version_cmp(2,3,1,1)
551         -1
552         >>> v.version_cmp(3)
553         -1
554         >>> v._version = '2.0.0pre2'
555         >>> v._parsed_version = None
556         >>> v.version_cmp(3)
557         -1
558         >>> v.version_cmp(2,0,1)
559         -1
560         >>> v.version_cmp(2,0,0,'pre',1)
561         1
562         >>> v.version_cmp(2,0,0,'pre',2)
563         0
564         >>> v.version_cmp(2,0,0,'pre',3)
565         -1
566         >>> v.version_cmp(2,0,0,'a',3)
567         1
568         >>> v.version_cmp(2,0,0,'rc',1)
569         -1
570         """
571         if not hasattr(self, '_parsed_version') \
572                 or self._parsed_version == None:
573             num_part = self.version().split(' ')[0]
574             self._parsed_version = []
575             for num in num_part.split('.'):
576                 try:
577                     self._parsed_version.append(int(num))
578                 except ValueError, e:
579                     # bzr version number might contain non-numerical tags
580                     splitter = re.compile(r'[\D]') # Match non-digits
581                     splits = splitter.split(num)
582                     # if len(tag) > 1 some splits will be empty; remove
583                     splits = filter(lambda s: s != '', splits)
584                     tag_starti = len(splits[0])
585                     num_starti = num.find(splits[1], tag_starti)
586                     tag = num[tag_starti:num_starti]
587                     self._parsed_version.append(int(splits[0]))
588                     self._parsed_version.append(tag)
589                     self._parsed_version.append(int(splits[1]))
590         for current,other in zip(self._parsed_version, args):
591             if type(current) != type (other):
592                 # one of them is a pre-release string
593                 if type(current) != types.IntType:
594                     return -1
595                 else:
596                     return 1
597             c = cmp(current,other)
598             if c != 0:
599                 return c
600         # see if one is longer than the other
601         verlen = len(self._parsed_version)
602         arglen = len(args)
603         if verlen == arglen:
604             return 0
605         elif verlen > arglen:
606             if type(self._parsed_version[arglen]) != types.IntType:
607                 return -1 # self is a prerelease
608             else:
609                 return 1
610         else:
611             if type(args[verlen]) != types.IntType:
612                 return 1 # args is a prerelease
613             else:
614                 return -1
615
616     def installed(self):
617         if self.version() != None:
618             return True
619         return False
620
621     def get_user_id(self):
622         """
623         Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
624         If the VCS has not been configured with a username, return None.
625         You can override the automatic lookup procedure by setting the
626         VCS.user_id attribute to a string of your choice.
627         """
628         if not hasattr(self, 'user_id'):
629             self.user_id = self._vcs_get_user_id()
630             if self.user_id == None:
631                 # guess missing info
632                 name = libbe.ui.util.user.get_fallback_fullname()
633                 email = libbe.ui.util.user.get_fallback_email()
634                 self.user_id = libbe.ui.util.user.create_user_id(name, email)
635         return self.user_id
636
637     def _detect(self, path='.'):
638         """
639         Detect whether a directory is revision controlled with this VCS.
640         """
641         return self._vcs_detect(path)
642
643     def root(self):
644         """Set the root directory to the path's VCS root.
645
646         This is the default working directory for future invocations.
647         Consider the following usage case:
648
649         You have a project rooted in::
650
651           /path/to/source/
652
653         by which I mean the VCS repository is in, for example::
654
655           /path/to/source/.bzr
656
657         However, you're of in some subdirectory like::
658
659           /path/to/source/ui/testing
660
661         and you want to comment on a bug.  `root` will locate your VCS
662         root (``/path/to/source/``) and set the repo there.  This
663         means that it doesn't matter where you are in your project
664         tree when you call "be COMMAND", it always acts as if you called
665         it from the VCS root.
666         """
667         if self._detect(self.repo) == False:
668             raise VCSUnableToRoot(self)
669         root = self._vcs_root(self.repo)
670         self.repo = os.path.realpath(root)
671         if os.path.isdir(self.repo) == False:
672             self.repo = os.path.dirname(self.repo)
673         self.be_dir = os.path.join(
674             self.repo, self._cached_path_id._spacer_dirs[0])
675         self._cached_path_id.root(self.repo)
676         self._rooted = True
677
678     def _init(self):
679         """
680         Begin versioning the tree based at self.repo.
681         Also roots the vcs at path.
682
683         See Also
684         --------
685         root : called if the VCS has already been initialized.
686         """
687         if not os.path.exists(self.repo) or not os.path.isdir(self.repo):
688             raise VCSUnableToRoot(self)
689         if self._vcs_detect(self.repo) == False:
690             self._vcs_init(self.repo)
691         if self._rooted == False:
692             self.root()
693         os.mkdir(self.be_dir)
694         self._vcs_add(self._u_rel_path(self.be_dir))
695         self._setup_storage_version()
696         self._cached_path_id.init()
697
698     def _destroy(self):
699         self._vcs_destroy()
700         self._cached_path_id.destroy()
701         if os.path.exists(self.be_dir):
702             shutil.rmtree(self.be_dir)
703
704     def _connect(self):
705         if self._rooted == False:
706             self.root()
707         if not os.path.isdir(self.be_dir):
708             raise libbe.storage.base.ConnectionError(self)
709         self._cached_path_id.connect()
710         self.check_storage_version()
711
712     def _disconnect(self):
713         self._cached_path_id.disconnect()
714
715     def path(self, id, revision=None, relpath=True):
716         if revision == None:
717             path = self._cached_path_id.path(id)
718             if relpath == True:
719                 return self._u_rel_path(path)
720             return path
721         path = self._vcs_path(id, revision)
722         if relpath == True:
723             return path
724         return os.path.join(self.repo, path)
725
726     def _add_path(self, path, directory=False):
727         relpath = self._u_rel_path(path)
728         reldirs = relpath.split(os.path.sep)
729         if directory == False:
730             reldirs = reldirs[:-1]
731         dir = self.repo
732         for reldir in reldirs:
733             dir = os.path.join(dir, reldir)
734             if not os.path.exists(dir):
735                 os.mkdir(dir)
736                 self._vcs_add(self._u_rel_path(dir))
737             elif not os.path.isdir(dir):
738                 raise libbe.storage.base.InvalidDirectory
739         if directory == False:
740             if not os.path.exists(path):
741                 open(path, 'w').close()
742             self._vcs_add(self._u_rel_path(path))
743
744     def _add(self, id, parent=None, **kwargs):
745         path = self._cached_path_id.add_id(id, parent)
746         self._add_path(path, **kwargs)
747
748     def _exists(self, id, revision=None):
749         if revision == None:
750             try:
751                 path = self.path(id, revision, relpath=False)
752             except InvalidID, e:
753                 return False
754             return os.path.exists(path)
755         path = self.path(id, revision, relpath=True)
756         return self._vcs_exists(relpath, revision)
757
758     def _remove(self, id):
759         path = self._cached_path_id.path(id)
760         if os.path.exists(path):
761             if os.path.isdir(path) and len(self.children(id)) > 0:
762                 raise libbe.storage.base.DirectoryNotEmpty(id)
763             self._vcs_remove(self._u_rel_path(path))
764             if os.path.exists(path):
765                 if os.path.isdir(path):
766                     os.rmdir(path)
767                 else:
768                     os.remove(path)
769         self._cached_path_id.remove_id(id)
770
771     def _recursive_remove(self, id):
772         path = self._cached_path_id.path(id)
773         for dirpath,dirnames,filenames in os.walk(path, topdown=False):
774             filenames.extend(dirnames)
775             for f in filenames:
776                 fullpath = os.path.join(dirpath, f)
777                 if os.path.exists(fullpath) == False:
778                     continue
779                 self._vcs_remove(self._u_rel_path(fullpath))
780         if os.path.exists(path):
781             shutil.rmtree(path)
782         path = self._cached_path_id.path(id, relpath=True)
783         for id,p in self._cached_path_id._cache.items():
784             if p.startswith(path):
785                 self._cached_path_id.remove_id(id)
786
787     def _ancestors(self, id=None, revision=None):
788         if id==None:
789             path = self.be_dir
790         else:
791             path = self.path(id, revision, relpath=False)
792         ancestors = []
793         while True:
794             if not path.startswith(self.repo + os.path.sep):
795                 break
796             path = os.path.dirname(path)
797             try:
798                 id = self._u_path_to_id(path)
799                 ancestors.append(id)
800             except (SpacerCollision, InvalidPath):
801                 pass    
802         return ancestors
803
804     def _children(self, id=None, revision=None):
805         if revision == None:
806             isdir = os.path.isdir
807             listdir = os.listdir
808         else:
809             isdir = lambda path : self._vcs_isdir(
810                 self._u_rel_path(path), revision)
811             listdir = lambda path : self._vcs_listdir(
812                 self._u_rel_path(path), revision)
813         if id==None:
814             path = self.be_dir
815         else:
816             path = self.path(id, revision, relpath=False)
817         if isdir(path) == False: 
818             return []
819         children = listdir(path)
820         for i,c in enumerate(children):
821             if c in self._cached_path_id._spacer_dirs:
822                 children[i] = None
823                 children.extend([os.path.join(c, c2) for c2 in
824                                  listdir(os.path.join(path, c))])
825             elif c in ['id-cache', 'version']:
826                 children[i] = None
827             elif self.interspersed_vcs_files \
828                     and self._vcs_is_versioned(c) == False:
829                 children[i] = None
830         for i,c in enumerate(children):
831             if c == None: continue
832             cpath = os.path.join(path, c)
833             if self.interspersed_vcs_files == True \
834                     and revision != None \
835                     and self._vcs_is_versioned(cpath) == False:
836                 children[i] = None
837             else:
838                 children[i] = self._u_path_to_id(cpath)
839         return [c for c in children if c != None]
840
841     def _get(self, id, default=libbe.util.InvalidObject, revision=None):
842         try:
843             relpath = self.path(id, revision, relpath=True)
844             contents = self._vcs_get_file_contents(relpath, revision)
845         except InvalidID, e:
846             if default == libbe.util.InvalidObject:
847                 raise e
848             return default
849         if contents in [libbe.storage.base.InvalidDirectory,
850                         libbe.util.InvalidObject] \
851                 or len(contents) == 0:
852             if default == libbe.util.InvalidObject:
853                 raise InvalidID(id, revision)
854             return default
855         return contents
856
857     def _set(self, id, value):
858         try:
859             path = self._cached_path_id.path(id)
860         except InvalidID, e:
861             raise
862         if not os.path.exists(path):
863             raise InvalidID(id)
864         if os.path.isdir(path):
865             raise libbe.storage.base.InvalidDirectory(id)
866         f = open(path, "wb")
867         f.write(value)
868         f.close()
869         self._vcs_update(self._u_rel_path(path))
870
871     def _commit(self, summary, body=None, allow_empty=False):
872         summary = summary.strip()+'\n'
873         if body is not None:
874             summary += '\n' + body.strip() + '\n'
875         descriptor, filename = tempfile.mkstemp()
876         revision = None
877         try:
878             temp_file = os.fdopen(descriptor, 'wb')
879             temp_file.write(summary)
880             temp_file.flush()
881             revision = self._vcs_commit(filename, allow_empty=allow_empty)
882             temp_file.close()
883         finally:
884             os.remove(filename)
885         return revision
886
887     def revision_id(self, index=None):
888         if index == None:
889             return None
890         try:
891             if int(index) != index:
892                 raise InvalidRevision(index)
893         except ValueError:
894             raise InvalidRevision(index)
895         revid = self._vcs_revision_id(index)
896         if revid == None:
897             raise libbe.storage.base.InvalidRevision(index)
898         return revid
899
900     def changed(self, revision):
901         new,mod,rem = self._vcs_changed(revision)
902         def paths_to_ids(paths):
903             for p in paths:
904                 try:
905                     id = self._u_path_to_id(p)
906                     yield id
907                 except (SpacerCollision, InvalidPath):
908                     pass
909         new_id = list(paths_to_ids(new))
910         mod_id = list(paths_to_ids(mod))
911         rem_id = list(paths_to_ids(rem))
912         return (new_id, mod_id, rem_id)
913
914     def _u_any_in_string(self, list, string):
915         """Return True if any of the strings in list are in string.
916         Otherwise return False.
917         """
918         for list_string in list:
919             if list_string in string:
920                 return True
921         return False
922
923     def _u_invoke(self, *args, **kwargs):
924         if 'cwd' not in kwargs:
925             kwargs['cwd'] = self.repo
926         if 'verbose' not in kwargs:
927             kwargs['verbose'] = self.verbose_invoke
928         if 'encoding' not in kwargs:
929             kwargs['encoding'] = self.encoding
930         return invoke(*args, **kwargs)
931
932     def _u_invoke_client(self, *args, **kwargs):
933         cl_args = [self.client]
934         cl_args.extend(args)
935         return self._u_invoke(cl_args, **kwargs)
936
937     def _u_search_parent_directories(self, path, filename):
938         """Find the file (or directory) named filename in path or in any of
939         path's parents.
940
941         e.g.
942           search_parent_directories("/a/b/c", ".be")
943         will return the path to the first existing file from
944           /a/b/c/.be
945           /a/b/.be
946           /a/.be
947           /.be
948         or None if none of those files exist.
949         """
950         try:
951             ret = search_parent_directories(path, filename)
952         except AssertionError, e:
953             return None
954         return ret
955
956     def _u_find_id_from_manifest(self, id, manifest, revision=None):
957         """Search for the relative path to id using manifest, a list of all
958         files.
959         
960         Returns None if the id is not found.
961         """
962         be_dir = self._cached_path_id._spacer_dirs[0]
963         be_dir_sep = self._cached_path_id._spacer_dirs[0] + os.path.sep
964         files = [f for f in manifest if f.startswith(be_dir_sep)]
965         for file in files:
966             if not file.startswith(be_dir+os.path.sep):
967                 continue
968             parts = file.split(os.path.sep)
969             dir = parts.pop(0) # don't add the first spacer dir
970             for part in parts[:-1]:
971                 dir = os.path.join(dir, part)
972                 if not dir in files:
973                     files.append(dir)
974         for file in files:
975             try:
976                 p_id = self._u_path_to_id(file)
977                 if p_id == id:
978                     return file
979             except (SpacerCollision, InvalidPath):
980                 pass
981         raise InvalidID(id, revision=revision)
982
983     def _u_find_id(self, id, revision):
984         """Search for the relative path to id as of revision.
985
986         Returns None if the id is not found.
987         """
988         assert self._rooted == True
989         be_dir = self._cached_path_id._spacer_dirs[0]
990         stack = [(be_dir, be_dir)]
991         while len(stack) > 0:
992             path,long_id = stack.pop()
993             if long_id.endswith('/'+id):
994                 return path
995             if self._vcs_isdir(path, revision) == False:
996                 continue
997             for child in self._vcs_listdir(path, revision):
998                 stack.append((os.path.join(path, child),
999                               '/'.join([long_id, child])))
1000         raise InvalidID(id, revision=revision)
1001
1002     def _u_path_to_id(self, path):
1003         return self._cached_path_id.id(path)
1004
1005     def _u_rel_path(self, path, root=None):
1006         """Return the relative path to path from root.
1007
1008         Examples:
1009
1010         >>> vcs = new()
1011         >>> vcs._u_rel_path("/a.b/c/.be", "/a.b/c")
1012         '.be'
1013         >>> vcs._u_rel_path("/a.b/c/", "/a.b/c")
1014         '.'
1015         >>> vcs._u_rel_path("/a.b/c/", "/a.b/c/")
1016         '.'
1017         >>> vcs._u_rel_path("./a", ".")
1018         'a'
1019         """
1020         if root == None:
1021             if self.repo == None:
1022                 raise VCSNotRooted(self)
1023             root = self.repo
1024         path = os.path.abspath(path)
1025         absRoot = os.path.abspath(root)
1026         absRootSlashedDir = os.path.join(absRoot,"")
1027         if path in [absRoot, absRootSlashedDir]:
1028             return '.'
1029         if not path.startswith(absRootSlashedDir):
1030             raise InvalidPath(path, absRootSlashedDir)
1031         relpath = path[len(absRootSlashedDir):]
1032         return relpath
1033
1034     def _u_abspath(self, path, root=None):
1035         """Return the absolute path from a path relative to root.
1036
1037         Examples
1038         --------
1039
1040         >>> vcs = new()
1041         >>> vcs._u_abspath(".be", "/a.b/c")
1042         '/a.b/c/.be'
1043         """
1044         if root == None:
1045             assert self.repo != None, "VCS not rooted"
1046             root = self.repo
1047         return os.path.abspath(os.path.join(root, path))
1048
1049     def _u_parse_commitfile(self, commitfile):
1050         """Split the commitfile created in self.commit() back into summary and
1051         header lines.
1052         """
1053         f = codecs.open(commitfile, 'r', self.encoding)
1054         summary = f.readline()
1055         body = f.read()
1056         body.lstrip('\n')
1057         if len(body) == 0:
1058             body = None
1059         f.close()
1060         return (summary, body)
1061
1062     def check_storage_version(self):
1063         version = self.storage_version()
1064         if version != libbe.storage.STORAGE_VERSION:
1065             upgrade.upgrade(self.repo, version)
1066
1067     def storage_version(self, revision=None, path=None):
1068         """Return the storage version of the on-disk files.
1069
1070         See Also
1071         --------
1072         :mod:`libbe.storage.util.upgrade`
1073         """
1074         if path == None:
1075             path = os.path.join(self.repo, '.be', 'version')
1076         if not os.path.exists(path):
1077             raise libbe.storage.InvalidStorageVersion(None)
1078         if revision == None: # don't require connection
1079             return libbe.util.encoding.get_file_contents(
1080                 path, decode=True).rstrip()
1081         relpath = self._u_rel_path(path)
1082         contents = self._vcs_get_file_contents(relpath, revision=revision)
1083         if type(contents) != types.UnicodeType:
1084             contents = unicode(contents, self.encoding)
1085         return contents.strip()
1086
1087     def _setup_storage_version(self):
1088         """
1089         Requires disk access.
1090         """
1091         assert self._rooted == True
1092         path = os.path.join(self.be_dir, 'version')
1093         if not os.path.exists(path):
1094             libbe.util.encoding.set_file_contents(path,
1095                 libbe.storage.STORAGE_VERSION+'\n')
1096             self._vcs_add(self._u_rel_path(path))
1097
1098
1099 if libbe.TESTING == True:
1100     class VCSTestCase (unittest.TestCase):
1101         """
1102         Test cases for base VCS class (in addition to the Storage test
1103         cases).
1104         """
1105
1106         Class = VCS
1107
1108         def __init__(self, *args, **kwargs):
1109             super(VCSTestCase, self).__init__(*args, **kwargs)
1110             self.dirname = None
1111
1112         def setUp(self):
1113             """Set up test fixtures for Storage test case."""
1114             super(VCSTestCase, self).setUp()
1115             self.dir = Dir()
1116             self.dirname = self.dir.path
1117             self.s = self.Class(repo=self.dirname)
1118             if self.s.installed() == True:
1119                 self.s.init()
1120                 self.s.connect()
1121
1122         def tearDown(self):
1123             super(VCSTestCase, self).tearDown()
1124             if self.s.installed() == True:
1125                 self.s.disconnect()
1126                 self.s.destroy()
1127             self.dir.cleanup()
1128
1129     class VCS_installed_TestCase (VCSTestCase):
1130         def test_installed(self):
1131             """See if the VCS is installed.
1132             """
1133             self.failUnless(self.s.installed() == True,
1134                             '%(name)s VCS not found' % vars(self.Class))
1135
1136
1137     class VCS_detection_TestCase (VCSTestCase):
1138         def test_detection(self):
1139             """See if the VCS detects its installed repository
1140             """
1141             if self.s.installed():
1142                 self.s.disconnect()
1143                 self.failUnless(self.s._detect(self.dirname) == True,
1144                     'Did not detected %(name)s VCS after initialising'
1145                     % vars(self.Class))
1146                 self.s.connect()
1147
1148         def test_no_detection(self):
1149             """See if the VCS detects its installed repository
1150             """
1151             if self.s.installed() and self.Class.name != 'None':
1152                 self.s.disconnect()
1153                 self.s.destroy()
1154                 self.failUnless(self.s._detect(self.dirname) == False,
1155                     'Detected %(name)s VCS before initialising'
1156                     % vars(self.Class))
1157                 self.s.init()
1158                 self.s.connect()
1159
1160         def test_vcs_repo_in_specified_root_path(self):
1161             """VCS root directory should be in specified root path."""
1162             rp = os.path.realpath(self.s.repo)
1163             dp = os.path.realpath(self.dirname)
1164             vcs_name = self.Class.name
1165             self.failUnless(
1166                 dp == rp or rp == None,
1167                 "%(vcs_name)s VCS root in wrong dir (%(dp)s %(rp)s)" % vars())
1168
1169     class VCS_get_user_id_TestCase(VCSTestCase):
1170         """Test cases for VCS.get_user_id method."""
1171
1172         def test_get_existing_user_id(self):
1173             """Should get the existing user ID."""
1174             if self.s.installed():
1175                 user_id = self.s.get_user_id()
1176                 if user_id == None:
1177                     return
1178                 name,email = libbe.ui.util.user.parse_user_id(user_id)
1179                 if email != None:
1180                     self.failUnless('@' in email, email)
1181
1182     def make_vcs_testcase_subclasses(vcs_class, namespace):
1183         c = vcs_class()
1184         if c.installed():
1185             if c.versioned == True:
1186                 libbe.storage.base.make_versioned_storage_testcase_subclasses(
1187                     vcs_class, namespace)
1188             else:
1189                 libbe.storage.base.make_storage_testcase_subclasses(
1190                     vcs_class, namespace)
1191
1192         if namespace != sys.modules[__name__]:
1193             # Make VCSTestCase subclasses for vcs_class in the namespace.
1194             vcs_testcase_classes = [
1195                 c for c in (
1196                     ob for ob in globals().values() if isinstance(ob, type))
1197                 if issubclass(c, VCSTestCase) \
1198                     and c.Class == VCS]
1199
1200             for base_class in vcs_testcase_classes:
1201                 testcase_class_name = vcs_class.__name__ + base_class.__name__
1202                 testcase_class_bases = (base_class,)
1203                 testcase_class_dict = dict(base_class.__dict__)
1204                 testcase_class_dict['Class'] = vcs_class
1205                 testcase_class = type(
1206                     testcase_class_name, testcase_class_bases, testcase_class_dict)
1207                 setattr(namespace, testcase_class_name, testcase_class)
1208
1209     make_vcs_testcase_subclasses(VCS, sys.modules[__name__])
1210
1211     unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
1212     suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])