Fix my busted 1512c0e2a64e patch to libbe/util/encoding.py.
[be.git] / libbe / storage / vcs / base.py
1 # Copyright (C) 2005-2011 Aaron Bentley <abentley@panoramicfeedback.com>
2 #                         Alexander Belchenko <bialix@ukr.net>
3 #                         Ben Finney <benf@cybersource.com.au>
4 #                         Chris Ball <cjb@laptop.org>
5 #                         Gianluca Montecchi <gian@grys.it>
6 #                         W. Trevor King <wking@drexel.edu>
7 #
8 # This file is part of Bugs Everywhere.
9 #
10 # Bugs Everywhere is free software; you can redistribute it and/or modify it
11 # under the terms of the GNU General Public License as published by the
12 # Free Software Foundation, either version 2 of the License, or (at your
13 # option) any later version.
14 #
15 # Bugs Everywhere is distributed in the hope that it will be useful, but
16 # WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18 # General Public License for more details.
19 #
20 # You should have received a copy of the GNU General Public License
21 # along with Bugs Everywhere.  If not, see <http://www.gnu.org/licenses/>.
22
23 """Define the base :class:`VCS` (Version Control System) class, which
24 should be subclassed by other Version Control System backends.  The
25 base class implements a "do not version" VCS.
26 """
27
28 import codecs
29 import os
30 import os.path
31 import re
32 import shutil
33 import sys
34 import tempfile
35 import types
36
37 import libbe
38 import libbe.storage
39 import libbe.storage.base
40 import libbe.util.encoding
41 from libbe.storage.base import EmptyCommit, InvalidRevision, InvalidID
42 from libbe.util.utility import Dir, search_parent_directories
43 from libbe.util.subproc import CommandError, invoke
44 from libbe.util.plugin import import_by_name
45 import libbe.storage.util.upgrade as upgrade
46
47 if libbe.TESTING == True:
48     import unittest
49     import doctest
50
51     import libbe.ui.util.user
52
53 VCS_ORDER = ['arch', 'bzr', 'darcs', 'git', 'hg', 'monotone']
54 """List VCS modules in order of preference.
55
56 Don't list this module, it is implicitly last.
57 """
58
59 def set_preferred_vcs(name):
60     """Manipulate :data:`VCS_ORDER` to place `name` first.
61
62     This is primarily indended for testing purposes.
63     """
64     global VCS_ORDER
65     assert name in VCS_ORDER, \
66         'unrecognized VCS %s not in\n  %s' % (name, VCS_ORDER)
67     VCS_ORDER.remove(name)
68     VCS_ORDER.insert(0, name)
69
70 def _get_matching_vcs(matchfn):
71     """Return the first module for which matchfn(VCS_instance) is True.
72
73     Searches in :data:`VCS_ORDER`.
74     """
75     for submodname in VCS_ORDER:
76         module = import_by_name('libbe.storage.vcs.%s' % submodname)
77         vcs = module.new()
78         if matchfn(vcs) == True:
79             return vcs
80     return VCS()
81
82 def vcs_by_name(vcs_name):
83     """Return the module for the VCS with the given name.
84
85     Searches in :data:`VCS_ORDER`.
86     """
87     if vcs_name == VCS.name:
88         return new()
89     return _get_matching_vcs(lambda vcs: vcs.name == vcs_name)
90
91 def detect_vcs(dir):
92     """Return an VCS instance for the vcs being used in this directory.
93
94     Searches in :data:`VCS_ORDER`.
95     """
96     return _get_matching_vcs(lambda vcs: vcs._detect(dir))
97
98 def installed_vcs():
99     """Return an instance of an installed VCS.
100
101     Searches in :data:`VCS_ORDER`.
102     """
103     return _get_matching_vcs(lambda vcs: vcs.installed())
104
105
106 class VCSNotRooted (libbe.storage.base.ConnectionError):
107     def __init__(self, vcs):
108         msg = 'VCS not rooted'
109         libbe.storage.base.ConnectionError.__init__(self, msg)
110         self.vcs = vcs
111
112 class VCSUnableToRoot (libbe.storage.base.ConnectionError):
113     def __init__(self, vcs):
114         msg = 'VCS unable to root'
115         libbe.storage.base.ConnectionError.__init__(self, msg)
116         self.vcs = vcs
117
118 class InvalidPath (InvalidID):
119     def __init__(self, path, root, msg=None, **kwargs):
120         if msg == None:
121             msg = 'Path "%s" not in root "%s"' % (path, root)
122         InvalidID.__init__(self, msg=msg, **kwargs)
123         self.path = path
124         self.root = root
125
126 class SpacerCollision (InvalidPath):
127     def __init__(self, path, spacer):
128         msg = 'Path "%s" collides with spacer directory "%s"' % (path, spacer)
129         InvalidPath.__init__(self, path, root=None, msg=msg)
130         self.spacer = spacer
131
132 class NoSuchFile (InvalidID):
133     def __init__(self, pathname, root='.'):
134         path = os.path.abspath(os.path.join(root, pathname))
135         InvalidID.__init__(self, 'No such file: %s' % path)
136
137
138 class CachedPathID (object):
139     """Cache Storage ID <-> path policy.
140  
141     Paths generated following::
142
143        .../.be/BUGDIR/bugs/BUG/comments/COMMENT
144           ^-- root path
145
146     See :mod:`libbe.util.id` for a discussion of ID formats.
147
148     Examples
149     --------
150
151     >>> dir = Dir()
152     >>> os.mkdir(os.path.join(dir.path, '.be'))
153     >>> os.mkdir(os.path.join(dir.path, '.be', 'abc'))
154     >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs'))
155     >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123'))
156     >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments'))
157     >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def'))
158     >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '456'))
159     >>> open(os.path.join(dir.path, '.be', 'abc', 'values'),
160     ...      'w').close()
161     >>> open(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'values'),
162     ...      'w').close()
163     >>> open(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values'),
164     ...      'w').close()
165     >>> c = CachedPathID()
166     >>> c.root(dir.path)
167     >>> c.id(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values'))
168     'def/values'
169     >>> c.init()
170     >>> sorted(os.listdir(os.path.join(c._root, '.be')))
171     ['abc', 'id-cache']
172     >>> c.connect()
173     >>> c.path('123/values') # doctest: +ELLIPSIS
174     u'.../.be/abc/bugs/123/values'
175     >>> c.disconnect()
176     >>> c.destroy()
177     >>> sorted(os.listdir(os.path.join(c._root, '.be')))
178     ['abc']
179     >>> c.connect() # demonstrate auto init
180     >>> sorted(os.listdir(os.path.join(c._root, '.be')))
181     ['abc', 'id-cache']
182     >>> c.add_id(u'xyz', parent=None) # doctest: +ELLIPSIS
183     u'.../.be/xyz'
184     >>> c.add_id('xyz/def', parent='xyz') # doctest: +ELLIPSIS
185     u'.../.be/xyz/def'
186     >>> c.add_id('qrs', parent='123') # doctest: +ELLIPSIS
187     u'.../.be/abc/bugs/123/comments/qrs'
188     >>> c.disconnect()
189     >>> c.connect()
190     >>> c.path('qrs') # doctest: +ELLIPSIS
191     u'.../.be/abc/bugs/123/comments/qrs'
192     >>> c.remove_id('qrs')
193     >>> c.path('qrs')
194     Traceback (most recent call last):
195       ...
196     InvalidID: qrs in revision None
197     >>> c.disconnect()
198     >>> c.destroy()
199     >>> dir.cleanup()
200     """
201     def __init__(self, encoding=None):
202         self.encoding = libbe.util.encoding.get_text_file_encoding()
203         self._spacer_dirs = ['.be', 'bugs', 'comments']
204
205     def root(self, path):
206         self._root = os.path.abspath(path).rstrip(os.path.sep)
207         self._cache_path = os.path.join(
208             self._root, self._spacer_dirs[0], 'id-cache')
209
210     def init(self, verbose=True, cache=None):
211         """Create cache file for an existing .be directory.
212
213         The file contains multiple lines of the form::
214
215             UUID\tPATH
216         """
217         if cache == None:
218             self._cache = {}
219         else:
220             self._cache = cache
221         spaced_root = os.path.join(self._root, self._spacer_dirs[0])
222         for dirpath, dirnames, filenames in os.walk(spaced_root):
223             if dirpath == spaced_root:
224                 continue
225             try:
226                 id = self.id(dirpath)
227                 relpath = dirpath[len(self._root + os.path.sep):]
228                 if id.count('/') == 0:
229                     if verbose == True and id in self._cache:
230                         print >> sys.stderr, 'Multiple paths for %s: \n  %s\n  %s' % (id, self._cache[id], relpath)
231                     self._cache[id] = relpath
232             except InvalidPath:
233                 pass
234         if self._cache != cache:
235             self._changed = True
236         if cache == None:
237             self.disconnect()
238
239     def destroy(self):
240         if os.path.exists(self._cache_path):
241             os.remove(self._cache_path)
242
243     def connect(self):
244         if not os.path.exists(self._cache_path):
245             try:
246                 self.init()
247             except IOError:
248                 raise libbe.storage.base.ConnectionError
249         self._cache = {} # key: uuid, value: path
250         self._changed = False
251         f = codecs.open(self._cache_path, 'r', self.encoding)
252         for line in f:
253             fields = line.rstrip('\n').split('\t')
254             self._cache[fields[0]] = fields[1]
255         f.close()
256
257     def disconnect(self):
258         if self._changed == True:
259             f = codecs.open(self._cache_path, 'w', self.encoding)
260             for uuid,path in self._cache.items():
261                 f.write('%s\t%s\n' % (uuid, path))
262             f.close()
263         self._cache = {}
264
265     def path(self, id, relpath=False):
266         fields = id.split('/', 1)
267         uuid = fields[0]
268         if len(fields) == 1:
269             extra = []
270         else:
271             extra = fields[1:]
272         if uuid not in self._cache:
273             self.init(verbose=False, cache=self._cache)
274             if uuid not in self._cache:
275                 raise InvalidID(uuid)
276         if relpath == True:
277             return os.path.join(self._cache[uuid], *extra)
278         return os.path.join(self._root, self._cache[uuid], *extra)
279
280     def add_id(self, id, parent=None):
281         if id.count('/') > 0:
282             # not a UUID-level path
283             assert id.startswith(parent), \
284                 'Strange ID: "%s" should start with "%s"' % (id, parent)
285             path = self.path(id)
286         elif id in self._cache:
287             # already added
288             path = self.path(id)
289         else:
290             if parent == None:
291                 parent_path = ''
292                 spacer = self._spacer_dirs[0]
293             else:
294                 assert parent.count('/') == 0, \
295                     'Strange parent ID: "%s" should be UUID' % parent
296                 parent_path = self.path(parent, relpath=True)
297                 parent_spacer = parent_path.split(os.path.sep)[-2]
298                 i = self._spacer_dirs.index(parent_spacer)
299                 spacer = self._spacer_dirs[i+1]
300             path = os.path.join(parent_path, spacer, id)
301             self._cache[id] = path
302             self._changed = True
303             path = os.path.join(self._root, path)
304         return path
305
306     def remove_id(self, id):
307         if id.count('/') > 0:
308             return # not a UUID-level path
309         self._cache.pop(id)
310         self._changed = True
311
312     def id(self, path):
313         path = os.path.join(self._root, path)
314         if not path.startswith(self._root + os.path.sep):
315             raise InvalidPath(path, self._root)
316         path = path[len(self._root + os.path.sep):]
317         orig_path = path
318         if not path.startswith(self._spacer_dirs[0] + os.path.sep):
319             raise InvalidPath(path, self._spacer_dirs[0])
320         for spacer in self._spacer_dirs:
321             if not path.startswith(spacer + os.path.sep):
322                 break
323             id = path[len(spacer + os.path.sep):]
324             fields = path[len(spacer + os.path.sep):].split(os.path.sep,1)
325             if len(fields) == 1:
326                 break
327             path = fields[1]
328         for spacer in self._spacer_dirs:
329             if id.endswith(os.path.sep + spacer):
330                 raise SpacerCollision(orig_path, spacer)
331         if os.path.sep != '/':
332             id = id.replace(os.path.sep, '/')
333         return id
334
335
336 def new():
337     return VCS()
338
339 class VCS (libbe.storage.base.VersionedStorage):
340     """Implement a 'no-VCS' interface.
341
342     Support for other VCSs can be added by subclassing this class, and
343     overriding methods `_vcs_*()` with code appropriate for your VCS.
344
345     The methods `_u_*()` are utility methods available to the `_vcs_*()`
346     methods.
347     """
348     name = 'None'
349     client = 'false' # command-line tool for _u_invoke_client
350
351     def __init__(self, *args, **kwargs):
352         if 'encoding' not in kwargs:
353             kwargs['encoding'] = libbe.util.encoding.get_text_file_encoding()
354         libbe.storage.base.VersionedStorage.__init__(self, *args, **kwargs)
355         self.versioned = False
356         self.interspersed_vcs_files = False
357         self.verbose_invoke = False
358         self._cached_path_id = CachedPathID()
359         self._rooted = False
360
361     def _vcs_version(self):
362         """
363         Return the VCS version string.
364         """
365         return '0'
366
367     def _vcs_get_user_id(self):
368         """
369         Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
370         If the VCS has not been configured with a username, return None.
371         """
372         return None
373
374     def _vcs_detect(self, path=None):
375         """
376         Detect whether a directory is revision controlled with this VCS.
377         """
378         return True
379
380     def _vcs_root(self, path):
381         """
382         Get the VCS root.  This is the default working directory for
383         future invocations.  You would normally set this to the root
384         directory for your VCS.
385         """
386         if os.path.isdir(path) == False:
387             path = os.path.dirname(path)
388             if path == '':
389                 path = os.path.abspath('.')
390         return path
391
392     def _vcs_init(self, path):
393         """
394         Begin versioning the tree based at path.
395         """
396         pass
397
398     def _vcs_destroy(self):
399         """
400         Remove any files used in versioning (e.g. whatever _vcs_init()
401         created).
402         """
403         pass
404
405     def _vcs_add(self, path):
406         """
407         Add the already created file at path to version control.
408         """
409         pass
410
411     def _vcs_exists(self, path, revision=None):
412         """
413         Does the path exist in a given revision? (True/False)
414         """
415         raise NotImplementedError('Lazy BE developers')
416
417     def _vcs_remove(self, path):
418         """
419         Remove the file at path from version control.  Optionally
420         remove the file from the filesystem as well.
421         """
422         pass
423
424     def _vcs_update(self, path):
425         """
426         Notify the versioning system of changes to the versioned file
427         at path.
428         """
429         pass
430
431     def _vcs_is_versioned(self, path):
432         """
433         Return true if a path is under version control, False
434         otherwise.  You only need to set this if the VCS goes about
435         dumping VCS-specific files into the .be directory.
436
437         If you do need to implement this method (e.g. Arch), set
438           self.interspersed_vcs_files = True
439         """
440         assert self.interspersed_vcs_files == False
441         raise NotImplementedError
442
443     def _vcs_get_file_contents(self, path, revision=None):
444         """
445         Get the file contents as they were in a given revision.
446         Revision==None specifies the current revision.
447         """
448         if revision != None:
449             raise libbe.storage.base.InvalidRevision(
450                 'The %s VCS does not support revision specifiers' % self.name)
451         path = os.path.join(self.repo, path)
452         if not os.path.exists(path):
453             return libbe.util.InvalidObject
454         if os.path.isdir(path):
455             return libbe.storage.base.InvalidDirectory
456         f = open(path, 'rb')
457         contents = f.read()
458         f.close()
459         return contents
460
461     def _vcs_path(self, id, revision):
462         """
463         Return the relative path to object id as of revision.
464         
465         Revision will not be None.
466         """
467         raise NotImplementedError
468
469     def _vcs_isdir(self, path, revision):
470         """
471         Return True if path (as returned by _vcs_path) was a directory
472         as of revision, False otherwise.
473         
474         Revision will not be None.
475         """
476         raise NotImplementedError
477
478     def _vcs_listdir(self, path, revision):
479         """
480         Return a list of the contents of the directory path (as
481         returned by _vcs_path) as of revision.
482         
483         Revision will not be None, and ._vcs_isdir(path, revision)
484         will be True.
485         """
486         raise NotImplementedError
487
488     def _vcs_commit(self, commitfile, allow_empty=False):
489         """
490         Commit the current working directory, using the contents of
491         commitfile as the comment.  Return the name of the old
492         revision (or None if commits are not supported).
493
494         If allow_empty == False, raise EmptyCommit if there are no
495         changes to commit.
496         """
497         return None
498
499     def _vcs_revision_id(self, index):
500         """
501         Return the name of the <index>th revision.  Index will be an
502         integer (possibly <= 0).  The choice of which branch to follow
503         when crossing branches/merges is not defined.
504
505         Return None if revision IDs are not supported, or if the
506         specified revision does not exist.
507         """
508         return None
509
510     def _vcs_changed(self, revision):
511         """
512         Return a tuple of lists of ids
513           (new, modified, removed)
514         from the specified revision to the current situation.
515         """
516         return ([], [], [])
517
518     def version(self):
519         # Cache version string for efficiency.
520         if not hasattr(self, '_version'):
521             self._version = self._vcs_version()
522         return self._version
523
524     def version_cmp(self, *args):
525         """Compare the installed VCS version `V_i` with another version
526         `V_o` (given in `*args`).  Returns
527
528            === ===============
529             1  if `V_i > V_o`
530             0  if `V_i == V_o`
531            -1  if `V_i < V_o`
532            === ===============
533
534         Examples
535         --------
536
537         >>> v = VCS(repo='.')
538         >>> v._version = '2.3.1 (release)'
539         >>> v.version_cmp(2,3,1)
540         0
541         >>> v.version_cmp(2,3,2)
542         -1
543         >>> v.version_cmp(2,3,'a',5)
544         1
545         >>> v.version_cmp(2,3,0)
546         1
547         >>> v.version_cmp(2,3,1,'a',5)
548         1
549         >>> v.version_cmp(2,3,1,1)
550         -1
551         >>> v.version_cmp(3)
552         -1
553         >>> v._version = '2.0.0pre2'
554         >>> v._parsed_version = None
555         >>> v.version_cmp(3)
556         -1
557         >>> v.version_cmp(2,0,1)
558         -1
559         >>> v.version_cmp(2,0,0,'pre',1)
560         1
561         >>> v.version_cmp(2,0,0,'pre',2)
562         0
563         >>> v.version_cmp(2,0,0,'pre',3)
564         -1
565         >>> v.version_cmp(2,0,0,'a',3)
566         1
567         >>> v.version_cmp(2,0,0,'rc',1)
568         -1
569         """
570         if not hasattr(self, '_parsed_version') \
571                 or self._parsed_version == None:
572             num_part = self.version().split(' ')[0]
573             self._parsed_version = []
574             for num in num_part.split('.'):
575                 try:
576                     self._parsed_version.append(int(num))
577                 except ValueError, e:
578                     # bzr version number might contain non-numerical tags
579                     splitter = re.compile(r'[\D]') # Match non-digits
580                     splits = splitter.split(num)
581                     # if len(tag) > 1 some splits will be empty; remove
582                     splits = filter(lambda s: s != '', splits)
583                     tag_starti = len(splits[0])
584                     num_starti = num.find(splits[1], tag_starti)
585                     tag = num[tag_starti:num_starti]
586                     self._parsed_version.append(int(splits[0]))
587                     self._parsed_version.append(tag)
588                     self._parsed_version.append(int(splits[1]))
589         for current,other in zip(self._parsed_version, args):
590             if type(current) != type (other):
591                 # one of them is a pre-release string
592                 if type(current) != types.IntType:
593                     return -1
594                 else:
595                     return 1
596             c = cmp(current,other)
597             if c != 0:
598                 return c
599         # see if one is longer than the other
600         verlen = len(self._parsed_version)
601         arglen = len(args)
602         if verlen == arglen:
603             return 0
604         elif verlen > arglen:
605             if type(self._parsed_version[arglen]) != types.IntType:
606                 return -1 # self is a prerelease
607             else:
608                 return 1
609         else:
610             if type(args[verlen]) != types.IntType:
611                 return 1 # args is a prerelease
612             else:
613                 return -1
614
615     def installed(self):
616         if self.version() != None:
617             return True
618         return False
619
620     def get_user_id(self):
621         """
622         Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
623         If the VCS has not been configured with a username, return None.
624         You can override the automatic lookup procedure by setting the
625         VCS.user_id attribute to a string of your choice.
626         """
627         if not hasattr(self, 'user_id'):
628             self.user_id = self._vcs_get_user_id()
629             if self.user_id == None:
630                 # guess missing info
631                 name = libbe.ui.util.user.get_fallback_fullname()
632                 email = libbe.ui.util.user.get_fallback_email()
633                 self.user_id = libbe.ui.util.user.create_user_id(name, email)
634         return self.user_id
635
636     def _detect(self, path='.'):
637         """
638         Detect whether a directory is revision controlled with this VCS.
639         """
640         return self._vcs_detect(path)
641
642     def root(self):
643         """Set the root directory to the path's VCS root.
644
645         This is the default working directory for future invocations.
646         Consider the following usage case:
647
648         You have a project rooted in::
649
650           /path/to/source/
651
652         by which I mean the VCS repository is in, for example::
653
654           /path/to/source/.bzr
655
656         However, you're of in some subdirectory like::
657
658           /path/to/source/ui/testing
659
660         and you want to comment on a bug.  `root` will locate your VCS
661         root (``/path/to/source/``) and set the repo there.  This
662         means that it doesn't matter where you are in your project
663         tree when you call "be COMMAND", it always acts as if you called
664         it from the VCS root.
665         """
666         if self._detect(self.repo) == False:
667             raise VCSUnableToRoot(self)
668         root = self._vcs_root(self.repo)
669         self.repo = os.path.realpath(root)
670         if os.path.isdir(self.repo) == False:
671             self.repo = os.path.dirname(self.repo)
672         self.be_dir = os.path.join(
673             self.repo, self._cached_path_id._spacer_dirs[0])
674         self._cached_path_id.root(self.repo)
675         self._rooted = True
676
677     def _init(self):
678         """
679         Begin versioning the tree based at self.repo.
680         Also roots the vcs at path.
681
682         See Also
683         --------
684         root : called if the VCS has already been initialized.
685         """
686         if not os.path.exists(self.repo) or not os.path.isdir(self.repo):
687             raise VCSUnableToRoot(self)
688         if self._vcs_detect(self.repo) == False:
689             self._vcs_init(self.repo)
690         if self._rooted == False:
691             self.root()
692         os.mkdir(self.be_dir)
693         self._vcs_add(self._u_rel_path(self.be_dir))
694         self._setup_storage_version()
695         self._cached_path_id.init()
696
697     def _destroy(self):
698         self._vcs_destroy()
699         self._cached_path_id.destroy()
700         if os.path.exists(self.be_dir):
701             shutil.rmtree(self.be_dir)
702
703     def _connect(self):
704         if self._rooted == False:
705             self.root()
706         if not os.path.isdir(self.be_dir):
707             raise libbe.storage.base.ConnectionError(self)
708         self._cached_path_id.connect()
709         self.check_storage_version()
710
711     def _disconnect(self):
712         self._cached_path_id.disconnect()
713
714     def path(self, id, revision=None, relpath=True):
715         if revision == None:
716             path = self._cached_path_id.path(id)
717             if relpath == True:
718                 return self._u_rel_path(path)
719             return path
720         path = self._vcs_path(id, revision)
721         if relpath == True:
722             return path
723         return os.path.join(self.repo, path)
724
725     def _add_path(self, path, directory=False):
726         relpath = self._u_rel_path(path)
727         reldirs = relpath.split(os.path.sep)
728         if directory == False:
729             reldirs = reldirs[:-1]
730         dir = self.repo
731         for reldir in reldirs:
732             dir = os.path.join(dir, reldir)
733             if not os.path.exists(dir):
734                 os.mkdir(dir)
735                 self._vcs_add(self._u_rel_path(dir))
736             elif not os.path.isdir(dir):
737                 raise libbe.storage.base.InvalidDirectory
738         if directory == False:
739             if not os.path.exists(path):
740                 open(path, 'w').close()
741             self._vcs_add(self._u_rel_path(path))
742
743     def _add(self, id, parent=None, **kwargs):
744         path = self._cached_path_id.add_id(id, parent)
745         self._add_path(path, **kwargs)
746
747     def _exists(self, id, revision=None):
748         if revision == None:
749             try:
750                 path = self.path(id, revision, relpath=False)
751             except InvalidID, e:
752                 return False
753             return os.path.exists(path)
754         path = self.path(id, revision, relpath=True)
755         return self._vcs_exists(relpath, revision)
756
757     def _remove(self, id):
758         path = self._cached_path_id.path(id)
759         if os.path.exists(path):
760             if os.path.isdir(path) and len(self.children(id)) > 0:
761                 raise libbe.storage.base.DirectoryNotEmpty(id)
762             self._vcs_remove(self._u_rel_path(path))
763             if os.path.exists(path):
764                 if os.path.isdir(path):
765                     os.rmdir(path)
766                 else:
767                     os.remove(path)
768         self._cached_path_id.remove_id(id)
769
770     def _recursive_remove(self, id):
771         path = self._cached_path_id.path(id)
772         for dirpath,dirnames,filenames in os.walk(path, topdown=False):
773             filenames.extend(dirnames)
774             for f in filenames:
775                 fullpath = os.path.join(dirpath, f)
776                 if os.path.exists(fullpath) == False:
777                     continue
778                 self._vcs_remove(self._u_rel_path(fullpath))
779         if os.path.exists(path):
780             shutil.rmtree(path)
781         path = self._cached_path_id.path(id, relpath=True)
782         for id,p in self._cached_path_id._cache.items():
783             if p.startswith(path):
784                 self._cached_path_id.remove_id(id)
785
786     def _ancestors(self, id=None, revision=None):
787         if id==None:
788             path = self.be_dir
789         else:
790             path = self.path(id, revision, relpath=False)
791         ancestors = []
792         while True:
793             if not path.startswith(self.repo + os.path.sep):
794                 break
795             path = os.path.dirname(path)
796             try:
797                 id = self._u_path_to_id(path)
798                 ancestors.append(id)
799             except (SpacerCollision, InvalidPath):
800                 pass    
801         return ancestors
802
803     def _children(self, id=None, revision=None):
804         if revision == None:
805             isdir = os.path.isdir
806             listdir = os.listdir
807         else:
808             isdir = lambda path : self._vcs_isdir(
809                 self._u_rel_path(path), revision)
810             listdir = lambda path : self._vcs_listdir(
811                 self._u_rel_path(path), revision)
812         if id==None:
813             path = self.be_dir
814         else:
815             path = self.path(id, revision, relpath=False)
816         if isdir(path) == False: 
817             return []
818         children = listdir(path)
819         for i,c in enumerate(children):
820             if c in self._cached_path_id._spacer_dirs:
821                 children[i] = None
822                 children.extend([os.path.join(c, c2) for c2 in
823                                  listdir(os.path.join(path, c))])
824             elif c in ['id-cache', 'version']:
825                 children[i] = None
826             elif self.interspersed_vcs_files \
827                     and self._vcs_is_versioned(c) == False:
828                 children[i] = None
829         for i,c in enumerate(children):
830             if c == None: continue
831             cpath = os.path.join(path, c)
832             if self.interspersed_vcs_files == True \
833                     and revision != None \
834                     and self._vcs_is_versioned(cpath) == False:
835                 children[i] = None
836             else:
837                 children[i] = self._u_path_to_id(cpath)
838         return [c for c in children if c != None]
839
840     def _get(self, id, default=libbe.util.InvalidObject, revision=None):
841         try:
842             relpath = self.path(id, revision, relpath=True)
843             contents = self._vcs_get_file_contents(relpath, revision)
844         except InvalidID, e:
845             if default == libbe.util.InvalidObject:
846                 raise e
847             return default
848         if contents in [libbe.storage.base.InvalidDirectory,
849                         libbe.util.InvalidObject] \
850                 or len(contents) == 0:
851             if default == libbe.util.InvalidObject:
852                 raise InvalidID(id, revision)
853             return default
854         return contents
855
856     def _set(self, id, value):
857         try:
858             path = self._cached_path_id.path(id)
859         except InvalidID, e:
860             raise
861         if not os.path.exists(path):
862             raise InvalidID(id)
863         if os.path.isdir(path):
864             raise libbe.storage.base.InvalidDirectory(id)
865         f = open(path, "wb")
866         f.write(value)
867         f.close()
868         self._vcs_update(self._u_rel_path(path))
869
870     def _commit(self, summary, body=None, allow_empty=False):
871         summary = summary.strip()+'\n'
872         if body is not None:
873             summary += '\n' + body.strip() + '\n'
874         descriptor, filename = tempfile.mkstemp()
875         revision = None
876         try:
877             temp_file = os.fdopen(descriptor, 'wb')
878             temp_file.write(summary)
879             temp_file.flush()
880             revision = self._vcs_commit(filename, allow_empty=allow_empty)
881             temp_file.close()
882         finally:
883             os.remove(filename)
884         return revision
885
886     def revision_id(self, index=None):
887         if index == None:
888             return None
889         try:
890             if int(index) != index:
891                 raise InvalidRevision(index)
892         except ValueError:
893             raise InvalidRevision(index)
894         revid = self._vcs_revision_id(index)
895         if revid == None:
896             raise libbe.storage.base.InvalidRevision(index)
897         return revid
898
899     def changed(self, revision):
900         new,mod,rem = self._vcs_changed(revision)
901         def paths_to_ids(paths):
902             for p in paths:
903                 try:
904                     id = self._u_path_to_id(p)
905                     yield id
906                 except (SpacerCollision, InvalidPath):
907                     pass
908         new_id = list(paths_to_ids(new))
909         mod_id = list(paths_to_ids(mod))
910         rem_id = list(paths_to_ids(rem))
911         return (new_id, mod_id, rem_id)
912
913     def _u_any_in_string(self, list, string):
914         """Return True if any of the strings in list are in string.
915         Otherwise return False.
916         """
917         for list_string in list:
918             if list_string in string:
919                 return True
920         return False
921
922     def _u_invoke(self, *args, **kwargs):
923         if 'cwd' not in kwargs:
924             kwargs['cwd'] = self.repo
925         if 'verbose' not in kwargs:
926             kwargs['verbose'] = self.verbose_invoke
927         if 'encoding' not in kwargs:
928             kwargs['encoding'] = self.encoding
929         return invoke(*args, **kwargs)
930
931     def _u_invoke_client(self, *args, **kwargs):
932         cl_args = [self.client]
933         cl_args.extend(args)
934         return self._u_invoke(cl_args, **kwargs)
935
936     def _u_search_parent_directories(self, path, filename):
937         """Find the file (or directory) named filename in path or in any of
938         path's parents.
939
940         e.g.
941           search_parent_directories("/a/b/c", ".be")
942         will return the path to the first existing file from
943           /a/b/c/.be
944           /a/b/.be
945           /a/.be
946           /.be
947         or None if none of those files exist.
948         """
949         try:
950             ret = search_parent_directories(path, filename)
951         except AssertionError, e:
952             return None
953         return ret
954
955     def _u_find_id_from_manifest(self, id, manifest, revision=None):
956         """Search for the relative path to id using manifest, a list of all
957         files.
958         
959         Returns None if the id is not found.
960         """
961         be_dir = self._cached_path_id._spacer_dirs[0]
962         be_dir_sep = self._cached_path_id._spacer_dirs[0] + os.path.sep
963         files = [f for f in manifest if f.startswith(be_dir_sep)]
964         for file in files:
965             if not file.startswith(be_dir+os.path.sep):
966                 continue
967             parts = file.split(os.path.sep)
968             dir = parts.pop(0) # don't add the first spacer dir
969             for part in parts[:-1]:
970                 dir = os.path.join(dir, part)
971                 if not dir in files:
972                     files.append(dir)
973         for file in files:
974             try:
975                 p_id = self._u_path_to_id(file)
976                 if p_id == id:
977                     return file
978             except (SpacerCollision, InvalidPath):
979                 pass
980         raise InvalidID(id, revision=revision)
981
982     def _u_find_id(self, id, revision):
983         """Search for the relative path to id as of revision.
984
985         Returns None if the id is not found.
986         """
987         assert self._rooted == True
988         be_dir = self._cached_path_id._spacer_dirs[0]
989         stack = [(be_dir, be_dir)]
990         while len(stack) > 0:
991             path,long_id = stack.pop()
992             if long_id.endswith('/'+id):
993                 return path
994             if self._vcs_isdir(path, revision) == False:
995                 continue
996             for child in self._vcs_listdir(path, revision):
997                 stack.append((os.path.join(path, child),
998                               '/'.join([long_id, child])))
999         raise InvalidID(id, revision=revision)
1000
1001     def _u_path_to_id(self, path):
1002         return self._cached_path_id.id(path)
1003
1004     def _u_rel_path(self, path, root=None):
1005         """Return the relative path to path from root.
1006
1007         Examples:
1008
1009         >>> vcs = new()
1010         >>> vcs._u_rel_path("/a.b/c/.be", "/a.b/c")
1011         '.be'
1012         >>> vcs._u_rel_path("/a.b/c/", "/a.b/c")
1013         '.'
1014         >>> vcs._u_rel_path("/a.b/c/", "/a.b/c/")
1015         '.'
1016         >>> vcs._u_rel_path("./a", ".")
1017         'a'
1018         """
1019         if root == None:
1020             if self.repo == None:
1021                 raise VCSNotRooted(self)
1022             root = self.repo
1023         path = os.path.abspath(path)
1024         absRoot = os.path.abspath(root)
1025         absRootSlashedDir = os.path.join(absRoot,"")
1026         if path in [absRoot, absRootSlashedDir]:
1027             return '.'
1028         if not path.startswith(absRootSlashedDir):
1029             raise InvalidPath(path, absRootSlashedDir)
1030         relpath = path[len(absRootSlashedDir):]
1031         return relpath
1032
1033     def _u_abspath(self, path, root=None):
1034         """Return the absolute path from a path realtive to root.
1035
1036         Examples
1037         --------
1038
1039         >>> vcs = new()
1040         >>> vcs._u_abspath(".be", "/a.b/c")
1041         '/a.b/c/.be'
1042         """
1043         if root == None:
1044             assert self.repo != None, "VCS not rooted"
1045             root = self.repo
1046         return os.path.abspath(os.path.join(root, path))
1047
1048     def _u_parse_commitfile(self, commitfile):
1049         """Split the commitfile created in self.commit() back into summary and
1050         header lines.
1051         """
1052         f = codecs.open(commitfile, 'r', self.encoding)
1053         summary = f.readline()
1054         body = f.read()
1055         body.lstrip('\n')
1056         if len(body) == 0:
1057             body = None
1058         f.close()
1059         return (summary, body)
1060
1061     def check_storage_version(self):
1062         version = self.storage_version()
1063         if version != libbe.storage.STORAGE_VERSION:
1064             upgrade.upgrade(self.repo, version)
1065
1066     def storage_version(self, revision=None, path=None):
1067         """Return the storage version of the on-disk files.
1068
1069         See Also
1070         --------
1071         :mod:`libbe.storage.util.upgrade`
1072         """
1073         if path == None:
1074             path = os.path.join(self.repo, '.be', 'version')
1075         if not os.path.exists(path):
1076             raise libbe.storage.InvalidStorageVersion(None)
1077         if revision == None: # don't require connection
1078             return libbe.util.encoding.get_file_contents(
1079                 path, decode=True).rstrip()
1080         relpath = self._u_rel_path(path)
1081         contents = self._vcs_get_file_contents(relpath, revision=revision)
1082         if type(contents) != types.UnicodeType:
1083             contents = unicode(contents, self.encoding)
1084         return contents.strip()
1085
1086     def _setup_storage_version(self):
1087         """
1088         Requires disk access.
1089         """
1090         assert self._rooted == True
1091         path = os.path.join(self.be_dir, 'version')
1092         if not os.path.exists(path):
1093             libbe.util.encoding.set_file_contents(path,
1094                 libbe.storage.STORAGE_VERSION+'\n')
1095             self._vcs_add(self._u_rel_path(path))
1096
1097
1098 if libbe.TESTING == True:
1099     class VCSTestCase (unittest.TestCase):
1100         """
1101         Test cases for base VCS class (in addition to the Storage test
1102         cases).
1103         """
1104
1105         Class = VCS
1106
1107         def __init__(self, *args, **kwargs):
1108             super(VCSTestCase, self).__init__(*args, **kwargs)
1109             self.dirname = None
1110
1111         def setUp(self):
1112             """Set up test fixtures for Storage test case."""
1113             super(VCSTestCase, self).setUp()
1114             self.dir = Dir()
1115             self.dirname = self.dir.path
1116             self.s = self.Class(repo=self.dirname)
1117             if self.s.installed() == True:
1118                 self.s.init()
1119                 self.s.connect()
1120
1121         def tearDown(self):
1122             super(VCSTestCase, self).tearDown()
1123             if self.s.installed() == True:
1124                 self.s.disconnect()
1125                 self.s.destroy()
1126             self.dir.cleanup()
1127
1128     class VCS_installed_TestCase (VCSTestCase):
1129         def test_installed(self):
1130             """See if the VCS is installed.
1131             """
1132             self.failUnless(self.s.installed() == True,
1133                             '%(name)s VCS not found' % vars(self.Class))
1134
1135
1136     class VCS_detection_TestCase (VCSTestCase):
1137         def test_detection(self):
1138             """See if the VCS detects its installed repository
1139             """
1140             if self.s.installed():
1141                 self.s.disconnect()
1142                 self.failUnless(self.s._detect(self.dirname) == True,
1143                     'Did not detected %(name)s VCS after initialising'
1144                     % vars(self.Class))
1145                 self.s.connect()
1146
1147         def test_no_detection(self):
1148             """See if the VCS detects its installed repository
1149             """
1150             if self.s.installed() and self.Class.name != 'None':
1151                 self.s.disconnect()
1152                 self.s.destroy()
1153                 self.failUnless(self.s._detect(self.dirname) == False,
1154                     'Detected %(name)s VCS before initialising'
1155                     % vars(self.Class))
1156                 self.s.init()
1157                 self.s.connect()
1158
1159         def test_vcs_repo_in_specified_root_path(self):
1160             """VCS root directory should be in specified root path."""
1161             rp = os.path.realpath(self.s.repo)
1162             dp = os.path.realpath(self.dirname)
1163             vcs_name = self.Class.name
1164             self.failUnless(
1165                 dp == rp or rp == None,
1166                 "%(vcs_name)s VCS root in wrong dir (%(dp)s %(rp)s)" % vars())
1167
1168     class VCS_get_user_id_TestCase(VCSTestCase):
1169         """Test cases for VCS.get_user_id method."""
1170
1171         def test_gets_existing_user_id(self):
1172             """Should get the existing user ID."""
1173             if self.s.installed():
1174                 user_id = self.s.get_user_id()
1175                 if user_id == None:
1176                     return
1177                 name,email = libbe.ui.util.user.parse_user_id(user_id)
1178                 if email != None:
1179                     self.failUnless('@' in email, email)
1180
1181     def make_vcs_testcase_subclasses(vcs_class, namespace):
1182         c = vcs_class()
1183         if c.installed():
1184             if c.versioned == True:
1185                 libbe.storage.base.make_versioned_storage_testcase_subclasses(
1186                     vcs_class, namespace)
1187             else:
1188                 libbe.storage.base.make_storage_testcase_subclasses(
1189                     vcs_class, namespace)
1190
1191         if namespace != sys.modules[__name__]:
1192             # Make VCSTestCase subclasses for vcs_class in the namespace.
1193             vcs_testcase_classes = [
1194                 c for c in (
1195                     ob for ob in globals().values() if isinstance(ob, type))
1196                 if issubclass(c, VCSTestCase) \
1197                     and c.Class == VCS]
1198
1199             for base_class in vcs_testcase_classes:
1200                 testcase_class_name = vcs_class.__name__ + base_class.__name__
1201                 testcase_class_bases = (base_class,)
1202                 testcase_class_dict = dict(base_class.__dict__)
1203                 testcase_class_dict['Class'] = vcs_class
1204                 testcase_class = type(
1205                     testcase_class_name, testcase_class_bases, testcase_class_dict)
1206                 setattr(namespace, testcase_class_name, testcase_class)
1207
1208     make_vcs_testcase_subclasses(VCS, sys.modules[__name__])
1209
1210     unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
1211     suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])