Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1155)

Unified Diff: tools/telemetry/telemetry/internal/platform/android_platform_backend.py

Issue 1647513002: Delete tools/telemetry. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: tools/telemetry/telemetry/internal/platform/android_platform_backend.py
diff --git a/tools/telemetry/telemetry/internal/platform/android_platform_backend.py b/tools/telemetry/telemetry/internal/platform/android_platform_backend.py
deleted file mode 100644
index 86264d887713967afa7e15fc42cf3fa98f23ecda..0000000000000000000000000000000000000000
--- a/tools/telemetry/telemetry/internal/platform/android_platform_backend.py
+++ /dev/null
@@ -1,821 +0,0 @@
-# Copyright 2013 The Chromium Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-import logging
-import os
-import re
-import shutil
-import subprocess
-import tempfile
-
-from telemetry.core import android_platform
-from telemetry.core import exceptions
-from telemetry.core import platform
-from telemetry.core import util
-from telemetry import decorators
-from telemetry.internal.forwarders import android_forwarder
-from telemetry.internal.image_processing import video
-from telemetry.internal.platform import android_device
-from telemetry.internal.platform import linux_based_platform_backend
-from telemetry.internal.platform.power_monitor import android_dumpsys_power_monitor
-from telemetry.internal.platform.power_monitor import android_fuelgauge_power_monitor
-from telemetry.internal.platform.power_monitor import android_temperature_monitor
-from telemetry.internal.platform.power_monitor import monsoon_power_monitor
-from telemetry.internal.platform.power_monitor import (
- android_power_monitor_controller)
-from telemetry.internal.platform.power_monitor import sysfs_power_monitor
-from telemetry.internal.platform.profiler import android_prebuilt_profiler_helper
-from telemetry.internal.util import exception_formatter
-from telemetry.internal.util import external_modules
-
-psutil = external_modules.ImportOptionalModule('psutil')
-import adb_install_cert
-import certutils
-import platformsettings
-
-from devil.android import app_ui
-from devil.android import battery_utils
-from devil.android import device_errors
-from devil.android import device_utils
-from devil.android.perf import cache_control
-from devil.android.perf import perf_control
-from devil.android.perf import thermal_throttle
-from devil.android.sdk import version_codes
-from devil.android.tools import video_recorder
-
-try:
- from devil.android.perf import surface_stats_collector
-except Exception:
- surface_stats_collector = None
-
-
-_DEVICE_COPY_SCRIPT_FILE = os.path.abspath(os.path.join(
- os.path.dirname(__file__), 'efficient_android_directory_copy.sh'))
-_DEVICE_COPY_SCRIPT_LOCATION = (
- '/data/local/tmp/efficient_android_directory_copy.sh')
-
-# TODO(nednguyen): Remove this method and update the client config to point to
-# the correct binary instead.
-def _FindLocallyBuiltPath(binary_name):
- """Finds the most recently built |binary_name|."""
- command = None
- command_mtime = 0
- chrome_root = util.GetChromiumSrcDir()
- required_mode = os.X_OK
- if binary_name.endswith('.apk'):
- required_mode = os.R_OK
- for build_dir, build_type in util.GetBuildDirectories():
- candidate = os.path.join(chrome_root, build_dir, build_type, binary_name)
- if os.path.isfile(candidate) and os.access(candidate, required_mode):
- candidate_mtime = os.stat(candidate).st_mtime
- if candidate_mtime > command_mtime:
- command = candidate
- command_mtime = candidate_mtime
- return command
-
-
-def _GetBuildTypeOfPath(path):
- if not path:
- return None
- for build_dir, build_type in util.GetBuildDirectories():
- if os.path.join(build_dir, build_type) in path:
- return build_type
- return None
-
-
-class AndroidPlatformBackend(
- linux_based_platform_backend.LinuxBasedPlatformBackend):
- def __init__(self, device, finder_options):
- assert device, (
- 'AndroidPlatformBackend can only be initialized from remote device')
- super(AndroidPlatformBackend, self).__init__(device)
- self._device = device_utils.DeviceUtils(device.device_id)
- # Trying to root the device, if possible.
- if not self._device.HasRoot():
- try:
- self._device.EnableRoot()
- except device_errors.CommandFailedError:
- logging.warning('Unable to root %s', str(self._device))
- self._battery = battery_utils.BatteryUtils(self._device)
- self._enable_performance_mode = device.enable_performance_mode
- self._surface_stats_collector = None
- self._perf_tests_setup = perf_control.PerfControl(self._device)
- self._thermal_throttle = thermal_throttle.ThermalThrottle(self._device)
- self._raw_display_frame_rate_measurements = []
- try:
- self._can_access_protected_file_contents = (
- self._device.HasRoot() or self._device.NeedsSU())
- except Exception:
- logging.exception('New exception caused by DeviceUtils conversion')
- raise
- self._device_copy_script = None
- self._power_monitor = (
- android_power_monitor_controller.AndroidPowerMonitorController([
- android_temperature_monitor.AndroidTemperatureMonitor(self._device),
- monsoon_power_monitor.MonsoonPowerMonitor(self._device, self),
- android_dumpsys_power_monitor.DumpsysPowerMonitor(
- self._battery, self),
- sysfs_power_monitor.SysfsPowerMonitor(self, standalone=True),
- android_fuelgauge_power_monitor.FuelGaugePowerMonitor(
- self._battery),
- ], self._battery))
- self._video_recorder = None
- self._installed_applications = None
-
- self._wpr_ca_cert_path = None
- self._device_cert_util = None
- self._is_test_ca_installed = False
- self._system_ui = None
-
- self._use_rndis_forwarder = (
- finder_options.android_rndis or
- finder_options.browser_options.netsim or
- platform.GetHostPlatform().GetOSName() != 'linux')
-
- _FixPossibleAdbInstability()
-
- @property
- def log_file_path(self):
- return None
-
- @classmethod
- def SupportsDevice(cls, device):
- return isinstance(device, android_device.AndroidDevice)
-
- @classmethod
- def CreatePlatformForDevice(cls, device, finder_options):
- assert cls.SupportsDevice(device)
- platform_backend = AndroidPlatformBackend(device, finder_options)
- return android_platform.AndroidPlatform(platform_backend)
-
- @property
- def forwarder_factory(self):
- if not self._forwarder_factory:
- self._forwarder_factory = android_forwarder.AndroidForwarderFactory(
- self._device, self._use_rndis_forwarder)
-
- return self._forwarder_factory
-
- @property
- def use_rndis_forwarder(self):
- return self._use_rndis_forwarder
-
- @property
- def device(self):
- return self._device
-
- def GetSystemUi(self):
- if self._system_ui is None:
- self._system_ui = app_ui.AppUi(self.device, 'com.android.systemui')
- return self._system_ui
-
- def IsSvelte(self):
- try:
- self._device.RunShellCommand(
- 'getprop ro.build.description | grep svelte', check_return=True)
- return True
- except device_errors.AdbCommandFailedError:
- return False
-
- def IsDisplayTracingSupported(self):
- return bool(self.GetOSVersionName() >= 'J')
-
- def StartDisplayTracing(self):
- assert not self._surface_stats_collector
- # Clear any leftover data from previous timed out tests
- self._raw_display_frame_rate_measurements = []
- self._surface_stats_collector = \
- surface_stats_collector.SurfaceStatsCollector(self._device)
- self._surface_stats_collector.Start()
-
- def StopDisplayTracing(self):
- if not self._surface_stats_collector:
- return
-
- try:
- refresh_period, timestamps = self._surface_stats_collector.Stop()
- pid = self._surface_stats_collector.GetSurfaceFlingerPid()
- finally:
- self._surface_stats_collector = None
- # TODO(sullivan): should this code be inline, or live elsewhere?
- events = []
- for ts in timestamps:
- events.append({
- 'cat': 'SurfaceFlinger',
- 'name': 'vsync_before',
- 'ts': ts,
- 'pid': pid,
- 'tid': pid,
- 'args': {'data': {
- 'frame_count': 1,
- 'refresh_period': refresh_period,
- }}
- })
- return events
-
- def CanTakeScreenshot(self):
- return True
-
- def TakeScreenshot(self, file_path):
- return bool(self._device.TakeScreenshot(host_path=file_path))
-
- def SetFullPerformanceModeEnabled(self, enabled):
- if not self._enable_performance_mode:
- logging.warning('CPU governor will not be set!')
- return
- if enabled:
- self._perf_tests_setup.SetHighPerfMode()
- else:
- self._perf_tests_setup.SetDefaultPerfMode()
-
- def CanMonitorThermalThrottling(self):
- return True
-
- def IsThermallyThrottled(self):
- return self._thermal_throttle.IsThrottled()
-
- def HasBeenThermallyThrottled(self):
- return self._thermal_throttle.HasBeenThrottled()
-
- def GetCpuStats(self, pid):
- if not self._can_access_protected_file_contents:
- logging.warning('CPU stats cannot be retrieved on non-rooted device.')
- return {}
- return super(AndroidPlatformBackend, self).GetCpuStats(pid)
-
- def GetCpuTimestamp(self):
- if not self._can_access_protected_file_contents:
- logging.warning('CPU timestamp cannot be retrieved on non-rooted device.')
- return {}
- return super(AndroidPlatformBackend, self).GetCpuTimestamp()
-
- def SetGraphicsMemoryTrackingEnabled(self, enabled):
- if not enabled:
- self.KillApplication('memtrack_helper')
- return
-
- if not android_prebuilt_profiler_helper.InstallOnDevice(
- self._device, 'memtrack_helper'):
- raise Exception('Error installing memtrack_helper.')
- try:
- cmd = android_prebuilt_profiler_helper.GetDevicePath('memtrack_helper')
- cmd += ' -d'
- self._device.RunShellCommand(cmd, as_root=True, check_return=True)
- except Exception:
- logging.exception('New exception caused by DeviceUtils conversion')
- raise
-
- def PurgeUnpinnedMemory(self):
- """Purges the unpinned ashmem memory for the whole system.
-
- This can be used to make memory measurements more stable. Requires root.
- """
- if not self._can_access_protected_file_contents:
- logging.warning('Cannot run purge_ashmem. Requires a rooted device.')
- return
-
- if not android_prebuilt_profiler_helper.InstallOnDevice(
- self._device, 'purge_ashmem'):
- raise Exception('Error installing purge_ashmem.')
- try:
- output = self._device.RunShellCommand(
- android_prebuilt_profiler_helper.GetDevicePath('purge_ashmem'))
- except Exception:
- logging.exception('New exception caused by DeviceUtils conversion')
- raise
- for l in output:
- logging.info(l)
-
- def GetMemoryStats(self, pid):
- memory_usage = self._device.GetMemoryUsageForPid(pid)
- if not memory_usage:
- return {}
- return {'ProportionalSetSize': memory_usage['Pss'] * 1024,
- 'SharedDirty': memory_usage['Shared_Dirty'] * 1024,
- 'PrivateDirty': memory_usage['Private_Dirty'] * 1024,
- 'VMPeak': memory_usage['VmHWM'] * 1024}
-
- def GetChildPids(self, pid):
- child_pids = []
- ps = self.GetPsOutput(['pid', 'name'])
- for curr_pid, curr_name in ps:
- if int(curr_pid) == pid:
- name = curr_name
- for curr_pid, curr_name in ps:
- if curr_name.startswith(name) and curr_name != name:
- child_pids.append(int(curr_pid))
- break
- return child_pids
-
- @decorators.Cache
- def GetCommandLine(self, pid):
- ps = self.GetPsOutput(['pid', 'name'], pid)
- if not ps:
- raise exceptions.ProcessGoneException()
- return ps[0][1]
-
- @decorators.Cache
- def GetArchName(self):
- return self._device.GetABI()
-
- def GetOSName(self):
- return 'android'
-
- def GetDeviceTypeName(self):
- return self._device.product_model
-
- @decorators.Cache
- def GetOSVersionName(self):
- return self._device.GetProp('ro.build.id')[0]
-
- def CanFlushIndividualFilesFromSystemCache(self):
- return False
-
- def FlushEntireSystemCache(self):
- cache = cache_control.CacheControl(self._device)
- cache.DropRamCaches()
-
- def FlushSystemCacheForDirectory(self, directory):
- raise NotImplementedError()
-
- def FlushDnsCache(self):
- self._device.RunShellCommand('ndc resolver flushdefaultif', as_root=True)
-
- def StopApplication(self, application):
- """Stop the given |application|.
-
- Args:
- application: The full package name string of the application to stop.
- """
- self._device.ForceStop(application)
-
- def KillApplication(self, application):
- """Kill the given |application|.
-
- Might be used instead of ForceStop for efficiency reasons.
-
- Args:
- application: The full package name string of the application to kill.
- """
- self._device.KillAll(application, blocking=True, quiet=True)
-
- def LaunchApplication(
- self, application, parameters=None, elevate_privilege=False):
- """Launches the given |application| with a list of |parameters| on the OS.
-
- Args:
- application: The full package name string of the application to launch.
- parameters: A list of parameters to be passed to the ActivityManager.
- elevate_privilege: Currently unimplemented on Android.
- """
- if elevate_privilege:
- raise NotImplementedError("elevate_privilege isn't supported on android.")
- if not parameters:
- parameters = ''
- result_lines = self._device.RunShellCommand('am start %s %s' %
- (parameters, application))
- for line in result_lines:
- if line.startswith('Error: '):
- raise ValueError('Failed to start "%s" with error\n %s' %
- (application, line))
-
- def IsApplicationRunning(self, application):
- return len(self._device.GetPids(application)) > 0
-
- def CanLaunchApplication(self, application):
- if not self._installed_applications:
- self._installed_applications = self._device.RunShellCommand(
- 'pm list packages')
- return 'package:' + application in self._installed_applications
-
- def InstallApplication(self, application):
- self._installed_applications = None
- self._device.Install(application)
-
- @decorators.Cache
- def CanCaptureVideo(self):
- return self.GetOSVersionName() >= 'K'
-
- def StartVideoCapture(self, min_bitrate_mbps):
- """Starts the video capture at specified bitrate."""
- min_bitrate_mbps = max(min_bitrate_mbps, 0.1)
- if min_bitrate_mbps > 100:
- raise ValueError('Android video capture cannot capture at %dmbps. '
- 'Max capture rate is 100mbps.' % min_bitrate_mbps)
- if self.is_video_capture_running:
- self._video_recorder.Stop()
- self._video_recorder = video_recorder.VideoRecorder(
- self._device, megabits_per_second=min_bitrate_mbps)
- self._video_recorder.Start(timeout=5)
-
- @property
- def is_video_capture_running(self):
- return self._video_recorder is not None
-
- def StopVideoCapture(self):
- assert self.is_video_capture_running, 'Must start video capture first'
- self._video_recorder.Stop()
- video_file_obj = tempfile.NamedTemporaryFile()
- self._video_recorder.Pull(video_file_obj.name)
- self._video_recorder = None
-
- return video.Video(video_file_obj)
-
- def CanMonitorPower(self):
- return self._power_monitor.CanMonitorPower()
-
- def StartMonitoringPower(self, browser):
- self._power_monitor.StartMonitoringPower(browser)
-
- def StopMonitoringPower(self):
- return self._power_monitor.StopMonitoringPower()
-
- def CanMonitorNetworkData(self):
- return self._device.build_version_sdk >= version_codes.LOLLIPOP
-
- def GetNetworkData(self, browser):
- return self._battery.GetNetworkData(browser._browser_backend.package)
-
- def PathExists(self, device_path, timeout=None, retries=None):
- """ Return whether the given path exists on the device.
- This method is the same as
- devil.android.device_utils.DeviceUtils.PathExists.
- """
- return self._device.PathExists(
- device_path, timeout=timeout, retries=retries)
-
- def GetFileContents(self, fname):
- if not self._can_access_protected_file_contents:
- logging.warning('%s cannot be retrieved on non-rooted device.' % fname)
- return ''
- return self._device.ReadFile(fname, as_root=True)
-
- def GetPsOutput(self, columns, pid=None):
- assert columns == ['pid', 'name'] or columns == ['pid'], \
- 'Only know how to return pid and name. Requested: ' + columns
- command = 'ps'
- if pid:
- command += ' -p %d' % pid
- ps = self._device.RunShellCommand(command, large_output=True)[1:]
- output = []
- for line in ps:
- data = line.split()
- curr_pid = data[1]
- curr_name = data[-1]
- if columns == ['pid', 'name']:
- output.append([curr_pid, curr_name])
- else:
- output.append([curr_pid])
- return output
-
- def RunCommand(self, command):
- return '\n'.join(self._device.RunShellCommand(command))
-
- @staticmethod
- def ParseCStateSample(sample):
- sample_stats = {}
- for cpu in sample:
- values = sample[cpu].splitlines()
- # Each state has three values after excluding the time value.
- num_states = (len(values) - 1) / 3
- names = values[:num_states]
- times = values[num_states:2 * num_states]
- cstates = {'C0': int(values[-1]) * 10 ** 6}
- for i, state in enumerate(names):
- if state == 'C0':
- # The Exynos cpuidle driver for the Nexus 10 uses the name 'C0' for
- # its WFI state.
- # TODO(tmandel): We should verify that no other Android device
- # actually reports time in C0 causing this to report active time as
- # idle time.
- state = 'WFI'
- cstates[state] = int(times[i])
- cstates['C0'] -= int(times[i])
- sample_stats[cpu] = cstates
- return sample_stats
-
- def SetRelaxSslCheck(self, value):
- old_flag = self._device.GetProp('socket.relaxsslcheck')
- self._device.SetProp('socket.relaxsslcheck', value)
- return old_flag
-
- def ForwardHostToDevice(self, host_port, device_port):
- self._device.adb.Forward('tcp:%d' % host_port, device_port)
-
- def StopForwardingHost(self, host_port):
- for line in self._device.adb.ForwardList().strip().splitlines():
- line = line.split(' ')
- if line[0] == self._device and line[1] == 'tcp:%s' % host_port:
- self._device.adb.ForwardRemove('tcp:%d' % host_port)
- break
- else:
- logging.warning('Port %s not found in adb forward --list for device %s',
- host_port, self._device)
-
- def DismissCrashDialogIfNeeded(self):
- """Dismiss any error dialogs.
-
- Limit the number in case we have an error loop or we are failing to dismiss.
- """
- for _ in xrange(10):
- if not self._device.DismissCrashDialogIfNeeded():
- break
-
- def IsAppRunning(self, process_name):
- """Determine if the given process is running.
-
- Args:
- process_name: The full package name string of the process.
- """
- return bool(self._device.GetPids(process_name))
-
- @property
- def wpr_ca_cert_path(self):
- """Path to root certificate installed on browser (or None).
-
- If this is set, web page replay will use it to sign HTTPS responses.
- """
- if self._wpr_ca_cert_path:
- assert os.path.isfile(self._wpr_ca_cert_path)
- return self._wpr_ca_cert_path
-
- def InstallTestCa(self):
- """Install a randomly generated root CA on the android device.
-
- This allows transparent HTTPS testing with WPR server without need
- to tweak application network stack.
- """
- # TODO(slamm): Move certificate creation related to webpagereplay.py.
- # The only code that needs to be in platform backend is installing the cert.
- if certutils.openssl_import_error:
- logging.warning(
- 'The OpenSSL module is unavailable. '
- 'Will fallback to ignoring certificate errors.')
- return
- if not platformsettings.HasSniSupport():
- logging.warning(
- 'Web Page Replay requires SNI support (pyOpenSSL 0.13 or greater) '
- 'to generate certificates from a test CA. '
- 'Will fallback to ignoring certificate errors.')
- return
- try:
- self._wpr_ca_cert_path = os.path.join(tempfile.mkdtemp(), 'testca.pem')
- certutils.write_dummy_ca_cert(*certutils.generate_dummy_ca_cert(),
- cert_path=self._wpr_ca_cert_path)
- self._device_cert_util = adb_install_cert.AndroidCertInstaller(
- self._device.adb.GetDeviceSerial(), None, self._wpr_ca_cert_path)
- logging.info('Installing test certificate authority on device: %s',
- str(self._device))
- self._device_cert_util.install_cert(overwrite_cert=True)
- self._is_test_ca_installed = True
- except Exception as e:
- # Fallback to ignoring certificate errors.
- self.RemoveTestCa()
- logging.warning(
- 'Unable to install test certificate authority on device: %s. '
- 'Will fallback to ignoring certificate errors. Install error: %s',
- str(self._device), e)
-
- @property
- def is_test_ca_installed(self):
- return self._is_test_ca_installed
-
- def RemoveTestCa(self):
- """Remove root CA generated by previous call to InstallTestCa().
-
- Removes the test root certificate from both the device and host machine.
- """
- if not self._wpr_ca_cert_path:
- return
-
- if self._is_test_ca_installed:
- try:
- self._device_cert_util.remove_cert()
- except Exception:
- # Best effort cleanup - show the error and continue.
- exception_formatter.PrintFormattedException(
- msg=('Error while trying to remove certificate authority: %s. '
- % str(self._device)))
- self._is_test_ca_installed = False
-
- shutil.rmtree(os.path.dirname(self._wpr_ca_cert_path), ignore_errors=True)
- self._wpr_ca_cert_path = None
- self._device_cert_util = None
-
- def PushProfile(self, package, new_profile_dir):
- """Replace application profile with files found on host machine.
-
- Pushing the profile is slow, so we don't want to do it every time.
- Avoid this by pushing to a safe location using PushChangedFiles, and
- then copying into the correct location on each test run.
-
- Args:
- package: The full package name string of the application for which the
- profile is to be updated.
- new_profile_dir: Location where profile to be pushed is stored on the
- host machine.
- """
- (profile_parent, profile_base) = os.path.split(new_profile_dir)
- # If the path ends with a '/' python split will return an empty string for
- # the base name; so we now need to get the base name from the directory.
- if not profile_base:
- profile_base = os.path.basename(profile_parent)
-
- saved_profile_location = '/sdcard/profile/%s' % profile_base
- self._device.PushChangedFiles([(new_profile_dir, saved_profile_location)])
-
- profile_dir = self._GetProfileDir(package)
- try:
- self._EfficientDeviceDirectoryCopy(
- saved_profile_location, profile_dir)
- except Exception:
- logging.exception('New exception caused by DeviceUtils conversion')
- raise
- dumpsys = self._device.RunShellCommand('dumpsys package %s' % package)
- id_line = next(line for line in dumpsys if 'userId=' in line)
- uid = re.search(r'\d+', id_line).group()
- files = self._device.RunShellCommand(
- 'ls "%s"' % profile_dir, as_root=True)
- files.remove('lib')
- paths = ['%s%s' % (profile_dir, f) for f in files]
- for path in paths:
- extended_path = '%s %s/* %s/*/* %s/*/*/*' % (path, path, path, path)
- self._device.RunShellCommand(
- 'chown %s.%s %s' % (uid, uid, extended_path))
-
- def _EfficientDeviceDirectoryCopy(self, source, dest):
- if not self._device_copy_script:
- self._device.adb.Push(
- _DEVICE_COPY_SCRIPT_FILE,
- _DEVICE_COPY_SCRIPT_LOCATION)
- self._device_copy_script = _DEVICE_COPY_SCRIPT_FILE
- self._device.RunShellCommand(
- ['sh', self._device_copy_script, source, dest])
-
- def RemoveProfile(self, package, ignore_list):
- """Delete application profile on device.
-
- Args:
- package: The full package name string of the application for which the
- profile is to be deleted.
- ignore_list: List of files to keep.
- """
- profile_dir = self._GetProfileDir(package)
- files = self._device.RunShellCommand(
- 'ls "%s"' % profile_dir, as_root=True)
- paths = ['"%s%s"' % (profile_dir, f) for f in files
- if f not in ignore_list]
- self._device.RunShellCommand('rm -r %s' % ' '.join(paths), as_root=True)
-
- def PullProfile(self, package, output_profile_path):
- """Copy application profile from device to host machine.
-
- Args:
- package: The full package name string of the application for which the
- profile is to be copied.
- output_profile_dir: Location where profile to be stored on host machine.
- """
- profile_dir = self._GetProfileDir(package)
- logging.info("Pulling profile directory from device: '%s'->'%s'.",
- profile_dir, output_profile_path)
- # To minimize bandwidth it might be good to look at whether all the data
- # pulled down is really needed e.g. .pak files.
- if not os.path.exists(output_profile_path):
- os.makedirs(output_profile_path)
- try:
- files = self._device.RunShellCommand(['ls', profile_dir])
- except Exception:
- logging.exception('New exception caused by DeviceUtils conversion')
- raise
- for f in files:
- # Don't pull lib, since it is created by the installer.
- if f != 'lib':
- source = '%s%s' % (profile_dir, f)
- dest = os.path.join(output_profile_path, f)
- try:
- self._device.PullFile(source, dest, timeout=240)
- except device_errors.CommandFailedError:
- logging.exception('Failed to pull %s to %s', source, dest)
-
- def _GetProfileDir(self, package):
- """Returns the on-device location where the application profile is stored
- based on Android convention.
-
- Args:
- package: The full package name string of the application.
- """
- return '/data/data/%s/' % package
-
- def SetDebugApp(self, package):
- """Set application to debugging.
-
- Args:
- package: The full package name string of the application.
- """
- if self._device.IsUserBuild():
- logging.debug('User build device, setting debug app')
- self._device.RunShellCommand('am set-debug-app --persistent %s' % package)
-
- def GetLogCat(self, number_of_lines=500):
- """Returns most recent lines of logcat dump.
-
- Args:
- number_of_lines: Number of lines of log to return.
- """
- return '\n'.join(self._device.RunShellCommand(
- 'logcat -d -t %d' % number_of_lines))
-
- def GetStandardOutput(self):
- return None
-
- def GetStackTrace(self, target_arch):
- """Returns stack trace.
-
- The stack trace consists of raw logcat dump, logcat dump with symbols,
- and stack info from tomstone files.
-
- Args:
- target_arch: String specifying device architecture (eg. arm, arm64, mips,
- x86, x86_64)
- """
- def Decorate(title, content):
- return "%s\n%s\n%s\n" % (title, content, '*' * 80)
- # Get the last lines of logcat (large enough to contain stacktrace)
- logcat = self.GetLogCat()
- ret = Decorate('Logcat', logcat)
- stack = os.path.join(util.GetChromiumSrcDir(), 'third_party',
- 'android_platform', 'development', 'scripts', 'stack')
- # Try to symbolize logcat.
- if os.path.exists(stack):
- cmd = [stack]
- if target_arch:
- cmd.append('--arch=%s' % target_arch)
- p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
- ret += Decorate('Stack from Logcat', p.communicate(input=logcat)[0])
-
- # Try to get tombstones.
- tombstones = os.path.join(util.GetChromiumSrcDir(), 'build', 'android',
- 'tombstones.py')
- if os.path.exists(tombstones):
- ret += Decorate('Tombstones',
- subprocess.Popen([tombstones, '-w', '--device',
- self._device.adb.GetDeviceSerial()],
- stdout=subprocess.PIPE).communicate()[0])
- return ret
-
- def IsScreenOn(self):
- """Determines if device screen is on."""
- return self._device.IsScreenOn()
-
- @staticmethod
- def _IsScreenLocked(input_methods):
- """Parser method for IsScreenLocked()
-
- Args:
- input_methods: Output from dumpsys input_methods
-
- Returns:
- boolean: True if screen is locked, false if screen is not locked.
-
- Raises:
- ValueError: An unknown value is found for the screen lock state.
- AndroidDeviceParsingError: Error in detecting screen state.
-
- """
- for line in input_methods:
- if 'mHasBeenInactive' in line:
- for pair in line.strip().split(' '):
- key, value = pair.split('=', 1)
- if key == 'mHasBeenInactive':
- if value == 'true':
- return True
- elif value == 'false':
- return False
- else:
- raise ValueError('Unknown value for %s: %s' % (key, value))
- raise exceptions.AndroidDeviceParsingError(str(input_methods))
-
- def IsScreenLocked(self):
- """Determines if device screen is locked."""
- input_methods = self._device.RunShellCommand('dumpsys input_method',
- check_return=True)
- return self._IsScreenLocked(input_methods)
-
-def _FixPossibleAdbInstability():
- """Host side workaround for crbug.com/268450 (adb instability).
-
- The adb server has a race which is mitigated by binding to a single core.
- """
- if not psutil:
- return
- for process in psutil.process_iter():
- try:
- if psutil.version_info >= (2, 0):
- if 'adb' in process.name():
- process.cpu_affinity([0])
- else:
- if 'adb' in process.name:
- process.set_cpu_affinity([0])
- except (psutil.NoSuchProcess, psutil.AccessDenied):
- logging.warn('Failed to set adb process CPU affinity')

Powered by Google App Engine
This is Rietveld 408576698