OLD | NEW |
(Empty) | |
| 1 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. |
| 4 |
| 5 import dbus |
| 6 import logging |
| 7 import sys |
| 8 import common |
| 9 import constants |
| 10 import login |
| 11 import ownership |
| 12 |
| 13 from autotest_lib.client.bin import test, utils |
| 14 from autotest_lib.client.common_lib import error |
| 15 |
| 16 |
| 17 class OwnershipTest(test.test): |
| 18 """Base class for tests that test device ownership and policies. |
| 19 |
| 20 If your subclass overrides the initialize() or cleanup() methods, it |
| 21 should make sure to invoke this class' version of those methods as well. |
| 22 The standard super(...) function cannot be used for this, since the base |
| 23 test class is not a 'new style' Python class. |
| 24 """ |
| 25 version = 1 |
| 26 |
| 27 def initialize(self): |
| 28 ownership.clear_ownership() |
| 29 login.refresh_login_screen() |
| 30 super(OwnershipTest, self).initialize() |
| 31 |
| 32 |
| 33 def connect_to_session_manager(self): |
| 34 """Create and return a DBus connection to session_manager. |
| 35 |
| 36 Connects to the session manager over the DBus system bus. Returns |
| 37 appropriately configured DBus interface object. |
| 38 """ |
| 39 bus = dbus.SystemBus() |
| 40 proxy = bus.get_object('org.chromium.SessionManager', |
| 41 '/org/chromium/SessionManager') |
| 42 return dbus.Interface(proxy, 'org.chromium.SessionManagerInterface') |
| 43 |
| 44 |
| 45 def generate_policy(self, key, pubkey, policy, old_key=None): |
| 46 """Generate and serialize a populated device policy protobuffer. |
| 47 |
| 48 Creates a protobuf containing the device policy |policy|, signed with |
| 49 |key|. Also includes the public key |pubkey|, signed with |old_key| |
| 50 if provided. If not, |pubkey| is signed with |key|. The protobuf |
| 51 is serialized to a string and returned. |
| 52 """ |
| 53 # Pull in protobuf definitions. |
| 54 sys.path.append(self.srcdir) |
| 55 from device_management_backend_pb2 import PolicyFetchResponse |
| 56 |
| 57 if old_key == None: |
| 58 old_key = key |
| 59 policy_proto = PolicyFetchResponse() |
| 60 policy_proto.policy_data = policy |
| 61 policy_proto.policy_data_signature = ownership.sign(key, policy) |
| 62 policy_proto.new_public_key = pubkey |
| 63 policy_proto.new_public_key_signature = ownership.sign(old_key, pubkey) |
| 64 return policy_proto.SerializeToString() |
| 65 |
| 66 |
| 67 def push_policy(self, policy_string, sm): |
| 68 """Push a device policy to the session manager over DBus. |
| 69 |
| 70 The serialized device policy |policy_string| is sent to the session |
| 71 manager with the StorePolicy DBus call. Success of the store is |
| 72 validated by fetching the policy again and comparing. |
| 73 """ |
| 74 sm.StorePolicy(dbus.ByteArray(policy_string), byte_arrays=True) |
| 75 login.wait_for_ownership() |
| 76 |
| 77 retrieved_policy = sm.RetrievePolicy(byte_arrays=True) |
| 78 if retrieved_policy != policy_string: |
| 79 raise error.TestFail('Policy should not be %s' % retrieved_policy) |
| 80 |
| 81 |
| 82 def cleanup(self): |
| 83 login.nuke_login_manager() |
| 84 super(OwnershipTest, self).cleanup() |
OLD | NEW |