| OLD | NEW |
| 1 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. | 1 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
| 4 | 4 |
| 5 import common | 5 import common |
| 6 import constants | 6 import constants |
| 7 import cryptohome | 7 import cryptohome |
| 8 import dbus | 8 import dbus |
| 9 import logging | 9 import logging |
| 10 import login | 10 import login |
| 11 import os | 11 import os |
| 12 import tempfile | 12 import tempfile |
| 13 from autotest_lib.client.bin import utils | 13 from autotest_lib.client.bin import utils |
| 14 from autotest_lib.client.common_lib import autotemp, error | 14 from autotest_lib.client.common_lib import autotemp, error |
| 15 | 15 |
| 16 | 16 |
| 17 class OwnershipError(error.TestError): |
| 18 """Generic error for ownership-related failures.""" |
| 19 pass |
| 20 |
| 21 |
| 17 class scoped_tempfile(object): | 22 class scoped_tempfile(object): |
| 18 """A wrapper that provides scoped semantics for temporary files. | 23 """A wrapper that provides scoped semantics for temporary files. |
| 19 | 24 |
| 20 Providing a file path causes the scoped_tempfile to take ownership of the | 25 Providing a file path causes the scoped_tempfile to take ownership of the |
| 21 file at the provided path. The file at the path will be deleted when this | 26 file at the provided path. The file at the path will be deleted when this |
| 22 object goes out of scope. If no path is provided, then a temporary file | 27 object goes out of scope. If no path is provided, then a temporary file |
| 23 object will be created for the lifetime of the scoped_tempfile | 28 object will be created for the lifetime of the scoped_tempfile |
| 24 | 29 |
| 25 autotemp.tempfile objects don't seem to play nicely with being | 30 autotemp.tempfile objects don't seem to play nicely with being |
| 26 used in system commands, so they can't be used for my purposes. | 31 used in system commands, so they can't be used for my purposes. |
| (...skipping 27 matching lines...) Expand all Loading... |
| 54 | 59 |
| 55 def __unlink(filename): | 60 def __unlink(filename): |
| 56 try: | 61 try: |
| 57 os.unlink(filename) | 62 os.unlink(filename) |
| 58 except (IOError, OSError) as error: | 63 except (IOError, OSError) as error: |
| 59 logging.info(error) | 64 logging.info(error) |
| 60 | 65 |
| 61 | 66 |
| 62 def clear_ownership(): | 67 def clear_ownership(): |
| 63 __unlink(constants.OWNER_KEY_FILE) | 68 __unlink(constants.OWNER_KEY_FILE) |
| 64 __unlink(constants.SIGNED_PREFERENCES_FILE) | |
| 65 __unlink(constants.SIGNED_POLICY_FILE) | 69 __unlink(constants.SIGNED_POLICY_FILE) |
| 66 | 70 |
| 67 | 71 |
| 68 def connect_to_session_manager(): | 72 def connect_to_session_manager(): |
| 69 """Create and return a DBus connection to session_manager. | 73 """Create and return a DBus connection to session_manager. |
| 70 | 74 |
| 71 Connects to the session manager over the DBus system bus. Returns | 75 Connects to the session manager over the DBus system bus. Returns |
| 72 appropriately configured DBus interface object. | 76 appropriately configured DBus interface object. |
| 73 """ | 77 """ |
| 74 bus = dbus.SystemBus() | 78 bus = dbus.SystemBus() |
| 75 proxy = bus.get_object('org.chromium.SessionManager', | 79 proxy = bus.get_object('org.chromium.SessionManager', |
| 76 '/org/chromium/SessionManager') | 80 '/org/chromium/SessionManager') |
| 77 return dbus.Interface(proxy, 'org.chromium.SessionManagerInterface') | 81 return dbus.Interface(proxy, 'org.chromium.SessionManagerInterface') |
| 78 | 82 |
| 79 | 83 |
| 84 def listen_to_session_manager_signal(callback, signal): |
| 85 """Create and return a DBus connection to session_manager. |
| 86 |
| 87 Connects to the session manager over the DBus system bus. Returns |
| 88 appropriately configured DBus interface object. |
| 89 """ |
| 90 bus = dbus.SystemBus() |
| 91 bus.add_signal_receiver( |
| 92 handler_function=callback, |
| 93 signal_name=signal, |
| 94 dbus_interface='org.chromium.Chromium', |
| 95 bus_name=None, |
| 96 path='/') |
| 97 |
| 98 POLICY_TYPE = 'google/chromeos/device' |
| 99 |
| 100 |
| 101 def assert_has_policy_data(response_proto): |
| 102 if not response_proto.HasField("policy_data"): |
| 103 raise OwnershipError('Malformatted response.') |
| 104 |
| 105 |
| 106 def assert_has_device_settings(data_proto): |
| 107 if (not data_proto.HasField("policy_type") or |
| 108 data_proto.policy_type != POLICY_TYPE or |
| 109 not data_proto.HasField("policy_value")): |
| 110 raise OwnershipError('Malformatted response.') |
| 111 |
| 112 |
| 113 def assert_username(data_proto, username): |
| 114 if data_proto.username != username: |
| 115 raise OwnershipError('Incorrect username.') |
| 116 |
| 117 |
| 118 def assert_guest_setting(settings, guests): |
| 119 if not settings.HasField("guest_mode_enabled"): |
| 120 raise OwnershipError('No guest mode setting protobuf.') |
| 121 if not settings.guest_mode_enabled.HasField("guest_mode_enabled"): |
| 122 raise OwnershipError('No guest mode setting.') |
| 123 if settings.guest_mode_enabled.guest_mode_enabled != guests: |
| 124 raise OwnershipError('Incorrect guest mode setting.') |
| 125 |
| 126 |
| 127 def assert_show_users(settings, show_users): |
| 128 if not settings.HasField("show_user_names"): |
| 129 raise OwnershipError('No show users setting protobuf.') |
| 130 if not settings.show_user_names.HasField("show_user_names"): |
| 131 raise OwnershipError('No show users setting.') |
| 132 if settings.show_user_names.show_user_names != show_users: |
| 133 raise OwnershipError('Incorrect show users setting.') |
| 134 |
| 135 |
| 136 def assert_roaming(settings, roaming): |
| 137 if not settings.HasField("data_roaming_enabled"): |
| 138 raise OwnershipError('No roaming setting protobuf.') |
| 139 if not settings.data_roaming_enabled.HasField("data_roaming_enabled"): |
| 140 raise OwnershipError('No roaming setting.') |
| 141 if settings.data_roaming_enabled.data_roaming_enabled != roaming: |
| 142 raise OwnershipError('Incorrect roaming setting.') |
| 143 |
| 144 |
| 145 def assert_new_users(settings, new_users): |
| 146 if not settings.HasField("allow_new_users"): |
| 147 raise OwnershipError('No allow new users setting protobuf.') |
| 148 if not settings.allow_new_users.HasField("allow_new_users"): |
| 149 raise OwnershipError('No allow new users setting.') |
| 150 if settings.allow_new_users.allow_new_users != new_users: |
| 151 raise OwnershipError('Incorrect allow new users setting.') |
| 152 |
| 153 |
| 154 def assert_users_on_whitelist(settings, users): |
| 155 if settings.HasField("user_whitelist"): |
| 156 for user in users: |
| 157 if user not in settings.user_whitelist.user_whitelist: |
| 158 raise OwnershipError(user + ' not whitelisted.') |
| 159 else: |
| 160 raise OwnershipError('No user whitelist.') |
| 161 |
| 162 |
| 163 def assert_proxy_settings(settings, proxies): |
| 164 if not settings.HasField("device_proxy_settings"): |
| 165 raise OwnershipError('No proxy settings protobuf.') |
| 166 if not settings.device_proxy_settings.HasField("proxy_mode"): |
| 167 raise OwnershipError('No proxy_mode setting.') |
| 168 if settings.device_proxy_settings.proxy_mode != proxies['proxy_mode']: |
| 169 raise OwnershipError('Incorrect proxies: %s' % proxies) |
| 170 |
| 171 |
| 80 NSSDB = constants.CRYPTOHOME_MOUNT_PT + '/.pki/nssdb' | 172 NSSDB = constants.CRYPTOHOME_MOUNT_PT + '/.pki/nssdb' |
| 81 PK12UTIL = 'nsspk12util' | 173 PK12UTIL = 'nsspk12util' |
| 82 OPENSSLP12 = 'openssl pkcs12' | 174 OPENSSLP12 = 'openssl pkcs12' |
| 83 OPENSSLX509 = 'openssl x509' | 175 OPENSSLX509 = 'openssl x509' |
| 84 OPENSSLRSA = 'openssl rsa' | 176 OPENSSLRSA = 'openssl rsa' |
| 85 OPENSSLREQ = 'openssl req' | 177 OPENSSLREQ = 'openssl req' |
| 86 OPENSSLCRYPTO = 'openssl sha1' | 178 OPENSSLCRYPTO = 'openssl sha1' |
| 87 | 179 |
| 88 | 180 |
| 181 def use_known_ownerkeys(): |
| 182 """Sets the system up to use a well-known keypair for owner operations. |
| 183 |
| 184 Assuming the appropriate cryptohome is already mounted, configures the |
| 185 device to accept policies signed with the checked-in 'mock' owner key. |
| 186 """ |
| 187 dirname = os.path.dirname(__file__) |
| 188 mock_keyfile = os.path.join(dirname, constants.MOCK_OWNER_KEY) |
| 189 mock_certfile = os.path.join(dirname, constants.MOCK_OWNER_CERT) |
| 190 push_to_nss(mock_keyfile, mock_certfile, NSSDB) |
| 191 utils.open_write_close(constants.OWNER_KEY_FILE, |
| 192 cert_extract_pubkey_der(mock_certfile)) |
| 193 |
| 194 |
| 195 def known_privkey(): |
| 196 """Returns the mock owner private key in PEM format. |
| 197 """ |
| 198 dirname = os.path.dirname(__file__) |
| 199 return utils.read_file(os.path.join(dirname, constants.MOCK_OWNER_KEY)) |
| 200 |
| 201 |
| 202 def known_pubkey(): |
| 203 """Returns the mock owner public key in DER format. |
| 204 """ |
| 205 dirname = os.path.dirname(__file__) |
| 206 return cert_extract_pubkey_der(os.path.join(dirname, |
| 207 constants.MOCK_OWNER_CERT)) |
| 208 |
| 209 |
| 89 def pairgen(): | 210 def pairgen(): |
| 90 """Generate a self-signed cert and associated private key. | 211 """Generate a self-signed cert and associated private key. |
| 91 | 212 |
| 92 Generates a self-signed X509 certificate and the associated private key. | 213 Generates a self-signed X509 certificate and the associated private key. |
| 93 The key is 2048 bits. The generated material is stored in PEM format | 214 The key is 2048 bits. The generated material is stored in PEM format |
| 94 and the paths to the two files are returned. | 215 and the paths to the two files are returned. |
| 95 | 216 |
| 96 The caller is responsible for cleaning up these files. | 217 The caller is responsible for cleaning up these files. |
| 97 """ | 218 """ |
| 98 keyfile = scoped_tempfile.tempdir.name + '/private.key' | 219 keyfile = scoped_tempfile.tempdir.name + '/private.key' |
| (...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 205 stdout_tee=sig.fo, | 326 stdout_tee=sig.fo, |
| 206 stderr_tee=err.fo) | 327 stderr_tee=err.fo) |
| 207 except: | 328 except: |
| 208 err.fo.seek(0) | 329 err.fo.seek(0) |
| 209 logging.error(err.fo.read()) | 330 logging.error(err.fo.read()) |
| 210 raise | 331 raise |
| 211 | 332 |
| 212 sig.fo.seek(0) | 333 sig.fo.seek(0) |
| 213 sig_data = sig.fo.read() | 334 sig_data = sig.fo.read() |
| 214 if not sig_data: | 335 if not sig_data: |
| 215 raise error.TestFail('Empty signature!') | 336 raise error.OwnershipError('Empty signature!') |
| 216 return sig_data | 337 return sig_data |
| OLD | NEW |