Index: client/site_tests/login_OwnershipRetaken/login_OwnershipRetaken.py |
diff --git a/client/site_tests/login_OwnershipRetaken/login_OwnershipRetaken.py b/client/site_tests/login_OwnershipRetaken/login_OwnershipRetaken.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..f08d9acba186ebe6d39348ed12a9b4630146dbec |
--- /dev/null |
+++ b/client/site_tests/login_OwnershipRetaken/login_OwnershipRetaken.py |
@@ -0,0 +1,73 @@ |
+# Copyright (c) 2011 The Chromium OS Authors. All rights reserved. |
+# Use of this source code is governed by a BSD-style license that can be |
+# found in the LICENSE file. |
+ |
+import dbus |
+import gobject |
+import logging |
+import sys |
+import os |
+import tempfile |
+ |
+from autotest_lib.client.bin import test, utils |
+from autotest_lib.client.common_lib import autotemp, error |
+from autotest_lib.client.cros import constants, cros_ui, cryptohome, login |
+from autotest_lib.client.cros import cros_ui_test, ownership |
+ |
+from dbus.mainloop.glib import DBusGMainLoop |
+ |
+ |
+class login_OwnershipRetaken(cros_ui_test.UITest): |
+ version = 1 |
+ |
+ _got_new_key = False |
+ _got_new_policy = False |
+ |
+ def setup(self): |
+ os.chdir(self.srcdir) |
+ utils.make('OUT_DIR=.') |
+ |
+ |
+ def __handle_new_key(self, success): |
+ self._got_new_key = (success == 'success') |
+ |
+ |
+ def __handle_new_policy(self, success): |
+ self._got_new_policy = (success == 'success') |
+ |
+ |
+ def __received_signals(self): |
+ """Process dbus events""" |
+ context = gobject.MainLoop().get_context() |
+ while context.iteration(False): |
+ pass |
+ return self._got_new_key and self._got_new_policy |
+ |
+ |
+ def initialize(self, creds='$mockowner'): |
+ assert creds == '$mockowner', "Must use mockowner creds for this test." |
+ super(login_OwnershipRetaken, self).initialize(creds) |
+ ownership.listen_to_session_manager_signal(self.__handle_new_key, |
+ 'SetOwnerKeyComplete') |
+ ownership.listen_to_session_manager_signal(self.__handle_new_policy, |
+ 'PropertyChangeComplete') |
+ |
+ |
+ def run_once(self): |
+ # wait for new-owner-key signal, property-changed signal. |
+ utils.poll_for_condition( |
+ condition=lambda: self.__received_signals(), |
+ desc='Retaking of ownership complete.', |
+ timeout=constants.DEFAULT_OWNERSHIP_TIMEOUT) |
+ |
+ # grab key, ensure that it's different than known key |
+ if (utils.read_file(constants.OWNER_KEY_FILE) == |
+ ownership.known_pubkey()): |
+ raise error.TestFail('Owner key should have changed!') |
+ |
+ # RetrievePolicy, check sig against new key, check properties |
+ sm = ownership.connect_to_session_manager() |
+ retrieved_policy = sm.RetrievePolicy(byte_arrays=True) |
+ if retrieved_policy is None: |
+ raise error.TestFail('Policy not found') |
+ self.validate_basic_policy(retrieved_policy) |