Index: tools/telemetry/telemetry/unittest/system_stub_unittest.py |
diff --git a/tools/telemetry/telemetry/unittest/system_stub_unittest.py b/tools/telemetry/telemetry/unittest/system_stub_unittest.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..238f344b71711596188df957ec7ac83820fa1a75 |
--- /dev/null |
+++ b/tools/telemetry/telemetry/unittest/system_stub_unittest.py |
@@ -0,0 +1,218 @@ |
+# Copyright 2013 The Chromium Authors. All rights reserved. |
+# Use of this source code is governed by a BSD-style license that can be |
+# found in the LICENSE file. |
+ |
+import os |
+import unittest |
+ |
+PERF_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) |
+from telemetry.unittest import system_stub |
+ |
+ |
+class CloudStorageTest(unittest.TestCase): |
+ SUCCESS_FILE_HASH = 'success'.zfill(40) |
+ PUBLIC_FILE_HASH = 'public'.zfill(40) |
+ PARTNER_FILE_HASH = 'partner'.zfill(40) |
+ INTERNAL_FILE_HASH = 'internal'.zfill(40) |
+ UPDATED_HASH = 'updated'.zfill(40) |
+ |
+ def setUp(self): |
+ self.cloud_storage = system_stub.CloudStorageModuleStub() |
+ |
+ # Files in Cloud Storage. |
+ self.remote_files = ['preset_public_file.wpr', |
+ 'preset_partner_file.wpr', |
+ 'preset_internal_file.wpr'] |
+ self.remote_paths = { |
+ self.cloud_storage.PUBLIC_BUCKET: |
+ {'preset_public_file.wpr':CloudStorageTest.PUBLIC_FILE_HASH}, |
+ self.cloud_storage.PARTNER_BUCKET: |
+ {'preset_partner_file.wpr':CloudStorageTest.PARTNER_FILE_HASH}, |
+ self.cloud_storage.INTERNAL_BUCKET: |
+ {'preset_internal_file.wpr':CloudStorageTest.INTERNAL_FILE_HASH}} |
+ |
+ # Local data files and hashes. |
+ self.data_files = ['/path/to/success.wpr', |
+ '/path/to/wrong_hash.wpr', |
+ '/path/to/preset_public_file.wpr', |
+ '/path/to/preset_partner_file.wpr', |
+ '/path/to/preset_internal_file.wpr'] |
+ self.local_file_hashes = { |
+ '/path/to/success.wpr': CloudStorageTest.SUCCESS_FILE_HASH, |
+ '/path/to/wrong_hash.wpr': CloudStorageTest.SUCCESS_FILE_HASH, |
+ '/path/to/preset_public_file.wpr':CloudStorageTest.PUBLIC_FILE_HASH, |
+ '/path/to/preset_partner_file.wpr':CloudStorageTest.PARTNER_FILE_HASH, |
+ '/path/to/preset_internal_file.wpr':CloudStorageTest.INTERNAL_FILE_HASH, |
+ } |
+ self.cloud_storage.SetCalculatedHashesForTesting(self.local_file_hashes) |
+ # Local hash files and their contents. |
+ local_hash_files = { |
+ '/path/to/success.wpr.sha1': CloudStorageTest.SUCCESS_FILE_HASH, |
+ '/path/to/wrong_hash.wpr.sha1': 'wronghash'.zfill(40), |
+ '/path/to/preset_public_file.wpr.sha1': CloudStorageTest.PUBLIC_FILE_HASH, |
+ '/path/to/preset_partner_file.wpr.sha1': |
+ CloudStorageTest.PARTNER_FILE_HASH, |
+ '/path/to/preset_internal_file.wpr.sha1': |
+ CloudStorageTest.INTERNAL_FILE_HASH, |
+ } |
+ self.cloud_storage.SetHashFileContentsForTesting(local_hash_files) |
+ |
+ def testSetup(self): |
+ self.assertEqual(self.local_file_hashes, |
+ self.cloud_storage.local_file_hashes) |
+ self.assertEqual(set(self.data_files), |
+ set(self.cloud_storage.GetLocalDataFiles())) |
+ self.assertEqual(self.cloud_storage.default_remote_paths, |
+ self.cloud_storage.GetRemotePathsForTesting()) |
+ self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) |
+ self.assertEqual(self.remote_paths, |
+ self.cloud_storage.GetRemotePathsForTesting()) |
+ |
+ def testExistsEmptyCloudStorage(self): |
+ # Test empty remote files dictionary. |
+ self.assertFalse(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, |
+ 'preset_public_file.wpr')) |
+ self.assertFalse(self.cloud_storage.Exists( |
+ self.cloud_storage.PARTNER_BUCKET, 'preset_partner_file.wpr')) |
+ self.assertFalse(self.cloud_storage.Exists( |
+ self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr')) |
+ |
+ def testExistsNonEmptyCloudStorage(self): |
+ # Test non-empty remote files dictionary. |
+ self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) |
+ self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, |
+ 'preset_public_file.wpr')) |
+ self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PARTNER_BUCKET, |
+ 'preset_partner_file.wpr')) |
+ self.assertTrue(self.cloud_storage.Exists( |
+ self.cloud_storage.INTERNAL_BUCKET,'preset_internal_file.wpr')) |
+ self.assertFalse(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, |
+ 'fake_file')) |
+ self.assertFalse(self.cloud_storage.Exists( |
+ self.cloud_storage.PARTNER_BUCKET,'fake_file')) |
+ self.assertFalse(self.cloud_storage.Exists( |
+ self.cloud_storage.INTERNAL_BUCKET, 'fake_file')) |
+ # Reset state. |
+ self.cloud_storage.SetRemotePathsForTesting() |
+ |
+ def testNonEmptyInsertAndExistsPublic(self): |
+ # Test non-empty remote files dictionary. |
+ self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) |
+ self.assertFalse(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, |
+ 'success.wpr')) |
+ self.cloud_storage.Insert(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', |
+ '/path/to/success.wpr') |
+ self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, |
+ 'success.wpr')) |
+ # Reset state. |
+ self.cloud_storage.SetRemotePathsForTesting() |
+ |
+ def testEmptyInsertAndExistsPublic(self): |
+ # Test empty remote files dictionary. |
+ self.assertFalse(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, |
+ 'success.wpr')) |
+ self.cloud_storage.Insert(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', |
+ '/path/to/success.wpr') |
+ self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, |
+ 'success.wpr')) |
+ |
+ def testEmptyInsertAndGet(self): |
+ self.assertRaises(self.cloud_storage.NotFoundError, self.cloud_storage.Get, |
+ self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', |
+ '/path/to/success.wpr') |
+ self.cloud_storage.Insert(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', |
+ '/path/to/success.wpr') |
+ self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, |
+ 'success.wpr')) |
+ self.assertEqual(CloudStorageTest.SUCCESS_FILE_HASH, |
+ self.cloud_storage.Get(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', |
+ '/path/to/success.wpr')) |
+ |
+ def testNonEmptyInsertAndGet(self): |
+ self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) |
+ self.assertRaises(self.cloud_storage.NotFoundError, self.cloud_storage.Get, |
+ self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', |
+ '/path/to/success.wpr') |
+ self.cloud_storage.Insert(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', |
+ '/path/to/success.wpr') |
+ self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, |
+ 'success.wpr')) |
+ self.assertEqual(CloudStorageTest.SUCCESS_FILE_HASH, |
+ self.cloud_storage.Get(self.cloud_storage.PUBLIC_BUCKET, |
+ 'success.wpr', |
+ '/path/to/success.wpr')) |
+ # Reset state. |
+ self.cloud_storage.SetRemotePathsForTesting() |
+ |
+ def testGetIfChanged(self): |
+ self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) |
+ self.assertRaises(self.cloud_storage.NotFoundError, self.cloud_storage.Get, |
+ self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', |
+ '/path/to/success.wpr') |
+ self.assertFalse(self.cloud_storage.GetIfChanged( |
+ self.cloud_storage.PUBLIC_BUCKET, '/path/to/preset_public_file.wpr')) |
+ self.cloud_storage.ChangeRemoteHashForTesting( |
+ self.cloud_storage.PUBLIC_BUCKET, 'preset_public_file.wpr', |
+ CloudStorageTest.UPDATED_HASH) |
+ self.assertTrue(self.cloud_storage.GetIfChanged( |
+ self.cloud_storage.PUBLIC_BUCKET, '/path/to/preset_public_file.wpr')) |
+ self.assertFalse(self.cloud_storage.GetIfChanged( |
+ self.cloud_storage.PUBLIC_BUCKET, '/path/to/preset_public_file.wpr')) |
+ # Reset state. |
+ self.cloud_storage.SetRemotePathsForTesting() |
+ |
+ def testList(self): |
+ self.assertEqual([], |
+ self.cloud_storage.List(self.cloud_storage.PUBLIC_BUCKET)) |
+ self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) |
+ self.assertEqual(['preset_public_file.wpr'], |
+ self.cloud_storage.List(self.cloud_storage.PUBLIC_BUCKET)) |
+ # Reset state. |
+ self.cloud_storage.SetRemotePathsForTesting() |
+ |
+ def testPermissionError(self): |
+ self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) |
+ self.cloud_storage.SetPermissionLevelForTesting( |
+ self.cloud_storage.PUBLIC_PERMISSION) |
+ self.assertRaises( |
+ self.cloud_storage.PermissionError, self.cloud_storage.Get, |
+ self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr', |
+ '/path/to/preset_internal_file.wpr') |
+ self.assertRaises( |
+ self.cloud_storage.PermissionError, self.cloud_storage.GetIfChanged, |
+ self.cloud_storage.INTERNAL_BUCKET, '/path/to/preset_internal_file.wpr') |
+ self.assertRaises( |
+ self.cloud_storage.PermissionError, self.cloud_storage.List, |
+ self.cloud_storage.INTERNAL_BUCKET) |
+ self.assertRaises( |
+ self.cloud_storage.PermissionError, self.cloud_storage.Exists, |
+ self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr') |
+ self.assertRaises( |
+ self.cloud_storage.PermissionError, self.cloud_storage.Insert, |
+ self.cloud_storage.INTERNAL_BUCKET, 'success.wpr', '/path/to/success.wpr') |
+ # Reset state. |
+ self.cloud_storage.SetRemotePathsForTesting() |
+ |
+ def testCredentialsError(self): |
+ self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) |
+ self.cloud_storage.SetPermissionLevelForTesting( |
+ self.cloud_storage.CREDENTIALS_ERROR_PERMISSION) |
+ self.assertRaises( |
+ self.cloud_storage.CredentialsError, self.cloud_storage.Get, |
+ self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr', |
+ '/path/to/preset_internal_file.wpr') |
+ self.assertRaises( |
+ self.cloud_storage.CredentialsError, self.cloud_storage.GetIfChanged, |
+ self.cloud_storage.INTERNAL_BUCKET, '/path/to/preset_internal_file.wpr') |
+ self.assertRaises( |
+ self.cloud_storage.CredentialsError, self.cloud_storage.List, |
+ self.cloud_storage.INTERNAL_BUCKET) |
+ self.assertRaises( |
+ self.cloud_storage.CredentialsError, self.cloud_storage.Exists, |
+ self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr') |
+ self.assertRaises( |
+ self.cloud_storage.CredentialsError, self.cloud_storage.Insert, |
+ self.cloud_storage.INTERNAL_BUCKET, 'success.wpr', |
+ '/path/to/success.wpr') |
+ # Reset state. |
+ self.cloud_storage.SetRemotePathsForTesting() |