| Index: prebuilt_unittest.py
|
| diff --git a/prebuilt_unittest.py b/prebuilt_unittest.py
|
| deleted file mode 100755
|
| index f5b3c63b5281dadfbfc97f8d1bb7db3acbf97602..0000000000000000000000000000000000000000
|
| --- a/prebuilt_unittest.py
|
| +++ /dev/null
|
| @@ -1,371 +0,0 @@
|
| -#!/usr/bin/python
|
| -# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
|
| -# Use of this source code is governed by a BSD-style license that can be
|
| -# found in the LICENSE file.
|
| -
|
| -import copy
|
| -import mox
|
| -import os
|
| -import prebuilt
|
| -import shutil
|
| -import tempfile
|
| -import unittest
|
| -import urllib
|
| -from chromite.lib import cros_build_lib
|
| -from chromite.lib.binpkg import PackageIndex
|
| -
|
| -PUBLIC_PACKAGES = [{'CPV': 'gtk+/public1', 'SHA1': '1'},
|
| - {'CPV': 'gtk+/public2', 'SHA1': '2',
|
| - 'PATH': 'gtk%2B/foo.tgz'}]
|
| -PRIVATE_PACKAGES = [{'CPV': 'private', 'SHA1': '3'}]
|
| -
|
| -
|
| -def SimplePackageIndex(header=True, packages=True):
|
| - pkgindex = PackageIndex()
|
| - if header:
|
| - pkgindex.header['URI'] = 'http://www.example.com'
|
| - if packages:
|
| - pkgindex.packages = copy.deepcopy(PUBLIC_PACKAGES + PRIVATE_PACKAGES)
|
| - return pkgindex
|
| -
|
| -
|
| -class TestUpdateFile(unittest.TestCase):
|
| -
|
| - def setUp(self):
|
| - self.contents_str = ['# comment that should be skipped',
|
| - 'PKGDIR="/var/lib/portage/pkgs"',
|
| - 'PORTAGE_BINHOST="http://no.thanks.com"',
|
| - 'portage portage-20100310.tar.bz2',
|
| - 'COMPILE_FLAGS="some_value=some_other"',
|
| - ]
|
| - temp_fd, self.version_file = tempfile.mkstemp()
|
| - os.write(temp_fd, '\n'.join(self.contents_str))
|
| - os.close(temp_fd)
|
| -
|
| - def tearDown(self):
|
| - os.remove(self.version_file)
|
| -
|
| - def _read_version_file(self, version_file=None):
|
| - """Read the contents of self.version_file and return as a list."""
|
| - if not version_file:
|
| - version_file = self.version_file
|
| -
|
| - version_fh = open(version_file)
|
| - try:
|
| - return [line.strip() for line in version_fh.readlines()]
|
| - finally:
|
| - version_fh.close()
|
| -
|
| - def _verify_key_pair(self, key, val):
|
| - file_contents = self._read_version_file()
|
| - # ensure key for verify is wrapped on quotes
|
| - if '"' not in val:
|
| - val = '"%s"' % val
|
| - for entry in file_contents:
|
| - if '=' not in entry:
|
| - continue
|
| - file_key, file_val = entry.split('=')
|
| - if file_key == key:
|
| - if val == file_val:
|
| - break
|
| - else:
|
| - self.fail('Could not find "%s=%s" in version file' % (key, val))
|
| -
|
| - def testAddVariableThatDoesNotExist(self):
|
| - """Add in a new variable that was no present in the file."""
|
| - key = 'PORTAGE_BINHOST'
|
| - value = '1234567'
|
| - prebuilt.UpdateLocalFile(self.version_file, value)
|
| - print self.version_file
|
| - current_version_str = self._read_version_file()
|
| - self._verify_key_pair(key, value)
|
| - print self.version_file
|
| -
|
| - def testUpdateVariable(self):
|
| - """Test updating a variable that already exists."""
|
| - key, val = self.contents_str[2].split('=')
|
| - new_val = 'test_update'
|
| - self._verify_key_pair(key, val)
|
| - prebuilt.UpdateLocalFile(self.version_file, new_val)
|
| - self._verify_key_pair(key, new_val)
|
| -
|
| - def testUpdateNonExistentFile(self):
|
| - key = 'PORTAGE_BINHOST'
|
| - value = '1234567'
|
| - non_existent_file = tempfile.mktemp()
|
| - try:
|
| - prebuilt.UpdateLocalFile(non_existent_file, value)
|
| - file_contents = self._read_version_file(non_existent_file)
|
| - self.assertEqual(file_contents, ['%s=%s' % (key, value)])
|
| - finally:
|
| - if os.path.exists(non_existent_file):
|
| - os.remove(non_existent_file)
|
| -
|
| -
|
| -class TestPrebuiltFilters(unittest.TestCase):
|
| -
|
| - def setUp(self):
|
| - self.tmp_dir = tempfile.mkdtemp()
|
| - self.private_dir = os.path.join(self.tmp_dir,
|
| - prebuilt._PRIVATE_OVERLAY_DIR)
|
| - self.private_structure_base = 'chromeos-overlay/chromeos-base'
|
| - self.private_pkgs = ['test-package/salt-flavor-0.1.r3.ebuild',
|
| - 'easy/alpha_beta-0.1.41.r3.ebuild',
|
| - 'dev/j-t-r-0.1.r3.ebuild',]
|
| - self.expected_filters = set(['salt-flavor', 'alpha_beta', 'j-t-r'])
|
| -
|
| - def tearDown(self):
|
| - if self.tmp_dir:
|
| - shutil.rmtree(self.tmp_dir)
|
| -
|
| - def _CreateNestedDir(self, tmp_dir, dir_structure):
|
| - for entry in dir_structure:
|
| - full_path = os.path.join(os.path.join(tmp_dir, entry))
|
| - # ensure dirs are created
|
| - try:
|
| - os.makedirs(os.path.dirname(full_path))
|
| - if full_path.endswith('/'):
|
| - # we only want to create directories
|
| - return
|
| - except OSError, err:
|
| - if err.errno == errno.EEXIST:
|
| - # we don't care if the dir already exists
|
| - pass
|
| - else:
|
| - raise
|
| - # create dummy files
|
| - tmp = open(full_path, 'w')
|
| - tmp.close()
|
| -
|
| - def _LoadPrivateMockFilters(self):
|
| - """Load mock filters as defined in the setUp function."""
|
| - dir_structure = [os.path.join(self.private_structure_base, entry)
|
| - for entry in self.private_pkgs]
|
| -
|
| - self._CreateNestedDir(self.private_dir, dir_structure)
|
| - prebuilt.LoadPrivateFilters(self.tmp_dir)
|
| -
|
| - def testFilterPattern(self):
|
| - """Check that particular packages are filtered properly."""
|
| - self._LoadPrivateMockFilters()
|
| - packages = ['/some/dir/area/j-t-r-0.1.r3.tbz',
|
| - '/var/pkgs/new/alpha_beta-0.2.3.4.tbz',
|
| - '/usr/local/cache/good-0.1.3.tbz',
|
| - '/usr-blah/b_d/salt-flavor-0.0.3.tbz']
|
| - expected_list = ['/usr/local/cache/good-0.1.3.tbz']
|
| - filtered_list = [file for file in packages if not
|
| - prebuilt.ShouldFilterPackage(file)]
|
| - self.assertEqual(expected_list, filtered_list)
|
| -
|
| - def testLoadPrivateFilters(self):
|
| - self._LoadPrivateMockFilters()
|
| - prebuilt.LoadPrivateFilters(self.tmp_dir)
|
| - self.assertEqual(self.expected_filters, prebuilt._FILTER_PACKAGES)
|
| -
|
| - def testEmptyFiltersErrors(self):
|
| - """Ensure LoadPrivateFilters errors if an empty list is generated."""
|
| - os.makedirs(os.path.join(self.tmp_dir, prebuilt._PRIVATE_OVERLAY_DIR))
|
| - self.assertRaises(prebuilt.FiltersEmpty, prebuilt.LoadPrivateFilters,
|
| - self.tmp_dir)
|
| -
|
| -
|
| -class TestPrebuilt(unittest.TestCase):
|
| -
|
| - def setUp(self):
|
| - self.mox = mox.Mox()
|
| -
|
| - def tearDown(self):
|
| - self.mox.UnsetStubs()
|
| - self.mox.VerifyAll()
|
| -
|
| - def testGenerateUploadDict(self):
|
| - base_local_path = '/b/cbuild/build/chroot/build/x86-dogfood/'
|
| - gs_bucket_path = 'gs://chromeos-prebuilt/host/version'
|
| - local_path = os.path.join(base_local_path, 'public1.tbz2')
|
| - self.mox.StubOutWithMock(prebuilt.os.path, 'exists')
|
| - prebuilt.os.path.exists(local_path).AndReturn(True)
|
| - self.mox.ReplayAll()
|
| - pkgs = [{ 'CPV': 'public1' }]
|
| - result = prebuilt.GenerateUploadDict(base_local_path, gs_bucket_path, pkgs)
|
| - expected = { local_path: gs_bucket_path + '/public1.tbz2' }
|
| - self.assertEqual(result, expected)
|
| -
|
| - def testFailonUploadFail(self):
|
| - """Make sure we fail if one of the upload processes fail."""
|
| - files = {'test': '/uasd'}
|
| - self.assertEqual(prebuilt.RemoteUpload(files), set([('test', '/uasd')]))
|
| -
|
| - def testDeterminePrebuiltConfHost(self):
|
| - """Test that the host prebuilt path comes back properly."""
|
| - expected_path = os.path.join(prebuilt._PREBUILT_MAKE_CONF['amd64'])
|
| - self.assertEqual(prebuilt.DeterminePrebuiltConfFile('fake_path', 'amd64'),
|
| - expected_path)
|
| -
|
| - def testDeterminePrebuiltConf(self):
|
| - """Test the different known variants of boards for proper path discovery."""
|
| - fake_path = '/b/cbuild'
|
| - script_path = os.path.join(fake_path, 'src/scripts/bin')
|
| - public_overlay_path = os.path.join(fake_path, 'src/overlays')
|
| - private_overlay_path = os.path.join(fake_path,
|
| - prebuilt._PRIVATE_OVERLAY_DIR)
|
| - path_dict = {'private_overlay_path': private_overlay_path,
|
| - 'public_overlay_path': public_overlay_path}
|
| - # format for targets
|
| - # board target key in dictionar
|
| - # Tuple containing cmd run, expected results as cmd obj, and expected output
|
| -
|
| - # Mock output from cros_overlay_list
|
| - x86_out = ('%(private_overlay_path)s/chromeos-overlay\n'
|
| - '%(public_overlay_path)s/overlay-x86-generic\n' % path_dict)
|
| -
|
| - x86_cmd = './cros_overlay_list --board x86-generic'
|
| - x86_expected_path = os.path.join(public_overlay_path, 'overlay-x86-generic',
|
| - 'prebuilt.conf')
|
| - # Mock output from cros_overlay_list
|
| - tegra2_out = ('%(private_overlay_path)s/chromeos-overlay\n'
|
| - '%(public_overlay_path)s/overlay-tegra2\n'
|
| - '%(public_overlay_path)s/overlay-variant-tegra2-seaboard\n'
|
| - '%(private_overlay_path)s/overlay-tegra2-private\n'
|
| - '%(private_overlay_path)s/'
|
| - 'overlay-variant-tegra2-seaboard-private\n' % path_dict)
|
| - tegra2_cmd = './cros_overlay_list --board tegra2 --variant seaboard'
|
| - tegra2_expected_path = os.path.join(
|
| - private_overlay_path, 'overlay-variant-tegra2-seaboard-private',
|
| - 'prebuilt.conf')
|
| -
|
| -
|
| - targets = {'x86-generic': {'cmd': x86_cmd,
|
| - 'output': x86_out,
|
| - 'result': x86_expected_path},
|
| - 'tegra2_seaboard': {'cmd': tegra2_cmd,
|
| - 'output': tegra2_out,
|
| - 'result': tegra2_expected_path}
|
| - }
|
| -
|
| - self.mox.StubOutWithMock(prebuilt.cros_build_lib, 'RunCommand')
|
| - for target, expected_results in targets.iteritems():
|
| - # create command object for output
|
| - cmd_result_obj = cros_build_lib.CommandResult()
|
| - cmd_result_obj.output = expected_results['output']
|
| - prebuilt.cros_build_lib.RunCommand(
|
| - expected_results['cmd'].split(), redirect_stdout=True,
|
| - cwd=script_path).AndReturn(cmd_result_obj)
|
| -
|
| - self.mox.ReplayAll()
|
| - for target, expected_results in targets.iteritems():
|
| - self.assertEqual(
|
| - prebuilt.DeterminePrebuiltConfFile(fake_path, target),
|
| - expected_results['result'])
|
| -
|
| - def testDeterminePrebuiltConfGarbage(self):
|
| - """Ensure an exception is raised on bad input."""
|
| - self.assertRaises(prebuilt.UnknownBoardFormat,
|
| - prebuilt.DeterminePrebuiltConfFile,
|
| - 'fake_path', 'asdfasdf')
|
| -
|
| -
|
| -class TestPackagesFileFiltering(unittest.TestCase):
|
| -
|
| - def testFilterPkgIndex(self):
|
| - pkgindex = SimplePackageIndex()
|
| - pkgindex.RemoveFilteredPackages(lambda pkg: pkg in PRIVATE_PACKAGES)
|
| - self.assertEqual(pkgindex.packages, PUBLIC_PACKAGES)
|
| - self.assertEqual(pkgindex.modified, True)
|
| -
|
| -
|
| -class TestPopulateDuplicateDB(unittest.TestCase):
|
| -
|
| - def testEmptyIndex(self):
|
| - pkgindex = SimplePackageIndex(packages=False)
|
| - db = {}
|
| - pkgindex._PopulateDuplicateDB(db)
|
| - self.assertEqual(db, {})
|
| -
|
| - def testNormalIndex(self):
|
| - pkgindex = SimplePackageIndex()
|
| - db = {}
|
| - pkgindex._PopulateDuplicateDB(db)
|
| - self.assertEqual(len(db), 3)
|
| - self.assertEqual(db['1'], 'http://www.example.com/gtk%2B/public1.tbz2')
|
| - self.assertEqual(db['2'], 'http://www.example.com/gtk%2B/foo.tgz')
|
| - self.assertEqual(db['3'], 'http://www.example.com/private.tbz2')
|
| -
|
| - def testMissingSHA1(self):
|
| - db = {}
|
| - pkgindex = SimplePackageIndex()
|
| - del pkgindex.packages[0]['SHA1']
|
| - pkgindex._PopulateDuplicateDB(db)
|
| - self.assertEqual(len(db), 2)
|
| - self.assertEqual(db['2'], 'http://www.example.com/gtk%2B/foo.tgz')
|
| - self.assertEqual(db['3'], 'http://www.example.com/private.tbz2')
|
| -
|
| - def testFailedPopulate(self):
|
| - db = {}
|
| - pkgindex = SimplePackageIndex(header=False)
|
| - self.assertRaises(KeyError, pkgindex._PopulateDuplicateDB, db)
|
| - pkgindex = SimplePackageIndex()
|
| - del pkgindex.packages[0]['CPV']
|
| - self.assertRaises(KeyError, pkgindex._PopulateDuplicateDB, db)
|
| -
|
| -
|
| -class TestResolveDuplicateUploads(unittest.TestCase):
|
| -
|
| - def testEmptyList(self):
|
| - pkgindex = SimplePackageIndex()
|
| - pristine = SimplePackageIndex()
|
| - uploads = pkgindex.ResolveDuplicateUploads([])
|
| - self.assertEqual(uploads, pristine.packages)
|
| - self.assertEqual(pkgindex.packages, pristine.packages)
|
| - self.assertEqual(pkgindex.modified, False)
|
| -
|
| - def testEmptyIndex(self):
|
| - pkgindex = SimplePackageIndex()
|
| - pristine = SimplePackageIndex()
|
| - empty = SimplePackageIndex(packages=False)
|
| - uploads = pkgindex.ResolveDuplicateUploads([empty])
|
| - self.assertEqual(uploads, pristine.packages)
|
| - self.assertEqual(pkgindex.packages, pristine.packages)
|
| - self.assertEqual(pkgindex.modified, False)
|
| -
|
| - def testDuplicates(self):
|
| - pkgindex = SimplePackageIndex()
|
| - dup_pkgindex = SimplePackageIndex()
|
| - expected_pkgindex = SimplePackageIndex()
|
| - for pkg in expected_pkgindex.packages:
|
| - pkg.setdefault('PATH', urllib.quote(pkg['CPV'] + '.tbz2'))
|
| - uploads = pkgindex.ResolveDuplicateUploads([dup_pkgindex])
|
| - self.assertEqual(pkgindex.packages, expected_pkgindex.packages)
|
| -
|
| - def testMissingSHA1(self):
|
| - db = {}
|
| - pkgindex = SimplePackageIndex()
|
| - dup_pkgindex = SimplePackageIndex()
|
| - expected_pkgindex = SimplePackageIndex()
|
| - del pkgindex.packages[0]['SHA1']
|
| - del expected_pkgindex.packages[0]['SHA1']
|
| - for pkg in expected_pkgindex.packages[1:]:
|
| - pkg.setdefault('PATH', pkg['CPV'] + '.tbz2')
|
| - uploads = pkgindex.ResolveDuplicateUploads([dup_pkgindex])
|
| - self.assertEqual(pkgindex.packages, expected_pkgindex.packages)
|
| -
|
| -
|
| -class TestWritePackageIndex(unittest.TestCase):
|
| -
|
| - def setUp(self):
|
| - self.mox = mox.Mox()
|
| -
|
| - def tearDown(self):
|
| - self.mox.UnsetStubs()
|
| - self.mox.VerifyAll()
|
| -
|
| - def testSimple(self):
|
| - pkgindex = SimplePackageIndex()
|
| - self.mox.StubOutWithMock(pkgindex, 'Write')
|
| - pkgindex.Write(mox.IgnoreArg())
|
| - self.mox.ReplayAll()
|
| - f = pkgindex.WriteToNamedTemporaryFile()
|
| - self.assertEqual(f.read(), '')
|
| -
|
| -
|
| -if __name__ == '__main__':
|
| - unittest.main()
|
|
|