Merged revisions 1884-1905 via svnmerge from
[scons.git] / test / Perforce / Perforce.py
1 #!/usr/bin/env python
2 #
3 # __COPYRIGHT__
4 #
5 # Permission is hereby granted, free of charge, to any person obtaining
6 # a copy of this software and associated documentation files (the
7 # "Software"), to deal in the Software without restriction, including
8 # without limitation the rights to use, copy, modify, merge, publish,
9 # distribute, sublicense, and/or sell copies of the Software, and to
10 # permit persons to whom the Software is furnished to do so, subject to
11 # the following conditions:
12 #
13 # The above copyright notice and this permission notice shall be included
14 # in all copies or substantial portions of the Software.
15 #
16 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
17 # KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
18 # WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
19 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
20 # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
21 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
22 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23 #
24
25 __revision__ = "__FILE__ __REVISION__ __DATE__ __DEVELOPER__"
26
27 """
28 Test fetching source files from Perforce.
29
30 This test requires that a Perforce server be running on the test system
31 on port 1666, as well as that of course a client must be present.
32 """
33
34 import os
35 import string
36
37 import TestSCons
38
39 class TestPerforce(TestSCons.TestSCons):
40     def __init__(self, *args, **kw):
41         apply(TestSCons.TestSCons.__init__, (self,)+args, kw)
42
43         self.p4path = self.where_is('p4')
44         if not self.p4path:
45             self.skip_test("Could not find 'p4'; skipping test(s).\n")
46
47         #import socket
48         #self.host = socket.gethostname()
49         self.host = '127.0.0.1'
50
51         self.user = os.environ.get('USER')
52         if not self.user:
53             self.user = os.environ.get('USERNAME')
54         if not self.user:
55             self.user = os.environ.get('P4USER')
56
57         self.depot = 'testme'
58
59         self.p4d = self.where_is('p4d')
60         if self.p4d:
61             self.p4portflags = ['-p', self.host + ':1777']
62             self.subdir('depot', ['depot', 'testme'])
63             def quote_space(a):
64                 if ' ' in a:
65                     a = '"%s"' % a
66                 return a
67             args = map(quote_space, [self.p4d, '-q', '-d'] + \
68                                     self.p4portflags + \
69                                     ['-J', 'Journal',
70                                      '-L', 'Log',
71                                      '-r', self.workpath('depot')])
72
73             # We don't use self.run() because the TestCmd logic will hang
74             # waiting for the daemon to exit, even when we pass it
75             # the -d option.
76             try:
77                 spawnv = os.spawnv
78             except AttributeError:
79                 os.system(string.join(args))
80             else:
81                 spawnv(os.P_NOWAIT, self.p4d, args)
82                 self.sleep(2)
83         else:
84             import socket
85             s = socket.socket()
86             host_port = (self.host, 1666)
87             try:
88                 s.connect(host_port)
89             except socket.error:
90                 self.skip_test("No Perforce server found; skipping test(s).\n")
91             else:
92                 s.close()
93
94             self.p4portflags = ['-p', '%s:%s' % host_port]
95             try:
96                 self.p4('obliterate -y //%s/...' % self.depot)
97                 self.p4('depot -d %s' % self.depot)
98             except TestSCons.TestFailed:
99                 # It's okay if this fails.  It will fail if the depot
100                 # is already clear.
101                 pass
102
103         self.portflag = string.join(self.p4portflags)
104
105     def p4(self, *args, **kw):
106         try:
107             arguments = kw['arguments']
108         except KeyError:
109             arguments = args[0]
110             args = args[1:]
111         kw['arguments'] = string.join(self.p4portflags + [arguments])
112         kw['program'] = self.p4path
113         return apply(self.run, args, kw)
114
115     def substitute(self, s, **kw):
116         kw = kw.copy()
117         kw.update(self.__dict__)
118         return s % kw
119
120     def cleanup(self, condition = None):
121         if self.p4d:
122             self.p4('admin stop')
123             self.p4d = None
124
125         if TestSCons:
126             TestSCons.TestSCons.cleanup(self, condition)
127
128 test = TestPerforce()
129
130 # Set up a perforce depot for testing.
131 depotspec = test.substitute("""\
132 # A Perforce Depot Specification.
133 Depot:  %(depot)s
134
135 Owner:  %(user)s
136
137 Date:   2003/02/19 17:21:41
138
139 Description:
140         A test depot.
141
142 Type:   local
143
144 Address:        subdir
145
146 Map:    %(depot)s/...
147 """)
148
149 test.p4(arguments='depot -i', stdin = depotspec)
150
151 # Now set up 2 clients, one to check in some files, and one to
152 # do the building.
153 clientspec = """\
154 # A Perforce Client Specification.
155 Client: %(client)s
156
157 Owner:  %(user)s
158
159 Description:
160         Created by %(user)s.
161
162 Root:   %(root)s
163
164 Options:        noallwrite noclobber nocompress unlocked nomodtime normdir
165
166 LineEnd:        local
167
168 View:
169         //%(depot)s/%(subdir)s... //%(client)s/...
170 """
171
172 clientspec1 = test.substitute(clientspec,
173                               client = 'testclient1',
174                               root = test.workpath('import'),
175                               subdir = 'foo/',
176                               )
177
178 clientspec2 = test.substitute(clientspec,
179                               client = 'testclient2',
180                               root = test.workpath('work'),
181                               subdir = '',
182                               )
183
184 test.subdir('import', ['import', 'sub'], 'work')
185
186 test.p4('client -i', stdin=clientspec1)
187 test.p4('client -i', stdin=clientspec2)
188
189 test.write(['import', 'aaa.in'], "import/aaa.in\n")
190 test.write(['import', 'bbb.in'], "import/bbb.in\n")
191 test.write(['import', 'ccc.in'], "import/ccc.in\n")
192
193 test.write(['import', 'sub', 'SConscript'], """
194 Import("env")
195 env.Cat('ddd.out', 'ddd.in')
196 env.Cat('eee.out', 'eee.in')
197 env.Cat('fff.out', 'fff.in')
198 env.Cat('all', ['ddd.out', 'eee.out', 'fff.out'])
199 """)
200
201 test.write(['import', 'sub', 'ddd.in'], "import/sub/ddd.in\n")
202 test.write(['import', 'sub', 'eee.in'], "import/sub/eee.in\n")
203 test.write(['import', 'sub', 'fff.in'], "import/sub/fff.in\n")
204
205 # Perforce uses the PWD environment variable in preference to the actual cwd
206 os.environ["PWD"] = test.workpath('import')
207 paths = [ 'aaa.in', 'bbb.in', 'ccc.in',
208           'sub/ddd.in', 'sub/eee.in', 'sub/fff.in', 'sub/SConscript' ]
209 paths = map(os.path.normpath, paths)
210 args = '-c testclient1 add -t binary %s' % string.join(paths)
211 test.p4(args, chdir='import')
212
213 changespec = test.substitute("""
214 Change: new
215
216 Client: testclient1
217
218 User:   %(user)s
219
220 Status: new
221
222 Description:
223         A test check in
224
225 Files:
226         //%(depot)s/foo/aaa.in  # add
227         //%(depot)s/foo/bbb.in  # add
228         //%(depot)s/foo/ccc.in  # add
229         //%(depot)s/foo/sub/SConscript  # add
230         //%(depot)s/foo/sub/ddd.in      # add
231         //%(depot)s/foo/sub/eee.in      # add
232         //%(depot)s/foo/sub/fff.in      # add
233 """)
234
235 test.p4('-c testclient1 opened')
236 test.p4('-c testclient1 submit -i', stdin=changespec)
237
238 SConstruct_contents = test.substitute("""
239 def cat(env, source, target):
240     target = str(target[0])
241     source = map(str, source)
242     f = open(target, "wb")
243     for src in source:
244         f.write(open(src, "rb").read())
245     f.close()
246 env = Environment(tools = ['default', 'Perforce'],
247                   BUILDERS={'Cat':Builder(action=cat)},
248                   P4=r'%(p4path)s',
249                   P4FLAGS='%(portflag)s -c testclient2')
250 env.Cat('aaa.out', 'foo/aaa.in')
251 env.Cat('bbb.out', 'foo/bbb.in')
252 env.Cat('ccc.out', 'foo/ccc.in')
253 env.Cat('all', ['aaa.out', 'bbb.out', 'ccc.out'])
254 env.SourceCode('.', env.Perforce())
255 SConscript('foo/sub/SConscript', 'env')
256 """)
257
258 test.write(['work', 'SConstruct'], SConstruct_contents)
259
260 test.subdir(['work', 'foo'])
261 test.write(['work', 'foo', 'bbb.in'], "work/foo/bbb.in\n")
262
263 test.subdir(['work', 'foo', 'sub'])
264 test.write(['work', 'foo', 'sub', 'eee.in'], "work/foo/sub/eee.in\n")
265
266 test.run(chdir = 'work', arguments = '.')
267
268 test.fail_test(test.read(['work', 'all']) != "import/aaa.in\nwork/foo/bbb.in\nimport/ccc.in\n")
269 test.fail_test(test.read(['work', 'foo', 'sub', 'all']) != "import/sub/ddd.in\nwork/foo/sub/eee.in\nimport/sub/fff.in\n")
270
271 test.pass_test()