Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 #!/usr/bin/python | 1 #!/usr/bin/python |
| 2 # Copyright (c) 2012 The Chromium Authors. All rights reserved. | 2 # Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 3 # Use of this source code is governed by a BSD-style license that can be | 3 # Use of this source code is governed by a BSD-style license that can be |
| 4 # found in the LICENSE file. | 4 # found in the LICENSE file. |
| 5 | 5 |
| 6 """Base class for tests that need to update the policies enforced by Chrome. | 6 """Base class for tests that need to update the policies enforced by Chrome. |
| 7 | 7 |
| 8 Subclasses can call SetPolicies with a dictionary of policies to install. | 8 Subclasses can call SetUserPolicy (ChromeOS, Linux, Windows) and |
| 9 SetPolicies can also be used to set the device policies on ChromeOS. | 9 SetDevicePolicy (ChromeOS only) with a dictionary of the policies to install. |
| 10 | 10 |
| 11 The current implementation depends on the platform. The implementations might | 11 The current implementation depends on the platform. The implementations might |
| 12 change in the future, but tests relying on SetPolicies will keep working. | 12 change in the future, but tests relying on the above calls will keep working. |
| 13 """ | 13 """ |
| 14 | 14 |
| 15 # On ChromeOS this relies on the device_management.py part of the TestServer, | 15 # On ChromeOS, both user and device policies are supported. Chrome is set up to |
| 16 # and forces the policies by triggering a new policy fetch and refreshing the | 16 # fetch user policy from a TestServer. A call to SetUserPolicy triggers an |
| 17 # cloud policy providers. This requires preparing the system for policy | 17 # immediate policy refresh, allowing the effects of policy changes on a running |
| 18 # fetching, which currently means the DMTokens have to be in place. Without the | 18 # session to be tested. |
| 19 # DMTokens, the cloud policy controller won't be able to proceed, because the | |
| 20 # Gaia tokens for the DMService aren't available during tests. | |
| 21 # In the future this setup might not be necessary anymore, and the policy | |
| 22 # might also be pushed through the session_manager. | |
| 23 # | 19 # |
| 24 # On other platforms it relies on the SetPolicies automation call, which is | 20 # Device policy is injected by stopping Chrome and the session manager, writing |
| 25 # only available on non-official builds. This automation call is meant to be | 21 # a new device policy blob and starting Chrome and the session manager again. |
|
frankf
2012/04/04 01:44:01
Mention that this will log you out. The limitation
bartfab (slow)
2012/04/04 09:24:14
Done.
| |
| 26 # eventually removed when a replacement for every platform is available. | 22 # This is necessary because Chrome will only fetch device policy from a |
| 27 # This requires setting up the policies in the registry on Windows, and writing | 23 # TestServer if the device is enterprise owned. Enterprise ownership, in turn, |
| 24 # requires ownership of the TPM which can only be undone by reboothing the | |
| 25 # device (and hence is not possible in a client test). | |
| 28 # the right files on Linux and Mac. | 26 # the right files on Linux and Mac. |
|
frankf
2012/04/04 01:44:01
delete this line.
bartfab (slow)
2012/04/04 09:24:14
Done.
| |
| 29 | 27 |
| 30 import json | 28 import json |
| 31 import logging | 29 import logging |
| 32 import os | 30 import os |
| 33 import subprocess | 31 import subprocess |
| 34 import tempfile | 32 import tempfile |
| 35 import urllib | |
| 36 import urllib2 | |
| 37 | 33 |
| 34 import asn1der | |
| 38 import pyauto | 35 import pyauto |
| 39 import pyauto_paths | 36 import pyauto_paths |
| 40 import pyauto_utils | 37 import pyauto_utils |
| 41 | 38 |
| 42 | 39 |
| 43 if pyauto.PyUITest.IsChromeOS(): | 40 if pyauto.PyUITest.IsChromeOS(): |
| 44 import sys | 41 import sys |
| 42 import warnings | |
| 43 # Ignore deprecation warnings, they make our output more cluttered. | |
| 44 warnings.filterwarnings("ignore", category=DeprecationWarning) | |
| 45 | |
| 45 # Find the path to the pyproto and add it to sys.path. | 46 # Find the path to the pyproto and add it to sys.path. |
| 46 # Prepend it so that google.protobuf is loaded from here. | 47 # Prepend it so that google.protobuf is loaded from here. |
| 47 for path in pyauto_paths.GetBuildDirs(): | 48 for path in pyauto_paths.GetBuildDirs(): |
| 48 p = os.path.join(path, 'pyproto') | 49 p = os.path.join(path, 'pyproto') |
| 49 if os.path.isdir(p): | 50 if os.path.isdir(p): |
| 50 sys.path = [p, os.path.join(p, 'chrome', 'browser', 'policy', | 51 sys.path = [p, os.path.join(p, 'chrome', 'browser', 'policy', |
| 51 'proto')] + sys.path | 52 'proto')] + sys.path |
| 52 break | 53 break |
| 53 sys.path.append('/usr/local') # to import autotest libs. | 54 sys.path.append('/usr/local') # to import autotest libs. |
| 54 import device_management_local_pb2 as dml | 55 sys.path.append(os.path.join(pyauto_paths.GetThirdPartyDir(), 'tlslite')) |
| 55 import device_management_backend_pb2 as dmb | 56 |
| 57 import chrome_device_policy_pb2 as dp | |
| 58 import device_management_backend_pb2 as dm | |
| 59 import tlslite.api | |
| 56 from autotest.cros import constants | 60 from autotest.cros import constants |
| 57 from autotest.cros import cros_ui | 61 from autotest.cros import cros_ui |
| 58 elif pyauto.PyUITest.IsWin(): | 62 elif pyauto.PyUITest.IsWin(): |
| 59 import _winreg as winreg | 63 import _winreg as winreg |
| 60 | 64 |
| 65 # ASN.1 object identifier for PKCS#1/RSA. | |
| 66 PKCS1_RSA_OID = '\x2a\x86\x48\x86\xf7\x0d\x01\x01\x01' | |
| 67 | |
| 61 | 68 |
| 62 class PolicyTestBase(pyauto.PyUITest): | 69 class PolicyTestBase(pyauto.PyUITest): |
| 63 """A base class for tests that need to set up and modify policies. | 70 """A base class for tests that need to set up and modify policies. |
| 64 | 71 |
| 65 Subclasses can use the SetPolicies call to set the policies seen by Chrome. | 72 Subclasses can use the methods SetUserPolicy (ChromeOS, Linux, Windows) and |
| 73 SetDevicePolicy (ChromeOS only) to set the policies seen by Chrome. | |
| 66 """ | 74 """ |
| 67 | 75 |
| 68 def _WriteFile(self, path, content): | 76 def _WriteFile(self, path, content): |
| 69 """Writes content to path, creating any intermediary directories.""" | 77 """Writes content to path, creating any intermediary directories.""" |
| 70 if not os.path.exists(os.path.dirname(path)): | 78 if not os.path.exists(os.path.dirname(path)): |
| 71 os.makedirs(os.path.dirname(path)) | 79 os.makedirs(os.path.dirname(path)) |
| 72 f = open(path, 'w') | 80 f = open(path, 'w') |
| 73 f.write(content) | 81 f.write(content) |
| 74 f.close() | 82 f.close() |
| 75 | 83 |
| 76 def _GetTestServerPoliciesFilePath(self): | 84 def _GetTestServerPoliciesFilePath(self): |
| 77 """Returns the path of the cloud policy configuration file.""" | 85 """Returns the path of the cloud policy configuration file.""" |
| 78 assert self.IsChromeOS() | 86 assert self.IsChromeOS() |
| 79 return os.path.join(self._temp_data_dir, 'device_management') | 87 return os.path.join(self._temp_data_dir, 'device_management') |
| 80 | 88 |
| 81 def _GetHttpURLForDeviceManagement(self): | 89 def _GetHttpURLForDeviceManagement(self): |
| 90 """Returns the URL at which the TestServer is serving user policy.""" | |
| 82 assert self.IsChromeOS() | 91 assert self.IsChromeOS() |
| 83 return self._http_server.GetURL('device_management').spec() | 92 return self._http_server.GetURL('device_management').spec() |
| 84 | 93 |
| 85 def _WriteUserPolicyToken(self, token): | 94 def _WriteDevicePolicyWithSessionManagerStopped(self): |
| 86 """Writes the given token to the user device management cache.""" | 95 """Writes the device policy blob while the session manager is stopped. |
| 87 assert self.IsChromeOS() | |
| 88 blob = dml.DeviceCredentials() | |
| 89 blob.device_token = token | |
| 90 blob.device_id = '123' | |
| 91 self._WriteFile('/home/chronos/user/Device Management/Token', | |
| 92 blob.SerializeToString()) | |
| 93 | 96 |
| 94 def _WriteDevicePolicy(self, fetch_response): | 97 Updates the files holding the device policy blob and the public key need to |
| 95 """Writes the given signed fetch_response to the device policy cache. | 98 verify its signature. |
| 96 | |
| 97 Also writes the owner key, used to verify the signature. | |
| 98 """ | 99 """ |
| 99 assert self.IsChromeOS() | 100 assert self.IsChromeOS() |
| 100 self._WriteFile(constants.SIGNED_POLICY_FILE, | |
| 101 fetch_response.SerializeToString()) | |
| 102 self._WriteFile(constants.OWNER_KEY_FILE, | |
| 103 fetch_response.new_public_key) | |
| 104 | |
| 105 def _PostToDMServer(self, request_type, body, headers): | |
| 106 """Posts a request to the TestServer's Device Management interface. | |
| 107 | |
| 108 |request_type| is the value of the 'request' HTTP parameter. | |
| 109 Returns the response's body. | |
| 110 """ | |
| 111 assert self.IsChromeOS() | |
| 112 url = self._GetHttpURLForDeviceManagement() | |
| 113 url += '?' + urllib.urlencode({ | |
| 114 'deviceid': '123', | |
| 115 'oauth_token': '456', | |
| 116 'request': request_type, | |
| 117 'devicetype': 2, | |
| 118 'apptype': 'Chrome', | |
| 119 'agent': 'Chrome', | |
| 120 }) | |
| 121 return urllib2.urlopen(urllib2.Request(url, body, headers)).read() | |
| 122 | |
| 123 def _PostRegisterRequest(self, type): | |
| 124 """Sends a device register request to the TestServer, of the given type.""" | |
| 125 assert self.IsChromeOS() | |
| 126 request = dmb.DeviceManagementRequest() | |
| 127 register = request.register_request | |
| 128 register.machine_id = '789' | |
| 129 register.type = type | |
| 130 return self._PostToDMServer('register', request.SerializeToString(), {}) | |
| 131 | |
| 132 def _RegisterAndGetDMToken(self, device): | |
| 133 """Registers with the TestServer and returns the DMToken fetched. | |
| 134 | |
| 135 Registers for device policy if device is True. Otherwise registers for | |
| 136 user policy. | |
| 137 """ | |
| 138 assert self.IsChromeOS() | |
| 139 type = device and dmb.DeviceRegisterRequest.DEVICE \ | |
| 140 or dmb.DeviceRegisterRequest.USER | |
| 141 rstring = self._PostRegisterRequest(type) | |
| 142 response = dmb.DeviceManagementResponse() | |
| 143 response.ParseFromString(rstring) | |
| 144 return response.register_response.device_management_token | |
| 145 | |
| 146 def _PostPolicyRequest(self, token, type, want_signature=False): | |
| 147 """Fetches policy from the TestServer, using the given token. | |
| 148 | |
| 149 Policy is fetched for the given type. If want_signature is True, the | |
| 150 request will ask for a signed response. Returns the response body. | |
| 151 """ | |
| 152 assert self.IsChromeOS() | |
| 153 request = dmb.DeviceManagementRequest() | |
| 154 policy = request.policy_request | |
| 155 prequest = policy.request.add() | |
| 156 prequest.policy_type = type | |
| 157 if want_signature: | |
| 158 prequest.signature_type = dmb.PolicyFetchRequest.SHA1_RSA | |
| 159 headers = { | |
| 160 'Authorization': 'GoogleDMToken token=' + token, | |
| 161 } | |
| 162 return self._PostToDMServer('policy', request.SerializeToString(), headers) | |
| 163 | |
| 164 def _FetchPolicy(self, token, device): | |
| 165 """Fetches policy from the TestServer, using the given token. | |
| 166 | |
| 167 Token must be a valid token retrieved with _RegisterAndGetDMToken. If | |
| 168 device is True, fetches signed device policy. Otherwise fetches user policy. | |
| 169 This method also verifies the response, and returns the first policy fetch | |
| 170 response. | |
| 171 """ | |
| 172 assert self.IsChromeOS() | |
| 173 type = device and 'google/chromeos/device' or 'google/chromeos/user' | |
| 174 rstring = self._PostPolicyRequest(token=token, type=type, | |
| 175 want_signature=device) | |
| 176 response = dmb.DeviceManagementResponse() | |
| 177 response.ParseFromString(rstring) | |
| 178 fetch_response = response.policy_response.response[0] | |
| 179 assert fetch_response.policy_data | |
| 180 assert fetch_response.policy_data_signature | |
| 181 assert fetch_response.new_public_key | |
| 182 return fetch_response | |
| 183 | |
| 184 def _WriteDevicePolicyWithSessionManagerStopped(self, policy): | |
| 185 """Writes the device policy blob while the Session Manager is stopped.""" | |
| 186 assert self.IsChromeOS() | |
| 187 logging.debug('Stopping session manager') | 101 logging.debug('Stopping session manager') |
| 188 cros_ui.stop(allow_fail=True) | 102 cros_ui.stop(allow_fail=True) |
| 189 logging.debug('Writing device policy cache') | 103 logging.debug('Writing device policy blob') |
| 190 self._WriteDevicePolicy(policy) | 104 self._WriteFile(constants.SIGNED_POLICY_FILE, self._device_policy_blob) |
| 105 self._WriteFile(constants.OWNER_KEY_FILE, self._public_key) | |
| 191 | 106 |
| 192 # Ugly hack: session manager won't spawn chrome if this file exists. That's | 107 # Ugly hack: session manager will not spawn Chrome if this file exists. That |
| 193 # usually a good thing (to keep the automation channel open), but in this | 108 # is usually a good thing (to keep the automation channel open), but in this |
| 194 # case we really want to restart chrome. PyUITest.setUp() will be called | 109 # case we really want to restart chrome. PyUITest.setUp() will be called |
| 195 # after session manager and chrome have restarted, and will setup the | 110 # after session manager and chrome have restarted, and will setup the |
| 196 # automation channel. | 111 # automation channel. |
| 197 restore_magic_file = False | 112 restore_magic_file = False |
| 198 if os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE): | 113 if os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE): |
| 199 logging.debug('DISABLE_BROWSER_RESTART_MAGIC_FILE found. ' | 114 logging.debug('DISABLE_BROWSER_RESTART_MAGIC_FILE found. ' |
| 200 'Removing temporarily for the next restart.') | 115 'Removing temporarily for the next restart.') |
| 201 restore_magic_file = True | 116 restore_magic_file = True |
| 202 os.remove(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE) | 117 os.remove(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE) |
| 203 assert not os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE) | 118 assert not os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE) |
| 204 | 119 |
| 205 logging.debug('Starting session manager again') | 120 logging.debug('Starting session manager again') |
| 206 cros_ui.start() | 121 cros_ui.start() |
| 207 | 122 |
| 208 # cros_ui.start() waits for the login prompt to be visible, so chrome has | 123 # cros_ui.start() waits for the login prompt to be visible, so Chrome has |
| 209 # already started once it returns. | 124 # already started once it returns. |
| 210 if restore_magic_file: | 125 if restore_magic_file: |
| 211 open(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE, 'w').close() | 126 open(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE, 'w').close() |
| 212 assert os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE) | 127 assert os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE) |
| 213 | 128 |
| 214 def ExtraChromeFlags(self): | 129 def ExtraChromeFlags(self): |
| 215 """Sets up Chrome to use cloud policies on ChromeOS.""" | 130 """Sets up Chrome to use cloud policies on ChromeOS.""" |
| 216 flags = pyauto.PyUITest.ExtraChromeFlags(self) | 131 flags = pyauto.PyUITest.ExtraChromeFlags(self) |
| 217 if self.IsChromeOS(): | 132 if self.IsChromeOS(): |
| 133 while '--skip-oauth-login' in flags: | |
| 134 flags.remove('--skip-oauth-login') | |
|
frankf
2012/04/04 01:44:01
why do we need oauth-login?
bartfab (slow)
2012/04/04 09:24:14
An OAuth token is required for Chrome to register
frankf
2012/04/04 21:44:10
Previously, we explicitly registered with the DMSe
bartfab (slow)
2012/04/05 08:24:45
We can make things hermetic just as easily. The mo
| |
| 218 url = self._GetHttpURLForDeviceManagement() | 135 url = self._GetHttpURLForDeviceManagement() |
| 219 flag = '--device-management-url=' + url | 136 flags.append('--device-management-url=' + url) |
| 220 flags += [flag] | 137 flags.append('--disable-sync') |
| 221 return flags | 138 return flags |
| 222 | 139 |
| 223 def setUp(self): | 140 def setUp(self): |
| 224 """Sets up the platform for policy testing. | 141 """Sets up the platform for policy testing. |
| 225 | 142 |
| 226 On ChromeOS, part of the set up involves restarting the session_manager and | 143 On ChromeOS, part of the setup involves restarting the session manager to |
| 227 logging in with the $default account. | 144 inject a device policy blob. |
| 228 """ | 145 """ |
| 229 if self.IsChromeOS(): | 146 if self.IsChromeOS(): |
| 230 # Setup a temporary data dir and a TestServer serving files from there. | 147 # Set up a temporary data dir and a TestServer serving files from there. |
| 231 # The TestServer makes its document root relative to the src dir. | 148 # The TestServer makes its document root relative to the src dir. |
| 232 self._temp_data_dir = tempfile.mkdtemp(dir=pyauto_paths.GetSourceDir()) | 149 self._temp_data_dir = tempfile.mkdtemp(dir=pyauto_paths.GetSourceDir()) |
| 233 relative_temp_data_dir = os.path.basename(self._temp_data_dir) | 150 relative_temp_data_dir = os.path.basename(self._temp_data_dir) |
| 234 self._http_server = self.StartHTTPServer(relative_temp_data_dir) | 151 self._http_server = self.StartHTTPServer(relative_temp_data_dir) |
| 235 | 152 |
| 236 # Setup empty policies, so that the TestServer can start replying. | 153 # Set up an empty user policy so that the TestServer can start replying. |
| 237 self._SetCloudPolicies() | 154 self._SetUserPolicyChromeOS() |
| 238 | 155 |
| 239 device_dmtoken = self._RegisterAndGetDMToken(device=True) | 156 # Generate a key pair for signing device policy. |
| 240 policy = self._FetchPolicy(token=device_dmtoken, device=True) | 157 self._private_key = tlslite.api.generateRSAKey(1024) |
| 241 user_dmtoken = self._RegisterAndGetDMToken(device=False) | 158 algorithm = asn1der.Sequence( |
| 159 [ asn1der.Data(asn1der.OBJECT_IDENTIFIER, PKCS1_RSA_OID), | |
| 160 asn1der.Data(asn1der.NULL, '') ]) | |
|
frankf
2012/04/04 01:44:01
No spaces around brackets.
bartfab (slow)
2012/04/04 09:24:14
Done.
| |
| 161 rsa_pubkey = asn1der.Sequence([ asn1der.Integer(self._private_key.n), | |
| 162 asn1der.Integer(self._private_key.e) ]) | |
| 163 self._public_key = asn1der.Sequence( | |
| 164 [ algorithm, asn1der.Bitstring(rsa_pubkey) ]) | |
| 242 | 165 |
| 243 # The device policy blob is only picked up by the session manager on | 166 # Clear device policy. This also invokes pyauto.PyUITest.setUp(self). |
| 244 # startup, and is overwritten on shutdown. So the blob has to be written | 167 self.SetDevicePolicy() |
| 245 # while the session manager is stopped. | |
| 246 self.WaitForSessionManagerRestart( | |
| 247 lambda: self._WriteDevicePolicyWithSessionManagerStopped(policy)) | |
| 248 logging.debug('Session manager restarted with device policy ready') | |
| 249 | 168 |
| 250 pyauto.PyUITest.setUp(self) | 169 # Remove any existing vaults. |
| 251 | 170 self.RemoveAllCryptohomeVaultsOnChromeOS() |
| 252 if self.IsChromeOS(): | 171 else: |
| 253 logging.debug('Logging in') | 172 pyauto.PyUITest.setUp(self) |
| 254 credentials = self.GetPrivateInfo()['prod_enterprise_test_user'] | |
| 255 self.Login(credentials['username'], credentials['password']) | |
| 256 assert self.GetLoginInfo()['is_logged_in'] | |
| 257 | |
| 258 self._WriteUserPolicyToken(user_dmtoken) | |
| 259 # The browser has to be reloaded to make the user policy token cache | |
| 260 # reload the file just written. The file can also be written only after | |
| 261 # the cryptohome is mounted, after login. | |
| 262 self.RestartBrowser(clear_profile=False) | |
| 263 | 173 |
| 264 def tearDown(self): | 174 def tearDown(self): |
| 265 """Cleans up the files created by setUp and policies added in tests.""" | 175 """Cleans up the policies and related files created in tests.""" |
| 266 # Clear the policies. | |
| 267 self.SetPolicies() | |
| 268 | |
| 269 if self.IsChromeOS(): | 176 if self.IsChromeOS(): |
|
frankf
2012/04/04 01:44:01
Add comment on why you need this.
bartfab (slow)
2012/04/04 09:24:14
Done.
| |
| 270 pyauto.PyUITest.Logout(self) | 177 self.SetDevicePolicy() |
| 178 else: | |
| 179 self.SetUserPolicy() | |
| 271 | 180 |
| 272 pyauto.PyUITest.tearDown(self) | 181 pyauto.PyUITest.tearDown(self) |
| 273 | 182 |
| 274 if self.IsChromeOS(): | 183 if self.IsChromeOS(): |
| 275 self.StopHTTPServer(self._http_server) | 184 self.StopHTTPServer(self._http_server) |
| 276 pyauto_utils.RemovePath(self._temp_data_dir) | 185 pyauto_utils.RemovePath(self._temp_data_dir) |
| 186 self.RemoveAllCryptohomeVaultsOnChromeOS() | |
| 277 | 187 |
| 278 def _SetCloudPolicies(self, user_mandatory=None, user_recommended=None, | 188 def LoginWithTestAccount(self, account='prod_enterprise_test_user'): |
| 279 device=None): | 189 """Convenience method for logging in with one of the test accounts.""" |
| 280 """Exports the policies to the configuration file of the TestServer. | 190 assert self.IsChromeOS() |
| 191 credentials = self.GetPrivateInfo()[account] | |
| 192 self.Login(credentials['username'], credentials['password']) | |
| 193 assert self.GetLoginInfo()['is_logged_in'] | |
| 281 | 194 |
| 282 The TestServer will serve these policies after this function returns. | 195 def SetPolicy(self): |
| 196 raise NotImplementedError('This class supports user and device policies. ' | |
| 197 'Instead of SetPolicies, use SetUserPolicy or ' | |
| 198 'SetDevicePolicy.') | |
| 283 | 199 |
| 284 Args: | 200 def SetUserPolicy(self, user_policy=None): |
| 285 user_mandatory: user policies of mandatory level | 201 """Sets the user policy provided as a dict. |
| 286 user_recommended: user policies of recommended level | 202 |
| 287 device: device policies | 203 Passing a value of None clears the user policy.""" |
| 288 """ | 204 if self.IsChromeOS(): |
| 205 self._SetUserPolicyChromeOS(user_policy=user_policy) | |
| 206 elif self.IsWin(): | |
| 207 self._SetUserPolicyWin(user_policy=user_policy) | |
| 208 elif self.IsLinux(): | |
| 209 self._SetUserPolicyLinux(user_policy=user_policy) | |
| 210 else: | |
| 211 raise NotImplementedError('Not available on this platform.') | |
| 212 | |
| 213 self.RefreshPolicies() | |
| 214 | |
| 215 def _SetUserPolicyChromeOS(self, user_policy=None): | |
| 216 """Writes the given user policy to the TestServer's input file.""" | |
| 289 assert self.IsChromeOS() | 217 assert self.IsChromeOS() |
| 290 policy_dict = { | 218 policy_dict = { |
| 291 'google/chromeos/device': device or {}, | 219 'google/chromeos/device': {}, |
| 292 'google/chromeos/user': { | 220 'google/chromeos/user': { |
| 293 'mandatory': user_mandatory or {}, | 221 'mandatory': user_policy or {}, |
| 294 'recommended': user_recommended or {}, | 222 'recommended': {}, |
| 295 }, | 223 }, |
| 296 'managed_users': ['*'], | 224 'managed_users': ['*'], |
| 297 } | 225 } |
| 298 self._WriteFile(self._GetTestServerPoliciesFilePath(), | 226 self._WriteFile(self._GetTestServerPoliciesFilePath(), |
| 299 json.dumps(policy_dict, sort_keys=True, indent=2) + '\n') | 227 json.dumps(policy_dict, sort_keys=True, indent=2) + '\n') |
| 300 | 228 |
| 301 def _SetPoliciesWin(self, user_policy=None): | 229 def _SetUserPolicyWin(self, user_policy=None): |
| 302 """Exports the policies as dictionary in the argument to Window registry. | 230 """Writes the given user policy to the Windows registry.""" |
| 303 | |
| 304 Removes the registry key and its subkeys if they exist. | |
| 305 | |
| 306 Args: | |
| 307 user_policy: A dictionary representing the user policies. Clear the | |
| 308 registry if None. | |
| 309 | |
| 310 Raises: | |
| 311 TypeError: If an unexpected value is found in the policy dictionary. | |
| 312 """ | |
| 313 | |
| 314 def SetValueEx(key, sub_key, value): | 231 def SetValueEx(key, sub_key, value): |
| 315 if isinstance(value, int): | 232 if isinstance(value, int): |
| 316 winreg.SetValueEx(key, sub_key, 0, winreg.REG_DWORD, int(value)) | 233 winreg.SetValueEx(key, sub_key, 0, winreg.REG_DWORD, int(value)) |
| 317 elif isinstance(value, basestring): | 234 elif isinstance(value, basestring): |
| 318 winreg.SetValueEx(key, sub_key, 0, winreg.REG_SZ, value.encode('ascii')) | 235 winreg.SetValueEx(key, sub_key, 0, winreg.REG_SZ, value.encode('ascii')) |
| 319 elif isinstance(value, list): | 236 elif isinstance(value, list): |
| 320 k = winreg.CreateKey(key, sub_key) | 237 k = winreg.CreateKey(key, sub_key) |
| 321 for index, v in list(enumerate(value)): | 238 for index, v in list(enumerate(value)): |
| 322 SetValueEx(k, str(index + 1), v) | 239 SetValueEx(k, str(index + 1), v) |
| 323 winreg.CloseKey(k) | 240 winreg.CloseKey(k) |
| (...skipping 10 matching lines...) Expand all Loading... | |
| 334 r'reg query HKEY_LOCAL_MACHINE\%s' % reg_base) == 0: | 251 r'reg query HKEY_LOCAL_MACHINE\%s' % reg_base) == 0: |
| 335 logging.debug(r'Removing %s' % reg_base) | 252 logging.debug(r'Removing %s' % reg_base) |
| 336 subprocess.call(r'reg delete HKLM\%s /f' % reg_base) | 253 subprocess.call(r'reg delete HKLM\%s /f' % reg_base) |
| 337 | 254 |
| 338 if user_policy is not None: | 255 if user_policy is not None: |
| 339 root_key = winreg.CreateKey(winreg.HKEY_LOCAL_MACHINE, reg_base) | 256 root_key = winreg.CreateKey(winreg.HKEY_LOCAL_MACHINE, reg_base) |
| 340 for k, v in user_policy.iteritems(): | 257 for k, v in user_policy.iteritems(): |
| 341 SetValueEx(root_key, k, v) | 258 SetValueEx(root_key, k, v) |
| 342 winreg.CloseKey(root_key) | 259 winreg.CloseKey(root_key) |
| 343 | 260 |
| 344 def _SetPoliciesLinux(self, user_policy=None): | 261 def _SetUserPolicyLinux(self, user_policy=None): |
| 345 """Exports the policies as dictionary in the argument to a JSON file. | 262 """Writes the given user policy to the JSON policy file read by Chrome.""" |
| 346 | |
| 347 Removes the JSON file if it exists. | |
| 348 | |
| 349 Args: | |
| 350 user_policy: A dictionary representing the user policies. Remove the | |
| 351 JSON file if None | |
| 352 """ | |
| 353 assert self.IsLinux() | 263 assert self.IsLinux() |
| 354 sudo_cmd_file = os.path.join(os.path.dirname(__file__), | 264 sudo_cmd_file = os.path.join(os.path.dirname(__file__), |
| 355 'policy_linux_util.py') | 265 'policy_linux_util.py') |
| 356 | 266 |
| 357 if self.GetBrowserInfo()['properties']['branding'] == 'Google Chrome': | 267 if self.GetBrowserInfo()['properties']['branding'] == 'Google Chrome': |
| 358 policies_location_base = '/etc/opt/chrome' | 268 policies_location_base = '/etc/opt/chrome' |
| 359 else: | 269 else: |
| 360 policies_location_base = '/etc/chromium' | 270 policies_location_base = '/etc/chromium' |
| 361 | 271 |
| 362 if os.path.isdir(policies_location_base): | 272 if os.path.isdir(policies_location_base): |
| 363 logging.debug('Removing directory %s' % policies_location_base) | 273 logging.debug('Removing directory %s' % policies_location_base) |
| 364 subprocess.call(['suid-python', sudo_cmd_file, | 274 subprocess.call(['suid-python', sudo_cmd_file, |
| 365 'remove_dir', policies_location_base]) | 275 'remove_dir', policies_location_base]) |
| 366 | 276 |
| 367 if user_policy is not None: | 277 if user_policy is not None: |
| 368 self._WriteFile('/tmp/chrome.json', | 278 self._WriteFile('/tmp/chrome.json', |
| 369 json.dumps(user_policy, sort_keys=True, indent=2) + '\n') | 279 json.dumps(user_policy, sort_keys=True, indent=2) + '\n') |
| 370 | 280 |
| 371 policies_location = '%s/policies/managed' % policies_location_base | 281 policies_location = '%s/policies/managed' % policies_location_base |
| 372 subprocess.call(['suid-python', sudo_cmd_file, | 282 subprocess.call(['suid-python', sudo_cmd_file, |
| 373 'setup_dir', policies_location]) | 283 'setup_dir', policies_location]) |
| 374 # Copy chrome.json file to the managed directory | 284 # Copy chrome.json file to the managed directory |
| 375 subprocess.call(['suid-python', sudo_cmd_file, | 285 subprocess.call(['suid-python', sudo_cmd_file, |
| 376 'copy', '/tmp/chrome.json', policies_location]) | 286 'copy', '/tmp/chrome.json', policies_location]) |
| 377 os.remove('/tmp/chrome.json') | 287 os.remove('/tmp/chrome.json') |
| 378 | 288 |
| 379 def SetPolicies(self, user_policy=None, device_policy=None): | 289 def SetDevicePolicy(self, device_policy=None, owner=None): |
| 380 """Enforces the policies given in the arguments as dictionaries. | 290 """Sets the device policy provided as a dict and the owner on ChromeOS. |
| 381 | 291 |
| 382 These policies will have been installed after this call returns. | 292 Passing a value of None as the device policy clears it.""" |
| 293 if not self.IsChromeOS(): | |
| 294 raise NotImplementedError('Device policy is only available on ChromeOS.') | |
| 383 | 295 |
| 384 Args: | 296 self._GenerateDevicePolicyBlob(device_policy, owner) |
| 385 user_policy: A dictionary representing the user policies. | 297 self._RefreshDevicePolicy() |
| 386 device_policy: A dictionary representing the device policies on Chrome OS. | |
| 387 | 298 |
| 388 Raises: | 299 def _SetProtobufMessageField(self, group_message, field, field_value): |
| 389 NotImplementedError if the platform is not supported. | 300 """Sets the given field in a protobuf to the given value.""" |
| 390 """ | 301 if field.label == field.LABEL_REPEATED: |
| 391 if self.IsChromeOS(): | 302 assert type(field_value) == list |
| 392 self._SetCloudPolicies(user_mandatory=user_policy, device=device_policy) | 303 entries = group_message.__getattribute__(field.name) |
| 304 for list_item in field_value: | |
| 305 entries.append(list_item) | |
| 306 return | |
| 307 elif field.type == field.TYPE_BOOL: | |
| 308 assert type(field_value) == bool | |
| 309 elif field.type == field.TYPE_STRING: | |
| 310 assert type(field_value) == str or type(field_value) == unicode | |
| 311 elif field.type == field.TYPE_INT64: | |
| 312 assert type(field_value) == int | |
| 313 elif (field.type == field.TYPE_MESSAGE and | |
| 314 field.message_type.name == 'StringList'): | |
| 315 assert type(field_value) == list | |
| 316 entries = group_message.__getattribute__(field.name).entries | |
| 317 for list_item in field_value: | |
| 318 entries.append(list_item) | |
| 319 return | |
| 393 else: | 320 else: |
| 394 if device_policy is not None: | 321 raise Exception('Unknown field type %s' % field.type) |
| 395 raise NotImplementedError('Device policy is only available on ChromeOS') | 322 group_message.__setattr__(field.name, field_value) |
| 396 if self.IsWin(): | |
| 397 self._SetPoliciesWin(user_policy=user_policy) | |
| 398 elif self.IsLinux(): | |
| 399 self._SetPoliciesLinux(user_policy=user_policy) | |
| 400 else: | |
| 401 raise NotImplementedError('Not available on this platform.') | |
| 402 | 323 |
| 403 self.RefreshPolicies() | 324 def _GenerateDevicePolicyBlob(self, device_policy=None, owner=None): |
|
frankf
2012/04/04 01:44:01
Move this before SetDevicePolicy. It's a good idea
bartfab (slow)
2012/04/04 09:24:14
Done. I reordered all functions so that they are d
| |
| 325 """Generates a signed device policy blob.""" | |
| 404 | 326 |
| 327 # Fill in the device settings protobuf. | |
| 328 device_policy = device_policy or {} | |
| 329 owner = owner or constants.CREDENTIALS['$mockowner'][0] | |
| 330 settings = dp.ChromeDeviceSettingsProto() | |
| 331 for group in settings.DESCRIPTOR.fields: | |
| 332 # Create protobuf message for group. | |
| 333 group_message = eval('dp.' + group.message_type.name + '()') | |
| 334 # Indicates if at least one field was set in |group_message|. | |
| 335 got_fields = False | |
| 336 # Iterate over fields of the message and feed them from the policy dict. | |
| 337 for field in group_message.DESCRIPTOR.fields: | |
| 338 field_value = None | |
| 339 if field.name in device_policy: | |
| 340 got_fields = True | |
| 341 field_value = device_policy[field.name] | |
| 342 self._SetProtobufMessageField(group_message, field, field_value) | |
| 343 if got_fields: | |
| 344 settings.__getattribute__(group.name).CopyFrom(group_message) | |
| 345 | |
| 346 # Fill in the policy data protobuf. | |
| 347 policy_data = dm.PolicyData() | |
| 348 policy_data.policy_type = 'google/chromeos/device' | |
| 349 policy_data.policy_value = settings.SerializeToString() | |
| 350 policy_data.username = owner or '' | |
|
frankf
2012/04/04 01:44:01
don't need 'or'.
bartfab (slow)
2012/04/04 09:24:14
Done.
| |
| 351 serialized_policy_data = policy_data.SerializeToString() | |
| 352 | |
| 353 # Fill in the device management response protobuf. | |
| 354 response = dm.DeviceManagementResponse() | |
| 355 fetch_response = response.policy_response.response.add() | |
| 356 fetch_response.policy_data = serialized_policy_data | |
| 357 fetch_response.policy_data_signature = ( | |
| 358 self._private_key.hashAndSign(serialized_policy_data).tostring()) | |
| 359 | |
| 360 self._device_policy_blob = fetch_response.SerializeToString() | |
| 361 | |
| 362 def _RefreshDevicePolicy(self): | |
| 363 """Refreshes the device policy in force on ChromeOS.""" | |
| 364 assert self.IsChromeOS() | |
| 365 # The device policy blob is only picked up by the session manager on | |
| 366 # startup, and is overwritten on shutdown. So the blob has to be written | |
| 367 # while the session manager is stopped. | |
| 368 self.WaitForSessionManagerRestart( | |
| 369 lambda: self._WriteDevicePolicyWithSessionManagerStopped()) | |
| 370 logging.debug('Session manager restarted with device policy ready') | |
| 371 pyauto.PyUITest.setUp(self) | |
| OLD | NEW |