Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(335)

Unified Diff: chrome/test/pyautolib/policy_base.py

Issue 222873002: Remove pyauto tests. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src/
Patch Set: sync Created 6 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « chrome/test/pyautolib/plugins_info.py ('k') | chrome/test/pyautolib/policy_posix_util.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: chrome/test/pyautolib/policy_base.py
===================================================================
--- chrome/test/pyautolib/policy_base.py (revision 261231)
+++ chrome/test/pyautolib/policy_base.py (working copy)
@@ -1,586 +0,0 @@
-# Copyright (c) 2012 The Chromium Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""Base class for tests that need to update the policies enforced by Chrome.
-
-Subclasses can call SetUserPolicy (ChromeOS, Linux, Windows) and
-SetDevicePolicy (ChromeOS only) with a dictionary of the policies to install.
-
-The current implementation depends on the platform. The implementations might
-change in the future, but tests relying on the above calls will keep working.
-"""
-
-# On ChromeOS, a mock DMServer is started and enterprise enrollment faked
-# against it. The mock DMServer then serves user and device policy to Chrome.
-#
-# For this setup to work, the DNS, GAIA and TPM (if present) are mocked as well:
-#
-# * The mock DNS resolves all addresses to 127.0.0.1. This allows the mock GAIA
-# to handle all login attempts. It also eliminates the impact of flaky network
-# connections on tests. Beware though that no cloud services can be accessed
-# due to this DNS redirect.
-#
-# * The mock GAIA permits login with arbitrary credentials and accepts any OAuth
-# tokens sent to it for verification as valid.
-#
-# * When running on a real device, its TPM is disabled. If the TPM were enabled,
-# enrollment could not be undone without a reboot. Disabling the TPM makes
-# cryptohomed behave as if no TPM was present, allowing enrollment to be
-# undone by removing the install attributes.
-#
-# To disable the TPM, 0 must be written to /sys/class/misc/tpm0/device/enabled.
-# Since this file is not writeable, a tpmfs is mounted that shadows the file
-# with a writeable copy.
-
-import json
-import logging
-import os
-import subprocess
-
-import pyauto
-
-if pyauto.PyUITest.IsChromeOS():
- import sys
- import warnings
-
- import pyauto_paths
-
- # Ignore deprecation warnings, they make our output more cluttered.
- warnings.filterwarnings('ignore', category=DeprecationWarning)
-
- # Find the path to the pyproto and add it to sys.path.
- # Prepend it so that google.protobuf is loaded from here.
- for path in pyauto_paths.GetBuildDirs():
- p = os.path.join(path, 'pyproto')
- if os.path.isdir(p):
- sys.path = [p, os.path.join(p, 'chrome', 'browser', 'policy',
- 'proto')] + sys.path
- break
- sys.path.append('/usr/local') # to import autotest libs.
-
- import dbus
- import device_management_backend_pb2 as dm
- import pyauto_utils
- import string
- import tempfile
- import urllib
- import urllib2
- import uuid
- from autotest.cros import auth_server
- from autotest.cros import constants
- from autotest.cros import cros_ui
- from autotest.cros import dns_server
-elif pyauto.PyUITest.IsWin():
- import _winreg as winreg
-elif pyauto.PyUITest.IsMac():
- import getpass
- import plistlib
-
-# ASN.1 object identifier for PKCS#1/RSA.
-PKCS1_RSA_OID = '\x2a\x86\x48\x86\xf7\x0d\x01\x01\x01'
-
-TPM_SYSFS_PATH = '/sys/class/misc/tpm0'
-TPM_SYSFS_ENABLED_FILE = os.path.join(TPM_SYSFS_PATH, 'device/enabled')
-
-
-class PolicyTestBase(pyauto.PyUITest):
- """A base class for tests that need to set up and modify policies.
-
- Subclasses can use the methods SetUserPolicy (ChromeOS, Linux, Windows) and
- SetDevicePolicy (ChromeOS only) to set the policies seen by Chrome.
- """
-
- if pyauto.PyUITest.IsChromeOS():
- # TODO(bartfab): Extend the C++ wrapper that starts the mock DMServer so
- # that an owner can be passed in. Without this, the server will assume that
- # the owner is user@example.com and for consistency, so must we.
- owner = 'user@example.com'
- # Subclasses may override these credentials to fake enrollment into another
- # mode or use different device and machine IDs.
- mode = 'enterprise'
- device_id = string.upper(str(uuid.uuid4()))
- machine_id = 'CROSTEST'
-
- _auth_server = None
- _dns_server = None
-
- def ShouldAutoLogin(self):
- return False
-
- @staticmethod
- def _Call(command, check=False):
- """Invokes a subprocess and optionally asserts the return value is zero."""
- with open(os.devnull, 'w') as devnull:
- if check:
- return subprocess.check_call(command.split(' '), stdout=devnull)
- else:
- return subprocess.call(command.split(' '), stdout=devnull)
-
- def _WriteFile(self, path, content):
- """Writes content to path, creating any intermediary directories."""
- if not os.path.exists(os.path.dirname(path)):
- os.makedirs(os.path.dirname(path))
- f = open(path, 'w')
- f.write(content)
- f.close()
-
- def _GetTestServerPoliciesFilePath(self):
- """Returns the path of the cloud policy configuration file."""
- assert self.IsChromeOS()
- return os.path.join(self._temp_data_dir, 'device_management')
-
- def _GetHttpURLForDeviceManagement(self):
- """Returns the URL at which the TestServer is serving user policy."""
- assert self.IsChromeOS()
- return self._http_server.GetURL('device_management').spec()
-
- def _RemoveIfExists(self, filename):
- """Removes a file if it exists."""
- if os.path.exists(filename):
- os.remove(filename)
-
- def _StartSessionManagerAndChrome(self):
- """Starts the session manager and Chrome.
-
- Requires that the session manager be stopped already.
- """
- # Ugly hack: session manager will not spawn Chrome if this file exists. That
- # is usually a good thing (to keep the automation channel open), but in this
- # case we really want to restart chrome. PyUITest.setUp() will be called
- # after session manager and chrome have restarted, and will setup the
- # automation channel.
- restore_magic_file = False
- if os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE):
- logging.debug('DISABLE_BROWSER_RESTART_MAGIC_FILE found. '
- 'Removing temporarily for the next restart.')
- restore_magic_file = True
- os.remove(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE)
- assert not os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE)
-
- logging.debug('Starting session manager again')
- cros_ui.start()
-
- # cros_ui.start() waits for the login prompt to be visible, so Chrome has
- # already started once it returns.
- if restore_magic_file:
- open(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE, 'w').close()
- assert os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE)
-
- def _WritePolicyOnChromeOS(self):
- """Updates the mock DMServer's input file with current policy."""
- assert self.IsChromeOS()
- policy_dict = {
- 'google/chromeos/device': self._device_policy,
- 'google/chromeos/user': {
- 'mandatory': self._user_policy,
- 'recommended': {},
- },
- 'managed_users': ['*'],
- }
- self._WriteFile(self._GetTestServerPoliciesFilePath(),
- json.dumps(policy_dict, sort_keys=True, indent=2) + '\n')
-
- @staticmethod
- def _IsCryptohomedReadyOnChromeOS():
- """Checks whether cryptohomed is running and ready to accept DBus calls."""
- assert pyauto.PyUITest.IsChromeOS()
- try:
- bus = dbus.SystemBus()
- proxy = bus.get_object('org.chromium.Cryptohome',
- '/org/chromium/Cryptohome')
- dbus.Interface(proxy, 'org.chromium.CryptohomeInterface')
- except dbus.DBusException:
- return False
- return True
-
- def _ClearInstallAttributesOnChromeOS(self):
- """Resets the install attributes."""
- assert self.IsChromeOS()
- self._RemoveIfExists('/home/.shadow/install_attributes.pb')
- self._Call('restart cryptohomed', check=True)
- assert self.WaitUntil(self._IsCryptohomedReadyOnChromeOS)
-
- def _DMPostRequest(self, request_type, request, headers):
- """Posts a request to the mock DMServer."""
- assert self.IsChromeOS()
- url = self._GetHttpURLForDeviceManagement()
- url += '?' + urllib.urlencode({
- 'deviceid': self.device_id,
- 'oauth_token': 'dummy_oauth_token_that_is_not_checked_anyway',
- 'request': request_type,
- 'devicetype': 2,
- 'apptype': 'Chrome',
- 'agent': 'Chrome',
- })
- response = dm.DeviceManagementResponse()
- response.ParseFromString(urllib2.urlopen(urllib2.Request(
- url, request.SerializeToString(), headers)).read())
- return response
-
- def _DMRegisterDevice(self):
- """Registers with the mock DMServer and returns the DMToken."""
- assert self.IsChromeOS()
- dm_request = dm.DeviceManagementRequest()
- request = dm_request.register_request
- request.type = dm.DeviceRegisterRequest.DEVICE
- request.machine_id = self.machine_id
- dm_response = self._DMPostRequest('register', dm_request, {})
- return dm_response.register_response.device_management_token
-
- def _DMFetchPolicy(self, dm_token):
- """Fetches device policy from the mock DMServer."""
- assert self.IsChromeOS()
- dm_request = dm.DeviceManagementRequest()
- policy_request = dm_request.policy_request
- request = policy_request.request.add()
- request.policy_type = 'google/chromeos/device'
- request.signature_type = dm.PolicyFetchRequest.SHA1_RSA
- headers = {'Authorization': 'GoogleDMToken token=' + dm_token}
- dm_response = self._DMPostRequest('policy', dm_request, headers)
- response = dm_response.policy_response.response[0]
- assert response.policy_data
- assert response.policy_data_signature
- assert response.new_public_key
- return response
-
- def ExtraChromeFlags(self):
- """Sets up Chrome to use cloud policies on ChromeOS."""
- flags = pyauto.PyUITest.ExtraChromeFlags(self)
- if self.IsChromeOS():
- while '--skip-oauth-login' in flags:
- flags.remove('--skip-oauth-login')
- url = self._GetHttpURLForDeviceManagement()
- flags.append('--device-management-url=' + url)
- flags.append('--disable-sync')
- return flags
-
- def _SetUpWithSessionManagerStopped(self):
- """Sets up the test environment after stopping the session manager."""
- assert self.IsChromeOS()
- logging.debug('Stopping session manager')
- cros_ui.stop(allow_fail=True)
-
- # Start mock GAIA server.
- self._auth_server = auth_server.GoogleAuthServer()
- self._auth_server.run()
-
- # Disable TPM if present.
- if os.path.exists(TPM_SYSFS_PATH):
- self._Call('mount -t tmpfs -o size=1k tmpfs %s'
- % os.path.realpath(TPM_SYSFS_PATH), check=True)
- self._WriteFile(TPM_SYSFS_ENABLED_FILE, '0')
-
- # Clear install attributes and restart cryptohomed to pick up the change.
- self._ClearInstallAttributesOnChromeOS()
-
- # Set install attributes to mock enterprise enrollment.
- bus = dbus.SystemBus()
- proxy = bus.get_object('org.chromium.Cryptohome',
- '/org/chromium/Cryptohome')
- install_attributes = {
- 'enterprise.device_id': self.device_id,
- 'enterprise.domain': string.split(self.owner, '@')[-1],
- 'enterprise.mode': self.mode,
- 'enterprise.owned': 'true',
- 'enterprise.user': self.owner
- }
- interface = dbus.Interface(proxy, 'org.chromium.CryptohomeInterface')
- for name, value in install_attributes.iteritems():
- interface.InstallAttributesSet(name, '%s\0' % value)
- interface.InstallAttributesFinalize()
-
- # Start mock DNS server that redirects all traffic to 127.0.0.1.
- self._dns_server = dns_server.LocalDns()
- self._dns_server.run()
-
- # Start mock DMServer.
- source_dir = os.path.normpath(pyauto_paths.GetSourceDir())
- self._temp_data_dir = tempfile.mkdtemp(dir=source_dir)
- logging.debug('TestServer input path: %s' % self._temp_data_dir)
- relative_temp_data_dir = os.path.basename(self._temp_data_dir)
- self._http_server = self.StartHTTPServer(relative_temp_data_dir)
-
- # Initialize the policy served.
- self._device_policy = {}
- self._user_policy = {}
- self._WritePolicyOnChromeOS()
-
- # Register with mock DMServer and retrieve initial device policy blob.
- dm_token = self._DMRegisterDevice()
- policy = self._DMFetchPolicy(dm_token)
-
- # Write the initial device policy blob.
- self._WriteFile(constants.OWNER_KEY_FILE, policy.new_public_key)
- self._WriteFile(constants.SIGNED_POLICY_FILE, policy.SerializeToString())
-
- # Remove any existing vaults.
- self.RemoveAllCryptohomeVaultsOnChromeOS()
-
- # Restart session manager and Chrome.
- self._StartSessionManagerAndChrome()
-
- def _tearDownWithSessionManagerStopped(self):
- """Resets the test environment after stopping the session manager."""
- assert self.IsChromeOS()
- logging.debug('Stopping session manager')
- cros_ui.stop(allow_fail=True)
-
- # Stop mock GAIA server.
- if self._auth_server:
- self._auth_server.stop()
-
- # Reenable TPM if present.
- if os.path.exists(TPM_SYSFS_PATH):
- self._Call('umount %s' % os.path.realpath(TPM_SYSFS_PATH))
-
- # Clear install attributes and restart cryptohomed to pick up the change.
- self._ClearInstallAttributesOnChromeOS()
-
- # Stop mock DNS server.
- if self._dns_server:
- self._dns_server.stop()
-
- # Stop mock DMServer.
- self.StopHTTPServer(self._http_server)
-
- # Clear the policy served.
- pyauto_utils.RemovePath(self._temp_data_dir)
-
- # Remove the device policy blob.
- self._RemoveIfExists(constants.OWNER_KEY_FILE)
- self._RemoveIfExists(constants.SIGNED_POLICY_FILE)
-
- # Remove any existing vaults.
- self.RemoveAllCryptohomeVaultsOnChromeOS()
-
- # Restart session manager and Chrome.
- self._StartSessionManagerAndChrome()
-
- def setUp(self):
- """Sets up the platform for policy testing.
-
- On ChromeOS, part of the setup involves restarting the session manager to
- inject an initial device policy blob.
- """
- if self.IsChromeOS():
- # Perform the remainder of the setup with the device manager stopped.
- try:
- self.WaitForSessionManagerRestart(
- self._SetUpWithSessionManagerStopped)
- except:
- # Destroy the non re-entrant services.
- if self._auth_server:
- self._auth_server.stop()
- if self._dns_server:
- self._dns_server.stop()
- raise
-
- pyauto.PyUITest.setUp(self)
- self._branding = self.GetBrowserInfo()['properties']['branding']
-
- def tearDown(self):
- """Cleans up the policies and related files created in tests."""
- if self.IsChromeOS():
- # Perform the cleanup with the device manager stopped.
- self.WaitForSessionManagerRestart(self._tearDownWithSessionManagerStopped)
- else:
- # On other platforms, there is only user policy to clear.
- self.SetUserPolicy(refresh=False)
-
- pyauto.PyUITest.tearDown(self)
-
- def LoginWithTestAccount(self, account='prod_enterprise_test_user'):
- """Convenience method for logging in with one of the test accounts."""
- assert self.IsChromeOS()
- credentials = self.GetPrivateInfo()[account]
- self.Login(credentials['username'], credentials['password'])
- assert self.GetLoginInfo()['is_logged_in']
-
- def _GetCurrentLoginScreenId(self):
- return self.ExecuteJavascriptInOOBEWebUI(
- """window.domAutomationController.send(
- String(cr.ui.Oobe.getInstance().currentScreen.id));
- """)
-
- def _WaitForLoginScreenId(self, id):
- self.assertTrue(
- self.WaitUntil(function=self._GetCurrentLoginScreenId,
- expect_retval=id),
- msg='Expected login screen "%s" to be visible.' % id)
-
- def _CheckLoginFormLoading(self):
- return self.ExecuteJavascriptInOOBEWebUI(
- """window.domAutomationController.send(
- cr.ui.Oobe.getInstance().currentScreen.loading);
- """)
-
- def PrepareToWaitForLoginFormReload(self):
- self.assertEqual('gaia-signin',
- self._GetCurrentLoginScreenId(),
- msg='Expected the login form to be visible.')
- self.assertTrue(
- self.WaitUntil(function=self._CheckLoginFormLoading,
- expect_retval=False),
- msg='Expected the login form to finish loading.')
- # Set up a sentinel variable that is false now and will toggle to true when
- # the login form starts reloading.
- self.ExecuteJavascriptInOOBEWebUI(
- """var screen = cr.ui.Oobe.getInstance().currentScreen;
- if (!('reload_started' in screen)) {
- screen.orig_loadAuthExtension_ = screen.loadAuthExtension_;
- screen.loadAuthExtension_ = function(data) {
- this.orig_loadAuthExtension_(data);
- if (this.loading)
- this.reload_started = true;
- }
- }
- screen.reload_started = false;
- window.domAutomationController.send(true);""")
-
- def _CheckLoginFormReloaded(self):
- return self.ExecuteJavascriptInOOBEWebUI(
- """window.domAutomationController.send(
- cr.ui.Oobe.getInstance().currentScreen.reload_started &&
- !cr.ui.Oobe.getInstance().currentScreen.loading);
- """)
-
- def WaitForLoginFormReload(self):
- self.assertEqual('gaia-signin',
- self._GetCurrentLoginScreenId(),
- msg='Expected the login form to be visible.')
- self.assertTrue(
- self.WaitUntil(function=self._CheckLoginFormReloaded),
- msg='Expected the login form to finish reloading.')
-
- def _SetUserPolicyChromeOS(self, user_policy=None):
- """Writes the given user policy to the mock DMServer's input file."""
- self._user_policy = user_policy or {}
- self._WritePolicyOnChromeOS()
-
- def _SetUserPolicyWin(self, user_policy=None):
- """Writes the given user policy to the Windows registry."""
- def SetValueEx(key, sub_key, value):
- if isinstance(value, int):
- winreg.SetValueEx(key, sub_key, 0, winreg.REG_DWORD, int(value))
- elif isinstance(value, basestring):
- winreg.SetValueEx(key, sub_key, 0, winreg.REG_SZ, value.encode('ascii'))
- elif isinstance(value, list):
- k = winreg.CreateKey(key, sub_key)
- for index, v in list(enumerate(value)):
- SetValueEx(k, str(index + 1), v)
- winreg.CloseKey(k)
- else:
- raise TypeError('Unsupported data type: "%s"' % value)
-
- assert self.IsWin()
- if self._branding == 'Google Chrome':
- reg_base = r'SOFTWARE\Policies\Google\Chrome'
- else:
- reg_base = r'SOFTWARE\Policies\Chromium'
-
- if subprocess.call(
- r'reg query HKEY_LOCAL_MACHINE\%s' % reg_base) == 0:
- logging.debug(r'Removing %s' % reg_base)
- subprocess.call(r'reg delete HKLM\%s /f' % reg_base)
-
- if user_policy is not None:
- root_key = winreg.CreateKey(winreg.HKEY_LOCAL_MACHINE, reg_base)
- for k, v in user_policy.iteritems():
- SetValueEx(root_key, k, v)
- winreg.CloseKey(root_key)
-
- def _SetUserPolicyLinux(self, user_policy=None):
- """Writes the given user policy to the JSON policy file read by Chrome."""
- assert self.IsLinux()
- sudo_cmd_file = os.path.join(os.path.dirname(__file__),
- 'policy_posix_util.py')
-
- if self._branding == 'Google Chrome':
- policies_location_base = '/etc/opt/chrome'
- else:
- policies_location_base = '/etc/chromium'
-
- if os.path.exists(policies_location_base):
- logging.debug('Removing directory %s' % policies_location_base)
- subprocess.call(['suid-python', sudo_cmd_file,
- 'remove_dir', policies_location_base])
-
- if user_policy is not None:
- self._WriteFile('/tmp/chrome.json',
- json.dumps(user_policy, sort_keys=True, indent=2) + '\n')
-
- policies_location = '%s/policies/managed' % policies_location_base
- subprocess.call(['suid-python', sudo_cmd_file,
- 'setup_dir', policies_location])
- subprocess.call(['suid-python', sudo_cmd_file,
- 'perm_dir', policies_location])
- # Copy chrome.json file to the managed directory
- subprocess.call(['suid-python', sudo_cmd_file,
- 'copy', '/tmp/chrome.json', policies_location])
- os.remove('/tmp/chrome.json')
-
- def _SetUserPolicyMac(self, user_policy=None):
- """Writes the given user policy to the plist policy file read by Chrome."""
- assert self.IsMac()
- sudo_cmd_file = os.path.join(os.path.dirname(__file__),
- 'policy_posix_util.py')
-
- if self._branding == 'Google Chrome':
- policies_file_base = 'com.google.Chrome.plist'
- else:
- policies_file_base = 'org.chromium.Chromium.plist'
-
- policies_location = os.path.join('/Library', 'Managed Preferences',
- getpass.getuser())
-
- if os.path.exists(policies_location):
- logging.debug('Removing directory %s' % policies_location)
- subprocess.call(['suid-python', sudo_cmd_file,
- 'remove_dir', policies_location])
-
- if user_policy is not None:
- policies_tmp_file = os.path.join('/tmp', policies_file_base)
- plistlib.writePlist(user_policy, policies_tmp_file)
- subprocess.call(['suid-python', sudo_cmd_file,
- 'setup_dir', policies_location])
- # Copy policy file to the managed directory
- subprocess.call(['suid-python', sudo_cmd_file,
- 'copy', policies_tmp_file, policies_location])
- os.remove(policies_tmp_file)
-
- def SetUserPolicy(self, user_policy=None, refresh=True):
- """Sets the user policy provided as a dict.
-
- Args:
- user_policy: The user policy to set. None clears it.
- refresh: If True, Chrome will refresh and apply the new policy.
- Requires Chrome to be alive for it.
- """
- if self.IsChromeOS():
- self._SetUserPolicyChromeOS(user_policy=user_policy)
- elif self.IsWin():
- self._SetUserPolicyWin(user_policy=user_policy)
- elif self.IsLinux():
- self._SetUserPolicyLinux(user_policy=user_policy)
- elif self.IsMac():
- self._SetUserPolicyMac(user_policy=user_policy)
- else:
- raise NotImplementedError('Not available on this platform.')
-
- if refresh:
- self.RefreshPolicies()
-
- def SetDevicePolicy(self, device_policy=None, refresh=True):
- """Sets the device policy provided as a dict.
-
- Args:
- device_policy: The device policy to set. None clears it.
- refresh: If True, Chrome will refresh and apply the new policy.
- Requires Chrome to be alive for it.
- """
- assert self.IsChromeOS()
- self._device_policy = device_policy or {}
- self._WritePolicyOnChromeOS()
- if refresh:
- self.RefreshPolicies()
« no previous file with comments | « chrome/test/pyautolib/plugins_info.py ('k') | chrome/test/pyautolib/policy_posix_util.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698