OLD | NEW |
1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
2 # Copyright 2015 The Swarming Authors. All rights reserved. | 2 # Copyright 2015 The Swarming Authors. All rights reserved. |
3 # Use of this source code is governed by the Apache v2.0 license that can be | 3 # Use of this source code is governed by the Apache v2.0 license that can be |
4 # found in the LICENSE file. | 4 # found in the LICENSE file. |
5 | 5 |
6 import wsgiref.headers | |
7 | |
8 from test_env import future | 6 from test_env import future |
9 import test_env | 7 import test_env |
10 test_env.setup_test_env() | 8 test_env.setup_test_env() |
11 | 9 |
12 from test_support import test_case | 10 from test_support import test_case |
13 import mock | 11 import mock |
14 | 12 |
15 from components import auth | 13 from components import auth |
16 | 14 |
| 15 from proto import project_config_pb2 |
17 from proto import service_config_pb2 | 16 from proto import service_config_pb2 |
18 import acl | 17 import acl |
| 18 import projects |
19 import storage | 19 import storage |
20 | 20 |
21 | 21 |
22 class AclTestCase(test_case.TestCase): | 22 class AclTestCase(test_case.TestCase): |
23 def setUp(self): | 23 def setUp(self): |
24 super(AclTestCase, self).setUp() | 24 super(AclTestCase, self).setUp() |
| 25 self.mock(auth, 'get_current_identity', mock.Mock()) |
| 26 auth.get_current_identity.return_value = auth.Anonymous |
25 self.mock(auth, 'is_admin', lambda *_: False) | 27 self.mock(auth, 'is_admin', lambda *_: False) |
26 self.mock(auth, 'is_group_member', mock.Mock(return_value=False)) | 28 self.mock(auth, 'is_group_member', mock.Mock(return_value=False)) |
27 | 29 |
28 acl_cfg = service_config_pb2.AclCfg( | 30 acl_cfg = service_config_pb2.AclCfg( |
29 service_access_group='service-admins', | 31 service_access_group='service-admins', |
30 project_access_group='project-admins', | 32 project_access_group='project-admins', |
31 ) | 33 ) |
32 self.mock(storage, 'get_self_config_async', lambda *_: future(acl_cfg)) | 34 self.mock(storage, 'get_self_config_async', lambda *_: future(acl_cfg)) |
33 | 35 |
34 def test_admin_can_read_all(self): | 36 def test_admin_can_read_all(self): |
35 self.mock(auth, 'is_admin', mock.Mock(return_value=True)) | 37 self.mock(auth, 'is_admin', mock.Mock(return_value=True)) |
36 self.assertTrue(acl.can_read_config_set('services/swarming')) | 38 self.assertTrue(acl.can_read_config_set('services/swarming')) |
37 self.assertTrue(acl.can_read_config_set('projects/chromium')) | 39 self.assertTrue(acl.can_read_config_set('projects/chromium')) |
38 self.assertTrue(acl.can_read_project_list()) | 40 self.assertTrue(acl.has_project_access('chromium')) |
39 | 41 |
40 def test_can_read_service_config(self): | 42 def test_can_read_service_config(self): |
41 auth.is_group_member.return_value = True | 43 auth.is_group_member.return_value = True |
42 self.assertTrue(acl.can_read_config_set('services/swarming')) | 44 self.assertTrue(acl.can_read_config_set('services/swarming')) |
43 auth.is_group_member.access_called_once_with('service-admins') | 45 auth.is_group_member.access_called_once_with('service-admins') |
44 | 46 |
45 def test_can_read_service_config_header(self): | 47 def test_can_read_service_config_same_app(self): |
46 headers = wsgiref.headers.Headers([ | 48 self.mock(auth, 'get_current_identity', mock.Mock()) |
47 ('X-Appengine-Inbound-Appid', 'swarming'), | 49 auth.get_current_identity.return_value = auth.Identity( |
48 ]) | 50 'user', 'swarming@appspot.gserviceaccount.com') |
49 self.assertTrue( | 51 self.assertTrue( |
50 acl.can_read_config_set('services/swarming', headers=headers)) | 52 acl.can_read_config_set('services/swarming')) |
51 | 53 |
52 def test_can_read_service_config_no_access(self): | 54 def test_can_read_service_config_no_access(self): |
53 self.assertFalse(acl.can_read_config_set('services/swarming')) | 55 self.assertFalse(acl.can_read_config_set('services/swarming')) |
54 | 56 |
55 def test_can_read_project_config(self): | 57 def test_has_project_access(self): |
56 auth.is_group_member.return_value = True | 58 self.mock(projects, 'get_metadata', mock.Mock()) |
57 self.assertTrue(acl.can_read_config_set('projects/swarming')) | 59 projects.get_metadata.return_value = project_config_pb2.ProjectCfg( |
58 auth.is_group_member.access_called_once_with('project-admins') | 60 access='googlers' |
| 61 ) |
| 62 |
| 63 self.assertFalse(acl.can_read_config_set('projects/secret')) |
| 64 |
| 65 auth.is_group_member.side_effect = lambda name: name == 'googlers' |
| 66 self.assertTrue(acl.can_read_config_set('projects/secret')) |
| 67 |
| 68 auth.is_group_member.side_effect = lambda name: name == 'project-admins' |
| 69 self.assertTrue(acl.can_read_config_set('projects/secret')) |
59 | 70 |
60 def test_can_read_project_config_no_access(self): | 71 def test_can_read_project_config_no_access(self): |
61 self.assertFalse(acl.can_read_config_set('projects/swarming')) | 72 self.assertFalse(acl.has_project_access('projects/swarming')) |
62 self.assertFalse(acl.can_read_config_set('projects/swarming/refs/heads/x')) | 73 self.assertFalse(acl.can_read_config_set('projects/swarming/refs/heads/x')) |
63 | 74 |
64 def test_malformed_config_set(self): | 75 def test_malformed_config_set(self): |
65 with self.assertRaises(ValueError): | 76 with self.assertRaises(ValueError): |
66 acl.can_read_config_set('invalid config set') | 77 acl.can_read_config_set('invalid config set') |
67 | 78 |
68 | 79 |
69 if __name__ == '__main__': | 80 if __name__ == '__main__': |
70 test_env.main() | 81 test_env.main() |
OLD | NEW |