OLD | NEW |
---|---|
1 #!/usr/bin/python | 1 #!/usr/bin/python |
2 # Copyright (c) 2012 The Chromium Authors. All rights reserved. | 2 # Copyright (c) 2012 The Chromium Authors. All rights reserved. |
3 # Use of this source code is governed by a BSD-style license that can be | 3 # Use of this source code is governed by a BSD-style license that can be |
4 # found in the LICENSE file. | 4 # found in the LICENSE file. |
5 | 5 |
6 """Base class for tests that need to update the policies enforced by Chrome. | 6 """Base class for tests that need to update the policies enforced by Chrome. |
7 | 7 |
8 Subclasses can call SetPolicies with a dictionary of policies to install. | 8 Subclasses can call SetUserPolicy (ChromeOS, Linux, Windows) and |
9 SetPolicies can also be used to set the device policies on ChromeOS. | 9 SetDevicePolicy (ChromeOS only) with a dictionary of the policies to install. |
10 | 10 |
11 The current implementation depends on the platform. The implementations might | 11 The current implementation depends on the platform. The implementations might |
12 change in the future, but tests relying on SetPolicies will keep working. | 12 change in the future, but tests relying on the above calls will keep working. |
13 """ | 13 """ |
14 | 14 |
15 # On ChromeOS this relies on the device_management.py part of the TestServer, | 15 # On ChromeOS, both user and device policies are supported. Chrome is set up to |
16 # and forces the policies by triggering a new policy fetch and refreshing the | 16 # fetch user policy from a TestServer. A call to SetUserPolicy triggers an |
17 # cloud policy providers. This requires preparing the system for policy | 17 # immediate policy refresh, allowing the effects of policy changes on a running |
18 # fetching, which currently means the DMTokens have to be in place. Without the | 18 # session to be tested. |
19 # DMTokens, the cloud policy controller won't be able to proceed, because the | |
20 # Gaia tokens for the DMService aren't available during tests. | |
21 # In the future this setup might not be necessary anymore, and the policy | |
22 # might also be pushed through the session_manager. | |
23 # | 19 # |
24 # On other platforms it relies on the SetPolicies automation call, which is | 20 # Device policy is injected by stopping Chrome and the session manager, writing |
25 # only available on non-official builds. This automation call is meant to be | 21 # a new device policy blob and starting Chrome and the session manager again. |
frankf
2012/04/04 01:44:01
Mention that this will log you out. The limitation
bartfab (slow)
2012/04/04 09:24:14
Done.
| |
26 # eventually removed when a replacement for every platform is available. | 22 # This is necessary because Chrome will only fetch device policy from a |
27 # This requires setting up the policies in the registry on Windows, and writing | 23 # TestServer if the device is enterprise owned. Enterprise ownership, in turn, |
24 # requires ownership of the TPM which can only be undone by reboothing the | |
25 # device (and hence is not possible in a client test). | |
28 # the right files on Linux and Mac. | 26 # the right files on Linux and Mac. |
frankf
2012/04/04 01:44:01
delete this line.
bartfab (slow)
2012/04/04 09:24:14
Done.
| |
29 | 27 |
30 import json | 28 import json |
31 import logging | 29 import logging |
32 import os | 30 import os |
33 import subprocess | 31 import subprocess |
34 import tempfile | 32 import tempfile |
35 import urllib | |
36 import urllib2 | |
37 | 33 |
34 import asn1der | |
38 import pyauto | 35 import pyauto |
39 import pyauto_paths | 36 import pyauto_paths |
40 import pyauto_utils | 37 import pyauto_utils |
41 | 38 |
42 | 39 |
43 if pyauto.PyUITest.IsChromeOS(): | 40 if pyauto.PyUITest.IsChromeOS(): |
44 import sys | 41 import sys |
42 import warnings | |
43 # Ignore deprecation warnings, they make our output more cluttered. | |
44 warnings.filterwarnings("ignore", category=DeprecationWarning) | |
45 | |
45 # Find the path to the pyproto and add it to sys.path. | 46 # Find the path to the pyproto and add it to sys.path. |
46 # Prepend it so that google.protobuf is loaded from here. | 47 # Prepend it so that google.protobuf is loaded from here. |
47 for path in pyauto_paths.GetBuildDirs(): | 48 for path in pyauto_paths.GetBuildDirs(): |
48 p = os.path.join(path, 'pyproto') | 49 p = os.path.join(path, 'pyproto') |
49 if os.path.isdir(p): | 50 if os.path.isdir(p): |
50 sys.path = [p, os.path.join(p, 'chrome', 'browser', 'policy', | 51 sys.path = [p, os.path.join(p, 'chrome', 'browser', 'policy', |
51 'proto')] + sys.path | 52 'proto')] + sys.path |
52 break | 53 break |
53 sys.path.append('/usr/local') # to import autotest libs. | 54 sys.path.append('/usr/local') # to import autotest libs. |
54 import device_management_local_pb2 as dml | 55 sys.path.append(os.path.join(pyauto_paths.GetThirdPartyDir(), 'tlslite')) |
55 import device_management_backend_pb2 as dmb | 56 |
57 import chrome_device_policy_pb2 as dp | |
58 import device_management_backend_pb2 as dm | |
59 import tlslite.api | |
56 from autotest.cros import constants | 60 from autotest.cros import constants |
57 from autotest.cros import cros_ui | 61 from autotest.cros import cros_ui |
58 elif pyauto.PyUITest.IsWin(): | 62 elif pyauto.PyUITest.IsWin(): |
59 import _winreg as winreg | 63 import _winreg as winreg |
60 | 64 |
65 # ASN.1 object identifier for PKCS#1/RSA. | |
66 PKCS1_RSA_OID = '\x2a\x86\x48\x86\xf7\x0d\x01\x01\x01' | |
67 | |
61 | 68 |
62 class PolicyTestBase(pyauto.PyUITest): | 69 class PolicyTestBase(pyauto.PyUITest): |
63 """A base class for tests that need to set up and modify policies. | 70 """A base class for tests that need to set up and modify policies. |
64 | 71 |
65 Subclasses can use the SetPolicies call to set the policies seen by Chrome. | 72 Subclasses can use the methods SetUserPolicy (ChromeOS, Linux, Windows) and |
73 SetDevicePolicy (ChromeOS only) to set the policies seen by Chrome. | |
66 """ | 74 """ |
67 | 75 |
68 def _WriteFile(self, path, content): | 76 def _WriteFile(self, path, content): |
69 """Writes content to path, creating any intermediary directories.""" | 77 """Writes content to path, creating any intermediary directories.""" |
70 if not os.path.exists(os.path.dirname(path)): | 78 if not os.path.exists(os.path.dirname(path)): |
71 os.makedirs(os.path.dirname(path)) | 79 os.makedirs(os.path.dirname(path)) |
72 f = open(path, 'w') | 80 f = open(path, 'w') |
73 f.write(content) | 81 f.write(content) |
74 f.close() | 82 f.close() |
75 | 83 |
76 def _GetTestServerPoliciesFilePath(self): | 84 def _GetTestServerPoliciesFilePath(self): |
77 """Returns the path of the cloud policy configuration file.""" | 85 """Returns the path of the cloud policy configuration file.""" |
78 assert self.IsChromeOS() | 86 assert self.IsChromeOS() |
79 return os.path.join(self._temp_data_dir, 'device_management') | 87 return os.path.join(self._temp_data_dir, 'device_management') |
80 | 88 |
81 def _GetHttpURLForDeviceManagement(self): | 89 def _GetHttpURLForDeviceManagement(self): |
90 """Returns the URL at which the TestServer is serving user policy.""" | |
82 assert self.IsChromeOS() | 91 assert self.IsChromeOS() |
83 return self._http_server.GetURL('device_management').spec() | 92 return self._http_server.GetURL('device_management').spec() |
84 | 93 |
85 def _WriteUserPolicyToken(self, token): | 94 def _WriteDevicePolicyWithSessionManagerStopped(self): |
86 """Writes the given token to the user device management cache.""" | 95 """Writes the device policy blob while the session manager is stopped. |
87 assert self.IsChromeOS() | |
88 blob = dml.DeviceCredentials() | |
89 blob.device_token = token | |
90 blob.device_id = '123' | |
91 self._WriteFile('/home/chronos/user/Device Management/Token', | |
92 blob.SerializeToString()) | |
93 | 96 |
94 def _WriteDevicePolicy(self, fetch_response): | 97 Updates the files holding the device policy blob and the public key need to |
95 """Writes the given signed fetch_response to the device policy cache. | 98 verify its signature. |
96 | |
97 Also writes the owner key, used to verify the signature. | |
98 """ | 99 """ |
99 assert self.IsChromeOS() | 100 assert self.IsChromeOS() |
100 self._WriteFile(constants.SIGNED_POLICY_FILE, | |
101 fetch_response.SerializeToString()) | |
102 self._WriteFile(constants.OWNER_KEY_FILE, | |
103 fetch_response.new_public_key) | |
104 | |
105 def _PostToDMServer(self, request_type, body, headers): | |
106 """Posts a request to the TestServer's Device Management interface. | |
107 | |
108 |request_type| is the value of the 'request' HTTP parameter. | |
109 Returns the response's body. | |
110 """ | |
111 assert self.IsChromeOS() | |
112 url = self._GetHttpURLForDeviceManagement() | |
113 url += '?' + urllib.urlencode({ | |
114 'deviceid': '123', | |
115 'oauth_token': '456', | |
116 'request': request_type, | |
117 'devicetype': 2, | |
118 'apptype': 'Chrome', | |
119 'agent': 'Chrome', | |
120 }) | |
121 return urllib2.urlopen(urllib2.Request(url, body, headers)).read() | |
122 | |
123 def _PostRegisterRequest(self, type): | |
124 """Sends a device register request to the TestServer, of the given type.""" | |
125 assert self.IsChromeOS() | |
126 request = dmb.DeviceManagementRequest() | |
127 register = request.register_request | |
128 register.machine_id = '789' | |
129 register.type = type | |
130 return self._PostToDMServer('register', request.SerializeToString(), {}) | |
131 | |
132 def _RegisterAndGetDMToken(self, device): | |
133 """Registers with the TestServer and returns the DMToken fetched. | |
134 | |
135 Registers for device policy if device is True. Otherwise registers for | |
136 user policy. | |
137 """ | |
138 assert self.IsChromeOS() | |
139 type = device and dmb.DeviceRegisterRequest.DEVICE \ | |
140 or dmb.DeviceRegisterRequest.USER | |
141 rstring = self._PostRegisterRequest(type) | |
142 response = dmb.DeviceManagementResponse() | |
143 response.ParseFromString(rstring) | |
144 return response.register_response.device_management_token | |
145 | |
146 def _PostPolicyRequest(self, token, type, want_signature=False): | |
147 """Fetches policy from the TestServer, using the given token. | |
148 | |
149 Policy is fetched for the given type. If want_signature is True, the | |
150 request will ask for a signed response. Returns the response body. | |
151 """ | |
152 assert self.IsChromeOS() | |
153 request = dmb.DeviceManagementRequest() | |
154 policy = request.policy_request | |
155 prequest = policy.request.add() | |
156 prequest.policy_type = type | |
157 if want_signature: | |
158 prequest.signature_type = dmb.PolicyFetchRequest.SHA1_RSA | |
159 headers = { | |
160 'Authorization': 'GoogleDMToken token=' + token, | |
161 } | |
162 return self._PostToDMServer('policy', request.SerializeToString(), headers) | |
163 | |
164 def _FetchPolicy(self, token, device): | |
165 """Fetches policy from the TestServer, using the given token. | |
166 | |
167 Token must be a valid token retrieved with _RegisterAndGetDMToken. If | |
168 device is True, fetches signed device policy. Otherwise fetches user policy. | |
169 This method also verifies the response, and returns the first policy fetch | |
170 response. | |
171 """ | |
172 assert self.IsChromeOS() | |
173 type = device and 'google/chromeos/device' or 'google/chromeos/user' | |
174 rstring = self._PostPolicyRequest(token=token, type=type, | |
175 want_signature=device) | |
176 response = dmb.DeviceManagementResponse() | |
177 response.ParseFromString(rstring) | |
178 fetch_response = response.policy_response.response[0] | |
179 assert fetch_response.policy_data | |
180 assert fetch_response.policy_data_signature | |
181 assert fetch_response.new_public_key | |
182 return fetch_response | |
183 | |
184 def _WriteDevicePolicyWithSessionManagerStopped(self, policy): | |
185 """Writes the device policy blob while the Session Manager is stopped.""" | |
186 assert self.IsChromeOS() | |
187 logging.debug('Stopping session manager') | 101 logging.debug('Stopping session manager') |
188 cros_ui.stop(allow_fail=True) | 102 cros_ui.stop(allow_fail=True) |
189 logging.debug('Writing device policy cache') | 103 logging.debug('Writing device policy blob') |
190 self._WriteDevicePolicy(policy) | 104 self._WriteFile(constants.SIGNED_POLICY_FILE, self._device_policy_blob) |
105 self._WriteFile(constants.OWNER_KEY_FILE, self._public_key) | |
191 | 106 |
192 # Ugly hack: session manager won't spawn chrome if this file exists. That's | 107 # Ugly hack: session manager will not spawn Chrome if this file exists. That |
193 # usually a good thing (to keep the automation channel open), but in this | 108 # is usually a good thing (to keep the automation channel open), but in this |
194 # case we really want to restart chrome. PyUITest.setUp() will be called | 109 # case we really want to restart chrome. PyUITest.setUp() will be called |
195 # after session manager and chrome have restarted, and will setup the | 110 # after session manager and chrome have restarted, and will setup the |
196 # automation channel. | 111 # automation channel. |
197 restore_magic_file = False | 112 restore_magic_file = False |
198 if os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE): | 113 if os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE): |
199 logging.debug('DISABLE_BROWSER_RESTART_MAGIC_FILE found. ' | 114 logging.debug('DISABLE_BROWSER_RESTART_MAGIC_FILE found. ' |
200 'Removing temporarily for the next restart.') | 115 'Removing temporarily for the next restart.') |
201 restore_magic_file = True | 116 restore_magic_file = True |
202 os.remove(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE) | 117 os.remove(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE) |
203 assert not os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE) | 118 assert not os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE) |
204 | 119 |
205 logging.debug('Starting session manager again') | 120 logging.debug('Starting session manager again') |
206 cros_ui.start() | 121 cros_ui.start() |
207 | 122 |
208 # cros_ui.start() waits for the login prompt to be visible, so chrome has | 123 # cros_ui.start() waits for the login prompt to be visible, so Chrome has |
209 # already started once it returns. | 124 # already started once it returns. |
210 if restore_magic_file: | 125 if restore_magic_file: |
211 open(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE, 'w').close() | 126 open(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE, 'w').close() |
212 assert os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE) | 127 assert os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE) |
213 | 128 |
214 def ExtraChromeFlags(self): | 129 def ExtraChromeFlags(self): |
215 """Sets up Chrome to use cloud policies on ChromeOS.""" | 130 """Sets up Chrome to use cloud policies on ChromeOS.""" |
216 flags = pyauto.PyUITest.ExtraChromeFlags(self) | 131 flags = pyauto.PyUITest.ExtraChromeFlags(self) |
217 if self.IsChromeOS(): | 132 if self.IsChromeOS(): |
133 while '--skip-oauth-login' in flags: | |
134 flags.remove('--skip-oauth-login') | |
frankf
2012/04/04 01:44:01
why do we need oauth-login?
bartfab (slow)
2012/04/04 09:24:14
An OAuth token is required for Chrome to register
frankf
2012/04/04 21:44:10
Previously, we explicitly registered with the DMSe
bartfab (slow)
2012/04/05 08:24:45
We can make things hermetic just as easily. The mo
| |
218 url = self._GetHttpURLForDeviceManagement() | 135 url = self._GetHttpURLForDeviceManagement() |
219 flag = '--device-management-url=' + url | 136 flags.append('--device-management-url=' + url) |
220 flags += [flag] | 137 flags.append('--disable-sync') |
221 return flags | 138 return flags |
222 | 139 |
223 def setUp(self): | 140 def setUp(self): |
224 """Sets up the platform for policy testing. | 141 """Sets up the platform for policy testing. |
225 | 142 |
226 On ChromeOS, part of the set up involves restarting the session_manager and | 143 On ChromeOS, part of the setup involves restarting the session manager to |
227 logging in with the $default account. | 144 inject a device policy blob. |
228 """ | 145 """ |
229 if self.IsChromeOS(): | 146 if self.IsChromeOS(): |
230 # Setup a temporary data dir and a TestServer serving files from there. | 147 # Set up a temporary data dir and a TestServer serving files from there. |
231 # The TestServer makes its document root relative to the src dir. | 148 # The TestServer makes its document root relative to the src dir. |
232 self._temp_data_dir = tempfile.mkdtemp(dir=pyauto_paths.GetSourceDir()) | 149 self._temp_data_dir = tempfile.mkdtemp(dir=pyauto_paths.GetSourceDir()) |
233 relative_temp_data_dir = os.path.basename(self._temp_data_dir) | 150 relative_temp_data_dir = os.path.basename(self._temp_data_dir) |
234 self._http_server = self.StartHTTPServer(relative_temp_data_dir) | 151 self._http_server = self.StartHTTPServer(relative_temp_data_dir) |
235 | 152 |
236 # Setup empty policies, so that the TestServer can start replying. | 153 # Set up an empty user policy so that the TestServer can start replying. |
237 self._SetCloudPolicies() | 154 self._SetUserPolicyChromeOS() |
238 | 155 |
239 device_dmtoken = self._RegisterAndGetDMToken(device=True) | 156 # Generate a key pair for signing device policy. |
240 policy = self._FetchPolicy(token=device_dmtoken, device=True) | 157 self._private_key = tlslite.api.generateRSAKey(1024) |
241 user_dmtoken = self._RegisterAndGetDMToken(device=False) | 158 algorithm = asn1der.Sequence( |
159 [ asn1der.Data(asn1der.OBJECT_IDENTIFIER, PKCS1_RSA_OID), | |
160 asn1der.Data(asn1der.NULL, '') ]) | |
frankf
2012/04/04 01:44:01
No spaces around brackets.
bartfab (slow)
2012/04/04 09:24:14
Done.
| |
161 rsa_pubkey = asn1der.Sequence([ asn1der.Integer(self._private_key.n), | |
162 asn1der.Integer(self._private_key.e) ]) | |
163 self._public_key = asn1der.Sequence( | |
164 [ algorithm, asn1der.Bitstring(rsa_pubkey) ]) | |
242 | 165 |
243 # The device policy blob is only picked up by the session manager on | 166 # Clear device policy. This also invokes pyauto.PyUITest.setUp(self). |
244 # startup, and is overwritten on shutdown. So the blob has to be written | 167 self.SetDevicePolicy() |
245 # while the session manager is stopped. | |
246 self.WaitForSessionManagerRestart( | |
247 lambda: self._WriteDevicePolicyWithSessionManagerStopped(policy)) | |
248 logging.debug('Session manager restarted with device policy ready') | |
249 | 168 |
250 pyauto.PyUITest.setUp(self) | 169 # Remove any existing vaults. |
251 | 170 self.RemoveAllCryptohomeVaultsOnChromeOS() |
252 if self.IsChromeOS(): | 171 else: |
253 logging.debug('Logging in') | 172 pyauto.PyUITest.setUp(self) |
254 credentials = self.GetPrivateInfo()['prod_enterprise_test_user'] | |
255 self.Login(credentials['username'], credentials['password']) | |
256 assert self.GetLoginInfo()['is_logged_in'] | |
257 | |
258 self._WriteUserPolicyToken(user_dmtoken) | |
259 # The browser has to be reloaded to make the user policy token cache | |
260 # reload the file just written. The file can also be written only after | |
261 # the cryptohome is mounted, after login. | |
262 self.RestartBrowser(clear_profile=False) | |
263 | 173 |
264 def tearDown(self): | 174 def tearDown(self): |
265 """Cleans up the files created by setUp and policies added in tests.""" | 175 """Cleans up the policies and related files created in tests.""" |
266 # Clear the policies. | |
267 self.SetPolicies() | |
268 | |
269 if self.IsChromeOS(): | 176 if self.IsChromeOS(): |
frankf
2012/04/04 01:44:01
Add comment on why you need this.
bartfab (slow)
2012/04/04 09:24:14
Done.
| |
270 pyauto.PyUITest.Logout(self) | 177 self.SetDevicePolicy() |
178 else: | |
179 self.SetUserPolicy() | |
271 | 180 |
272 pyauto.PyUITest.tearDown(self) | 181 pyauto.PyUITest.tearDown(self) |
273 | 182 |
274 if self.IsChromeOS(): | 183 if self.IsChromeOS(): |
275 self.StopHTTPServer(self._http_server) | 184 self.StopHTTPServer(self._http_server) |
276 pyauto_utils.RemovePath(self._temp_data_dir) | 185 pyauto_utils.RemovePath(self._temp_data_dir) |
186 self.RemoveAllCryptohomeVaultsOnChromeOS() | |
277 | 187 |
278 def _SetCloudPolicies(self, user_mandatory=None, user_recommended=None, | 188 def LoginWithTestAccount(self, account='prod_enterprise_test_user'): |
279 device=None): | 189 """Convenience method for logging in with one of the test accounts.""" |
280 """Exports the policies to the configuration file of the TestServer. | 190 assert self.IsChromeOS() |
191 credentials = self.GetPrivateInfo()[account] | |
192 self.Login(credentials['username'], credentials['password']) | |
193 assert self.GetLoginInfo()['is_logged_in'] | |
281 | 194 |
282 The TestServer will serve these policies after this function returns. | 195 def SetPolicy(self): |
196 raise NotImplementedError('This class supports user and device policies. ' | |
197 'Instead of SetPolicies, use SetUserPolicy or ' | |
198 'SetDevicePolicy.') | |
283 | 199 |
284 Args: | 200 def SetUserPolicy(self, user_policy=None): |
285 user_mandatory: user policies of mandatory level | 201 """Sets the user policy provided as a dict. |
286 user_recommended: user policies of recommended level | 202 |
287 device: device policies | 203 Passing a value of None clears the user policy.""" |
288 """ | 204 if self.IsChromeOS(): |
205 self._SetUserPolicyChromeOS(user_policy=user_policy) | |
206 elif self.IsWin(): | |
207 self._SetUserPolicyWin(user_policy=user_policy) | |
208 elif self.IsLinux(): | |
209 self._SetUserPolicyLinux(user_policy=user_policy) | |
210 else: | |
211 raise NotImplementedError('Not available on this platform.') | |
212 | |
213 self.RefreshPolicies() | |
214 | |
215 def _SetUserPolicyChromeOS(self, user_policy=None): | |
216 """Writes the given user policy to the TestServer's input file.""" | |
289 assert self.IsChromeOS() | 217 assert self.IsChromeOS() |
290 policy_dict = { | 218 policy_dict = { |
291 'google/chromeos/device': device or {}, | 219 'google/chromeos/device': {}, |
292 'google/chromeos/user': { | 220 'google/chromeos/user': { |
293 'mandatory': user_mandatory or {}, | 221 'mandatory': user_policy or {}, |
294 'recommended': user_recommended or {}, | 222 'recommended': {}, |
295 }, | 223 }, |
296 'managed_users': ['*'], | 224 'managed_users': ['*'], |
297 } | 225 } |
298 self._WriteFile(self._GetTestServerPoliciesFilePath(), | 226 self._WriteFile(self._GetTestServerPoliciesFilePath(), |
299 json.dumps(policy_dict, sort_keys=True, indent=2) + '\n') | 227 json.dumps(policy_dict, sort_keys=True, indent=2) + '\n') |
300 | 228 |
301 def _SetPoliciesWin(self, user_policy=None): | 229 def _SetUserPolicyWin(self, user_policy=None): |
302 """Exports the policies as dictionary in the argument to Window registry. | 230 """Writes the given user policy to the Windows registry.""" |
303 | |
304 Removes the registry key and its subkeys if they exist. | |
305 | |
306 Args: | |
307 user_policy: A dictionary representing the user policies. Clear the | |
308 registry if None. | |
309 | |
310 Raises: | |
311 TypeError: If an unexpected value is found in the policy dictionary. | |
312 """ | |
313 | |
314 def SetValueEx(key, sub_key, value): | 231 def SetValueEx(key, sub_key, value): |
315 if isinstance(value, int): | 232 if isinstance(value, int): |
316 winreg.SetValueEx(key, sub_key, 0, winreg.REG_DWORD, int(value)) | 233 winreg.SetValueEx(key, sub_key, 0, winreg.REG_DWORD, int(value)) |
317 elif isinstance(value, basestring): | 234 elif isinstance(value, basestring): |
318 winreg.SetValueEx(key, sub_key, 0, winreg.REG_SZ, value.encode('ascii')) | 235 winreg.SetValueEx(key, sub_key, 0, winreg.REG_SZ, value.encode('ascii')) |
319 elif isinstance(value, list): | 236 elif isinstance(value, list): |
320 k = winreg.CreateKey(key, sub_key) | 237 k = winreg.CreateKey(key, sub_key) |
321 for index, v in list(enumerate(value)): | 238 for index, v in list(enumerate(value)): |
322 SetValueEx(k, str(index + 1), v) | 239 SetValueEx(k, str(index + 1), v) |
323 winreg.CloseKey(k) | 240 winreg.CloseKey(k) |
(...skipping 10 matching lines...) Expand all Loading... | |
334 r'reg query HKEY_LOCAL_MACHINE\%s' % reg_base) == 0: | 251 r'reg query HKEY_LOCAL_MACHINE\%s' % reg_base) == 0: |
335 logging.debug(r'Removing %s' % reg_base) | 252 logging.debug(r'Removing %s' % reg_base) |
336 subprocess.call(r'reg delete HKLM\%s /f' % reg_base) | 253 subprocess.call(r'reg delete HKLM\%s /f' % reg_base) |
337 | 254 |
338 if user_policy is not None: | 255 if user_policy is not None: |
339 root_key = winreg.CreateKey(winreg.HKEY_LOCAL_MACHINE, reg_base) | 256 root_key = winreg.CreateKey(winreg.HKEY_LOCAL_MACHINE, reg_base) |
340 for k, v in user_policy.iteritems(): | 257 for k, v in user_policy.iteritems(): |
341 SetValueEx(root_key, k, v) | 258 SetValueEx(root_key, k, v) |
342 winreg.CloseKey(root_key) | 259 winreg.CloseKey(root_key) |
343 | 260 |
344 def _SetPoliciesLinux(self, user_policy=None): | 261 def _SetUserPolicyLinux(self, user_policy=None): |
345 """Exports the policies as dictionary in the argument to a JSON file. | 262 """Writes the given user policy to the JSON policy file read by Chrome.""" |
346 | |
347 Removes the JSON file if it exists. | |
348 | |
349 Args: | |
350 user_policy: A dictionary representing the user policies. Remove the | |
351 JSON file if None | |
352 """ | |
353 assert self.IsLinux() | 263 assert self.IsLinux() |
354 sudo_cmd_file = os.path.join(os.path.dirname(__file__), | 264 sudo_cmd_file = os.path.join(os.path.dirname(__file__), |
355 'policy_linux_util.py') | 265 'policy_linux_util.py') |
356 | 266 |
357 if self.GetBrowserInfo()['properties']['branding'] == 'Google Chrome': | 267 if self.GetBrowserInfo()['properties']['branding'] == 'Google Chrome': |
358 policies_location_base = '/etc/opt/chrome' | 268 policies_location_base = '/etc/opt/chrome' |
359 else: | 269 else: |
360 policies_location_base = '/etc/chromium' | 270 policies_location_base = '/etc/chromium' |
361 | 271 |
362 if os.path.isdir(policies_location_base): | 272 if os.path.isdir(policies_location_base): |
363 logging.debug('Removing directory %s' % policies_location_base) | 273 logging.debug('Removing directory %s' % policies_location_base) |
364 subprocess.call(['suid-python', sudo_cmd_file, | 274 subprocess.call(['suid-python', sudo_cmd_file, |
365 'remove_dir', policies_location_base]) | 275 'remove_dir', policies_location_base]) |
366 | 276 |
367 if user_policy is not None: | 277 if user_policy is not None: |
368 self._WriteFile('/tmp/chrome.json', | 278 self._WriteFile('/tmp/chrome.json', |
369 json.dumps(user_policy, sort_keys=True, indent=2) + '\n') | 279 json.dumps(user_policy, sort_keys=True, indent=2) + '\n') |
370 | 280 |
371 policies_location = '%s/policies/managed' % policies_location_base | 281 policies_location = '%s/policies/managed' % policies_location_base |
372 subprocess.call(['suid-python', sudo_cmd_file, | 282 subprocess.call(['suid-python', sudo_cmd_file, |
373 'setup_dir', policies_location]) | 283 'setup_dir', policies_location]) |
374 # Copy chrome.json file to the managed directory | 284 # Copy chrome.json file to the managed directory |
375 subprocess.call(['suid-python', sudo_cmd_file, | 285 subprocess.call(['suid-python', sudo_cmd_file, |
376 'copy', '/tmp/chrome.json', policies_location]) | 286 'copy', '/tmp/chrome.json', policies_location]) |
377 os.remove('/tmp/chrome.json') | 287 os.remove('/tmp/chrome.json') |
378 | 288 |
379 def SetPolicies(self, user_policy=None, device_policy=None): | 289 def SetDevicePolicy(self, device_policy=None, owner=None): |
380 """Enforces the policies given in the arguments as dictionaries. | 290 """Sets the device policy provided as a dict and the owner on ChromeOS. |
381 | 291 |
382 These policies will have been installed after this call returns. | 292 Passing a value of None as the device policy clears it.""" |
293 if not self.IsChromeOS(): | |
294 raise NotImplementedError('Device policy is only available on ChromeOS.') | |
383 | 295 |
384 Args: | 296 self._GenerateDevicePolicyBlob(device_policy, owner) |
385 user_policy: A dictionary representing the user policies. | 297 self._RefreshDevicePolicy() |
386 device_policy: A dictionary representing the device policies on Chrome OS. | |
387 | 298 |
388 Raises: | 299 def _SetProtobufMessageField(self, group_message, field, field_value): |
389 NotImplementedError if the platform is not supported. | 300 """Sets the given field in a protobuf to the given value.""" |
390 """ | 301 if field.label == field.LABEL_REPEATED: |
391 if self.IsChromeOS(): | 302 assert type(field_value) == list |
392 self._SetCloudPolicies(user_mandatory=user_policy, device=device_policy) | 303 entries = group_message.__getattribute__(field.name) |
304 for list_item in field_value: | |
305 entries.append(list_item) | |
306 return | |
307 elif field.type == field.TYPE_BOOL: | |
308 assert type(field_value) == bool | |
309 elif field.type == field.TYPE_STRING: | |
310 assert type(field_value) == str or type(field_value) == unicode | |
311 elif field.type == field.TYPE_INT64: | |
312 assert type(field_value) == int | |
313 elif (field.type == field.TYPE_MESSAGE and | |
314 field.message_type.name == 'StringList'): | |
315 assert type(field_value) == list | |
316 entries = group_message.__getattribute__(field.name).entries | |
317 for list_item in field_value: | |
318 entries.append(list_item) | |
319 return | |
393 else: | 320 else: |
394 if device_policy is not None: | 321 raise Exception('Unknown field type %s' % field.type) |
395 raise NotImplementedError('Device policy is only available on ChromeOS') | 322 group_message.__setattr__(field.name, field_value) |
396 if self.IsWin(): | |
397 self._SetPoliciesWin(user_policy=user_policy) | |
398 elif self.IsLinux(): | |
399 self._SetPoliciesLinux(user_policy=user_policy) | |
400 else: | |
401 raise NotImplementedError('Not available on this platform.') | |
402 | 323 |
403 self.RefreshPolicies() | 324 def _GenerateDevicePolicyBlob(self, device_policy=None, owner=None): |
frankf
2012/04/04 01:44:01
Move this before SetDevicePolicy. It's a good idea
bartfab (slow)
2012/04/04 09:24:14
Done. I reordered all functions so that they are d
| |
325 """Generates a signed device policy blob.""" | |
404 | 326 |
327 # Fill in the device settings protobuf. | |
328 device_policy = device_policy or {} | |
329 owner = owner or constants.CREDENTIALS['$mockowner'][0] | |
330 settings = dp.ChromeDeviceSettingsProto() | |
331 for group in settings.DESCRIPTOR.fields: | |
332 # Create protobuf message for group. | |
333 group_message = eval('dp.' + group.message_type.name + '()') | |
334 # Indicates if at least one field was set in |group_message|. | |
335 got_fields = False | |
336 # Iterate over fields of the message and feed them from the policy dict. | |
337 for field in group_message.DESCRIPTOR.fields: | |
338 field_value = None | |
339 if field.name in device_policy: | |
340 got_fields = True | |
341 field_value = device_policy[field.name] | |
342 self._SetProtobufMessageField(group_message, field, field_value) | |
343 if got_fields: | |
344 settings.__getattribute__(group.name).CopyFrom(group_message) | |
345 | |
346 # Fill in the policy data protobuf. | |
347 policy_data = dm.PolicyData() | |
348 policy_data.policy_type = 'google/chromeos/device' | |
349 policy_data.policy_value = settings.SerializeToString() | |
350 policy_data.username = owner or '' | |
frankf
2012/04/04 01:44:01
don't need 'or'.
bartfab (slow)
2012/04/04 09:24:14
Done.
| |
351 serialized_policy_data = policy_data.SerializeToString() | |
352 | |
353 # Fill in the device management response protobuf. | |
354 response = dm.DeviceManagementResponse() | |
355 fetch_response = response.policy_response.response.add() | |
356 fetch_response.policy_data = serialized_policy_data | |
357 fetch_response.policy_data_signature = ( | |
358 self._private_key.hashAndSign(serialized_policy_data).tostring()) | |
359 | |
360 self._device_policy_blob = fetch_response.SerializeToString() | |
361 | |
362 def _RefreshDevicePolicy(self): | |
363 """Refreshes the device policy in force on ChromeOS.""" | |
364 assert self.IsChromeOS() | |
365 # The device policy blob is only picked up by the session manager on | |
366 # startup, and is overwritten on shutdown. So the blob has to be written | |
367 # while the session manager is stopped. | |
368 self.WaitForSessionManagerRestart( | |
369 lambda: self._WriteDevicePolicyWithSessionManagerStopped()) | |
370 logging.debug('Session manager restarted with device policy ready') | |
371 pyauto.PyUITest.setUp(self) | |
OLD | NEW |