OLD | NEW |
(Empty) | |
| 1 # Copyright 2016 Google Inc. All Rights Reserved. |
| 2 # |
| 3 # Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 # you may not use this file except in compliance with the License. |
| 5 # You may obtain a copy of the License at |
| 6 # |
| 7 # http://www.apache.org/licenses/LICENSE-2.0 |
| 8 # |
| 9 # Unless required by applicable law or agreed to in writing, software |
| 10 # distributed under the License is distributed on an "AS IS" BASIS, |
| 11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 # See the License for the specific language governing permissions and |
| 13 # limitations under the License. |
| 14 |
| 15 """Tests for users_id_token and validate_id_token.""" |
| 16 |
| 17 import base64 |
| 18 import json |
| 19 import os |
| 20 import string |
| 21 import time |
| 22 import unittest |
| 23 |
| 24 import endpoints.api_config as api_config |
| 25 |
| 26 import mox |
| 27 from protorpc import message_types |
| 28 from protorpc import messages |
| 29 from protorpc import remote |
| 30 |
| 31 import test_util |
| 32 import endpoints.users_id_token as users_id_token |
| 33 |
| 34 from google.appengine.api import memcache |
| 35 from google.appengine.api import oauth |
| 36 from google.appengine.api import urlfetch |
| 37 from google.appengine.api import users |
| 38 |
| 39 |
| 40 # The key response that allows the _SAMPLE_TOKEN to be verified. This key was |
| 41 # retrieved from: |
| 42 # http://www-googleapis-test.sandbox.google.com//oauth2/v1/raw_public_keys |
| 43 # ...at the same time that _SAMPLE_TOKEN was generated. |
| 44 # The first cert is too short, which caused an exception 'Plaintext too large' |
| 45 # from RSA.encrypt (b/19127342); the second cert is the correct cert. Put both |
| 46 # there to make sure the second cert is tried when the first failed. |
| 47 |
| 48 _CACHED_CERT = { |
| 49 'keyvalues': [ |
| 50 { |
| 51 'algorithm': 'RSA', |
| 52 'modulus': ('2bqhkZ+DZSuQvHX3rdoIni39gfl6zny0WZK6dLPP2lRmer1aEAP982' |
| 53 'u2B1siXoXB8HN+pwCZMGV5kbHaG13InopeVNIMFl2IU4aql+hDS0+i' |
| 54 'j+1Rrsa6wHWp4+3eKe9q+VqXMdulclegHjVtxDs76W1lpuP1e6Msc3' |
| 55 'IuSXjR'), |
| 56 'exponent': 'AQAB', |
| 57 'keyid': '458790a80f9c9957e8df61332b9f06faa6472bad' |
| 58 }, |
| 59 { |
| 60 'algorithm': 'RSA', |
| 61 'modulus': ('AL18Q+dq5ws4/V7KtgfhC6SwJH20GvUN5z3vf4SUSrpQG2/nySBvUh' |
| 62 'Iv86Hkk4Uy7W+OTq2+csCGhjGnRxBx9BThT85G8F6IGNjcOyNHVtnR' |
| 63 'ifX+T88sUB1l7jAISRMCrgHIRNmwDCmEe1fTqTUOdgDT8nB7pX7SA/' |
| 64 'VH0q+t2xml'), |
| 65 'exponent': 'AQAB', |
| 66 'keyid': '7411abfccccb4c253cd3e75b4fa5887f49aa83d1' |
| 67 }, |
| 68 ] |
| 69 } |
| 70 |
| 71 |
| 72 class ModuleInterfaceTest(test_util.ModuleInterfaceTest, |
| 73 unittest.TestCase): |
| 74 |
| 75 MODULE = users_id_token |
| 76 |
| 77 |
| 78 class TestCache(object): |
| 79 """Test stub to replace memcache for id_token verification.""" |
| 80 |
| 81 def __init__(self): |
| 82 self._used_cached_value = False |
| 83 self._value_was_set = False |
| 84 |
| 85 @property |
| 86 def used_cached_value(self): |
| 87 return self._used_cached_value |
| 88 |
| 89 @property |
| 90 def value_was_set(self): |
| 91 return self._value_was_set |
| 92 |
| 93 # pylint: disable=g-bad-name |
| 94 def get(self, key, *unused_args, **kwargs): |
| 95 if (key == users_id_token._DEFAULT_CERT_URI and |
| 96 kwargs.get('namespace', '') == users_id_token._CERT_NAMESPACE): |
| 97 self._used_cached_value = True |
| 98 return _CACHED_CERT |
| 99 return None |
| 100 |
| 101 def set(self, *unused_args, **unused_kwargs): |
| 102 self._value_was_set = True |
| 103 |
| 104 |
| 105 class UsersIdTokenTestBase(unittest.TestCase): |
| 106 """A sample token based on JWT. |
| 107 |
| 108 Sample token is based on a JWT with this body: |
| 109 { |
| 110 "iss":"accounts.google.com", |
| 111 "email":"kevind@gmail.com", |
| 112 "email_verified":"true", |
| 113 "aud":"919214422084-c0jrodnkm7ntttjhhttilqjq5d7l7mu5.apps." |
| 114 "googleusercontent.com", |
| 115 "sub":"104564329451840817415", |
| 116 "azp":"919214422084-c0jrodnkm7ntttjhhttilqjq5d7l7mu5.apps." |
| 117 "googleusercontent.com", |
| 118 "at_hash":"c9aVyHiathUC-pgRFjFWbw", |
| 119 "iat":1360964700, |
| 120 "exp":1360968600 |
| 121 } |
| 122 """ |
| 123 _SAMPLE_TOKEN = ('eyJhbGciOiJSUzI1NiIsImtpZCI6Ijc0MTFhYmZjY2NjYjRjMjUzY2QzZTc' |
| 124 '1YjRmYTU4ODdmNDlhYTgzZDEifQ.eyJpc3MiOiJhY2NvdW50cy5nb29nbGU' |
| 125 'uY29tIiwiZW1haWwiOiJrZXZpbmRAZ21haWwuY29tIiwiZW1haWxfdmVyaW' |
| 126 'ZpZWQiOiJ0cnVlIiwiYXVkIjoiOTE5MjE0NDIyMDg0LWMwanJvZG5rbTdud' |
| 127 'HR0amhodHRpbHFqcTVkN2w3bXU1LmFwcHMuZ29vZ2xldXNlcmNvbnRlbnQu' |
| 128 'Y29tIiwic3ViIjoiMTA0NTY0MzI5NDUxODQwODE3NDE1IiwiYXpwIjoiOTE' |
| 129 '5MjE0NDIyMDg0LWMwanJvZG5rbTdudHR0amhodHRpbHFqcTVkN2w3bXU1Lm' |
| 130 'FwcHMuZ29vZ2xldXNlcmNvbnRlbnQuY29tIiwiYXRfaGFzaCI6ImM5YVZ5S' |
| 131 'GlhdGhVQy1wZ1JGakZXYnciLCJpYXQiOjEzNjA5NjQ3MDAsImV4cCI6MTM2' |
| 132 'MDk2ODYwMH0.XwaGmw5n1XHJapwkn6pumK14l9Tiyn1q2C5VeYbvuScNS6Z' |
| 133 '-kdb9mX87Hl2hbdUvHm6TNzabMVTgvHPATjuCAt2lXOpwm8iGnon6vTk5LM' |
| 134 'm0tUAE25IAImvpSc59l0ySd4x2g3BvjauxwaYjkwYJRVczsVlTTB3iKlBhW' |
| 135 'IT01vM') |
| 136 _SAMPLE_AUDIENCES = ('919214422084-c0jrodnkm7ntttjhhttilqjq5d7l7mu5.apps.' |
| 137 'googleusercontent.com',) |
| 138 _SAMPLE_ALLOWED_CLIENT_IDS = ('919214422084-c0jrodnkm7ntttjhhttilqjq5d7l7mu5.' |
| 139 'apps.googleusercontent.com', |
| 140 '12345.apps.googleusercontent.com') |
| 141 _SAMPLE_TIME_NOW = 1360964700 |
| 142 _SAMPLE_OAUTH_SCOPES = ['https://www.googleapis.com/auth/userinfo.email'] |
| 143 _SAMPLE_OAUTH_TOKEN_INFO = { |
| 144 'issued_to': ('919214422084-c0jrodnkm7ntttjhhttilqjq5d7l7mu5.apps.' |
| 145 'googleusercontent.com'), |
| 146 'user_id': '108495933693426793887', |
| 147 'expires_in': 3384, |
| 148 'access_type': 'online', |
| 149 'audience': ('919214422084-c0jrodnkm7ntttjhhttilqjq5d7l7mu5.apps.' |
| 150 'googleusercontent.com'), |
| 151 'scope': ( |
| 152 'https://www.googleapis.com/auth/userinfo.profile ' |
| 153 'https://www.googleapis.com/auth/userinfo.email'), |
| 154 'email': 'kevind@gmail.com', |
| 155 'verified_email': True |
| 156 } |
| 157 |
| 158 def setUp(self): |
| 159 self.cache = TestCache() |
| 160 self._saved_environ = os.environ.copy() |
| 161 if 'AUTH_DOMAIN' not in os.environ: |
| 162 os.environ['AUTH_DOMAIN'] = 'gmail.com' |
| 163 self.mox = mox.Mox() |
| 164 |
| 165 def tearDown(self): |
| 166 self.mox.UnsetStubs() |
| 167 os.environ = self._saved_environ |
| 168 |
| 169 def GetSampleBody(self): |
| 170 split_token = self._SAMPLE_TOKEN.split('.') |
| 171 body = json.loads(users_id_token._urlsafe_b64decode(split_token[1])) |
| 172 return body |
| 173 |
| 174 |
| 175 class UsersIdTokenTest(UsersIdTokenTestBase): |
| 176 |
| 177 def testSampleIdToken(self): |
| 178 user = users_id_token._get_id_token_user(self._SAMPLE_TOKEN, |
| 179 self._SAMPLE_AUDIENCES, |
| 180 self._SAMPLE_ALLOWED_CLIENT_IDS, |
| 181 self._SAMPLE_TIME_NOW, self.cache) |
| 182 self.assertEqual(user.email(), 'kevind@gmail.com') |
| 183 # User ID shouldn't be filled in. See notes in users_id_token.py. |
| 184 self.assertIsNone(user.user_id()) |
| 185 self.assertTrue(self.cache.used_cached_value) |
| 186 |
| 187 def testInvalidSignature(self): |
| 188 """Verify that a body that doesn't match the signature fails.""" |
| 189 body = self.GetSampleBody() |
| 190 # Modify the issued and expiration times. |
| 191 body['iat'] += 60 |
| 192 body['exp'] += 60 |
| 193 encoded_body = base64.urlsafe_b64encode(json.dumps(body)) |
| 194 |
| 195 split_token = self._SAMPLE_TOKEN.split('.') |
| 196 token = '.'.join((split_token[0], encoded_body, split_token[2])) |
| 197 |
| 198 self.assertRaises(users_id_token._AppIdentityError, |
| 199 users_id_token._verify_signed_jwt_with_certs, |
| 200 token, self._SAMPLE_TIME_NOW, self.cache) |
| 201 |
| 202 def testNoCertRaisesException(self): |
| 203 """Verify that if we can't get certs, we fail.""" |
| 204 self.assertRaises(users_id_token._AppIdentityError, |
| 205 users_id_token._verify_signed_jwt_with_certs, |
| 206 self._SAMPLE_TOKEN, self._SAMPLE_TIME_NOW, self.cache, |
| 207 'https://bad.url/not/in/test/cache') |
| 208 |
| 209 def testGetCertExpirationTime(self): |
| 210 """Test that we can correctly get cert expiration time from headers.""" |
| 211 tests = [({'Cache-Control': 'max-age=3600'}, 3600), |
| 212 ({'Cache-Control': 'max-age=3600', 'Age': '1200'}, 2400), |
| 213 ({}, 0), |
| 214 ({'Age': '1'}, 0), |
| 215 ({'Cache-Control': 'max-age=3600', 'Age': '3700'}, 0), |
| 216 ({'Cache-Control': 'max-age=3600', 'Age': 'bad'}, 3600), |
| 217 ({'Cache-Control': 'max-age=nomatch,max-age=1200'}, 1200), |
| 218 ({'Cache-Control': 'max-age=invalid'}, 0)] |
| 219 for headers, expected_result in tests: |
| 220 result = users_id_token._get_cert_expiration_time(headers) |
| 221 self.assertEqual(expected_result, result) |
| 222 |
| 223 def testCertCacheControl(self): |
| 224 """Test that cache control headers are respected.""" |
| 225 self.mox.StubOutWithMock(urlfetch, 'fetch') |
| 226 tests = [({'Cache-Control': 'max-age=3600', 'Age': '1200'}, True), |
| 227 ({'Cache-Control': 'max-age=100', 'Age': '100'}, False), |
| 228 ({}, False)] |
| 229 for test_headers, value_set in tests: |
| 230 |
| 231 class DummyResponse(object): |
| 232 status_code = 200 |
| 233 content = json.dumps(self._SAMPLE_OAUTH_TOKEN_INFO) |
| 234 headers = test_headers |
| 235 |
| 236 urlfetch.fetch(mox.IsA(basestring)).AndReturn(DummyResponse()) |
| 237 cache = TestCache() |
| 238 |
| 239 self.mox.ReplayAll() |
| 240 users_id_token._get_cached_certs('some_uri', cache) |
| 241 self.mox.VerifyAll() |
| 242 self.mox.ResetAll() |
| 243 |
| 244 self.assertEqual(value_set, cache.value_was_set) |
| 245 |
| 246 def testInvalidTokenExtraSections(self): |
| 247 """Verify that a token with too many pieces fails.""" |
| 248 self.assertRaises(users_id_token._AppIdentityError, |
| 249 users_id_token._verify_signed_jwt_with_certs, |
| 250 self._SAMPLE_TOKEN + '.asdf', self._SAMPLE_TIME_NOW, |
| 251 self.cache) |
| 252 |
| 253 def testNoCrypto(self): |
| 254 """Verify we throw an _AppIdentityError if the Crypto modules don't load.""" |
| 255 crypto_loaded = users_id_token._CRYPTO_LOADED |
| 256 try: |
| 257 users_id_token._CRYPTO_LOADED = False |
| 258 self.assertRaises(users_id_token._AppIdentityError, |
| 259 users_id_token._verify_signed_jwt_with_certs, |
| 260 self._SAMPLE_TOKEN, self._SAMPLE_TIME_NOW, |
| 261 self.cache) |
| 262 finally: |
| 263 users_id_token._CRYPTO_LOADED = crypto_loaded |
| 264 |
| 265 def testExpiredToken(self): |
| 266 """Verify that expired tokens will fail.""" |
| 267 expired_time_now = (self._SAMPLE_TIME_NOW + |
| 268 users_id_token._MAX_TOKEN_LIFETIME_SECS + 1) |
| 269 self.assertRaises(users_id_token._AppIdentityError, |
| 270 users_id_token._verify_signed_jwt_with_certs, |
| 271 self._SAMPLE_TOKEN, expired_time_now, |
| 272 self.cache) |
| 273 # Also verify that this doesn't return a user when called from |
| 274 # users_id_token. |
| 275 user = users_id_token._get_id_token_user(self._SAMPLE_TOKEN, |
| 276 self._SAMPLE_AUDIENCES, |
| 277 self._SAMPLE_ALLOWED_CLIENT_IDS, |
| 278 expired_time_now, self.cache) |
| 279 self.assertIsNone(user) |
| 280 |
| 281 def testTimeTooEarly(self): |
| 282 """Verify that we'll fail if the provided time_now is too early.""" |
| 283 early_time_now = (self._SAMPLE_TIME_NOW - |
| 284 users_id_token._CLOCK_SKEW_SECS - 1) |
| 285 self.assertRaises(users_id_token._AppIdentityError, |
| 286 users_id_token._verify_signed_jwt_with_certs, |
| 287 self._SAMPLE_TOKEN, early_time_now, |
| 288 self.cache) |
| 289 |
| 290 def CheckErrorLoggable(self, token): |
| 291 """Verify that the error strings we log are valid, loggable strings.""" |
| 292 try: |
| 293 users_id_token._verify_signed_jwt_with_certs( |
| 294 token, self._SAMPLE_TIME_NOW, self.cache) |
| 295 self.fail('Expected exception.') |
| 296 except users_id_token._AppIdentityError, e: |
| 297 # Make sure this works without an exception. |
| 298 try: |
| 299 str(e).decode('utf-8') |
| 300 except UnicodeDecodeError: |
| 301 printable = ''.join(c if c in string.printable |
| 302 else '\\x%02x' % ord(c) |
| 303 for c in str(e)) |
| 304 self.fail('Unsafe error sent to log: %s' % printable) |
| 305 |
| 306 def testErrorStringLoggableWrongSegments(self): |
| 307 """Check that the Wrong Segments error is loggable.""" |
| 308 self.CheckErrorLoggable('bad utf-8 \xff') |
| 309 |
| 310 def testErrorStringLoggableBadHeader(self): |
| 311 """Check that the Bad Header error is loggable.""" |
| 312 token_part = 'bad utf-8 \xff' |
| 313 token = '.'.join([base64.urlsafe_b64encode(token_part)] * 3) |
| 314 self.CheckErrorLoggable(token) |
| 315 |
| 316 def testErrorStringLoggableBadBody(self): |
| 317 """Check that the Unparseable Body error is loggable.""" |
| 318 token_body = 'bad utf-8 \xff' |
| 319 token_parts = self._SAMPLE_TOKEN.split('.') |
| 320 token = '.'.join([token_parts[0], |
| 321 base64.urlsafe_b64encode(token_body), |
| 322 token_parts[2]]) |
| 323 self.CheckErrorLoggable(token) |
| 324 |
| 325 def CheckToken(self, field_update_dict, valid): |
| 326 """Update the sample token and check if it's valid or invalid. |
| 327 |
| 328 This updates the body of our sample token with the fields in |
| 329 field_update_dict, then passes it to _verify_parsed_token. The result must |
| 330 match the "valid" parameter. |
| 331 |
| 332 Args: |
| 333 field_update_dict: A dict of fields to update in the sample body. |
| 334 valid: A boolean, compared against the result from _verify_parsed_token. |
| 335 """ |
| 336 parsed_token = self.GetSampleBody() |
| 337 parsed_token.update(field_update_dict) |
| 338 result = users_id_token._verify_parsed_token( |
| 339 parsed_token, self._SAMPLE_AUDIENCES, self._SAMPLE_ALLOWED_CLIENT_IDS) |
| 340 self.assertEqual(valid, result) |
| 341 |
| 342 def testInvalidIssuer(self): |
| 343 self.CheckToken({'iss': 'invalid.issuer'}, False) |
| 344 |
| 345 def testInvalidAudience(self): |
| 346 self.CheckToken({'aud': 'invalid.audience'}, False) |
| 347 |
| 348 def testInvalidClientId(self): |
| 349 self.CheckToken({'azp': 'invalid.client.id'}, False) |
| 350 |
| 351 def testSampleIdTokenWithOldFields(self): |
| 352 self.CheckToken({'cid': 'Extra ignored field.'}, True) |
| 353 |
| 354 def testSkipClientIdNotAllowedForIdTokens(self): |
| 355 """Verify that SKIP_CLIENT_ID_CHECKS does not work for ID tokens.""" |
| 356 parsed_token = self.GetSampleBody() |
| 357 result = users_id_token._verify_parsed_token( |
| 358 parsed_token, self._SAMPLE_AUDIENCES, |
| 359 users_id_token.SKIP_CLIENT_ID_CHECK) |
| 360 self.assertEqual(False, result) |
| 361 |
| 362 def testEmptyAudience(self): |
| 363 parsed_token = self.GetSampleBody() |
| 364 parsed_token.update({'aud': 'invalid.audience'}) |
| 365 result = users_id_token._verify_parsed_token( |
| 366 parsed_token, [], self._SAMPLE_ALLOWED_CLIENT_IDS) |
| 367 self.assertEqual(False, result) |
| 368 |
| 369 def AttemptOauth(self, client_id, allowed_client_ids=None): |
| 370 if allowed_client_ids is None: |
| 371 allowed_client_ids = self._SAMPLE_ALLOWED_CLIENT_IDS |
| 372 self.mox.StubOutWithMock(oauth, 'get_client_id') |
| 373 # We have four cases: |
| 374 # * no client ID is specified, so we raise for every scope. |
| 375 # * the given client ID is in the whitelist or there is no |
| 376 # whitelist, so we'll only be called once. |
| 377 # * we have a client ID not on the whitelist, so we need a |
| 378 # mock call for every scope. |
| 379 if client_id is None: |
| 380 for scope in self._SAMPLE_OAUTH_SCOPES: |
| 381 oauth.get_client_id(scope).AndRaise(oauth.Error) |
| 382 elif (list(allowed_client_ids) == users_id_token.SKIP_CLIENT_ID_CHECK or |
| 383 client_id in allowed_client_ids): |
| 384 scope = self._SAMPLE_OAUTH_SCOPES[0] |
| 385 oauth.get_client_id(scope).AndReturn(client_id) |
| 386 else: |
| 387 for scope in self._SAMPLE_OAUTH_SCOPES: |
| 388 oauth.get_client_id(scope).AndReturn(client_id) |
| 389 |
| 390 self.mox.ReplayAll() |
| 391 users_id_token._set_bearer_user_vars(allowed_client_ids, |
| 392 self._SAMPLE_OAUTH_SCOPES) |
| 393 self.mox.VerifyAll() |
| 394 |
| 395 def assertOauthSucceeded(self, client_id): |
| 396 self.AttemptOauth(client_id) |
| 397 self.assertEqual(os.environ.get('ENDPOINTS_USE_OAUTH_SCOPE'), |
| 398 self._SAMPLE_OAUTH_SCOPES[0]) |
| 399 |
| 400 def assertOauthFailed(self, client_id): |
| 401 self.AttemptOauth(client_id) |
| 402 self.assertNotIn('ENDPOINTS_USE_OAUTH_SCOPE', os.environ) |
| 403 |
| 404 def testOauthInvalidClientId(self): |
| 405 self.assertOauthFailed('abc.appspot.com') |
| 406 |
| 407 def testOauthValidClientId(self): |
| 408 self.assertOauthSucceeded(self._SAMPLE_ALLOWED_CLIENT_IDS[0]) |
| 409 |
| 410 def testOauthExplorerClientId(self): |
| 411 self.assertOauthFailed(api_config.API_EXPLORER_CLIENT_ID) |
| 412 |
| 413 def testOauthInvalidScope(self): |
| 414 self.assertOauthFailed(None) |
| 415 |
| 416 def testAllowAllClientIds(self): |
| 417 client_id = 'clearly_fake_id' |
| 418 self.AttemptOauth(client_id, |
| 419 allowed_client_ids=users_id_token.SKIP_CLIENT_ID_CHECK) |
| 420 self.assertEqual(os.environ.get('ENDPOINTS_USE_OAUTH_SCOPE'), |
| 421 self._SAMPLE_OAUTH_SCOPES[0]) |
| 422 |
| 423 def AttemptOauthLocal(self, token_update=None): |
| 424 token = self._SAMPLE_OAUTH_TOKEN_INFO.copy() |
| 425 token.update(token_update or {}) |
| 426 |
| 427 class DummyResponse(object): |
| 428 status_code = 200 |
| 429 content = json.dumps(token) |
| 430 |
| 431 self.mox.StubOutWithMock(urlfetch, 'fetch') |
| 432 urlfetch.fetch(mox.IsA(basestring)).AndReturn(DummyResponse()) |
| 433 |
| 434 self.mox.ReplayAll() |
| 435 users_id_token._set_bearer_user_vars_local('unused_token', |
| 436 self._SAMPLE_ALLOWED_CLIENT_IDS, |
| 437 self._SAMPLE_OAUTH_SCOPES) |
| 438 self.mox.VerifyAll() |
| 439 |
| 440 def testOauthLocal(self): |
| 441 self.AttemptOauthLocal() |
| 442 self.assertNotIn('ENDPOINTS_USE_OAUTH_SCOPE', os.environ) |
| 443 self.assertEqual('kevind@gmail.com', |
| 444 os.environ.get('ENDPOINTS_AUTH_EMAIL')) |
| 445 self.assertEqual('', os.environ.get('ENDPOINTS_AUTH_DOMAIN')) |
| 446 |
| 447 def assertOauthLocalFailed(self, token_update): |
| 448 self.AttemptOauthLocal(token_update) |
| 449 self.assertNotIn('ENDPOINTS_USE_OAUTH_SCOPE', os.environ) |
| 450 self.assertNotIn('ENDPOINTS_AUTH_EMAIL', os.environ) |
| 451 self.assertNotIn('ENDPOINTS_AUTH_DOMAIN', os.environ) |
| 452 |
| 453 def testOauthLocalBadEmail(self): |
| 454 self.assertOauthLocalFailed({'verified_email': False}) |
| 455 |
| 456 def testOauthLocalBadClientId(self): |
| 457 self.assertOauthLocalFailed({'issued_to': 'abc.appspot.com'}) |
| 458 |
| 459 def testOauthLocalBadScopes(self): |
| 460 self.assertOauthLocalFailed({'scope': 'useless_scope and_another'}) |
| 461 |
| 462 def testGetCurrentUserNoAuthInfo(self): |
| 463 self.assertRaises(users_id_token.InvalidGetUserCall, |
| 464 users_id_token.get_current_user) |
| 465 |
| 466 def testGetCurrentUserEmailOnly(self): |
| 467 os.environ['ENDPOINTS_AUTH_EMAIL'] = 'test@gmail.com' |
| 468 os.environ['ENDPOINTS_AUTH_DOMAIN'] = '' |
| 469 user = users_id_token.get_current_user() |
| 470 self.assertEqual(user.email(), 'test@gmail.com') |
| 471 self.assertIsNone(user.user_id()) |
| 472 |
| 473 def testGetCurrentUserEmailAndAuth(self): |
| 474 os.environ['ENDPOINTS_AUTH_EMAIL'] = 'test@gmail.com' |
| 475 os.environ['ENDPOINTS_AUTH_DOMAIN'] = 'gmail.com' |
| 476 user = users_id_token.get_current_user() |
| 477 self.assertEqual(user.email(), 'test@gmail.com') |
| 478 self.assertEqual(user.auth_domain(), 'gmail.com') |
| 479 self.assertIsNone(user.user_id()) |
| 480 |
| 481 def testGetCurrentUserOauth(self): |
| 482 self.mox.StubOutWithMock(oauth, 'get_current_user') |
| 483 oauth.get_current_user('scope').AndReturn(users.User('test@gmail.com')) |
| 484 self.mox.ReplayAll() |
| 485 |
| 486 os.environ['ENDPOINTS_USE_OAUTH_SCOPE'] = 'scope' |
| 487 user = users_id_token.get_current_user() |
| 488 self.assertEqual(user.email(), 'test@gmail.com') |
| 489 self.mox.VerifyAll() |
| 490 |
| 491 def testGetTokenQueryParamOauthHeader(self): |
| 492 os.environ['HTTP_AUTHORIZATION'] = 'OAuth ' + self._SAMPLE_TOKEN |
| 493 token = users_id_token._get_token(None) |
| 494 self.assertEqual(token, self._SAMPLE_TOKEN) |
| 495 |
| 496 def testGetTokenQueryParamBearerHeader(self): |
| 497 os.environ['HTTP_AUTHORIZATION'] = 'Bearer ' + self._SAMPLE_TOKEN |
| 498 token = users_id_token._get_token(None) |
| 499 self.assertEqual(token, self._SAMPLE_TOKEN) |
| 500 |
| 501 def testGetTokenQueryParamInvalidBearerHeader(self): |
| 502 # Capitalization matters. This should fail. |
| 503 os.environ['HTTP_AUTHORIZATION'] = 'BEARER ' + self._SAMPLE_TOKEN |
| 504 token = users_id_token._get_token(None) |
| 505 self.assertIsNone(token) |
| 506 |
| 507 def testGetTokenQueryParamInvalidHeader(self): |
| 508 os.environ['HTTP_AUTHORIZATION'] = 'Invalid ' + self._SAMPLE_TOKEN |
| 509 token = users_id_token._get_token(None) |
| 510 self.assertIsNone(token) |
| 511 |
| 512 def testGetTokenQueryParamBearer(self): |
| 513 request = self.mox.CreateMock(messages.Message) |
| 514 request.get_unrecognized_field_info('bearer_token').AndReturn( |
| 515 (self._SAMPLE_TOKEN, messages.Variant.STRING)) |
| 516 |
| 517 self.mox.ReplayAll() |
| 518 token = users_id_token._get_token(request) |
| 519 self.mox.VerifyAll() |
| 520 self.assertEqual(token, self._SAMPLE_TOKEN) |
| 521 |
| 522 def testGetTokenQueryParamAccess(self): |
| 523 request = self.mox.CreateMock(messages.Message) |
| 524 request.get_unrecognized_field_info('bearer_token').AndReturn( |
| 525 (None, None)) |
| 526 request.get_unrecognized_field_info('access_token').AndReturn( |
| 527 (self._SAMPLE_TOKEN, messages.Variant.STRING)) |
| 528 |
| 529 self.mox.ReplayAll() |
| 530 token = users_id_token._get_token(request) |
| 531 self.mox.VerifyAll() |
| 532 self.assertEqual(token, self._SAMPLE_TOKEN) |
| 533 |
| 534 def testGetTokenNone(self): |
| 535 request = self.mox.CreateMock(messages.Message) |
| 536 request.get_unrecognized_field_info('bearer_token').AndReturn((None, None)) |
| 537 request.get_unrecognized_field_info('access_token').AndReturn((None, None)) |
| 538 |
| 539 self.mox.ReplayAll() |
| 540 token = users_id_token._get_token(request) |
| 541 self.mox.VerifyAll() |
| 542 self.assertIsNone(token) |
| 543 |
| 544 |
| 545 class UsersIdTokenTestWithSimpleApi(UsersIdTokenTestBase): |
| 546 |
| 547 # pylint: disable=g-bad-name |
| 548 |
| 549 @api_config.api('TestApi', 'v1') |
| 550 class TestApiAnnotatedAtMethod(remote.Service): |
| 551 """Describes TestApi.""" |
| 552 |
| 553 @api_config.method( |
| 554 message_types.VoidMessage, message_types.VoidMessage, |
| 555 audiences=UsersIdTokenTestBase._SAMPLE_AUDIENCES, |
| 556 allowed_client_ids=UsersIdTokenTestBase._SAMPLE_ALLOWED_CLIENT_IDS, |
| 557 scopes=UsersIdTokenTestBase._SAMPLE_OAUTH_SCOPES) |
| 558 def method(self): |
| 559 pass |
| 560 |
| 561 @api_config.api( |
| 562 'TestApi', 'v1', audiences=UsersIdTokenTestBase._SAMPLE_AUDIENCES, |
| 563 allowed_client_ids=UsersIdTokenTestBase._SAMPLE_ALLOWED_CLIENT_IDS) |
| 564 class TestApiAnnotatedAtApi(remote.Service): |
| 565 """Describes TestApi.""" |
| 566 |
| 567 @api_config.method(message_types.VoidMessage, message_types.VoidMessage) |
| 568 def method(self, request): |
| 569 return request |
| 570 # pylint: enable=g-bad-name |
| 571 |
| 572 def testMaybeSetVarsAlreadySetOauth(self): |
| 573 os.environ['ENDPOINTS_USE_OAUTH_SCOPE'] = ( |
| 574 'https://www.googleapis.com/auth/userinfo.email') |
| 575 users_id_token._maybe_set_current_user_vars( |
| 576 self.TestApiAnnotatedAtApi().method) |
| 577 self.assertEqual('https://www.googleapis.com/auth/userinfo.email', |
| 578 os.environ.get('ENDPOINTS_USE_OAUTH_SCOPE')) |
| 579 self.assertNotIn('ENDPOINTS_AUTH_EMAIL', os.environ) |
| 580 self.assertNotIn('ENDPOINTS_AUTH_DOMAIN', os.environ) |
| 581 |
| 582 def testMaybeSetVarsAlreadySetIdToken(self): |
| 583 os.environ['ENDPOINTS_AUTH_EMAIL'] = 'test@gmail.com' |
| 584 os.environ['ENDPOINTS_AUTH_DOMAIN'] = 'gmail.com' |
| 585 users_id_token._maybe_set_current_user_vars( |
| 586 self.TestApiAnnotatedAtApi().method) |
| 587 self.assertNotIn('ENDPOINTS_USE_OAUTH_SCOPE', os.environ) |
| 588 self.assertEqual('test@gmail.com', os.environ.get('ENDPOINTS_AUTH_EMAIL')) |
| 589 self.assertEqual('gmail.com', os.environ.get('ENDPOINTS_AUTH_DOMAIN')) |
| 590 |
| 591 def testMaybeSetVarsAlreadySetIdTokenNoDomain(self): |
| 592 os.environ['ENDPOINTS_AUTH_EMAIL'] = 'test@gmail.com' |
| 593 os.environ['ENDPOINTS_AUTH_DOMAIN'] = '' |
| 594 users_id_token._maybe_set_current_user_vars( |
| 595 self.TestApiAnnotatedAtApi().method) |
| 596 self.assertNotIn('ENDPOINTS_USE_OAUTH_SCOPE', os.environ) |
| 597 self.assertEqual('test@gmail.com', os.environ.get('ENDPOINTS_AUTH_EMAIL')) |
| 598 self.assertEqual('', os.environ.get('ENDPOINTS_AUTH_DOMAIN')) |
| 599 |
| 600 def VerifyIdToken(self, cls, *args): |
| 601 self.mox.StubOutWithMock(time, 'time') |
| 602 self.mox.StubOutWithMock(users_id_token, '_get_id_token_user') |
| 603 time.time().AndReturn(1001) |
| 604 users_id_token._get_id_token_user( |
| 605 self._SAMPLE_TOKEN, |
| 606 self._SAMPLE_AUDIENCES, |
| 607 self._SAMPLE_ALLOWED_CLIENT_IDS, |
| 608 1001, memcache).AndReturn(users.User('test@gmail.com')) |
| 609 self.mox.ReplayAll() |
| 610 |
| 611 os.environ['HTTP_AUTHORIZATION'] = ('Bearer ' + self._SAMPLE_TOKEN) |
| 612 if args: |
| 613 cls.method(*args) |
| 614 else: |
| 615 users_id_token._maybe_set_current_user_vars(cls.method) |
| 616 self.assertEqual(os.environ.get('ENDPOINTS_AUTH_EMAIL'), 'test@gmail.com') |
| 617 self.mox.VerifyAll() |
| 618 |
| 619 def testMaybeSetVarsIdTokenApiAnnotation(self): |
| 620 self.VerifyIdToken(self.TestApiAnnotatedAtApi()) |
| 621 |
| 622 def testMaybeSetVarsIdTokenMethodAnnotation(self): |
| 623 self.VerifyIdToken(self.TestApiAnnotatedAtMethod()) |
| 624 |
| 625 def testMethodCallParsesIdToken(self): |
| 626 self.VerifyIdToken(self.TestApiAnnotatedAtApi(), |
| 627 message_types.VoidMessage()) |
| 628 |
| 629 def testMaybeSetVarsWithActualRequestAccessToken(self): |
| 630 dummy_scope = 'scope' |
| 631 dummy_token = 'dummy_token' |
| 632 dummy_email = 'test@gmail.com' |
| 633 dummy_client_id = self._SAMPLE_ALLOWED_CLIENT_IDS[0] |
| 634 |
| 635 @api_config.api('TestApi', 'v1', |
| 636 allowed_client_ids=self._SAMPLE_ALLOWED_CLIENT_IDS, |
| 637 scopes=[dummy_scope]) |
| 638 class TestApiScopes(remote.Service): |
| 639 """Describes TestApiScopes.""" |
| 640 |
| 641 # pylint: disable=g-bad-name |
| 642 @api_config.method(message_types.VoidMessage, message_types.VoidMessage) |
| 643 def method(self, request): |
| 644 return request |
| 645 |
| 646 # users_id_token._get_id_token_user and time.time don't need to be stubbed |
| 647 # because the scopes used will not be [EMAIL_SCOPE] hence _get_id_token_user |
| 648 # will never be attempted |
| 649 |
| 650 self.mox.StubOutWithMock(users_id_token, '_is_local_dev') |
| 651 users_id_token._is_local_dev().AndReturn(False) |
| 652 |
| 653 self.mox.StubOutWithMock(oauth, 'get_client_id') |
| 654 oauth.get_client_id(dummy_scope).AndReturn(dummy_client_id) |
| 655 |
| 656 self.mox.ReplayAll() |
| 657 |
| 658 api_instance = TestApiScopes() |
| 659 os.environ['HTTP_AUTHORIZATION'] = 'Bearer ' + dummy_token |
| 660 api_instance.method(message_types.VoidMessage()) |
| 661 self.assertEqual(os.getenv('ENDPOINTS_USE_OAUTH_SCOPE'), dummy_scope) |
| 662 self.mox.VerifyAll() |
| 663 |
| 664 def testMaybeSetVarsFail(self): |
| 665 self.mox.StubOutWithMock(time, 'time') |
| 666 time.time().MultipleTimes().AndReturn(1001) |
| 667 self.mox.StubOutWithMock(users_id_token, '_get_id_token_user') |
| 668 users_id_token._get_id_token_user( |
| 669 self._SAMPLE_TOKEN, |
| 670 self._SAMPLE_AUDIENCES, |
| 671 self._SAMPLE_ALLOWED_CLIENT_IDS, |
| 672 1001, memcache).MultipleTimes().AndReturn(users.User('test@gmail.com')) |
| 673 self.mox.ReplayAll() |
| 674 # This token should correctly result in _get_id_token_user being called |
| 675 os.environ['HTTP_AUTHORIZATION'] = ('Bearer ' + self._SAMPLE_TOKEN) |
| 676 api_instance = self.TestApiAnnotatedAtApi() |
| 677 |
| 678 # No im_self is present and no api_info can be used, so the method itself |
| 679 # has no access to scopes, hence scopes will be null and neither of the |
| 680 # token checks will occur |
| 681 users_id_token._maybe_set_current_user_vars(api_instance.method.im_func) |
| 682 self.assertNotIn('ENDPOINTS_USE_OAUTH_SCOPE', os.environ) |
| 683 self.assertEqual(os.getenv('ENDPOINTS_AUTH_EMAIL'), '') |
| 684 self.assertEqual(os.getenv('ENDPOINTS_AUTH_DOMAIN'), '') |
| 685 |
| 686 # Test the same works when using the method and not im_func |
| 687 os.environ.pop('ENDPOINTS_AUTH_EMAIL') |
| 688 os.environ.pop('ENDPOINTS_AUTH_DOMAIN') |
| 689 users_id_token._maybe_set_current_user_vars(api_instance.method) |
| 690 self.assertEqual(os.getenv('ENDPOINTS_AUTH_EMAIL'), 'test@gmail.com') |
| 691 |
| 692 # Test that it works using the api info from the API |
| 693 os.environ.pop('ENDPOINTS_AUTH_EMAIL') |
| 694 os.environ.pop('ENDPOINTS_AUTH_DOMAIN') |
| 695 users_id_token._maybe_set_current_user_vars(api_instance.method.im_func, |
| 696 api_info=api_instance.api_info) |
| 697 self.assertEqual(os.getenv('ENDPOINTS_AUTH_EMAIL'), 'test@gmail.com') |
| 698 self.mox.VerifyAll() |
| 699 |
| 700 |
| 701 if __name__ == '__main__': |
| 702 unittest.main() |
OLD | NEW |