OLD | NEW |
| (Empty) |
1 #!/usr/bin/python | |
2 # Copyright (c) 2010 The Chromium OS Authors. All rights reserved. | |
3 # Use of this source code is governed by a BSD-style license that can be | |
4 # found in the LICENSE file. | |
5 | |
6 import copy | |
7 import mox | |
8 import os | |
9 import prebuilt | |
10 import shutil | |
11 import tempfile | |
12 import unittest | |
13 import urllib | |
14 from chromite.lib import cros_build_lib | |
15 from chromite.lib.binpkg import PackageIndex | |
16 | |
17 PUBLIC_PACKAGES = [{'CPV': 'gtk+/public1', 'SHA1': '1'}, | |
18 {'CPV': 'gtk+/public2', 'SHA1': '2', | |
19 'PATH': 'gtk%2B/foo.tgz'}] | |
20 PRIVATE_PACKAGES = [{'CPV': 'private', 'SHA1': '3'}] | |
21 | |
22 | |
23 def SimplePackageIndex(header=True, packages=True): | |
24 pkgindex = PackageIndex() | |
25 if header: | |
26 pkgindex.header['URI'] = 'http://www.example.com' | |
27 if packages: | |
28 pkgindex.packages = copy.deepcopy(PUBLIC_PACKAGES + PRIVATE_PACKAGES) | |
29 return pkgindex | |
30 | |
31 | |
32 class TestUpdateFile(unittest.TestCase): | |
33 | |
34 def setUp(self): | |
35 self.contents_str = ['# comment that should be skipped', | |
36 'PKGDIR="/var/lib/portage/pkgs"', | |
37 'PORTAGE_BINHOST="http://no.thanks.com"', | |
38 'portage portage-20100310.tar.bz2', | |
39 'COMPILE_FLAGS="some_value=some_other"', | |
40 ] | |
41 temp_fd, self.version_file = tempfile.mkstemp() | |
42 os.write(temp_fd, '\n'.join(self.contents_str)) | |
43 os.close(temp_fd) | |
44 | |
45 def tearDown(self): | |
46 os.remove(self.version_file) | |
47 | |
48 def _read_version_file(self, version_file=None): | |
49 """Read the contents of self.version_file and return as a list.""" | |
50 if not version_file: | |
51 version_file = self.version_file | |
52 | |
53 version_fh = open(version_file) | |
54 try: | |
55 return [line.strip() for line in version_fh.readlines()] | |
56 finally: | |
57 version_fh.close() | |
58 | |
59 def _verify_key_pair(self, key, val): | |
60 file_contents = self._read_version_file() | |
61 # ensure key for verify is wrapped on quotes | |
62 if '"' not in val: | |
63 val = '"%s"' % val | |
64 for entry in file_contents: | |
65 if '=' not in entry: | |
66 continue | |
67 file_key, file_val = entry.split('=') | |
68 if file_key == key: | |
69 if val == file_val: | |
70 break | |
71 else: | |
72 self.fail('Could not find "%s=%s" in version file' % (key, val)) | |
73 | |
74 def testAddVariableThatDoesNotExist(self): | |
75 """Add in a new variable that was no present in the file.""" | |
76 key = 'PORTAGE_BINHOST' | |
77 value = '1234567' | |
78 prebuilt.UpdateLocalFile(self.version_file, value) | |
79 print self.version_file | |
80 current_version_str = self._read_version_file() | |
81 self._verify_key_pair(key, value) | |
82 print self.version_file | |
83 | |
84 def testUpdateVariable(self): | |
85 """Test updating a variable that already exists.""" | |
86 key, val = self.contents_str[2].split('=') | |
87 new_val = 'test_update' | |
88 self._verify_key_pair(key, val) | |
89 prebuilt.UpdateLocalFile(self.version_file, new_val) | |
90 self._verify_key_pair(key, new_val) | |
91 | |
92 def testUpdateNonExistentFile(self): | |
93 key = 'PORTAGE_BINHOST' | |
94 value = '1234567' | |
95 non_existent_file = tempfile.mktemp() | |
96 try: | |
97 prebuilt.UpdateLocalFile(non_existent_file, value) | |
98 file_contents = self._read_version_file(non_existent_file) | |
99 self.assertEqual(file_contents, ['%s=%s' % (key, value)]) | |
100 finally: | |
101 if os.path.exists(non_existent_file): | |
102 os.remove(non_existent_file) | |
103 | |
104 | |
105 class TestPrebuiltFilters(unittest.TestCase): | |
106 | |
107 def setUp(self): | |
108 self.tmp_dir = tempfile.mkdtemp() | |
109 self.private_dir = os.path.join(self.tmp_dir, | |
110 prebuilt._PRIVATE_OVERLAY_DIR) | |
111 self.private_structure_base = 'chromeos-overlay/chromeos-base' | |
112 self.private_pkgs = ['test-package/salt-flavor-0.1.r3.ebuild', | |
113 'easy/alpha_beta-0.1.41.r3.ebuild', | |
114 'dev/j-t-r-0.1.r3.ebuild',] | |
115 self.expected_filters = set(['salt-flavor', 'alpha_beta', 'j-t-r']) | |
116 | |
117 def tearDown(self): | |
118 if self.tmp_dir: | |
119 shutil.rmtree(self.tmp_dir) | |
120 | |
121 def _CreateNestedDir(self, tmp_dir, dir_structure): | |
122 for entry in dir_structure: | |
123 full_path = os.path.join(os.path.join(tmp_dir, entry)) | |
124 # ensure dirs are created | |
125 try: | |
126 os.makedirs(os.path.dirname(full_path)) | |
127 if full_path.endswith('/'): | |
128 # we only want to create directories | |
129 return | |
130 except OSError, err: | |
131 if err.errno == errno.EEXIST: | |
132 # we don't care if the dir already exists | |
133 pass | |
134 else: | |
135 raise | |
136 # create dummy files | |
137 tmp = open(full_path, 'w') | |
138 tmp.close() | |
139 | |
140 def _LoadPrivateMockFilters(self): | |
141 """Load mock filters as defined in the setUp function.""" | |
142 dir_structure = [os.path.join(self.private_structure_base, entry) | |
143 for entry in self.private_pkgs] | |
144 | |
145 self._CreateNestedDir(self.private_dir, dir_structure) | |
146 prebuilt.LoadPrivateFilters(self.tmp_dir) | |
147 | |
148 def testFilterPattern(self): | |
149 """Check that particular packages are filtered properly.""" | |
150 self._LoadPrivateMockFilters() | |
151 packages = ['/some/dir/area/j-t-r-0.1.r3.tbz', | |
152 '/var/pkgs/new/alpha_beta-0.2.3.4.tbz', | |
153 '/usr/local/cache/good-0.1.3.tbz', | |
154 '/usr-blah/b_d/salt-flavor-0.0.3.tbz'] | |
155 expected_list = ['/usr/local/cache/good-0.1.3.tbz'] | |
156 filtered_list = [file for file in packages if not | |
157 prebuilt.ShouldFilterPackage(file)] | |
158 self.assertEqual(expected_list, filtered_list) | |
159 | |
160 def testLoadPrivateFilters(self): | |
161 self._LoadPrivateMockFilters() | |
162 prebuilt.LoadPrivateFilters(self.tmp_dir) | |
163 self.assertEqual(self.expected_filters, prebuilt._FILTER_PACKAGES) | |
164 | |
165 def testEmptyFiltersErrors(self): | |
166 """Ensure LoadPrivateFilters errors if an empty list is generated.""" | |
167 os.makedirs(os.path.join(self.tmp_dir, prebuilt._PRIVATE_OVERLAY_DIR)) | |
168 self.assertRaises(prebuilt.FiltersEmpty, prebuilt.LoadPrivateFilters, | |
169 self.tmp_dir) | |
170 | |
171 | |
172 class TestPrebuilt(unittest.TestCase): | |
173 | |
174 def setUp(self): | |
175 self.mox = mox.Mox() | |
176 | |
177 def tearDown(self): | |
178 self.mox.UnsetStubs() | |
179 self.mox.VerifyAll() | |
180 | |
181 def testGenerateUploadDict(self): | |
182 base_local_path = '/b/cbuild/build/chroot/build/x86-dogfood/' | |
183 gs_bucket_path = 'gs://chromeos-prebuilt/host/version' | |
184 local_path = os.path.join(base_local_path, 'public1.tbz2') | |
185 self.mox.StubOutWithMock(prebuilt.os.path, 'exists') | |
186 prebuilt.os.path.exists(local_path).AndReturn(True) | |
187 self.mox.ReplayAll() | |
188 pkgs = [{ 'CPV': 'public1' }] | |
189 result = prebuilt.GenerateUploadDict(base_local_path, gs_bucket_path, pkgs) | |
190 expected = { local_path: gs_bucket_path + '/public1.tbz2' } | |
191 self.assertEqual(result, expected) | |
192 | |
193 def testFailonUploadFail(self): | |
194 """Make sure we fail if one of the upload processes fail.""" | |
195 files = {'test': '/uasd'} | |
196 self.assertEqual(prebuilt.RemoteUpload(files), set([('test', '/uasd')])) | |
197 | |
198 def testDeterminePrebuiltConfHost(self): | |
199 """Test that the host prebuilt path comes back properly.""" | |
200 expected_path = os.path.join(prebuilt._PREBUILT_MAKE_CONF['amd64']) | |
201 self.assertEqual(prebuilt.DeterminePrebuiltConfFile('fake_path', 'amd64'), | |
202 expected_path) | |
203 | |
204 def testDeterminePrebuiltConf(self): | |
205 """Test the different known variants of boards for proper path discovery.""" | |
206 fake_path = '/b/cbuild' | |
207 script_path = os.path.join(fake_path, 'src/scripts/bin') | |
208 public_overlay_path = os.path.join(fake_path, 'src/overlays') | |
209 private_overlay_path = os.path.join(fake_path, | |
210 prebuilt._PRIVATE_OVERLAY_DIR) | |
211 path_dict = {'private_overlay_path': private_overlay_path, | |
212 'public_overlay_path': public_overlay_path} | |
213 # format for targets | |
214 # board target key in dictionar | |
215 # Tuple containing cmd run, expected results as cmd obj, and expected output | |
216 | |
217 # Mock output from cros_overlay_list | |
218 x86_out = ('%(private_overlay_path)s/chromeos-overlay\n' | |
219 '%(public_overlay_path)s/overlay-x86-generic\n' % path_dict) | |
220 | |
221 x86_cmd = './cros_overlay_list --board x86-generic' | |
222 x86_expected_path = os.path.join(public_overlay_path, 'overlay-x86-generic', | |
223 'prebuilt.conf') | |
224 # Mock output from cros_overlay_list | |
225 tegra2_out = ('%(private_overlay_path)s/chromeos-overlay\n' | |
226 '%(public_overlay_path)s/overlay-tegra2\n' | |
227 '%(public_overlay_path)s/overlay-variant-tegra2-seaboard\n' | |
228 '%(private_overlay_path)s/overlay-tegra2-private\n' | |
229 '%(private_overlay_path)s/' | |
230 'overlay-variant-tegra2-seaboard-private\n' % path_dict) | |
231 tegra2_cmd = './cros_overlay_list --board tegra2 --variant seaboard' | |
232 tegra2_expected_path = os.path.join( | |
233 private_overlay_path, 'overlay-variant-tegra2-seaboard-private', | |
234 'prebuilt.conf') | |
235 | |
236 | |
237 targets = {'x86-generic': {'cmd': x86_cmd, | |
238 'output': x86_out, | |
239 'result': x86_expected_path}, | |
240 'tegra2_seaboard': {'cmd': tegra2_cmd, | |
241 'output': tegra2_out, | |
242 'result': tegra2_expected_path} | |
243 } | |
244 | |
245 self.mox.StubOutWithMock(prebuilt.cros_build_lib, 'RunCommand') | |
246 for target, expected_results in targets.iteritems(): | |
247 # create command object for output | |
248 cmd_result_obj = cros_build_lib.CommandResult() | |
249 cmd_result_obj.output = expected_results['output'] | |
250 prebuilt.cros_build_lib.RunCommand( | |
251 expected_results['cmd'].split(), redirect_stdout=True, | |
252 cwd=script_path).AndReturn(cmd_result_obj) | |
253 | |
254 self.mox.ReplayAll() | |
255 for target, expected_results in targets.iteritems(): | |
256 self.assertEqual( | |
257 prebuilt.DeterminePrebuiltConfFile(fake_path, target), | |
258 expected_results['result']) | |
259 | |
260 def testDeterminePrebuiltConfGarbage(self): | |
261 """Ensure an exception is raised on bad input.""" | |
262 self.assertRaises(prebuilt.UnknownBoardFormat, | |
263 prebuilt.DeterminePrebuiltConfFile, | |
264 'fake_path', 'asdfasdf') | |
265 | |
266 | |
267 class TestPackagesFileFiltering(unittest.TestCase): | |
268 | |
269 def testFilterPkgIndex(self): | |
270 pkgindex = SimplePackageIndex() | |
271 pkgindex.RemoveFilteredPackages(lambda pkg: pkg in PRIVATE_PACKAGES) | |
272 self.assertEqual(pkgindex.packages, PUBLIC_PACKAGES) | |
273 self.assertEqual(pkgindex.modified, True) | |
274 | |
275 | |
276 class TestPopulateDuplicateDB(unittest.TestCase): | |
277 | |
278 def testEmptyIndex(self): | |
279 pkgindex = SimplePackageIndex(packages=False) | |
280 db = {} | |
281 pkgindex._PopulateDuplicateDB(db) | |
282 self.assertEqual(db, {}) | |
283 | |
284 def testNormalIndex(self): | |
285 pkgindex = SimplePackageIndex() | |
286 db = {} | |
287 pkgindex._PopulateDuplicateDB(db) | |
288 self.assertEqual(len(db), 3) | |
289 self.assertEqual(db['1'], 'http://www.example.com/gtk%2B/public1.tbz2') | |
290 self.assertEqual(db['2'], 'http://www.example.com/gtk%2B/foo.tgz') | |
291 self.assertEqual(db['3'], 'http://www.example.com/private.tbz2') | |
292 | |
293 def testMissingSHA1(self): | |
294 db = {} | |
295 pkgindex = SimplePackageIndex() | |
296 del pkgindex.packages[0]['SHA1'] | |
297 pkgindex._PopulateDuplicateDB(db) | |
298 self.assertEqual(len(db), 2) | |
299 self.assertEqual(db['2'], 'http://www.example.com/gtk%2B/foo.tgz') | |
300 self.assertEqual(db['3'], 'http://www.example.com/private.tbz2') | |
301 | |
302 def testFailedPopulate(self): | |
303 db = {} | |
304 pkgindex = SimplePackageIndex(header=False) | |
305 self.assertRaises(KeyError, pkgindex._PopulateDuplicateDB, db) | |
306 pkgindex = SimplePackageIndex() | |
307 del pkgindex.packages[0]['CPV'] | |
308 self.assertRaises(KeyError, pkgindex._PopulateDuplicateDB, db) | |
309 | |
310 | |
311 class TestResolveDuplicateUploads(unittest.TestCase): | |
312 | |
313 def testEmptyList(self): | |
314 pkgindex = SimplePackageIndex() | |
315 pristine = SimplePackageIndex() | |
316 uploads = pkgindex.ResolveDuplicateUploads([]) | |
317 self.assertEqual(uploads, pristine.packages) | |
318 self.assertEqual(pkgindex.packages, pristine.packages) | |
319 self.assertEqual(pkgindex.modified, False) | |
320 | |
321 def testEmptyIndex(self): | |
322 pkgindex = SimplePackageIndex() | |
323 pristine = SimplePackageIndex() | |
324 empty = SimplePackageIndex(packages=False) | |
325 uploads = pkgindex.ResolveDuplicateUploads([empty]) | |
326 self.assertEqual(uploads, pristine.packages) | |
327 self.assertEqual(pkgindex.packages, pristine.packages) | |
328 self.assertEqual(pkgindex.modified, False) | |
329 | |
330 def testDuplicates(self): | |
331 pkgindex = SimplePackageIndex() | |
332 dup_pkgindex = SimplePackageIndex() | |
333 expected_pkgindex = SimplePackageIndex() | |
334 for pkg in expected_pkgindex.packages: | |
335 pkg.setdefault('PATH', urllib.quote(pkg['CPV'] + '.tbz2')) | |
336 uploads = pkgindex.ResolveDuplicateUploads([dup_pkgindex]) | |
337 self.assertEqual(pkgindex.packages, expected_pkgindex.packages) | |
338 | |
339 def testMissingSHA1(self): | |
340 db = {} | |
341 pkgindex = SimplePackageIndex() | |
342 dup_pkgindex = SimplePackageIndex() | |
343 expected_pkgindex = SimplePackageIndex() | |
344 del pkgindex.packages[0]['SHA1'] | |
345 del expected_pkgindex.packages[0]['SHA1'] | |
346 for pkg in expected_pkgindex.packages[1:]: | |
347 pkg.setdefault('PATH', pkg['CPV'] + '.tbz2') | |
348 uploads = pkgindex.ResolveDuplicateUploads([dup_pkgindex]) | |
349 self.assertEqual(pkgindex.packages, expected_pkgindex.packages) | |
350 | |
351 | |
352 class TestWritePackageIndex(unittest.TestCase): | |
353 | |
354 def setUp(self): | |
355 self.mox = mox.Mox() | |
356 | |
357 def tearDown(self): | |
358 self.mox.UnsetStubs() | |
359 self.mox.VerifyAll() | |
360 | |
361 def testSimple(self): | |
362 pkgindex = SimplePackageIndex() | |
363 self.mox.StubOutWithMock(pkgindex, 'Write') | |
364 pkgindex.Write(mox.IgnoreArg()) | |
365 self.mox.ReplayAll() | |
366 f = pkgindex.WriteToNamedTemporaryFile() | |
367 self.assertEqual(f.read(), '') | |
368 | |
369 | |
370 if __name__ == '__main__': | |
371 unittest.main() | |
OLD | NEW |