OLD | NEW |
1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
2 # Copyright 2015 The LUCI Authors. All rights reserved. | 2 # Copyright 2015 The LUCI Authors. All rights reserved. |
3 # Use of this source code is governed under the Apache License, Version 2.0 | 3 # Use of this source code is governed under the Apache License, Version 2.0 |
4 # that can be found in the LICENSE file. | 4 # that can be found in the LICENSE file. |
5 | 5 |
6 import json | 6 import json |
7 import logging | 7 import logging |
8 import sys | 8 import sys |
9 import time | 9 import time |
10 import unittest | 10 import unittest |
11 | 11 |
12 import test_env | 12 import test_env |
13 test_env.setup_test_env() | 13 test_env.setup_test_env() |
14 | 14 |
15 import webapp2 | 15 import webapp2 |
16 import webtest | 16 import webtest |
17 | 17 |
| 18 from google.appengine.ext import ndb |
| 19 |
18 from components import auth | 20 from components import auth |
| 21 from components import utils |
19 from components.auth import delegation as auth_delegation | 22 from components.auth import delegation as auth_delegation |
20 from components.auth import handler | 23 from components.auth import handler |
21 from components.auth.proto import delegation_pb2 | 24 from components.auth.proto import delegation_pb2 |
22 from test_support import test_case | 25 from test_support import test_case |
23 | 26 |
24 from proto import config_pb2 | 27 from proto import config_pb2 |
25 import config | 28 import config |
26 import delegation | 29 import delegation |
27 | 30 |
28 | 31 |
29 def decode_token(token): | 32 def decode_token(token): |
30 return auth_delegation.unseal_token( | 33 return auth_delegation.unseal_token( |
31 auth_delegation.deserialize_token(token)) | 34 auth_delegation.deserialize_token(token)) |
32 | 35 |
33 | 36 |
34 class HandlersTest(test_case.TestCase): | 37 class HandlersTest(test_case.TestCase): |
35 def setUp(self): | 38 def setUp(self): |
36 super(HandlersTest, self).setUp() | 39 super(HandlersTest, self).setUp() |
37 self.app = webtest.TestApp( | 40 self.app = webtest.TestApp( |
38 webapp2.WSGIApplication(delegation.get_rest_api_routes(), debug=True), | 41 webapp2.WSGIApplication(delegation.get_rest_api_routes(), debug=True), |
39 extra_environ={'REMOTE_ADDR': '127.0.0.1'}) | 42 extra_environ={'REMOTE_ADDR': '127.1.2.3'}) |
40 # Don't bother with XSRF tokens in unit tests. | 43 # Don't bother with XSRF tokens in unit tests. |
41 self.mock( | 44 self.mock( |
42 delegation.CreateDelegationTokenHandler, 'xsrf_token_enforce_on', []) | 45 delegation.CreateDelegationTokenHandler, 'xsrf_token_enforce_on', []) |
43 # Simplify auth. | 46 # Simplify auth. |
44 def dumb_auth(req): | 47 def dumb_auth(req): |
45 return auth.Identity.from_bytes(req.headers['Mock-Peer-Id']) | 48 return auth.Identity.from_bytes(req.headers['Mock-Peer-Id']) |
46 self.mock( | 49 self.mock( |
47 handler.AuthenticatingHandler, 'get_auth_methods', | 50 handler.AuthenticatingHandler, 'get_auth_methods', |
48 classmethod(lambda *_: [dumb_auth])) | 51 classmethod(lambda *_: [dumb_auth])) |
49 | 52 |
(...skipping 14 matching lines...) Expand all Loading... |
64 | 67 |
65 subtokens = decode_token(resp.json_body['delegation_token']) | 68 subtokens = decode_token(resp.json_body['delegation_token']) |
66 self.assertEqual(1, len(subtokens.subtokens)) | 69 self.assertEqual(1, len(subtokens.subtokens)) |
67 t = subtokens.subtokens[0] | 70 t = subtokens.subtokens[0] |
68 self.assertEqual('user:a@a.com', t.issuer_id) | 71 self.assertEqual('user:a@a.com', t.issuer_id) |
69 self.assertTrue(t.creation_time >= time.time() - 30) | 72 self.assertTrue(t.creation_time >= time.time() - 30) |
70 self.assertEqual(3600, t.validity_duration) | 73 self.assertEqual(3600, t.validity_duration) |
71 self.assertFalse(t.audience) | 74 self.assertFalse(t.audience) |
72 self.assertEqual(t.services, ['*']) | 75 self.assertEqual(t.services, ['*']) |
73 self.assertFalse(t.HasField('impersonator_id')) | 76 self.assertFalse(t.HasField('impersonator_id')) |
| 77 self.assertTrue(t.subtoken_id is not None) |
| 78 |
| 79 # Entity is created. |
| 80 key = ndb.Key(delegation.AuthDelegationSubtoken, t.subtoken_id) |
| 81 ent = key.get() |
| 82 self.assertTrue(ent) |
| 83 self.assertTrue(ent.subtoken) |
| 84 self.assertEqual('127.1.2.3', ent.caller_ip) |
| 85 self.assertEqual('v1a', ent.auth_service_version) |
| 86 self.assertEqual('user:a@a.com', ent.issuer_id) |
| 87 self.assertEqual( |
| 88 t.creation_time*1e6, utils.datetime_to_timestamp(ent.creation_time)) |
| 89 self.assertEqual('', ent.impersonator_id) |
74 | 90 |
75 def test_with_impersonation(self): | 91 def test_with_impersonation(self): |
76 # This function is tested separately below. | 92 # This function is tested separately below. |
77 self.mock(delegation, 'check_can_create_token', lambda *_, **__: None) | 93 self.mock( |
| 94 delegation, 'check_can_create_token', |
| 95 lambda *_, **__: delegation.DEFAULT_RULE) |
78 | 96 |
79 resp = self.create_token({ | 97 resp = self.create_token({ |
80 'audience': ['user:b@a.com'], | 98 'audience': ['user:b@a.com'], |
81 'services': ['service:a'], | 99 'services': ['service:a'], |
82 'validity_duration': 12345, | 100 'validity_duration': 12345, |
83 'impersonate': 'user:c@a.com', | 101 'impersonate': 'user:c@a.com', |
84 }, 'user:a@a.com') | 102 }, 'user:a@a.com') |
85 self.assertEqual(resp.status_code, 201) | 103 self.assertEqual(resp.status_code, 201) |
86 self.assertEqual(12345, resp.json_body['validity_duration']) | 104 self.assertEqual(12345, resp.json_body['validity_duration']) |
87 | 105 |
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
140 | 158 |
141 class CheckCanCreateTokenTest(test_case.TestCase): | 159 class CheckCanCreateTokenTest(test_case.TestCase): |
142 def setUp(self): | 160 def setUp(self): |
143 super(CheckCanCreateTokenTest, self).setUp() | 161 super(CheckCanCreateTokenTest, self).setUp() |
144 self.rules = [] | 162 self.rules = [] |
145 self.mock( | 163 self.mock( |
146 config, 'get_delegation_config', | 164 config, 'get_delegation_config', |
147 lambda: config_pb2.DelegationConfig(rules=self.rules)) | 165 lambda: config_pb2.DelegationConfig(rules=self.rules)) |
148 | 166 |
149 def add_rule(self, **kwargs): | 167 def add_rule(self, **kwargs): |
150 self.rules.append(config_pb2.DelegationConfig.Rule(**kwargs)) | 168 r = config_pb2.DelegationConfig.Rule(**kwargs) |
| 169 self.rules.append(r) |
| 170 return r |
151 | 171 |
152 def test_get_delegation_rule(self): | 172 def test_get_delegation_rule(self): |
153 self.add_rule( | 173 self.add_rule( |
154 user_id=['service:a'], | 174 user_id=['service:a'], |
155 target_service=['service:b'], | 175 target_service=['service:b'], |
156 max_validity_duration=1) | 176 max_validity_duration=1) |
157 self.add_rule( | 177 self.add_rule( |
158 user_id=['service:a'], | 178 user_id=['service:a'], |
159 target_service=['*'], | 179 target_service=['*'], |
160 max_validity_duration=2) | 180 max_validity_duration=2) |
(...skipping 21 matching lines...) Expand all Loading... |
182 test(3, 'service:x', ['service:c']) | 202 test(3, 'service:x', ['service:c']) |
183 test(4, 'service:x', ['service:c', 'service:d']) | 203 test(4, 'service:x', ['service:c', 'service:d']) |
184 test(5, 'service:x', ['service:c', 'service:d', 'service:e']) | 204 test(5, 'service:x', ['service:c', 'service:d', 'service:e']) |
185 test(1, 'service:a', ['*']) | 205 test(1, 'service:a', ['*']) |
186 test(3, 'service:x', ['*']) | 206 test(3, 'service:x', ['*']) |
187 | 207 |
188 def make_subtoken(self, **kwargs): | 208 def make_subtoken(self, **kwargs): |
189 return delegation_pb2.Subtoken(**kwargs) | 209 return delegation_pb2.Subtoken(**kwargs) |
190 | 210 |
191 def test_validity_duration(self): | 211 def test_validity_duration(self): |
192 self.add_rule( | 212 rule = self.add_rule( |
193 user_id=['*'], target_service=['*'], max_validity_duration=300) | 213 user_id=['*'], target_service=['*'], max_validity_duration=300) |
194 | 214 |
195 tok = self.make_subtoken(issuer_id='user:a@a.com', validity_duration=300) | 215 tok = self.make_subtoken(issuer_id='user:a@a.com', validity_duration=300) |
196 delegation.check_can_create_token('user:a@a.com', tok) | 216 r = delegation.check_can_create_token('user:a@a.com', tok) |
| 217 self.assertEqual(rule, r) |
197 | 218 |
198 with self.assertRaises(auth.AuthorizationError): | 219 with self.assertRaises(auth.AuthorizationError): |
199 tok = self.make_subtoken(issuer_id='user:a@a.com', validity_duration=400) | 220 tok = self.make_subtoken(issuer_id='user:a@a.com', validity_duration=400) |
200 delegation.check_can_create_token('user:a@a.com', tok) | 221 delegation.check_can_create_token('user:a@a.com', tok) |
201 | 222 |
202 def test_impersonation_disallowed_by_default(self): | 223 def test_impersonation_disallowed_by_default(self): |
203 # Making delegation token. | 224 # Making delegation token. |
204 tok = self.make_subtoken(issuer_id='user:a@a.com', validity_duration=300) | 225 tok = self.make_subtoken(issuer_id='user:a@a.com', validity_duration=300) |
205 delegation.check_can_create_token('user:a@a.com', tok) | 226 r = delegation.check_can_create_token('user:a@a.com', tok) |
| 227 self.assertEqual(delegation.DEFAULT_RULE, r) |
206 | 228 |
207 # Making impersonation token. | 229 # Making impersonation token. |
208 with self.assertRaises(auth.AuthorizationError): | 230 with self.assertRaises(auth.AuthorizationError): |
209 tok = self.make_subtoken(issuer_id='user:a@a.com', validity_duration=300) | 231 tok = self.make_subtoken(issuer_id='user:a@a.com', validity_duration=300) |
210 delegation.check_can_create_token('user:not-a@a.com', tok) | 232 delegation.check_can_create_token('user:not-a@a.com', tok) |
211 | 233 |
212 def test_allowed_to_impersonate(self): | 234 def test_allowed_to_impersonate(self): |
213 self.add_rule( | 235 rule = self.add_rule( |
214 user_id=['user:a@a.com'], | 236 user_id=['user:a@a.com'], |
215 target_service=['*'], | 237 target_service=['*'], |
216 max_validity_duration=300, | 238 max_validity_duration=300, |
217 allowed_to_impersonate=[ | 239 allowed_to_impersonate=[ |
218 'user:directly@a.com', | 240 'user:directly@a.com', |
219 'user:*@viaglob.com', | 241 'user:*@viaglob.com', |
220 'group:via-group', | 242 'group:via-group', |
221 ]) | 243 ]) |
222 self.mock( | 244 self.mock( |
223 auth, 'is_group_member', | 245 auth, 'is_group_member', |
224 lambda g, m: g == 'via-group' and m.to_bytes() == 'user:in-group@a.com') | 246 lambda g, m: g == 'via-group' and m.to_bytes() == 'user:in-group@a.com') |
225 | 247 |
226 tok = self.make_subtoken( | 248 tok = self.make_subtoken( |
227 issuer_id='user:directly@a.com', validity_duration=300) | 249 issuer_id='user:directly@a.com', validity_duration=300) |
228 delegation.check_can_create_token('user:a@a.com', tok) | 250 r = delegation.check_can_create_token('user:a@a.com', tok) |
| 251 self.assertEqual(rule, r) |
229 | 252 |
230 tok = self.make_subtoken( | 253 tok = self.make_subtoken( |
231 issuer_id='user:someone@viaglob.com', validity_duration=300) | 254 issuer_id='user:someone@viaglob.com', validity_duration=300) |
232 delegation.check_can_create_token('user:a@a.com', tok) | 255 r = delegation.check_can_create_token('user:a@a.com', tok) |
| 256 self.assertEqual(rule, r) |
233 | 257 |
234 tok = self.make_subtoken( | 258 tok = self.make_subtoken( |
235 issuer_id='user:in-group@a.com', validity_duration=300) | 259 issuer_id='user:in-group@a.com', validity_duration=300) |
236 delegation.check_can_create_token('user:a@a.com', tok) | 260 r = delegation.check_can_create_token('user:a@a.com', tok) |
| 261 self.assertEqual(rule, r) |
237 | 262 |
238 # Trying to impersonate someone not allowed. | 263 # Trying to impersonate someone not allowed. |
239 with self.assertRaises(auth.AuthorizationError): | 264 with self.assertRaises(auth.AuthorizationError): |
240 tok = self.make_subtoken( | 265 tok = self.make_subtoken( |
241 issuer_id='user:unknown@a.com', validity_duration=300) | 266 issuer_id='user:unknown@a.com', validity_duration=300) |
242 delegation.check_can_create_token('user:a@a.com', tok) | 267 delegation.check_can_create_token('user:a@a.com', tok) |
243 | 268 |
244 | 269 |
245 if __name__ == '__main__': | 270 if __name__ == '__main__': |
246 if '-v' in sys.argv: | 271 if '-v' in sys.argv: |
247 unittest.TestCase.maxDiff = None | 272 unittest.TestCase.maxDiff = None |
248 logging.basicConfig(level=logging.DEBUG) | 273 logging.basicConfig(level=logging.DEBUG) |
249 else: | 274 else: |
250 logging.basicConfig(level=logging.FATAL) | 275 logging.basicConfig(level=logging.FATAL) |
251 unittest.main() | 276 unittest.main() |
OLD | NEW |