Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(198)

Side by Side Diff: chrome/test/pyautolib/policy_base.py

Issue 9791023: Allow setting of user and device policies in functional tests (Closed) Base URL: http://git.chromium.org/chromium/src.git@master
Patch Set: Tested and ready for review. Created 8 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « chrome/test/pyautolib/asn1der.py ('k') | chrome/test/pyautolib/pyauto.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 #!/usr/bin/python 1 #!/usr/bin/python
2 # Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 # Copyright (c) 2012 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be 3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file. 4 # found in the LICENSE file.
5 5
6 """Base class for tests that need to update the policies enforced by Chrome. 6 """Base class for tests that need to update the policies enforced by Chrome.
7 7
8 Subclasses can call SetPolicies with a dictionary of policies to install. 8 Subclasses can call SetUserPolicy (ChromeOS, Linux, Windows) and
9 SetPolicies can also be used to set the device policies on ChromeOS. 9 SetDevicePolicy (ChromeOS only) with a dictionary of the policies to install.
10 10
11 The current implementation depends on the platform. The implementations might 11 The current implementation depends on the platform. The implementations might
12 change in the future, but tests relying on SetPolicies will keep working. 12 change in the future, but tests relying on the above calls will keep working.
13 """ 13 """
14 14
15 # On ChromeOS this relies on the device_management.py part of the TestServer, 15 # On ChromeOS, both user and device policies are supported. Chrome is set up to
16 # and forces the policies by triggering a new policy fetch and refreshing the 16 # fetch user policy from a TestServer. A call to SetUserPolicy triggers an
17 # cloud policy providers. This requires preparing the system for policy 17 # immediate policy refresh, allowing the effects of policy changes on a running
18 # fetching, which currently means the DMTokens have to be in place. Without the 18 # session to be tested.
19 # DMTokens, the cloud policy controller won't be able to proceed, because the
20 # Gaia tokens for the DMService aren't available during tests.
21 # In the future this setup might not be necessary anymore, and the policy
22 # might also be pushed through the session_manager.
23 # 19 #
24 # On other platforms it relies on the SetPolicies automation call, which is 20 # Device policy is injected by stopping Chrome and the session manager, writing
25 # only available on non-official builds. This automation call is meant to be 21 # a new device policy blob and starting Chrome and the session manager again.
frankf 2012/04/04 01:44:01 Mention that this will log you out. The limitation
bartfab (slow) 2012/04/04 09:24:14 Done.
26 # eventually removed when a replacement for every platform is available. 22 # This is necessary because Chrome will only fetch device policy from a
27 # This requires setting up the policies in the registry on Windows, and writing 23 # TestServer if the device is enterprise owned. Enterprise ownership, in turn,
24 # requires ownership of the TPM which can only be undone by reboothing the
25 # device (and hence is not possible in a client test).
28 # the right files on Linux and Mac. 26 # the right files on Linux and Mac.
frankf 2012/04/04 01:44:01 delete this line.
bartfab (slow) 2012/04/04 09:24:14 Done.
29 27
30 import json 28 import json
31 import logging 29 import logging
32 import os 30 import os
33 import subprocess 31 import subprocess
34 import tempfile 32 import tempfile
35 import urllib
36 import urllib2
37 33
34 import asn1der
38 import pyauto 35 import pyauto
39 import pyauto_paths 36 import pyauto_paths
40 import pyauto_utils 37 import pyauto_utils
41 38
42 39
43 if pyauto.PyUITest.IsChromeOS(): 40 if pyauto.PyUITest.IsChromeOS():
44 import sys 41 import sys
42 import warnings
43 # Ignore deprecation warnings, they make our output more cluttered.
44 warnings.filterwarnings("ignore", category=DeprecationWarning)
45
45 # Find the path to the pyproto and add it to sys.path. 46 # Find the path to the pyproto and add it to sys.path.
46 # Prepend it so that google.protobuf is loaded from here. 47 # Prepend it so that google.protobuf is loaded from here.
47 for path in pyauto_paths.GetBuildDirs(): 48 for path in pyauto_paths.GetBuildDirs():
48 p = os.path.join(path, 'pyproto') 49 p = os.path.join(path, 'pyproto')
49 if os.path.isdir(p): 50 if os.path.isdir(p):
50 sys.path = [p, os.path.join(p, 'chrome', 'browser', 'policy', 51 sys.path = [p, os.path.join(p, 'chrome', 'browser', 'policy',
51 'proto')] + sys.path 52 'proto')] + sys.path
52 break 53 break
53 sys.path.append('/usr/local') # to import autotest libs. 54 sys.path.append('/usr/local') # to import autotest libs.
54 import device_management_local_pb2 as dml 55 sys.path.append(os.path.join(pyauto_paths.GetThirdPartyDir(), 'tlslite'))
55 import device_management_backend_pb2 as dmb 56
57 import chrome_device_policy_pb2 as dp
58 import device_management_backend_pb2 as dm
59 import tlslite.api
56 from autotest.cros import constants 60 from autotest.cros import constants
57 from autotest.cros import cros_ui 61 from autotest.cros import cros_ui
58 elif pyauto.PyUITest.IsWin(): 62 elif pyauto.PyUITest.IsWin():
59 import _winreg as winreg 63 import _winreg as winreg
60 64
65 # ASN.1 object identifier for PKCS#1/RSA.
66 PKCS1_RSA_OID = '\x2a\x86\x48\x86\xf7\x0d\x01\x01\x01'
67
61 68
62 class PolicyTestBase(pyauto.PyUITest): 69 class PolicyTestBase(pyauto.PyUITest):
63 """A base class for tests that need to set up and modify policies. 70 """A base class for tests that need to set up and modify policies.
64 71
65 Subclasses can use the SetPolicies call to set the policies seen by Chrome. 72 Subclasses can use the methods SetUserPolicy (ChromeOS, Linux, Windows) and
73 SetDevicePolicy (ChromeOS only) to set the policies seen by Chrome.
66 """ 74 """
67 75
68 def _WriteFile(self, path, content): 76 def _WriteFile(self, path, content):
69 """Writes content to path, creating any intermediary directories.""" 77 """Writes content to path, creating any intermediary directories."""
70 if not os.path.exists(os.path.dirname(path)): 78 if not os.path.exists(os.path.dirname(path)):
71 os.makedirs(os.path.dirname(path)) 79 os.makedirs(os.path.dirname(path))
72 f = open(path, 'w') 80 f = open(path, 'w')
73 f.write(content) 81 f.write(content)
74 f.close() 82 f.close()
75 83
76 def _GetTestServerPoliciesFilePath(self): 84 def _GetTestServerPoliciesFilePath(self):
77 """Returns the path of the cloud policy configuration file.""" 85 """Returns the path of the cloud policy configuration file."""
78 assert self.IsChromeOS() 86 assert self.IsChromeOS()
79 return os.path.join(self._temp_data_dir, 'device_management') 87 return os.path.join(self._temp_data_dir, 'device_management')
80 88
81 def _GetHttpURLForDeviceManagement(self): 89 def _GetHttpURLForDeviceManagement(self):
90 """Returns the URL at which the TestServer is serving user policy."""
82 assert self.IsChromeOS() 91 assert self.IsChromeOS()
83 return self._http_server.GetURL('device_management').spec() 92 return self._http_server.GetURL('device_management').spec()
84 93
85 def _WriteUserPolicyToken(self, token): 94 def _WriteDevicePolicyWithSessionManagerStopped(self):
86 """Writes the given token to the user device management cache.""" 95 """Writes the device policy blob while the session manager is stopped.
87 assert self.IsChromeOS()
88 blob = dml.DeviceCredentials()
89 blob.device_token = token
90 blob.device_id = '123'
91 self._WriteFile('/home/chronos/user/Device Management/Token',
92 blob.SerializeToString())
93 96
94 def _WriteDevicePolicy(self, fetch_response): 97 Updates the files holding the device policy blob and the public key need to
95 """Writes the given signed fetch_response to the device policy cache. 98 verify its signature.
96
97 Also writes the owner key, used to verify the signature.
98 """ 99 """
99 assert self.IsChromeOS() 100 assert self.IsChromeOS()
100 self._WriteFile(constants.SIGNED_POLICY_FILE,
101 fetch_response.SerializeToString())
102 self._WriteFile(constants.OWNER_KEY_FILE,
103 fetch_response.new_public_key)
104
105 def _PostToDMServer(self, request_type, body, headers):
106 """Posts a request to the TestServer's Device Management interface.
107
108 |request_type| is the value of the 'request' HTTP parameter.
109 Returns the response's body.
110 """
111 assert self.IsChromeOS()
112 url = self._GetHttpURLForDeviceManagement()
113 url += '?' + urllib.urlencode({
114 'deviceid': '123',
115 'oauth_token': '456',
116 'request': request_type,
117 'devicetype': 2,
118 'apptype': 'Chrome',
119 'agent': 'Chrome',
120 })
121 return urllib2.urlopen(urllib2.Request(url, body, headers)).read()
122
123 def _PostRegisterRequest(self, type):
124 """Sends a device register request to the TestServer, of the given type."""
125 assert self.IsChromeOS()
126 request = dmb.DeviceManagementRequest()
127 register = request.register_request
128 register.machine_id = '789'
129 register.type = type
130 return self._PostToDMServer('register', request.SerializeToString(), {})
131
132 def _RegisterAndGetDMToken(self, device):
133 """Registers with the TestServer and returns the DMToken fetched.
134
135 Registers for device policy if device is True. Otherwise registers for
136 user policy.
137 """
138 assert self.IsChromeOS()
139 type = device and dmb.DeviceRegisterRequest.DEVICE \
140 or dmb.DeviceRegisterRequest.USER
141 rstring = self._PostRegisterRequest(type)
142 response = dmb.DeviceManagementResponse()
143 response.ParseFromString(rstring)
144 return response.register_response.device_management_token
145
146 def _PostPolicyRequest(self, token, type, want_signature=False):
147 """Fetches policy from the TestServer, using the given token.
148
149 Policy is fetched for the given type. If want_signature is True, the
150 request will ask for a signed response. Returns the response body.
151 """
152 assert self.IsChromeOS()
153 request = dmb.DeviceManagementRequest()
154 policy = request.policy_request
155 prequest = policy.request.add()
156 prequest.policy_type = type
157 if want_signature:
158 prequest.signature_type = dmb.PolicyFetchRequest.SHA1_RSA
159 headers = {
160 'Authorization': 'GoogleDMToken token=' + token,
161 }
162 return self._PostToDMServer('policy', request.SerializeToString(), headers)
163
164 def _FetchPolicy(self, token, device):
165 """Fetches policy from the TestServer, using the given token.
166
167 Token must be a valid token retrieved with _RegisterAndGetDMToken. If
168 device is True, fetches signed device policy. Otherwise fetches user policy.
169 This method also verifies the response, and returns the first policy fetch
170 response.
171 """
172 assert self.IsChromeOS()
173 type = device and 'google/chromeos/device' or 'google/chromeos/user'
174 rstring = self._PostPolicyRequest(token=token, type=type,
175 want_signature=device)
176 response = dmb.DeviceManagementResponse()
177 response.ParseFromString(rstring)
178 fetch_response = response.policy_response.response[0]
179 assert fetch_response.policy_data
180 assert fetch_response.policy_data_signature
181 assert fetch_response.new_public_key
182 return fetch_response
183
184 def _WriteDevicePolicyWithSessionManagerStopped(self, policy):
185 """Writes the device policy blob while the Session Manager is stopped."""
186 assert self.IsChromeOS()
187 logging.debug('Stopping session manager') 101 logging.debug('Stopping session manager')
188 cros_ui.stop(allow_fail=True) 102 cros_ui.stop(allow_fail=True)
189 logging.debug('Writing device policy cache') 103 logging.debug('Writing device policy blob')
190 self._WriteDevicePolicy(policy) 104 self._WriteFile(constants.SIGNED_POLICY_FILE, self._device_policy_blob)
105 self._WriteFile(constants.OWNER_KEY_FILE, self._public_key)
191 106
192 # Ugly hack: session manager won't spawn chrome if this file exists. That's 107 # Ugly hack: session manager will not spawn Chrome if this file exists. That
193 # usually a good thing (to keep the automation channel open), but in this 108 # is usually a good thing (to keep the automation channel open), but in this
194 # case we really want to restart chrome. PyUITest.setUp() will be called 109 # case we really want to restart chrome. PyUITest.setUp() will be called
195 # after session manager and chrome have restarted, and will setup the 110 # after session manager and chrome have restarted, and will setup the
196 # automation channel. 111 # automation channel.
197 restore_magic_file = False 112 restore_magic_file = False
198 if os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE): 113 if os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE):
199 logging.debug('DISABLE_BROWSER_RESTART_MAGIC_FILE found. ' 114 logging.debug('DISABLE_BROWSER_RESTART_MAGIC_FILE found. '
200 'Removing temporarily for the next restart.') 115 'Removing temporarily for the next restart.')
201 restore_magic_file = True 116 restore_magic_file = True
202 os.remove(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE) 117 os.remove(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE)
203 assert not os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE) 118 assert not os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE)
204 119
205 logging.debug('Starting session manager again') 120 logging.debug('Starting session manager again')
206 cros_ui.start() 121 cros_ui.start()
207 122
208 # cros_ui.start() waits for the login prompt to be visible, so chrome has 123 # cros_ui.start() waits for the login prompt to be visible, so Chrome has
209 # already started once it returns. 124 # already started once it returns.
210 if restore_magic_file: 125 if restore_magic_file:
211 open(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE, 'w').close() 126 open(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE, 'w').close()
212 assert os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE) 127 assert os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE)
213 128
214 def ExtraChromeFlags(self): 129 def ExtraChromeFlags(self):
215 """Sets up Chrome to use cloud policies on ChromeOS.""" 130 """Sets up Chrome to use cloud policies on ChromeOS."""
216 flags = pyauto.PyUITest.ExtraChromeFlags(self) 131 flags = pyauto.PyUITest.ExtraChromeFlags(self)
217 if self.IsChromeOS(): 132 if self.IsChromeOS():
133 while '--skip-oauth-login' in flags:
134 flags.remove('--skip-oauth-login')
frankf 2012/04/04 01:44:01 why do we need oauth-login?
bartfab (slow) 2012/04/04 09:24:14 An OAuth token is required for Chrome to register
frankf 2012/04/04 21:44:10 Previously, we explicitly registered with the DMSe
bartfab (slow) 2012/04/05 08:24:45 We can make things hermetic just as easily. The mo
218 url = self._GetHttpURLForDeviceManagement() 135 url = self._GetHttpURLForDeviceManagement()
219 flag = '--device-management-url=' + url 136 flags.append('--device-management-url=' + url)
220 flags += [flag] 137 flags.append('--disable-sync')
221 return flags 138 return flags
222 139
223 def setUp(self): 140 def setUp(self):
224 """Sets up the platform for policy testing. 141 """Sets up the platform for policy testing.
225 142
226 On ChromeOS, part of the set up involves restarting the session_manager and 143 On ChromeOS, part of the setup involves restarting the session manager to
227 logging in with the $default account. 144 inject a device policy blob.
228 """ 145 """
229 if self.IsChromeOS(): 146 if self.IsChromeOS():
230 # Setup a temporary data dir and a TestServer serving files from there. 147 # Set up a temporary data dir and a TestServer serving files from there.
231 # The TestServer makes its document root relative to the src dir. 148 # The TestServer makes its document root relative to the src dir.
232 self._temp_data_dir = tempfile.mkdtemp(dir=pyauto_paths.GetSourceDir()) 149 self._temp_data_dir = tempfile.mkdtemp(dir=pyauto_paths.GetSourceDir())
233 relative_temp_data_dir = os.path.basename(self._temp_data_dir) 150 relative_temp_data_dir = os.path.basename(self._temp_data_dir)
234 self._http_server = self.StartHTTPServer(relative_temp_data_dir) 151 self._http_server = self.StartHTTPServer(relative_temp_data_dir)
235 152
236 # Setup empty policies, so that the TestServer can start replying. 153 # Set up an empty user policy so that the TestServer can start replying.
237 self._SetCloudPolicies() 154 self._SetUserPolicyChromeOS()
238 155
239 device_dmtoken = self._RegisterAndGetDMToken(device=True) 156 # Generate a key pair for signing device policy.
240 policy = self._FetchPolicy(token=device_dmtoken, device=True) 157 self._private_key = tlslite.api.generateRSAKey(1024)
241 user_dmtoken = self._RegisterAndGetDMToken(device=False) 158 algorithm = asn1der.Sequence(
159 [ asn1der.Data(asn1der.OBJECT_IDENTIFIER, PKCS1_RSA_OID),
160 asn1der.Data(asn1der.NULL, '') ])
frankf 2012/04/04 01:44:01 No spaces around brackets.
bartfab (slow) 2012/04/04 09:24:14 Done.
161 rsa_pubkey = asn1der.Sequence([ asn1der.Integer(self._private_key.n),
162 asn1der.Integer(self._private_key.e) ])
163 self._public_key = asn1der.Sequence(
164 [ algorithm, asn1der.Bitstring(rsa_pubkey) ])
242 165
243 # The device policy blob is only picked up by the session manager on 166 # Clear device policy. This also invokes pyauto.PyUITest.setUp(self).
244 # startup, and is overwritten on shutdown. So the blob has to be written 167 self.SetDevicePolicy()
245 # while the session manager is stopped.
246 self.WaitForSessionManagerRestart(
247 lambda: self._WriteDevicePolicyWithSessionManagerStopped(policy))
248 logging.debug('Session manager restarted with device policy ready')
249 168
250 pyauto.PyUITest.setUp(self) 169 # Remove any existing vaults.
251 170 self.RemoveAllCryptohomeVaultsOnChromeOS()
252 if self.IsChromeOS(): 171 else:
253 logging.debug('Logging in') 172 pyauto.PyUITest.setUp(self)
254 credentials = self.GetPrivateInfo()['prod_enterprise_test_user']
255 self.Login(credentials['username'], credentials['password'])
256 assert self.GetLoginInfo()['is_logged_in']
257
258 self._WriteUserPolicyToken(user_dmtoken)
259 # The browser has to be reloaded to make the user policy token cache
260 # reload the file just written. The file can also be written only after
261 # the cryptohome is mounted, after login.
262 self.RestartBrowser(clear_profile=False)
263 173
264 def tearDown(self): 174 def tearDown(self):
265 """Cleans up the files created by setUp and policies added in tests.""" 175 """Cleans up the policies and related files created in tests."""
266 # Clear the policies.
267 self.SetPolicies()
268
269 if self.IsChromeOS(): 176 if self.IsChromeOS():
frankf 2012/04/04 01:44:01 Add comment on why you need this.
bartfab (slow) 2012/04/04 09:24:14 Done.
270 pyauto.PyUITest.Logout(self) 177 self.SetDevicePolicy()
178 else:
179 self.SetUserPolicy()
271 180
272 pyauto.PyUITest.tearDown(self) 181 pyauto.PyUITest.tearDown(self)
273 182
274 if self.IsChromeOS(): 183 if self.IsChromeOS():
275 self.StopHTTPServer(self._http_server) 184 self.StopHTTPServer(self._http_server)
276 pyauto_utils.RemovePath(self._temp_data_dir) 185 pyauto_utils.RemovePath(self._temp_data_dir)
186 self.RemoveAllCryptohomeVaultsOnChromeOS()
277 187
278 def _SetCloudPolicies(self, user_mandatory=None, user_recommended=None, 188 def LoginWithTestAccount(self, account='prod_enterprise_test_user'):
279 device=None): 189 """Convenience method for logging in with one of the test accounts."""
280 """Exports the policies to the configuration file of the TestServer. 190 assert self.IsChromeOS()
191 credentials = self.GetPrivateInfo()[account]
192 self.Login(credentials['username'], credentials['password'])
193 assert self.GetLoginInfo()['is_logged_in']
281 194
282 The TestServer will serve these policies after this function returns. 195 def SetPolicy(self):
196 raise NotImplementedError('This class supports user and device policies. '
197 'Instead of SetPolicies, use SetUserPolicy or '
198 'SetDevicePolicy.')
283 199
284 Args: 200 def SetUserPolicy(self, user_policy=None):
285 user_mandatory: user policies of mandatory level 201 """Sets the user policy provided as a dict.
286 user_recommended: user policies of recommended level 202
287 device: device policies 203 Passing a value of None clears the user policy."""
288 """ 204 if self.IsChromeOS():
205 self._SetUserPolicyChromeOS(user_policy=user_policy)
206 elif self.IsWin():
207 self._SetUserPolicyWin(user_policy=user_policy)
208 elif self.IsLinux():
209 self._SetUserPolicyLinux(user_policy=user_policy)
210 else:
211 raise NotImplementedError('Not available on this platform.')
212
213 self.RefreshPolicies()
214
215 def _SetUserPolicyChromeOS(self, user_policy=None):
216 """Writes the given user policy to the TestServer's input file."""
289 assert self.IsChromeOS() 217 assert self.IsChromeOS()
290 policy_dict = { 218 policy_dict = {
291 'google/chromeos/device': device or {}, 219 'google/chromeos/device': {},
292 'google/chromeos/user': { 220 'google/chromeos/user': {
293 'mandatory': user_mandatory or {}, 221 'mandatory': user_policy or {},
294 'recommended': user_recommended or {}, 222 'recommended': {},
295 }, 223 },
296 'managed_users': ['*'], 224 'managed_users': ['*'],
297 } 225 }
298 self._WriteFile(self._GetTestServerPoliciesFilePath(), 226 self._WriteFile(self._GetTestServerPoliciesFilePath(),
299 json.dumps(policy_dict, sort_keys=True, indent=2) + '\n') 227 json.dumps(policy_dict, sort_keys=True, indent=2) + '\n')
300 228
301 def _SetPoliciesWin(self, user_policy=None): 229 def _SetUserPolicyWin(self, user_policy=None):
302 """Exports the policies as dictionary in the argument to Window registry. 230 """Writes the given user policy to the Windows registry."""
303
304 Removes the registry key and its subkeys if they exist.
305
306 Args:
307 user_policy: A dictionary representing the user policies. Clear the
308 registry if None.
309
310 Raises:
311 TypeError: If an unexpected value is found in the policy dictionary.
312 """
313
314 def SetValueEx(key, sub_key, value): 231 def SetValueEx(key, sub_key, value):
315 if isinstance(value, int): 232 if isinstance(value, int):
316 winreg.SetValueEx(key, sub_key, 0, winreg.REG_DWORD, int(value)) 233 winreg.SetValueEx(key, sub_key, 0, winreg.REG_DWORD, int(value))
317 elif isinstance(value, basestring): 234 elif isinstance(value, basestring):
318 winreg.SetValueEx(key, sub_key, 0, winreg.REG_SZ, value.encode('ascii')) 235 winreg.SetValueEx(key, sub_key, 0, winreg.REG_SZ, value.encode('ascii'))
319 elif isinstance(value, list): 236 elif isinstance(value, list):
320 k = winreg.CreateKey(key, sub_key) 237 k = winreg.CreateKey(key, sub_key)
321 for index, v in list(enumerate(value)): 238 for index, v in list(enumerate(value)):
322 SetValueEx(k, str(index + 1), v) 239 SetValueEx(k, str(index + 1), v)
323 winreg.CloseKey(k) 240 winreg.CloseKey(k)
(...skipping 10 matching lines...) Expand all
334 r'reg query HKEY_LOCAL_MACHINE\%s' % reg_base) == 0: 251 r'reg query HKEY_LOCAL_MACHINE\%s' % reg_base) == 0:
335 logging.debug(r'Removing %s' % reg_base) 252 logging.debug(r'Removing %s' % reg_base)
336 subprocess.call(r'reg delete HKLM\%s /f' % reg_base) 253 subprocess.call(r'reg delete HKLM\%s /f' % reg_base)
337 254
338 if user_policy is not None: 255 if user_policy is not None:
339 root_key = winreg.CreateKey(winreg.HKEY_LOCAL_MACHINE, reg_base) 256 root_key = winreg.CreateKey(winreg.HKEY_LOCAL_MACHINE, reg_base)
340 for k, v in user_policy.iteritems(): 257 for k, v in user_policy.iteritems():
341 SetValueEx(root_key, k, v) 258 SetValueEx(root_key, k, v)
342 winreg.CloseKey(root_key) 259 winreg.CloseKey(root_key)
343 260
344 def _SetPoliciesLinux(self, user_policy=None): 261 def _SetUserPolicyLinux(self, user_policy=None):
345 """Exports the policies as dictionary in the argument to a JSON file. 262 """Writes the given user policy to the JSON policy file read by Chrome."""
346
347 Removes the JSON file if it exists.
348
349 Args:
350 user_policy: A dictionary representing the user policies. Remove the
351 JSON file if None
352 """
353 assert self.IsLinux() 263 assert self.IsLinux()
354 sudo_cmd_file = os.path.join(os.path.dirname(__file__), 264 sudo_cmd_file = os.path.join(os.path.dirname(__file__),
355 'policy_linux_util.py') 265 'policy_linux_util.py')
356 266
357 if self.GetBrowserInfo()['properties']['branding'] == 'Google Chrome': 267 if self.GetBrowserInfo()['properties']['branding'] == 'Google Chrome':
358 policies_location_base = '/etc/opt/chrome' 268 policies_location_base = '/etc/opt/chrome'
359 else: 269 else:
360 policies_location_base = '/etc/chromium' 270 policies_location_base = '/etc/chromium'
361 271
362 if os.path.isdir(policies_location_base): 272 if os.path.isdir(policies_location_base):
363 logging.debug('Removing directory %s' % policies_location_base) 273 logging.debug('Removing directory %s' % policies_location_base)
364 subprocess.call(['suid-python', sudo_cmd_file, 274 subprocess.call(['suid-python', sudo_cmd_file,
365 'remove_dir', policies_location_base]) 275 'remove_dir', policies_location_base])
366 276
367 if user_policy is not None: 277 if user_policy is not None:
368 self._WriteFile('/tmp/chrome.json', 278 self._WriteFile('/tmp/chrome.json',
369 json.dumps(user_policy, sort_keys=True, indent=2) + '\n') 279 json.dumps(user_policy, sort_keys=True, indent=2) + '\n')
370 280
371 policies_location = '%s/policies/managed' % policies_location_base 281 policies_location = '%s/policies/managed' % policies_location_base
372 subprocess.call(['suid-python', sudo_cmd_file, 282 subprocess.call(['suid-python', sudo_cmd_file,
373 'setup_dir', policies_location]) 283 'setup_dir', policies_location])
374 # Copy chrome.json file to the managed directory 284 # Copy chrome.json file to the managed directory
375 subprocess.call(['suid-python', sudo_cmd_file, 285 subprocess.call(['suid-python', sudo_cmd_file,
376 'copy', '/tmp/chrome.json', policies_location]) 286 'copy', '/tmp/chrome.json', policies_location])
377 os.remove('/tmp/chrome.json') 287 os.remove('/tmp/chrome.json')
378 288
379 def SetPolicies(self, user_policy=None, device_policy=None): 289 def SetDevicePolicy(self, device_policy=None, owner=None):
380 """Enforces the policies given in the arguments as dictionaries. 290 """Sets the device policy provided as a dict and the owner on ChromeOS.
381 291
382 These policies will have been installed after this call returns. 292 Passing a value of None as the device policy clears it."""
293 if not self.IsChromeOS():
294 raise NotImplementedError('Device policy is only available on ChromeOS.')
383 295
384 Args: 296 self._GenerateDevicePolicyBlob(device_policy, owner)
385 user_policy: A dictionary representing the user policies. 297 self._RefreshDevicePolicy()
386 device_policy: A dictionary representing the device policies on Chrome OS.
387 298
388 Raises: 299 def _SetProtobufMessageField(self, group_message, field, field_value):
389 NotImplementedError if the platform is not supported. 300 """Sets the given field in a protobuf to the given value."""
390 """ 301 if field.label == field.LABEL_REPEATED:
391 if self.IsChromeOS(): 302 assert type(field_value) == list
392 self._SetCloudPolicies(user_mandatory=user_policy, device=device_policy) 303 entries = group_message.__getattribute__(field.name)
304 for list_item in field_value:
305 entries.append(list_item)
306 return
307 elif field.type == field.TYPE_BOOL:
308 assert type(field_value) == bool
309 elif field.type == field.TYPE_STRING:
310 assert type(field_value) == str or type(field_value) == unicode
311 elif field.type == field.TYPE_INT64:
312 assert type(field_value) == int
313 elif (field.type == field.TYPE_MESSAGE and
314 field.message_type.name == 'StringList'):
315 assert type(field_value) == list
316 entries = group_message.__getattribute__(field.name).entries
317 for list_item in field_value:
318 entries.append(list_item)
319 return
393 else: 320 else:
394 if device_policy is not None: 321 raise Exception('Unknown field type %s' % field.type)
395 raise NotImplementedError('Device policy is only available on ChromeOS') 322 group_message.__setattr__(field.name, field_value)
396 if self.IsWin():
397 self._SetPoliciesWin(user_policy=user_policy)
398 elif self.IsLinux():
399 self._SetPoliciesLinux(user_policy=user_policy)
400 else:
401 raise NotImplementedError('Not available on this platform.')
402 323
403 self.RefreshPolicies() 324 def _GenerateDevicePolicyBlob(self, device_policy=None, owner=None):
frankf 2012/04/04 01:44:01 Move this before SetDevicePolicy. It's a good idea
bartfab (slow) 2012/04/04 09:24:14 Done. I reordered all functions so that they are d
325 """Generates a signed device policy blob."""
404 326
327 # Fill in the device settings protobuf.
328 device_policy = device_policy or {}
329 owner = owner or constants.CREDENTIALS['$mockowner'][0]
330 settings = dp.ChromeDeviceSettingsProto()
331 for group in settings.DESCRIPTOR.fields:
332 # Create protobuf message for group.
333 group_message = eval('dp.' + group.message_type.name + '()')
334 # Indicates if at least one field was set in |group_message|.
335 got_fields = False
336 # Iterate over fields of the message and feed them from the policy dict.
337 for field in group_message.DESCRIPTOR.fields:
338 field_value = None
339 if field.name in device_policy:
340 got_fields = True
341 field_value = device_policy[field.name]
342 self._SetProtobufMessageField(group_message, field, field_value)
343 if got_fields:
344 settings.__getattribute__(group.name).CopyFrom(group_message)
345
346 # Fill in the policy data protobuf.
347 policy_data = dm.PolicyData()
348 policy_data.policy_type = 'google/chromeos/device'
349 policy_data.policy_value = settings.SerializeToString()
350 policy_data.username = owner or ''
frankf 2012/04/04 01:44:01 don't need 'or'.
bartfab (slow) 2012/04/04 09:24:14 Done.
351 serialized_policy_data = policy_data.SerializeToString()
352
353 # Fill in the device management response protobuf.
354 response = dm.DeviceManagementResponse()
355 fetch_response = response.policy_response.response.add()
356 fetch_response.policy_data = serialized_policy_data
357 fetch_response.policy_data_signature = (
358 self._private_key.hashAndSign(serialized_policy_data).tostring())
359
360 self._device_policy_blob = fetch_response.SerializeToString()
361
362 def _RefreshDevicePolicy(self):
363 """Refreshes the device policy in force on ChromeOS."""
364 assert self.IsChromeOS()
365 # The device policy blob is only picked up by the session manager on
366 # startup, and is overwritten on shutdown. So the blob has to be written
367 # while the session manager is stopped.
368 self.WaitForSessionManagerRestart(
369 lambda: self._WriteDevicePolicyWithSessionManagerStopped())
370 logging.debug('Session manager restarted with device policy ready')
371 pyauto.PyUITest.setUp(self)
OLDNEW
« no previous file with comments | « chrome/test/pyautolib/asn1der.py ('k') | chrome/test/pyautolib/pyauto.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698