Index: client/cros/ownership.py |
diff --git a/client/cros/ownership.py b/client/cros/ownership.py |
index 008f5319bdc498ff1ba93eed0915c9d9ce08df5e..5dd1dcfd3f1009f539adf1748c1b89654bc6c66f 100644 |
--- a/client/cros/ownership.py |
+++ b/client/cros/ownership.py |
@@ -14,6 +14,11 @@ from autotest_lib.client.bin import utils |
from autotest_lib.client.common_lib import autotemp, error |
+class OwnershipError(error.TestError): |
+ """Generic error for ownership-related failures.""" |
+ pass |
+ |
+ |
class scoped_tempfile(object): |
"""A wrapper that provides scoped semantics for temporary files. |
@@ -61,7 +66,6 @@ def __unlink(filename): |
def clear_ownership(): |
__unlink(constants.OWNER_KEY_FILE) |
- __unlink(constants.SIGNED_PREFERENCES_FILE) |
__unlink(constants.SIGNED_POLICY_FILE) |
@@ -77,6 +81,94 @@ def connect_to_session_manager(): |
return dbus.Interface(proxy, 'org.chromium.SessionManagerInterface') |
+def listen_to_session_manager_signal(callback, signal): |
+ """Create and return a DBus connection to session_manager. |
+ |
+ Connects to the session manager over the DBus system bus. Returns |
+ appropriately configured DBus interface object. |
+ """ |
+ bus = dbus.SystemBus() |
+ bus.add_signal_receiver( |
+ handler_function=callback, |
+ signal_name=signal, |
+ dbus_interface='org.chromium.Chromium', |
+ bus_name=None, |
+ path='/') |
+ |
+POLICY_TYPE = 'google/chromeos/device' |
+ |
+ |
+def assert_has_policy_data(response_proto): |
+ if not response_proto.HasField("policy_data"): |
+ raise OwnershipError('Malformatted response.') |
+ |
+ |
+def assert_has_device_settings(data_proto): |
+ if (not data_proto.HasField("policy_type") or |
+ data_proto.policy_type != POLICY_TYPE or |
+ not data_proto.HasField("policy_value")): |
+ raise OwnershipError('Malformatted response.') |
+ |
+ |
+def assert_username(data_proto, username): |
+ if data_proto.username != username: |
+ raise OwnershipError('Incorrect username.') |
+ |
+ |
+def assert_guest_setting(settings, guests): |
+ if not settings.HasField("guest_mode_enabled"): |
+ raise OwnershipError('No guest mode setting protobuf.') |
+ if not settings.guest_mode_enabled.HasField("guest_mode_enabled"): |
+ raise OwnershipError('No guest mode setting.') |
+ if settings.guest_mode_enabled.guest_mode_enabled != guests: |
+ raise OwnershipError('Incorrect guest mode setting.') |
+ |
+ |
+def assert_show_users(settings, show_users): |
+ if not settings.HasField("show_user_names"): |
+ raise OwnershipError('No show users setting protobuf.') |
+ if not settings.show_user_names.HasField("show_user_names"): |
+ raise OwnershipError('No show users setting.') |
+ if settings.show_user_names.show_user_names != show_users: |
+ raise OwnershipError('Incorrect show users setting.') |
+ |
+ |
+def assert_roaming(settings, roaming): |
+ if not settings.HasField("data_roaming_enabled"): |
+ raise OwnershipError('No roaming setting protobuf.') |
+ if not settings.data_roaming_enabled.HasField("data_roaming_enabled"): |
+ raise OwnershipError('No roaming setting.') |
+ if settings.data_roaming_enabled.data_roaming_enabled != roaming: |
+ raise OwnershipError('Incorrect roaming setting.') |
+ |
+ |
+def assert_new_users(settings, new_users): |
+ if not settings.HasField("allow_new_users"): |
+ raise OwnershipError('No allow new users setting protobuf.') |
+ if not settings.allow_new_users.HasField("allow_new_users"): |
+ raise OwnershipError('No allow new users setting.') |
+ if settings.allow_new_users.allow_new_users != new_users: |
+ raise OwnershipError('Incorrect allow new users setting.') |
+ |
+ |
+def assert_users_on_whitelist(settings, users): |
+ if settings.HasField("user_whitelist"): |
+ for user in users: |
+ if user not in settings.user_whitelist.user_whitelist: |
+ raise OwnershipError(user + ' not whitelisted.') |
+ else: |
+ raise OwnershipError('No user whitelist.') |
+ |
+ |
+def assert_proxy_settings(settings, proxies): |
+ if not settings.HasField("device_proxy_settings"): |
+ raise OwnershipError('No proxy settings protobuf.') |
+ if not settings.device_proxy_settings.HasField("proxy_mode"): |
+ raise OwnershipError('No proxy_mode setting.') |
+ if settings.device_proxy_settings.proxy_mode != proxies['proxy_mode']: |
+ raise OwnershipError('Incorrect proxies: %s' % proxies) |
+ |
+ |
NSSDB = constants.CRYPTOHOME_MOUNT_PT + '/.pki/nssdb' |
PK12UTIL = 'nsspk12util' |
OPENSSLP12 = 'openssl pkcs12' |
@@ -86,6 +178,35 @@ OPENSSLREQ = 'openssl req' |
OPENSSLCRYPTO = 'openssl sha1' |
+def use_known_ownerkeys(): |
+ """Sets the system up to use a well-known keypair for owner operations. |
+ |
+ Assuming the appropriate cryptohome is already mounted, configures the |
+ device to accept policies signed with the checked-in 'mock' owner key. |
+ """ |
+ dirname = os.path.dirname(__file__) |
+ mock_keyfile = os.path.join(dirname, constants.MOCK_OWNER_KEY) |
+ mock_certfile = os.path.join(dirname, constants.MOCK_OWNER_CERT) |
+ push_to_nss(mock_keyfile, mock_certfile, NSSDB) |
+ utils.open_write_close(constants.OWNER_KEY_FILE, |
+ cert_extract_pubkey_der(mock_certfile)) |
+ |
+ |
+def known_privkey(): |
+ """Returns the mock owner private key in PEM format. |
+ """ |
+ dirname = os.path.dirname(__file__) |
+ return utils.read_file(os.path.join(dirname, constants.MOCK_OWNER_KEY)) |
+ |
+ |
+def known_pubkey(): |
+ """Returns the mock owner public key in DER format. |
+ """ |
+ dirname = os.path.dirname(__file__) |
+ return cert_extract_pubkey_der(os.path.join(dirname, |
+ constants.MOCK_OWNER_CERT)) |
+ |
+ |
def pairgen(): |
"""Generate a self-signed cert and associated private key. |
@@ -212,5 +333,5 @@ def sign(pem_key, data): |
sig.fo.seek(0) |
sig_data = sig.fo.read() |
if not sig_data: |
- raise error.TestFail('Empty signature!') |
+ raise error.OwnershipError('Empty signature!') |
return sig_data |