| Index: tools/telemetry/telemetry/unittest/system_stub.py
|
| diff --git a/tools/telemetry/telemetry/unittest/system_stub.py b/tools/telemetry/telemetry/unittest/system_stub.py
|
| index f8b803c94150b18f4ca00c06e2e096411c5998d8..e4c34131872a94fe5d954e9815db296d36be8abf 100644
|
| --- a/tools/telemetry/telemetry/unittest/system_stub.py
|
| +++ b/tools/telemetry/telemetry/unittest/system_stub.py
|
| @@ -146,19 +146,122 @@ class CloudStorageModuleStub(object):
|
| 'internal': INTERNAL_BUCKET,
|
| }
|
|
|
| + # These are used to test for CloudStorage errors.
|
| + INTERNAL_PERMISSION = 2
|
| + PARTNER_PERMISSION = 1
|
| + PUBLIC_PERMISSION = 0
|
| + # Not logged in.
|
| + CREDENTIALS_ERROR_PERMISSION = -1
|
| +
|
| + class NotFoundError(Exception):
|
| + pass
|
| +
|
| class CloudStorageError(Exception):
|
| pass
|
|
|
| + class PermissionError(CloudStorageError):
|
| + pass
|
| +
|
| + class CredentialsError(CloudStorageError):
|
| + pass
|
| +
|
| def __init__(self):
|
| - self.remote_paths = []
|
| + self.default_remote_paths = {CloudStorageModuleStub.INTERNAL_BUCKET:{},
|
| + CloudStorageModuleStub.PARTNER_BUCKET:{},
|
| + CloudStorageModuleStub.PUBLIC_BUCKET:{}}
|
| + self.remote_paths = self.default_remote_paths
|
| self.local_file_hashes = {}
|
| self.local_hash_files = {}
|
| + self.permission_level = CloudStorageModuleStub.INTERNAL_PERMISSION
|
| +
|
| + def SetPermissionLevelForTesting(self, permission_level):
|
| + self.permission_level = permission_level
|
| +
|
| + def CheckPermissionLevelForBucket(self, bucket):
|
| + if bucket == CloudStorageModuleStub.PUBLIC_BUCKET:
|
| + return
|
| + elif (self.permission_level ==
|
| + CloudStorageModuleStub.CREDENTIALS_ERROR_PERMISSION):
|
| + raise CloudStorageModuleStub.CredentialsError()
|
| + elif bucket == CloudStorageModuleStub.PARTNER_BUCKET:
|
| + if self.permission_level < CloudStorageModuleStub.PARTNER_PERMISSION:
|
| + raise CloudStorageModuleStub.PermissionError()
|
| + elif bucket == CloudStorageModuleStub.INTERNAL_BUCKET:
|
| + if self.permission_level < CloudStorageModuleStub.INTERNAL_PERMISSION:
|
| + raise CloudStorageModuleStub.PermissionError()
|
| + else:
|
| + raise CloudStorageModuleStub.NotFoundError()
|
|
|
| - def List(self, _):
|
| + def SetRemotePathsForTesting(self, remote_path_dict=None):
|
| + if not remote_path_dict:
|
| + self.remote_paths = self.default_remote_paths
|
| + return
|
| + self.remote_paths = remote_path_dict
|
| +
|
| + def GetRemotePathsForTesting(self):
|
| + if not self.remote_paths:
|
| + self.remote_paths = self.default_remote_paths
|
| return self.remote_paths
|
|
|
| + # Set a dictionary of data files and their "calculated" hashes.
|
| + def SetCalculatedHashesForTesting(self, calculated_hash_dictionary):
|
| + self.local_file_hashes = calculated_hash_dictionary
|
| +
|
| + def GetLocalDataFiles(self):
|
| + return self.local_file_hashes.keys()
|
| +
|
| + # Set a dictionary of hash files and the hashes they should contain.
|
| + def SetHashFileContentsForTesting(self, hash_file_dictionary):
|
| + self.local_hash_files = hash_file_dictionary
|
| +
|
| + def GetLocalHashFiles(self):
|
| + return self.local_hash_files.keys()
|
| +
|
| + def ChangeRemoteHashForTesting(self, bucket, remote_path, new_hash):
|
| + self.remote_paths[bucket][remote_path] = new_hash
|
| +
|
| + def List(self, bucket):
|
| + if not bucket or not bucket in self.remote_paths:
|
| + bucket_error = ('Incorrect bucket specified, correct buckets:' +
|
| + str(self.remote_paths))
|
| + raise CloudStorageModuleStub.CloudStorageError(bucket_error)
|
| + CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
|
| + return list(self.remote_paths[bucket].keys())
|
| +
|
| + def Exists(self, bucket, remote_path):
|
| + CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
|
| + return remote_path in self.remote_paths[bucket]
|
| +
|
| def Insert(self, bucket, remote_path, local_path):
|
| - pass
|
| + CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
|
| + if not local_path in self.GetLocalDataFiles():
|
| + file_path_error = 'Local file path does not exist'
|
| + raise CloudStorageModuleStub.CloudStorageError(file_path_error)
|
| + self.remote_paths[bucket][remote_path] = (
|
| + CloudStorageModuleStub.CalculateHash(self, local_path))
|
| +
|
| + def GetHelper(self, bucket, remote_path, local_path, only_if_changed):
|
| + CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
|
| + if not remote_path in self.remote_paths[bucket]:
|
| + if only_if_changed:
|
| + return False
|
| + raise CloudStorageModuleStub.NotFoundError('Remote file does not exist.')
|
| + remote_hash = self.remote_paths[bucket][remote_path]
|
| + local_hash = self.local_file_hashes[local_path]
|
| + if only_if_changed and remote_hash == local_hash:
|
| + return False
|
| + self.local_file_hashes[local_path] = remote_hash
|
| + self.local_hash_files[local_path + '.sha1'] = remote_hash
|
| + return remote_hash
|
| +
|
| + def Get(self, bucket, remote_path, local_path):
|
| + return CloudStorageModuleStub.GetHelper(self, bucket, remote_path,
|
| + local_path, False)
|
| +
|
| + def GetIfChanged(self, bucket, local_path):
|
| + remote_path = os.path.basename(local_path)
|
| + return CloudStorageModuleStub.GetHelper(self, bucket, remote_path,
|
| + local_path, True)
|
|
|
| def CalculateHash(self, file_path):
|
| return self.local_file_hashes[file_path]
|
|
|