| Index: tools/telemetry/telemetry/unittest_util/system_stub_unittest.py
|
| diff --git a/tools/telemetry/telemetry/unittest_util/system_stub_unittest.py b/tools/telemetry/telemetry/unittest_util/system_stub_unittest.py
|
| index 99ef5d40ea2b3e589705b9421287300c11ce159d..de549d58ed0d41d7d499291e7908066251d36dd8 100644
|
| --- a/tools/telemetry/telemetry/unittest_util/system_stub_unittest.py
|
| +++ b/tools/telemetry/telemetry/unittest_util/system_stub_unittest.py
|
| @@ -32,28 +32,38 @@ class CloudStorageTest(unittest.TestCase):
|
| {'preset_internal_file.wpr':CloudStorageTest.INTERNAL_FILE_HASH}}
|
|
|
| # Local data files and hashes.
|
| - self.data_files = ['/path/to/success.wpr',
|
| - '/path/to/wrong_hash.wpr',
|
| - '/path/to/preset_public_file.wpr',
|
| - '/path/to/preset_partner_file.wpr',
|
| - '/path/to/preset_internal_file.wpr']
|
| + self.data_files = [
|
| + os.path.join(os.path.sep, 'path', 'to', 'success.wpr'),
|
| + os.path.join(os.path.sep, 'path', 'to', 'wrong_hash.wpr'),
|
| + os.path.join(os.path.sep, 'path', 'to', 'preset_public_file.wpr'),
|
| + os.path.join(os.path.sep, 'path', 'to', 'preset_partner_file.wpr'),
|
| + os.path.join(os.path.sep, 'path', 'to', 'preset_internal_file.wpr')]
|
| self.local_file_hashes = {
|
| - '/path/to/success.wpr': CloudStorageTest.SUCCESS_FILE_HASH,
|
| - '/path/to/wrong_hash.wpr': CloudStorageTest.SUCCESS_FILE_HASH,
|
| - '/path/to/preset_public_file.wpr':CloudStorageTest.PUBLIC_FILE_HASH,
|
| - '/path/to/preset_partner_file.wpr':CloudStorageTest.PARTNER_FILE_HASH,
|
| - '/path/to/preset_internal_file.wpr':CloudStorageTest.INTERNAL_FILE_HASH,
|
| + os.path.join(os.path.sep, 'path', 'to', 'success.wpr'):
|
| + CloudStorageTest.SUCCESS_FILE_HASH,
|
| + os.path.join(os.path.sep, 'path', 'to', 'wrong_hash.wpr'):
|
| + CloudStorageTest.SUCCESS_FILE_HASH,
|
| + os.path.join(os.path.sep, 'path', 'to', 'preset_public_file.wpr'):
|
| + CloudStorageTest.PUBLIC_FILE_HASH,
|
| + os.path.join(os.path.sep, 'path', 'to', 'preset_partner_file.wpr'):
|
| + CloudStorageTest.PARTNER_FILE_HASH,
|
| + os.path.join(os.path.sep, 'path', 'to', 'preset_internal_file.wpr'):
|
| + CloudStorageTest.INTERNAL_FILE_HASH,
|
| }
|
| self.cloud_storage.SetCalculatedHashesForTesting(self.local_file_hashes)
|
| # Local hash files and their contents.
|
| local_hash_files = {
|
| - '/path/to/success.wpr.sha1': CloudStorageTest.SUCCESS_FILE_HASH,
|
| - '/path/to/wrong_hash.wpr.sha1': 'wronghash'.zfill(40),
|
| - '/path/to/preset_public_file.wpr.sha1': CloudStorageTest.PUBLIC_FILE_HASH,
|
| - '/path/to/preset_partner_file.wpr.sha1':
|
| - CloudStorageTest.PARTNER_FILE_HASH,
|
| - '/path/to/preset_internal_file.wpr.sha1':
|
| - CloudStorageTest.INTERNAL_FILE_HASH,
|
| + os.path.join(os.path.sep, 'path', 'to', 'success.wpr.sha1'):
|
| + CloudStorageTest.SUCCESS_FILE_HASH,
|
| + os.path.join(os.path.sep, 'path', 'to', 'wrong_hash.wpr.sha1'):
|
| + 'wronghash'.zfill(40),
|
| + os.path.join(os.path.sep, 'path', 'to', 'preset_public_file.wpr.sha1'):
|
| + CloudStorageTest.PUBLIC_FILE_HASH,
|
| + os.path.join(os.path.sep, 'path', 'to', 'preset_partner_file.wpr.sha1'):
|
| + CloudStorageTest.PARTNER_FILE_HASH,
|
| + os.path.join(os.path.sep, 'path', 'to',
|
| + 'preset_internal_file.wpr.sha1'):
|
| + CloudStorageTest.INTERNAL_FILE_HASH,
|
| }
|
| self.cloud_storage.SetHashFileContentsForTesting(local_hash_files)
|
|
|
| @@ -73,25 +83,25 @@ class CloudStorageTest(unittest.TestCase):
|
| self.assertFalse(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET,
|
| 'preset_public_file.wpr'))
|
| self.assertFalse(self.cloud_storage.Exists(
|
| - self.cloud_storage.PARTNER_BUCKET, 'preset_partner_file.wpr'))
|
| + self.cloud_storage.PARTNER_BUCKET, 'preset_partner_file.wpr'))
|
| self.assertFalse(self.cloud_storage.Exists(
|
| - self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr'))
|
| + self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr'))
|
|
|
| def testExistsNonEmptyCloudStorage(self):
|
| # Test non-empty remote files dictionary.
|
| self.cloud_storage.SetRemotePathsForTesting(self.remote_paths)
|
| - self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET,
|
| - 'preset_public_file.wpr'))
|
| - self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PARTNER_BUCKET,
|
| - 'preset_partner_file.wpr'))
|
| self.assertTrue(self.cloud_storage.Exists(
|
| - self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr'))
|
| - self.assertFalse(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET,
|
| - 'fake_file'))
|
| + self.cloud_storage.PUBLIC_BUCKET, 'preset_public_file.wpr'))
|
| + self.assertTrue(self.cloud_storage.Exists(
|
| + self.cloud_storage.PARTNER_BUCKET, 'preset_partner_file.wpr'))
|
| + self.assertTrue(self.cloud_storage.Exists(
|
| + self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr'))
|
| + self.assertFalse(self.cloud_storage.Exists(
|
| + self.cloud_storage.PUBLIC_BUCKET, 'fake_file'))
|
| self.assertFalse(self.cloud_storage.Exists(
|
| - self.cloud_storage.PARTNER_BUCKET, 'fake_file'))
|
| + self.cloud_storage.PARTNER_BUCKET, 'fake_file'))
|
| self.assertFalse(self.cloud_storage.Exists(
|
| - self.cloud_storage.INTERNAL_BUCKET, 'fake_file'))
|
| + self.cloud_storage.INTERNAL_BUCKET, 'fake_file'))
|
| # Reset state.
|
| self.cloud_storage.SetRemotePathsForTesting()
|
|
|
| @@ -100,64 +110,72 @@ class CloudStorageTest(unittest.TestCase):
|
| self.cloud_storage.SetRemotePathsForTesting(self.remote_paths)
|
| self.assertFalse(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET,
|
| 'success.wpr'))
|
| - self.cloud_storage.Insert(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr',
|
| - '/path/to/success.wpr')
|
| - self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET,
|
| - 'success.wpr'))
|
| + self.cloud_storage.Insert(
|
| + self.cloud_storage.PUBLIC_BUCKET, 'success.wpr',
|
| + os.path.join(os.path.sep, 'path', 'to', 'success.wpr'))
|
| + self.assertTrue(self.cloud_storage.Exists(
|
| + self.cloud_storage.PUBLIC_BUCKET, 'success.wpr'))
|
| # Reset state.
|
| self.cloud_storage.SetRemotePathsForTesting()
|
|
|
| def testEmptyInsertAndExistsPublic(self):
|
| # Test empty remote files dictionary.
|
| - self.assertFalse(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET,
|
| - 'success.wpr'))
|
| - self.cloud_storage.Insert(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr',
|
| - '/path/to/success.wpr')
|
| - self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET,
|
| - 'success.wpr'))
|
| + self.assertFalse(self.cloud_storage.Exists(
|
| + self.cloud_storage.PUBLIC_BUCKET, 'success.wpr'))
|
| + self.cloud_storage.Insert(
|
| + self.cloud_storage.PUBLIC_BUCKET, 'success.wpr',
|
| + os.path.join(os.path.sep, 'path', 'to', 'success.wpr'))
|
| + self.assertTrue(self.cloud_storage.Exists(
|
| + self.cloud_storage.PUBLIC_BUCKET, 'success.wpr'))
|
|
|
| def testEmptyInsertAndGet(self):
|
| self.assertRaises(self.cloud_storage.NotFoundError, self.cloud_storage.Get,
|
| self.cloud_storage.PUBLIC_BUCKET, 'success.wpr',
|
| - '/path/to/success.wpr')
|
| + os.path.join(os.path.sep, 'path', 'to', 'success.wpr'))
|
| self.cloud_storage.Insert(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr',
|
| - '/path/to/success.wpr')
|
| - self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET,
|
| - 'success.wpr'))
|
| - self.assertEqual(CloudStorageTest.SUCCESS_FILE_HASH,
|
| - self.cloud_storage.Get(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr',
|
| - '/path/to/success.wpr'))
|
| + os.path.join(os.path.sep, 'path', 'to',
|
| + 'success.wpr'))
|
| + self.assertTrue(self.cloud_storage.Exists(
|
| + self.cloud_storage.PUBLIC_BUCKET, 'success.wpr'))
|
| + self.assertEqual(CloudStorageTest.SUCCESS_FILE_HASH, self.cloud_storage.Get(
|
| + self.cloud_storage.PUBLIC_BUCKET, 'success.wpr',
|
| + os.path.join(os.path.sep, 'path', 'to', 'success.wpr')))
|
|
|
| def testNonEmptyInsertAndGet(self):
|
| self.cloud_storage.SetRemotePathsForTesting(self.remote_paths)
|
| self.assertRaises(self.cloud_storage.NotFoundError, self.cloud_storage.Get,
|
| self.cloud_storage.PUBLIC_BUCKET, 'success.wpr',
|
| - '/path/to/success.wpr')
|
| + os.path.join(os.path.sep, 'path', 'to', 'success.wpr'))
|
| self.cloud_storage.Insert(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr',
|
| - '/path/to/success.wpr')
|
| + os.path.join(os.path.sep, 'path', 'to',
|
| + 'success.wpr'))
|
| self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET,
|
| 'success.wpr'))
|
| - self.assertEqual(CloudStorageTest.SUCCESS_FILE_HASH,
|
| - self.cloud_storage.Get(self.cloud_storage.PUBLIC_BUCKET,
|
| - 'success.wpr',
|
| - '/path/to/success.wpr'))
|
| + self.assertEqual(
|
| + CloudStorageTest.SUCCESS_FILE_HASH, self.cloud_storage.Get(
|
| + self.cloud_storage.PUBLIC_BUCKET, 'success.wpr',
|
| + os.path.join(os.path.sep, 'path', 'to', 'success.wpr')))
|
| # Reset state.
|
| self.cloud_storage.SetRemotePathsForTesting()
|
|
|
| def testGetIfChanged(self):
|
| self.cloud_storage.SetRemotePathsForTesting(self.remote_paths)
|
| - self.assertRaises(self.cloud_storage.NotFoundError, self.cloud_storage.Get,
|
| - self.cloud_storage.PUBLIC_BUCKET, 'success.wpr',
|
| - '/path/to/success.wpr')
|
| + self.assertRaises(
|
| + self.cloud_storage.NotFoundError, self.cloud_storage.Get,
|
| + self.cloud_storage.PUBLIC_BUCKET, 'success.wpr',
|
| + os.path.join(os.path.sep, 'path', 'to', 'success.wpr'))
|
| self.assertFalse(self.cloud_storage.GetIfChanged(
|
| - '/path/to/preset_public_file.wpr', self.cloud_storage.PUBLIC_BUCKET))
|
| + os.path.join(os.path.sep, 'path', 'to', 'preset_public_file.wpr'),
|
| + self.cloud_storage.PUBLIC_BUCKET))
|
| self.cloud_storage.ChangeRemoteHashForTesting(
|
| - self.cloud_storage.PUBLIC_BUCKET, 'preset_public_file.wpr',
|
| - CloudStorageTest.UPDATED_HASH)
|
| + self.cloud_storage.PUBLIC_BUCKET, 'preset_public_file.wpr',
|
| + CloudStorageTest.UPDATED_HASH)
|
| self.assertTrue(self.cloud_storage.GetIfChanged(
|
| - '/path/to/preset_public_file.wpr', self.cloud_storage.PUBLIC_BUCKET))
|
| + os.path.join(os.path.sep, 'path', 'to', 'preset_public_file.wpr'),
|
| + self.cloud_storage.PUBLIC_BUCKET))
|
| self.assertFalse(self.cloud_storage.GetIfChanged(
|
| - '/path/to/preset_public_file.wpr', self.cloud_storage.PUBLIC_BUCKET))
|
| + os.path.join(os.path.sep, 'path', 'to', 'preset_public_file.wpr'),
|
| + self.cloud_storage.PUBLIC_BUCKET))
|
| # Reset state.
|
| self.cloud_storage.SetRemotePathsForTesting()
|
|
|
| @@ -173,47 +191,50 @@ class CloudStorageTest(unittest.TestCase):
|
| def testPermissionError(self):
|
| self.cloud_storage.SetRemotePathsForTesting(self.remote_paths)
|
| self.cloud_storage.SetPermissionLevelForTesting(
|
| - self.cloud_storage.PUBLIC_PERMISSION)
|
| + self.cloud_storage.PUBLIC_PERMISSION)
|
| self.assertRaises(
|
| - self.cloud_storage.PermissionError, self.cloud_storage.Get,
|
| - self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr',
|
| - '/path/to/preset_internal_file.wpr')
|
| + self.cloud_storage.PermissionError, self.cloud_storage.Get,
|
| + self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr',
|
| + os.path.join(os.path.sep, 'path', 'to', 'preset_internal_file.wpr'))
|
| self.assertRaises(
|
| - self.cloud_storage.PermissionError, self.cloud_storage.GetIfChanged,
|
| - '/path/to/preset_internal_file.wpr', self.cloud_storage.INTERNAL_BUCKET)
|
| + self.cloud_storage.PermissionError, self.cloud_storage.GetIfChanged,
|
| + os.path.join(os.path.sep, 'path', 'to', 'preset_internal_file.wpr'),
|
| + self.cloud_storage.INTERNAL_BUCKET)
|
| self.assertRaises(
|
| - self.cloud_storage.PermissionError, self.cloud_storage.List,
|
| - self.cloud_storage.INTERNAL_BUCKET)
|
| + self.cloud_storage.PermissionError, self.cloud_storage.List,
|
| + self.cloud_storage.INTERNAL_BUCKET)
|
| self.assertRaises(
|
| - self.cloud_storage.PermissionError, self.cloud_storage.Exists,
|
| - self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr')
|
| + self.cloud_storage.PermissionError, self.cloud_storage.Exists,
|
| + self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr')
|
| self.assertRaises(
|
| - self.cloud_storage.PermissionError, self.cloud_storage.Insert,
|
| - self.cloud_storage.INTERNAL_BUCKET, 'success.wpr', '/path/to/success.wpr')
|
| + self.cloud_storage.PermissionError, self.cloud_storage.Insert,
|
| + self.cloud_storage.INTERNAL_BUCKET, 'success.wpr',
|
| + os.path.join(os.path.sep, 'path', 'to', 'success.wpr'))
|
| # Reset state.
|
| self.cloud_storage.SetRemotePathsForTesting()
|
|
|
| def testCredentialsError(self):
|
| self.cloud_storage.SetRemotePathsForTesting(self.remote_paths)
|
| self.cloud_storage.SetPermissionLevelForTesting(
|
| - self.cloud_storage.CREDENTIALS_ERROR_PERMISSION)
|
| + self.cloud_storage.CREDENTIALS_ERROR_PERMISSION)
|
| self.assertRaises(
|
| - self.cloud_storage.CredentialsError, self.cloud_storage.Get,
|
| - self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr',
|
| - '/path/to/preset_internal_file.wpr')
|
| + self.cloud_storage.CredentialsError, self.cloud_storage.Get,
|
| + self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr',
|
| + os.path.join(os.path.sep, 'path', 'to', 'preset_internal_file.wpr'))
|
| self.assertRaises(
|
| - self.cloud_storage.CredentialsError, self.cloud_storage.GetIfChanged,
|
| - self.cloud_storage.INTERNAL_BUCKET, '/path/to/preset_internal_file.wpr')
|
| + self.cloud_storage.CredentialsError, self.cloud_storage.GetIfChanged,
|
| + self.cloud_storage.INTERNAL_BUCKET,
|
| + os.path.join(os.path.sep, 'path', 'to', 'preset_internal_file.wpr'))
|
| self.assertRaises(
|
| - self.cloud_storage.CredentialsError, self.cloud_storage.List,
|
| - self.cloud_storage.INTERNAL_BUCKET)
|
| + self.cloud_storage.CredentialsError, self.cloud_storage.List,
|
| + self.cloud_storage.INTERNAL_BUCKET)
|
| self.assertRaises(
|
| - self.cloud_storage.CredentialsError, self.cloud_storage.Exists,
|
| - self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr')
|
| + self.cloud_storage.CredentialsError, self.cloud_storage.Exists,
|
| + self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr')
|
| self.assertRaises(
|
| - self.cloud_storage.CredentialsError, self.cloud_storage.Insert,
|
| - self.cloud_storage.INTERNAL_BUCKET, 'success.wpr',
|
| - '/path/to/success.wpr')
|
| + self.cloud_storage.CredentialsError, self.cloud_storage.Insert,
|
| + self.cloud_storage.INTERNAL_BUCKET, 'success.wpr',
|
| + os.path.join(os.path.sep, 'path', 'to', 'success.wpr'))
|
| # Reset state.
|
| self.cloud_storage.SetRemotePathsForTesting()
|
|
|
|
|