OLD | NEW |
---|---|
1 # Copyright 2014 The Chromium Authors. All rights reserved. | 1 # Copyright 2014 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 import unittest | 5 import unittest |
6 | 6 |
7 from telemetry import decorators | 7 from telemetry import decorators |
8 | 8 |
9 from telemetry.unittest_util import system_stub | 9 from telemetry.unittest_util import system_stub |
10 from telemetry.util import cloud_storage | 10 from telemetry.util import cloud_storage |
11 | 11 |
12 | 12 |
13 def _FakeFindGsutil(): | 13 def _FakeFindGsutil(): |
14 return 'fake gsutil path' | 14 return 'fake gsutil path' |
15 | 15 |
16 def _FakeReadHash(_): | |
17 return 'hashthis!' | |
18 | |
19 def _FakeCalulateHashMatchesRead(_): | |
20 return 'hashthis!' | |
21 | |
22 def _FakeCalulateHashNewHash(_): | |
23 return 'omgnewhash' | |
24 | |
16 | 25 |
17 class CloudStorageUnitTest(unittest.TestCase): | 26 class CloudStorageUnitTest(unittest.TestCase): |
18 | 27 |
19 def _FakeRunCommand(self, cmd): | 28 def _FakeRunCommand(self, cmd): |
20 pass | 29 pass |
21 | 30 |
22 def testValidCloudUrl(self): | 31 def _FakeGet(self, bucket, remote_path, local_path): |
32 pass | |
33 | |
34 def testRunCommandCredentialsError(self): | |
35 stubs = system_stub.Override(cloud_storage, ['open', 'subprocess']) | |
36 orig_find_gs_util = cloud_storage.FindGsutil | |
37 cloud_storage.FindGsutil = _FakeFindGsutil | |
38 try: | |
39 stubs.open.files = {'fake gsutil path':''} | |
40 stubs.subprocess.Popen.returncode_result = 1 | |
41 stubs.subprocess.Popen.communicate_result = ( | |
42 '', | |
43 'You are attempting to access protected data with no configured') | |
44 self.assertRaises(cloud_storage.CredentialsError, | |
45 cloud_storage._RunCommand, []) | |
sullivan
2015/02/20 17:47:37
These lines are repeated many times throughout the
aiolos (Not reviewing)
2015/02/20 19:06:51
Done.
| |
46 stubs.subprocess.Popen.communicate_result = ( | |
47 '', | |
48 'Failure: No handler was ready to authenticate.') | |
49 self.assertRaises(cloud_storage.CredentialsError, | |
50 cloud_storage._RunCommand, []) | |
51 finally: | |
52 stubs.Restore() | |
53 cloud_storage.FindGsutil = orig_find_gs_util | |
54 | |
55 def testRunCommandPermissionError(self): | |
56 stubs = system_stub.Override(cloud_storage, ['open', 'subprocess']) | |
57 orig_find_gs_util = cloud_storage.FindGsutil | |
58 cloud_storage.FindGsutil = _FakeFindGsutil | |
59 try: | |
60 stubs.open.files = {'fake gsutil path':''} | |
61 stubs.subprocess.Popen.returncode_result = 1 | |
62 stubs.subprocess.Popen.communicate_result = ( | |
63 '', | |
64 'status=403') | |
65 self.assertRaises(cloud_storage.PermissionError, | |
66 cloud_storage._RunCommand, []) | |
67 stubs.subprocess.Popen.communicate_result = ( | |
68 '', | |
69 'status 403') | |
70 self.assertRaises(cloud_storage.PermissionError, | |
71 cloud_storage._RunCommand, []) | |
72 stubs.subprocess.Popen.communicate_result = ( | |
73 '', | |
74 '403 Forbidden') | |
75 self.assertRaises(cloud_storage.PermissionError, | |
76 cloud_storage._RunCommand, []) | |
77 finally: | |
78 stubs.Restore() | |
79 cloud_storage.FindGsutil = orig_find_gs_util | |
80 | |
81 def testRunCommandNotFoundError(self): | |
82 stubs = system_stub.Override(cloud_storage, ['open', 'subprocess']) | |
83 orig_find_gs_util = cloud_storage.FindGsutil | |
84 cloud_storage.FindGsutil = _FakeFindGsutil | |
85 try: | |
86 stubs.open.files = {'fake gsutil path':''} | |
87 stubs.subprocess.Popen.returncode_result = 1 | |
88 stubs.subprocess.Popen.communicate_result = ( | |
89 '', | |
90 'InvalidUriError') | |
91 self.assertRaises(cloud_storage.NotFoundError, | |
92 cloud_storage._RunCommand, []) | |
93 stubs.subprocess.Popen.communicate_result = ( | |
94 '', | |
95 'No such object') | |
96 self.assertRaises(cloud_storage.NotFoundError, | |
97 cloud_storage._RunCommand, []) | |
98 stubs.subprocess.Popen.communicate_result = ( | |
99 '', | |
100 'No URLs matched') | |
101 self.assertRaises(cloud_storage.NotFoundError, | |
102 cloud_storage._RunCommand, []) | |
103 stubs.subprocess.Popen.communicate_result = ( | |
104 '', | |
105 'One or more URLs matched no') | |
106 self.assertRaises(cloud_storage.NotFoundError, | |
107 cloud_storage._RunCommand, []) | |
108 finally: | |
109 stubs.Restore() | |
110 cloud_storage.FindGsutil = orig_find_gs_util | |
111 | |
112 def testRunCommandServerError(self): | |
113 stubs = system_stub.Override(cloud_storage, ['open', 'subprocess']) | |
114 orig_find_gs_util = cloud_storage.FindGsutil | |
115 cloud_storage.FindGsutil = _FakeFindGsutil | |
116 try: | |
117 stubs.open.files = {'fake gsutil path':''} | |
118 stubs.subprocess.Popen.returncode_result = 1 | |
119 stubs.subprocess.Popen.communicate_result = ( | |
120 '', | |
121 'InvalidUriError') | |
122 self.assertRaises(cloud_storage.NotFoundError, | |
123 cloud_storage._RunCommand, []) | |
124 finally: | |
125 stubs.Restore() | |
126 cloud_storage.FindGsutil = orig_find_gs_util | |
127 | |
128 def testRunCommandGenericError(self): | |
129 stubs = system_stub.Override(cloud_storage, ['open', 'subprocess']) | |
130 orig_find_gs_util = cloud_storage.FindGsutil | |
131 cloud_storage.FindGsutil = _FakeFindGsutil | |
132 try: | |
133 stubs.open.files = {'fake gsutil path':''} | |
134 stubs.subprocess.Popen.returncode_result = 1 | |
135 stubs.subprocess.Popen.communicate_result = ( | |
136 '', | |
137 'InvalidUriError') | |
138 self.assertRaises(cloud_storage.CloudStorageError, | |
139 cloud_storage._RunCommand, []) | |
140 finally: | |
141 stubs.Restore() | |
142 cloud_storage.FindGsutil = orig_find_gs_util | |
143 | |
144 def testInsertCreatesValidCloudUrl(self): | |
23 orig_run_command = cloud_storage._RunCommand | 145 orig_run_command = cloud_storage._RunCommand |
24 try: | 146 try: |
25 cloud_storage._RunCommand = self._FakeRunCommand | 147 cloud_storage._RunCommand = self._FakeRunCommand |
26 remote_path = 'test-remote-path.html' | 148 remote_path = 'test-remote-path.html' |
27 local_path = 'test-local-path.html' | 149 local_path = 'test-local-path.html' |
28 cloud_url = cloud_storage.Insert(cloud_storage.PUBLIC_BUCKET, | 150 cloud_url = cloud_storage.Insert(cloud_storage.PUBLIC_BUCKET, |
29 remote_path, local_path) | 151 remote_path, local_path) |
30 self.assertEqual('https://console.developers.google.com/m/cloudstorage' | 152 self.assertEqual('https://console.developers.google.com/m/cloudstorage' |
31 '/b/chromium-telemetry/o/test-remote-path.html', | 153 '/b/chromium-telemetry/o/test-remote-path.html', |
32 cloud_url) | 154 cloud_url) |
33 finally: | 155 finally: |
34 cloud_storage._RunCommand = orig_run_command | 156 cloud_storage._RunCommand = orig_run_command |
35 | 157 |
36 def testExistsReturnsFalse(self): | 158 def testExistsReturnsFalse(self): |
37 stubs = system_stub.Override(cloud_storage, ['subprocess']) | 159 stubs = system_stub.Override(cloud_storage, ['subprocess']) |
38 orig_find_gs_util = cloud_storage.FindGsutil | 160 orig_find_gs_util = cloud_storage.FindGsutil |
39 try: | 161 try: |
40 stubs.subprocess.Popen.communicate_result = ( | 162 stubs.subprocess.Popen.communicate_result = ( |
41 '', | 163 '', |
42 'CommandException: One or more URLs matched no objects.\n') | 164 'CommandException: One or more URLs matched no objects.\n') |
43 stubs.subprocess.Popen.returncode_result = 1 | 165 stubs.subprocess.Popen.returncode_result = 1 |
44 cloud_storage.FindGsutil = _FakeFindGsutil | 166 cloud_storage.FindGsutil = _FakeFindGsutil |
45 self.assertFalse(cloud_storage.Exists('fake bucket', | 167 self.assertFalse(cloud_storage.Exists('fake bucket', |
46 'fake remote path')) | 168 'fake remote path')) |
47 finally: | 169 finally: |
48 stubs.Restore() | 170 stubs.Restore() |
49 cloud_storage.FindGsutil = orig_find_gs_util | 171 cloud_storage.FindGsutil = orig_find_gs_util |
172 | |
173 def testGetIfChanged(self): | |
174 stubs = system_stub.Override(cloud_storage, ['os', 'open']) | |
175 stubs.open.files[_FakeFindGsutil()] = '' | |
176 orig_get = cloud_storage.Get | |
177 orig_read_hash = cloud_storage.ReadHash | |
178 orig_calculate_hash = cloud_storage.CalculateHash | |
179 cloud_storage.ReadHash = _FakeReadHash | |
180 cloud_storage.CalculateHash = _FakeCalulateHashMatchesRead | |
181 file_path = 'test-file-path.wpr' | |
182 hash_path = file_path + '.sha1' | |
183 try: | |
184 cloud_storage.Get = self._FakeGet | |
185 # hash_path doesn't exist. | |
186 self.assertFalse(cloud_storage.GetIfChanged(file_path, | |
187 cloud_storage.PUBLIC_BUCKET)) | |
188 # hash_path exists, but file_path doesn't. | |
189 stubs.os.path.files.append(hash_path) | |
190 self.assertTrue(cloud_storage.GetIfChanged(file_path, | |
191 cloud_storage.PUBLIC_BUCKET)) | |
192 # hash_path and file_path exist, and have same hash. | |
193 stubs.os.path.files.append(file_path) | |
194 self.assertFalse(cloud_storage.GetIfChanged(file_path, | |
195 cloud_storage.PUBLIC_BUCKET)) | |
196 # hash_path and file_path exist, and have different hashes. | |
197 cloud_storage.CalculateHash = _FakeCalulateHashNewHash | |
198 self.assertTrue(cloud_storage.GetIfChanged(file_path, | |
199 cloud_storage.PUBLIC_BUCKET)) | |
200 finally: | |
201 stubs.Restore() | |
202 cloud_storage.Get = orig_get | |
203 cloud_storage.CalculateHash = orig_calculate_hash | |
204 cloud_storage.ReadHash = orig_read_hash | |
205 | |
206 def testGetFilesInDirectoryIfChanged(self): | |
207 stubs = system_stub.Override(cloud_storage, ['os']) | |
208 stubs.os._directory = {'dir1':['1file1.sha1', '1file2.txt', '1file3.sha1'], | |
209 'dir2':['2file.txt'], 'dir3':['3file1.sha1']} | |
210 def IncrementFilesUpdated(*_): | |
211 IncrementFilesUpdated.files_updated +=1 | |
212 IncrementFilesUpdated.files_updated = 0 | |
213 orig_get_if_changed = cloud_storage.GetIfChanged | |
214 cloud_storage.GetIfChanged = IncrementFilesUpdated | |
215 try: | |
216 self.assertRaises(ValueError, cloud_storage.GetFilesInDirectoryIfChanged, | |
217 '/', cloud_storage.PUBLIC_BUCKET) | |
218 IncrementFilesUpdated.files_updated = 0 | |
219 cloud_storage.GetFilesInDirectoryIfChanged('dir_path', | |
220 cloud_storage.PUBLIC_BUCKET) | |
221 self.assertEqual(3, IncrementFilesUpdated.files_updated) | |
222 finally: | |
223 cloud_storage.GetIfChanged = orig_get_if_changed | |
224 stubs.Restore() | |
OLD | NEW |