| Index: client/site_tests/login_OwnershipRetaken/login_OwnershipRetaken.py
|
| diff --git a/client/site_tests/login_OwnershipRetaken/login_OwnershipRetaken.py b/client/site_tests/login_OwnershipRetaken/login_OwnershipRetaken.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..323736c0413d7d00b7c54d2546a95e58ae0c9020
|
| --- /dev/null
|
| +++ b/client/site_tests/login_OwnershipRetaken/login_OwnershipRetaken.py
|
| @@ -0,0 +1,122 @@
|
| +# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
|
| +# Use of this source code is governed by a BSD-style license that can be
|
| +# found in the LICENSE file.
|
| +
|
| +import dbus
|
| +import gobject
|
| +import logging
|
| +import sys
|
| +import os
|
| +import tempfile
|
| +
|
| +from autotest_lib.client.bin import test, utils
|
| +from autotest_lib.client.common_lib import autotemp, error
|
| +from autotest_lib.client.cros import constants, cros_ui, cryptohome, login
|
| +from autotest_lib.client.cros import cros_ownership_test, ownership
|
| +
|
| +from dbus.mainloop.glib import DBusGMainLoop
|
| +
|
| +
|
| +class login_OwnershipRetaken(cros_ownership_test.OwnershipTest):
|
| + version = 1
|
| +
|
| + _tempdir = None
|
| + _got_new_key = False
|
| + _got_new_policy = False
|
| +
|
| + def setup(self):
|
| + os.chdir(self.srcdir)
|
| + utils.make('OUT_DIR=.')
|
| +
|
| +
|
| + def __handle_new_key(self, success):
|
| + self._got_new_key = (success == 'success')
|
| +
|
| +
|
| + def __handle_new_policy(self, success):
|
| + self._got_new_policy = (success == 'success')
|
| +
|
| +
|
| + def __received_signals(self):
|
| + """Process dbus events"""
|
| + context = gobject.MainLoop().get_context()
|
| + while context.iteration(False):
|
| + pass
|
| + return self._got_new_key and self._got_new_policy
|
| +
|
| +
|
| + def __reset_signal_state(self):
|
| + self._got_new_policy = self._got_new_key = False
|
| +
|
| +
|
| + def initialize(self):
|
| + super(login_OwnershipRetaken, self).initialize()
|
| + # Start with a clean slate wrt ownership
|
| + cros_ui.stop()
|
| + cryptohome.remove_vault(self._testuser)
|
| + cryptohome.mount_vault(self._testuser, self._testpass, create=True)
|
| + ownership.clear_ownership()
|
| + cros_ui.start()
|
| + login.wait_for_browser()
|
| + DBusGMainLoop(set_as_default=True)
|
| + ownership.listen_to_session_manager_signal(self.__handle_new_key,
|
| + 'SetOwnerKeyComplete')
|
| + ownership.listen_to_session_manager_signal(self.__handle_new_policy,
|
| + 'PropertyChangeComplete')
|
| +
|
| +
|
| + def run_once(self):
|
| + pkey = ownership.known_privkey()
|
| + pubkey = ownership.known_pubkey()
|
| + sm = ownership.connect_to_session_manager()
|
| +
|
| + # Pre-configure some owner settings, including initial key.
|
| + poldata = self.build_policy_data(owner=self._testuser,
|
| + guests=False,
|
| + new_users=True,
|
| + roaming=True,
|
| + whitelist=(self._testuser, 'a@b.c'),
|
| + proxies={ 'proxy_mode': 'direct' })
|
| + policy_string = self.generate_policy(pkey, pubkey, poldata)
|
| + self.push_policy(policy_string, sm)
|
| +
|
| +
|
| + # wait for new-owner-key signal, property-changed signal.
|
| + login.wait_for_ownership()
|
| +
|
| + # grab key, ensure that it's the same as the known key.
|
| + if (utils.read_file(constants.OWNER_KEY_FILE) != pubkey):
|
| + raise error.TestFail('Owner key should have changed!')
|
| +
|
| + # Start a new session, which will trigger the re-taking of ownership.
|
| + if not sm.StartSession(self._testuser, ''):
|
| + raise error.TestFail('Could not start session for owner')
|
| +
|
| + # wait for new-owner-key signal, property-changed signal.
|
| + utils.poll_for_condition(condition=lambda: self.__received_signals(),
|
| + desc='Retaking of ownership complete.',
|
| + timeout=constants.DEFAULT_OWNERSHIP_TIMEOUT)
|
| +
|
| + # grab key, ensure that it's different than known key
|
| + if (utils.read_file(constants.OWNER_KEY_FILE) == pubkey):
|
| + raise error.TestFail('Owner key should have changed!')
|
| +
|
| + # RetrievePolicy, check sig against new key, check properties
|
| + retrieved_policy = sm.RetrievePolicy(byte_arrays=True)
|
| + if retrieved_policy is None:
|
| + raise error.TestFail('Policy not found')
|
| + self.compare_policy_response(retrieved_policy,
|
| + owner=self._testuser,
|
| + guests=False,
|
| + new_users=True,
|
| + roaming=True,
|
| + whitelist=(self._testuser, 'a@b.c'),
|
| + proxies={ 'proxy_mode': 'direct' })
|
| +
|
| +
|
| +
|
| + def cleanup(self):
|
| + cryptohome.unmount_vault()
|
| + if self._tempdir: self._tempdir.clean()
|
| + cros_ui.start(allow_fail=True)
|
| + super(login_OwnershipRetaken, self).cleanup()
|
|
|