Index: tools/telemetry/telemetry/util/cloud_storage_unittest.py |
diff --git a/tools/telemetry/telemetry/util/cloud_storage_unittest.py b/tools/telemetry/telemetry/util/cloud_storage_unittest.py |
index da3d9d5e3349ec1a97c42a5a722a714c38d1d693..48a83d2c93844250572234b1ad3cd3577f81dea6 100644 |
--- a/tools/telemetry/telemetry/util/cloud_storage_unittest.py |
+++ b/tools/telemetry/telemetry/util/cloud_storage_unittest.py |
@@ -13,13 +13,135 @@ from telemetry.util import cloud_storage |
def _FakeFindGsutil(): |
return 'fake gsutil path' |
+def _FakeReadHash(_): |
+ return 'hashthis!' |
+ |
+def _FakeCalulateHashMatchesRead(_): |
+ return 'hashthis!' |
+ |
+def _FakeCalulateHashNewHash(_): |
+ return 'omgnewhash' |
+ |
class CloudStorageUnitTest(unittest.TestCase): |
def _FakeRunCommand(self, cmd): |
pass |
- def testValidCloudUrl(self): |
+ def _FakeGet(self, bucket, remote_path, local_path): |
+ pass |
+ |
+ def testRunCommandCredentialsError(self): |
+ stubs = system_stub.Override(cloud_storage, ['open', 'subprocess']) |
+ orig_find_gs_util = cloud_storage.FindGsutil |
+ cloud_storage.FindGsutil = _FakeFindGsutil |
+ try: |
+ stubs.open.files = {'fake gsutil path':''} |
+ stubs.subprocess.Popen.returncode_result = 1 |
+ stubs.subprocess.Popen.communicate_result = ( |
+ '', |
+ 'You are attempting to access protected data with no configured') |
+ self.assertRaises(cloud_storage.CredentialsError, |
+ cloud_storage._RunCommand, []) |
sullivan
2015/02/20 17:47:37
These lines are repeated many times throughout the
aiolos (Not reviewing)
2015/02/20 19:06:51
Done.
|
+ stubs.subprocess.Popen.communicate_result = ( |
+ '', |
+ 'Failure: No handler was ready to authenticate.') |
+ self.assertRaises(cloud_storage.CredentialsError, |
+ cloud_storage._RunCommand, []) |
+ finally: |
+ stubs.Restore() |
+ cloud_storage.FindGsutil = orig_find_gs_util |
+ |
+ def testRunCommandPermissionError(self): |
+ stubs = system_stub.Override(cloud_storage, ['open', 'subprocess']) |
+ orig_find_gs_util = cloud_storage.FindGsutil |
+ cloud_storage.FindGsutil = _FakeFindGsutil |
+ try: |
+ stubs.open.files = {'fake gsutil path':''} |
+ stubs.subprocess.Popen.returncode_result = 1 |
+ stubs.subprocess.Popen.communicate_result = ( |
+ '', |
+ 'status=403') |
+ self.assertRaises(cloud_storage.PermissionError, |
+ cloud_storage._RunCommand, []) |
+ stubs.subprocess.Popen.communicate_result = ( |
+ '', |
+ 'status 403') |
+ self.assertRaises(cloud_storage.PermissionError, |
+ cloud_storage._RunCommand, []) |
+ stubs.subprocess.Popen.communicate_result = ( |
+ '', |
+ '403 Forbidden') |
+ self.assertRaises(cloud_storage.PermissionError, |
+ cloud_storage._RunCommand, []) |
+ finally: |
+ stubs.Restore() |
+ cloud_storage.FindGsutil = orig_find_gs_util |
+ |
+ def testRunCommandNotFoundError(self): |
+ stubs = system_stub.Override(cloud_storage, ['open', 'subprocess']) |
+ orig_find_gs_util = cloud_storage.FindGsutil |
+ cloud_storage.FindGsutil = _FakeFindGsutil |
+ try: |
+ stubs.open.files = {'fake gsutil path':''} |
+ stubs.subprocess.Popen.returncode_result = 1 |
+ stubs.subprocess.Popen.communicate_result = ( |
+ '', |
+ 'InvalidUriError') |
+ self.assertRaises(cloud_storage.NotFoundError, |
+ cloud_storage._RunCommand, []) |
+ stubs.subprocess.Popen.communicate_result = ( |
+ '', |
+ 'No such object') |
+ self.assertRaises(cloud_storage.NotFoundError, |
+ cloud_storage._RunCommand, []) |
+ stubs.subprocess.Popen.communicate_result = ( |
+ '', |
+ 'No URLs matched') |
+ self.assertRaises(cloud_storage.NotFoundError, |
+ cloud_storage._RunCommand, []) |
+ stubs.subprocess.Popen.communicate_result = ( |
+ '', |
+ 'One or more URLs matched no') |
+ self.assertRaises(cloud_storage.NotFoundError, |
+ cloud_storage._RunCommand, []) |
+ finally: |
+ stubs.Restore() |
+ cloud_storage.FindGsutil = orig_find_gs_util |
+ |
+ def testRunCommandServerError(self): |
+ stubs = system_stub.Override(cloud_storage, ['open', 'subprocess']) |
+ orig_find_gs_util = cloud_storage.FindGsutil |
+ cloud_storage.FindGsutil = _FakeFindGsutil |
+ try: |
+ stubs.open.files = {'fake gsutil path':''} |
+ stubs.subprocess.Popen.returncode_result = 1 |
+ stubs.subprocess.Popen.communicate_result = ( |
+ '', |
+ 'InvalidUriError') |
+ self.assertRaises(cloud_storage.NotFoundError, |
+ cloud_storage._RunCommand, []) |
+ finally: |
+ stubs.Restore() |
+ cloud_storage.FindGsutil = orig_find_gs_util |
+ |
+ def testRunCommandGenericError(self): |
+ stubs = system_stub.Override(cloud_storage, ['open', 'subprocess']) |
+ orig_find_gs_util = cloud_storage.FindGsutil |
+ cloud_storage.FindGsutil = _FakeFindGsutil |
+ try: |
+ stubs.open.files = {'fake gsutil path':''} |
+ stubs.subprocess.Popen.returncode_result = 1 |
+ stubs.subprocess.Popen.communicate_result = ( |
+ '', |
+ 'InvalidUriError') |
+ self.assertRaises(cloud_storage.CloudStorageError, |
+ cloud_storage._RunCommand, []) |
+ finally: |
+ stubs.Restore() |
+ cloud_storage.FindGsutil = orig_find_gs_util |
+ |
+ def testInsertCreatesValidCloudUrl(self): |
orig_run_command = cloud_storage._RunCommand |
try: |
cloud_storage._RunCommand = self._FakeRunCommand |
@@ -47,3 +169,56 @@ class CloudStorageUnitTest(unittest.TestCase): |
finally: |
stubs.Restore() |
cloud_storage.FindGsutil = orig_find_gs_util |
+ |
+ def testGetIfChanged(self): |
+ stubs = system_stub.Override(cloud_storage, ['os', 'open']) |
+ stubs.open.files[_FakeFindGsutil()] = '' |
+ orig_get = cloud_storage.Get |
+ orig_read_hash = cloud_storage.ReadHash |
+ orig_calculate_hash = cloud_storage.CalculateHash |
+ cloud_storage.ReadHash = _FakeReadHash |
+ cloud_storage.CalculateHash = _FakeCalulateHashMatchesRead |
+ file_path = 'test-file-path.wpr' |
+ hash_path = file_path + '.sha1' |
+ try: |
+ cloud_storage.Get = self._FakeGet |
+ # hash_path doesn't exist. |
+ self.assertFalse(cloud_storage.GetIfChanged(file_path, |
+ cloud_storage.PUBLIC_BUCKET)) |
+ # hash_path exists, but file_path doesn't. |
+ stubs.os.path.files.append(hash_path) |
+ self.assertTrue(cloud_storage.GetIfChanged(file_path, |
+ cloud_storage.PUBLIC_BUCKET)) |
+ # hash_path and file_path exist, and have same hash. |
+ stubs.os.path.files.append(file_path) |
+ self.assertFalse(cloud_storage.GetIfChanged(file_path, |
+ cloud_storage.PUBLIC_BUCKET)) |
+ # hash_path and file_path exist, and have different hashes. |
+ cloud_storage.CalculateHash = _FakeCalulateHashNewHash |
+ self.assertTrue(cloud_storage.GetIfChanged(file_path, |
+ cloud_storage.PUBLIC_BUCKET)) |
+ finally: |
+ stubs.Restore() |
+ cloud_storage.Get = orig_get |
+ cloud_storage.CalculateHash = orig_calculate_hash |
+ cloud_storage.ReadHash = orig_read_hash |
+ |
+ def testGetFilesInDirectoryIfChanged(self): |
+ stubs = system_stub.Override(cloud_storage, ['os']) |
+ stubs.os._directory = {'dir1':['1file1.sha1', '1file2.txt', '1file3.sha1'], |
+ 'dir2':['2file.txt'], 'dir3':['3file1.sha1']} |
+ def IncrementFilesUpdated(*_): |
+ IncrementFilesUpdated.files_updated +=1 |
+ IncrementFilesUpdated.files_updated = 0 |
+ orig_get_if_changed = cloud_storage.GetIfChanged |
+ cloud_storage.GetIfChanged = IncrementFilesUpdated |
+ try: |
+ self.assertRaises(ValueError, cloud_storage.GetFilesInDirectoryIfChanged, |
+ '/', cloud_storage.PUBLIC_BUCKET) |
+ IncrementFilesUpdated.files_updated = 0 |
+ cloud_storage.GetFilesInDirectoryIfChanged('dir_path', |
+ cloud_storage.PUBLIC_BUCKET) |
+ self.assertEqual(3, IncrementFilesUpdated.files_updated) |
+ finally: |
+ cloud_storage.GetIfChanged = orig_get_if_changed |
+ stubs.Restore() |