Index: tools/telemetry/telemetry/util/cloud_storage_unittest.py |
diff --git a/tools/telemetry/telemetry/util/cloud_storage_unittest.py b/tools/telemetry/telemetry/util/cloud_storage_unittest.py |
index bd26d66c0c383018cb7388aa09283f29671eb99a..da3d9d5e3349ec1a97c42a5a722a714c38d1d693 100644 |
--- a/tools/telemetry/telemetry/util/cloud_storage_unittest.py |
+++ b/tools/telemetry/telemetry/util/cloud_storage_unittest.py |
@@ -2,7 +2,6 @@ |
# Use of this source code is governed by a BSD-style license that can be |
# found in the LICENSE file. |
-import os |
import unittest |
from telemetry import decorators |
@@ -14,61 +13,13 @@ |
def _FakeFindGsutil(): |
return 'fake gsutil path' |
-def _FakeReadHash(_): |
- return 'hashthis!' |
- |
-def _FakeCalulateHashMatchesRead(_): |
- return 'hashthis!' |
- |
-def _FakeCalulateHashNewHash(_): |
- return 'omgnewhash' |
- |
class CloudStorageUnitTest(unittest.TestCase): |
def _FakeRunCommand(self, cmd): |
pass |
- def _FakeGet(self, bucket, remote_path, local_path): |
- pass |
- |
- def _assertRunCommandRaisesError(self, communicate_strs, error): |
- stubs = system_stub.Override(cloud_storage, ['open', 'subprocess']) |
- orig_find_gs_util = cloud_storage.FindGsutil |
- cloud_storage.FindGsutil = _FakeFindGsutil |
- stubs.open.files = {'fake gsutil path':''} |
- stubs.subprocess.Popen.returncode_result = 1 |
- try: |
- for string in communicate_strs: |
- stubs.subprocess.Popen.communicate_result = ('', string) |
- self.assertRaises(error, cloud_storage._RunCommand, []) |
- finally: |
- stubs.Restore() |
- cloud_storage.FindGsutil = orig_find_gs_util |
- |
- def testRunCommandCredentialsError(self): |
- strs = ['You are attempting to access protected data with no configured', |
- 'Failure: No handler was ready to authenticate.'] |
- self._assertRunCommandRaisesError(strs, cloud_storage.CredentialsError) |
- |
- def testRunCommandPermissionError(self): |
- strs = ['status=403', 'status 403', '403 Forbidden'] |
- self._assertRunCommandRaisesError(strs, cloud_storage.PermissionError) |
- |
- def testRunCommandNotFoundError(self): |
- strs = ['InvalidUriError', 'No such object', 'No URLs matched', |
- 'One or more URLs matched no', 'InvalidUriError'] |
- self._assertRunCommandRaisesError(strs, cloud_storage.NotFoundError) |
- |
- def testRunCommandServerError(self): |
- strs = ['500 Internal Server Error'] |
- self._assertRunCommandRaisesError(strs, cloud_storage.ServerError) |
- |
- def testRunCommandGenericError(self): |
- strs = ['Random string'] |
- self._assertRunCommandRaisesError(strs, cloud_storage.CloudStorageError) |
- |
- def testInsertCreatesValidCloudUrl(self): |
+ def testValidCloudUrl(self): |
orig_run_command = cloud_storage._RunCommand |
try: |
cloud_storage._RunCommand = self._FakeRunCommand |
@@ -96,60 +47,3 @@ |
finally: |
stubs.Restore() |
cloud_storage.FindGsutil = orig_find_gs_util |
- |
- def testGetIfChanged(self): |
- stubs = system_stub.Override(cloud_storage, ['os', 'open']) |
- stubs.open.files[_FakeFindGsutil()] = '' |
- orig_get = cloud_storage.Get |
- orig_read_hash = cloud_storage.ReadHash |
- orig_calculate_hash = cloud_storage.CalculateHash |
- cloud_storage.ReadHash = _FakeReadHash |
- cloud_storage.CalculateHash = _FakeCalulateHashMatchesRead |
- file_path = 'test-file-path.wpr' |
- hash_path = file_path + '.sha1' |
- try: |
- cloud_storage.Get = self._FakeGet |
- # hash_path doesn't exist. |
- self.assertFalse(cloud_storage.GetIfChanged(file_path, |
- cloud_storage.PUBLIC_BUCKET)) |
- # hash_path exists, but file_path doesn't. |
- stubs.os.path.files.append(hash_path) |
- self.assertTrue(cloud_storage.GetIfChanged(file_path, |
- cloud_storage.PUBLIC_BUCKET)) |
- # hash_path and file_path exist, and have same hash. |
- stubs.os.path.files.append(file_path) |
- self.assertFalse(cloud_storage.GetIfChanged(file_path, |
- cloud_storage.PUBLIC_BUCKET)) |
- # hash_path and file_path exist, and have different hashes. |
- cloud_storage.CalculateHash = _FakeCalulateHashNewHash |
- self.assertTrue(cloud_storage.GetIfChanged(file_path, |
- cloud_storage.PUBLIC_BUCKET)) |
- finally: |
- stubs.Restore() |
- cloud_storage.Get = orig_get |
- cloud_storage.CalculateHash = orig_calculate_hash |
- cloud_storage.ReadHash = orig_read_hash |
- |
- def testGetFilesInDirectoryIfChanged(self): |
- stubs = system_stub.Override(cloud_storage, ['os']) |
- stubs.os._directory = {'dir1':['1file1.sha1', '1file2.txt', '1file3.sha1'], |
- 'dir2':['2file.txt'], 'dir3':['3file1.sha1']} |
- stubs.os.path.dirs = ['real_dir_path'] |
- def IncrementFilesUpdated(*_): |
- IncrementFilesUpdated.files_updated +=1 |
- IncrementFilesUpdated.files_updated = 0 |
- orig_get_if_changed = cloud_storage.GetIfChanged |
- cloud_storage.GetIfChanged = IncrementFilesUpdated |
- try: |
- self.assertRaises(ValueError, cloud_storage.GetFilesInDirectoryIfChanged, |
- os.path.abspath(os.sep), cloud_storage.PUBLIC_BUCKET) |
- self.assertEqual(0, IncrementFilesUpdated.files_updated) |
- self.assertRaises(ValueError, cloud_storage.GetFilesInDirectoryIfChanged, |
- 'fake_dir_path', cloud_storage.PUBLIC_BUCKET) |
- self.assertEqual(0, IncrementFilesUpdated.files_updated) |
- cloud_storage.GetFilesInDirectoryIfChanged('real_dir_path', |
- cloud_storage.PUBLIC_BUCKET) |
- self.assertEqual(3, IncrementFilesUpdated.files_updated) |
- finally: |
- cloud_storage.GetIfChanged = orig_get_if_changed |
- stubs.Restore() |