Index: tools/telemetry/telemetry/util/cloud_storage_unittest.py |
diff --git a/tools/telemetry/telemetry/util/cloud_storage_unittest.py b/tools/telemetry/telemetry/util/cloud_storage_unittest.py |
index da3d9d5e3349ec1a97c42a5a722a714c38d1d693..bd26d66c0c383018cb7388aa09283f29671eb99a 100644 |
--- a/tools/telemetry/telemetry/util/cloud_storage_unittest.py |
+++ b/tools/telemetry/telemetry/util/cloud_storage_unittest.py |
@@ -2,6 +2,7 @@ |
# Use of this source code is governed by a BSD-style license that can be |
# found in the LICENSE file. |
+import os |
import unittest |
from telemetry import decorators |
@@ -13,13 +14,61 @@ from telemetry.util import cloud_storage |
def _FakeFindGsutil(): |
return 'fake gsutil path' |
+def _FakeReadHash(_): |
+ return 'hashthis!' |
+ |
+def _FakeCalulateHashMatchesRead(_): |
+ return 'hashthis!' |
+ |
+def _FakeCalulateHashNewHash(_): |
+ return 'omgnewhash' |
+ |
class CloudStorageUnitTest(unittest.TestCase): |
def _FakeRunCommand(self, cmd): |
pass |
- def testValidCloudUrl(self): |
+ def _FakeGet(self, bucket, remote_path, local_path): |
+ pass |
+ |
+ def _assertRunCommandRaisesError(self, communicate_strs, error): |
+ stubs = system_stub.Override(cloud_storage, ['open', 'subprocess']) |
+ orig_find_gs_util = cloud_storage.FindGsutil |
+ cloud_storage.FindGsutil = _FakeFindGsutil |
+ stubs.open.files = {'fake gsutil path':''} |
+ stubs.subprocess.Popen.returncode_result = 1 |
+ try: |
+ for string in communicate_strs: |
+ stubs.subprocess.Popen.communicate_result = ('', string) |
+ self.assertRaises(error, cloud_storage._RunCommand, []) |
+ finally: |
+ stubs.Restore() |
+ cloud_storage.FindGsutil = orig_find_gs_util |
+ |
+ def testRunCommandCredentialsError(self): |
+ strs = ['You are attempting to access protected data with no configured', |
+ 'Failure: No handler was ready to authenticate.'] |
+ self._assertRunCommandRaisesError(strs, cloud_storage.CredentialsError) |
+ |
+ def testRunCommandPermissionError(self): |
+ strs = ['status=403', 'status 403', '403 Forbidden'] |
+ self._assertRunCommandRaisesError(strs, cloud_storage.PermissionError) |
+ |
+ def testRunCommandNotFoundError(self): |
+ strs = ['InvalidUriError', 'No such object', 'No URLs matched', |
+ 'One or more URLs matched no', 'InvalidUriError'] |
+ self._assertRunCommandRaisesError(strs, cloud_storage.NotFoundError) |
+ |
+ def testRunCommandServerError(self): |
+ strs = ['500 Internal Server Error'] |
+ self._assertRunCommandRaisesError(strs, cloud_storage.ServerError) |
+ |
+ def testRunCommandGenericError(self): |
+ strs = ['Random string'] |
+ self._assertRunCommandRaisesError(strs, cloud_storage.CloudStorageError) |
+ |
+ def testInsertCreatesValidCloudUrl(self): |
orig_run_command = cloud_storage._RunCommand |
try: |
cloud_storage._RunCommand = self._FakeRunCommand |
@@ -47,3 +96,60 @@ class CloudStorageUnitTest(unittest.TestCase): |
finally: |
stubs.Restore() |
cloud_storage.FindGsutil = orig_find_gs_util |
+ |
+ def testGetIfChanged(self): |
+ stubs = system_stub.Override(cloud_storage, ['os', 'open']) |
+ stubs.open.files[_FakeFindGsutil()] = '' |
+ orig_get = cloud_storage.Get |
+ orig_read_hash = cloud_storage.ReadHash |
+ orig_calculate_hash = cloud_storage.CalculateHash |
+ cloud_storage.ReadHash = _FakeReadHash |
+ cloud_storage.CalculateHash = _FakeCalulateHashMatchesRead |
+ file_path = 'test-file-path.wpr' |
+ hash_path = file_path + '.sha1' |
+ try: |
+ cloud_storage.Get = self._FakeGet |
+ # hash_path doesn't exist. |
+ self.assertFalse(cloud_storage.GetIfChanged(file_path, |
+ cloud_storage.PUBLIC_BUCKET)) |
+ # hash_path exists, but file_path doesn't. |
+ stubs.os.path.files.append(hash_path) |
+ self.assertTrue(cloud_storage.GetIfChanged(file_path, |
+ cloud_storage.PUBLIC_BUCKET)) |
+ # hash_path and file_path exist, and have same hash. |
+ stubs.os.path.files.append(file_path) |
+ self.assertFalse(cloud_storage.GetIfChanged(file_path, |
+ cloud_storage.PUBLIC_BUCKET)) |
+ # hash_path and file_path exist, and have different hashes. |
+ cloud_storage.CalculateHash = _FakeCalulateHashNewHash |
+ self.assertTrue(cloud_storage.GetIfChanged(file_path, |
+ cloud_storage.PUBLIC_BUCKET)) |
+ finally: |
+ stubs.Restore() |
+ cloud_storage.Get = orig_get |
+ cloud_storage.CalculateHash = orig_calculate_hash |
+ cloud_storage.ReadHash = orig_read_hash |
+ |
+ def testGetFilesInDirectoryIfChanged(self): |
+ stubs = system_stub.Override(cloud_storage, ['os']) |
+ stubs.os._directory = {'dir1':['1file1.sha1', '1file2.txt', '1file3.sha1'], |
+ 'dir2':['2file.txt'], 'dir3':['3file1.sha1']} |
+ stubs.os.path.dirs = ['real_dir_path'] |
+ def IncrementFilesUpdated(*_): |
+ IncrementFilesUpdated.files_updated +=1 |
+ IncrementFilesUpdated.files_updated = 0 |
+ orig_get_if_changed = cloud_storage.GetIfChanged |
+ cloud_storage.GetIfChanged = IncrementFilesUpdated |
+ try: |
+ self.assertRaises(ValueError, cloud_storage.GetFilesInDirectoryIfChanged, |
+ os.path.abspath(os.sep), cloud_storage.PUBLIC_BUCKET) |
+ self.assertEqual(0, IncrementFilesUpdated.files_updated) |
+ self.assertRaises(ValueError, cloud_storage.GetFilesInDirectoryIfChanged, |
+ 'fake_dir_path', cloud_storage.PUBLIC_BUCKET) |
+ self.assertEqual(0, IncrementFilesUpdated.files_updated) |
+ cloud_storage.GetFilesInDirectoryIfChanged('real_dir_path', |
+ cloud_storage.PUBLIC_BUCKET) |
+ self.assertEqual(3, IncrementFilesUpdated.files_updated) |
+ finally: |
+ cloud_storage.GetIfChanged = orig_get_if_changed |
+ stubs.Restore() |