| Index: client/cros/ownership.py
|
| diff --git a/client/cros/ownership.py b/client/cros/ownership.py
|
| index 008f5319bdc498ff1ba93eed0915c9d9ce08df5e..5dd1dcfd3f1009f539adf1748c1b89654bc6c66f 100644
|
| --- a/client/cros/ownership.py
|
| +++ b/client/cros/ownership.py
|
| @@ -14,6 +14,11 @@ from autotest_lib.client.bin import utils
|
| from autotest_lib.client.common_lib import autotemp, error
|
|
|
|
|
| +class OwnershipError(error.TestError):
|
| + """Generic error for ownership-related failures."""
|
| + pass
|
| +
|
| +
|
| class scoped_tempfile(object):
|
| """A wrapper that provides scoped semantics for temporary files.
|
|
|
| @@ -61,7 +66,6 @@ def __unlink(filename):
|
|
|
| def clear_ownership():
|
| __unlink(constants.OWNER_KEY_FILE)
|
| - __unlink(constants.SIGNED_PREFERENCES_FILE)
|
| __unlink(constants.SIGNED_POLICY_FILE)
|
|
|
|
|
| @@ -77,6 +81,94 @@ def connect_to_session_manager():
|
| return dbus.Interface(proxy, 'org.chromium.SessionManagerInterface')
|
|
|
|
|
| +def listen_to_session_manager_signal(callback, signal):
|
| + """Create and return a DBus connection to session_manager.
|
| +
|
| + Connects to the session manager over the DBus system bus. Returns
|
| + appropriately configured DBus interface object.
|
| + """
|
| + bus = dbus.SystemBus()
|
| + bus.add_signal_receiver(
|
| + handler_function=callback,
|
| + signal_name=signal,
|
| + dbus_interface='org.chromium.Chromium',
|
| + bus_name=None,
|
| + path='/')
|
| +
|
| +POLICY_TYPE = 'google/chromeos/device'
|
| +
|
| +
|
| +def assert_has_policy_data(response_proto):
|
| + if not response_proto.HasField("policy_data"):
|
| + raise OwnershipError('Malformatted response.')
|
| +
|
| +
|
| +def assert_has_device_settings(data_proto):
|
| + if (not data_proto.HasField("policy_type") or
|
| + data_proto.policy_type != POLICY_TYPE or
|
| + not data_proto.HasField("policy_value")):
|
| + raise OwnershipError('Malformatted response.')
|
| +
|
| +
|
| +def assert_username(data_proto, username):
|
| + if data_proto.username != username:
|
| + raise OwnershipError('Incorrect username.')
|
| +
|
| +
|
| +def assert_guest_setting(settings, guests):
|
| + if not settings.HasField("guest_mode_enabled"):
|
| + raise OwnershipError('No guest mode setting protobuf.')
|
| + if not settings.guest_mode_enabled.HasField("guest_mode_enabled"):
|
| + raise OwnershipError('No guest mode setting.')
|
| + if settings.guest_mode_enabled.guest_mode_enabled != guests:
|
| + raise OwnershipError('Incorrect guest mode setting.')
|
| +
|
| +
|
| +def assert_show_users(settings, show_users):
|
| + if not settings.HasField("show_user_names"):
|
| + raise OwnershipError('No show users setting protobuf.')
|
| + if not settings.show_user_names.HasField("show_user_names"):
|
| + raise OwnershipError('No show users setting.')
|
| + if settings.show_user_names.show_user_names != show_users:
|
| + raise OwnershipError('Incorrect show users setting.')
|
| +
|
| +
|
| +def assert_roaming(settings, roaming):
|
| + if not settings.HasField("data_roaming_enabled"):
|
| + raise OwnershipError('No roaming setting protobuf.')
|
| + if not settings.data_roaming_enabled.HasField("data_roaming_enabled"):
|
| + raise OwnershipError('No roaming setting.')
|
| + if settings.data_roaming_enabled.data_roaming_enabled != roaming:
|
| + raise OwnershipError('Incorrect roaming setting.')
|
| +
|
| +
|
| +def assert_new_users(settings, new_users):
|
| + if not settings.HasField("allow_new_users"):
|
| + raise OwnershipError('No allow new users setting protobuf.')
|
| + if not settings.allow_new_users.HasField("allow_new_users"):
|
| + raise OwnershipError('No allow new users setting.')
|
| + if settings.allow_new_users.allow_new_users != new_users:
|
| + raise OwnershipError('Incorrect allow new users setting.')
|
| +
|
| +
|
| +def assert_users_on_whitelist(settings, users):
|
| + if settings.HasField("user_whitelist"):
|
| + for user in users:
|
| + if user not in settings.user_whitelist.user_whitelist:
|
| + raise OwnershipError(user + ' not whitelisted.')
|
| + else:
|
| + raise OwnershipError('No user whitelist.')
|
| +
|
| +
|
| +def assert_proxy_settings(settings, proxies):
|
| + if not settings.HasField("device_proxy_settings"):
|
| + raise OwnershipError('No proxy settings protobuf.')
|
| + if not settings.device_proxy_settings.HasField("proxy_mode"):
|
| + raise OwnershipError('No proxy_mode setting.')
|
| + if settings.device_proxy_settings.proxy_mode != proxies['proxy_mode']:
|
| + raise OwnershipError('Incorrect proxies: %s' % proxies)
|
| +
|
| +
|
| NSSDB = constants.CRYPTOHOME_MOUNT_PT + '/.pki/nssdb'
|
| PK12UTIL = 'nsspk12util'
|
| OPENSSLP12 = 'openssl pkcs12'
|
| @@ -86,6 +178,35 @@ OPENSSLREQ = 'openssl req'
|
| OPENSSLCRYPTO = 'openssl sha1'
|
|
|
|
|
| +def use_known_ownerkeys():
|
| + """Sets the system up to use a well-known keypair for owner operations.
|
| +
|
| + Assuming the appropriate cryptohome is already mounted, configures the
|
| + device to accept policies signed with the checked-in 'mock' owner key.
|
| + """
|
| + dirname = os.path.dirname(__file__)
|
| + mock_keyfile = os.path.join(dirname, constants.MOCK_OWNER_KEY)
|
| + mock_certfile = os.path.join(dirname, constants.MOCK_OWNER_CERT)
|
| + push_to_nss(mock_keyfile, mock_certfile, NSSDB)
|
| + utils.open_write_close(constants.OWNER_KEY_FILE,
|
| + cert_extract_pubkey_der(mock_certfile))
|
| +
|
| +
|
| +def known_privkey():
|
| + """Returns the mock owner private key in PEM format.
|
| + """
|
| + dirname = os.path.dirname(__file__)
|
| + return utils.read_file(os.path.join(dirname, constants.MOCK_OWNER_KEY))
|
| +
|
| +
|
| +def known_pubkey():
|
| + """Returns the mock owner public key in DER format.
|
| + """
|
| + dirname = os.path.dirname(__file__)
|
| + return cert_extract_pubkey_der(os.path.join(dirname,
|
| + constants.MOCK_OWNER_CERT))
|
| +
|
| +
|
| def pairgen():
|
| """Generate a self-signed cert and associated private key.
|
|
|
| @@ -212,5 +333,5 @@ def sign(pem_key, data):
|
| sig.fo.seek(0)
|
| sig_data = sig.fo.read()
|
| if not sig_data:
|
| - raise error.TestFail('Empty signature!')
|
| + raise error.OwnershipError('Empty signature!')
|
| return sig_data
|
|
|