| OLD | NEW |
| (Empty) |
| 1 #!/usr/bin/python | |
| 2 # Copyright (c) 2010 The Chromium OS Authors. All rights reserved. | |
| 3 # Use of this source code is governed by a BSD-style license that can be | |
| 4 # found in the LICENSE file. | |
| 5 | |
| 6 import copy | |
| 7 import mox | |
| 8 import os | |
| 9 import prebuilt | |
| 10 import shutil | |
| 11 import tempfile | |
| 12 import unittest | |
| 13 import urllib | |
| 14 from chromite.lib import cros_build_lib | |
| 15 from chromite.lib.binpkg import PackageIndex | |
| 16 | |
| 17 PUBLIC_PACKAGES = [{'CPV': 'gtk+/public1', 'SHA1': '1'}, | |
| 18 {'CPV': 'gtk+/public2', 'SHA1': '2', | |
| 19 'PATH': 'gtk%2B/foo.tgz'}] | |
| 20 PRIVATE_PACKAGES = [{'CPV': 'private', 'SHA1': '3'}] | |
| 21 | |
| 22 | |
| 23 def SimplePackageIndex(header=True, packages=True): | |
| 24 pkgindex = PackageIndex() | |
| 25 if header: | |
| 26 pkgindex.header['URI'] = 'http://www.example.com' | |
| 27 if packages: | |
| 28 pkgindex.packages = copy.deepcopy(PUBLIC_PACKAGES + PRIVATE_PACKAGES) | |
| 29 return pkgindex | |
| 30 | |
| 31 | |
| 32 class TestUpdateFile(unittest.TestCase): | |
| 33 | |
| 34 def setUp(self): | |
| 35 self.contents_str = ['# comment that should be skipped', | |
| 36 'PKGDIR="/var/lib/portage/pkgs"', | |
| 37 'PORTAGE_BINHOST="http://no.thanks.com"', | |
| 38 'portage portage-20100310.tar.bz2', | |
| 39 'COMPILE_FLAGS="some_value=some_other"', | |
| 40 ] | |
| 41 temp_fd, self.version_file = tempfile.mkstemp() | |
| 42 os.write(temp_fd, '\n'.join(self.contents_str)) | |
| 43 os.close(temp_fd) | |
| 44 | |
| 45 def tearDown(self): | |
| 46 os.remove(self.version_file) | |
| 47 | |
| 48 def _read_version_file(self, version_file=None): | |
| 49 """Read the contents of self.version_file and return as a list.""" | |
| 50 if not version_file: | |
| 51 version_file = self.version_file | |
| 52 | |
| 53 version_fh = open(version_file) | |
| 54 try: | |
| 55 return [line.strip() for line in version_fh.readlines()] | |
| 56 finally: | |
| 57 version_fh.close() | |
| 58 | |
| 59 def _verify_key_pair(self, key, val): | |
| 60 file_contents = self._read_version_file() | |
| 61 # ensure key for verify is wrapped on quotes | |
| 62 if '"' not in val: | |
| 63 val = '"%s"' % val | |
| 64 for entry in file_contents: | |
| 65 if '=' not in entry: | |
| 66 continue | |
| 67 file_key, file_val = entry.split('=') | |
| 68 if file_key == key: | |
| 69 if val == file_val: | |
| 70 break | |
| 71 else: | |
| 72 self.fail('Could not find "%s=%s" in version file' % (key, val)) | |
| 73 | |
| 74 def testAddVariableThatDoesNotExist(self): | |
| 75 """Add in a new variable that was no present in the file.""" | |
| 76 key = 'PORTAGE_BINHOST' | |
| 77 value = '1234567' | |
| 78 prebuilt.UpdateLocalFile(self.version_file, value) | |
| 79 print self.version_file | |
| 80 current_version_str = self._read_version_file() | |
| 81 self._verify_key_pair(key, value) | |
| 82 print self.version_file | |
| 83 | |
| 84 def testUpdateVariable(self): | |
| 85 """Test updating a variable that already exists.""" | |
| 86 key, val = self.contents_str[2].split('=') | |
| 87 new_val = 'test_update' | |
| 88 self._verify_key_pair(key, val) | |
| 89 prebuilt.UpdateLocalFile(self.version_file, new_val) | |
| 90 self._verify_key_pair(key, new_val) | |
| 91 | |
| 92 def testUpdateNonExistentFile(self): | |
| 93 key = 'PORTAGE_BINHOST' | |
| 94 value = '1234567' | |
| 95 non_existent_file = tempfile.mktemp() | |
| 96 try: | |
| 97 prebuilt.UpdateLocalFile(non_existent_file, value) | |
| 98 file_contents = self._read_version_file(non_existent_file) | |
| 99 self.assertEqual(file_contents, ['%s=%s' % (key, value)]) | |
| 100 finally: | |
| 101 if os.path.exists(non_existent_file): | |
| 102 os.remove(non_existent_file) | |
| 103 | |
| 104 | |
| 105 class TestPrebuiltFilters(unittest.TestCase): | |
| 106 | |
| 107 def setUp(self): | |
| 108 self.tmp_dir = tempfile.mkdtemp() | |
| 109 self.private_dir = os.path.join(self.tmp_dir, | |
| 110 prebuilt._PRIVATE_OVERLAY_DIR) | |
| 111 self.private_structure_base = 'chromeos-overlay/chromeos-base' | |
| 112 self.private_pkgs = ['test-package/salt-flavor-0.1.r3.ebuild', | |
| 113 'easy/alpha_beta-0.1.41.r3.ebuild', | |
| 114 'dev/j-t-r-0.1.r3.ebuild',] | |
| 115 self.expected_filters = set(['salt-flavor', 'alpha_beta', 'j-t-r']) | |
| 116 | |
| 117 def tearDown(self): | |
| 118 if self.tmp_dir: | |
| 119 shutil.rmtree(self.tmp_dir) | |
| 120 | |
| 121 def _CreateNestedDir(self, tmp_dir, dir_structure): | |
| 122 for entry in dir_structure: | |
| 123 full_path = os.path.join(os.path.join(tmp_dir, entry)) | |
| 124 # ensure dirs are created | |
| 125 try: | |
| 126 os.makedirs(os.path.dirname(full_path)) | |
| 127 if full_path.endswith('/'): | |
| 128 # we only want to create directories | |
| 129 return | |
| 130 except OSError, err: | |
| 131 if err.errno == errno.EEXIST: | |
| 132 # we don't care if the dir already exists | |
| 133 pass | |
| 134 else: | |
| 135 raise | |
| 136 # create dummy files | |
| 137 tmp = open(full_path, 'w') | |
| 138 tmp.close() | |
| 139 | |
| 140 def _LoadPrivateMockFilters(self): | |
| 141 """Load mock filters as defined in the setUp function.""" | |
| 142 dir_structure = [os.path.join(self.private_structure_base, entry) | |
| 143 for entry in self.private_pkgs] | |
| 144 | |
| 145 self._CreateNestedDir(self.private_dir, dir_structure) | |
| 146 prebuilt.LoadPrivateFilters(self.tmp_dir) | |
| 147 | |
| 148 def testFilterPattern(self): | |
| 149 """Check that particular packages are filtered properly.""" | |
| 150 self._LoadPrivateMockFilters() | |
| 151 packages = ['/some/dir/area/j-t-r-0.1.r3.tbz', | |
| 152 '/var/pkgs/new/alpha_beta-0.2.3.4.tbz', | |
| 153 '/usr/local/cache/good-0.1.3.tbz', | |
| 154 '/usr-blah/b_d/salt-flavor-0.0.3.tbz'] | |
| 155 expected_list = ['/usr/local/cache/good-0.1.3.tbz'] | |
| 156 filtered_list = [file for file in packages if not | |
| 157 prebuilt.ShouldFilterPackage(file)] | |
| 158 self.assertEqual(expected_list, filtered_list) | |
| 159 | |
| 160 def testLoadPrivateFilters(self): | |
| 161 self._LoadPrivateMockFilters() | |
| 162 prebuilt.LoadPrivateFilters(self.tmp_dir) | |
| 163 self.assertEqual(self.expected_filters, prebuilt._FILTER_PACKAGES) | |
| 164 | |
| 165 def testEmptyFiltersErrors(self): | |
| 166 """Ensure LoadPrivateFilters errors if an empty list is generated.""" | |
| 167 os.makedirs(os.path.join(self.tmp_dir, prebuilt._PRIVATE_OVERLAY_DIR)) | |
| 168 self.assertRaises(prebuilt.FiltersEmpty, prebuilt.LoadPrivateFilters, | |
| 169 self.tmp_dir) | |
| 170 | |
| 171 | |
| 172 class TestPrebuilt(unittest.TestCase): | |
| 173 | |
| 174 def setUp(self): | |
| 175 self.mox = mox.Mox() | |
| 176 | |
| 177 def tearDown(self): | |
| 178 self.mox.UnsetStubs() | |
| 179 self.mox.VerifyAll() | |
| 180 | |
| 181 def testGenerateUploadDict(self): | |
| 182 base_local_path = '/b/cbuild/build/chroot/build/x86-dogfood/' | |
| 183 gs_bucket_path = 'gs://chromeos-prebuilt/host/version' | |
| 184 local_path = os.path.join(base_local_path, 'public1.tbz2') | |
| 185 self.mox.StubOutWithMock(prebuilt.os.path, 'exists') | |
| 186 prebuilt.os.path.exists(local_path).AndReturn(True) | |
| 187 self.mox.ReplayAll() | |
| 188 pkgs = [{ 'CPV': 'public1' }] | |
| 189 result = prebuilt.GenerateUploadDict(base_local_path, gs_bucket_path, pkgs) | |
| 190 expected = { local_path: gs_bucket_path + '/public1.tbz2' } | |
| 191 self.assertEqual(result, expected) | |
| 192 | |
| 193 def testFailonUploadFail(self): | |
| 194 """Make sure we fail if one of the upload processes fail.""" | |
| 195 files = {'test': '/uasd'} | |
| 196 self.assertEqual(prebuilt.RemoteUpload(files), set([('test', '/uasd')])) | |
| 197 | |
| 198 def testDeterminePrebuiltConfHost(self): | |
| 199 """Test that the host prebuilt path comes back properly.""" | |
| 200 expected_path = os.path.join(prebuilt._PREBUILT_MAKE_CONF['amd64']) | |
| 201 self.assertEqual(prebuilt.DeterminePrebuiltConfFile('fake_path', 'amd64'), | |
| 202 expected_path) | |
| 203 | |
| 204 def testDeterminePrebuiltConf(self): | |
| 205 """Test the different known variants of boards for proper path discovery.""" | |
| 206 fake_path = '/b/cbuild' | |
| 207 script_path = os.path.join(fake_path, 'src/scripts/bin') | |
| 208 public_overlay_path = os.path.join(fake_path, 'src/overlays') | |
| 209 private_overlay_path = os.path.join(fake_path, | |
| 210 prebuilt._PRIVATE_OVERLAY_DIR) | |
| 211 path_dict = {'private_overlay_path': private_overlay_path, | |
| 212 'public_overlay_path': public_overlay_path} | |
| 213 # format for targets | |
| 214 # board target key in dictionar | |
| 215 # Tuple containing cmd run, expected results as cmd obj, and expected output | |
| 216 | |
| 217 # Mock output from cros_overlay_list | |
| 218 x86_out = ('%(private_overlay_path)s/chromeos-overlay\n' | |
| 219 '%(public_overlay_path)s/overlay-x86-generic\n' % path_dict) | |
| 220 | |
| 221 x86_cmd = './cros_overlay_list --board x86-generic' | |
| 222 x86_expected_path = os.path.join(public_overlay_path, 'overlay-x86-generic', | |
| 223 'prebuilt.conf') | |
| 224 # Mock output from cros_overlay_list | |
| 225 tegra2_out = ('%(private_overlay_path)s/chromeos-overlay\n' | |
| 226 '%(public_overlay_path)s/overlay-tegra2\n' | |
| 227 '%(public_overlay_path)s/overlay-variant-tegra2-seaboard\n' | |
| 228 '%(private_overlay_path)s/overlay-tegra2-private\n' | |
| 229 '%(private_overlay_path)s/' | |
| 230 'overlay-variant-tegra2-seaboard-private\n' % path_dict) | |
| 231 tegra2_cmd = './cros_overlay_list --board tegra2 --variant seaboard' | |
| 232 tegra2_expected_path = os.path.join( | |
| 233 private_overlay_path, 'overlay-variant-tegra2-seaboard-private', | |
| 234 'prebuilt.conf') | |
| 235 | |
| 236 | |
| 237 targets = {'x86-generic': {'cmd': x86_cmd, | |
| 238 'output': x86_out, | |
| 239 'result': x86_expected_path}, | |
| 240 'tegra2_seaboard': {'cmd': tegra2_cmd, | |
| 241 'output': tegra2_out, | |
| 242 'result': tegra2_expected_path} | |
| 243 } | |
| 244 | |
| 245 self.mox.StubOutWithMock(prebuilt.cros_build_lib, 'RunCommand') | |
| 246 for target, expected_results in targets.iteritems(): | |
| 247 # create command object for output | |
| 248 cmd_result_obj = cros_build_lib.CommandResult() | |
| 249 cmd_result_obj.output = expected_results['output'] | |
| 250 prebuilt.cros_build_lib.RunCommand( | |
| 251 expected_results['cmd'].split(), redirect_stdout=True, | |
| 252 cwd=script_path).AndReturn(cmd_result_obj) | |
| 253 | |
| 254 self.mox.ReplayAll() | |
| 255 for target, expected_results in targets.iteritems(): | |
| 256 self.assertEqual( | |
| 257 prebuilt.DeterminePrebuiltConfFile(fake_path, target), | |
| 258 expected_results['result']) | |
| 259 | |
| 260 def testDeterminePrebuiltConfGarbage(self): | |
| 261 """Ensure an exception is raised on bad input.""" | |
| 262 self.assertRaises(prebuilt.UnknownBoardFormat, | |
| 263 prebuilt.DeterminePrebuiltConfFile, | |
| 264 'fake_path', 'asdfasdf') | |
| 265 | |
| 266 | |
| 267 class TestPackagesFileFiltering(unittest.TestCase): | |
| 268 | |
| 269 def testFilterPkgIndex(self): | |
| 270 pkgindex = SimplePackageIndex() | |
| 271 pkgindex.RemoveFilteredPackages(lambda pkg: pkg in PRIVATE_PACKAGES) | |
| 272 self.assertEqual(pkgindex.packages, PUBLIC_PACKAGES) | |
| 273 self.assertEqual(pkgindex.modified, True) | |
| 274 | |
| 275 | |
| 276 class TestPopulateDuplicateDB(unittest.TestCase): | |
| 277 | |
| 278 def testEmptyIndex(self): | |
| 279 pkgindex = SimplePackageIndex(packages=False) | |
| 280 db = {} | |
| 281 pkgindex._PopulateDuplicateDB(db) | |
| 282 self.assertEqual(db, {}) | |
| 283 | |
| 284 def testNormalIndex(self): | |
| 285 pkgindex = SimplePackageIndex() | |
| 286 db = {} | |
| 287 pkgindex._PopulateDuplicateDB(db) | |
| 288 self.assertEqual(len(db), 3) | |
| 289 self.assertEqual(db['1'], 'http://www.example.com/gtk%2B/public1.tbz2') | |
| 290 self.assertEqual(db['2'], 'http://www.example.com/gtk%2B/foo.tgz') | |
| 291 self.assertEqual(db['3'], 'http://www.example.com/private.tbz2') | |
| 292 | |
| 293 def testMissingSHA1(self): | |
| 294 db = {} | |
| 295 pkgindex = SimplePackageIndex() | |
| 296 del pkgindex.packages[0]['SHA1'] | |
| 297 pkgindex._PopulateDuplicateDB(db) | |
| 298 self.assertEqual(len(db), 2) | |
| 299 self.assertEqual(db['2'], 'http://www.example.com/gtk%2B/foo.tgz') | |
| 300 self.assertEqual(db['3'], 'http://www.example.com/private.tbz2') | |
| 301 | |
| 302 def testFailedPopulate(self): | |
| 303 db = {} | |
| 304 pkgindex = SimplePackageIndex(header=False) | |
| 305 self.assertRaises(KeyError, pkgindex._PopulateDuplicateDB, db) | |
| 306 pkgindex = SimplePackageIndex() | |
| 307 del pkgindex.packages[0]['CPV'] | |
| 308 self.assertRaises(KeyError, pkgindex._PopulateDuplicateDB, db) | |
| 309 | |
| 310 | |
| 311 class TestResolveDuplicateUploads(unittest.TestCase): | |
| 312 | |
| 313 def testEmptyList(self): | |
| 314 pkgindex = SimplePackageIndex() | |
| 315 pristine = SimplePackageIndex() | |
| 316 uploads = pkgindex.ResolveDuplicateUploads([]) | |
| 317 self.assertEqual(uploads, pristine.packages) | |
| 318 self.assertEqual(pkgindex.packages, pristine.packages) | |
| 319 self.assertEqual(pkgindex.modified, False) | |
| 320 | |
| 321 def testEmptyIndex(self): | |
| 322 pkgindex = SimplePackageIndex() | |
| 323 pristine = SimplePackageIndex() | |
| 324 empty = SimplePackageIndex(packages=False) | |
| 325 uploads = pkgindex.ResolveDuplicateUploads([empty]) | |
| 326 self.assertEqual(uploads, pristine.packages) | |
| 327 self.assertEqual(pkgindex.packages, pristine.packages) | |
| 328 self.assertEqual(pkgindex.modified, False) | |
| 329 | |
| 330 def testDuplicates(self): | |
| 331 pkgindex = SimplePackageIndex() | |
| 332 dup_pkgindex = SimplePackageIndex() | |
| 333 expected_pkgindex = SimplePackageIndex() | |
| 334 for pkg in expected_pkgindex.packages: | |
| 335 pkg.setdefault('PATH', urllib.quote(pkg['CPV'] + '.tbz2')) | |
| 336 uploads = pkgindex.ResolveDuplicateUploads([dup_pkgindex]) | |
| 337 self.assertEqual(pkgindex.packages, expected_pkgindex.packages) | |
| 338 | |
| 339 def testMissingSHA1(self): | |
| 340 db = {} | |
| 341 pkgindex = SimplePackageIndex() | |
| 342 dup_pkgindex = SimplePackageIndex() | |
| 343 expected_pkgindex = SimplePackageIndex() | |
| 344 del pkgindex.packages[0]['SHA1'] | |
| 345 del expected_pkgindex.packages[0]['SHA1'] | |
| 346 for pkg in expected_pkgindex.packages[1:]: | |
| 347 pkg.setdefault('PATH', pkg['CPV'] + '.tbz2') | |
| 348 uploads = pkgindex.ResolveDuplicateUploads([dup_pkgindex]) | |
| 349 self.assertEqual(pkgindex.packages, expected_pkgindex.packages) | |
| 350 | |
| 351 | |
| 352 class TestWritePackageIndex(unittest.TestCase): | |
| 353 | |
| 354 def setUp(self): | |
| 355 self.mox = mox.Mox() | |
| 356 | |
| 357 def tearDown(self): | |
| 358 self.mox.UnsetStubs() | |
| 359 self.mox.VerifyAll() | |
| 360 | |
| 361 def testSimple(self): | |
| 362 pkgindex = SimplePackageIndex() | |
| 363 self.mox.StubOutWithMock(pkgindex, 'Write') | |
| 364 pkgindex.Write(mox.IgnoreArg()) | |
| 365 self.mox.ReplayAll() | |
| 366 f = pkgindex.WriteToNamedTemporaryFile() | |
| 367 self.assertEqual(f.read(), '') | |
| 368 | |
| 369 | |
| 370 if __name__ == '__main__': | |
| 371 unittest.main() | |
| OLD | NEW |