Index: tools/telemetry/telemetry/unittest/system_stub.py |
diff --git a/tools/telemetry/telemetry/unittest/system_stub.py b/tools/telemetry/telemetry/unittest/system_stub.py |
index f8b803c94150b18f4ca00c06e2e096411c5998d8..e4c34131872a94fe5d954e9815db296d36be8abf 100644 |
--- a/tools/telemetry/telemetry/unittest/system_stub.py |
+++ b/tools/telemetry/telemetry/unittest/system_stub.py |
@@ -146,19 +146,122 @@ class CloudStorageModuleStub(object): |
'internal': INTERNAL_BUCKET, |
} |
+ # These are used to test for CloudStorage errors. |
+ INTERNAL_PERMISSION = 2 |
+ PARTNER_PERMISSION = 1 |
+ PUBLIC_PERMISSION = 0 |
+ # Not logged in. |
+ CREDENTIALS_ERROR_PERMISSION = -1 |
+ |
+ class NotFoundError(Exception): |
+ pass |
+ |
class CloudStorageError(Exception): |
pass |
+ class PermissionError(CloudStorageError): |
+ pass |
+ |
+ class CredentialsError(CloudStorageError): |
+ pass |
+ |
def __init__(self): |
- self.remote_paths = [] |
+ self.default_remote_paths = {CloudStorageModuleStub.INTERNAL_BUCKET:{}, |
+ CloudStorageModuleStub.PARTNER_BUCKET:{}, |
+ CloudStorageModuleStub.PUBLIC_BUCKET:{}} |
+ self.remote_paths = self.default_remote_paths |
self.local_file_hashes = {} |
self.local_hash_files = {} |
+ self.permission_level = CloudStorageModuleStub.INTERNAL_PERMISSION |
+ |
+ def SetPermissionLevelForTesting(self, permission_level): |
+ self.permission_level = permission_level |
+ |
+ def CheckPermissionLevelForBucket(self, bucket): |
+ if bucket == CloudStorageModuleStub.PUBLIC_BUCKET: |
+ return |
+ elif (self.permission_level == |
+ CloudStorageModuleStub.CREDENTIALS_ERROR_PERMISSION): |
+ raise CloudStorageModuleStub.CredentialsError() |
+ elif bucket == CloudStorageModuleStub.PARTNER_BUCKET: |
+ if self.permission_level < CloudStorageModuleStub.PARTNER_PERMISSION: |
+ raise CloudStorageModuleStub.PermissionError() |
+ elif bucket == CloudStorageModuleStub.INTERNAL_BUCKET: |
+ if self.permission_level < CloudStorageModuleStub.INTERNAL_PERMISSION: |
+ raise CloudStorageModuleStub.PermissionError() |
+ else: |
+ raise CloudStorageModuleStub.NotFoundError() |
- def List(self, _): |
+ def SetRemotePathsForTesting(self, remote_path_dict=None): |
+ if not remote_path_dict: |
+ self.remote_paths = self.default_remote_paths |
+ return |
+ self.remote_paths = remote_path_dict |
+ |
+ def GetRemotePathsForTesting(self): |
+ if not self.remote_paths: |
+ self.remote_paths = self.default_remote_paths |
return self.remote_paths |
+ # Set a dictionary of data files and their "calculated" hashes. |
+ def SetCalculatedHashesForTesting(self, calculated_hash_dictionary): |
+ self.local_file_hashes = calculated_hash_dictionary |
+ |
+ def GetLocalDataFiles(self): |
+ return self.local_file_hashes.keys() |
+ |
+ # Set a dictionary of hash files and the hashes they should contain. |
+ def SetHashFileContentsForTesting(self, hash_file_dictionary): |
+ self.local_hash_files = hash_file_dictionary |
+ |
+ def GetLocalHashFiles(self): |
+ return self.local_hash_files.keys() |
+ |
+ def ChangeRemoteHashForTesting(self, bucket, remote_path, new_hash): |
+ self.remote_paths[bucket][remote_path] = new_hash |
+ |
+ def List(self, bucket): |
+ if not bucket or not bucket in self.remote_paths: |
+ bucket_error = ('Incorrect bucket specified, correct buckets:' + |
+ str(self.remote_paths)) |
+ raise CloudStorageModuleStub.CloudStorageError(bucket_error) |
+ CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket) |
+ return list(self.remote_paths[bucket].keys()) |
+ |
+ def Exists(self, bucket, remote_path): |
+ CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket) |
+ return remote_path in self.remote_paths[bucket] |
+ |
def Insert(self, bucket, remote_path, local_path): |
- pass |
+ CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket) |
+ if not local_path in self.GetLocalDataFiles(): |
+ file_path_error = 'Local file path does not exist' |
+ raise CloudStorageModuleStub.CloudStorageError(file_path_error) |
+ self.remote_paths[bucket][remote_path] = ( |
+ CloudStorageModuleStub.CalculateHash(self, local_path)) |
+ |
+ def GetHelper(self, bucket, remote_path, local_path, only_if_changed): |
+ CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket) |
+ if not remote_path in self.remote_paths[bucket]: |
+ if only_if_changed: |
+ return False |
+ raise CloudStorageModuleStub.NotFoundError('Remote file does not exist.') |
+ remote_hash = self.remote_paths[bucket][remote_path] |
+ local_hash = self.local_file_hashes[local_path] |
+ if only_if_changed and remote_hash == local_hash: |
+ return False |
+ self.local_file_hashes[local_path] = remote_hash |
+ self.local_hash_files[local_path + '.sha1'] = remote_hash |
+ return remote_hash |
+ |
+ def Get(self, bucket, remote_path, local_path): |
+ return CloudStorageModuleStub.GetHelper(self, bucket, remote_path, |
+ local_path, False) |
+ |
+ def GetIfChanged(self, bucket, local_path): |
+ remote_path = os.path.basename(local_path) |
+ return CloudStorageModuleStub.GetHelper(self, bucket, remote_path, |
+ local_path, True) |
def CalculateHash(self, file_path): |
return self.local_file_hashes[file_path] |