Use relative paths *._vcs_* methods.
[be.git] / libbe / storage / vcs / base.py
1 # Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc.
2 #                         Alexander Belchenko <bialix@ukr.net>
3 #                         Ben Finney <benf@cybersource.com.au>
4 #                         Chris Ball <cjb@laptop.org>
5 #                         Gianluca Montecchi <gian@grys.it>
6 #                         W. Trevor King <wking@drexel.edu>
7 #
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 2 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the GNU General Public License along
19 # with this program; if not, write to the Free Software Foundation, Inc.,
20 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21
22 """
23 Define the base VCS (Version Control System) class, which should be
24 subclassed by other Version Control System backends.  The base class
25 implements a "do not version" VCS.
26 """
27
28 import codecs
29 import os
30 import os.path
31 import re
32 import shutil
33 import sys
34 import tempfile
35 import types
36
37 import libbe
38 import libbe.storage
39 import libbe.storage.base
40 import libbe.util.encoding
41 from libbe.storage.base import EmptyCommit, InvalidRevision, InvalidID
42 from libbe.util.utility import Dir, search_parent_directories
43 from libbe.util.subproc import CommandError, invoke
44 from libbe.util.plugin import import_by_name
45 import libbe.storage.util.upgrade as upgrade
46
47 if libbe.TESTING == True:
48     import unittest
49     import doctest
50
51     import libbe.ui.util.user
52
53 # List VCS modules in order of preference.
54 # Don't list this module, it is implicitly last.
55 VCS_ORDER = ['arch', 'bzr', 'darcs', 'git', 'hg']
56
57 def set_preferred_vcs(name):
58     global VCS_ORDER
59     assert name in VCS_ORDER, \
60         'unrecognized VCS %s not in\n  %s' % (name, VCS_ORDER)
61     VCS_ORDER.remove(name)
62     VCS_ORDER.insert(0, name)
63
64 def _get_matching_vcs(matchfn):
65     """Return the first module for which matchfn(VCS_instance) is true"""
66     for submodname in VCS_ORDER:
67         module = import_by_name('libbe.storage.vcs.%s' % submodname)
68         vcs = module.new()
69         if matchfn(vcs) == True:
70             return vcs
71     return VCS()
72
73 def vcs_by_name(vcs_name):
74     """Return the module for the VCS with the given name"""
75     if vcs_name == VCS.name:
76         return new()
77     return _get_matching_vcs(lambda vcs: vcs.name == vcs_name)
78
79 def detect_vcs(dir):
80     """Return an VCS instance for the vcs being used in this directory"""
81     return _get_matching_vcs(lambda vcs: vcs._detect(dir))
82
83 def installed_vcs():
84     """Return an instance of an installed VCS"""
85     return _get_matching_vcs(lambda vcs: vcs.installed())
86
87
88 class VCSNotRooted (libbe.storage.base.ConnectionError):
89     def __init__(self, vcs):
90         msg = 'VCS not rooted'
91         libbe.storage.base.ConnectionError.__init__(self, msg)
92         self.vcs = vcs
93
94 class VCSUnableToRoot (libbe.storage.base.ConnectionError):
95     def __init__(self, vcs):
96         msg = 'VCS unable to root'
97         libbe.storage.base.ConnectionError.__init__(self, msg)
98         self.vcs = vcs
99
100 class InvalidPath (InvalidID):
101     def __init__(self, path, root, msg=None, **kwargs):
102         if msg == None:
103             msg = 'Path "%s" not in root "%s"' % (path, root)
104         InvalidID.__init__(self, msg=msg, **kwargs)
105         self.path = path
106         self.root = root
107
108 class SpacerCollision (InvalidPath):
109     def __init__(self, path, spacer):
110         msg = 'Path "%s" collides with spacer directory "%s"' % (path, spacer)
111         InvalidPath.__init__(self, path, root=None, msg=msg)
112         self.spacer = spacer
113
114 class NoSuchFile (InvalidID):
115     def __init__(self, pathname, root='.'):
116         path = os.path.abspath(os.path.join(root, pathname))
117         InvalidID.__init__(self, 'No such file: %s' % path)
118
119
120 class CachedPathID (object):
121     """
122     Storage ID <-> path policy.
123       .../.be/BUGDIR/bugs/BUG/comments/COMMENT
124         ^-- root path
125
126     >>> dir = Dir()
127     >>> os.mkdir(os.path.join(dir.path, '.be'))
128     >>> os.mkdir(os.path.join(dir.path, '.be', 'abc'))
129     >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs'))
130     >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123'))
131     >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments'))
132     >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def'))
133     >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '456'))
134     >>> file(os.path.join(dir.path, '.be', 'abc', 'values'),
135     ...      'w').close()
136     >>> file(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'values'),
137     ...      'w').close()
138     >>> file(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values'),
139     ...      'w').close()
140     >>> c = CachedPathID()
141     >>> c.root(dir.path)
142     >>> c.id(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values'))
143     'def/values'
144     >>> c.init()
145     >>> sorted(os.listdir(os.path.join(c._root, '.be')))
146     ['abc', 'id-cache']
147     >>> c.connect()
148     >>> c.path('123/values') # doctest: +ELLIPSIS
149     u'.../.be/abc/bugs/123/values'
150     >>> c.disconnect()
151     >>> c.destroy()
152     >>> sorted(os.listdir(os.path.join(c._root, '.be')))
153     ['abc']
154     >>> c.connect() # demonstrate auto init
155     >>> sorted(os.listdir(os.path.join(c._root, '.be')))
156     ['abc', 'id-cache']
157     >>> c.add_id(u'xyz', parent=None) # doctest: +ELLIPSIS
158     u'.../.be/xyz'
159     >>> c.add_id('xyz/def', parent='xyz') # doctest: +ELLIPSIS
160     u'.../.be/xyz/def'
161     >>> c.add_id('qrs', parent='123') # doctest: +ELLIPSIS
162     u'.../.be/abc/bugs/123/comments/qrs'
163     >>> c.disconnect()
164     >>> c.connect()
165     >>> c.path('qrs') # doctest: +ELLIPSIS
166     u'.../.be/abc/bugs/123/comments/qrs'
167     >>> c.remove_id('qrs')
168     >>> c.path('qrs')
169     Traceback (most recent call last):
170       ...
171     InvalidID: qrs in revision None
172     >>> c.disconnect()
173     >>> c.destroy()
174     >>> dir.cleanup()
175     """
176     def __init__(self, encoding=None):
177         self.encoding = libbe.util.encoding.get_filesystem_encoding()
178         self._spacer_dirs = ['.be', 'bugs', 'comments']
179
180     def root(self, path):
181         self._root = os.path.abspath(path).rstrip(os.path.sep)
182         self._cache_path = os.path.join(
183             self._root, self._spacer_dirs[0], 'id-cache')
184
185     def init(self):
186         """
187         Create cache file for an existing .be directory.
188         File if multiple lines of the form:
189           UUID\tPATH
190         """
191         self._cache = {}
192         spaced_root = os.path.join(self._root, self._spacer_dirs[0])
193         for dirpath, dirnames, filenames in os.walk(spaced_root):
194             if dirpath == spaced_root:
195                 continue
196             try:
197                 id = self.id(dirpath)
198                 relpath = dirpath[len(self._root)+1:]
199                 if id.count('/') == 0:
200                     if id in self._cache:
201                         print >> sys.stderr, 'Multiple paths for %s: \n  %s\n  %s' % (id, self._cache[id], relpath)
202                     self._cache[id] = relpath
203             except InvalidPath:
204                 pass
205         self._changed = True
206         self.disconnect()
207
208     def destroy(self):
209         if os.path.exists(self._cache_path):
210             os.remove(self._cache_path)
211
212     def connect(self):
213         if not os.path.exists(self._cache_path):
214             try:
215                 self.init()
216             except IOError:
217                 raise libbe.storage.base.ConnectionError
218         self._cache = {} # key: uuid, value: path
219         self._changed = False
220         f = codecs.open(self._cache_path, 'r', self.encoding)
221         for line in f:
222             fields = line.rstrip('\n').split('\t')
223             self._cache[fields[0]] = fields[1]
224         f.close()
225
226     def disconnect(self):
227         if self._changed == True:
228             f = codecs.open(self._cache_path, 'w', self.encoding)
229             for uuid,path in self._cache.items():
230                 f.write('%s\t%s\n' % (uuid, path))
231             f.close()
232         self._cache = {}
233
234     def path(self, id, relpath=False):
235         fields = id.split('/', 1)
236         uuid = fields[0]
237         if len(fields) == 1:
238             extra = []
239         else:
240             extra = fields[1:]
241         if uuid not in self._cache:
242             raise InvalidID(uuid)
243         if relpath == True:
244             return os.path.join(self._cache[uuid], *extra)
245         return os.path.join(self._root, self._cache[uuid], *extra)
246
247     def add_id(self, id, parent=None):
248         if id.count('/') > 0:
249             # not a UUID-level path
250             assert id.startswith(parent), \
251                 'Strange ID: "%s" should start with "%s"' % (id, parent)
252             path = self.path(id)
253         elif id in self._cache:
254             # already added
255             path = self.path(id)
256         else:
257             if parent == None:
258                 parent_path = ''
259                 spacer = self._spacer_dirs[0]
260             else:
261                 assert parent.count('/') == 0, \
262                     'Strange parent ID: "%s" should be UUID' % parent
263                 parent_path = self.path(parent, relpath=True)
264                 parent_spacer = parent_path.split(os.path.sep)[-2]
265                 i = self._spacer_dirs.index(parent_spacer)
266                 spacer = self._spacer_dirs[i+1]
267             path = os.path.join(parent_path, spacer, id)
268             self._cache[id] = path
269             self._changed = True
270             path = os.path.join(self._root, path)
271         return path
272
273     def remove_id(self, id):
274         if id.count('/') > 0:
275             return # not a UUID-level path
276         self._cache.pop(id)
277         self._changed = True
278
279     def id(self, path):
280         path = os.path.join(self._root, path)
281         if not path.startswith(self._root + os.path.sep):
282             raise InvalidPath(path, self._root)
283         path = path[len(self._root)+1:]
284         orig_path = path
285         if not path.startswith(self._spacer_dirs[0] + os.path.sep):
286             raise InvalidPath(path, self._spacer_dirs[0])
287         for spacer in self._spacer_dirs:
288             if not path.startswith(spacer + os.path.sep):
289                 break
290             id = path[len(spacer)+1:]
291             fields = path[len(spacer)+1:].split(os.path.sep,1)
292             if len(fields) == 1:
293                 break
294             path = fields[1]
295         for spacer in self._spacer_dirs:
296             if id.endswith(os.path.sep + spacer):
297                 raise SpacerCollision(orig_path, spacer)
298         if os.path.sep != '/':
299             id = id.replace(os.path.sep, '/')
300         return id
301
302
303 def new():
304     return VCS()
305
306 class VCS (libbe.storage.base.VersionedStorage):
307     """
308     This class implements a 'no-vcs' interface.
309
310     Support for other VCSs can be added by subclassing this class, and
311     overriding methods _vcs_*() with code appropriate for your VCS.
312
313     The methods _u_*() are utility methods available to the _vcs_*()
314     methods.
315
316     Sink to existing root
317     ======================
318
319     Consider the following usage case:
320     You have a bug directory rooted in
321       /path/to/source
322     by which I mean the '.be' directory is at
323       /path/to/source/.be
324     However, you're of in some subdirectory like
325       /path/to/source/GUI/testing
326     and you want to comment on a bug.  Setting sink_to_root=True when
327     you initialize your BugDir will cause it to search for the '.be'
328     file in the ancestors of the path you passed in as 'root'.
329       /path/to/source/GUI/testing/.be     miss
330       /path/to/source/GUI/.be             miss
331       /path/to/source/.be                 hit!
332     So it still roots itself appropriately without much work for you.
333
334     File-system access
335     ==================
336
337     BugDirs live completely in memory when .sync_with_disk is False.
338     This is the default configuration setup by BugDir(from_disk=False).
339     If .sync_with_disk == True (e.g. BugDir(from_disk=True)), then
340     any changes to the BugDir will be immediately written to disk.
341
342     If you want to change .sync_with_disk, we suggest you use
343     .set_sync_with_disk(), which propogates the new setting through to
344     all bugs/comments/etc. that have been loaded into memory.  If
345     you've been living in memory and want to move to
346     .sync_with_disk==True, but you're not sure if anything has been
347     changed in memory, a call to .save() immediately before the
348     .set_sync_with_disk(True) call is a safe move.
349
350     Regardless of .sync_with_disk, a call to .save() will write out
351     all the contents that the BugDir instance has loaded into memory.
352     If sync_with_disk has been True over the course of all interesting
353     changes, this .save() call will be a waste of time.
354
355     The BugDir will only load information from the file system when it
356     loads new settings/bugs/comments that it doesn't already have in
357     memory and .sync_with_disk == True.
358
359     Allow storage initialization
360     ========================
361
362     This one is for testing purposes.  Setting it to True allows the
363     BugDir to search for an installed Storage backend and initialize
364     it in the root directory.  This is a convenience option for
365     supporting tests of versioning functionality
366     (e.g. RevisionedBugDir).
367
368     Disable encoding manipulation
369     =============================
370
371     This one is for testing purposed.  You might have non-ASCII
372     Unicode in your bugs, comments, files, etc.  BugDir instances try
373     and support your preferred encoding scheme (e.g. "utf-8") when
374     dealing with stream and file input/output.  For stream output,
375     this involves replacing sys.stdout and sys.stderr
376     (libbe.encode.set_IO_stream_encodings).  However this messes up
377     doctest's output catching.  In order to support doctest tests
378     using BugDirs, set manipulate_encodings=False, and stick to ASCII
379     in your tests.
380
381         if root == None:
382             root = os.getcwd()
383         if sink_to_existing_root == True:
384             self.root = self._find_root(root)
385         else:
386             if not os.path.exists(root):
387                 self.root = None
388                 raise NoRootEntry(root)
389             self.root = root
390         # get a temporary storage until we've loaded settings
391         self.sync_with_disk = False
392         self.storage = self._guess_storage()
393
394             if assert_new_BugDir == True:
395                 if os.path.exists(self.get_path()):
396                     raise AlreadyInitialized, self.get_path()
397             if storage == None:
398                 storage = self._guess_storage(allow_storage_init)
399             self.storage = storage
400             self._setup_user_id(self.user_id)
401
402
403     # methods for getting the BugDir situated in the filesystem
404
405     def _find_root(self, path):
406         '''
407         Search for an existing bug database dir and it's ancestors and
408         return a BugDir rooted there.  Only called by __init__, and
409         then only if sink_to_existing_root == True.
410         '''
411         if not os.path.exists(path):
412             self.root = None
413             raise NoRootEntry(path)
414         versionfile=utility.search_parent_directories(path,
415                                                       os.path.join(".be", "version"))
416         if versionfile != None:
417             beroot = os.path.dirname(versionfile)
418             root = os.path.dirname(beroot)
419             return root
420         else:
421             beroot = utility.search_parent_directories(path, ".be")
422             if beroot == None:
423                 self.root = None
424                 raise NoBugDir(path)
425             return beroot
426
427     def _guess_storage(self, allow_storage_init=False):
428         '''
429         Only called by __init__.
430         '''
431         deepdir = self.get_path()
432         if not os.path.exists(deepdir):
433             deepdir = os.path.dirname(deepdir)
434         new_storage = storage.detect_storage(deepdir)
435         install = False
436         if new_storage.name == "None":
437             if allow_storage_init == True:
438                 new_storage = storage.installed_storage()
439                 new_storage.init(self.root)
440         return new_storage
441
442 os.listdir(self.get_path("bugs")):
443     """
444     name = 'None'
445     client = 'false' # command-line tool for _u_invoke_client
446
447     def __init__(self, *args, **kwargs):
448         if 'encoding' not in kwargs:
449             kwargs['encoding'] = libbe.util.encoding.get_filesystem_encoding()
450         libbe.storage.base.VersionedStorage.__init__(self, *args, **kwargs)
451         self.versioned = False
452         self.interspersed_vcs_files = False
453         self.verbose_invoke = False
454         self._cached_path_id = CachedPathID()
455         self._rooted = False
456
457     def _vcs_version(self):
458         """
459         Return the VCS version string.
460         """
461         return '0'
462
463     def _vcs_get_user_id(self):
464         """
465         Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
466         If the VCS has not been configured with a username, return None.
467         """
468         return None
469
470     def _vcs_detect(self, path=None):
471         """
472         Detect whether a directory is revision controlled with this VCS.
473         """
474         return True
475
476     def _vcs_root(self, path):
477         """
478         Get the VCS root.  This is the default working directory for
479         future invocations.  You would normally set this to the root
480         directory for your VCS.
481         """
482         if os.path.isdir(path) == False:
483             path = os.path.dirname(path)
484             if path == '':
485                 path = os.path.abspath('.')
486         return path
487
488     def _vcs_init(self, path):
489         """
490         Begin versioning the tree based at path.
491         """
492         pass
493
494     def _vcs_destroy(self):
495         """
496         Remove any files used in versioning (e.g. whatever _vcs_init()
497         created).
498         """
499         pass
500
501     def _vcs_add(self, path):
502         """
503         Add the already created file at path to version control.
504         """
505         pass
506
507     def _vcs_remove(self, path):
508         """
509         Remove the file at path from version control.  Optionally
510         remove the file from the filesystem as well.
511         """
512         pass
513
514     def _vcs_update(self, path):
515         """
516         Notify the versioning system of changes to the versioned file
517         at path.
518         """
519         pass
520
521     def _vcs_is_versioned(self, path):
522         """
523         Return true if a path is under version control, False
524         otherwise.  You only need to set this if the VCS goes about
525         dumping VCS-specific files into the .be directory.
526
527         If you do need to implement this method (e.g. Arch), set
528           self.interspersed_vcs_files = True
529         """
530         assert self.interspersed_vcs_files == False
531         raise NotImplementedError
532
533     def _vcs_get_file_contents(self, path, revision=None):
534         """
535         Get the file contents as they were in a given revision.
536         Revision==None specifies the current revision.
537         """
538         if revision != None:
539             raise libbe.storage.base.InvalidRevision(
540                 'The %s VCS does not support revision specifiers' % self.name)
541         path = os.path.join(self.repo, path)
542         if not os.path.exists(path):
543             return libbe.util.InvalidObject
544         if os.path.isdir(path):
545             return libbe.storage.base.InvalidDirectory
546         f = open(path, 'rb')
547         contents = f.read()
548         f.close()
549         return contents
550
551     def _vcs_path(self, id, revision):
552         """
553         Return the path to object id as of revision.
554         
555         Revision will not be None.
556         """
557         raise NotImplementedError
558
559     def _vcs_isdir(self, path, revision):
560         """
561         Return True if path (as returned by _vcs_path) was a directory
562         as of revision, False otherwise.
563         
564         Revision will not be None.
565         """
566         raise NotImplementedError
567
568     def _vcs_listdir(self, path, revision):
569         """
570         Return a list of the contents of the directory path (as
571         returned by _vcs_path) as of revision.
572         
573         Revision will not be None, and ._vcs_isdir(path, revision)
574         will be True.
575         """
576         raise NotImplementedError
577
578     def _vcs_commit(self, commitfile, allow_empty=False):
579         """
580         Commit the current working directory, using the contents of
581         commitfile as the comment.  Return the name of the old
582         revision (or None if commits are not supported).
583
584         If allow_empty == False, raise EmptyCommit if there are no
585         changes to commit.
586         """
587         return None
588
589     def _vcs_revision_id(self, index):
590         """
591         Return the name of the <index>th revision.  Index will be an
592         integer (possibly <= 0).  The choice of which branch to follow
593         when crossing branches/merges is not defined.
594
595         Return None if revision IDs are not supported, or if the
596         specified revision does not exist.
597         """
598         return None
599
600     def _vcs_changed(self, revision):
601         """
602         Return a tuple of lists of ids
603           (new, modified, removed)
604         from the specified revision to the current situation.
605         """
606         return ([], [], [])
607
608     def version(self):
609         # Cache version string for efficiency.
610         if not hasattr(self, '_version'):
611             self._version = self._get_version()
612         return self._version
613
614     def _get_version(self):
615         try:
616             ret = self._vcs_version()
617             return ret
618         except OSError, e:
619             if e.errno == errno.ENOENT:
620                 return None
621             else:
622                 raise OSError, e
623         except CommandError:
624             return None
625
626     def installed(self):
627         if self.version() != None:
628             return True
629         return False
630
631     def get_user_id(self):
632         """
633         Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
634         If the VCS has not been configured with a username, return None.
635         You can override the automatic lookup procedure by setting the
636         VCS.user_id attribute to a string of your choice.
637         """
638         if not hasattr(self, 'user_id'):
639             self.user_id = self._vcs_get_user_id()
640         return self.user_id
641
642     def _detect(self, path='.'):
643         """
644         Detect whether a directory is revision controlled with this VCS.
645         """
646         return self._vcs_detect(path)
647
648     def root(self):
649         """
650         Set the root directory to the path's VCS root.  This is the
651         default working directory for future invocations.
652         """
653         if self._detect(self.repo) == False:
654             raise VCSUnableToRoot(self)
655         root = self._vcs_root(self.repo)
656         self.repo = os.path.abspath(root)
657         if os.path.isdir(self.repo) == False:
658             self.repo = os.path.dirname(self.repo)
659         self.be_dir = os.path.join(
660             self.repo, self._cached_path_id._spacer_dirs[0])
661         self._cached_path_id.root(self.repo)
662         self._rooted = True
663
664     def _init(self):
665         """
666         Begin versioning the tree based at self.repo.
667         Also roots the vcs at path.
668         """
669         if not os.path.exists(self.repo) or not os.path.isdir(self.repo):
670             raise VCSUnableToRoot(self)
671         if self._vcs_detect(self.repo) == False:
672             self._vcs_init(self.repo)
673         if self._rooted == False:
674             self.root()
675         os.mkdir(self.be_dir)
676         self._vcs_add(self._u_rel_path(self.be_dir))
677         self._setup_storage_version()
678         self._cached_path_id.init()
679
680     def _destroy(self):
681         self._vcs_destroy()
682         self._cached_path_id.destroy()
683         if os.path.exists(self.be_dir):
684             shutil.rmtree(self.be_dir)
685
686     def _connect(self):
687         if self._rooted == False:
688             self.root()
689         if not os.path.isdir(self.be_dir):
690             raise libbe.storage.base.ConnectionError(self)
691         self._cached_path_id.connect()
692         self.check_storage_version()
693
694     def _disconnect(self):
695         self._cached_path_id.disconnect()
696
697     def _add_path(self, path, directory=False):
698         relpath = self._u_rel_path(path)
699         reldirs = relpath.split(os.path.sep)
700         if directory == False:
701             reldirs = reldirs[:-1]
702         dir = self.repo
703         for reldir in reldirs:
704             dir = os.path.join(dir, reldir)
705             if not os.path.exists(dir):
706                 os.mkdir(dir)
707                 self._vcs_add(self._u_rel_path(dir))
708             elif not os.path.isdir(dir):
709                 raise libbe.storage.base.InvalidDirectory
710         if directory == False:
711             if not os.path.exists(path):
712                 open(path, 'w').close()
713             self._vcs_add(self._u_rel_path(path))
714
715     def _add(self, id, parent=None, **kwargs):
716         path = self._cached_path_id.add_id(id, parent)
717         self._add_path(path, **kwargs)
718
719     def _remove(self, id):
720         path = self._cached_path_id.path(id)
721         if os.path.exists(path):
722             if os.path.isdir(path) and len(self.children(id)) > 0:
723                 raise libbe.storage.base.DirectoryNotEmpty(id)
724             self._vcs_remove(self._u_rel_path(path))
725             if os.path.exists(path):
726                 if os.path.isdir(path):
727                     os.rmdir(path)
728                 else:
729                     os.remove(path)
730         self._cached_path_id.remove_id(id)
731
732     def _recursive_remove(self, id):
733         path = self._cached_path_id.path(id)
734         for dirpath,dirnames,filenames in os.walk(path, topdown=False):
735             filenames.extend(dirnames)
736             for f in filenames:
737                 fullpath = os.path.join(dirpath, f)
738                 if os.path.exists(fullpath) == False:
739                     continue
740                 self._vcs_remove(self._u_rel_path(fullpath))
741         if os.path.exists(path):
742             shutil.rmtree(path)
743         path = self._cached_path_id.path(id, relpath=True)
744         for id,p in self._cached_path_id._cache.items():
745             if p.startswith(path):
746                 self._cached_path_id.remove_id(id)
747
748     def _ancestors(self, id=None, revision=None):
749         if revision == None:
750             id_to_path = self._cached_path_id.path
751         else:
752             id_to_path = lambda id : self._vcs_path(id, revision)
753         if id==None:
754             path = self.be_dir
755         else:
756             path = id_to_path(id)
757         ancestors = []
758         while True:
759             if not path.startswith(self.repo + os.path.sep):
760                 break
761             path = os.path.dirname(path)
762             try:
763                 id = self._u_path_to_id(path)
764                 ancestors.append(id)
765             except (SpacerCollision, InvalidPath):
766                 pass    
767         return ancestors
768
769     def _children(self, id=None, revision=None):
770         if revision == None:
771             id_to_path = self._cached_path_id.path
772             isdir = os.path.isdir
773             listdir = os.listdir
774         else:
775             id_to_path = lambda id : self._vcs_path(id, revision)
776             isdir = lambda path : self._vcs_isdir(
777                 self._u_rel_path(path), revision)
778             listdir = lambda path : self._vcs_listdir(
779                 self._u_rel_path(path), revision)
780         if id==None:
781             path = self.be_dir
782         else:
783             path = id_to_path(id)
784         if isdir(path) == False: 
785             return []
786         children = listdir(path)
787         for i,c in enumerate(children):
788             if c in self._cached_path_id._spacer_dirs:
789                 children[i] = None
790                 children.extend([os.path.join(c, c2) for c2 in
791                                  listdir(os.path.join(path, c))])
792             elif c in ['id-cache', 'version']:
793                 children[i] = None
794             elif self.interspersed_vcs_files \
795                     and self._vcs_is_versioned(c) == False:
796                 children[i] = None
797         for i,c in enumerate(children):
798             if c == None: continue
799             cpath = os.path.join(path, c)
800             if self.interspersed_vcs_files == True \
801                     and revision != None \
802                     and self._vcs_is_versioned(cpath) == False:
803                 children[i] = None
804             else:
805                 children[i] = self._u_path_to_id(cpath)
806                 children[i]
807         return [c for c in children if c != None]
808
809     def _get(self, id, default=libbe.util.InvalidObject, revision=None):
810         try:
811             path = self._cached_path_id.path(id)
812         except InvalidID, e:
813             if default == libbe.util.InvalidObject:
814                 raise e
815             return default
816         relpath = self._u_rel_path(path)
817         try:
818             contents = self._vcs_get_file_contents(relpath, revision)
819         except InvalidID, e:
820             if e.id == None:
821                 e.id = id
822             if e.revision == None:
823                 e.revision = revision
824             raise
825         if contents in [libbe.storage.base.InvalidDirectory,
826                         libbe.util.InvalidObject]:
827             raise InvalidID(id, revision)
828         elif len(contents) == 0:
829             return None
830         return contents
831
832     def _set(self, id, value):
833         try:
834             path = self._cached_path_id.path(id)
835         except InvalidID, e:
836             raise
837         if not os.path.exists(path):
838             raise InvalidID(id)
839         if os.path.isdir(path):
840             raise libbe.storage.base.InvalidDirectory(id)
841         f = open(path, "wb")
842         f.write(value)
843         f.close()
844         self._vcs_update(self._u_rel_path(path))
845
846     def _commit(self, summary, body=None, allow_empty=False):
847         summary = summary.strip()+'\n'
848         if body is not None:
849             summary += '\n' + body.strip() + '\n'
850         descriptor, filename = tempfile.mkstemp()
851         revision = None
852         try:
853             temp_file = os.fdopen(descriptor, 'wb')
854             temp_file.write(summary)
855             temp_file.flush()
856             revision = self._vcs_commit(filename, allow_empty=allow_empty)
857             temp_file.close()
858         finally:
859             os.remove(filename)
860         return revision
861
862     def revision_id(self, index=None):
863         if index == None:
864             return None
865         try:
866             if int(index) != index:
867                 raise InvalidRevision(index)
868         except ValueError:
869             raise InvalidRevision(index)
870         revid = self._vcs_revision_id(index)
871         if revid == None:
872             raise libbe.storage.base.InvalidRevision(index)
873         return revid
874
875     def changed(self, revision):
876         new,mod,rem = self._vcs_changed(revision)
877         def paths_to_ids(paths):
878             for p in paths:
879                 try:
880                     id = self._u_path_to_id(p)
881                     yield id
882                 except (SpacerCollision, InvalidPath):
883                     pass
884         new_id = list(paths_to_ids(new))
885         mod_id = list(paths_to_ids(mod))
886         rem_id = list(paths_to_ids(rem))
887         return (new_id, mod_id, rem_id)
888
889     def _u_any_in_string(self, list, string):
890         """
891         Return True if any of the strings in list are in string.
892         Otherwise return False.
893         """
894         for list_string in list:
895             if list_string in string:
896                 return True
897         return False
898
899     def _u_invoke(self, *args, **kwargs):
900         if 'cwd' not in kwargs:
901             kwargs['cwd'] = self.repo
902         if 'verbose' not in kwargs:
903             kwargs['verbose'] = self.verbose_invoke
904         if 'encoding' not in kwargs:
905             kwargs['encoding'] = self.encoding
906         return invoke(*args, **kwargs)
907
908     def _u_invoke_client(self, *args, **kwargs):
909         cl_args = [self.client]
910         cl_args.extend(args)
911         return self._u_invoke(cl_args, **kwargs)
912
913     def _u_search_parent_directories(self, path, filename):
914         """
915         Find the file (or directory) named filename in path or in any
916         of path's parents.
917
918         e.g.
919           search_parent_directories("/a/b/c", ".be")
920         will return the path to the first existing file from
921           /a/b/c/.be
922           /a/b/.be
923           /a/.be
924           /.be
925         or None if none of those files exist.
926         """
927         try:
928             ret = search_parent_directories(path, filename)
929         except AssertionError, e:
930             return None
931         return ret
932
933     def _u_find_id_from_manifest(self, id, manifest, revision=None):
934         """
935         Search for the relative path to id using manifest, a list of all files.
936         
937         Returns None if the id is not found.
938         """
939         be_dir = self._cached_path_id._spacer_dirs[0]
940         be_dir_sep = self._cached_path_id._spacer_dirs[0] + os.path.sep
941         files = [f for f in manifest if f.startswith(be_dir_sep)]
942         for file in files:
943             if not file.startswith(be_dir+os.path.sep):
944                 continue
945             parts = file.split(os.path.sep)
946             dir = parts.pop(0) # don't add the first spacer dir
947             for part in parts[:-1]:
948                 dir = os.path.join(dir, part)
949                 if not dir in files:
950                     files.append(dir)
951         for file in files:
952             try:
953                 p_id = self._u_path_to_id(file)
954                 if p_id == id:
955                     return file
956             except (SpacerCollision, InvalidPath):
957                 pass
958         raise InvalidID(id, revision=revision)
959
960     def _u_find_id(self, id, revision):
961         """
962         Search for the relative path to id as of revision.
963         Returns None if the id is not found.
964         """
965         assert self._rooted == True
966         be_dir = self._cached_path_id._spacer_dirs[0]
967         stack = [(be_dir, be_dir)]
968         while len(stack) > 0:
969             path,long_id = stack.pop()
970             if long_id.endswith('/'+id):
971                 return path
972             if self._vcs_isdir(path, revision) == False:
973                 continue
974             for child in self._vcs_listdir(path, revision):
975                 stack.append((os.path.join(path, child),
976                               '/'.join([long_id, child])))
977         raise InvalidID(id, revision=revision)
978
979     def _u_path_to_id(self, path):
980         return self._cached_path_id.id(path)
981
982     def _u_rel_path(self, path, root=None):
983         """
984         Return the relative path to path from root.
985         >>> vcs = new()
986         >>> vcs._u_rel_path("/a.b/c/.be", "/a.b/c")
987         '.be'
988         >>> vcs._u_rel_path("/a.b/c/", "/a.b/c")
989         '.'
990         >>> vcs._u_rel_path("/a.b/c/", "/a.b/c/")
991         '.'
992         >>> vcs._u_rel_path("./a", ".")
993         'a'
994         """
995         if root == None:
996             if self.repo == None:
997                 raise VCSNotRooted(self)
998             root = self.repo
999         path = os.path.abspath(path)
1000         absRoot = os.path.abspath(root)
1001         absRootSlashedDir = os.path.join(absRoot,"")
1002         if path in [absRoot, absRootSlashedDir]:
1003             return '.'
1004         if not path.startswith(absRootSlashedDir):
1005             raise InvalidPath(path, absRootSlashedDir)
1006         relpath = path[len(absRootSlashedDir):]
1007         return relpath
1008
1009     def _u_abspath(self, path, root=None):
1010         """
1011         Return the absolute path from a path realtive to root.
1012         >>> vcs = new()
1013         >>> vcs._u_abspath(".be", "/a.b/c")
1014         '/a.b/c/.be'
1015         """
1016         if root == None:
1017             assert self.repo != None, "VCS not rooted"
1018             root = self.repo
1019         return os.path.abspath(os.path.join(root, path))
1020
1021     def _u_parse_commitfile(self, commitfile):
1022         """
1023         Split the commitfile created in self.commit() back into
1024         summary and header lines.
1025         """
1026         f = codecs.open(commitfile, 'r', self.encoding)
1027         summary = f.readline()
1028         body = f.read()
1029         body.lstrip('\n')
1030         if len(body) == 0:
1031             body = None
1032         f.close()
1033         return (summary, body)
1034
1035     def check_storage_version(self):
1036         version = self.storage_version()
1037         if version != libbe.storage.STORAGE_VERSION:
1038             upgrade.upgrade(self.repo, version)
1039
1040     def storage_version(self, revision=None, path=None):
1041         """
1042         Requires disk access.
1043         """
1044         if path == None:
1045             path = os.path.join(self.repo, '.be', 'version')
1046         if not os.path.exists(path):
1047             raise libbe.storage.InvalidStorageVersion(None)
1048         if revision == None: # don't require connection
1049             return libbe.util.encoding.get_file_contents(
1050                 path, decode=True).rstrip('\n')
1051         relpath = self._u_rel_path(path)
1052         contents = self._vcs_get_file_contents(relpath, revision=revision)
1053         if type(contents) != types.UnicodeType:
1054             contents = unicode(contents, self.encoding)
1055         return contents.strip()
1056
1057     def _setup_storage_version(self):
1058         """
1059         Requires disk access.
1060         """
1061         assert self._rooted == True
1062         path = os.path.join(self.be_dir, 'version')
1063         if not os.path.exists(path):
1064             libbe.util.encoding.set_file_contents(path,
1065                 libbe.storage.STORAGE_VERSION+'\n')
1066             self._vcs_add(self._u_rel_path(path))
1067
1068 \f
1069 if libbe.TESTING == True:
1070     class VCSTestCase (unittest.TestCase):
1071         """
1072         Test cases for base VCS class (in addition to the Storage test
1073         cases).
1074         """
1075
1076         Class = VCS
1077
1078         def __init__(self, *args, **kwargs):
1079             super(VCSTestCase, self).__init__(*args, **kwargs)
1080             self.dirname = None
1081
1082         def setUp(self):
1083             """Set up test fixtures for Storage test case."""
1084             super(VCSTestCase, self).setUp()
1085             self.dir = Dir()
1086             self.dirname = self.dir.path
1087             self.s = self.Class(repo=self.dirname)
1088             if self.s.installed() == True:
1089                 self.s.init()
1090                 self.s.connect()
1091
1092         def tearDown(self):
1093             super(VCSTestCase, self).tearDown()
1094             if self.s.installed() == True:
1095                 self.s.disconnect()
1096                 self.s.destroy()
1097             self.dir.cleanup()
1098
1099     class VCS_installed_TestCase (VCSTestCase):
1100         def test_installed(self):
1101             """See if the VCS is installed.
1102             """
1103             self.failUnless(self.s.installed() == True,
1104                             '%(name)s VCS not found' % vars(self.Class))
1105
1106
1107     class VCS_detection_TestCase (VCSTestCase):
1108         def test_detection(self):
1109             """See if the VCS detects its installed repository
1110             """
1111             if self.s.installed():
1112                 self.s.disconnect()
1113                 self.failUnless(self.s._detect(self.dirname) == True,
1114                     'Did not detected %(name)s VCS after initialising'
1115                     % vars(self.Class))
1116                 self.s.connect()
1117
1118         def test_no_detection(self):
1119             """See if the VCS detects its installed repository
1120             """
1121             if self.s.installed() and self.Class.name != 'None':
1122                 self.s.disconnect()
1123                 self.s.destroy()
1124                 self.failUnless(self.s._detect(self.dirname) == False,
1125                     'Detected %(name)s VCS before initialising'
1126                     % vars(self.Class))
1127                 self.s.init()
1128                 self.s.connect()
1129
1130         def test_vcs_repo_in_specified_root_path(self):
1131             """VCS root directory should be in specified root path."""
1132             rp = os.path.realpath(self.s.repo)
1133             dp = os.path.realpath(self.dirname)
1134             vcs_name = self.Class.name
1135             self.failUnless(
1136                 dp == rp or rp == None,
1137                 "%(vcs_name)s VCS root in wrong dir (%(dp)s %(rp)s)" % vars())
1138
1139     class VCS_get_user_id_TestCase(VCSTestCase):
1140         """Test cases for VCS.get_user_id method."""
1141
1142         def test_gets_existing_user_id(self):
1143             """Should get the existing user ID."""
1144             if self.s.installed():
1145                 user_id = self.s.get_user_id()
1146                 if user_id == None:
1147                     return
1148                 name,email = libbe.ui.util.user.parse_user_id(user_id)
1149                 if email != None:
1150                     self.failUnless('@' in email, email)
1151
1152     def make_vcs_testcase_subclasses(vcs_class, namespace):
1153         c = vcs_class()
1154         if c.installed():
1155             if c.versioned == True:
1156                 libbe.storage.base.make_versioned_storage_testcase_subclasses(
1157                     vcs_class, namespace)
1158             else:
1159                 libbe.storage.base.make_storage_testcase_subclasses(
1160                     vcs_class, namespace)
1161
1162         if namespace != sys.modules[__name__]:
1163             # Make VCSTestCase subclasses for vcs_class in the namespace.
1164             vcs_testcase_classes = [
1165                 c for c in (
1166                     ob for ob in globals().values() if isinstance(ob, type))
1167                 if issubclass(c, VCSTestCase) \
1168                     and c.Class == VCS]
1169
1170             for base_class in vcs_testcase_classes:
1171                 testcase_class_name = vcs_class.__name__ + base_class.__name__
1172                 testcase_class_bases = (base_class,)
1173                 testcase_class_dict = dict(base_class.__dict__)
1174                 testcase_class_dict['Class'] = vcs_class
1175                 testcase_class = type(
1176                     testcase_class_name, testcase_class_bases, testcase_class_dict)
1177                 setattr(namespace, testcase_class_name, testcase_class)
1178
1179     make_vcs_testcase_subclasses(VCS, sys.modules[__name__])
1180
1181     unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
1182     suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])