OLD | NEW |
1 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. | 1 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 import dbus | 5 import dbus |
6 import dbus.glib | 6 import dbus.glib |
7 import gobject | 7 import gobject |
8 import logging | 8 import logging |
9 import os | 9 import os |
10 import sys | |
11 import tempfile | 10 import tempfile |
12 | 11 |
13 from autotest_lib.client.bin import test, utils | 12 from autotest_lib.client.bin import test, utils |
14 from autotest_lib.client.common_lib import autotemp, error | 13 from autotest_lib.client.common_lib import autotemp, error |
15 from autotest_lib.client.cros import constants, cros_ui, cryptohome, login | 14 from autotest_lib.client.cros import constants, cros_ui, cryptohome, login |
16 from autotest_lib.client.cros import ownership | 15 from autotest_lib.client.cros import cros_ownership_test, ownership |
17 | 16 |
18 | 17 |
19 class login_OwnershipApi(test.test): | 18 class login_OwnershipApi(cros_ownership_test.OwnershipTest): |
20 version = 1 | 19 version = 1 |
21 | 20 |
22 _testuser = 'cryptohometest@chromium.org' | 21 _testuser = 'cryptohometest@chromium.org' |
23 _testpass = 'testme' | 22 _testpass = 'testme' |
24 _poldata = 'hooberbloob' | 23 _poldata = 'policydata' |
25 | 24 |
26 _tempdir = None | 25 _tempdir = None |
27 | 26 |
28 def setup(self): | 27 def setup(self): |
29 os.chdir(self.srcdir) | 28 os.chdir(self.srcdir) |
30 utils.make('OUT_DIR=.') | 29 utils.make('OUT_DIR=.') |
31 | 30 |
32 | 31 |
33 def __unlink(self, filename): | |
34 try: | |
35 os.unlink(filename) | |
36 except (IOError, OSError) as error: | |
37 logging.info(error) | |
38 | |
39 def initialize(self): | 32 def initialize(self): |
40 self.__unlink(constants.OWNER_KEY_FILE) | 33 super(login_OwnershipApi, self).initialize() |
41 self.__unlink(constants.SIGNED_PREFERENCES_FILE) | |
42 self.__unlink(constants.SIGNED_POLICY_FILE) | |
43 login.refresh_login_screen() | |
44 cryptohome.remove_vault(self._testuser) | 34 cryptohome.remove_vault(self._testuser) |
45 cryptohome.mount_vault(self._testuser, self._testpass, create=True) | 35 cryptohome.mount_vault(self._testuser, self._testpass, create=True) |
46 self._tempdir = autotemp.tempdir(unique_id=self.__class__.__name__) | 36 self._tempdir = autotemp.tempdir(unique_id=self.__class__.__name__) |
47 # to prime nssdb. | 37 # to prime nssdb. |
48 tmpname = self.__generate_temp_filename() | 38 tmpname = self.__generate_temp_filename() |
49 cros_ui.xsystem_as('HOME=%s %s %s' % (constants.CRYPTOHOME_MOUNT_PT, | 39 cros_ui.xsystem_as('HOME=%s %s %s' % (constants.CRYPTOHOME_MOUNT_PT, |
50 constants.KEYGEN, | 40 constants.KEYGEN, |
51 tmpname)) | 41 tmpname)) |
52 os.unlink(tmpname) | 42 os.unlink(tmpname) |
53 super(login_OwnershipApi, self).initialize() | |
54 | 43 |
55 | 44 |
56 def __generate_temp_filename(self): | 45 def __generate_temp_filename(self): |
57 just_for_name = tempfile.NamedTemporaryFile(mode='w', delete=True) | 46 just_for_name = tempfile.NamedTemporaryFile(mode='w', delete=True) |
58 basename = just_for_name.name | 47 basename = just_for_name.name |
59 just_for_name.close() # deletes file. | 48 just_for_name.close() # deletes file. |
60 return basename | 49 return basename |
61 | 50 |
62 | 51 |
63 def __log_and_stop(self, ret_code): | |
64 logging.info("exited %s" % ret_code) | |
65 self._loop.quit() | |
66 | |
67 | |
68 def __log_err_and_stop(self, e): | |
69 logging.debug(e) | |
70 self._loop.quit() | |
71 | |
72 | |
73 def run_once(self): | 52 def run_once(self): |
74 keyfile = ownership.generate_and_register_owner_keypair(self._testuser, | 53 (pkey, pubkey) = ownership.generate_and_register_keypair(self._testuser, |
75 self._testpass) | 54 self._testpass) |
76 # Pull in protobuf definitions. | 55 sm = self.connect_to_session_manager() |
77 sys.path.append(self.srcdir) | 56 if not sm.StartSession(self._testuser, ''): |
78 from device_management_backend_pb2 import PolicyFetchResponse | 57 raise error.TestFail('Could not start session for owner') |
79 | 58 self.push_policy(self.generate_policy(pkey, pubkey, self._poldata), sm) |
80 # open DBus connection to session_manager | 59 if not sm.StopSession(''): |
81 bus = dbus.SystemBus() | 60 raise error.TestFail('Could not stop session for owner') |
82 proxy = bus.get_object('org.chromium.SessionManager', | |
83 '/org/chromium/SessionManager') | |
84 sm = dbus.Interface(proxy, 'org.chromium.SessionManagerInterface') | |
85 | |
86 policy_proto = PolicyFetchResponse() | |
87 policy_proto.policy_data = self._poldata | |
88 policy_proto.policy_data_signature = ownership.sign(keyfile, | |
89 self._poldata) | |
90 sm.StorePolicy(dbus.ByteArray(policy_proto.SerializeToString()), | |
91 byte_arrays=True, | |
92 reply_handler=self.__log_and_stop, | |
93 error_handler=self.__log_err_and_stop) | |
94 | |
95 self._loop = gobject.MainLoop() | |
96 self._loop.run() | |
97 | |
98 retrieved_policy = sm.RetrievePolicy(byte_arrays=True) | |
99 if retrieved_policy != policy_proto.SerializeToString(): | |
100 raise error.TestFail('Policy should not be %s' % retrieved_policy) | |
101 | 61 |
102 | 62 |
103 def cleanup(self): | 63 def cleanup(self): |
104 cryptohome.unmount_vault() | 64 cryptohome.unmount_vault() |
105 self._tempdir.clean() | 65 if self._tempdir: self._tempdir.clean() |
106 super(login_OwnershipApi, self).cleanup() | 66 super(login_OwnershipApi, self).cleanup() |
OLD | NEW |