| Index: appengine/swarming/server/acl_test.py
|
| diff --git a/appengine/swarming/server/acl_test.py b/appengine/swarming/server/acl_test.py
|
| index 48bec844b431a80aac7cf4027ae7b21e44a6c9f6..6f5c79fea212752d6c11036cae48f652f6d37e22 100755
|
| --- a/appengine/swarming/server/acl_test.py
|
| +++ b/appengine/swarming/server/acl_test.py
|
| @@ -19,7 +19,8 @@ from test_support import test_case
|
|
|
| from proto import config_pb2
|
|
|
| -import acl
|
| +from server import acl
|
| +from server import task_request
|
|
|
|
|
| # Default names of authorization groups.
|
| @@ -32,108 +33,197 @@ BOT_BOOTSTRAP_GROUP = ADMINS_GROUP
|
| class AclTest(test_case.TestCase):
|
| def setUp(self):
|
| super(AclTest, self).setUp()
|
| -
|
| + def settings():
|
| + return config_pb2.SettingsCfg(
|
| + auth=config_pb2.AuthSettings(
|
| + admins_group='admins',
|
| + bot_bootstrap_group='bot_bootstrap',
|
| + privileged_users_group='privileged_users',
|
| + users_group='users',
|
| + view_all_bots_group='view_all_bots',
|
| + view_all_tasks_group='view_all_tasks'))
|
| + self.mock(config, 'settings', settings)
|
| auth_testing.reset_local_state()
|
| - utils.clear_cache(config.settings)
|
| + self._task_owned = task_request.TaskRequest(
|
| + authenticated=auth.get_current_identity())
|
| + self._task_other = task_request.TaskRequest(
|
| + authenticated=auth.Identity(auth.IDENTITY_USER, 'larry@localhost'))
|
|
|
| @staticmethod
|
| - def add_to_group(group):
|
| + def _add_to_group(group):
|
| auth.bootstrap_group(group, [auth.get_current_identity()])
|
| + auth_testing.reset_local_state()
|
|
|
| - def add_to_admin(self):
|
| + def test_nobody(self):
|
| + self.mock(auth, 'get_current_identity', lambda: auth.IDENTITY_ANONYMOUS)
|
| + self.assertFalse(acl.is_ip_whitelisted_machine())
|
| + self.assertFalse(acl.can_access())
|
| + self.assertFalse(acl.can_view_config())
|
| + self.assertFalse(acl.can_edit_config())
|
| + self.assertFalse(acl.can_create_bot())
|
| + self.assertFalse(acl.can_edit_bot())
|
| + self.assertFalse(acl.can_delete_bot())
|
| + self.assertFalse(acl.can_view_bot())
|
| + self.assertFalse(acl.can_create_task())
|
| + self.assertFalse(acl.can_schedule_high_priority_tasks())
|
| + self.assertFalse(acl.can_edit_task(self._task_owned))
|
| + self.assertFalse(acl.can_edit_task(self._task_other))
|
| + self.assertFalse(acl.can_edit_all_tasks())
|
| + self.assertFalse(acl.can_view_task(self._task_owned))
|
| + self.assertFalse(acl.can_view_task(self._task_other))
|
| + self.assertFalse(acl.can_view_all_tasks())
|
| +
|
| + def test_instance_admin(self):
|
| auth_testing.mock_is_admin(self, True)
|
| -
|
| - def mock_auth_config(self, **kwargs):
|
| - cfg = config_pb2.SettingsCfg(auth=config_pb2.AuthSettings(**kwargs))
|
| - self.mock(config, '_get_settings', lambda: ('test_rev', cfg))
|
| -
|
| - def test_is_admin_app_admin(self):
|
| - self.add_to_admin()
|
| - self.assertTrue(acl.is_admin())
|
| - self.assertEqual(acl.get_user_type(), 'admin')
|
| -
|
| - def test_is_admin_not_app_admin(self):
|
| - self.assertFalse(acl.is_admin())
|
| - self.assertIsNone(acl.get_user_type())
|
| -
|
| - def test_is_admin_default_group(self):
|
| - self.add_to_group(ADMINS_GROUP)
|
| - self.assertTrue(acl.is_admin())
|
| - self.assertEqual(acl.get_user_type(), 'admin')
|
| -
|
| - def test_is_admin_custom_group(self):
|
| - self.mock_auth_config(admins_group='test_group')
|
| - self.add_to_group('test_group')
|
| - self.assertTrue(acl.is_admin())
|
| - self.assertEqual(acl.get_user_type(), 'admin')
|
| -
|
| - def test_is_privileged_user_admin(self):
|
| - self.add_to_admin()
|
| - self.assertTrue(acl.is_privileged_user())
|
| - self.assertEqual(acl.get_user_type(), 'admin')
|
| -
|
| - def test_is_privileged_user_default_group(self):
|
| - self.add_to_group(PRIVILEGED_USERS_GROUP)
|
| - self.assertTrue(acl.is_privileged_user())
|
| - self.assertEqual(acl.get_user_type(), 'admin')
|
| -
|
| - def test_is_privileged_user_custom_group(self):
|
| - self.mock_auth_config(privileged_users_group='test_group')
|
| - self.add_to_group('test_group')
|
| - self.assertTrue(acl.is_privileged_user())
|
| - self.assertEqual(acl.get_user_type(), 'privileged user')
|
| -
|
| - def test_is_privileged_user_wrong_group(self):
|
| - self.mock_auth_config(privileged_users_group='test_group')
|
| - self.add_to_group('wrong_test_group')
|
| - self.assertFalse(acl.is_privileged_user())
|
| - self.assertIsNone(acl.get_user_type())
|
| -
|
| - def test_is_user_privileged(self):
|
| - self.mock_auth_config(privileged_users_group='test_group')
|
| - self.add_to_group('test_group')
|
| - self.assertTrue(acl.is_user())
|
| - self.assertEqual(acl.get_user_type(), 'privileged user')
|
| -
|
| - def test_is_user_default_group(self):
|
| - self.add_to_group(USERS_GROUP)
|
| - self.assertTrue(acl.is_user())
|
| - self.assertEqual(acl.get_user_type(), 'admin')
|
| -
|
| - def test_is_user_custom_group(self):
|
| - self.mock_auth_config(users_group='test_group')
|
| - self.add_to_group('test_group')
|
| - self.assertTrue(acl.is_user())
|
| - self.assertEqual(acl.get_user_type(), 'user')
|
| -
|
| - def test_is_user_wrong_group(self):
|
| - self.mock_auth_config(users_group='test_group')
|
| - self.add_to_group('wrong_test_group')
|
| - self.assertFalse(acl.is_user())
|
| - self.assertIsNone(acl.get_user_type())
|
| -
|
| - def test_is_bootstrapper_admin(self):
|
| - self.add_to_admin()
|
| - self.assertTrue(acl.is_bootstrapper())
|
| - self.assertEqual(acl.get_user_type(), 'admin')
|
| -
|
| - def test_is_bootstrapper_default_group(self):
|
| - self.add_to_group(BOT_BOOTSTRAP_GROUP)
|
| - self.assertTrue(acl.is_bootstrapper())
|
| - self.assertEqual(acl.get_user_type(), 'admin')
|
| -
|
| - def test_is_bootstrapper_custom_group(self):
|
| - self.mock_auth_config(bot_bootstrap_group='test_group')
|
| - self.add_to_group('test_group')
|
| - self.assertTrue(acl.is_bootstrapper())
|
| - self.assertIsNone(acl.get_user_type())
|
| -
|
| - def test_is_bootstrapper_wrong_group(self):
|
| - self.mock_auth_config(privileged_users_group='test_wrong_group',
|
| - bot_bootstrap_group='test_correct_group')
|
| - self.add_to_group('test_wrong_group')
|
| - self.assertFalse(acl.is_bootstrapper())
|
| - self.assertEqual(acl.get_user_type(), 'privileged user')
|
| + self.assertFalse(acl.is_ip_whitelisted_machine())
|
| + self.assertTrue(acl.can_access())
|
| + self.assertTrue(acl.can_view_config())
|
| + self.assertTrue(acl.can_edit_config())
|
| + self.assertTrue(acl.can_create_bot())
|
| + self.assertTrue(acl.can_edit_bot())
|
| + self.assertTrue(acl.can_delete_bot())
|
| + self.assertTrue(acl.can_view_bot())
|
| + self.assertTrue(acl.can_create_task())
|
| + self.assertTrue(acl.can_schedule_high_priority_tasks())
|
| + self.assertTrue(acl.can_edit_task(self._task_owned))
|
| + self.assertTrue(acl.can_edit_task(self._task_other))
|
| + self.assertTrue(acl.can_edit_all_tasks())
|
| + self.assertTrue(acl.can_view_task(self._task_owned))
|
| + self.assertTrue(acl.can_view_task(self._task_other))
|
| + self.assertTrue(acl.can_view_all_tasks())
|
| +
|
| + def test_ip_whitelisted(self):
|
| + self.mock(auth, 'is_in_ip_whitelist', lambda _name, _ip, _warn: True)
|
| + self.assertTrue(acl.is_ip_whitelisted_machine())
|
| + self.assertTrue(acl.can_access())
|
| + self.assertFalse(acl.can_view_config())
|
| + self.assertFalse(acl.can_edit_config())
|
| + self.assertFalse(acl.can_create_bot())
|
| + self.assertTrue(acl.can_edit_bot())
|
| + self.assertTrue(acl.can_delete_bot())
|
| + self.assertTrue(acl.can_view_bot())
|
| + self.assertTrue(acl.can_create_task())
|
| + self.assertTrue(acl.can_schedule_high_priority_tasks())
|
| + self.assertTrue(acl.can_edit_task(self._task_owned))
|
| + self.assertTrue(acl.can_edit_task(self._task_other))
|
| + self.assertFalse(acl.can_edit_all_tasks())
|
| + self.assertTrue(acl.can_view_task(self._task_owned))
|
| + self.assertTrue(acl.can_view_task(self._task_other))
|
| + self.assertFalse(acl.can_view_all_tasks())
|
| +
|
| + def test_admins(self):
|
| + self._add_to_group('admins')
|
| + self.assertFalse(acl.is_ip_whitelisted_machine())
|
| + self.assertTrue(acl.can_access())
|
| + self.assertTrue(acl.can_view_config())
|
| + self.assertTrue(acl.can_edit_config())
|
| + self.assertTrue(acl.can_create_bot())
|
| + self.assertTrue(acl.can_edit_bot())
|
| + self.assertTrue(acl.can_delete_bot())
|
| + self.assertTrue(acl.can_view_bot())
|
| + self.assertTrue(acl.can_create_task())
|
| + self.assertTrue(acl.can_schedule_high_priority_tasks())
|
| + self.assertTrue(acl.can_edit_task(self._task_owned))
|
| + self.assertTrue(acl.can_edit_task(self._task_other))
|
| + self.assertTrue(acl.can_edit_all_tasks())
|
| + self.assertTrue(acl.can_view_task(self._task_owned))
|
| + self.assertTrue(acl.can_view_task(self._task_other))
|
| + self.assertTrue(acl.can_view_all_tasks())
|
| +
|
| + def test_bot_bootstrap(self):
|
| + self._add_to_group('bot_bootstrap')
|
| + self.assertFalse(acl.is_ip_whitelisted_machine())
|
| + self.assertFalse(acl.can_access())
|
| + self.assertFalse(acl.can_view_config())
|
| + self.assertFalse(acl.can_edit_config())
|
| + self.assertTrue(acl.can_create_bot())
|
| + self.assertFalse(acl.can_edit_bot())
|
| + self.assertFalse(acl.can_delete_bot())
|
| + self.assertFalse(acl.can_view_bot())
|
| + self.assertFalse(acl.can_create_task())
|
| + self.assertFalse(acl.can_schedule_high_priority_tasks())
|
| + self.assertTrue(acl.can_edit_task(self._task_owned))
|
| + self.assertFalse(acl.can_edit_task(self._task_other))
|
| + self.assertFalse(acl.can_edit_all_tasks())
|
| + self.assertTrue(acl.can_view_task(self._task_owned))
|
| + self.assertFalse(acl.can_view_task(self._task_other))
|
| + self.assertFalse(acl.can_view_all_tasks())
|
| +
|
| + def test_privileged_users(self):
|
| + self._add_to_group('privileged_users')
|
| + self.assertFalse(acl.is_ip_whitelisted_machine())
|
| + self.assertTrue(acl.can_access())
|
| + self.assertFalse(acl.can_view_config())
|
| + self.assertFalse(acl.can_edit_config())
|
| + self.assertFalse(acl.can_create_bot())
|
| + self.assertTrue(acl.can_edit_bot())
|
| + self.assertFalse(acl.can_delete_bot())
|
| + self.assertTrue(acl.can_view_bot())
|
| + self.assertTrue(acl.can_create_task())
|
| + self.assertTrue(acl.can_schedule_high_priority_tasks())
|
| + self.assertTrue(acl.can_edit_task(self._task_owned))
|
| + self.assertTrue(acl.can_edit_task(self._task_other))
|
| + self.assertFalse(acl.can_edit_all_tasks())
|
| + self.assertTrue(acl.can_view_task(self._task_owned))
|
| + self.assertTrue(acl.can_view_task(self._task_other))
|
| + self.assertTrue(acl.can_view_all_tasks())
|
| +
|
| + def test_users(self):
|
| + self._add_to_group('users')
|
| + self.assertFalse(acl.is_ip_whitelisted_machine())
|
| + self.assertTrue(acl.can_access())
|
| + self.assertFalse(acl.can_view_config())
|
| + self.assertFalse(acl.can_edit_config())
|
| + self.assertFalse(acl.can_create_bot())
|
| + self.assertFalse(acl.can_edit_bot())
|
| + self.assertFalse(acl.can_delete_bot())
|
| + self.assertFalse(acl.can_view_bot())
|
| + self.assertTrue(acl.can_create_task())
|
| + self.assertFalse(acl.can_schedule_high_priority_tasks())
|
| + self.assertTrue(acl.can_edit_task(self._task_owned))
|
| + self.assertFalse(acl.can_edit_task(self._task_other))
|
| + self.assertFalse(acl.can_edit_all_tasks())
|
| + self.assertTrue(acl.can_view_task(self._task_owned))
|
| + self.assertFalse(acl.can_view_task(self._task_other))
|
| + self.assertFalse(acl.can_view_all_tasks())
|
| +
|
| + def test_view_all_bots(self):
|
| + self._add_to_group('view_all_bots')
|
| + self.assertFalse(acl.is_ip_whitelisted_machine())
|
| + self.assertTrue(acl.can_access())
|
| + self.assertFalse(acl.can_view_config())
|
| + self.assertFalse(acl.can_edit_config())
|
| + self.assertFalse(acl.can_create_bot())
|
| + self.assertFalse(acl.can_edit_bot())
|
| + self.assertFalse(acl.can_delete_bot())
|
| + self.assertTrue(acl.can_view_bot())
|
| + self.assertFalse(acl.can_create_task())
|
| + self.assertFalse(acl.can_schedule_high_priority_tasks())
|
| + self.assertTrue(acl.can_edit_task(self._task_owned))
|
| + self.assertFalse(acl.can_edit_task(self._task_other))
|
| + self.assertFalse(acl.can_edit_all_tasks())
|
| + self.assertTrue(acl.can_view_task(self._task_owned))
|
| + self.assertFalse(acl.can_view_task(self._task_other))
|
| + self.assertFalse(acl.can_view_all_tasks())
|
| +
|
| + def test_view_all_tasks(self):
|
| + self._add_to_group('view_all_tasks')
|
| + self.assertFalse(acl.is_ip_whitelisted_machine())
|
| + self.assertTrue(acl.can_access())
|
| + self.assertFalse(acl.can_view_config())
|
| + self.assertFalse(acl.can_edit_config())
|
| + self.assertFalse(acl.can_create_bot())
|
| + self.assertFalse(acl.can_edit_bot())
|
| + self.assertFalse(acl.can_delete_bot())
|
| + self.assertFalse(acl.can_view_bot())
|
| + self.assertFalse(acl.can_create_task())
|
| + self.assertFalse(acl.can_schedule_high_priority_tasks())
|
| + self.assertTrue(acl.can_edit_task(self._task_owned))
|
| + self.assertFalse(acl.can_edit_task(self._task_other))
|
| + self.assertFalse(acl.can_edit_all_tasks())
|
| + self.assertTrue(acl.can_view_task(self._task_owned))
|
| + self.assertTrue(acl.can_view_task(self._task_other))
|
| + self.assertTrue(acl.can_view_all_tasks())
|
|
|
|
|
| if __name__ == '__main__':
|
|
|