Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(205)

Unified Diff: tools/telemetry/telemetry/unittest/system_stub.py

Issue 430473011: Fleshout cloud_storage stub implementation. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Remove presubmit unittests since presubmit is going to be deleted in hte next CL. Created 6 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: tools/telemetry/telemetry/unittest/system_stub.py
diff --git a/tools/telemetry/telemetry/unittest/system_stub.py b/tools/telemetry/telemetry/unittest/system_stub.py
index f8b803c94150b18f4ca00c06e2e096411c5998d8..e4c34131872a94fe5d954e9815db296d36be8abf 100644
--- a/tools/telemetry/telemetry/unittest/system_stub.py
+++ b/tools/telemetry/telemetry/unittest/system_stub.py
@@ -146,19 +146,122 @@ class CloudStorageModuleStub(object):
'internal': INTERNAL_BUCKET,
}
+ # These are used to test for CloudStorage errors.
+ INTERNAL_PERMISSION = 2
+ PARTNER_PERMISSION = 1
+ PUBLIC_PERMISSION = 0
+ # Not logged in.
+ CREDENTIALS_ERROR_PERMISSION = -1
+
+ class NotFoundError(Exception):
+ pass
+
class CloudStorageError(Exception):
pass
+ class PermissionError(CloudStorageError):
+ pass
+
+ class CredentialsError(CloudStorageError):
+ pass
+
def __init__(self):
- self.remote_paths = []
+ self.default_remote_paths = {CloudStorageModuleStub.INTERNAL_BUCKET:{},
+ CloudStorageModuleStub.PARTNER_BUCKET:{},
+ CloudStorageModuleStub.PUBLIC_BUCKET:{}}
+ self.remote_paths = self.default_remote_paths
self.local_file_hashes = {}
self.local_hash_files = {}
+ self.permission_level = CloudStorageModuleStub.INTERNAL_PERMISSION
+
+ def SetPermissionLevelForTesting(self, permission_level):
+ self.permission_level = permission_level
+
+ def CheckPermissionLevelForBucket(self, bucket):
+ if bucket == CloudStorageModuleStub.PUBLIC_BUCKET:
+ return
+ elif (self.permission_level ==
+ CloudStorageModuleStub.CREDENTIALS_ERROR_PERMISSION):
+ raise CloudStorageModuleStub.CredentialsError()
+ elif bucket == CloudStorageModuleStub.PARTNER_BUCKET:
+ if self.permission_level < CloudStorageModuleStub.PARTNER_PERMISSION:
+ raise CloudStorageModuleStub.PermissionError()
+ elif bucket == CloudStorageModuleStub.INTERNAL_BUCKET:
+ if self.permission_level < CloudStorageModuleStub.INTERNAL_PERMISSION:
+ raise CloudStorageModuleStub.PermissionError()
+ else:
+ raise CloudStorageModuleStub.NotFoundError()
- def List(self, _):
+ def SetRemotePathsForTesting(self, remote_path_dict=None):
+ if not remote_path_dict:
+ self.remote_paths = self.default_remote_paths
+ return
+ self.remote_paths = remote_path_dict
+
+ def GetRemotePathsForTesting(self):
+ if not self.remote_paths:
+ self.remote_paths = self.default_remote_paths
return self.remote_paths
+ # Set a dictionary of data files and their "calculated" hashes.
+ def SetCalculatedHashesForTesting(self, calculated_hash_dictionary):
+ self.local_file_hashes = calculated_hash_dictionary
+
+ def GetLocalDataFiles(self):
+ return self.local_file_hashes.keys()
+
+ # Set a dictionary of hash files and the hashes they should contain.
+ def SetHashFileContentsForTesting(self, hash_file_dictionary):
+ self.local_hash_files = hash_file_dictionary
+
+ def GetLocalHashFiles(self):
+ return self.local_hash_files.keys()
+
+ def ChangeRemoteHashForTesting(self, bucket, remote_path, new_hash):
+ self.remote_paths[bucket][remote_path] = new_hash
+
+ def List(self, bucket):
+ if not bucket or not bucket in self.remote_paths:
+ bucket_error = ('Incorrect bucket specified, correct buckets:' +
+ str(self.remote_paths))
+ raise CloudStorageModuleStub.CloudStorageError(bucket_error)
+ CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
+ return list(self.remote_paths[bucket].keys())
+
+ def Exists(self, bucket, remote_path):
+ CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
+ return remote_path in self.remote_paths[bucket]
+
def Insert(self, bucket, remote_path, local_path):
- pass
+ CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
+ if not local_path in self.GetLocalDataFiles():
+ file_path_error = 'Local file path does not exist'
+ raise CloudStorageModuleStub.CloudStorageError(file_path_error)
+ self.remote_paths[bucket][remote_path] = (
+ CloudStorageModuleStub.CalculateHash(self, local_path))
+
+ def GetHelper(self, bucket, remote_path, local_path, only_if_changed):
+ CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
+ if not remote_path in self.remote_paths[bucket]:
+ if only_if_changed:
+ return False
+ raise CloudStorageModuleStub.NotFoundError('Remote file does not exist.')
+ remote_hash = self.remote_paths[bucket][remote_path]
+ local_hash = self.local_file_hashes[local_path]
+ if only_if_changed and remote_hash == local_hash:
+ return False
+ self.local_file_hashes[local_path] = remote_hash
+ self.local_hash_files[local_path + '.sha1'] = remote_hash
+ return remote_hash
+
+ def Get(self, bucket, remote_path, local_path):
+ return CloudStorageModuleStub.GetHelper(self, bucket, remote_path,
+ local_path, False)
+
+ def GetIfChanged(self, bucket, local_path):
+ remote_path = os.path.basename(local_path)
+ return CloudStorageModuleStub.GetHelper(self, bucket, remote_path,
+ local_path, True)
def CalculateHash(self, file_path):
return self.local_file_hashes[file_path]
« no previous file with comments | « tools/perf/page_sets/presubmit_unittest.py ('k') | tools/telemetry/telemetry/unittest/system_stub_unittest.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698