| OLD | NEW |
| 1 # Copyright 2014 The Chromium Authors. All rights reserved. | 1 # Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
| 4 | 4 |
| 5 import os | 5 import os |
| 6 import unittest | 6 import unittest |
| 7 | 7 |
| 8 from catapult_base import cloud_storage | 8 from catapult_base import cloud_storage |
| 9 from telemetry.testing import system_stub | 9 from telemetry.testing import system_stub |
| 10 | 10 |
| 11 | 11 |
| 12 def _FakeFindGsutil(): | |
| 13 return 'fake gsutil path' | |
| 14 | |
| 15 def _FakeReadHash(_): | 12 def _FakeReadHash(_): |
| 16 return 'hashthis!' | 13 return 'hashthis!' |
| 17 | 14 |
| 18 def _FakeCalulateHashMatchesRead(_): | 15 def _FakeCalulateHashMatchesRead(_): |
| 19 return 'hashthis!' | 16 return 'hashthis!' |
| 20 | 17 |
| 21 def _FakeCalulateHashNewHash(_): | 18 def _FakeCalulateHashNewHash(_): |
| 22 return 'omgnewhash' | 19 return 'omgnewhash' |
| 23 | 20 |
| 24 | 21 |
| 25 class CloudStorageUnitTest(unittest.TestCase): | 22 class CloudStorageUnitTest(unittest.TestCase): |
| 26 | 23 |
| 27 def _FakeRunCommand(self, cmd): | 24 def _FakeRunCommand(self, cmd): |
| 28 pass | 25 pass |
| 29 | 26 |
| 30 def _FakeGet(self, bucket, remote_path, local_path): | 27 def _FakeGet(self, bucket, remote_path, local_path): |
| 31 pass | 28 pass |
| 32 | 29 |
| 33 def _assertRunCommandRaisesError(self, communicate_strs, error): | 30 def _assertRunCommandRaisesError(self, communicate_strs, error): |
| 34 stubs = system_stub.Override(cloud_storage, ['open', 'subprocess']) | 31 stubs = system_stub.Override(cloud_storage, ['open', 'subprocess']) |
| 35 orig_find_gs_util = cloud_storage.FindGsutil | |
| 36 cloud_storage.FindGsutil = _FakeFindGsutil | |
| 37 stubs.open.files = {'fake gsutil path':''} | 32 stubs.open.files = {'fake gsutil path':''} |
| 38 stubs.subprocess.Popen.returncode_result = 1 | 33 stubs.subprocess.Popen.returncode_result = 1 |
| 39 try: | 34 try: |
| 40 for string in communicate_strs: | 35 for string in communicate_strs: |
| 41 stubs.subprocess.Popen.communicate_result = ('', string) | 36 stubs.subprocess.Popen.communicate_result = ('', string) |
| 42 self.assertRaises(error, cloud_storage._RunCommand, []) | 37 self.assertRaises(error, cloud_storage._RunCommand, []) |
| 43 finally: | 38 finally: |
| 44 stubs.Restore() | 39 stubs.Restore() |
| 45 cloud_storage.FindGsutil = orig_find_gs_util | |
| 46 | 40 |
| 47 def testRunCommandCredentialsError(self): | 41 def testRunCommandCredentialsError(self): |
| 48 strs = ['You are attempting to access protected data with no configured', | 42 strs = ['You are attempting to access protected data with no configured', |
| 49 'Failure: No handler was ready to authenticate.'] | 43 'Failure: No handler was ready to authenticate.'] |
| 50 self._assertRunCommandRaisesError(strs, cloud_storage.CredentialsError) | 44 self._assertRunCommandRaisesError(strs, cloud_storage.CredentialsError) |
| 51 | 45 |
| 52 def testRunCommandPermissionError(self): | 46 def testRunCommandPermissionError(self): |
| 53 strs = ['status=403', 'status 403', '403 Forbidden'] | 47 strs = ['status=403', 'status 403', '403 Forbidden'] |
| 54 self._assertRunCommandRaisesError(strs, cloud_storage.PermissionError) | 48 self._assertRunCommandRaisesError(strs, cloud_storage.PermissionError) |
| 55 | 49 |
| (...skipping 19 matching lines...) Expand all Loading... |
| 75 cloud_url = cloud_storage.Insert(cloud_storage.PUBLIC_BUCKET, | 69 cloud_url = cloud_storage.Insert(cloud_storage.PUBLIC_BUCKET, |
| 76 remote_path, local_path) | 70 remote_path, local_path) |
| 77 self.assertEqual('https://console.developers.google.com/m/cloudstorage' | 71 self.assertEqual('https://console.developers.google.com/m/cloudstorage' |
| 78 '/b/chromium-telemetry/o/test-remote-path.html', | 72 '/b/chromium-telemetry/o/test-remote-path.html', |
| 79 cloud_url) | 73 cloud_url) |
| 80 finally: | 74 finally: |
| 81 cloud_storage._RunCommand = orig_run_command | 75 cloud_storage._RunCommand = orig_run_command |
| 82 | 76 |
| 83 def testExistsReturnsFalse(self): | 77 def testExistsReturnsFalse(self): |
| 84 stubs = system_stub.Override(cloud_storage, ['subprocess']) | 78 stubs = system_stub.Override(cloud_storage, ['subprocess']) |
| 85 orig_find_gs_util = cloud_storage.FindGsutil | |
| 86 try: | 79 try: |
| 87 stubs.subprocess.Popen.communicate_result = ( | 80 stubs.subprocess.Popen.communicate_result = ( |
| 88 '', | 81 '', |
| 89 'CommandException: One or more URLs matched no objects.\n') | 82 'CommandException: One or more URLs matched no objects.\n') |
| 90 stubs.subprocess.Popen.returncode_result = 1 | 83 stubs.subprocess.Popen.returncode_result = 1 |
| 91 cloud_storage.FindGsutil = _FakeFindGsutil | |
| 92 self.assertFalse(cloud_storage.Exists('fake bucket', | 84 self.assertFalse(cloud_storage.Exists('fake bucket', |
| 93 'fake remote path')) | 85 'fake remote path')) |
| 94 finally: | 86 finally: |
| 95 stubs.Restore() | 87 stubs.Restore() |
| 96 cloud_storage.FindGsutil = orig_find_gs_util | |
| 97 | 88 |
| 98 def testGetIfChanged(self): | 89 def testGetIfChanged(self): |
| 99 stubs = system_stub.Override(cloud_storage, ['os', 'open']) | 90 stubs = system_stub.Override(cloud_storage, ['os', 'open']) |
| 100 stubs.open.files[_FakeFindGsutil()] = '' | |
| 101 orig_get = cloud_storage.Get | 91 orig_get = cloud_storage.Get |
| 102 orig_read_hash = cloud_storage.ReadHash | 92 orig_read_hash = cloud_storage.ReadHash |
| 103 orig_calculate_hash = cloud_storage.CalculateHash | 93 orig_calculate_hash = cloud_storage.CalculateHash |
| 104 cloud_storage.ReadHash = _FakeReadHash | 94 cloud_storage.ReadHash = _FakeReadHash |
| 105 cloud_storage.CalculateHash = _FakeCalulateHashMatchesRead | 95 cloud_storage.CalculateHash = _FakeCalulateHashMatchesRead |
| 106 file_path = 'test-file-path.wpr' | 96 file_path = 'test-file-path.wpr' |
| 107 hash_path = file_path + '.sha1' | 97 hash_path = file_path + '.sha1' |
| 108 try: | 98 try: |
| 109 cloud_storage.Get = self._FakeGet | 99 cloud_storage.Get = self._FakeGet |
| 110 # hash_path doesn't exist. | 100 # hash_path doesn't exist. |
| (...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 156 orig_run_command = cloud_storage._RunCommand | 146 orig_run_command = cloud_storage._RunCommand |
| 157 def AssertCorrectRunCommandArgs(args): | 147 def AssertCorrectRunCommandArgs(args): |
| 158 self.assertEqual(expected_args, args) | 148 self.assertEqual(expected_args, args) |
| 159 cloud_storage._RunCommand = AssertCorrectRunCommandArgs | 149 cloud_storage._RunCommand = AssertCorrectRunCommandArgs |
| 160 expected_args = ['cp', 'gs://bucket1/remote_path1', | 150 expected_args = ['cp', 'gs://bucket1/remote_path1', |
| 161 'gs://bucket2/remote_path2'] | 151 'gs://bucket2/remote_path2'] |
| 162 try: | 152 try: |
| 163 cloud_storage.Copy('bucket1', 'bucket2', 'remote_path1', 'remote_path2') | 153 cloud_storage.Copy('bucket1', 'bucket2', 'remote_path1', 'remote_path2') |
| 164 finally: | 154 finally: |
| 165 cloud_storage._RunCommand = orig_run_command | 155 cloud_storage._RunCommand = orig_run_command |
| OLD | NEW |