Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(88)

Side by Side Diff: tools/telemetry/telemetry/unittest/system_stub_unittest.py

Issue 430473011: Fleshout cloud_storage stub implementation. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Remove presubmit unittests since presubmit is going to be deleted in hte next CL. Created 6 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « tools/telemetry/telemetry/unittest/system_stub.py ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 # Copyright 2013 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 import os
6 import unittest
7
8 PERF_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
9 from telemetry.unittest import system_stub
10
11
12 class CloudStorageTest(unittest.TestCase):
13 SUCCESS_FILE_HASH = 'success'.zfill(40)
14 PUBLIC_FILE_HASH = 'public'.zfill(40)
15 PARTNER_FILE_HASH = 'partner'.zfill(40)
16 INTERNAL_FILE_HASH = 'internal'.zfill(40)
17 UPDATED_HASH = 'updated'.zfill(40)
18
19 def setUp(self):
20 self.cloud_storage = system_stub.CloudStorageModuleStub()
21
22 # Files in Cloud Storage.
23 self.remote_files = ['preset_public_file.wpr',
24 'preset_partner_file.wpr',
25 'preset_internal_file.wpr']
26 self.remote_paths = {
27 self.cloud_storage.PUBLIC_BUCKET:
28 {'preset_public_file.wpr':CloudStorageTest.PUBLIC_FILE_HASH},
29 self.cloud_storage.PARTNER_BUCKET:
30 {'preset_partner_file.wpr':CloudStorageTest.PARTNER_FILE_HASH},
31 self.cloud_storage.INTERNAL_BUCKET:
32 {'preset_internal_file.wpr':CloudStorageTest.INTERNAL_FILE_HASH}}
33
34 # Local data files and hashes.
35 self.data_files = ['/path/to/success.wpr',
36 '/path/to/wrong_hash.wpr',
37 '/path/to/preset_public_file.wpr',
38 '/path/to/preset_partner_file.wpr',
39 '/path/to/preset_internal_file.wpr']
40 self.local_file_hashes = {
41 '/path/to/success.wpr': CloudStorageTest.SUCCESS_FILE_HASH,
42 '/path/to/wrong_hash.wpr': CloudStorageTest.SUCCESS_FILE_HASH,
43 '/path/to/preset_public_file.wpr':CloudStorageTest.PUBLIC_FILE_HASH,
44 '/path/to/preset_partner_file.wpr':CloudStorageTest.PARTNER_FILE_HASH,
45 '/path/to/preset_internal_file.wpr':CloudStorageTest.INTERNAL_FILE_HASH,
46 }
47 self.cloud_storage.SetCalculatedHashesForTesting(self.local_file_hashes)
48 # Local hash files and their contents.
49 local_hash_files = {
50 '/path/to/success.wpr.sha1': CloudStorageTest.SUCCESS_FILE_HASH,
51 '/path/to/wrong_hash.wpr.sha1': 'wronghash'.zfill(40),
52 '/path/to/preset_public_file.wpr.sha1': CloudStorageTest.PUBLIC_FILE_HASH,
53 '/path/to/preset_partner_file.wpr.sha1':
54 CloudStorageTest.PARTNER_FILE_HASH,
55 '/path/to/preset_internal_file.wpr.sha1':
56 CloudStorageTest.INTERNAL_FILE_HASH,
57 }
58 self.cloud_storage.SetHashFileContentsForTesting(local_hash_files)
59
60 def testSetup(self):
61 self.assertEqual(self.local_file_hashes,
62 self.cloud_storage.local_file_hashes)
63 self.assertEqual(set(self.data_files),
64 set(self.cloud_storage.GetLocalDataFiles()))
65 self.assertEqual(self.cloud_storage.default_remote_paths,
66 self.cloud_storage.GetRemotePathsForTesting())
67 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths)
68 self.assertEqual(self.remote_paths,
69 self.cloud_storage.GetRemotePathsForTesting())
70
71 def testExistsEmptyCloudStorage(self):
72 # Test empty remote files dictionary.
73 self.assertFalse(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET,
74 'preset_public_file.wpr'))
75 self.assertFalse(self.cloud_storage.Exists(
76 self.cloud_storage.PARTNER_BUCKET, 'preset_partner_file.wpr'))
77 self.assertFalse(self.cloud_storage.Exists(
78 self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr'))
79
80 def testExistsNonEmptyCloudStorage(self):
81 # Test non-empty remote files dictionary.
82 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths)
83 self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET,
84 'preset_public_file.wpr'))
85 self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PARTNER_BUCKET,
86 'preset_partner_file.wpr'))
87 self.assertTrue(self.cloud_storage.Exists(
88 self.cloud_storage.INTERNAL_BUCKET,'preset_internal_file.wpr'))
89 self.assertFalse(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET,
90 'fake_file'))
91 self.assertFalse(self.cloud_storage.Exists(
92 self.cloud_storage.PARTNER_BUCKET,'fake_file'))
93 self.assertFalse(self.cloud_storage.Exists(
94 self.cloud_storage.INTERNAL_BUCKET, 'fake_file'))
95 # Reset state.
96 self.cloud_storage.SetRemotePathsForTesting()
97
98 def testNonEmptyInsertAndExistsPublic(self):
99 # Test non-empty remote files dictionary.
100 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths)
101 self.assertFalse(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET,
102 'success.wpr'))
103 self.cloud_storage.Insert(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr',
104 '/path/to/success.wpr')
105 self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET,
106 'success.wpr'))
107 # Reset state.
108 self.cloud_storage.SetRemotePathsForTesting()
109
110 def testEmptyInsertAndExistsPublic(self):
111 # Test empty remote files dictionary.
112 self.assertFalse(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET,
113 'success.wpr'))
114 self.cloud_storage.Insert(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr',
115 '/path/to/success.wpr')
116 self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET,
117 'success.wpr'))
118
119 def testEmptyInsertAndGet(self):
120 self.assertRaises(self.cloud_storage.NotFoundError, self.cloud_storage.Get,
121 self.cloud_storage.PUBLIC_BUCKET, 'success.wpr',
122 '/path/to/success.wpr')
123 self.cloud_storage.Insert(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr',
124 '/path/to/success.wpr')
125 self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET,
126 'success.wpr'))
127 self.assertEqual(CloudStorageTest.SUCCESS_FILE_HASH,
128 self.cloud_storage.Get(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr',
129 '/path/to/success.wpr'))
130
131 def testNonEmptyInsertAndGet(self):
132 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths)
133 self.assertRaises(self.cloud_storage.NotFoundError, self.cloud_storage.Get,
134 self.cloud_storage.PUBLIC_BUCKET, 'success.wpr',
135 '/path/to/success.wpr')
136 self.cloud_storage.Insert(self.cloud_storage.PUBLIC_BUCKET, 'success.wpr',
137 '/path/to/success.wpr')
138 self.assertTrue(self.cloud_storage.Exists(self.cloud_storage.PUBLIC_BUCKET,
139 'success.wpr'))
140 self.assertEqual(CloudStorageTest.SUCCESS_FILE_HASH,
141 self.cloud_storage.Get(self.cloud_storage.PUBLIC_BUCKET,
142 'success.wpr',
143 '/path/to/success.wpr'))
144 # Reset state.
145 self.cloud_storage.SetRemotePathsForTesting()
146
147 def testGetIfChanged(self):
148 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths)
149 self.assertRaises(self.cloud_storage.NotFoundError, self.cloud_storage.Get,
150 self.cloud_storage.PUBLIC_BUCKET, 'success.wpr',
151 '/path/to/success.wpr')
152 self.assertFalse(self.cloud_storage.GetIfChanged(
153 self.cloud_storage.PUBLIC_BUCKET, '/path/to/preset_public_file.wpr'))
154 self.cloud_storage.ChangeRemoteHashForTesting(
155 self.cloud_storage.PUBLIC_BUCKET, 'preset_public_file.wpr',
156 CloudStorageTest.UPDATED_HASH)
157 self.assertTrue(self.cloud_storage.GetIfChanged(
158 self.cloud_storage.PUBLIC_BUCKET, '/path/to/preset_public_file.wpr'))
159 self.assertFalse(self.cloud_storage.GetIfChanged(
160 self.cloud_storage.PUBLIC_BUCKET, '/path/to/preset_public_file.wpr'))
161 # Reset state.
162 self.cloud_storage.SetRemotePathsForTesting()
163
164 def testList(self):
165 self.assertEqual([],
166 self.cloud_storage.List(self.cloud_storage.PUBLIC_BUCKET))
167 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths)
168 self.assertEqual(['preset_public_file.wpr'],
169 self.cloud_storage.List(self.cloud_storage.PUBLIC_BUCKET))
170 # Reset state.
171 self.cloud_storage.SetRemotePathsForTesting()
172
173 def testPermissionError(self):
174 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths)
175 self.cloud_storage.SetPermissionLevelForTesting(
176 self.cloud_storage.PUBLIC_PERMISSION)
177 self.assertRaises(
178 self.cloud_storage.PermissionError, self.cloud_storage.Get,
179 self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr',
180 '/path/to/preset_internal_file.wpr')
181 self.assertRaises(
182 self.cloud_storage.PermissionError, self.cloud_storage.GetIfChanged,
183 self.cloud_storage.INTERNAL_BUCKET, '/path/to/preset_internal_file.wpr')
184 self.assertRaises(
185 self.cloud_storage.PermissionError, self.cloud_storage.List,
186 self.cloud_storage.INTERNAL_BUCKET)
187 self.assertRaises(
188 self.cloud_storage.PermissionError, self.cloud_storage.Exists,
189 self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr')
190 self.assertRaises(
191 self.cloud_storage.PermissionError, self.cloud_storage.Insert,
192 self.cloud_storage.INTERNAL_BUCKET, 'success.wpr', '/path/to/success.wpr')
193 # Reset state.
194 self.cloud_storage.SetRemotePathsForTesting()
195
196 def testCredentialsError(self):
197 self.cloud_storage.SetRemotePathsForTesting(self.remote_paths)
198 self.cloud_storage.SetPermissionLevelForTesting(
199 self.cloud_storage.CREDENTIALS_ERROR_PERMISSION)
200 self.assertRaises(
201 self.cloud_storage.CredentialsError, self.cloud_storage.Get,
202 self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr',
203 '/path/to/preset_internal_file.wpr')
204 self.assertRaises(
205 self.cloud_storage.CredentialsError, self.cloud_storage.GetIfChanged,
206 self.cloud_storage.INTERNAL_BUCKET, '/path/to/preset_internal_file.wpr')
207 self.assertRaises(
208 self.cloud_storage.CredentialsError, self.cloud_storage.List,
209 self.cloud_storage.INTERNAL_BUCKET)
210 self.assertRaises(
211 self.cloud_storage.CredentialsError, self.cloud_storage.Exists,
212 self.cloud_storage.INTERNAL_BUCKET, 'preset_internal_file.wpr')
213 self.assertRaises(
214 self.cloud_storage.CredentialsError, self.cloud_storage.Insert,
215 self.cloud_storage.INTERNAL_BUCKET, 'success.wpr',
216 '/path/to/success.wpr')
217 # Reset state.
218 self.cloud_storage.SetRemotePathsForTesting()
OLDNEW
« no previous file with comments | « tools/telemetry/telemetry/unittest/system_stub.py ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698