| Index: tools/telemetry/telemetry/unittest/system_stub_unittest.py
|
| diff --git a/tools/telemetry/telemetry/unittest/system_stub_unittest.py b/tools/telemetry/telemetry/unittest/system_stub_unittest.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..238f344b71711596188df957ec7ac83820fa1a75
|
| --- /dev/null
|
| +++ b/tools/telemetry/telemetry/unittest/system_stub_unittest.py
|
| @@ -0,0 +1,218 @@
|
| +# Copyright 2013 The Chromium Authors. All rights reserved.
|
| +# Use of this source code is governed by a BSD-style license that can be
|
| +# found in the LICENSE file.
|
| +
|
| +import os
|
| +import unittest
|
| +
|
| +PERF_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
| +from telemetry.unittest import system_stub
|
| +
|
| +
|
| +class CloudStorageTest(unittest.TestCase):
|
| + SUCCESS_FILE_HASH = 'success'.zfill(40)
|
| + PUBLIC_FILE_HASH = 'public'.zfill(40)
|
| + PARTNER_FILE_HASH = 'partner'.zfill(40)
|
| + INTERNAL_FILE_HASH = 'internal'.zfill(40)
|
| + UPDATED_HASH = 'updated'.zfill(40)
|
| +
|
| + def setUp(self):
|
| + self.cloud_storage = system_stub.CloudStorageModuleStub()
|
| +
|
| + # Files in Cloud Storage.
|
| + self.remote_files = ['preset_public_file.wpr',
|
| + 'preset_partner_file.wpr',
|
| + 'preset_internal_file.wpr']
|
| + self.remote_paths = {
|
| + self.cloud_storage.PUBLIC_BUCKET:
|
| + {'preset_public_file.wpr':CloudStorageTest.PUBLIC_FILE_HASH},
|
| + self.cloud_storage.PARTNER_BUCKET:
|
| + {'preset_partner_file.wpr':CloudStorageTest.PARTNER_FILE_HASH},
|
| + self.cloud_storage.INTERNAL_BUCKET:
|
| + {'preset_internal_file.wpr':CloudStorageTest.INTERNAL_FILE_HASH}}
|
| +
|
| + # Local data files and hashes.
|
| + self.data_files = ['/path/to/success.wpr',
|
| + '/path/to/wrong_hash.wpr',
|
| + '/path/to/preset_public_file.wpr',
|
| + '/path/to/preset_partner_file.wpr',
|
| + '/path/to/preset_internal_file.wpr']
|
| + self.local_file_hashes = {
|
| + '/path/to/success.wpr': CloudStorageTest.SUCCESS_FILE_HASH,
|
| + '/path/to/wrong_hash.wpr': CloudStorageTest.SUCCESS_FILE_HASH,
|
| + '/path/to/preset_public_file.wpr':CloudStorageTest.PUBLIC_FILE_HASH,
|
| + '/path/to/preset_partner_file.wpr':CloudStorageTest.PARTNER_FILE_HASH,
|
| + '/path/to/preset_internal_file.wpr':CloudStorageTest.INTERNAL_FILE_HASH,
|
| + }
|
| + self.cloud_storage.SetCalculatedHashesForTesting(self.local_file_hashes)
|
| + # Local hash files and their contents.
|
| + local_hash_files = {
|
| + '/path/to/success.wpr.sha1': CloudStorageTest.SUCCESS_FILE_HASH,
|
| + '/path/to/wrong_hash.wpr.sha1': 'wronghash'.zfill(40),
|
| + '/path/to/preset_public_file.wpr.sha1': CloudStorageTest.PUBLIC_FILE_HASH,
|
| + '/path/to/preset_partner_file.wpr.sha1':
|
| + CloudStorageTest.PARTNER_FILE_HASH,
|
| + '/path/to/preset_internal_file.wpr.sha1':
|
| + CloudStorageTest.INTERNAL_FILE_HASH,
|
| + }
|
| + self.cloud_storage.SetHashFileContentsForTesting(local_hash_files)
|
| +
|
| + def testSetup(self):
|
| + self.assertEqual(self.local_file_hashes,
|
| + self.cloud_storage.local_file_hashes)
|
| + self.assertEqual(set(self.data_files),
|
| + set(self.cloud_storage.GetLocalDataFiles()))
|
| + self.assertEqual(self.cloud_storage.default_remote_paths,
|
| + self.cloud_storage.GetRemotePathsForTesting())
|
| + self.cloud_storage.SetRemotePathsForTesting(self.remote_paths)
|
| + self.assertEqual(self.remote_paths,
|
| + self.cloud_storage.GetRemotePathsForTesting())
|
| +
|
| + def testExistsEmptyCloudStorage(self):
|
| + # Test empty remote files dictionary.
|
| + self.assertFalse(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET,
|
| + 'preset_public_file.wpr'))
|
| + self.assertFalse(self.cloud_storage.Exists(
|
| + self.cloud_storage.PARTNER_BUCKET, 'preset_partner_file.wpr'))
|
| + self.assertFalse(self.cloud_storage.Exists(
|
| + self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr'))
|
| +
|
| + def testExistsNonEmptyCloudStorage(self):
|
| + # Test non-empty remote files dictionary.
|
| + self.cloud_storage.SetRemotePathsForTesting(self.remote_paths)
|
| + self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET,
|
| + 'preset_public_file.wpr'))
|
| + self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PARTNER_BUCKET,
|
| + 'preset_partner_file.wpr'))
|
| + self.assertTrue(self.cloud_storage.Exists(
|
| + self.cloud_storage.INTERNAL_BUCKET,'preset_internal_file.wpr'))
|
| + self.assertFalse(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET,
|
| + 'fake_file'))
|
| + self.assertFalse(self.cloud_storage.Exists(
|
| + self.cloud_storage.PARTNER_BUCKET,'fake_file'))
|
| + self.assertFalse(self.cloud_storage.Exists(
|
| + self.cloud_storage.INTERNAL_BUCKET, 'fake_file'))
|
| + # Reset state.
|
| + self.cloud_storage.SetRemotePathsForTesting()
|
| +
|
| + def testNonEmptyInsertAndExistsPublic(self):
|
| + # Test non-empty remote files dictionary.
|
| + self.cloud_storage.SetRemotePathsForTesting(self.remote_paths)
|
| + self.assertFalse(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET,
|
| + 'success.wpr'))
|
| + self.cloud_storage.Insert(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr',
|
| + '/path/to/success.wpr')
|
| + self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET,
|
| + 'success.wpr'))
|
| + # Reset state.
|
| + self.cloud_storage.SetRemotePathsForTesting()
|
| +
|
| + def testEmptyInsertAndExistsPublic(self):
|
| + # Test empty remote files dictionary.
|
| + self.assertFalse(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET,
|
| + 'success.wpr'))
|
| + self.cloud_storage.Insert(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr',
|
| + '/path/to/success.wpr')
|
| + self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET,
|
| + 'success.wpr'))
|
| +
|
| + def testEmptyInsertAndGet(self):
|
| + self.assertRaises(self.cloud_storage.NotFoundError, self.cloud_storage.Get,
|
| + self.cloud_storage.PUBLIC_BUCKET, 'success.wpr',
|
| + '/path/to/success.wpr')
|
| + self.cloud_storage.Insert(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr',
|
| + '/path/to/success.wpr')
|
| + self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET,
|
| + 'success.wpr'))
|
| + self.assertEqual(CloudStorageTest.SUCCESS_FILE_HASH,
|
| + self.cloud_storage.Get(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr',
|
| + '/path/to/success.wpr'))
|
| +
|
| + def testNonEmptyInsertAndGet(self):
|
| + self.cloud_storage.SetRemotePathsForTesting(self.remote_paths)
|
| + self.assertRaises(self.cloud_storage.NotFoundError, self.cloud_storage.Get,
|
| + self.cloud_storage.PUBLIC_BUCKET, 'success.wpr',
|
| + '/path/to/success.wpr')
|
| + self.cloud_storage.Insert(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr',
|
| + '/path/to/success.wpr')
|
| + self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET,
|
| + 'success.wpr'))
|
| + self.assertEqual(CloudStorageTest.SUCCESS_FILE_HASH,
|
| + self.cloud_storage.Get(self.cloud_storage.PUBLIC_BUCKET,
|
| + 'success.wpr',
|
| + '/path/to/success.wpr'))
|
| + # Reset state.
|
| + self.cloud_storage.SetRemotePathsForTesting()
|
| +
|
| + def testGetIfChanged(self):
|
| + self.cloud_storage.SetRemotePathsForTesting(self.remote_paths)
|
| + self.assertRaises(self.cloud_storage.NotFoundError, self.cloud_storage.Get,
|
| + self.cloud_storage.PUBLIC_BUCKET, 'success.wpr',
|
| + '/path/to/success.wpr')
|
| + self.assertFalse(self.cloud_storage.GetIfChanged(
|
| + self.cloud_storage.PUBLIC_BUCKET, '/path/to/preset_public_file.wpr'))
|
| + self.cloud_storage.ChangeRemoteHashForTesting(
|
| + self.cloud_storage.PUBLIC_BUCKET, 'preset_public_file.wpr',
|
| + CloudStorageTest.UPDATED_HASH)
|
| + self.assertTrue(self.cloud_storage.GetIfChanged(
|
| + self.cloud_storage.PUBLIC_BUCKET, '/path/to/preset_public_file.wpr'))
|
| + self.assertFalse(self.cloud_storage.GetIfChanged(
|
| + self.cloud_storage.PUBLIC_BUCKET, '/path/to/preset_public_file.wpr'))
|
| + # Reset state.
|
| + self.cloud_storage.SetRemotePathsForTesting()
|
| +
|
| + def testList(self):
|
| + self.assertEqual([],
|
| + self.cloud_storage.List(self.cloud_storage.PUBLIC_BUCKET))
|
| + self.cloud_storage.SetRemotePathsForTesting(self.remote_paths)
|
| + self.assertEqual(['preset_public_file.wpr'],
|
| + self.cloud_storage.List(self.cloud_storage.PUBLIC_BUCKET))
|
| + # Reset state.
|
| + self.cloud_storage.SetRemotePathsForTesting()
|
| +
|
| + def testPermissionError(self):
|
| + self.cloud_storage.SetRemotePathsForTesting(self.remote_paths)
|
| + self.cloud_storage.SetPermissionLevelForTesting(
|
| + self.cloud_storage.PUBLIC_PERMISSION)
|
| + self.assertRaises(
|
| + self.cloud_storage.PermissionError, self.cloud_storage.Get,
|
| + self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr',
|
| + '/path/to/preset_internal_file.wpr')
|
| + self.assertRaises(
|
| + self.cloud_storage.PermissionError, self.cloud_storage.GetIfChanged,
|
| + self.cloud_storage.INTERNAL_BUCKET, '/path/to/preset_internal_file.wpr')
|
| + self.assertRaises(
|
| + self.cloud_storage.PermissionError, self.cloud_storage.List,
|
| + self.cloud_storage.INTERNAL_BUCKET)
|
| + self.assertRaises(
|
| + self.cloud_storage.PermissionError, self.cloud_storage.Exists,
|
| + self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr')
|
| + self.assertRaises(
|
| + self.cloud_storage.PermissionError, self.cloud_storage.Insert,
|
| + self.cloud_storage.INTERNAL_BUCKET, 'success.wpr', '/path/to/success.wpr')
|
| + # Reset state.
|
| + self.cloud_storage.SetRemotePathsForTesting()
|
| +
|
| + def testCredentialsError(self):
|
| + self.cloud_storage.SetRemotePathsForTesting(self.remote_paths)
|
| + self.cloud_storage.SetPermissionLevelForTesting(
|
| + self.cloud_storage.CREDENTIALS_ERROR_PERMISSION)
|
| + self.assertRaises(
|
| + self.cloud_storage.CredentialsError, self.cloud_storage.Get,
|
| + self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr',
|
| + '/path/to/preset_internal_file.wpr')
|
| + self.assertRaises(
|
| + self.cloud_storage.CredentialsError, self.cloud_storage.GetIfChanged,
|
| + self.cloud_storage.INTERNAL_BUCKET, '/path/to/preset_internal_file.wpr')
|
| + self.assertRaises(
|
| + self.cloud_storage.CredentialsError, self.cloud_storage.List,
|
| + self.cloud_storage.INTERNAL_BUCKET)
|
| + self.assertRaises(
|
| + self.cloud_storage.CredentialsError, self.cloud_storage.Exists,
|
| + self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr')
|
| + self.assertRaises(
|
| + self.cloud_storage.CredentialsError, self.cloud_storage.Insert,
|
| + self.cloud_storage.INTERNAL_BUCKET, 'success.wpr',
|
| + '/path/to/success.wpr')
|
| + # Reset state.
|
| + self.cloud_storage.SetRemotePathsForTesting()
|
|
|