Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(5)

Side by Side Diff: appengine/auth_service/delegation_test.py

Issue 2164733003: auth: Keep audit log of all generated delegation tokens. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-py@master
Patch Set: auth: Keep audit log of all generates delegation tokens. Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 #!/usr/bin/env python 1 #!/usr/bin/env python
2 # Copyright 2015 The LUCI Authors. All rights reserved. 2 # Copyright 2015 The LUCI Authors. All rights reserved.
3 # Use of this source code is governed under the Apache License, Version 2.0 3 # Use of this source code is governed under the Apache License, Version 2.0
4 # that can be found in the LICENSE file. 4 # that can be found in the LICENSE file.
5 5
6 import json 6 import json
7 import logging 7 import logging
8 import sys 8 import sys
9 import time 9 import time
10 import unittest 10 import unittest
11 11
12 import test_env 12 import test_env
13 test_env.setup_test_env() 13 test_env.setup_test_env()
14 14
15 import webapp2 15 import webapp2
16 import webtest 16 import webtest
17 17
18 from google.appengine.ext import ndb
19
18 from components import auth 20 from components import auth
21 from components import utils
19 from components.auth import delegation as auth_delegation 22 from components.auth import delegation as auth_delegation
20 from components.auth import handler 23 from components.auth import handler
21 from components.auth.proto import delegation_pb2 24 from components.auth.proto import delegation_pb2
22 from test_support import test_case 25 from test_support import test_case
23 26
24 from proto import config_pb2 27 from proto import config_pb2
25 import config 28 import config
26 import delegation 29 import delegation
27 30
28 31
29 def decode_token(token): 32 def decode_token(token):
30 return auth_delegation.unseal_token( 33 return auth_delegation.unseal_token(
31 auth_delegation.deserialize_token(token)) 34 auth_delegation.deserialize_token(token))
32 35
33 36
34 class HandlersTest(test_case.TestCase): 37 class HandlersTest(test_case.TestCase):
35 def setUp(self): 38 def setUp(self):
36 super(HandlersTest, self).setUp() 39 super(HandlersTest, self).setUp()
37 self.app = webtest.TestApp( 40 self.app = webtest.TestApp(
38 webapp2.WSGIApplication(delegation.get_rest_api_routes(), debug=True), 41 webapp2.WSGIApplication(delegation.get_rest_api_routes(), debug=True),
39 extra_environ={'REMOTE_ADDR': '127.0.0.1'}) 42 extra_environ={'REMOTE_ADDR': '127.1.2.3'})
40 # Don't bother with XSRF tokens in unit tests. 43 # Don't bother with XSRF tokens in unit tests.
41 self.mock( 44 self.mock(
42 delegation.CreateDelegationTokenHandler, 'xsrf_token_enforce_on', []) 45 delegation.CreateDelegationTokenHandler, 'xsrf_token_enforce_on', [])
43 # Simplify auth. 46 # Simplify auth.
44 def dumb_auth(req): 47 def dumb_auth(req):
45 return auth.Identity.from_bytes(req.headers['Mock-Peer-Id']) 48 return auth.Identity.from_bytes(req.headers['Mock-Peer-Id'])
46 self.mock( 49 self.mock(
47 handler.AuthenticatingHandler, 'get_auth_methods', 50 handler.AuthenticatingHandler, 'get_auth_methods',
48 classmethod(lambda *_: [dumb_auth])) 51 classmethod(lambda *_: [dumb_auth]))
49 52
(...skipping 14 matching lines...) Expand all
64 67
65 subtokens = decode_token(resp.json_body['delegation_token']) 68 subtokens = decode_token(resp.json_body['delegation_token'])
66 self.assertEqual(1, len(subtokens.subtokens)) 69 self.assertEqual(1, len(subtokens.subtokens))
67 t = subtokens.subtokens[0] 70 t = subtokens.subtokens[0]
68 self.assertEqual('user:a@a.com', t.issuer_id) 71 self.assertEqual('user:a@a.com', t.issuer_id)
69 self.assertTrue(t.creation_time >= time.time() - 30) 72 self.assertTrue(t.creation_time >= time.time() - 30)
70 self.assertEqual(3600, t.validity_duration) 73 self.assertEqual(3600, t.validity_duration)
71 self.assertFalse(t.audience) 74 self.assertFalse(t.audience)
72 self.assertEqual(t.services, ['*']) 75 self.assertEqual(t.services, ['*'])
73 self.assertFalse(t.HasField('impersonator_id')) 76 self.assertFalse(t.HasField('impersonator_id'))
77 self.assertTrue(t.subtoken_id is not None)
78
79 # Entity is created.
80 key = ndb.Key(delegation.AuthDelegationSubtoken, t.subtoken_id)
81 ent = key.get()
82 self.assertTrue(ent)
83 self.assertTrue(ent.subtoken)
84 self.assertEqual('127.1.2.3', ent.caller_ip)
85 self.assertEqual('v1a', ent.auth_service_version)
86 self.assertEqual('user:a@a.com', ent.issuer_id)
87 self.assertEqual(
88 t.creation_time*1e6, utils.datetime_to_timestamp(ent.creation_time))
89 self.assertEqual('', ent.impersonator_id)
74 90
75 def test_with_impersonation(self): 91 def test_with_impersonation(self):
76 # This function is tested separately below. 92 # This function is tested separately below.
77 self.mock(delegation, 'check_can_create_token', lambda *_, **__: None) 93 self.mock(
94 delegation, 'check_can_create_token',
95 lambda *_, **__: delegation.DEFAULT_RULE)
78 96
79 resp = self.create_token({ 97 resp = self.create_token({
80 'audience': ['user:b@a.com'], 98 'audience': ['user:b@a.com'],
81 'services': ['service:a'], 99 'services': ['service:a'],
82 'validity_duration': 12345, 100 'validity_duration': 12345,
83 'impersonate': 'user:c@a.com', 101 'impersonate': 'user:c@a.com',
84 }, 'user:a@a.com') 102 }, 'user:a@a.com')
85 self.assertEqual(resp.status_code, 201) 103 self.assertEqual(resp.status_code, 201)
86 self.assertEqual(12345, resp.json_body['validity_duration']) 104 self.assertEqual(12345, resp.json_body['validity_duration'])
87 105
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after
140 158
141 class CheckCanCreateTokenTest(test_case.TestCase): 159 class CheckCanCreateTokenTest(test_case.TestCase):
142 def setUp(self): 160 def setUp(self):
143 super(CheckCanCreateTokenTest, self).setUp() 161 super(CheckCanCreateTokenTest, self).setUp()
144 self.rules = [] 162 self.rules = []
145 self.mock( 163 self.mock(
146 config, 'get_delegation_config', 164 config, 'get_delegation_config',
147 lambda: config_pb2.DelegationConfig(rules=self.rules)) 165 lambda: config_pb2.DelegationConfig(rules=self.rules))
148 166
149 def add_rule(self, **kwargs): 167 def add_rule(self, **kwargs):
150 self.rules.append(config_pb2.DelegationConfig.Rule(**kwargs)) 168 r = config_pb2.DelegationConfig.Rule(**kwargs)
169 self.rules.append(r)
170 return r
151 171
152 def test_get_delegation_rule(self): 172 def test_get_delegation_rule(self):
153 self.add_rule( 173 self.add_rule(
154 user_id=['service:a'], 174 user_id=['service:a'],
155 target_service=['service:b'], 175 target_service=['service:b'],
156 max_validity_duration=1) 176 max_validity_duration=1)
157 self.add_rule( 177 self.add_rule(
158 user_id=['service:a'], 178 user_id=['service:a'],
159 target_service=['*'], 179 target_service=['*'],
160 max_validity_duration=2) 180 max_validity_duration=2)
(...skipping 21 matching lines...) Expand all
182 test(3, 'service:x', ['service:c']) 202 test(3, 'service:x', ['service:c'])
183 test(4, 'service:x', ['service:c', 'service:d']) 203 test(4, 'service:x', ['service:c', 'service:d'])
184 test(5, 'service:x', ['service:c', 'service:d', 'service:e']) 204 test(5, 'service:x', ['service:c', 'service:d', 'service:e'])
185 test(1, 'service:a', ['*']) 205 test(1, 'service:a', ['*'])
186 test(3, 'service:x', ['*']) 206 test(3, 'service:x', ['*'])
187 207
188 def make_subtoken(self, **kwargs): 208 def make_subtoken(self, **kwargs):
189 return delegation_pb2.Subtoken(**kwargs) 209 return delegation_pb2.Subtoken(**kwargs)
190 210
191 def test_validity_duration(self): 211 def test_validity_duration(self):
192 self.add_rule( 212 rule = self.add_rule(
193 user_id=['*'], target_service=['*'], max_validity_duration=300) 213 user_id=['*'], target_service=['*'], max_validity_duration=300)
194 214
195 tok = self.make_subtoken(issuer_id='user:a@a.com', validity_duration=300) 215 tok = self.make_subtoken(issuer_id='user:a@a.com', validity_duration=300)
196 delegation.check_can_create_token('user:a@a.com', tok) 216 r = delegation.check_can_create_token('user:a@a.com', tok)
217 self.assertEqual(rule, r)
197 218
198 with self.assertRaises(auth.AuthorizationError): 219 with self.assertRaises(auth.AuthorizationError):
199 tok = self.make_subtoken(issuer_id='user:a@a.com', validity_duration=400) 220 tok = self.make_subtoken(issuer_id='user:a@a.com', validity_duration=400)
200 delegation.check_can_create_token('user:a@a.com', tok) 221 delegation.check_can_create_token('user:a@a.com', tok)
201 222
202 def test_impersonation_disallowed_by_default(self): 223 def test_impersonation_disallowed_by_default(self):
203 # Making delegation token. 224 # Making delegation token.
204 tok = self.make_subtoken(issuer_id='user:a@a.com', validity_duration=300) 225 tok = self.make_subtoken(issuer_id='user:a@a.com', validity_duration=300)
205 delegation.check_can_create_token('user:a@a.com', tok) 226 r = delegation.check_can_create_token('user:a@a.com', tok)
227 self.assertEqual(delegation.DEFAULT_RULE, r)
206 228
207 # Making impersonation token. 229 # Making impersonation token.
208 with self.assertRaises(auth.AuthorizationError): 230 with self.assertRaises(auth.AuthorizationError):
209 tok = self.make_subtoken(issuer_id='user:a@a.com', validity_duration=300) 231 tok = self.make_subtoken(issuer_id='user:a@a.com', validity_duration=300)
210 delegation.check_can_create_token('user:not-a@a.com', tok) 232 delegation.check_can_create_token('user:not-a@a.com', tok)
211 233
212 def test_allowed_to_impersonate(self): 234 def test_allowed_to_impersonate(self):
213 self.add_rule( 235 rule = self.add_rule(
214 user_id=['user:a@a.com'], 236 user_id=['user:a@a.com'],
215 target_service=['*'], 237 target_service=['*'],
216 max_validity_duration=300, 238 max_validity_duration=300,
217 allowed_to_impersonate=[ 239 allowed_to_impersonate=[
218 'user:directly@a.com', 240 'user:directly@a.com',
219 'user:*@viaglob.com', 241 'user:*@viaglob.com',
220 'group:via-group', 242 'group:via-group',
221 ]) 243 ])
222 self.mock( 244 self.mock(
223 auth, 'is_group_member', 245 auth, 'is_group_member',
224 lambda g, m: g == 'via-group' and m.to_bytes() == 'user:in-group@a.com') 246 lambda g, m: g == 'via-group' and m.to_bytes() == 'user:in-group@a.com')
225 247
226 tok = self.make_subtoken( 248 tok = self.make_subtoken(
227 issuer_id='user:directly@a.com', validity_duration=300) 249 issuer_id='user:directly@a.com', validity_duration=300)
228 delegation.check_can_create_token('user:a@a.com', tok) 250 r = delegation.check_can_create_token('user:a@a.com', tok)
251 self.assertEqual(rule, r)
229 252
230 tok = self.make_subtoken( 253 tok = self.make_subtoken(
231 issuer_id='user:someone@viaglob.com', validity_duration=300) 254 issuer_id='user:someone@viaglob.com', validity_duration=300)
232 delegation.check_can_create_token('user:a@a.com', tok) 255 r = delegation.check_can_create_token('user:a@a.com', tok)
256 self.assertEqual(rule, r)
233 257
234 tok = self.make_subtoken( 258 tok = self.make_subtoken(
235 issuer_id='user:in-group@a.com', validity_duration=300) 259 issuer_id='user:in-group@a.com', validity_duration=300)
236 delegation.check_can_create_token('user:a@a.com', tok) 260 r = delegation.check_can_create_token('user:a@a.com', tok)
261 self.assertEqual(rule, r)
237 262
238 # Trying to impersonate someone not allowed. 263 # Trying to impersonate someone not allowed.
239 with self.assertRaises(auth.AuthorizationError): 264 with self.assertRaises(auth.AuthorizationError):
240 tok = self.make_subtoken( 265 tok = self.make_subtoken(
241 issuer_id='user:unknown@a.com', validity_duration=300) 266 issuer_id='user:unknown@a.com', validity_duration=300)
242 delegation.check_can_create_token('user:a@a.com', tok) 267 delegation.check_can_create_token('user:a@a.com', tok)
243 268
244 269
245 if __name__ == '__main__': 270 if __name__ == '__main__':
246 if '-v' in sys.argv: 271 if '-v' in sys.argv:
247 unittest.TestCase.maxDiff = None 272 unittest.TestCase.maxDiff = None
248 logging.basicConfig(level=logging.DEBUG) 273 logging.basicConfig(level=logging.DEBUG)
249 else: 274 else:
250 logging.basicConfig(level=logging.FATAL) 275 logging.basicConfig(level=logging.FATAL)
251 unittest.main() 276 unittest.main()
OLDNEW
« no previous file with comments | « appengine/auth_service/delegation.py ('k') | appengine/components/components/auth/proto/delegation.proto » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698