OLD | NEW |
1 # Copyright 2013 The Chromium Authors. All rights reserved. | 1 # Copyright 2013 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 import os | 5 import os |
6 import unittest | 6 import unittest |
7 | 7 |
8 PERF_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) | 8 PERF_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) |
9 from telemetry.unittest_util import system_stub | 9 from telemetry.unittest_util import system_stub |
10 from unittest_data import system_stub_test_module | 10 from unittest_data import system_stub_test_module |
(...skipping 14 matching lines...) Expand all Loading... |
25 'preset_internal_file.wpr'] | 25 'preset_internal_file.wpr'] |
26 self.remote_paths = { | 26 self.remote_paths = { |
27 self.cloud_storage.PUBLIC_BUCKET: | 27 self.cloud_storage.PUBLIC_BUCKET: |
28 {'preset_public_file.wpr':CloudStorageTest.PUBLIC_FILE_HASH}, | 28 {'preset_public_file.wpr':CloudStorageTest.PUBLIC_FILE_HASH}, |
29 self.cloud_storage.PARTNER_BUCKET: | 29 self.cloud_storage.PARTNER_BUCKET: |
30 {'preset_partner_file.wpr':CloudStorageTest.PARTNER_FILE_HASH}, | 30 {'preset_partner_file.wpr':CloudStorageTest.PARTNER_FILE_HASH}, |
31 self.cloud_storage.INTERNAL_BUCKET: | 31 self.cloud_storage.INTERNAL_BUCKET: |
32 {'preset_internal_file.wpr':CloudStorageTest.INTERNAL_FILE_HASH}} | 32 {'preset_internal_file.wpr':CloudStorageTest.INTERNAL_FILE_HASH}} |
33 | 33 |
34 # Local data files and hashes. | 34 # Local data files and hashes. |
35 self.data_files = [ | 35 self.data_files = ['/path/to/success.wpr', |
36 os.path.join(os.path.sep, 'path', 'to', 'success.wpr'), | 36 '/path/to/wrong_hash.wpr', |
37 os.path.join(os.path.sep, 'path', 'to', 'wrong_hash.wpr'), | 37 '/path/to/preset_public_file.wpr', |
38 os.path.join(os.path.sep, 'path', 'to', 'preset_public_file.wpr'), | 38 '/path/to/preset_partner_file.wpr', |
39 os.path.join(os.path.sep, 'path', 'to', 'preset_partner_file.wpr'), | 39 '/path/to/preset_internal_file.wpr'] |
40 os.path.join(os.path.sep, 'path', 'to', 'preset_internal_file.wpr')] | |
41 self.local_file_hashes = { | 40 self.local_file_hashes = { |
42 os.path.join(os.path.sep, 'path', 'to', 'success.wpr'): | 41 '/path/to/success.wpr': CloudStorageTest.SUCCESS_FILE_HASH, |
43 CloudStorageTest.SUCCESS_FILE_HASH, | 42 '/path/to/wrong_hash.wpr': CloudStorageTest.SUCCESS_FILE_HASH, |
44 os.path.join(os.path.sep, 'path', 'to', 'wrong_hash.wpr'): | 43 '/path/to/preset_public_file.wpr':CloudStorageTest.PUBLIC_FILE_HASH, |
45 CloudStorageTest.SUCCESS_FILE_HASH, | 44 '/path/to/preset_partner_file.wpr':CloudStorageTest.PARTNER_FILE_HASH, |
46 os.path.join(os.path.sep, 'path', 'to', 'preset_public_file.wpr'): | 45 '/path/to/preset_internal_file.wpr':CloudStorageTest.INTERNAL_FILE_HASH, |
47 CloudStorageTest.PUBLIC_FILE_HASH, | |
48 os.path.join(os.path.sep, 'path', 'to', 'preset_partner_file.wpr'): | |
49 CloudStorageTest.PARTNER_FILE_HASH, | |
50 os.path.join(os.path.sep, 'path', 'to', 'preset_internal_file.wpr'): | |
51 CloudStorageTest.INTERNAL_FILE_HASH, | |
52 } | 46 } |
53 self.cloud_storage.SetCalculatedHashesForTesting(self.local_file_hashes) | 47 self.cloud_storage.SetCalculatedHashesForTesting(self.local_file_hashes) |
54 # Local hash files and their contents. | 48 # Local hash files and their contents. |
55 local_hash_files = { | 49 local_hash_files = { |
56 os.path.join(os.path.sep, 'path', 'to', 'success.wpr.sha1'): | 50 '/path/to/success.wpr.sha1': CloudStorageTest.SUCCESS_FILE_HASH, |
57 CloudStorageTest.SUCCESS_FILE_HASH, | 51 '/path/to/wrong_hash.wpr.sha1': 'wronghash'.zfill(40), |
58 os.path.join(os.path.sep, 'path', 'to', 'wrong_hash.wpr.sha1'): | 52 '/path/to/preset_public_file.wpr.sha1': CloudStorageTest.PUBLIC_FILE_HASH, |
59 'wronghash'.zfill(40), | 53 '/path/to/preset_partner_file.wpr.sha1': |
60 os.path.join(os.path.sep, 'path', 'to', 'preset_public_file.wpr.sha1'): | 54 CloudStorageTest.PARTNER_FILE_HASH, |
61 CloudStorageTest.PUBLIC_FILE_HASH, | 55 '/path/to/preset_internal_file.wpr.sha1': |
62 os.path.join(os.path.sep, 'path', 'to', 'preset_partner_file.wpr.sha1'): | 56 CloudStorageTest.INTERNAL_FILE_HASH, |
63 CloudStorageTest.PARTNER_FILE_HASH, | |
64 os.path.join(os.path.sep, 'path', 'to', | |
65 'preset_internal_file.wpr.sha1'): | |
66 CloudStorageTest.INTERNAL_FILE_HASH, | |
67 } | 57 } |
68 self.cloud_storage.SetHashFileContentsForTesting(local_hash_files) | 58 self.cloud_storage.SetHashFileContentsForTesting(local_hash_files) |
69 | 59 |
70 def testSetup(self): | 60 def testSetup(self): |
71 self.assertEqual(self.local_file_hashes, | 61 self.assertEqual(self.local_file_hashes, |
72 self.cloud_storage.local_file_hashes) | 62 self.cloud_storage.local_file_hashes) |
73 self.assertEqual(set(self.data_files), | 63 self.assertEqual(set(self.data_files), |
74 set(self.cloud_storage.GetLocalDataFiles())) | 64 set(self.cloud_storage.GetLocalDataFiles())) |
75 self.assertEqual(self.cloud_storage.default_remote_paths, | 65 self.assertEqual(self.cloud_storage.default_remote_paths, |
76 self.cloud_storage.GetRemotePathsForTesting()) | 66 self.cloud_storage.GetRemotePathsForTesting()) |
77 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) | 67 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) |
78 self.assertEqual(self.remote_paths, | 68 self.assertEqual(self.remote_paths, |
79 self.cloud_storage.GetRemotePathsForTesting()) | 69 self.cloud_storage.GetRemotePathsForTesting()) |
80 | 70 |
81 def testExistsEmptyCloudStorage(self): | 71 def testExistsEmptyCloudStorage(self): |
82 # Test empty remote files dictionary. | 72 # Test empty remote files dictionary. |
83 self.assertFalse(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, | 73 self.assertFalse(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, |
84 'preset_public_file.wpr')) | 74 'preset_public_file.wpr')) |
85 self.assertFalse(self.cloud_storage.Exists( | 75 self.assertFalse(self.cloud_storage.Exists( |
86 self.cloud_storage.PARTNER_BUCKET, 'preset_partner_file.wpr')) | 76 self.cloud_storage.PARTNER_BUCKET, 'preset_partner_file.wpr')) |
87 self.assertFalse(self.cloud_storage.Exists( | 77 self.assertFalse(self.cloud_storage.Exists( |
88 self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr')) | 78 self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr')) |
89 | 79 |
90 def testExistsNonEmptyCloudStorage(self): | 80 def testExistsNonEmptyCloudStorage(self): |
91 # Test non-empty remote files dictionary. | 81 # Test non-empty remote files dictionary. |
92 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) | 82 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) |
| 83 self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, |
| 84 'preset_public_file.wpr')) |
| 85 self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PARTNER_BUCKET, |
| 86 'preset_partner_file.wpr')) |
93 self.assertTrue(self.cloud_storage.Exists( | 87 self.assertTrue(self.cloud_storage.Exists( |
94 self.cloud_storage.PUBLIC_BUCKET, 'preset_public_file.wpr')) | 88 self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr')) |
95 self.assertTrue(self.cloud_storage.Exists( | 89 self.assertFalse(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, |
96 self.cloud_storage.PARTNER_BUCKET, 'preset_partner_file.wpr')) | 90 'fake_file')) |
97 self.assertTrue(self.cloud_storage.Exists( | |
98 self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr')) | |
99 self.assertFalse(self.cloud_storage.Exists( | 91 self.assertFalse(self.cloud_storage.Exists( |
100 self.cloud_storage.PUBLIC_BUCKET, 'fake_file')) | 92 self.cloud_storage.PARTNER_BUCKET, 'fake_file')) |
101 self.assertFalse(self.cloud_storage.Exists( | 93 self.assertFalse(self.cloud_storage.Exists( |
102 self.cloud_storage.PARTNER_BUCKET, 'fake_file')) | 94 self.cloud_storage.INTERNAL_BUCKET, 'fake_file')) |
103 self.assertFalse(self.cloud_storage.Exists( | |
104 self.cloud_storage.INTERNAL_BUCKET, 'fake_file')) | |
105 # Reset state. | 95 # Reset state. |
106 self.cloud_storage.SetRemotePathsForTesting() | 96 self.cloud_storage.SetRemotePathsForTesting() |
107 | 97 |
108 def testNonEmptyInsertAndExistsPublic(self): | 98 def testNonEmptyInsertAndExistsPublic(self): |
109 # Test non-empty remote files dictionary. | 99 # Test non-empty remote files dictionary. |
110 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) | 100 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) |
111 self.assertFalse(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, | 101 self.assertFalse(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, |
112 'success.wpr')) | 102 'success.wpr')) |
113 self.cloud_storage.Insert( | 103 self.cloud_storage.Insert(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', |
114 self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', | 104 '/path/to/success.wpr') |
115 os.path.join(os.path.sep, 'path', 'to', 'success.wpr')) | 105 self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, |
116 self.assertTrue(self.cloud_storage.Exists( | 106 'success.wpr')) |
117 self.cloud_storage.PUBLIC_BUCKET, 'success.wpr')) | |
118 # Reset state. | 107 # Reset state. |
119 self.cloud_storage.SetRemotePathsForTesting() | 108 self.cloud_storage.SetRemotePathsForTesting() |
120 | 109 |
121 def testEmptyInsertAndExistsPublic(self): | 110 def testEmptyInsertAndExistsPublic(self): |
122 # Test empty remote files dictionary. | 111 # Test empty remote files dictionary. |
123 self.assertFalse(self.cloud_storage.Exists( | 112 self.assertFalse(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, |
124 self.cloud_storage.PUBLIC_BUCKET, 'success.wpr')) | 113 'success.wpr')) |
125 self.cloud_storage.Insert( | 114 self.cloud_storage.Insert(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', |
126 self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', | 115 '/path/to/success.wpr') |
127 os.path.join(os.path.sep, 'path', 'to', 'success.wpr')) | 116 self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, |
128 self.assertTrue(self.cloud_storage.Exists( | 117 'success.wpr')) |
129 self.cloud_storage.PUBLIC_BUCKET, 'success.wpr')) | |
130 | 118 |
131 def testEmptyInsertAndGet(self): | 119 def testEmptyInsertAndGet(self): |
132 self.assertRaises(self.cloud_storage.NotFoundError, self.cloud_storage.Get, | 120 self.assertRaises(self.cloud_storage.NotFoundError, self.cloud_storage.Get, |
133 self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', | 121 self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', |
134 os.path.join(os.path.sep, 'path', 'to', 'success.wpr')) | 122 '/path/to/success.wpr') |
135 self.cloud_storage.Insert(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', | 123 self.cloud_storage.Insert(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', |
136 os.path.join(os.path.sep, 'path', 'to', | 124 '/path/to/success.wpr') |
137 'success.wpr')) | 125 self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, |
138 self.assertTrue(self.cloud_storage.Exists( | 126 'success.wpr')) |
139 self.cloud_storage.PUBLIC_BUCKET, 'success.wpr')) | 127 self.assertEqual(CloudStorageTest.SUCCESS_FILE_HASH, |
140 self.assertEqual(CloudStorageTest.SUCCESS_FILE_HASH, self.cloud_storage.Get( | 128 self.cloud_storage.Get(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', |
141 self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', | 129 '/path/to/success.wpr')) |
142 os.path.join(os.path.sep, 'path', 'to', 'success.wpr'))) | |
143 | 130 |
144 def testNonEmptyInsertAndGet(self): | 131 def testNonEmptyInsertAndGet(self): |
145 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) | 132 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) |
146 self.assertRaises(self.cloud_storage.NotFoundError, self.cloud_storage.Get, | 133 self.assertRaises(self.cloud_storage.NotFoundError, self.cloud_storage.Get, |
147 self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', | 134 self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', |
148 os.path.join(os.path.sep, 'path', 'to', 'success.wpr')) | 135 '/path/to/success.wpr') |
149 self.cloud_storage.Insert(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', | 136 self.cloud_storage.Insert(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', |
150 os.path.join(os.path.sep, 'path', 'to', | 137 '/path/to/success.wpr') |
151 'success.wpr')) | |
152 self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, | 138 self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET, |
153 'success.wpr')) | 139 'success.wpr')) |
154 self.assertEqual( | 140 self.assertEqual(CloudStorageTest.SUCCESS_FILE_HASH, |
155 CloudStorageTest.SUCCESS_FILE_HASH, self.cloud_storage.Get( | 141 self.cloud_storage.Get(self.cloud_storage.PUBLIC_BUCKET, |
156 self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', | 142 'success.wpr', |
157 os.path.join(os.path.sep, 'path', 'to', 'success.wpr'))) | 143 '/path/to/success.wpr')) |
158 # Reset state. | 144 # Reset state. |
159 self.cloud_storage.SetRemotePathsForTesting() | 145 self.cloud_storage.SetRemotePathsForTesting() |
160 | 146 |
161 def testGetIfChanged(self): | 147 def testGetIfChanged(self): |
162 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) | 148 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) |
163 self.assertRaises( | 149 self.assertRaises(self.cloud_storage.NotFoundError, self.cloud_storage.Get, |
164 self.cloud_storage.NotFoundError, self.cloud_storage.Get, | 150 self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', |
165 self.cloud_storage.PUBLIC_BUCKET, 'success.wpr', | 151 '/path/to/success.wpr') |
166 os.path.join(os.path.sep, 'path', 'to', 'success.wpr')) | |
167 self.assertFalse(self.cloud_storage.GetIfChanged( | 152 self.assertFalse(self.cloud_storage.GetIfChanged( |
168 os.path.join(os.path.sep, 'path', 'to', 'preset_public_file.wpr'), | 153 '/path/to/preset_public_file.wpr', self.cloud_storage.PUBLIC_BUCKET)) |
169 self.cloud_storage.PUBLIC_BUCKET)) | |
170 self.cloud_storage.ChangeRemoteHashForTesting( | 154 self.cloud_storage.ChangeRemoteHashForTesting( |
171 self.cloud_storage.PUBLIC_BUCKET, 'preset_public_file.wpr', | 155 self.cloud_storage.PUBLIC_BUCKET, 'preset_public_file.wpr', |
172 CloudStorageTest.UPDATED_HASH) | 156 CloudStorageTest.UPDATED_HASH) |
173 self.assertTrue(self.cloud_storage.GetIfChanged( | 157 self.assertTrue(self.cloud_storage.GetIfChanged( |
174 os.path.join(os.path.sep, 'path', 'to', 'preset_public_file.wpr'), | 158 '/path/to/preset_public_file.wpr', self.cloud_storage.PUBLIC_BUCKET)) |
175 self.cloud_storage.PUBLIC_BUCKET)) | |
176 self.assertFalse(self.cloud_storage.GetIfChanged( | 159 self.assertFalse(self.cloud_storage.GetIfChanged( |
177 os.path.join(os.path.sep, 'path', 'to', 'preset_public_file.wpr'), | 160 '/path/to/preset_public_file.wpr', self.cloud_storage.PUBLIC_BUCKET)) |
178 self.cloud_storage.PUBLIC_BUCKET)) | |
179 # Reset state. | 161 # Reset state. |
180 self.cloud_storage.SetRemotePathsForTesting() | 162 self.cloud_storage.SetRemotePathsForTesting() |
181 | 163 |
182 def testList(self): | 164 def testList(self): |
183 self.assertEqual([], | 165 self.assertEqual([], |
184 self.cloud_storage.List(self.cloud_storage.PUBLIC_BUCKET)) | 166 self.cloud_storage.List(self.cloud_storage.PUBLIC_BUCKET)) |
185 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) | 167 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) |
186 self.assertEqual(['preset_public_file.wpr'], | 168 self.assertEqual(['preset_public_file.wpr'], |
187 self.cloud_storage.List(self.cloud_storage.PUBLIC_BUCKET)) | 169 self.cloud_storage.List(self.cloud_storage.PUBLIC_BUCKET)) |
188 # Reset state. | 170 # Reset state. |
189 self.cloud_storage.SetRemotePathsForTesting() | 171 self.cloud_storage.SetRemotePathsForTesting() |
190 | 172 |
191 def testPermissionError(self): | 173 def testPermissionError(self): |
192 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) | 174 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) |
193 self.cloud_storage.SetPermissionLevelForTesting( | 175 self.cloud_storage.SetPermissionLevelForTesting( |
194 self.cloud_storage.PUBLIC_PERMISSION) | 176 self.cloud_storage.PUBLIC_PERMISSION) |
195 self.assertRaises( | 177 self.assertRaises( |
196 self.cloud_storage.PermissionError, self.cloud_storage.Get, | 178 self.cloud_storage.PermissionError, self.cloud_storage.Get, |
197 self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr', | 179 self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr', |
198 os.path.join(os.path.sep, 'path', 'to', 'preset_internal_file.wpr')) | 180 '/path/to/preset_internal_file.wpr') |
199 self.assertRaises( | 181 self.assertRaises( |
200 self.cloud_storage.PermissionError, self.cloud_storage.GetIfChanged, | 182 self.cloud_storage.PermissionError, self.cloud_storage.GetIfChanged, |
201 os.path.join(os.path.sep, 'path', 'to', 'preset_internal_file.wpr'), | 183 '/path/to/preset_internal_file.wpr', self.cloud_storage.INTERNAL_BUCKET) |
202 self.cloud_storage.INTERNAL_BUCKET) | |
203 self.assertRaises( | 184 self.assertRaises( |
204 self.cloud_storage.PermissionError, self.cloud_storage.List, | 185 self.cloud_storage.PermissionError, self.cloud_storage.List, |
205 self.cloud_storage.INTERNAL_BUCKET) | 186 self.cloud_storage.INTERNAL_BUCKET) |
206 self.assertRaises( | 187 self.assertRaises( |
207 self.cloud_storage.PermissionError, self.cloud_storage.Exists, | 188 self.cloud_storage.PermissionError, self.cloud_storage.Exists, |
208 self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr') | 189 self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr') |
209 self.assertRaises( | 190 self.assertRaises( |
210 self.cloud_storage.PermissionError, self.cloud_storage.Insert, | 191 self.cloud_storage.PermissionError, self.cloud_storage.Insert, |
211 self.cloud_storage.INTERNAL_BUCKET, 'success.wpr', | 192 self.cloud_storage.INTERNAL_BUCKET, 'success.wpr', '/path/to/success.wpr') |
212 os.path.join(os.path.sep, 'path', 'to', 'success.wpr')) | |
213 # Reset state. | 193 # Reset state. |
214 self.cloud_storage.SetRemotePathsForTesting() | 194 self.cloud_storage.SetRemotePathsForTesting() |
215 | 195 |
216 def testCredentialsError(self): | 196 def testCredentialsError(self): |
217 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) | 197 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths) |
218 self.cloud_storage.SetPermissionLevelForTesting( | 198 self.cloud_storage.SetPermissionLevelForTesting( |
219 self.cloud_storage.CREDENTIALS_ERROR_PERMISSION) | 199 self.cloud_storage.CREDENTIALS_ERROR_PERMISSION) |
220 self.assertRaises( | 200 self.assertRaises( |
221 self.cloud_storage.CredentialsError, self.cloud_storage.Get, | 201 self.cloud_storage.CredentialsError, self.cloud_storage.Get, |
222 self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr', | 202 self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr', |
223 os.path.join(os.path.sep, 'path', 'to', 'preset_internal_file.wpr')) | 203 '/path/to/preset_internal_file.wpr') |
224 self.assertRaises( | 204 self.assertRaises( |
225 self.cloud_storage.CredentialsError, self.cloud_storage.GetIfChanged, | 205 self.cloud_storage.CredentialsError, self.cloud_storage.GetIfChanged, |
226 self.cloud_storage.INTERNAL_BUCKET, | 206 self.cloud_storage.INTERNAL_BUCKET, '/path/to/preset_internal_file.wpr') |
227 os.path.join(os.path.sep, 'path', 'to', 'preset_internal_file.wpr')) | |
228 self.assertRaises( | 207 self.assertRaises( |
229 self.cloud_storage.CredentialsError, self.cloud_storage.List, | 208 self.cloud_storage.CredentialsError, self.cloud_storage.List, |
230 self.cloud_storage.INTERNAL_BUCKET) | 209 self.cloud_storage.INTERNAL_BUCKET) |
231 self.assertRaises( | 210 self.assertRaises( |
232 self.cloud_storage.CredentialsError, self.cloud_storage.Exists, | 211 self.cloud_storage.CredentialsError, self.cloud_storage.Exists, |
233 self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr') | 212 self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr') |
234 self.assertRaises( | 213 self.assertRaises( |
235 self.cloud_storage.CredentialsError, self.cloud_storage.Insert, | 214 self.cloud_storage.CredentialsError, self.cloud_storage.Insert, |
236 self.cloud_storage.INTERNAL_BUCKET, 'success.wpr', | 215 self.cloud_storage.INTERNAL_BUCKET, 'success.wpr', |
237 os.path.join(os.path.sep, 'path', 'to', 'success.wpr')) | 216 '/path/to/success.wpr') |
238 # Reset state. | 217 # Reset state. |
239 self.cloud_storage.SetRemotePathsForTesting() | 218 self.cloud_storage.SetRemotePathsForTesting() |
240 | 219 |
241 def testOpenRestoresCorrectly(self): | 220 def testOpenRestoresCorrectly(self): |
242 file_path = os.path.realpath(__file__) | 221 file_path = os.path.realpath(__file__) |
243 stubs = system_stub.Override(system_stub_test_module, ['open']) | 222 stubs = system_stub.Override(system_stub_test_module, ['open']) |
244 stubs.open.files = {file_path:'contents'} | 223 stubs.open.files = {file_path:'contents'} |
245 f = system_stub_test_module.SystemStubTest.TestOpen(file_path) | 224 f = system_stub_test_module.SystemStubTest.TestOpen(file_path) |
246 self.assertEqual(type(f), system_stub.OpenFunctionStub.FileStub) | 225 self.assertEqual(type(f), system_stub.OpenFunctionStub.FileStub) |
247 stubs.open.files = {} | 226 stubs.open.files = {} |
248 stubs.Restore() | 227 stubs.Restore() |
249 # This will throw an error if the open stub wasn't restored correctly. | 228 # This will throw an error if the open stub wasn't restored correctly. |
250 f = system_stub_test_module.SystemStubTest.TestOpen(file_path) | 229 f = system_stub_test_module.SystemStubTest.TestOpen(file_path) |
251 self.assertEqual(type(f), file) | 230 self.assertEqual(type(f), file) |
OLD | NEW |