OLD | NEW |
(Empty) | |
| 1 # Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. |
| 4 |
| 5 import os |
| 6 import unittest |
| 7 |
| 8 PERF_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) |
| 9 from telemetry.unittest import system_stub |
| 10 |
| 11 |
| 12 class CloudStorageTest(unittest.TestCase): |
| 13 SUCCESS_FILE_HASH = 'success'.zfill(40) |
| 14 PUBLIC_FILE_HASH = 'public'.zfill(40) |
| 15 PARTNER_FILE_HASH = 'partner'.zfill(40) |
| 16 INTERNAL_FILE_HASH = 'internal'.zfill(40) |
| 17 UPDATED_HASH = 'updated'.zfill(40) |
| 18 |
| 19 def setUp(self): |
| 20 self.cloud_storage = system_stub.CloudStorageModuleStub() |
| 21 |
| 22 # Files in Cloud Storage. |
| 23 self.remote_files = ['preset_public_file.wpr', |
| 24 'preset_partner_file.wpr', |
| 25 'preset_internal_file.wpr'] |
| 26 self.remote_paths = { |
| 27 self.cloud_storage.PUBLIC_BUCKET: |
| 28 {'preset_public_file.wpr':CloudStorageTest.PUBLIC_FILE_HASH}, |
| 29 self.cloud_storage.PARTNER_BUCKET: |
| 30 {'preset_partner_file.wpr':CloudStorageTest.PARTNER_FILE_HASH}, |
| 31 self.cloud_storage.INTERNAL_BUCKET: |
| 32 {'preset_internal_file.wpr':CloudStorageTest.INTERNAL_FILE_HASH}} |
| 33 |
| 34 # Local data files and hashes. |
| 35 self.data_files = ['/path/to/success.wpr', |
| 36 '/path/to/wrong_hash.wpr', |
| 37 '/path/to/preset_public_file.wpr', |
| 38 '/path/to/preset_partner_file.wpr', |
| 39 '/path/to/preset_internal_file.wpr'] |
| 40 self.local_file_hashes = { |
| 41 '/path/to/success.wpr': CloudStorageTest.SUCCESS_FILE_HASH, |
| 42 '/path/to/wrong_hash.wpr': CloudStorageTest.SUCCESS_FILE_HASH, |
| 43 '/path/to/preset_public_file.wpr':CloudStorageTest.PUBLIC_FILE_HASH, |
| 44 '/path/to/preset_partner_file.wpr':CloudStorageTest.PARTNER_FILE_HASH, |
| 45 '/path/to/preset_internal_file.wpr':CloudStorageTest.INTERNAL_FILE_HASH, |
| 46 } |
| 47 self.cloud_storage.SetCalculatedHashesForTesting(self.local_file_hashes) |
| 48 # Local hash files and their contents. |
| 49 local_hash_files = { |
| 50 '/path/to/success.wpr.sha1': CloudStorageTest.SUCCESS_FILE_HASH, |
| 51 '/path/to/wrong_hash.wpr.sha1': 'wronghash'.zfill(40), |
| 52 '/path/to/preset_public_file.wpr.sha1': CloudStorageTest.PUBLIC_FILE_HASH, |
| 53 '/path/to/preset_partner_file.wpr.sha1': |
| 54 CloudStorageTest.PARTNER_FILE_HASH, |
| 55 '/path/to/preset_internal_file.wpr.sha1': |
| 56 CloudStorageTest.INTERNAL_FILE_HASH, |
| 57 } |
| 58 self.cloud_storage.SetHashFileContentsForTesting(local_hash_files) |
| 59 |
| 60 def testSetup(self): |
| 61 self.assertEqual(self.local_file_hashes, |
| 62 self.cloud_storage.local_file_hashes) |
| 63 self.assertEqual(set(self.data_files), |
| 64 set(self.cloud_storage.GetLocalDataFiles())) |
| 65 self.assertEqual(self.cloud_storage.default_remote_paths, |
| 66 self.cloud_storage.GetRemotePathsForTesting()) |
| 67 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) |
| 68 self.assertEqual(self.remote_paths, |
| 69 self.cloud_storage.GetRemotePathsForTesting()) |
| 70 |
| 71 def testExistsEmptyCloudStorage(self): |
| 72 # Test empty remote files dictionary. |
| 73 self.assertFalse(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, |
| 74 'preset_public_file.wpr')) |
| 75 self.assertFalse(self.cloud_storage.Exists( |
| 76 self.cloud_storage.PARTNER_BUCKET, 'preset_partner_file.wpr')) |
| 77 self.assertFalse(self.cloud_storage.Exists( |
| 78 self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr')) |
| 79 |
| 80 def testExistsNonEmptyCloudStorage(self): |
| 81 # Test non-empty remote files dictionary. |
| 82 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) |
| 83 self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, |
| 84 'preset_public_file.wpr')) |
| 85 self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PARTNER_BUCKET, |
| 86 'preset_partner_file.wpr')) |
| 87 self.assertTrue(self.cloud_storage.Exists( |
| 88 self.cloud_storage.INTERNAL_BUCKET,'preset_internal_file.wpr')) |
| 89 self.assertFalse(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, |
| 90 'fake_file')) |
| 91 self.assertFalse(self.cloud_storage.Exists( |
| 92 self.cloud_storage.PARTNER_BUCKET,'fake_file')) |
| 93 self.assertFalse(self.cloud_storage.Exists( |
| 94 self.cloud_storage.INTERNAL_BUCKET, 'fake_file')) |
| 95 # Reset state. |
| 96 self.cloud_storage.SetRemotePathsForTesting() |
| 97 |
| 98 def testNonEmptyInsertAndExistsPublic(self): |
| 99 # Test non-empty remote files dictionary. |
| 100 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) |
| 101 self.assertFalse(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, |
| 102 'success.wpr')) |
| 103 self.cloud_storage.Insert(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', |
| 104 '/path/to/success.wpr') |
| 105 self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, |
| 106 'success.wpr')) |
| 107 # Reset state. |
| 108 self.cloud_storage.SetRemotePathsForTesting() |
| 109 |
| 110 def testEmptyInsertAndExistsPublic(self): |
| 111 # Test empty remote files dictionary. |
| 112 self.assertFalse(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, |
| 113 'success.wpr')) |
| 114 self.cloud_storage.Insert(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', |
| 115 '/path/to/success.wpr') |
| 116 self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, |
| 117 'success.wpr')) |
| 118 |
| 119 def testEmptyInsertAndGet(self): |
| 120 self.assertRaises(self.cloud_storage.NotFoundError, self.cloud_storage.Get, |
| 121 self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', |
| 122 '/path/to/success.wpr') |
| 123 self.cloud_storage.Insert(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', |
| 124 '/path/to/success.wpr') |
| 125 self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, |
| 126 'success.wpr')) |
| 127 self.assertEqual(CloudStorageTest.SUCCESS_FILE_HASH, |
| 128 self.cloud_storage.Get(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', |
| 129 '/path/to/success.wpr')) |
| 130 |
| 131 def testNonEmptyInsertAndGet(self): |
| 132 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) |
| 133 self.assertRaises(self.cloud_storage.NotFoundError, self.cloud_storage.Get, |
| 134 self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', |
| 135 '/path/to/success.wpr') |
| 136 self.cloud_storage.Insert(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', |
| 137 '/path/to/success.wpr') |
| 138 self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, |
| 139 'success.wpr')) |
| 140 self.assertEqual(CloudStorageTest.SUCCESS_FILE_HASH, |
| 141 self.cloud_storage.Get(self.cloud_storage.PUBLIC_BUCKET, |
| 142 'success.wpr', |
| 143 '/path/to/success.wpr')) |
| 144 # Reset state. |
| 145 self.cloud_storage.SetRemotePathsForTesting() |
| 146 |
| 147 def testGetIfChanged(self): |
| 148 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) |
| 149 self.assertRaises(self.cloud_storage.NotFoundError, self.cloud_storage.Get, |
| 150 self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', |
| 151 '/path/to/success.wpr') |
| 152 self.assertFalse(self.cloud_storage.GetIfChanged( |
| 153 self.cloud_storage.PUBLIC_BUCKET, '/path/to/preset_public_file.wpr')) |
| 154 self.cloud_storage.ChangeRemoteHashForTesting( |
| 155 self.cloud_storage.PUBLIC_BUCKET, 'preset_public_file.wpr', |
| 156 CloudStorageTest.UPDATED_HASH) |
| 157 self.assertTrue(self.cloud_storage.GetIfChanged( |
| 158 self.cloud_storage.PUBLIC_BUCKET, '/path/to/preset_public_file.wpr')) |
| 159 self.assertFalse(self.cloud_storage.GetIfChanged( |
| 160 self.cloud_storage.PUBLIC_BUCKET, '/path/to/preset_public_file.wpr')) |
| 161 # Reset state. |
| 162 self.cloud_storage.SetRemotePathsForTesting() |
| 163 |
| 164 def testList(self): |
| 165 self.assertEqual([], |
| 166 self.cloud_storage.List(self.cloud_storage.PUBLIC_BUCKET)) |
| 167 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) |
| 168 self.assertEqual(['preset_public_file.wpr'], |
| 169 self.cloud_storage.List(self.cloud_storage.PUBLIC_BUCKET)) |
| 170 # Reset state. |
| 171 self.cloud_storage.SetRemotePathsForTesting() |
| 172 |
| 173 def testPermissionError(self): |
| 174 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) |
| 175 self.cloud_storage.SetPermissionLevelForTesting( |
| 176 self.cloud_storage.PUBLIC_PERMISSION) |
| 177 self.assertRaises( |
| 178 self.cloud_storage.PermissionError, self.cloud_storage.Get, |
| 179 self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr', |
| 180 '/path/to/preset_internal_file.wpr') |
| 181 self.assertRaises( |
| 182 self.cloud_storage.PermissionError, self.cloud_storage.GetIfChanged, |
| 183 self.cloud_storage.INTERNAL_BUCKET, '/path/to/preset_internal_file.wpr') |
| 184 self.assertRaises( |
| 185 self.cloud_storage.PermissionError, self.cloud_storage.List, |
| 186 self.cloud_storage.INTERNAL_BUCKET) |
| 187 self.assertRaises( |
| 188 self.cloud_storage.PermissionError, self.cloud_storage.Exists, |
| 189 self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr') |
| 190 self.assertRaises( |
| 191 self.cloud_storage.PermissionError, self.cloud_storage.Insert, |
| 192 self.cloud_storage.INTERNAL_BUCKET, 'success.wpr', '/path/to/success.wpr') |
| 193 # Reset state. |
| 194 self.cloud_storage.SetRemotePathsForTesting() |
| 195 |
| 196 def testCredentialsError(self): |
| 197 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) |
| 198 self.cloud_storage.SetPermissionLevelForTesting( |
| 199 self.cloud_storage.CREDENTIALS_ERROR_PERMISSION) |
| 200 self.assertRaises( |
| 201 self.cloud_storage.CredentialsError, self.cloud_storage.Get, |
| 202 self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr', |
| 203 '/path/to/preset_internal_file.wpr') |
| 204 self.assertRaises( |
| 205 self.cloud_storage.CredentialsError, self.cloud_storage.GetIfChanged, |
| 206 self.cloud_storage.INTERNAL_BUCKET, '/path/to/preset_internal_file.wpr') |
| 207 self.assertRaises( |
| 208 self.cloud_storage.CredentialsError, self.cloud_storage.List, |
| 209 self.cloud_storage.INTERNAL_BUCKET) |
| 210 self.assertRaises( |
| 211 self.cloud_storage.CredentialsError, self.cloud_storage.Exists, |
| 212 self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr') |
| 213 self.assertRaises( |
| 214 self.cloud_storage.CredentialsError, self.cloud_storage.Insert, |
| 215 self.cloud_storage.INTERNAL_BUCKET, 'success.wpr', |
| 216 '/path/to/success.wpr') |
| 217 # Reset state. |
| 218 self.cloud_storage.SetRemotePathsForTesting() |
OLD | NEW |