Chromium Code Reviews| Index: chrome/test/pyautolib/policy_base.py |
| diff --git a/chrome/test/pyautolib/policy_base.py b/chrome/test/pyautolib/policy_base.py |
| index db66807245e19cb20e9924c4c966b1ff63c32f71..51a1ee16e2f454d33a7b2d3c565b62cea2fed63d 100644 |
| --- a/chrome/test/pyautolib/policy_base.py |
| +++ b/chrome/test/pyautolib/policy_base.py |
| @@ -5,36 +5,34 @@ |
| """Base class for tests that need to update the policies enforced by Chrome. |
| -Subclasses can call SetPolicies with a dictionary of policies to install. |
| -SetPolicies can also be used to set the device policies on ChromeOS. |
| +Subclasses can call SetUserPolicy (ChromeOS, Linux, Windows) and |
| +SetDevicePolicy (ChromeOS only) with a dictionary of the policies to install. |
| The current implementation depends on the platform. The implementations might |
| -change in the future, but tests relying on SetPolicies will keep working. |
| +change in the future, but tests relying on the above calls will keep working. |
| """ |
| -# On ChromeOS this relies on the device_management.py part of the TestServer, |
| -# and forces the policies by triggering a new policy fetch and refreshing the |
| -# cloud policy providers. This requires preparing the system for policy |
| -# fetching, which currently means the DMTokens have to be in place. Without the |
| -# DMTokens, the cloud policy controller won't be able to proceed, because the |
| -# Gaia tokens for the DMService aren't available during tests. |
| -# In the future this setup might not be necessary anymore, and the policy |
| -# might also be pushed through the session_manager. |
| +# On ChromeOS, both user and device policies are supported. Chrome is set up to |
| +# fetch user policy from a TestServer. A call to SetUserPolicy triggers an |
| +# immediate policy refresh, allowing the effects of user policy changes on a |
| +# running session to be tested. |
| # |
| -# On other platforms it relies on the SetPolicies automation call, which is |
| -# only available on non-official builds. This automation call is meant to be |
| -# eventually removed when a replacement for every platform is available. |
| -# This requires setting up the policies in the registry on Windows, and writing |
| -# the right files on Linux and Mac. |
| +# Device policy is injected by stopping Chrome and the session manager, writing |
| +# a new device policy blob and starting Chrome and the session manager again. |
| +# This means that setting device policy always logs out the current user. It is |
| +# *not* possible to test the effects on a running session. These limitations |
| +# stem from the fact that Chrome will only fetch device policy from a TestServer |
| +# if the device is enterprise owned. Enterprise ownership, in turn, requires |
| +# ownership of the TPM which can only be undone by reboothing the device (and |
| +# hence is not possible in a client test). |
| import json |
| import logging |
| import os |
| import subprocess |
| import tempfile |
| -import urllib |
| -import urllib2 |
| +import asn1der |
| import pyauto |
| import pyauto_paths |
| import pyauto_utils |
| @@ -42,6 +40,10 @@ import pyauto_utils |
| if pyauto.PyUITest.IsChromeOS(): |
| import sys |
| + import warnings |
| + # Ignore deprecation warnings, they make our output more cluttered. |
| + warnings.filterwarnings("ignore", category=DeprecationWarning) |
|
Nirnimesh
2012/04/05 19:05:30
nit: use ' instead of " for consistency
bartfab (slow)
2012/04/10 15:27:20
Done.
|
| + |
| # Find the path to the pyproto and add it to sys.path. |
| # Prepend it so that google.protobuf is loaded from here. |
| for path in pyauto_paths.GetBuildDirs(): |
| @@ -51,18 +53,25 @@ if pyauto.PyUITest.IsChromeOS(): |
| 'proto')] + sys.path |
| break |
| sys.path.append('/usr/local') # to import autotest libs. |
| - import device_management_local_pb2 as dml |
| - import device_management_backend_pb2 as dmb |
| + sys.path.append(os.path.join(pyauto_paths.GetThirdPartyDir(), 'tlslite')) |
| + |
| + import chrome_device_policy_pb2 as dp |
| + import device_management_backend_pb2 as dm |
| + import tlslite.api |
| from autotest.cros import constants |
| from autotest.cros import cros_ui |
| elif pyauto.PyUITest.IsWin(): |
| import _winreg as winreg |
| +# ASN.1 object identifier for PKCS#1/RSA. |
| +PKCS1_RSA_OID = '\x2a\x86\x48\x86\xf7\x0d\x01\x01\x01' |
| + |
| class PolicyTestBase(pyauto.PyUITest): |
| """A base class for tests that need to set up and modify policies. |
| - Subclasses can use the SetPolicies call to set the policies seen by Chrome. |
| + Subclasses can use the methods SetUserPolicy (ChromeOS, Linux, Windows) and |
| + SetDevicePolicy (ChromeOS only) to set the policies seen by Chrome. |
| """ |
| def _WriteFile(self, path, content): |
| @@ -79,118 +88,25 @@ class PolicyTestBase(pyauto.PyUITest): |
| return os.path.join(self._temp_data_dir, 'device_management') |
| def _GetHttpURLForDeviceManagement(self): |
| + """Returns the URL at which the TestServer is serving user policy.""" |
| assert self.IsChromeOS() |
| return self._http_server.GetURL('device_management').spec() |
| - def _WriteUserPolicyToken(self, token): |
| - """Writes the given token to the user device management cache.""" |
| - assert self.IsChromeOS() |
| - blob = dml.DeviceCredentials() |
| - blob.device_token = token |
| - blob.device_id = '123' |
| - self._WriteFile('/home/chronos/user/Device Management/Token', |
| - blob.SerializeToString()) |
| + def _WriteDevicePolicyWithSessionManagerStopped(self): |
| + """Writes the device policy blob while the session manager is stopped. |
| - def _WriteDevicePolicy(self, fetch_response): |
| - """Writes the given signed fetch_response to the device policy cache. |
| - |
| - Also writes the owner key, used to verify the signature. |
| + Updates the files holding the device policy blob and the public key need to |
| + verify its signature. |
| """ |
| assert self.IsChromeOS() |
| - self._WriteFile(constants.SIGNED_POLICY_FILE, |
| - fetch_response.SerializeToString()) |
| - self._WriteFile(constants.OWNER_KEY_FILE, |
| - fetch_response.new_public_key) |
| - |
| - def _PostToDMServer(self, request_type, body, headers): |
| - """Posts a request to the TestServer's Device Management interface. |
| - |
| - |request_type| is the value of the 'request' HTTP parameter. |
| - Returns the response's body. |
| - """ |
| - assert self.IsChromeOS() |
| - url = self._GetHttpURLForDeviceManagement() |
| - url += '?' + urllib.urlencode({ |
| - 'deviceid': '123', |
| - 'oauth_token': '456', |
| - 'request': request_type, |
| - 'devicetype': 2, |
| - 'apptype': 'Chrome', |
| - 'agent': 'Chrome', |
| - }) |
| - return urllib2.urlopen(urllib2.Request(url, body, headers)).read() |
| - |
| - def _PostRegisterRequest(self, type): |
| - """Sends a device register request to the TestServer, of the given type.""" |
| - assert self.IsChromeOS() |
| - request = dmb.DeviceManagementRequest() |
| - register = request.register_request |
| - register.machine_id = '789' |
| - register.type = type |
| - return self._PostToDMServer('register', request.SerializeToString(), {}) |
| - |
| - def _RegisterAndGetDMToken(self, device): |
| - """Registers with the TestServer and returns the DMToken fetched. |
| - |
| - Registers for device policy if device is True. Otherwise registers for |
| - user policy. |
| - """ |
| - assert self.IsChromeOS() |
| - type = device and dmb.DeviceRegisterRequest.DEVICE \ |
| - or dmb.DeviceRegisterRequest.USER |
| - rstring = self._PostRegisterRequest(type) |
| - response = dmb.DeviceManagementResponse() |
| - response.ParseFromString(rstring) |
| - return response.register_response.device_management_token |
| - |
| - def _PostPolicyRequest(self, token, type, want_signature=False): |
| - """Fetches policy from the TestServer, using the given token. |
| - |
| - Policy is fetched for the given type. If want_signature is True, the |
| - request will ask for a signed response. Returns the response body. |
| - """ |
| - assert self.IsChromeOS() |
| - request = dmb.DeviceManagementRequest() |
| - policy = request.policy_request |
| - prequest = policy.request.add() |
| - prequest.policy_type = type |
| - if want_signature: |
| - prequest.signature_type = dmb.PolicyFetchRequest.SHA1_RSA |
| - headers = { |
| - 'Authorization': 'GoogleDMToken token=' + token, |
| - } |
| - return self._PostToDMServer('policy', request.SerializeToString(), headers) |
| - |
| - def _FetchPolicy(self, token, device): |
| - """Fetches policy from the TestServer, using the given token. |
| - |
| - Token must be a valid token retrieved with _RegisterAndGetDMToken. If |
| - device is True, fetches signed device policy. Otherwise fetches user policy. |
| - This method also verifies the response, and returns the first policy fetch |
| - response. |
| - """ |
| - assert self.IsChromeOS() |
| - type = device and 'google/chromeos/device' or 'google/chromeos/user' |
| - rstring = self._PostPolicyRequest(token=token, type=type, |
| - want_signature=device) |
| - response = dmb.DeviceManagementResponse() |
| - response.ParseFromString(rstring) |
| - fetch_response = response.policy_response.response[0] |
| - assert fetch_response.policy_data |
| - assert fetch_response.policy_data_signature |
| - assert fetch_response.new_public_key |
| - return fetch_response |
| - |
| - def _WriteDevicePolicyWithSessionManagerStopped(self, policy): |
| - """Writes the device policy blob while the Session Manager is stopped.""" |
| - assert self.IsChromeOS() |
| logging.debug('Stopping session manager') |
| cros_ui.stop(allow_fail=True) |
| - logging.debug('Writing device policy cache') |
| - self._WriteDevicePolicy(policy) |
| + logging.debug('Writing device policy blob') |
| + self._WriteFile(constants.SIGNED_POLICY_FILE, self._device_policy_blob) |
| + self._WriteFile(constants.OWNER_KEY_FILE, self._public_key) |
| - # Ugly hack: session manager won't spawn chrome if this file exists. That's |
| - # usually a good thing (to keep the automation channel open), but in this |
| + # Ugly hack: session manager will not spawn Chrome if this file exists. That |
| + # is usually a good thing (to keep the automation channel open), but in this |
| # case we really want to restart chrome. PyUITest.setUp() will be called |
| # after session manager and chrome have restarted, and will setup the |
| # automation channel. |
| @@ -205,7 +121,7 @@ class PolicyTestBase(pyauto.PyUITest): |
| logging.debug('Starting session manager again') |
| cros_ui.start() |
| - # cros_ui.start() waits for the login prompt to be visible, so chrome has |
| + # cros_ui.start() waits for the login prompt to be visible, so Chrome has |
| # already started once it returns. |
| if restore_magic_file: |
| open(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE, 'w').close() |
| @@ -215,102 +131,93 @@ class PolicyTestBase(pyauto.PyUITest): |
| """Sets up Chrome to use cloud policies on ChromeOS.""" |
| flags = pyauto.PyUITest.ExtraChromeFlags(self) |
| if self.IsChromeOS(): |
| + while '--skip-oauth-login' in flags: |
| + flags.remove('--skip-oauth-login') |
| url = self._GetHttpURLForDeviceManagement() |
| - flag = '--device-management-url=' + url |
| - flags += [flag] |
| + flags.append('--device-management-url=' + url) |
| + flags.append('--disable-sync') |
| return flags |
| def setUp(self): |
| """Sets up the platform for policy testing. |
| - On ChromeOS, part of the set up involves restarting the session_manager and |
| - logging in with the $default account. |
| + On ChromeOS, part of the setup involves restarting the session manager to |
| + inject a device policy blob. |
| """ |
| if self.IsChromeOS(): |
| - # Setup a temporary data dir and a TestServer serving files from there. |
| + # Set up a temporary data dir and a TestServer serving files from there. |
| # The TestServer makes its document root relative to the src dir. |
| self._temp_data_dir = tempfile.mkdtemp(dir=pyauto_paths.GetSourceDir()) |
| relative_temp_data_dir = os.path.basename(self._temp_data_dir) |
| self._http_server = self.StartHTTPServer(relative_temp_data_dir) |
| - # Setup empty policies, so that the TestServer can start replying. |
| - self._SetCloudPolicies() |
| + # Set up an empty user policy so that the TestServer can start replying. |
| + self._SetUserPolicyChromeOS() |
| - device_dmtoken = self._RegisterAndGetDMToken(device=True) |
| - policy = self._FetchPolicy(token=device_dmtoken, device=True) |
| - user_dmtoken = self._RegisterAndGetDMToken(device=False) |
| + # Generate a key pair for signing device policy. |
| + self._private_key = tlslite.api.generateRSAKey(1024) |
| + algorithm = asn1der.Sequence( |
| + [asn1der.Data(asn1der.OBJECT_IDENTIFIER, PKCS1_RSA_OID), |
| + asn1der.Data(asn1der.NULL, '')]) |
| + rsa_pubkey = asn1der.Sequence([asn1der.Integer(self._private_key.n), |
| + asn1der.Integer(self._private_key.e)]) |
| + self._public_key = asn1der.Sequence( |
| + [algorithm, asn1der.Bitstring(rsa_pubkey)]) |
| - # The device policy blob is only picked up by the session manager on |
| - # startup, and is overwritten on shutdown. So the blob has to be written |
| - # while the session manager is stopped. |
| - self.WaitForSessionManagerRestart( |
| - lambda: self._WriteDevicePolicyWithSessionManagerStopped(policy)) |
| - logging.debug('Session manager restarted with device policy ready') |
| + # Clear device policy. This also invokes pyauto.PyUITest.setUp(self). |
| + self.SetDevicePolicy() |
| - pyauto.PyUITest.setUp(self) |
| - |
| - if self.IsChromeOS(): |
| - logging.debug('Logging in') |
| - credentials = self.GetPrivateInfo()['prod_enterprise_test_user'] |
| - self.Login(credentials['username'], credentials['password']) |
| - assert self.GetLoginInfo()['is_logged_in'] |
| - |
| - self._WriteUserPolicyToken(user_dmtoken) |
| - # The browser has to be reloaded to make the user policy token cache |
| - # reload the file just written. The file can also be written only after |
| - # the cryptohome is mounted, after login. |
| - self.RestartBrowser(clear_profile=False) |
| + # Remove any existing vaults. |
| + self.RemoveAllCryptohomeVaultsOnChromeOS() |
| + else: |
| + pyauto.PyUITest.setUp(self) |
| def tearDown(self): |
| - """Cleans up the files created by setUp and policies added in tests.""" |
| - # Clear the policies. |
| - self.SetPolicies() |
| - |
| + """Cleans up the policies and related files created in tests.""" |
| if self.IsChromeOS(): |
| - pyauto.PyUITest.Logout(self) |
| + # On ChromeOS, clearing device policy logs out the current user so that |
| + # no policy is in force. User policy is then permanently cleared at the |
| + # end of the method after stopping the TestServer. |
| + self.SetDevicePolicy() |
| + else: |
| + # On other platforms, there is only user policy to clear. |
| + self.SetUserPolicy() |
| pyauto.PyUITest.tearDown(self) |
| if self.IsChromeOS(): |
| self.StopHTTPServer(self._http_server) |
| pyauto_utils.RemovePath(self._temp_data_dir) |
| + self.RemoveAllCryptohomeVaultsOnChromeOS() |
| - def _SetCloudPolicies(self, user_mandatory=None, user_recommended=None, |
| - device=None): |
| - """Exports the policies to the configuration file of the TestServer. |
| + def LoginWithTestAccount(self, account='prod_enterprise_test_user'): |
| + """Convenience method for logging in with one of the test accounts.""" |
| + assert self.IsChromeOS() |
| + credentials = self.GetPrivateInfo()[account] |
| + self.Login(credentials['username'], credentials['password']) |
| + assert self.GetLoginInfo()['is_logged_in'] |
| - The TestServer will serve these policies after this function returns. |
| + def SetPolicy(self): |
| + raise NotImplementedError('This class supports user and device policies. ' |
| + 'Instead of SetPolicies, use SetUserPolicy or ' |
| + 'SetDevicePolicy.') |
| - Args: |
| - user_mandatory: user policies of mandatory level |
| - user_recommended: user policies of recommended level |
| - device: device policies |
| - """ |
| + def _SetUserPolicyChromeOS(self, user_policy=None): |
| + """Writes the given user policy to the TestServer's input file.""" |
| assert self.IsChromeOS() |
| policy_dict = { |
| - 'google/chromeos/device': device or {}, |
| + 'google/chromeos/device': {}, |
| 'google/chromeos/user': { |
| - 'mandatory': user_mandatory or {}, |
| - 'recommended': user_recommended or {}, |
| + 'mandatory': user_policy or {}, |
| + 'recommended': {}, |
| }, |
| 'managed_users': ['*'], |
| } |
| self._WriteFile(self._GetTestServerPoliciesFilePath(), |
| json.dumps(policy_dict, sort_keys=True, indent=2) + '\n') |
| - def _SetPoliciesWin(self, user_policy=None): |
| - """Exports the policies as dictionary in the argument to Window registry. |
| - |
| - Removes the registry key and its subkeys if they exist. |
| - |
| - Args: |
| - user_policy: A dictionary representing the user policies. Clear the |
| - registry if None. |
| - |
| - Raises: |
| - TypeError: If an unexpected value is found in the policy dictionary. |
| - """ |
| - |
| + def _SetUserPolicyWin(self, user_policy=None): |
| + """Writes the given user policy to the Windows registry.""" |
| def SetValueEx(key, sub_key, value): |
| if isinstance(value, int): |
| winreg.SetValueEx(key, sub_key, 0, winreg.REG_DWORD, int(value)) |
| @@ -341,15 +248,8 @@ class PolicyTestBase(pyauto.PyUITest): |
| SetValueEx(root_key, k, v) |
| winreg.CloseKey(root_key) |
| - def _SetPoliciesLinux(self, user_policy=None): |
| - """Exports the policies as dictionary in the argument to a JSON file. |
| - |
| - Removes the JSON file if it exists. |
| - |
| - Args: |
| - user_policy: A dictionary representing the user policies. Remove the |
| - JSON file if None |
| - """ |
| + def _SetUserPolicyLinux(self, user_policy=None): |
| + """Writes the given user policy to the JSON policy file read by Chrome.""" |
| assert self.IsLinux() |
| sudo_cmd_file = os.path.join(os.path.dirname(__file__), |
| 'policy_linux_util.py') |
| @@ -376,29 +276,101 @@ class PolicyTestBase(pyauto.PyUITest): |
| 'copy', '/tmp/chrome.json', policies_location]) |
| os.remove('/tmp/chrome.json') |
| - def SetPolicies(self, user_policy=None, device_policy=None): |
| - """Enforces the policies given in the arguments as dictionaries. |
| - |
| - These policies will have been installed after this call returns. |
| + def SetUserPolicy(self, user_policy=None): |
| + """Sets the user policy provided as a dict. |
| - Args: |
| - user_policy: A dictionary representing the user policies. |
| - device_policy: A dictionary representing the device policies on Chrome OS. |
| - |
| - Raises: |
| - NotImplementedError if the platform is not supported. |
| - """ |
| + Passing a value of None clears the user policy.""" |
| if self.IsChromeOS(): |
| - self._SetCloudPolicies(user_mandatory=user_policy, device=device_policy) |
| + self._SetUserPolicyChromeOS(user_policy=user_policy) |
| + elif self.IsWin(): |
| + self._SetUserPolicyWin(user_policy=user_policy) |
| + elif self.IsLinux(): |
| + self._SetUserPolicyLinux(user_policy=user_policy) |
| else: |
| - if device_policy is not None: |
| - raise NotImplementedError('Device policy is only available on ChromeOS') |
| - if self.IsWin(): |
| - self._SetPoliciesWin(user_policy=user_policy) |
| - elif self.IsLinux(): |
| - self._SetPoliciesLinux(user_policy=user_policy) |
| - else: |
| - raise NotImplementedError('Not available on this platform.') |
| + raise NotImplementedError('Not available on this platform.') |
| self.RefreshPolicies() |
| + def _SetProtobufMessageField(self, group_message, field, field_value): |
| + """Sets the given field in a protobuf to the given value.""" |
| + if field.label == field.LABEL_REPEATED: |
| + assert type(field_value) == list |
| + entries = group_message.__getattribute__(field.name) |
| + for list_item in field_value: |
| + entries.append(list_item) |
| + return |
| + elif field.type == field.TYPE_BOOL: |
| + assert type(field_value) == bool |
| + elif field.type == field.TYPE_STRING: |
| + assert type(field_value) == str or type(field_value) == unicode |
| + elif field.type == field.TYPE_INT64: |
| + assert type(field_value) == int |
| + elif (field.type == field.TYPE_MESSAGE and |
| + field.message_type.name == 'StringList'): |
| + assert type(field_value) == list |
| + entries = group_message.__getattribute__(field.name).entries |
| + for list_item in field_value: |
| + entries.append(list_item) |
| + return |
| + else: |
| + raise Exception('Unknown field type %s' % field.type) |
| + group_message.__setattr__(field.name, field_value) |
| + |
| + def _GenerateDevicePolicyBlob(self, device_policy=None, owner=None): |
| + """Generates a signed device policy blob.""" |
| + |
| + # Fill in the device settings protobuf. |
| + device_policy = device_policy or {} |
| + owner = owner or constants.CREDENTIALS['$mockowner'][0] |
| + settings = dp.ChromeDeviceSettingsProto() |
| + for group in settings.DESCRIPTOR.fields: |
| + # Create protobuf message for group. |
| + group_message = eval('dp.' + group.message_type.name + '()') |
| + # Indicates if at least one field was set in |group_message|. |
| + got_fields = False |
| + # Iterate over fields of the message and feed them from the policy dict. |
| + for field in group_message.DESCRIPTOR.fields: |
| + field_value = None |
| + if field.name in device_policy: |
| + got_fields = True |
| + field_value = device_policy[field.name] |
| + self._SetProtobufMessageField(group_message, field, field_value) |
| + if got_fields: |
| + settings.__getattribute__(group.name).CopyFrom(group_message) |
| + |
| + # Fill in the policy data protobuf. |
| + policy_data = dm.PolicyData() |
| + policy_data.policy_type = 'google/chromeos/device' |
| + policy_data.policy_value = settings.SerializeToString() |
| + policy_data.username = owner |
| + serialized_policy_data = policy_data.SerializeToString() |
| + |
| + # Fill in the device management response protobuf. |
| + response = dm.DeviceManagementResponse() |
| + fetch_response = response.policy_response.response.add() |
| + fetch_response.policy_data = serialized_policy_data |
| + fetch_response.policy_data_signature = ( |
| + self._private_key.hashAndSign(serialized_policy_data).tostring()) |
| + |
| + self._device_policy_blob = fetch_response.SerializeToString() |
| + |
| + def _RefreshDevicePolicy(self): |
| + """Refreshes the device policy in force on ChromeOS.""" |
| + assert self.IsChromeOS() |
| + # The device policy blob is only picked up by the session manager on |
| + # startup, and is overwritten on shutdown. So the blob has to be written |
| + # while the session manager is stopped. |
| + self.WaitForSessionManagerRestart( |
| + lambda: self._WriteDevicePolicyWithSessionManagerStopped()) |
|
Nirnimesh
2012/04/05 19:05:30
lambda is redundant.
self.WaitForSessionManagerRe
bartfab (slow)
2012/04/10 15:27:20
Done.
|
| + logging.debug('Session manager restarted with device policy ready') |
| + pyauto.PyUITest.setUp(self) |
| + |
| + def SetDevicePolicy(self, device_policy=None, owner=None): |
| + """Sets the device policy provided as a dict and the owner on ChromeOS. |
| + |
| + Passing a value of None as the device policy clears it.""" |
| + if not self.IsChromeOS(): |
| + raise NotImplementedError('Device policy is only available on ChromeOS.') |
| + |
| + self._GenerateDevicePolicyBlob(device_policy, owner) |
| + self._RefreshDevicePolicy() |