OLD | NEW |
(Empty) | |
| 1 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. |
| 4 |
| 5 import dbus |
| 6 import gobject |
| 7 import logging |
| 8 import sys |
| 9 import os |
| 10 import tempfile |
| 11 |
| 12 from autotest_lib.client.bin import test, utils |
| 13 from autotest_lib.client.common_lib import autotemp, error |
| 14 from autotest_lib.client.cros import constants, cros_ui, cryptohome, login |
| 15 from autotest_lib.client.cros import cros_ownership_test, ownership |
| 16 |
| 17 from dbus.mainloop.glib import DBusGMainLoop |
| 18 |
| 19 |
| 20 class login_OwnershipRetaken(cros_ownership_test.OwnershipTest): |
| 21 version = 1 |
| 22 |
| 23 _tempdir = None |
| 24 _got_new_key = False |
| 25 _got_new_policy = False |
| 26 |
| 27 def setup(self): |
| 28 os.chdir(self.srcdir) |
| 29 utils.make('OUT_DIR=.') |
| 30 |
| 31 |
| 32 def __handle_new_key(self, success): |
| 33 self._got_new_key = (success == 'success') |
| 34 |
| 35 |
| 36 def __handle_new_policy(self, success): |
| 37 self._got_new_policy = (success == 'success') |
| 38 |
| 39 |
| 40 def __received_signals(self): |
| 41 """Process dbus events""" |
| 42 context = gobject.MainLoop().get_context() |
| 43 while context.iteration(False): |
| 44 pass |
| 45 return self._got_new_key and self._got_new_policy |
| 46 |
| 47 |
| 48 def __reset_signal_state(self): |
| 49 self._got_new_policy = self._got_new_key = False |
| 50 |
| 51 |
| 52 def initialize(self): |
| 53 super(login_OwnershipRetaken, self).initialize() |
| 54 # Start with a clean slate wrt ownership |
| 55 cros_ui.stop() |
| 56 cryptohome.remove_vault(self._testuser) |
| 57 cryptohome.mount_vault(self._testuser, self._testpass, create=True) |
| 58 ownership.clear_ownership() |
| 59 cros_ui.start() |
| 60 login.wait_for_browser() |
| 61 DBusGMainLoop(set_as_default=True) |
| 62 ownership.listen_to_session_manager_signal(self.__handle_new_key, |
| 63 'SetOwnerKeyComplete') |
| 64 ownership.listen_to_session_manager_signal(self.__handle_new_policy, |
| 65 'PropertyChangeComplete') |
| 66 |
| 67 |
| 68 def run_once(self): |
| 69 pkey = ownership.known_privkey() |
| 70 pubkey = ownership.known_pubkey() |
| 71 sm = ownership.connect_to_session_manager() |
| 72 |
| 73 # Pre-configure some owner settings, including initial key. |
| 74 poldata = self.build_policy_data(owner=self._testuser, |
| 75 guests=False, |
| 76 new_users=True, |
| 77 roaming=True, |
| 78 whitelist=(self._testuser, 'a@b.c'), |
| 79 proxies={ 'proxy_mode': 'direct' }) |
| 80 policy_string = self.generate_policy(pkey, pubkey, poldata) |
| 81 self.push_policy(policy_string, sm) |
| 82 |
| 83 |
| 84 # wait for new-owner-key signal, property-changed signal. |
| 85 login.wait_for_ownership() |
| 86 |
| 87 # grab key, ensure that it's the same as the known key. |
| 88 if (utils.read_file(constants.OWNER_KEY_FILE) != pubkey): |
| 89 raise error.TestFail('Owner key should have changed!') |
| 90 |
| 91 # Start a new session, which will trigger the re-taking of ownership. |
| 92 if not sm.StartSession(self._testuser, ''): |
| 93 raise error.TestFail('Could not start session for owner') |
| 94 |
| 95 # wait for new-owner-key signal, property-changed signal. |
| 96 utils.poll_for_condition(condition=lambda: self.__received_signals(), |
| 97 desc='Retaking of ownership complete.', |
| 98 timeout=constants.DEFAULT_OWNERSHIP_TIMEOUT) |
| 99 |
| 100 # grab key, ensure that it's different than known key |
| 101 if (utils.read_file(constants.OWNER_KEY_FILE) == pubkey): |
| 102 raise error.TestFail('Owner key should have changed!') |
| 103 |
| 104 # RetrievePolicy, check sig against new key, check properties |
| 105 retrieved_policy = sm.RetrievePolicy(byte_arrays=True) |
| 106 if retrieved_policy is None: |
| 107 raise error.TestFail('Policy not found') |
| 108 self.compare_policy_response(retrieved_policy, |
| 109 owner=self._testuser, |
| 110 guests=False, |
| 111 new_users=True, |
| 112 roaming=True, |
| 113 whitelist=(self._testuser, 'a@b.c'), |
| 114 proxies={ 'proxy_mode': 'direct' }) |
| 115 |
| 116 |
| 117 |
| 118 def cleanup(self): |
| 119 cryptohome.unmount_vault() |
| 120 if self._tempdir: self._tempdir.clean() |
| 121 cros_ui.start(allow_fail=True) |
| 122 super(login_OwnershipRetaken, self).cleanup() |
OLD | NEW |