Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(195)

Unified Diff: tools/telemetry/telemetry/unittest/system_stub_unittest.py

Issue 430473011: Fleshout cloud_storage stub implementation. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Remove presubmit unittests since presubmit is going to be deleted in hte next CL. Created 6 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « tools/telemetry/telemetry/unittest/system_stub.py ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: tools/telemetry/telemetry/unittest/system_stub_unittest.py
diff --git a/tools/telemetry/telemetry/unittest/system_stub_unittest.py b/tools/telemetry/telemetry/unittest/system_stub_unittest.py
new file mode 100644
index 0000000000000000000000000000000000000000..238f344b71711596188df957ec7ac83820fa1a75
--- /dev/null
+++ b/tools/telemetry/telemetry/unittest/system_stub_unittest.py
@@ -0,0 +1,218 @@
+# Copyright 2013 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import os
+import unittest
+
+PERF_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+from telemetry.unittest import system_stub
+
+
+class CloudStorageTest(unittest.TestCase):
+ SUCCESS_FILE_HASH = 'success'.zfill(40)
+ PUBLIC_FILE_HASH = 'public'.zfill(40)
+ PARTNER_FILE_HASH = 'partner'.zfill(40)
+ INTERNAL_FILE_HASH = 'internal'.zfill(40)
+ UPDATED_HASH = 'updated'.zfill(40)
+
+ def setUp(self):
+ self.cloud_storage = system_stub.CloudStorageModuleStub()
+
+ # Files in Cloud Storage.
+ self.remote_files = ['preset_public_file.wpr',
+ 'preset_partner_file.wpr',
+ 'preset_internal_file.wpr']
+ self.remote_paths = {
+ self.cloud_storage.PUBLIC_BUCKET:
+ {'preset_public_file.wpr':CloudStorageTest.PUBLIC_FILE_HASH},
+ self.cloud_storage.PARTNER_BUCKET:
+ {'preset_partner_file.wpr':CloudStorageTest.PARTNER_FILE_HASH},
+ self.cloud_storage.INTERNAL_BUCKET:
+ {'preset_internal_file.wpr':CloudStorageTest.INTERNAL_FILE_HASH}}
+
+ # Local data files and hashes.
+ self.data_files = ['/path/to/success.wpr',
+ '/path/to/wrong_hash.wpr',
+ '/path/to/preset_public_file.wpr',
+ '/path/to/preset_partner_file.wpr',
+ '/path/to/preset_internal_file.wpr']
+ self.local_file_hashes = {
+ '/path/to/success.wpr': CloudStorageTest.SUCCESS_FILE_HASH,
+ '/path/to/wrong_hash.wpr': CloudStorageTest.SUCCESS_FILE_HASH,
+ '/path/to/preset_public_file.wpr':CloudStorageTest.PUBLIC_FILE_HASH,
+ '/path/to/preset_partner_file.wpr':CloudStorageTest.PARTNER_FILE_HASH,
+ '/path/to/preset_internal_file.wpr':CloudStorageTest.INTERNAL_FILE_HASH,
+ }
+ self.cloud_storage.SetCalculatedHashesForTesting(self.local_file_hashes)
+ # Local hash files and their contents.
+ local_hash_files = {
+ '/path/to/success.wpr.sha1': CloudStorageTest.SUCCESS_FILE_HASH,
+ '/path/to/wrong_hash.wpr.sha1': 'wronghash'.zfill(40),
+ '/path/to/preset_public_file.wpr.sha1': CloudStorageTest.PUBLIC_FILE_HASH,
+ '/path/to/preset_partner_file.wpr.sha1':
+ CloudStorageTest.PARTNER_FILE_HASH,
+ '/path/to/preset_internal_file.wpr.sha1':
+ CloudStorageTest.INTERNAL_FILE_HASH,
+ }
+ self.cloud_storage.SetHashFileContentsForTesting(local_hash_files)
+
+ def testSetup(self):
+ self.assertEqual(self.local_file_hashes,
+ self.cloud_storage.local_file_hashes)
+ self.assertEqual(set(self.data_files),
+ set(self.cloud_storage.GetLocalDataFiles()))
+ self.assertEqual(self.cloud_storage.default_remote_paths,
+ self.cloud_storage.GetRemotePathsForTesting())
+ self.cloud_storage.SetRemotePathsForTesting(self.remote_paths)
+ self.assertEqual(self.remote_paths,
+ self.cloud_storage.GetRemotePathsForTesting())
+
+ def testExistsEmptyCloudStorage(self):
+ # Test empty remote files dictionary.
+ self.assertFalse(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET,
+ 'preset_public_file.wpr'))
+ self.assertFalse(self.cloud_storage.Exists(
+ self.cloud_storage.PARTNER_BUCKET, 'preset_partner_file.wpr'))
+ self.assertFalse(self.cloud_storage.Exists(
+ self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr'))
+
+ def testExistsNonEmptyCloudStorage(self):
+ # Test non-empty remote files dictionary.
+ self.cloud_storage.SetRemotePathsForTesting(self.remote_paths)
+ self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET,
+ 'preset_public_file.wpr'))
+ self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PARTNER_BUCKET,
+ 'preset_partner_file.wpr'))
+ self.assertTrue(self.cloud_storage.Exists(
+ self.cloud_storage.INTERNAL_BUCKET,'preset_internal_file.wpr'))
+ self.assertFalse(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET,
+ 'fake_file'))
+ self.assertFalse(self.cloud_storage.Exists(
+ self.cloud_storage.PARTNER_BUCKET,'fake_file'))
+ self.assertFalse(self.cloud_storage.Exists(
+ self.cloud_storage.INTERNAL_BUCKET, 'fake_file'))
+ # Reset state.
+ self.cloud_storage.SetRemotePathsForTesting()
+
+ def testNonEmptyInsertAndExistsPublic(self):
+ # Test non-empty remote files dictionary.
+ self.cloud_storage.SetRemotePathsForTesting(self.remote_paths)
+ self.assertFalse(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET,
+ 'success.wpr'))
+ self.cloud_storage.Insert(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr',
+ '/path/to/success.wpr')
+ self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET,
+ 'success.wpr'))
+ # Reset state.
+ self.cloud_storage.SetRemotePathsForTesting()
+
+ def testEmptyInsertAndExistsPublic(self):
+ # Test empty remote files dictionary.
+ self.assertFalse(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET,
+ 'success.wpr'))
+ self.cloud_storage.Insert(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr',
+ '/path/to/success.wpr')
+ self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET,
+ 'success.wpr'))
+
+ def testEmptyInsertAndGet(self):
+ self.assertRaises(self.cloud_storage.NotFoundError, self.cloud_storage.Get,
+ self.cloud_storage.PUBLIC_BUCKET, 'success.wpr',
+ '/path/to/success.wpr')
+ self.cloud_storage.Insert(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr',
+ '/path/to/success.wpr')
+ self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET,
+ 'success.wpr'))
+ self.assertEqual(CloudStorageTest.SUCCESS_FILE_HASH,
+ self.cloud_storage.Get(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr',
+ '/path/to/success.wpr'))
+
+ def testNonEmptyInsertAndGet(self):
+ self.cloud_storage.SetRemotePathsForTesting(self.remote_paths)
+ self.assertRaises(self.cloud_storage.NotFoundError, self.cloud_storage.Get,
+ self.cloud_storage.PUBLIC_BUCKET, 'success.wpr',
+ '/path/to/success.wpr')
+ self.cloud_storage.Insert(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr',
+ '/path/to/success.wpr')
+ self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET,
+ 'success.wpr'))
+ self.assertEqual(CloudStorageTest.SUCCESS_FILE_HASH,
+ self.cloud_storage.Get(self.cloud_storage.PUBLIC_BUCKET,
+ 'success.wpr',
+ '/path/to/success.wpr'))
+ # Reset state.
+ self.cloud_storage.SetRemotePathsForTesting()
+
+ def testGetIfChanged(self):
+ self.cloud_storage.SetRemotePathsForTesting(self.remote_paths)
+ self.assertRaises(self.cloud_storage.NotFoundError, self.cloud_storage.Get,
+ self.cloud_storage.PUBLIC_BUCKET, 'success.wpr',
+ '/path/to/success.wpr')
+ self.assertFalse(self.cloud_storage.GetIfChanged(
+ self.cloud_storage.PUBLIC_BUCKET, '/path/to/preset_public_file.wpr'))
+ self.cloud_storage.ChangeRemoteHashForTesting(
+ self.cloud_storage.PUBLIC_BUCKET, 'preset_public_file.wpr',
+ CloudStorageTest.UPDATED_HASH)
+ self.assertTrue(self.cloud_storage.GetIfChanged(
+ self.cloud_storage.PUBLIC_BUCKET, '/path/to/preset_public_file.wpr'))
+ self.assertFalse(self.cloud_storage.GetIfChanged(
+ self.cloud_storage.PUBLIC_BUCKET, '/path/to/preset_public_file.wpr'))
+ # Reset state.
+ self.cloud_storage.SetRemotePathsForTesting()
+
+ def testList(self):
+ self.assertEqual([],
+ self.cloud_storage.List(self.cloud_storage.PUBLIC_BUCKET))
+ self.cloud_storage.SetRemotePathsForTesting(self.remote_paths)
+ self.assertEqual(['preset_public_file.wpr'],
+ self.cloud_storage.List(self.cloud_storage.PUBLIC_BUCKET))
+ # Reset state.
+ self.cloud_storage.SetRemotePathsForTesting()
+
+ def testPermissionError(self):
+ self.cloud_storage.SetRemotePathsForTesting(self.remote_paths)
+ self.cloud_storage.SetPermissionLevelForTesting(
+ self.cloud_storage.PUBLIC_PERMISSION)
+ self.assertRaises(
+ self.cloud_storage.PermissionError, self.cloud_storage.Get,
+ self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr',
+ '/path/to/preset_internal_file.wpr')
+ self.assertRaises(
+ self.cloud_storage.PermissionError, self.cloud_storage.GetIfChanged,
+ self.cloud_storage.INTERNAL_BUCKET, '/path/to/preset_internal_file.wpr')
+ self.assertRaises(
+ self.cloud_storage.PermissionError, self.cloud_storage.List,
+ self.cloud_storage.INTERNAL_BUCKET)
+ self.assertRaises(
+ self.cloud_storage.PermissionError, self.cloud_storage.Exists,
+ self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr')
+ self.assertRaises(
+ self.cloud_storage.PermissionError, self.cloud_storage.Insert,
+ self.cloud_storage.INTERNAL_BUCKET, 'success.wpr', '/path/to/success.wpr')
+ # Reset state.
+ self.cloud_storage.SetRemotePathsForTesting()
+
+ def testCredentialsError(self):
+ self.cloud_storage.SetRemotePathsForTesting(self.remote_paths)
+ self.cloud_storage.SetPermissionLevelForTesting(
+ self.cloud_storage.CREDENTIALS_ERROR_PERMISSION)
+ self.assertRaises(
+ self.cloud_storage.CredentialsError, self.cloud_storage.Get,
+ self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr',
+ '/path/to/preset_internal_file.wpr')
+ self.assertRaises(
+ self.cloud_storage.CredentialsError, self.cloud_storage.GetIfChanged,
+ self.cloud_storage.INTERNAL_BUCKET, '/path/to/preset_internal_file.wpr')
+ self.assertRaises(
+ self.cloud_storage.CredentialsError, self.cloud_storage.List,
+ self.cloud_storage.INTERNAL_BUCKET)
+ self.assertRaises(
+ self.cloud_storage.CredentialsError, self.cloud_storage.Exists,
+ self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr')
+ self.assertRaises(
+ self.cloud_storage.CredentialsError, self.cloud_storage.Insert,
+ self.cloud_storage.INTERNAL_BUCKET, 'success.wpr',
+ '/path/to/success.wpr')
+ # Reset state.
+ self.cloud_storage.SetRemotePathsForTesting()
« no previous file with comments | « tools/telemetry/telemetry/unittest/system_stub.py ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698