| Index: tools/telemetry/telemetry/testing/system_stub.py
|
| diff --git a/tools/telemetry/telemetry/testing/system_stub.py b/tools/telemetry/telemetry/testing/system_stub.py
|
| deleted file mode 100644
|
| index 369e0d8a01109fc581caae3ce9cd73649f42e919..0000000000000000000000000000000000000000
|
| --- a/tools/telemetry/telemetry/testing/system_stub.py
|
| +++ /dev/null
|
| @@ -1,505 +0,0 @@
|
| -# Copyright 2012 The Chromium Authors. All rights reserved.
|
| -# Use of this source code is governed by a BSD-style license that can be
|
| -# found in the LICENSE file.
|
| -
|
| -"""Provides stubs for os, sys and subprocess for testing
|
| -
|
| -This test allows one to test code that itself uses os, sys, and subprocess.
|
| -"""
|
| -
|
| -import ntpath
|
| -import os
|
| -import posixpath
|
| -import re
|
| -import shlex
|
| -import sys
|
| -
|
| -
|
| -class Override(object):
|
| - def __init__(self, base_module, module_list):
|
| - stubs = {'cloud_storage': CloudStorageModuleStub,
|
| - 'open': OpenFunctionStub,
|
| - 'os': OsModuleStub,
|
| - 'perf_control': PerfControlModuleStub,
|
| - 'raw_input': RawInputFunctionStub,
|
| - 'subprocess': SubprocessModuleStub,
|
| - 'sys': SysModuleStub,
|
| - 'thermal_throttle': ThermalThrottleModuleStub,
|
| - 'logging': LoggingStub,
|
| - 'certutils': CertUtilsStub,
|
| - 'adb_install_cert': AdbInstallCertStub,
|
| - 'platformsettings': PlatformSettingsStub,
|
| - }
|
| - self.adb_commands = None
|
| - self.os = None
|
| - self.subprocess = None
|
| - self.sys = None
|
| -
|
| - self._base_module = base_module
|
| - self._overrides = {}
|
| -
|
| - for module_name in module_list:
|
| - self._overrides[module_name] = getattr(base_module, module_name, None)
|
| - setattr(self, module_name, stubs[module_name]())
|
| - setattr(base_module, module_name, getattr(self, module_name))
|
| -
|
| - if self.os and self.sys:
|
| - self.os.path.sys = self.sys
|
| -
|
| - def __del__(self):
|
| - assert not len(self._overrides)
|
| -
|
| - def Restore(self):
|
| - for module_name, original_module in self._overrides.iteritems():
|
| - if original_module is None:
|
| - # This will happen when we override built-in functions, like open.
|
| - # If we don't delete the attribute, we will shadow the built-in
|
| - # function with an attribute set to None.
|
| - delattr(self._base_module, module_name)
|
| - else:
|
| - setattr(self._base_module, module_name, original_module)
|
| - self._overrides = {}
|
| -
|
| -
|
| -class AdbDevice(object):
|
| -
|
| - def __init__(self):
|
| - self.has_root = False
|
| - self.needs_su = False
|
| - self.shell_command_handlers = {}
|
| - self.mock_content = []
|
| - self.system_properties = {}
|
| - if self.system_properties.get('ro.product.cpu.abi') == None:
|
| - self.system_properties['ro.product.cpu.abi'] = 'armeabi-v7a'
|
| -
|
| - def HasRoot(self):
|
| - return self.has_root
|
| -
|
| - def NeedsSU(self):
|
| - return self.needs_su
|
| -
|
| - def RunShellCommand(self, args, **kwargs):
|
| - del kwargs # unused
|
| - if isinstance(args, basestring):
|
| - args = shlex.split(args)
|
| - handler = self.shell_command_handlers[args[0]]
|
| - return handler(args)
|
| -
|
| - def FileExists(self, _):
|
| - return False
|
| -
|
| - def ReadFile(self, device_path, as_root=False):
|
| - del device_path, as_root # unused
|
| - return self.mock_content
|
| -
|
| - def GetProp(self, property_name):
|
| - return self.system_properties[property_name]
|
| -
|
| - def SetProp(self, property_name, property_value):
|
| - self.system_properties[property_name] = property_value
|
| -
|
| -
|
| -class CloudStorageModuleStub(object):
|
| - PUBLIC_BUCKET = 'chromium-telemetry'
|
| - PARTNER_BUCKET = 'chrome-partner-telemetry'
|
| - INTERNAL_BUCKET = 'chrome-telemetry'
|
| - BUCKET_ALIASES = {
|
| - 'public': PUBLIC_BUCKET,
|
| - 'partner': PARTNER_BUCKET,
|
| - 'internal': INTERNAL_BUCKET,
|
| - }
|
| -
|
| - # These are used to test for CloudStorage errors.
|
| - INTERNAL_PERMISSION = 2
|
| - PARTNER_PERMISSION = 1
|
| - PUBLIC_PERMISSION = 0
|
| - # Not logged in.
|
| - CREDENTIALS_ERROR_PERMISSION = -1
|
| -
|
| - class NotFoundError(Exception):
|
| - pass
|
| -
|
| - class CloudStorageError(Exception):
|
| - pass
|
| -
|
| - class PermissionError(CloudStorageError):
|
| - pass
|
| -
|
| - class CredentialsError(CloudStorageError):
|
| - pass
|
| -
|
| - def __init__(self):
|
| - self.default_remote_paths = {CloudStorageModuleStub.INTERNAL_BUCKET:{},
|
| - CloudStorageModuleStub.PARTNER_BUCKET:{},
|
| - CloudStorageModuleStub.PUBLIC_BUCKET:{}}
|
| - self.remote_paths = self.default_remote_paths
|
| - self.local_file_hashes = {}
|
| - self.local_hash_files = {}
|
| - self.permission_level = CloudStorageModuleStub.INTERNAL_PERMISSION
|
| - self.downloaded_files = []
|
| -
|
| - def SetPermissionLevelForTesting(self, permission_level):
|
| - self.permission_level = permission_level
|
| -
|
| - def CheckPermissionLevelForBucket(self, bucket):
|
| - if bucket == CloudStorageModuleStub.PUBLIC_BUCKET:
|
| - return
|
| - elif (self.permission_level ==
|
| - CloudStorageModuleStub.CREDENTIALS_ERROR_PERMISSION):
|
| - raise CloudStorageModuleStub.CredentialsError()
|
| - elif bucket == CloudStorageModuleStub.PARTNER_BUCKET:
|
| - if self.permission_level < CloudStorageModuleStub.PARTNER_PERMISSION:
|
| - raise CloudStorageModuleStub.PermissionError()
|
| - elif bucket == CloudStorageModuleStub.INTERNAL_BUCKET:
|
| - if self.permission_level < CloudStorageModuleStub.INTERNAL_PERMISSION:
|
| - raise CloudStorageModuleStub.PermissionError()
|
| - elif bucket not in self.remote_paths:
|
| - raise CloudStorageModuleStub.NotFoundError()
|
| -
|
| - def SetRemotePathsForTesting(self, remote_path_dict=None):
|
| - if not remote_path_dict:
|
| - self.remote_paths = self.default_remote_paths
|
| - return
|
| - self.remote_paths = remote_path_dict
|
| -
|
| - def GetRemotePathsForTesting(self):
|
| - if not self.remote_paths:
|
| - self.remote_paths = self.default_remote_paths
|
| - return self.remote_paths
|
| -
|
| - # Set a dictionary of data files and their "calculated" hashes.
|
| - def SetCalculatedHashesForTesting(self, calculated_hash_dictionary):
|
| - self.local_file_hashes = calculated_hash_dictionary
|
| -
|
| - def GetLocalDataFiles(self):
|
| - return self.local_file_hashes.keys()
|
| -
|
| - # Set a dictionary of hash files and the hashes they should contain.
|
| - def SetHashFileContentsForTesting(self, hash_file_dictionary):
|
| - self.local_hash_files = hash_file_dictionary
|
| -
|
| - def GetLocalHashFiles(self):
|
| - return self.local_hash_files.keys()
|
| -
|
| - def ChangeRemoteHashForTesting(self, bucket, remote_path, new_hash):
|
| - self.remote_paths[bucket][remote_path] = new_hash
|
| -
|
| - def List(self, bucket):
|
| - if not bucket or not bucket in self.remote_paths:
|
| - bucket_error = ('Incorrect bucket specified, correct buckets:' +
|
| - str(self.remote_paths))
|
| - raise CloudStorageModuleStub.CloudStorageError(bucket_error)
|
| - CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
|
| - return list(self.remote_paths[bucket].keys())
|
| -
|
| - def Exists(self, bucket, remote_path):
|
| - CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
|
| - return remote_path in self.remote_paths[bucket]
|
| -
|
| - def Insert(self, bucket, remote_path, local_path):
|
| - CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
|
| - if not local_path in self.GetLocalDataFiles():
|
| - file_path_error = 'Local file path does not exist'
|
| - raise CloudStorageModuleStub.CloudStorageError(file_path_error)
|
| - self.remote_paths[bucket][remote_path] = (
|
| - CloudStorageModuleStub.CalculateHash(self, local_path))
|
| - return remote_path
|
| -
|
| - def GetHelper(self, bucket, remote_path, local_path, only_if_changed):
|
| - CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
|
| - if not remote_path in self.remote_paths[bucket]:
|
| - if only_if_changed:
|
| - return False
|
| - raise CloudStorageModuleStub.NotFoundError('Remote file does not exist.')
|
| - remote_hash = self.remote_paths[bucket][remote_path]
|
| - local_hash = self.local_file_hashes[local_path]
|
| - if only_if_changed and remote_hash == local_hash:
|
| - return False
|
| - self.downloaded_files.append(remote_path)
|
| - self.local_file_hashes[local_path] = remote_hash
|
| - self.local_hash_files[local_path + '.sha1'] = remote_hash
|
| - return remote_hash
|
| -
|
| - def Get(self, bucket, remote_path, local_path):
|
| - return CloudStorageModuleStub.GetHelper(self, bucket, remote_path,
|
| - local_path, False)
|
| -
|
| - def GetIfChanged(self, local_path, bucket=None):
|
| - remote_path = os.path.basename(local_path)
|
| - if bucket:
|
| - return CloudStorageModuleStub.GetHelper(self, bucket, remote_path,
|
| - local_path, True)
|
| - result = CloudStorageModuleStub.GetHelper(
|
| - self, self.PUBLIC_BUCKET, remote_path, local_path, True)
|
| - if not result:
|
| - result = CloudStorageModuleStub.GetHelper(
|
| - self, self.PARTNER_BUCKET, remote_path, local_path, True)
|
| - if not result:
|
| - result = CloudStorageModuleStub.GetHelper(
|
| - self, self.INTERNAL_BUCKET, remote_path, local_path, True)
|
| - return result
|
| -
|
| - def GetFilesInDirectoryIfChanged(self, directory, bucket):
|
| - if os.path.dirname(directory) == directory: # If in the root dir.
|
| - raise ValueError('Trying to serve root directory from HTTP server.')
|
| - for dirpath, _, filenames in os.walk(directory):
|
| - for filename in filenames:
|
| - path, extension = os.path.splitext(
|
| - os.path.join(dirpath, filename))
|
| - if extension != '.sha1':
|
| - continue
|
| - self.GetIfChanged(path, bucket)
|
| -
|
| - def CalculateHash(self, file_path):
|
| - return self.local_file_hashes[file_path]
|
| -
|
| - def ReadHash(self, hash_path):
|
| - return self.local_hash_files[hash_path]
|
| -
|
| -
|
| -class LoggingStub(object):
|
| - def __init__(self):
|
| - self.warnings = []
|
| - self.errors = []
|
| -
|
| - def info(self, msg, *args):
|
| - pass
|
| -
|
| - def error(self, msg, *args):
|
| - self.errors.append(msg % args)
|
| -
|
| - def warning(self, msg, *args):
|
| - self.warnings.append(msg % args)
|
| -
|
| - def warn(self, msg, *args):
|
| - self.warning(msg, *args)
|
| -
|
| -
|
| -class OpenFunctionStub(object):
|
| - class FileStub(object):
|
| - def __init__(self, data):
|
| - self._data = data
|
| -
|
| - def __enter__(self):
|
| - return self
|
| -
|
| - def __exit__(self, *args):
|
| - pass
|
| -
|
| - def read(self, size=None):
|
| - if size:
|
| - return self._data[:size]
|
| - else:
|
| - return self._data
|
| -
|
| - def write(self, data):
|
| - self._data.write(data)
|
| -
|
| - def close(self):
|
| - pass
|
| -
|
| - def __init__(self):
|
| - self.files = {}
|
| -
|
| - def __call__(self, name, *args, **kwargs):
|
| - return OpenFunctionStub.FileStub(self.files[name])
|
| -
|
| -
|
| -class OsModuleStub(object):
|
| - class OsEnvironModuleStub(object):
|
| - def get(self, _):
|
| - return None
|
| -
|
| - class OsPathModuleStub(object):
|
| - def __init__(self, sys_module):
|
| - self.sys = sys_module
|
| - self.files = []
|
| - self.dirs = []
|
| -
|
| - def exists(self, path):
|
| - return path in self.files
|
| -
|
| - def isfile(self, path):
|
| - return path in self.files
|
| -
|
| - def isdir(self, path):
|
| - return path in self.dirs
|
| -
|
| - def join(self, *paths):
|
| - def IsAbsolutePath(path):
|
| - if self.sys.platform.startswith('win'):
|
| - return re.match('[a-zA-Z]:\\\\', path)
|
| - else:
|
| - return path.startswith('/')
|
| -
|
| - # Per Python specification, if any component is an absolute path,
|
| - # discard previous components.
|
| - for index, path in reversed(list(enumerate(paths))):
|
| - if IsAbsolutePath(path):
|
| - paths = paths[index:]
|
| - break
|
| -
|
| - if self.sys.platform.startswith('win'):
|
| - tmp = os.path.join(*paths)
|
| - return tmp.replace('/', '\\')
|
| - else:
|
| - tmp = os.path.join(*paths)
|
| - return tmp.replace('\\', '/')
|
| -
|
| - def basename(self, path):
|
| - if self.sys.platform.startswith('win'):
|
| - return ntpath.basename(path)
|
| - else:
|
| - return posixpath.basename(path)
|
| -
|
| - @staticmethod
|
| - def abspath(path):
|
| - return os.path.abspath(path)
|
| -
|
| - @staticmethod
|
| - def expanduser(path):
|
| - return os.path.expanduser(path)
|
| -
|
| - @staticmethod
|
| - def dirname(path):
|
| - return os.path.dirname(path)
|
| -
|
| - @staticmethod
|
| - def realpath(path):
|
| - return os.path.realpath(path)
|
| -
|
| - @staticmethod
|
| - def split(path):
|
| - return os.path.split(path)
|
| -
|
| - @staticmethod
|
| - def splitext(path):
|
| - return os.path.splitext(path)
|
| -
|
| - @staticmethod
|
| - def splitdrive(path):
|
| - return os.path.splitdrive(path)
|
| -
|
| - X_OK = os.X_OK
|
| -
|
| - sep = os.sep
|
| - pathsep = os.pathsep
|
| -
|
| - def __init__(self, sys_module=sys):
|
| - self.path = OsModuleStub.OsPathModuleStub(sys_module)
|
| - self.environ = OsModuleStub.OsEnvironModuleStub()
|
| - self.display = ':0'
|
| - self.local_app_data = None
|
| - self.sys_path = None
|
| - self.program_files = None
|
| - self.program_files_x86 = None
|
| - self.devnull = os.devnull
|
| - self._directory = {}
|
| -
|
| - def access(self, path, _):
|
| - return path in self.path.files
|
| -
|
| - def getenv(self, name, value=None):
|
| - if name == 'DISPLAY':
|
| - env = self.display
|
| - elif name == 'LOCALAPPDATA':
|
| - env = self.local_app_data
|
| - elif name == 'PATH':
|
| - env = self.sys_path
|
| - elif name == 'PROGRAMFILES':
|
| - env = self.program_files
|
| - elif name == 'PROGRAMFILES(X86)':
|
| - env = self.program_files_x86
|
| - else:
|
| - raise NotImplementedError('Unsupported getenv')
|
| - return env if env else value
|
| -
|
| - def chdir(self, path):
|
| - pass
|
| -
|
| - def walk(self, top):
|
| - for dir_name in self._directory:
|
| - yield top, dir_name, self._directory[dir_name]
|
| -
|
| -
|
| -class PerfControlModuleStub(object):
|
| - class PerfControlStub(object):
|
| - def __init__(self, adb):
|
| - pass
|
| -
|
| - def __init__(self):
|
| - self.PerfControl = PerfControlModuleStub.PerfControlStub
|
| -
|
| -
|
| -class RawInputFunctionStub(object):
|
| - def __init__(self):
|
| - self.input = ''
|
| -
|
| - def __call__(self, name, *args, **kwargs):
|
| - return self.input
|
| -
|
| -
|
| -class SubprocessModuleStub(object):
|
| - class PopenStub(object):
|
| - def __init__(self):
|
| - self.communicate_result = ('', '')
|
| - self.returncode_result = 0
|
| -
|
| - def __call__(self, args, **kwargs):
|
| - return self
|
| -
|
| - def communicate(self):
|
| - return self.communicate_result
|
| -
|
| - @property
|
| - def returncode(self):
|
| - return self.returncode_result
|
| -
|
| - def __init__(self):
|
| - self.Popen = SubprocessModuleStub.PopenStub()
|
| - self.PIPE = None
|
| -
|
| - def call(self, *args, **kwargs):
|
| - pass
|
| -
|
| -
|
| -class SysModuleStub(object):
|
| - def __init__(self):
|
| - self.platform = ''
|
| -
|
| -
|
| -class ThermalThrottleModuleStub(object):
|
| - class ThermalThrottleStub(object):
|
| - def __init__(self, adb):
|
| - pass
|
| -
|
| - def __init__(self):
|
| - self.ThermalThrottle = ThermalThrottleModuleStub.ThermalThrottleStub
|
| -
|
| -class CertUtilsStub(object):
|
| - openssl_import_error = None
|
| -
|
| - @staticmethod
|
| - def write_dummy_ca_cert(_ca_cert_str, _key_str, cert_path):
|
| - pass
|
| -
|
| - @staticmethod
|
| - def generate_dummy_ca_cert():
|
| - return '-', '-'
|
| -
|
| -class AdbInstallCertStub(object):
|
| - class AndroidCertInstaller(object):
|
| - def __init__(self, device_id, cert_name, cert_path):
|
| - del cert_name, cert_path # unused
|
| - if device_id == 'success':
|
| - pass
|
| - elif device_id == 'failure':
|
| - raise Exception('Test exception.')
|
| -
|
| - def install_cert(self, overwrite_cert=False):
|
| - pass
|
| -
|
| -class PlatformSettingsStub(object):
|
| - @staticmethod
|
| - def HasSniSupport():
|
| - return True
|
|
|