| OLD | NEW |
| 1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
| 2 # Copyright 2015 The LUCI Authors. All rights reserved. | 2 # Copyright 2015 The LUCI Authors. All rights reserved. |
| 3 # Use of this source code is governed under the Apache License, Version 2.0 | 3 # Use of this source code is governed under the Apache License, Version 2.0 |
| 4 # that can be found in the LICENSE file. | 4 # that can be found in the LICENSE file. |
| 5 | 5 |
| 6 import json | 6 import json |
| 7 import logging | 7 import logging |
| 8 import sys | 8 import sys |
| 9 import time | 9 import time |
| 10 import unittest | 10 import unittest |
| 11 | 11 |
| 12 import test_env | 12 import test_env |
| 13 test_env.setup_test_env() | 13 test_env.setup_test_env() |
| 14 | 14 |
| 15 import webapp2 | 15 import webapp2 |
| 16 import webtest | 16 import webtest |
| 17 | 17 |
| 18 from google.appengine.ext import ndb |
| 19 |
| 18 from components import auth | 20 from components import auth |
| 21 from components import utils |
| 19 from components.auth import delegation as auth_delegation | 22 from components.auth import delegation as auth_delegation |
| 20 from components.auth import handler | 23 from components.auth import handler |
| 21 from components.auth.proto import delegation_pb2 | 24 from components.auth.proto import delegation_pb2 |
| 22 from test_support import test_case | 25 from test_support import test_case |
| 23 | 26 |
| 24 from proto import config_pb2 | 27 from proto import config_pb2 |
| 25 import config | 28 import config |
| 26 import delegation | 29 import delegation |
| 27 | 30 |
| 28 | 31 |
| 29 def decode_token(token): | 32 def decode_token(token): |
| 30 return auth_delegation.unseal_token( | 33 return auth_delegation.unseal_token( |
| 31 auth_delegation.deserialize_token(token)) | 34 auth_delegation.deserialize_token(token)) |
| 32 | 35 |
| 33 | 36 |
| 34 class HandlersTest(test_case.TestCase): | 37 class HandlersTest(test_case.TestCase): |
| 35 def setUp(self): | 38 def setUp(self): |
| 36 super(HandlersTest, self).setUp() | 39 super(HandlersTest, self).setUp() |
| 37 self.app = webtest.TestApp( | 40 self.app = webtest.TestApp( |
| 38 webapp2.WSGIApplication(delegation.get_rest_api_routes(), debug=True), | 41 webapp2.WSGIApplication(delegation.get_rest_api_routes(), debug=True), |
| 39 extra_environ={'REMOTE_ADDR': '127.0.0.1'}) | 42 extra_environ={'REMOTE_ADDR': '127.1.2.3'}) |
| 40 # Don't bother with XSRF tokens in unit tests. | 43 # Don't bother with XSRF tokens in unit tests. |
| 41 self.mock( | 44 self.mock( |
| 42 delegation.CreateDelegationTokenHandler, 'xsrf_token_enforce_on', []) | 45 delegation.CreateDelegationTokenHandler, 'xsrf_token_enforce_on', []) |
| 43 # Simplify auth. | 46 # Simplify auth. |
| 44 def dumb_auth(req): | 47 def dumb_auth(req): |
| 45 return auth.Identity.from_bytes(req.headers['Mock-Peer-Id']) | 48 return auth.Identity.from_bytes(req.headers['Mock-Peer-Id']) |
| 46 self.mock( | 49 self.mock( |
| 47 handler.AuthenticatingHandler, 'get_auth_methods', | 50 handler.AuthenticatingHandler, 'get_auth_methods', |
| 48 classmethod(lambda *_: [dumb_auth])) | 51 classmethod(lambda *_: [dumb_auth])) |
| 49 | 52 |
| (...skipping 14 matching lines...) Expand all Loading... |
| 64 | 67 |
| 65 subtokens = decode_token(resp.json_body['delegation_token']) | 68 subtokens = decode_token(resp.json_body['delegation_token']) |
| 66 self.assertEqual(1, len(subtokens.subtokens)) | 69 self.assertEqual(1, len(subtokens.subtokens)) |
| 67 t = subtokens.subtokens[0] | 70 t = subtokens.subtokens[0] |
| 68 self.assertEqual('user:a@a.com', t.issuer_id) | 71 self.assertEqual('user:a@a.com', t.issuer_id) |
| 69 self.assertTrue(t.creation_time >= time.time() - 30) | 72 self.assertTrue(t.creation_time >= time.time() - 30) |
| 70 self.assertEqual(3600, t.validity_duration) | 73 self.assertEqual(3600, t.validity_duration) |
| 71 self.assertFalse(t.audience) | 74 self.assertFalse(t.audience) |
| 72 self.assertEqual(t.services, ['*']) | 75 self.assertEqual(t.services, ['*']) |
| 73 self.assertFalse(t.HasField('impersonator_id')) | 76 self.assertFalse(t.HasField('impersonator_id')) |
| 77 self.assertTrue(t.subtoken_id is not None) |
| 78 |
| 79 # Entity is created. |
| 80 key = ndb.Key(delegation.AuthDelegationSubtoken, t.subtoken_id) |
| 81 ent = key.get() |
| 82 self.assertTrue(ent) |
| 83 self.assertTrue(ent.subtoken) |
| 84 self.assertEqual('127.1.2.3', ent.caller_ip) |
| 85 self.assertEqual('v1a', ent.auth_service_version) |
| 86 self.assertEqual('user:a@a.com', ent.issuer_id) |
| 87 self.assertEqual( |
| 88 t.creation_time*1e6, utils.datetime_to_timestamp(ent.creation_time)) |
| 89 self.assertEqual('', ent.impersonator_id) |
| 74 | 90 |
| 75 def test_with_impersonation(self): | 91 def test_with_impersonation(self): |
| 76 # This function is tested separately below. | 92 # This function is tested separately below. |
| 77 self.mock(delegation, 'check_can_create_token', lambda *_, **__: None) | 93 self.mock( |
| 94 delegation, 'check_can_create_token', |
| 95 lambda *_, **__: delegation.DEFAULT_RULE) |
| 78 | 96 |
| 79 resp = self.create_token({ | 97 resp = self.create_token({ |
| 80 'audience': ['user:b@a.com'], | 98 'audience': ['user:b@a.com'], |
| 81 'services': ['service:a'], | 99 'services': ['service:a'], |
| 82 'validity_duration': 12345, | 100 'validity_duration': 12345, |
| 83 'impersonate': 'user:c@a.com', | 101 'impersonate': 'user:c@a.com', |
| 84 }, 'user:a@a.com') | 102 }, 'user:a@a.com') |
| 85 self.assertEqual(resp.status_code, 201) | 103 self.assertEqual(resp.status_code, 201) |
| 86 self.assertEqual(12345, resp.json_body['validity_duration']) | 104 self.assertEqual(12345, resp.json_body['validity_duration']) |
| 87 | 105 |
| (...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 140 | 158 |
| 141 class CheckCanCreateTokenTest(test_case.TestCase): | 159 class CheckCanCreateTokenTest(test_case.TestCase): |
| 142 def setUp(self): | 160 def setUp(self): |
| 143 super(CheckCanCreateTokenTest, self).setUp() | 161 super(CheckCanCreateTokenTest, self).setUp() |
| 144 self.rules = [] | 162 self.rules = [] |
| 145 self.mock( | 163 self.mock( |
| 146 config, 'get_delegation_config', | 164 config, 'get_delegation_config', |
| 147 lambda: config_pb2.DelegationConfig(rules=self.rules)) | 165 lambda: config_pb2.DelegationConfig(rules=self.rules)) |
| 148 | 166 |
| 149 def add_rule(self, **kwargs): | 167 def add_rule(self, **kwargs): |
| 150 self.rules.append(config_pb2.DelegationConfig.Rule(**kwargs)) | 168 r = config_pb2.DelegationConfig.Rule(**kwargs) |
| 169 self.rules.append(r) |
| 170 return r |
| 151 | 171 |
| 152 def test_get_delegation_rule(self): | 172 def test_get_delegation_rule(self): |
| 153 self.add_rule( | 173 self.add_rule( |
| 154 user_id=['service:a'], | 174 user_id=['service:a'], |
| 155 target_service=['service:b'], | 175 target_service=['service:b'], |
| 156 max_validity_duration=1) | 176 max_validity_duration=1) |
| 157 self.add_rule( | 177 self.add_rule( |
| 158 user_id=['service:a'], | 178 user_id=['service:a'], |
| 159 target_service=['*'], | 179 target_service=['*'], |
| 160 max_validity_duration=2) | 180 max_validity_duration=2) |
| (...skipping 21 matching lines...) Expand all Loading... |
| 182 test(3, 'service:x', ['service:c']) | 202 test(3, 'service:x', ['service:c']) |
| 183 test(4, 'service:x', ['service:c', 'service:d']) | 203 test(4, 'service:x', ['service:c', 'service:d']) |
| 184 test(5, 'service:x', ['service:c', 'service:d', 'service:e']) | 204 test(5, 'service:x', ['service:c', 'service:d', 'service:e']) |
| 185 test(1, 'service:a', ['*']) | 205 test(1, 'service:a', ['*']) |
| 186 test(3, 'service:x', ['*']) | 206 test(3, 'service:x', ['*']) |
| 187 | 207 |
| 188 def make_subtoken(self, **kwargs): | 208 def make_subtoken(self, **kwargs): |
| 189 return delegation_pb2.Subtoken(**kwargs) | 209 return delegation_pb2.Subtoken(**kwargs) |
| 190 | 210 |
| 191 def test_validity_duration(self): | 211 def test_validity_duration(self): |
| 192 self.add_rule( | 212 rule = self.add_rule( |
| 193 user_id=['*'], target_service=['*'], max_validity_duration=300) | 213 user_id=['*'], target_service=['*'], max_validity_duration=300) |
| 194 | 214 |
| 195 tok = self.make_subtoken(issuer_id='user:a@a.com', validity_duration=300) | 215 tok = self.make_subtoken(issuer_id='user:a@a.com', validity_duration=300) |
| 196 delegation.check_can_create_token('user:a@a.com', tok) | 216 r = delegation.check_can_create_token('user:a@a.com', tok) |
| 217 self.assertEqual(rule, r) |
| 197 | 218 |
| 198 with self.assertRaises(auth.AuthorizationError): | 219 with self.assertRaises(auth.AuthorizationError): |
| 199 tok = self.make_subtoken(issuer_id='user:a@a.com', validity_duration=400) | 220 tok = self.make_subtoken(issuer_id='user:a@a.com', validity_duration=400) |
| 200 delegation.check_can_create_token('user:a@a.com', tok) | 221 delegation.check_can_create_token('user:a@a.com', tok) |
| 201 | 222 |
| 202 def test_impersonation_disallowed_by_default(self): | 223 def test_impersonation_disallowed_by_default(self): |
| 203 # Making delegation token. | 224 # Making delegation token. |
| 204 tok = self.make_subtoken(issuer_id='user:a@a.com', validity_duration=300) | 225 tok = self.make_subtoken(issuer_id='user:a@a.com', validity_duration=300) |
| 205 delegation.check_can_create_token('user:a@a.com', tok) | 226 r = delegation.check_can_create_token('user:a@a.com', tok) |
| 227 self.assertEqual(delegation.DEFAULT_RULE, r) |
| 206 | 228 |
| 207 # Making impersonation token. | 229 # Making impersonation token. |
| 208 with self.assertRaises(auth.AuthorizationError): | 230 with self.assertRaises(auth.AuthorizationError): |
| 209 tok = self.make_subtoken(issuer_id='user:a@a.com', validity_duration=300) | 231 tok = self.make_subtoken(issuer_id='user:a@a.com', validity_duration=300) |
| 210 delegation.check_can_create_token('user:not-a@a.com', tok) | 232 delegation.check_can_create_token('user:not-a@a.com', tok) |
| 211 | 233 |
| 212 def test_allowed_to_impersonate(self): | 234 def test_allowed_to_impersonate(self): |
| 213 self.add_rule( | 235 rule = self.add_rule( |
| 214 user_id=['user:a@a.com'], | 236 user_id=['user:a@a.com'], |
| 215 target_service=['*'], | 237 target_service=['*'], |
| 216 max_validity_duration=300, | 238 max_validity_duration=300, |
| 217 allowed_to_impersonate=[ | 239 allowed_to_impersonate=[ |
| 218 'user:directly@a.com', | 240 'user:directly@a.com', |
| 219 'user:*@viaglob.com', | 241 'user:*@viaglob.com', |
| 220 'group:via-group', | 242 'group:via-group', |
| 221 ]) | 243 ]) |
| 222 self.mock( | 244 self.mock( |
| 223 auth, 'is_group_member', | 245 auth, 'is_group_member', |
| 224 lambda g, m: g == 'via-group' and m.to_bytes() == 'user:in-group@a.com') | 246 lambda g, m: g == 'via-group' and m.to_bytes() == 'user:in-group@a.com') |
| 225 | 247 |
| 226 tok = self.make_subtoken( | 248 tok = self.make_subtoken( |
| 227 issuer_id='user:directly@a.com', validity_duration=300) | 249 issuer_id='user:directly@a.com', validity_duration=300) |
| 228 delegation.check_can_create_token('user:a@a.com', tok) | 250 r = delegation.check_can_create_token('user:a@a.com', tok) |
| 251 self.assertEqual(rule, r) |
| 229 | 252 |
| 230 tok = self.make_subtoken( | 253 tok = self.make_subtoken( |
| 231 issuer_id='user:someone@viaglob.com', validity_duration=300) | 254 issuer_id='user:someone@viaglob.com', validity_duration=300) |
| 232 delegation.check_can_create_token('user:a@a.com', tok) | 255 r = delegation.check_can_create_token('user:a@a.com', tok) |
| 256 self.assertEqual(rule, r) |
| 233 | 257 |
| 234 tok = self.make_subtoken( | 258 tok = self.make_subtoken( |
| 235 issuer_id='user:in-group@a.com', validity_duration=300) | 259 issuer_id='user:in-group@a.com', validity_duration=300) |
| 236 delegation.check_can_create_token('user:a@a.com', tok) | 260 r = delegation.check_can_create_token('user:a@a.com', tok) |
| 261 self.assertEqual(rule, r) |
| 237 | 262 |
| 238 # Trying to impersonate someone not allowed. | 263 # Trying to impersonate someone not allowed. |
| 239 with self.assertRaises(auth.AuthorizationError): | 264 with self.assertRaises(auth.AuthorizationError): |
| 240 tok = self.make_subtoken( | 265 tok = self.make_subtoken( |
| 241 issuer_id='user:unknown@a.com', validity_duration=300) | 266 issuer_id='user:unknown@a.com', validity_duration=300) |
| 242 delegation.check_can_create_token('user:a@a.com', tok) | 267 delegation.check_can_create_token('user:a@a.com', tok) |
| 243 | 268 |
| 244 | 269 |
| 245 if __name__ == '__main__': | 270 if __name__ == '__main__': |
| 246 if '-v' in sys.argv: | 271 if '-v' in sys.argv: |
| 247 unittest.TestCase.maxDiff = None | 272 unittest.TestCase.maxDiff = None |
| 248 logging.basicConfig(level=logging.DEBUG) | 273 logging.basicConfig(level=logging.DEBUG) |
| 249 else: | 274 else: |
| 250 logging.basicConfig(level=logging.FATAL) | 275 logging.basicConfig(level=logging.FATAL) |
| 251 unittest.main() | 276 unittest.main() |
| OLD | NEW |