| OLD | NEW |
| 1 # Copyright 2013 The Chromium Authors. All rights reserved. | 1 # Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
| 4 | 4 |
| 5 import os | 5 import os |
| 6 import unittest | 6 import unittest |
| 7 | 7 |
| 8 PERF_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) | 8 PERF_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) |
| 9 from telemetry.unittest_util import system_stub | 9 from telemetry.unittest_util import system_stub |
| 10 from unittest_data import system_stub_test_module | 10 from unittest_data import system_stub_test_module |
| (...skipping 14 matching lines...) Expand all Loading... |
| 25 'preset_internal_file.wpr'] | 25 'preset_internal_file.wpr'] |
| 26 self.remote_paths = { | 26 self.remote_paths = { |
| 27 self.cloud_storage.PUBLIC_BUCKET: | 27 self.cloud_storage.PUBLIC_BUCKET: |
| 28 {'preset_public_file.wpr':CloudStorageTest.PUBLIC_FILE_HASH}, | 28 {'preset_public_file.wpr':CloudStorageTest.PUBLIC_FILE_HASH}, |
| 29 self.cloud_storage.PARTNER_BUCKET: | 29 self.cloud_storage.PARTNER_BUCKET: |
| 30 {'preset_partner_file.wpr':CloudStorageTest.PARTNER_FILE_HASH}, | 30 {'preset_partner_file.wpr':CloudStorageTest.PARTNER_FILE_HASH}, |
| 31 self.cloud_storage.INTERNAL_BUCKET: | 31 self.cloud_storage.INTERNAL_BUCKET: |
| 32 {'preset_internal_file.wpr':CloudStorageTest.INTERNAL_FILE_HASH}} | 32 {'preset_internal_file.wpr':CloudStorageTest.INTERNAL_FILE_HASH}} |
| 33 | 33 |
| 34 # Local data files and hashes. | 34 # Local data files and hashes. |
| 35 self.data_files = ['/path/to/success.wpr', | 35 self.data_files = [ |
| 36 '/path/to/wrong_hash.wpr', | 36 os.path.join(os.path.sep, 'path', 'to', 'success.wpr'), |
| 37 '/path/to/preset_public_file.wpr', | 37 os.path.join(os.path.sep, 'path', 'to', 'wrong_hash.wpr'), |
| 38 '/path/to/preset_partner_file.wpr', | 38 os.path.join(os.path.sep, 'path', 'to', 'preset_public_file.wpr'), |
| 39 '/path/to/preset_internal_file.wpr'] | 39 os.path.join(os.path.sep, 'path', 'to', 'preset_partner_file.wpr'), |
| 40 os.path.join(os.path.sep, 'path', 'to', 'preset_internal_file.wpr')] |
| 40 self.local_file_hashes = { | 41 self.local_file_hashes = { |
| 41 '/path/to/success.wpr': CloudStorageTest.SUCCESS_FILE_HASH, | 42 os.path.join(os.path.sep, 'path', 'to', 'success.wpr'): |
| 42 '/path/to/wrong_hash.wpr': CloudStorageTest.SUCCESS_FILE_HASH, | 43 CloudStorageTest.SUCCESS_FILE_HASH, |
| 43 '/path/to/preset_public_file.wpr':CloudStorageTest.PUBLIC_FILE_HASH, | 44 os.path.join(os.path.sep, 'path', 'to', 'wrong_hash.wpr'): |
| 44 '/path/to/preset_partner_file.wpr':CloudStorageTest.PARTNER_FILE_HASH, | 45 CloudStorageTest.SUCCESS_FILE_HASH, |
| 45 '/path/to/preset_internal_file.wpr':CloudStorageTest.INTERNAL_FILE_HASH, | 46 os.path.join(os.path.sep, 'path', 'to', 'preset_public_file.wpr'): |
| 47 CloudStorageTest.PUBLIC_FILE_HASH, |
| 48 os.path.join(os.path.sep, 'path', 'to', 'preset_partner_file.wpr'): |
| 49 CloudStorageTest.PARTNER_FILE_HASH, |
| 50 os.path.join(os.path.sep, 'path', 'to', 'preset_internal_file.wpr'): |
| 51 CloudStorageTest.INTERNAL_FILE_HASH, |
| 46 } | 52 } |
| 47 self.cloud_storage.SetCalculatedHashesForTesting(self.local_file_hashes) | 53 self.cloud_storage.SetCalculatedHashesForTesting(self.local_file_hashes) |
| 48 # Local hash files and their contents. | 54 # Local hash files and their contents. |
| 49 local_hash_files = { | 55 local_hash_files = { |
| 50 '/path/to/success.wpr.sha1': CloudStorageTest.SUCCESS_FILE_HASH, | 56 os.path.join(os.path.sep, 'path', 'to', 'success.wpr.sha1'): |
| 51 '/path/to/wrong_hash.wpr.sha1': 'wronghash'.zfill(40), | 57 CloudStorageTest.SUCCESS_FILE_HASH, |
| 52 '/path/to/preset_public_file.wpr.sha1': CloudStorageTest.PUBLIC_FILE_HASH, | 58 os.path.join(os.path.sep, 'path', 'to', 'wrong_hash.wpr.sha1'): |
| 53 '/path/to/preset_partner_file.wpr.sha1': | 59 'wronghash'.zfill(40), |
| 54 CloudStorageTest.PARTNER_FILE_HASH, | 60 os.path.join(os.path.sep, 'path', 'to', 'preset_public_file.wpr.sha1'): |
| 55 '/path/to/preset_internal_file.wpr.sha1': | 61 CloudStorageTest.PUBLIC_FILE_HASH, |
| 56 CloudStorageTest.INTERNAL_FILE_HASH, | 62 os.path.join(os.path.sep, 'path', 'to', 'preset_partner_file.wpr.sha1'): |
| 63 CloudStorageTest.PARTNER_FILE_HASH, |
| 64 os.path.join(os.path.sep, 'path', 'to', |
| 65 'preset_internal_file.wpr.sha1'): |
| 66 CloudStorageTest.INTERNAL_FILE_HASH, |
| 57 } | 67 } |
| 58 self.cloud_storage.SetHashFileContentsForTesting(local_hash_files) | 68 self.cloud_storage.SetHashFileContentsForTesting(local_hash_files) |
| 59 | 69 |
| 60 def testSetup(self): | 70 def testSetup(self): |
| 61 self.assertEqual(self.local_file_hashes, | 71 self.assertEqual(self.local_file_hashes, |
| 62 self.cloud_storage.local_file_hashes) | 72 self.cloud_storage.local_file_hashes) |
| 63 self.assertEqual(set(self.data_files), | 73 self.assertEqual(set(self.data_files), |
| 64 set(self.cloud_storage.GetLocalDataFiles())) | 74 set(self.cloud_storage.GetLocalDataFiles())) |
| 65 self.assertEqual(self.cloud_storage.default_remote_paths, | 75 self.assertEqual(self.cloud_storage.default_remote_paths, |
| 66 self.cloud_storage.GetRemotePathsForTesting()) | 76 self.cloud_storage.GetRemotePathsForTesting()) |
| 67 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) | 77 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) |
| 68 self.assertEqual(self.remote_paths, | 78 self.assertEqual(self.remote_paths, |
| 69 self.cloud_storage.GetRemotePathsForTesting()) | 79 self.cloud_storage.GetRemotePathsForTesting()) |
| 70 | 80 |
| 71 def testExistsEmptyCloudStorage(self): | 81 def testExistsEmptyCloudStorage(self): |
| 72 # Test empty remote files dictionary. | 82 # Test empty remote files dictionary. |
| 73 self.assertFalse(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, | 83 self.assertFalse(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, |
| 74 'preset_public_file.wpr')) | 84 'preset_public_file.wpr')) |
| 75 self.assertFalse(self.cloud_storage.Exists( | 85 self.assertFalse(self.cloud_storage.Exists( |
| 76 self.cloud_storage.PARTNER_BUCKET, 'preset_partner_file.wpr')) | 86 self.cloud_storage.PARTNER_BUCKET, 'preset_partner_file.wpr')) |
| 77 self.assertFalse(self.cloud_storage.Exists( | 87 self.assertFalse(self.cloud_storage.Exists( |
| 78 self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr')) | 88 self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr')) |
| 79 | 89 |
| 80 def testExistsNonEmptyCloudStorage(self): | 90 def testExistsNonEmptyCloudStorage(self): |
| 81 # Test non-empty remote files dictionary. | 91 # Test non-empty remote files dictionary. |
| 82 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) | 92 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) |
| 83 self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, | |
| 84 'preset_public_file.wpr')) | |
| 85 self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PARTNER_BUCKET, | |
| 86 'preset_partner_file.wpr')) | |
| 87 self.assertTrue(self.cloud_storage.Exists( | 93 self.assertTrue(self.cloud_storage.Exists( |
| 88 self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr')) | 94 self.cloud_storage.PUBLIC_BUCKET, 'preset_public_file.wpr')) |
| 89 self.assertFalse(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, | 95 self.assertTrue(self.cloud_storage.Exists( |
| 90 'fake_file')) | 96 self.cloud_storage.PARTNER_BUCKET, 'preset_partner_file.wpr')) |
| 97 self.assertTrue(self.cloud_storage.Exists( |
| 98 self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr')) |
| 91 self.assertFalse(self.cloud_storage.Exists( | 99 self.assertFalse(self.cloud_storage.Exists( |
| 92 self.cloud_storage.PARTNER_BUCKET, 'fake_file')) | 100 self.cloud_storage.PUBLIC_BUCKET, 'fake_file')) |
| 93 self.assertFalse(self.cloud_storage.Exists( | 101 self.assertFalse(self.cloud_storage.Exists( |
| 94 self.cloud_storage.INTERNAL_BUCKET, 'fake_file')) | 102 self.cloud_storage.PARTNER_BUCKET, 'fake_file')) |
| 103 self.assertFalse(self.cloud_storage.Exists( |
| 104 self.cloud_storage.INTERNAL_BUCKET, 'fake_file')) |
| 95 # Reset state. | 105 # Reset state. |
| 96 self.cloud_storage.SetRemotePathsForTesting() | 106 self.cloud_storage.SetRemotePathsForTesting() |
| 97 | 107 |
| 98 def testNonEmptyInsertAndExistsPublic(self): | 108 def testNonEmptyInsertAndExistsPublic(self): |
| 99 # Test non-empty remote files dictionary. | 109 # Test non-empty remote files dictionary. |
| 100 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) | 110 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) |
| 101 self.assertFalse(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, | 111 self.assertFalse(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, |
| 102 'success.wpr')) | 112 'success.wpr')) |
| 103 self.cloud_storage.Insert(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', | 113 self.cloud_storage.Insert( |
| 104 '/path/to/success.wpr') | 114 self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', |
| 105 self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, | 115 os.path.join(os.path.sep, 'path', 'to', 'success.wpr')) |
| 106 'success.wpr')) | 116 self.assertTrue(self.cloud_storage.Exists( |
| 117 self.cloud_storage.PUBLIC_BUCKET, 'success.wpr')) |
| 107 # Reset state. | 118 # Reset state. |
| 108 self.cloud_storage.SetRemotePathsForTesting() | 119 self.cloud_storage.SetRemotePathsForTesting() |
| 109 | 120 |
| 110 def testEmptyInsertAndExistsPublic(self): | 121 def testEmptyInsertAndExistsPublic(self): |
| 111 # Test empty remote files dictionary. | 122 # Test empty remote files dictionary. |
| 112 self.assertFalse(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, | 123 self.assertFalse(self.cloud_storage.Exists( |
| 113 'success.wpr')) | 124 self.cloud_storage.PUBLIC_BUCKET, 'success.wpr')) |
| 114 self.cloud_storage.Insert(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', | 125 self.cloud_storage.Insert( |
| 115 '/path/to/success.wpr') | 126 self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', |
| 116 self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, | 127 os.path.join(os.path.sep, 'path', 'to', 'success.wpr')) |
| 117 'success.wpr')) | 128 self.assertTrue(self.cloud_storage.Exists( |
| 129 self.cloud_storage.PUBLIC_BUCKET, 'success.wpr')) |
| 118 | 130 |
| 119 def testEmptyInsertAndGet(self): | 131 def testEmptyInsertAndGet(self): |
| 120 self.assertRaises(self.cloud_storage.NotFoundError, self.cloud_storage.Get, | 132 self.assertRaises(self.cloud_storage.NotFoundError, self.cloud_storage.Get, |
| 121 self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', | 133 self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', |
| 122 '/path/to/success.wpr') | 134 os.path.join(os.path.sep, 'path', 'to', 'success.wpr')) |
| 123 self.cloud_storage.Insert(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', | 135 self.cloud_storage.Insert(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', |
| 124 '/path/to/success.wpr') | 136 os.path.join(os.path.sep, 'path', 'to', |
| 125 self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, | 137 'success.wpr')) |
| 126 'success.wpr')) | 138 self.assertTrue(self.cloud_storage.Exists( |
| 127 self.assertEqual(CloudStorageTest.SUCCESS_FILE_HASH, | 139 self.cloud_storage.PUBLIC_BUCKET, 'success.wpr')) |
| 128 self.cloud_storage.Get(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', | 140 self.assertEqual(CloudStorageTest.SUCCESS_FILE_HASH, self.cloud_storage.Get( |
| 129 '/path/to/success.wpr')) | 141 self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', |
| 142 os.path.join(os.path.sep, 'path', 'to', 'success.wpr'))) |
| 130 | 143 |
| 131 def testNonEmptyInsertAndGet(self): | 144 def testNonEmptyInsertAndGet(self): |
| 132 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) | 145 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) |
| 133 self.assertRaises(self.cloud_storage.NotFoundError, self.cloud_storage.Get, | 146 self.assertRaises(self.cloud_storage.NotFoundError, self.cloud_storage.Get, |
| 134 self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', | 147 self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', |
| 135 '/path/to/success.wpr') | 148 os.path.join(os.path.sep, 'path', 'to', 'success.wpr')) |
| 136 self.cloud_storage.Insert(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', | 149 self.cloud_storage.Insert(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', |
| 137 '/path/to/success.wpr') | 150 os.path.join(os.path.sep, 'path', 'to', |
| 151 'success.wpr')) |
| 138 self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, | 152 self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, |
| 139 'success.wpr')) | 153 'success.wpr')) |
| 140 self.assertEqual(CloudStorageTest.SUCCESS_FILE_HASH, | 154 self.assertEqual( |
| 141 self.cloud_storage.Get(self.cloud_storage.PUBLIC_BUCKET, | 155 CloudStorageTest.SUCCESS_FILE_HASH, self.cloud_storage.Get( |
| 142 'success.wpr', | 156 self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', |
| 143 '/path/to/success.wpr')) | 157 os.path.join(os.path.sep, 'path', 'to', 'success.wpr'))) |
| 144 # Reset state. | 158 # Reset state. |
| 145 self.cloud_storage.SetRemotePathsForTesting() | 159 self.cloud_storage.SetRemotePathsForTesting() |
| 146 | 160 |
| 147 def testGetIfChanged(self): | 161 def testGetIfChanged(self): |
| 148 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) | 162 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) |
| 149 self.assertRaises(self.cloud_storage.NotFoundError, self.cloud_storage.Get, | 163 self.assertRaises( |
| 150 self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', | 164 self.cloud_storage.NotFoundError, self.cloud_storage.Get, |
| 151 '/path/to/success.wpr') | 165 self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', |
| 166 os.path.join(os.path.sep, 'path', 'to', 'success.wpr')) |
| 152 self.assertFalse(self.cloud_storage.GetIfChanged( | 167 self.assertFalse(self.cloud_storage.GetIfChanged( |
| 153 '/path/to/preset_public_file.wpr', self.cloud_storage.PUBLIC_BUCKET)) | 168 os.path.join(os.path.sep, 'path', 'to', 'preset_public_file.wpr'), |
| 169 self.cloud_storage.PUBLIC_BUCKET)) |
| 154 self.cloud_storage.ChangeRemoteHashForTesting( | 170 self.cloud_storage.ChangeRemoteHashForTesting( |
| 155 self.cloud_storage.PUBLIC_BUCKET, 'preset_public_file.wpr', | 171 self.cloud_storage.PUBLIC_BUCKET, 'preset_public_file.wpr', |
| 156 CloudStorageTest.UPDATED_HASH) | 172 CloudStorageTest.UPDATED_HASH) |
| 157 self.assertTrue(self.cloud_storage.GetIfChanged( | 173 self.assertTrue(self.cloud_storage.GetIfChanged( |
| 158 '/path/to/preset_public_file.wpr', self.cloud_storage.PUBLIC_BUCKET)) | 174 os.path.join(os.path.sep, 'path', 'to', 'preset_public_file.wpr'), |
| 175 self.cloud_storage.PUBLIC_BUCKET)) |
| 159 self.assertFalse(self.cloud_storage.GetIfChanged( | 176 self.assertFalse(self.cloud_storage.GetIfChanged( |
| 160 '/path/to/preset_public_file.wpr', self.cloud_storage.PUBLIC_BUCKET)) | 177 os.path.join(os.path.sep, 'path', 'to', 'preset_public_file.wpr'), |
| 178 self.cloud_storage.PUBLIC_BUCKET)) |
| 161 # Reset state. | 179 # Reset state. |
| 162 self.cloud_storage.SetRemotePathsForTesting() | 180 self.cloud_storage.SetRemotePathsForTesting() |
| 163 | 181 |
| 164 def testList(self): | 182 def testList(self): |
| 165 self.assertEqual([], | 183 self.assertEqual([], |
| 166 self.cloud_storage.List(self.cloud_storage.PUBLIC_BUCKET)) | 184 self.cloud_storage.List(self.cloud_storage.PUBLIC_BUCKET)) |
| 167 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) | 185 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) |
| 168 self.assertEqual(['preset_public_file.wpr'], | 186 self.assertEqual(['preset_public_file.wpr'], |
| 169 self.cloud_storage.List(self.cloud_storage.PUBLIC_BUCKET)) | 187 self.cloud_storage.List(self.cloud_storage.PUBLIC_BUCKET)) |
| 170 # Reset state. | 188 # Reset state. |
| 171 self.cloud_storage.SetRemotePathsForTesting() | 189 self.cloud_storage.SetRemotePathsForTesting() |
| 172 | 190 |
| 173 def testPermissionError(self): | 191 def testPermissionError(self): |
| 174 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) | 192 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) |
| 175 self.cloud_storage.SetPermissionLevelForTesting( | 193 self.cloud_storage.SetPermissionLevelForTesting( |
| 176 self.cloud_storage.PUBLIC_PERMISSION) | 194 self.cloud_storage.PUBLIC_PERMISSION) |
| 177 self.assertRaises( | 195 self.assertRaises( |
| 178 self.cloud_storage.PermissionError, self.cloud_storage.Get, | 196 self.cloud_storage.PermissionError, self.cloud_storage.Get, |
| 179 self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr', | 197 self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr', |
| 180 '/path/to/preset_internal_file.wpr') | 198 os.path.join(os.path.sep, 'path', 'to', 'preset_internal_file.wpr')) |
| 181 self.assertRaises( | 199 self.assertRaises( |
| 182 self.cloud_storage.PermissionError, self.cloud_storage.GetIfChanged, | 200 self.cloud_storage.PermissionError, self.cloud_storage.GetIfChanged, |
| 183 '/path/to/preset_internal_file.wpr', self.cloud_storage.INTERNAL_BUCKET) | 201 os.path.join(os.path.sep, 'path', 'to', 'preset_internal_file.wpr'), |
| 202 self.cloud_storage.INTERNAL_BUCKET) |
| 184 self.assertRaises( | 203 self.assertRaises( |
| 185 self.cloud_storage.PermissionError, self.cloud_storage.List, | 204 self.cloud_storage.PermissionError, self.cloud_storage.List, |
| 186 self.cloud_storage.INTERNAL_BUCKET) | 205 self.cloud_storage.INTERNAL_BUCKET) |
| 187 self.assertRaises( | 206 self.assertRaises( |
| 188 self.cloud_storage.PermissionError, self.cloud_storage.Exists, | 207 self.cloud_storage.PermissionError, self.cloud_storage.Exists, |
| 189 self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr') | 208 self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr') |
| 190 self.assertRaises( | 209 self.assertRaises( |
| 191 self.cloud_storage.PermissionError, self.cloud_storage.Insert, | 210 self.cloud_storage.PermissionError, self.cloud_storage.Insert, |
| 192 self.cloud_storage.INTERNAL_BUCKET, 'success.wpr', '/path/to/success.wpr') | 211 self.cloud_storage.INTERNAL_BUCKET, 'success.wpr', |
| 212 os.path.join(os.path.sep, 'path', 'to', 'success.wpr')) |
| 193 # Reset state. | 213 # Reset state. |
| 194 self.cloud_storage.SetRemotePathsForTesting() | 214 self.cloud_storage.SetRemotePathsForTesting() |
| 195 | 215 |
| 196 def testCredentialsError(self): | 216 def testCredentialsError(self): |
| 197 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) | 217 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) |
| 198 self.cloud_storage.SetPermissionLevelForTesting( | 218 self.cloud_storage.SetPermissionLevelForTesting( |
| 199 self.cloud_storage.CREDENTIALS_ERROR_PERMISSION) | 219 self.cloud_storage.CREDENTIALS_ERROR_PERMISSION) |
| 200 self.assertRaises( | 220 self.assertRaises( |
| 201 self.cloud_storage.CredentialsError, self.cloud_storage.Get, | 221 self.cloud_storage.CredentialsError, self.cloud_storage.Get, |
| 202 self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr', | 222 self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr', |
| 203 '/path/to/preset_internal_file.wpr') | 223 os.path.join(os.path.sep, 'path', 'to', 'preset_internal_file.wpr')) |
| 204 self.assertRaises( | 224 self.assertRaises( |
| 205 self.cloud_storage.CredentialsError, self.cloud_storage.GetIfChanged, | 225 self.cloud_storage.CredentialsError, self.cloud_storage.GetIfChanged, |
| 206 self.cloud_storage.INTERNAL_BUCKET, '/path/to/preset_internal_file.wpr') | 226 self.cloud_storage.INTERNAL_BUCKET, |
| 227 os.path.join(os.path.sep, 'path', 'to', 'preset_internal_file.wpr')) |
| 207 self.assertRaises( | 228 self.assertRaises( |
| 208 self.cloud_storage.CredentialsError, self.cloud_storage.List, | 229 self.cloud_storage.CredentialsError, self.cloud_storage.List, |
| 209 self.cloud_storage.INTERNAL_BUCKET) | 230 self.cloud_storage.INTERNAL_BUCKET) |
| 210 self.assertRaises( | 231 self.assertRaises( |
| 211 self.cloud_storage.CredentialsError, self.cloud_storage.Exists, | 232 self.cloud_storage.CredentialsError, self.cloud_storage.Exists, |
| 212 self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr') | 233 self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr') |
| 213 self.assertRaises( | 234 self.assertRaises( |
| 214 self.cloud_storage.CredentialsError, self.cloud_storage.Insert, | 235 self.cloud_storage.CredentialsError, self.cloud_storage.Insert, |
| 215 self.cloud_storage.INTERNAL_BUCKET, 'success.wpr', | 236 self.cloud_storage.INTERNAL_BUCKET, 'success.wpr', |
| 216 '/path/to/success.wpr') | 237 os.path.join(os.path.sep, 'path', 'to', 'success.wpr')) |
| 217 # Reset state. | 238 # Reset state. |
| 218 self.cloud_storage.SetRemotePathsForTesting() | 239 self.cloud_storage.SetRemotePathsForTesting() |
| 219 | 240 |
| 220 def testOpenRestoresCorrectly(self): | 241 def testOpenRestoresCorrectly(self): |
| 221 file_path = os.path.realpath(__file__) | 242 file_path = os.path.realpath(__file__) |
| 222 stubs = system_stub.Override(system_stub_test_module, ['open']) | 243 stubs = system_stub.Override(system_stub_test_module, ['open']) |
| 223 stubs.open.files = {file_path:'contents'} | 244 stubs.open.files = {file_path:'contents'} |
| 224 f = system_stub_test_module.SystemStubTest.TestOpen(file_path) | 245 f = system_stub_test_module.SystemStubTest.TestOpen(file_path) |
| 225 self.assertEqual(type(f), system_stub.OpenFunctionStub.FileStub) | 246 self.assertEqual(type(f), system_stub.OpenFunctionStub.FileStub) |
| 226 stubs.open.files = {} | 247 stubs.open.files = {} |
| 227 stubs.Restore() | 248 stubs.Restore() |
| 228 # This will throw an error if the open stub wasn't restored correctly. | 249 # This will throw an error if the open stub wasn't restored correctly. |
| 229 f = system_stub_test_module.SystemStubTest.TestOpen(file_path) | 250 f = system_stub_test_module.SystemStubTest.TestOpen(file_path) |
| 230 self.assertEqual(type(f), file) | 251 self.assertEqual(type(f), file) |
| OLD | NEW |