| OLD | NEW | 
|---|
| 1 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. | 1 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. | 
| 2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be | 
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. | 
| 4 | 4 | 
| 5 import dbus | 5 import dbus | 
| 6 import dbus.glib | 6 import dbus.glib | 
| 7 import gobject | 7 import gobject | 
| 8 import logging | 8 import logging | 
| 9 import os | 9 import os | 
|  | 10 import sys | 
| 10 import tempfile | 11 import tempfile | 
| 11 | 12 | 
| 12 from autotest_lib.client.bin import test | 13 from autotest_lib.client.bin import test, utils | 
| 13 from autotest_lib.client.common_lib import autotemp, error | 14 from autotest_lib.client.common_lib import autotemp, error | 
| 14 from autotest_lib.client.cros import constants, cros_ui, cryptohome, login | 15 from autotest_lib.client.cros import constants, cros_ui, cryptohome, login | 
| 15 from autotest_lib.client.cros import ownership | 16 from autotest_lib.client.cros import ownership | 
| 16 | 17 | 
| 17 | 18 | 
| 18 class login_OwnershipApi(test.test): | 19 class login_OwnershipApi(test.test): | 
| 19     version = 1 | 20     version = 1 | 
| 20 | 21 | 
| 21     _testuser = 'cryptohometest@chromium.org' | 22     _testuser = 'cryptohometest@chromium.org' | 
| 22     _testpass = 'testme' | 23     _testpass = 'testme' | 
| 23     _testpolicydata = 'hooberbloob' | 24     _poldata = 'hooberbloob' | 
| 24 | 25 | 
| 25     _tempdir = None | 26     _tempdir = None | 
| 26 | 27 | 
| 27     def initialize(self): | 28     def setup(self): | 
|  | 29         os.chdir(self.srcdir) | 
|  | 30         utils.make('OUT_DIR=.') | 
|  | 31 | 
|  | 32 | 
|  | 33     def __unlink(self, filename): | 
| 28         try: | 34         try: | 
| 29             os.unlink(constants.OWNER_KEY_FILE) | 35             os.unlink(filename) | 
| 30             os.unlink(constants.SIGNED_PREFERENCES_FILE) |  | 
| 31         except (IOError, OSError) as error: | 36         except (IOError, OSError) as error: | 
| 32             logging.info(error) | 37             logging.info(error) | 
|  | 38 | 
|  | 39     def initialize(self): | 
|  | 40         self.__unlink(constants.OWNER_KEY_FILE) | 
|  | 41         self.__unlink(constants.SIGNED_PREFERENCES_FILE) | 
|  | 42         self.__unlink(constants.SIGNED_POLICY_FILE) | 
| 33         login.refresh_login_screen() | 43         login.refresh_login_screen() | 
| 34         cryptohome.remove_vault(self._testuser) | 44         cryptohome.remove_vault(self._testuser) | 
| 35         cryptohome.mount_vault(self._testuser, self._testpass, create=True) | 45         cryptohome.mount_vault(self._testuser, self._testpass, create=True) | 
| 36         self._tempdir = autotemp.tempdir(unique_id=self.__class__.__name__) | 46         self._tempdir = autotemp.tempdir(unique_id=self.__class__.__name__) | 
| 37         # to prime nssdb. | 47         # to prime nssdb. | 
| 38         tmpname = self.__generate_temp_filename() | 48         tmpname = self.__generate_temp_filename() | 
| 39         cros_ui.xsystem_as('HOME=%s %s %s' % (constants.CRYPTOHOME_MOUNT_PT, | 49         cros_ui.xsystem_as('HOME=%s %s %s' % (constants.CRYPTOHOME_MOUNT_PT, | 
| 40                                               constants.KEYGEN, | 50                                               constants.KEYGEN, | 
| 41                                               tmpname)) | 51                                               tmpname)) | 
| 42         os.unlink(tmpname) | 52         os.unlink(tmpname) | 
| (...skipping 13 matching lines...) Expand all  Loading... | 
| 56 | 66 | 
| 57 | 67 | 
| 58     def __log_err_and_stop(self, e): | 68     def __log_err_and_stop(self, e): | 
| 59         logging.debug(e) | 69         logging.debug(e) | 
| 60         self._loop.quit() | 70         self._loop.quit() | 
| 61 | 71 | 
| 62 | 72 | 
| 63     def run_once(self): | 73     def run_once(self): | 
| 64         keyfile = ownership.generate_and_register_owner_keypair(self._testuser, | 74         keyfile = ownership.generate_and_register_owner_keypair(self._testuser, | 
| 65                                                                 self._testpass) | 75                                                                 self._testpass) | 
|  | 76         # Pull in protobuf definitions. | 
|  | 77         sys.path.append(self.srcdir) | 
|  | 78         from device_management_backend_pb2 import PolicyFetchResponse | 
| 66 | 79 | 
| 67         # open DBus connection to session_manager | 80         # open DBus connection to session_manager | 
| 68         bus = dbus.SystemBus() | 81         bus = dbus.SystemBus() | 
| 69         proxy = bus.get_object('org.chromium.SessionManager', | 82         proxy = bus.get_object('org.chromium.SessionManager', | 
| 70                                '/org/chromium/SessionManager') | 83                                '/org/chromium/SessionManager') | 
| 71         sm = dbus.Interface(proxy, 'org.chromium.SessionManagerInterface') | 84         sm = dbus.Interface(proxy, 'org.chromium.SessionManagerInterface') | 
| 72 | 85 | 
| 73         sig = ownership.sign(keyfile, self._testuser) | 86         policy_proto = PolicyFetchResponse() | 
| 74         sm.Whitelist(self._testuser, dbus.ByteArray(sig)) | 87         policy_proto.policy_data = self._poldata | 
| 75         wl_sig = sm.CheckWhitelist(self._testuser, byte_arrays=True) | 88         policy_proto.policy_data_signature = ownership.sign(keyfile, | 
| 76         if sig != wl_sig: | 89                                                             self._poldata) | 
| 77             raise error.TestFail("CheckWhitelist signature mismatch") | 90         sm.StorePolicy(dbus.ByteArray(policy_proto.SerializeToString()), | 
|  | 91                        byte_arrays=True, | 
|  | 92                        reply_handler=self.__log_and_stop, | 
|  | 93                        error_handler=self.__log_err_and_stop) | 
| 78 | 94 | 
| 79         sm.Unwhitelist(self._testuser, dbus.ByteArray(sig)) | 95         self._loop = gobject.MainLoop() | 
| 80         try: | 96         self._loop.run() | 
| 81             sm.CheckWhitelist(self._testuser) | 97 | 
| 82             raise error.TestFail("Should not have found user in whitelist!") | 98         retrieved_policy = sm.RetrievePolicy(byte_arrays=True) | 
| 83         except dbus.DBusException as e: | 99         if retrieved_policy != policy_proto.SerializeToString(): | 
| 84             logging.debug(e) | 100             raise error.TestFail('Policy should not be %s' % retrieved_policy) | 
| 85 | 101 | 
| 86 | 102 | 
| 87     def cleanup(self): | 103     def cleanup(self): | 
| 88         cryptohome.unmount_vault() | 104         cryptohome.unmount_vault() | 
| 89         self._tempdir.clean() | 105         self._tempdir.clean() | 
| 90         super(login_OwnershipApi, self).cleanup() | 106         super(login_OwnershipApi, self).cleanup() | 
| OLD | NEW | 
|---|