| Index: third_party/google-endpoints/endpoints/test/users_id_token_test.py
|
| diff --git a/third_party/google-endpoints/endpoints/test/users_id_token_test.py b/third_party/google-endpoints/endpoints/test/users_id_token_test.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..038175289d5c422dfcfeacaca8ad5fdd6efa23e9
|
| --- /dev/null
|
| +++ b/third_party/google-endpoints/endpoints/test/users_id_token_test.py
|
| @@ -0,0 +1,702 @@
|
| +# Copyright 2016 Google Inc. All Rights Reserved.
|
| +#
|
| +# Licensed under the Apache License, Version 2.0 (the "License");
|
| +# you may not use this file except in compliance with the License.
|
| +# You may obtain a copy of the License at
|
| +#
|
| +# http://www.apache.org/licenses/LICENSE-2.0
|
| +#
|
| +# Unless required by applicable law or agreed to in writing, software
|
| +# distributed under the License is distributed on an "AS IS" BASIS,
|
| +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| +# See the License for the specific language governing permissions and
|
| +# limitations under the License.
|
| +
|
| +"""Tests for users_id_token and validate_id_token."""
|
| +
|
| +import base64
|
| +import json
|
| +import os
|
| +import string
|
| +import time
|
| +import unittest
|
| +
|
| +import endpoints.api_config as api_config
|
| +
|
| +import mox
|
| +from protorpc import message_types
|
| +from protorpc import messages
|
| +from protorpc import remote
|
| +
|
| +import test_util
|
| +import endpoints.users_id_token as users_id_token
|
| +
|
| +from google.appengine.api import memcache
|
| +from google.appengine.api import oauth
|
| +from google.appengine.api import urlfetch
|
| +from google.appengine.api import users
|
| +
|
| +
|
| +# The key response that allows the _SAMPLE_TOKEN to be verified. This key was
|
| +# retrieved from:
|
| +# http://www-googleapis-test.sandbox.google.com//oauth2/v1/raw_public_keys
|
| +# ...at the same time that _SAMPLE_TOKEN was generated.
|
| +# The first cert is too short, which caused an exception 'Plaintext too large'
|
| +# from RSA.encrypt (b/19127342); the second cert is the correct cert. Put both
|
| +# there to make sure the second cert is tried when the first failed.
|
| +
|
| +_CACHED_CERT = {
|
| + 'keyvalues': [
|
| + {
|
| + 'algorithm': 'RSA',
|
| + 'modulus': ('2bqhkZ+DZSuQvHX3rdoIni39gfl6zny0WZK6dLPP2lRmer1aEAP982'
|
| + 'u2B1siXoXB8HN+pwCZMGV5kbHaG13InopeVNIMFl2IU4aql+hDS0+i'
|
| + 'j+1Rrsa6wHWp4+3eKe9q+VqXMdulclegHjVtxDs76W1lpuP1e6Msc3'
|
| + 'IuSXjR'),
|
| + 'exponent': 'AQAB',
|
| + 'keyid': '458790a80f9c9957e8df61332b9f06faa6472bad'
|
| + },
|
| + {
|
| + 'algorithm': 'RSA',
|
| + 'modulus': ('AL18Q+dq5ws4/V7KtgfhC6SwJH20GvUN5z3vf4SUSrpQG2/nySBvUh'
|
| + 'Iv86Hkk4Uy7W+OTq2+csCGhjGnRxBx9BThT85G8F6IGNjcOyNHVtnR'
|
| + 'ifX+T88sUB1l7jAISRMCrgHIRNmwDCmEe1fTqTUOdgDT8nB7pX7SA/'
|
| + 'VH0q+t2xml'),
|
| + 'exponent': 'AQAB',
|
| + 'keyid': '7411abfccccb4c253cd3e75b4fa5887f49aa83d1'
|
| + },
|
| + ]
|
| +}
|
| +
|
| +
|
| +class ModuleInterfaceTest(test_util.ModuleInterfaceTest,
|
| + unittest.TestCase):
|
| +
|
| + MODULE = users_id_token
|
| +
|
| +
|
| +class TestCache(object):
|
| + """Test stub to replace memcache for id_token verification."""
|
| +
|
| + def __init__(self):
|
| + self._used_cached_value = False
|
| + self._value_was_set = False
|
| +
|
| + @property
|
| + def used_cached_value(self):
|
| + return self._used_cached_value
|
| +
|
| + @property
|
| + def value_was_set(self):
|
| + return self._value_was_set
|
| +
|
| + # pylint: disable=g-bad-name
|
| + def get(self, key, *unused_args, **kwargs):
|
| + if (key == users_id_token._DEFAULT_CERT_URI and
|
| + kwargs.get('namespace', '') == users_id_token._CERT_NAMESPACE):
|
| + self._used_cached_value = True
|
| + return _CACHED_CERT
|
| + return None
|
| +
|
| + def set(self, *unused_args, **unused_kwargs):
|
| + self._value_was_set = True
|
| +
|
| +
|
| +class UsersIdTokenTestBase(unittest.TestCase):
|
| + """A sample token based on JWT.
|
| +
|
| + Sample token is based on a JWT with this body:
|
| + {
|
| + "iss":"accounts.google.com",
|
| + "email":"kevind@gmail.com",
|
| + "email_verified":"true",
|
| + "aud":"919214422084-c0jrodnkm7ntttjhhttilqjq5d7l7mu5.apps."
|
| + "googleusercontent.com",
|
| + "sub":"104564329451840817415",
|
| + "azp":"919214422084-c0jrodnkm7ntttjhhttilqjq5d7l7mu5.apps."
|
| + "googleusercontent.com",
|
| + "at_hash":"c9aVyHiathUC-pgRFjFWbw",
|
| + "iat":1360964700,
|
| + "exp":1360968600
|
| + }
|
| + """
|
| + _SAMPLE_TOKEN = ('eyJhbGciOiJSUzI1NiIsImtpZCI6Ijc0MTFhYmZjY2NjYjRjMjUzY2QzZTc'
|
| + '1YjRmYTU4ODdmNDlhYTgzZDEifQ.eyJpc3MiOiJhY2NvdW50cy5nb29nbGU'
|
| + 'uY29tIiwiZW1haWwiOiJrZXZpbmRAZ21haWwuY29tIiwiZW1haWxfdmVyaW'
|
| + 'ZpZWQiOiJ0cnVlIiwiYXVkIjoiOTE5MjE0NDIyMDg0LWMwanJvZG5rbTdud'
|
| + 'HR0amhodHRpbHFqcTVkN2w3bXU1LmFwcHMuZ29vZ2xldXNlcmNvbnRlbnQu'
|
| + 'Y29tIiwic3ViIjoiMTA0NTY0MzI5NDUxODQwODE3NDE1IiwiYXpwIjoiOTE'
|
| + '5MjE0NDIyMDg0LWMwanJvZG5rbTdudHR0amhodHRpbHFqcTVkN2w3bXU1Lm'
|
| + 'FwcHMuZ29vZ2xldXNlcmNvbnRlbnQuY29tIiwiYXRfaGFzaCI6ImM5YVZ5S'
|
| + 'GlhdGhVQy1wZ1JGakZXYnciLCJpYXQiOjEzNjA5NjQ3MDAsImV4cCI6MTM2'
|
| + 'MDk2ODYwMH0.XwaGmw5n1XHJapwkn6pumK14l9Tiyn1q2C5VeYbvuScNS6Z'
|
| + '-kdb9mX87Hl2hbdUvHm6TNzabMVTgvHPATjuCAt2lXOpwm8iGnon6vTk5LM'
|
| + 'm0tUAE25IAImvpSc59l0ySd4x2g3BvjauxwaYjkwYJRVczsVlTTB3iKlBhW'
|
| + 'IT01vM')
|
| + _SAMPLE_AUDIENCES = ('919214422084-c0jrodnkm7ntttjhhttilqjq5d7l7mu5.apps.'
|
| + 'googleusercontent.com',)
|
| + _SAMPLE_ALLOWED_CLIENT_IDS = ('919214422084-c0jrodnkm7ntttjhhttilqjq5d7l7mu5.'
|
| + 'apps.googleusercontent.com',
|
| + '12345.apps.googleusercontent.com')
|
| + _SAMPLE_TIME_NOW = 1360964700
|
| + _SAMPLE_OAUTH_SCOPES = ['https://www.googleapis.com/auth/userinfo.email']
|
| + _SAMPLE_OAUTH_TOKEN_INFO = {
|
| + 'issued_to': ('919214422084-c0jrodnkm7ntttjhhttilqjq5d7l7mu5.apps.'
|
| + 'googleusercontent.com'),
|
| + 'user_id': '108495933693426793887',
|
| + 'expires_in': 3384,
|
| + 'access_type': 'online',
|
| + 'audience': ('919214422084-c0jrodnkm7ntttjhhttilqjq5d7l7mu5.apps.'
|
| + 'googleusercontent.com'),
|
| + 'scope': (
|
| + 'https://www.googleapis.com/auth/userinfo.profile '
|
| + 'https://www.googleapis.com/auth/userinfo.email'),
|
| + 'email': 'kevind@gmail.com',
|
| + 'verified_email': True
|
| + }
|
| +
|
| + def setUp(self):
|
| + self.cache = TestCache()
|
| + self._saved_environ = os.environ.copy()
|
| + if 'AUTH_DOMAIN' not in os.environ:
|
| + os.environ['AUTH_DOMAIN'] = 'gmail.com'
|
| + self.mox = mox.Mox()
|
| +
|
| + def tearDown(self):
|
| + self.mox.UnsetStubs()
|
| + os.environ = self._saved_environ
|
| +
|
| + def GetSampleBody(self):
|
| + split_token = self._SAMPLE_TOKEN.split('.')
|
| + body = json.loads(users_id_token._urlsafe_b64decode(split_token[1]))
|
| + return body
|
| +
|
| +
|
| +class UsersIdTokenTest(UsersIdTokenTestBase):
|
| +
|
| + def testSampleIdToken(self):
|
| + user = users_id_token._get_id_token_user(self._SAMPLE_TOKEN,
|
| + self._SAMPLE_AUDIENCES,
|
| + self._SAMPLE_ALLOWED_CLIENT_IDS,
|
| + self._SAMPLE_TIME_NOW, self.cache)
|
| + self.assertEqual(user.email(), 'kevind@gmail.com')
|
| + # User ID shouldn't be filled in. See notes in users_id_token.py.
|
| + self.assertIsNone(user.user_id())
|
| + self.assertTrue(self.cache.used_cached_value)
|
| +
|
| + def testInvalidSignature(self):
|
| + """Verify that a body that doesn't match the signature fails."""
|
| + body = self.GetSampleBody()
|
| + # Modify the issued and expiration times.
|
| + body['iat'] += 60
|
| + body['exp'] += 60
|
| + encoded_body = base64.urlsafe_b64encode(json.dumps(body))
|
| +
|
| + split_token = self._SAMPLE_TOKEN.split('.')
|
| + token = '.'.join((split_token[0], encoded_body, split_token[2]))
|
| +
|
| + self.assertRaises(users_id_token._AppIdentityError,
|
| + users_id_token._verify_signed_jwt_with_certs,
|
| + token, self._SAMPLE_TIME_NOW, self.cache)
|
| +
|
| + def testNoCertRaisesException(self):
|
| + """Verify that if we can't get certs, we fail."""
|
| + self.assertRaises(users_id_token._AppIdentityError,
|
| + users_id_token._verify_signed_jwt_with_certs,
|
| + self._SAMPLE_TOKEN, self._SAMPLE_TIME_NOW, self.cache,
|
| + 'https://bad.url/not/in/test/cache')
|
| +
|
| + def testGetCertExpirationTime(self):
|
| + """Test that we can correctly get cert expiration time from headers."""
|
| + tests = [({'Cache-Control': 'max-age=3600'}, 3600),
|
| + ({'Cache-Control': 'max-age=3600', 'Age': '1200'}, 2400),
|
| + ({}, 0),
|
| + ({'Age': '1'}, 0),
|
| + ({'Cache-Control': 'max-age=3600', 'Age': '3700'}, 0),
|
| + ({'Cache-Control': 'max-age=3600', 'Age': 'bad'}, 3600),
|
| + ({'Cache-Control': 'max-age=nomatch,max-age=1200'}, 1200),
|
| + ({'Cache-Control': 'max-age=invalid'}, 0)]
|
| + for headers, expected_result in tests:
|
| + result = users_id_token._get_cert_expiration_time(headers)
|
| + self.assertEqual(expected_result, result)
|
| +
|
| + def testCertCacheControl(self):
|
| + """Test that cache control headers are respected."""
|
| + self.mox.StubOutWithMock(urlfetch, 'fetch')
|
| + tests = [({'Cache-Control': 'max-age=3600', 'Age': '1200'}, True),
|
| + ({'Cache-Control': 'max-age=100', 'Age': '100'}, False),
|
| + ({}, False)]
|
| + for test_headers, value_set in tests:
|
| +
|
| + class DummyResponse(object):
|
| + status_code = 200
|
| + content = json.dumps(self._SAMPLE_OAUTH_TOKEN_INFO)
|
| + headers = test_headers
|
| +
|
| + urlfetch.fetch(mox.IsA(basestring)).AndReturn(DummyResponse())
|
| + cache = TestCache()
|
| +
|
| + self.mox.ReplayAll()
|
| + users_id_token._get_cached_certs('some_uri', cache)
|
| + self.mox.VerifyAll()
|
| + self.mox.ResetAll()
|
| +
|
| + self.assertEqual(value_set, cache.value_was_set)
|
| +
|
| + def testInvalidTokenExtraSections(self):
|
| + """Verify that a token with too many pieces fails."""
|
| + self.assertRaises(users_id_token._AppIdentityError,
|
| + users_id_token._verify_signed_jwt_with_certs,
|
| + self._SAMPLE_TOKEN + '.asdf', self._SAMPLE_TIME_NOW,
|
| + self.cache)
|
| +
|
| + def testNoCrypto(self):
|
| + """Verify we throw an _AppIdentityError if the Crypto modules don't load."""
|
| + crypto_loaded = users_id_token._CRYPTO_LOADED
|
| + try:
|
| + users_id_token._CRYPTO_LOADED = False
|
| + self.assertRaises(users_id_token._AppIdentityError,
|
| + users_id_token._verify_signed_jwt_with_certs,
|
| + self._SAMPLE_TOKEN, self._SAMPLE_TIME_NOW,
|
| + self.cache)
|
| + finally:
|
| + users_id_token._CRYPTO_LOADED = crypto_loaded
|
| +
|
| + def testExpiredToken(self):
|
| + """Verify that expired tokens will fail."""
|
| + expired_time_now = (self._SAMPLE_TIME_NOW +
|
| + users_id_token._MAX_TOKEN_LIFETIME_SECS + 1)
|
| + self.assertRaises(users_id_token._AppIdentityError,
|
| + users_id_token._verify_signed_jwt_with_certs,
|
| + self._SAMPLE_TOKEN, expired_time_now,
|
| + self.cache)
|
| + # Also verify that this doesn't return a user when called from
|
| + # users_id_token.
|
| + user = users_id_token._get_id_token_user(self._SAMPLE_TOKEN,
|
| + self._SAMPLE_AUDIENCES,
|
| + self._SAMPLE_ALLOWED_CLIENT_IDS,
|
| + expired_time_now, self.cache)
|
| + self.assertIsNone(user)
|
| +
|
| + def testTimeTooEarly(self):
|
| + """Verify that we'll fail if the provided time_now is too early."""
|
| + early_time_now = (self._SAMPLE_TIME_NOW -
|
| + users_id_token._CLOCK_SKEW_SECS - 1)
|
| + self.assertRaises(users_id_token._AppIdentityError,
|
| + users_id_token._verify_signed_jwt_with_certs,
|
| + self._SAMPLE_TOKEN, early_time_now,
|
| + self.cache)
|
| +
|
| + def CheckErrorLoggable(self, token):
|
| + """Verify that the error strings we log are valid, loggable strings."""
|
| + try:
|
| + users_id_token._verify_signed_jwt_with_certs(
|
| + token, self._SAMPLE_TIME_NOW, self.cache)
|
| + self.fail('Expected exception.')
|
| + except users_id_token._AppIdentityError, e:
|
| + # Make sure this works without an exception.
|
| + try:
|
| + str(e).decode('utf-8')
|
| + except UnicodeDecodeError:
|
| + printable = ''.join(c if c in string.printable
|
| + else '\\x%02x' % ord(c)
|
| + for c in str(e))
|
| + self.fail('Unsafe error sent to log: %s' % printable)
|
| +
|
| + def testErrorStringLoggableWrongSegments(self):
|
| + """Check that the Wrong Segments error is loggable."""
|
| + self.CheckErrorLoggable('bad utf-8 \xff')
|
| +
|
| + def testErrorStringLoggableBadHeader(self):
|
| + """Check that the Bad Header error is loggable."""
|
| + token_part = 'bad utf-8 \xff'
|
| + token = '.'.join([base64.urlsafe_b64encode(token_part)] * 3)
|
| + self.CheckErrorLoggable(token)
|
| +
|
| + def testErrorStringLoggableBadBody(self):
|
| + """Check that the Unparseable Body error is loggable."""
|
| + token_body = 'bad utf-8 \xff'
|
| + token_parts = self._SAMPLE_TOKEN.split('.')
|
| + token = '.'.join([token_parts[0],
|
| + base64.urlsafe_b64encode(token_body),
|
| + token_parts[2]])
|
| + self.CheckErrorLoggable(token)
|
| +
|
| + def CheckToken(self, field_update_dict, valid):
|
| + """Update the sample token and check if it's valid or invalid.
|
| +
|
| + This updates the body of our sample token with the fields in
|
| + field_update_dict, then passes it to _verify_parsed_token. The result must
|
| + match the "valid" parameter.
|
| +
|
| + Args:
|
| + field_update_dict: A dict of fields to update in the sample body.
|
| + valid: A boolean, compared against the result from _verify_parsed_token.
|
| + """
|
| + parsed_token = self.GetSampleBody()
|
| + parsed_token.update(field_update_dict)
|
| + result = users_id_token._verify_parsed_token(
|
| + parsed_token, self._SAMPLE_AUDIENCES, self._SAMPLE_ALLOWED_CLIENT_IDS)
|
| + self.assertEqual(valid, result)
|
| +
|
| + def testInvalidIssuer(self):
|
| + self.CheckToken({'iss': 'invalid.issuer'}, False)
|
| +
|
| + def testInvalidAudience(self):
|
| + self.CheckToken({'aud': 'invalid.audience'}, False)
|
| +
|
| + def testInvalidClientId(self):
|
| + self.CheckToken({'azp': 'invalid.client.id'}, False)
|
| +
|
| + def testSampleIdTokenWithOldFields(self):
|
| + self.CheckToken({'cid': 'Extra ignored field.'}, True)
|
| +
|
| + def testSkipClientIdNotAllowedForIdTokens(self):
|
| + """Verify that SKIP_CLIENT_ID_CHECKS does not work for ID tokens."""
|
| + parsed_token = self.GetSampleBody()
|
| + result = users_id_token._verify_parsed_token(
|
| + parsed_token, self._SAMPLE_AUDIENCES,
|
| + users_id_token.SKIP_CLIENT_ID_CHECK)
|
| + self.assertEqual(False, result)
|
| +
|
| + def testEmptyAudience(self):
|
| + parsed_token = self.GetSampleBody()
|
| + parsed_token.update({'aud': 'invalid.audience'})
|
| + result = users_id_token._verify_parsed_token(
|
| + parsed_token, [], self._SAMPLE_ALLOWED_CLIENT_IDS)
|
| + self.assertEqual(False, result)
|
| +
|
| + def AttemptOauth(self, client_id, allowed_client_ids=None):
|
| + if allowed_client_ids is None:
|
| + allowed_client_ids = self._SAMPLE_ALLOWED_CLIENT_IDS
|
| + self.mox.StubOutWithMock(oauth, 'get_client_id')
|
| + # We have four cases:
|
| + # * no client ID is specified, so we raise for every scope.
|
| + # * the given client ID is in the whitelist or there is no
|
| + # whitelist, so we'll only be called once.
|
| + # * we have a client ID not on the whitelist, so we need a
|
| + # mock call for every scope.
|
| + if client_id is None:
|
| + for scope in self._SAMPLE_OAUTH_SCOPES:
|
| + oauth.get_client_id(scope).AndRaise(oauth.Error)
|
| + elif (list(allowed_client_ids) == users_id_token.SKIP_CLIENT_ID_CHECK or
|
| + client_id in allowed_client_ids):
|
| + scope = self._SAMPLE_OAUTH_SCOPES[0]
|
| + oauth.get_client_id(scope).AndReturn(client_id)
|
| + else:
|
| + for scope in self._SAMPLE_OAUTH_SCOPES:
|
| + oauth.get_client_id(scope).AndReturn(client_id)
|
| +
|
| + self.mox.ReplayAll()
|
| + users_id_token._set_bearer_user_vars(allowed_client_ids,
|
| + self._SAMPLE_OAUTH_SCOPES)
|
| + self.mox.VerifyAll()
|
| +
|
| + def assertOauthSucceeded(self, client_id):
|
| + self.AttemptOauth(client_id)
|
| + self.assertEqual(os.environ.get('ENDPOINTS_USE_OAUTH_SCOPE'),
|
| + self._SAMPLE_OAUTH_SCOPES[0])
|
| +
|
| + def assertOauthFailed(self, client_id):
|
| + self.AttemptOauth(client_id)
|
| + self.assertNotIn('ENDPOINTS_USE_OAUTH_SCOPE', os.environ)
|
| +
|
| + def testOauthInvalidClientId(self):
|
| + self.assertOauthFailed('abc.appspot.com')
|
| +
|
| + def testOauthValidClientId(self):
|
| + self.assertOauthSucceeded(self._SAMPLE_ALLOWED_CLIENT_IDS[0])
|
| +
|
| + def testOauthExplorerClientId(self):
|
| + self.assertOauthFailed(api_config.API_EXPLORER_CLIENT_ID)
|
| +
|
| + def testOauthInvalidScope(self):
|
| + self.assertOauthFailed(None)
|
| +
|
| + def testAllowAllClientIds(self):
|
| + client_id = 'clearly_fake_id'
|
| + self.AttemptOauth(client_id,
|
| + allowed_client_ids=users_id_token.SKIP_CLIENT_ID_CHECK)
|
| + self.assertEqual(os.environ.get('ENDPOINTS_USE_OAUTH_SCOPE'),
|
| + self._SAMPLE_OAUTH_SCOPES[0])
|
| +
|
| + def AttemptOauthLocal(self, token_update=None):
|
| + token = self._SAMPLE_OAUTH_TOKEN_INFO.copy()
|
| + token.update(token_update or {})
|
| +
|
| + class DummyResponse(object):
|
| + status_code = 200
|
| + content = json.dumps(token)
|
| +
|
| + self.mox.StubOutWithMock(urlfetch, 'fetch')
|
| + urlfetch.fetch(mox.IsA(basestring)).AndReturn(DummyResponse())
|
| +
|
| + self.mox.ReplayAll()
|
| + users_id_token._set_bearer_user_vars_local('unused_token',
|
| + self._SAMPLE_ALLOWED_CLIENT_IDS,
|
| + self._SAMPLE_OAUTH_SCOPES)
|
| + self.mox.VerifyAll()
|
| +
|
| + def testOauthLocal(self):
|
| + self.AttemptOauthLocal()
|
| + self.assertNotIn('ENDPOINTS_USE_OAUTH_SCOPE', os.environ)
|
| + self.assertEqual('kevind@gmail.com',
|
| + os.environ.get('ENDPOINTS_AUTH_EMAIL'))
|
| + self.assertEqual('', os.environ.get('ENDPOINTS_AUTH_DOMAIN'))
|
| +
|
| + def assertOauthLocalFailed(self, token_update):
|
| + self.AttemptOauthLocal(token_update)
|
| + self.assertNotIn('ENDPOINTS_USE_OAUTH_SCOPE', os.environ)
|
| + self.assertNotIn('ENDPOINTS_AUTH_EMAIL', os.environ)
|
| + self.assertNotIn('ENDPOINTS_AUTH_DOMAIN', os.environ)
|
| +
|
| + def testOauthLocalBadEmail(self):
|
| + self.assertOauthLocalFailed({'verified_email': False})
|
| +
|
| + def testOauthLocalBadClientId(self):
|
| + self.assertOauthLocalFailed({'issued_to': 'abc.appspot.com'})
|
| +
|
| + def testOauthLocalBadScopes(self):
|
| + self.assertOauthLocalFailed({'scope': 'useless_scope and_another'})
|
| +
|
| + def testGetCurrentUserNoAuthInfo(self):
|
| + self.assertRaises(users_id_token.InvalidGetUserCall,
|
| + users_id_token.get_current_user)
|
| +
|
| + def testGetCurrentUserEmailOnly(self):
|
| + os.environ['ENDPOINTS_AUTH_EMAIL'] = 'test@gmail.com'
|
| + os.environ['ENDPOINTS_AUTH_DOMAIN'] = ''
|
| + user = users_id_token.get_current_user()
|
| + self.assertEqual(user.email(), 'test@gmail.com')
|
| + self.assertIsNone(user.user_id())
|
| +
|
| + def testGetCurrentUserEmailAndAuth(self):
|
| + os.environ['ENDPOINTS_AUTH_EMAIL'] = 'test@gmail.com'
|
| + os.environ['ENDPOINTS_AUTH_DOMAIN'] = 'gmail.com'
|
| + user = users_id_token.get_current_user()
|
| + self.assertEqual(user.email(), 'test@gmail.com')
|
| + self.assertEqual(user.auth_domain(), 'gmail.com')
|
| + self.assertIsNone(user.user_id())
|
| +
|
| + def testGetCurrentUserOauth(self):
|
| + self.mox.StubOutWithMock(oauth, 'get_current_user')
|
| + oauth.get_current_user('scope').AndReturn(users.User('test@gmail.com'))
|
| + self.mox.ReplayAll()
|
| +
|
| + os.environ['ENDPOINTS_USE_OAUTH_SCOPE'] = 'scope'
|
| + user = users_id_token.get_current_user()
|
| + self.assertEqual(user.email(), 'test@gmail.com')
|
| + self.mox.VerifyAll()
|
| +
|
| + def testGetTokenQueryParamOauthHeader(self):
|
| + os.environ['HTTP_AUTHORIZATION'] = 'OAuth ' + self._SAMPLE_TOKEN
|
| + token = users_id_token._get_token(None)
|
| + self.assertEqual(token, self._SAMPLE_TOKEN)
|
| +
|
| + def testGetTokenQueryParamBearerHeader(self):
|
| + os.environ['HTTP_AUTHORIZATION'] = 'Bearer ' + self._SAMPLE_TOKEN
|
| + token = users_id_token._get_token(None)
|
| + self.assertEqual(token, self._SAMPLE_TOKEN)
|
| +
|
| + def testGetTokenQueryParamInvalidBearerHeader(self):
|
| + # Capitalization matters. This should fail.
|
| + os.environ['HTTP_AUTHORIZATION'] = 'BEARER ' + self._SAMPLE_TOKEN
|
| + token = users_id_token._get_token(None)
|
| + self.assertIsNone(token)
|
| +
|
| + def testGetTokenQueryParamInvalidHeader(self):
|
| + os.environ['HTTP_AUTHORIZATION'] = 'Invalid ' + self._SAMPLE_TOKEN
|
| + token = users_id_token._get_token(None)
|
| + self.assertIsNone(token)
|
| +
|
| + def testGetTokenQueryParamBearer(self):
|
| + request = self.mox.CreateMock(messages.Message)
|
| + request.get_unrecognized_field_info('bearer_token').AndReturn(
|
| + (self._SAMPLE_TOKEN, messages.Variant.STRING))
|
| +
|
| + self.mox.ReplayAll()
|
| + token = users_id_token._get_token(request)
|
| + self.mox.VerifyAll()
|
| + self.assertEqual(token, self._SAMPLE_TOKEN)
|
| +
|
| + def testGetTokenQueryParamAccess(self):
|
| + request = self.mox.CreateMock(messages.Message)
|
| + request.get_unrecognized_field_info('bearer_token').AndReturn(
|
| + (None, None))
|
| + request.get_unrecognized_field_info('access_token').AndReturn(
|
| + (self._SAMPLE_TOKEN, messages.Variant.STRING))
|
| +
|
| + self.mox.ReplayAll()
|
| + token = users_id_token._get_token(request)
|
| + self.mox.VerifyAll()
|
| + self.assertEqual(token, self._SAMPLE_TOKEN)
|
| +
|
| + def testGetTokenNone(self):
|
| + request = self.mox.CreateMock(messages.Message)
|
| + request.get_unrecognized_field_info('bearer_token').AndReturn((None, None))
|
| + request.get_unrecognized_field_info('access_token').AndReturn((None, None))
|
| +
|
| + self.mox.ReplayAll()
|
| + token = users_id_token._get_token(request)
|
| + self.mox.VerifyAll()
|
| + self.assertIsNone(token)
|
| +
|
| +
|
| +class UsersIdTokenTestWithSimpleApi(UsersIdTokenTestBase):
|
| +
|
| + # pylint: disable=g-bad-name
|
| +
|
| + @api_config.api('TestApi', 'v1')
|
| + class TestApiAnnotatedAtMethod(remote.Service):
|
| + """Describes TestApi."""
|
| +
|
| + @api_config.method(
|
| + message_types.VoidMessage, message_types.VoidMessage,
|
| + audiences=UsersIdTokenTestBase._SAMPLE_AUDIENCES,
|
| + allowed_client_ids=UsersIdTokenTestBase._SAMPLE_ALLOWED_CLIENT_IDS,
|
| + scopes=UsersIdTokenTestBase._SAMPLE_OAUTH_SCOPES)
|
| + def method(self):
|
| + pass
|
| +
|
| + @api_config.api(
|
| + 'TestApi', 'v1', audiences=UsersIdTokenTestBase._SAMPLE_AUDIENCES,
|
| + allowed_client_ids=UsersIdTokenTestBase._SAMPLE_ALLOWED_CLIENT_IDS)
|
| + class TestApiAnnotatedAtApi(remote.Service):
|
| + """Describes TestApi."""
|
| +
|
| + @api_config.method(message_types.VoidMessage, message_types.VoidMessage)
|
| + def method(self, request):
|
| + return request
|
| + # pylint: enable=g-bad-name
|
| +
|
| + def testMaybeSetVarsAlreadySetOauth(self):
|
| + os.environ['ENDPOINTS_USE_OAUTH_SCOPE'] = (
|
| + 'https://www.googleapis.com/auth/userinfo.email')
|
| + users_id_token._maybe_set_current_user_vars(
|
| + self.TestApiAnnotatedAtApi().method)
|
| + self.assertEqual('https://www.googleapis.com/auth/userinfo.email',
|
| + os.environ.get('ENDPOINTS_USE_OAUTH_SCOPE'))
|
| + self.assertNotIn('ENDPOINTS_AUTH_EMAIL', os.environ)
|
| + self.assertNotIn('ENDPOINTS_AUTH_DOMAIN', os.environ)
|
| +
|
| + def testMaybeSetVarsAlreadySetIdToken(self):
|
| + os.environ['ENDPOINTS_AUTH_EMAIL'] = 'test@gmail.com'
|
| + os.environ['ENDPOINTS_AUTH_DOMAIN'] = 'gmail.com'
|
| + users_id_token._maybe_set_current_user_vars(
|
| + self.TestApiAnnotatedAtApi().method)
|
| + self.assertNotIn('ENDPOINTS_USE_OAUTH_SCOPE', os.environ)
|
| + self.assertEqual('test@gmail.com', os.environ.get('ENDPOINTS_AUTH_EMAIL'))
|
| + self.assertEqual('gmail.com', os.environ.get('ENDPOINTS_AUTH_DOMAIN'))
|
| +
|
| + def testMaybeSetVarsAlreadySetIdTokenNoDomain(self):
|
| + os.environ['ENDPOINTS_AUTH_EMAIL'] = 'test@gmail.com'
|
| + os.environ['ENDPOINTS_AUTH_DOMAIN'] = ''
|
| + users_id_token._maybe_set_current_user_vars(
|
| + self.TestApiAnnotatedAtApi().method)
|
| + self.assertNotIn('ENDPOINTS_USE_OAUTH_SCOPE', os.environ)
|
| + self.assertEqual('test@gmail.com', os.environ.get('ENDPOINTS_AUTH_EMAIL'))
|
| + self.assertEqual('', os.environ.get('ENDPOINTS_AUTH_DOMAIN'))
|
| +
|
| + def VerifyIdToken(self, cls, *args):
|
| + self.mox.StubOutWithMock(time, 'time')
|
| + self.mox.StubOutWithMock(users_id_token, '_get_id_token_user')
|
| + time.time().AndReturn(1001)
|
| + users_id_token._get_id_token_user(
|
| + self._SAMPLE_TOKEN,
|
| + self._SAMPLE_AUDIENCES,
|
| + self._SAMPLE_ALLOWED_CLIENT_IDS,
|
| + 1001, memcache).AndReturn(users.User('test@gmail.com'))
|
| + self.mox.ReplayAll()
|
| +
|
| + os.environ['HTTP_AUTHORIZATION'] = ('Bearer ' + self._SAMPLE_TOKEN)
|
| + if args:
|
| + cls.method(*args)
|
| + else:
|
| + users_id_token._maybe_set_current_user_vars(cls.method)
|
| + self.assertEqual(os.environ.get('ENDPOINTS_AUTH_EMAIL'), 'test@gmail.com')
|
| + self.mox.VerifyAll()
|
| +
|
| + def testMaybeSetVarsIdTokenApiAnnotation(self):
|
| + self.VerifyIdToken(self.TestApiAnnotatedAtApi())
|
| +
|
| + def testMaybeSetVarsIdTokenMethodAnnotation(self):
|
| + self.VerifyIdToken(self.TestApiAnnotatedAtMethod())
|
| +
|
| + def testMethodCallParsesIdToken(self):
|
| + self.VerifyIdToken(self.TestApiAnnotatedAtApi(),
|
| + message_types.VoidMessage())
|
| +
|
| + def testMaybeSetVarsWithActualRequestAccessToken(self):
|
| + dummy_scope = 'scope'
|
| + dummy_token = 'dummy_token'
|
| + dummy_email = 'test@gmail.com'
|
| + dummy_client_id = self._SAMPLE_ALLOWED_CLIENT_IDS[0]
|
| +
|
| + @api_config.api('TestApi', 'v1',
|
| + allowed_client_ids=self._SAMPLE_ALLOWED_CLIENT_IDS,
|
| + scopes=[dummy_scope])
|
| + class TestApiScopes(remote.Service):
|
| + """Describes TestApiScopes."""
|
| +
|
| + # pylint: disable=g-bad-name
|
| + @api_config.method(message_types.VoidMessage, message_types.VoidMessage)
|
| + def method(self, request):
|
| + return request
|
| +
|
| + # users_id_token._get_id_token_user and time.time don't need to be stubbed
|
| + # because the scopes used will not be [EMAIL_SCOPE] hence _get_id_token_user
|
| + # will never be attempted
|
| +
|
| + self.mox.StubOutWithMock(users_id_token, '_is_local_dev')
|
| + users_id_token._is_local_dev().AndReturn(False)
|
| +
|
| + self.mox.StubOutWithMock(oauth, 'get_client_id')
|
| + oauth.get_client_id(dummy_scope).AndReturn(dummy_client_id)
|
| +
|
| + self.mox.ReplayAll()
|
| +
|
| + api_instance = TestApiScopes()
|
| + os.environ['HTTP_AUTHORIZATION'] = 'Bearer ' + dummy_token
|
| + api_instance.method(message_types.VoidMessage())
|
| + self.assertEqual(os.getenv('ENDPOINTS_USE_OAUTH_SCOPE'), dummy_scope)
|
| + self.mox.VerifyAll()
|
| +
|
| + def testMaybeSetVarsFail(self):
|
| + self.mox.StubOutWithMock(time, 'time')
|
| + time.time().MultipleTimes().AndReturn(1001)
|
| + self.mox.StubOutWithMock(users_id_token, '_get_id_token_user')
|
| + users_id_token._get_id_token_user(
|
| + self._SAMPLE_TOKEN,
|
| + self._SAMPLE_AUDIENCES,
|
| + self._SAMPLE_ALLOWED_CLIENT_IDS,
|
| + 1001, memcache).MultipleTimes().AndReturn(users.User('test@gmail.com'))
|
| + self.mox.ReplayAll()
|
| + # This token should correctly result in _get_id_token_user being called
|
| + os.environ['HTTP_AUTHORIZATION'] = ('Bearer ' + self._SAMPLE_TOKEN)
|
| + api_instance = self.TestApiAnnotatedAtApi()
|
| +
|
| + # No im_self is present and no api_info can be used, so the method itself
|
| + # has no access to scopes, hence scopes will be null and neither of the
|
| + # token checks will occur
|
| + users_id_token._maybe_set_current_user_vars(api_instance.method.im_func)
|
| + self.assertNotIn('ENDPOINTS_USE_OAUTH_SCOPE', os.environ)
|
| + self.assertEqual(os.getenv('ENDPOINTS_AUTH_EMAIL'), '')
|
| + self.assertEqual(os.getenv('ENDPOINTS_AUTH_DOMAIN'), '')
|
| +
|
| + # Test the same works when using the method and not im_func
|
| + os.environ.pop('ENDPOINTS_AUTH_EMAIL')
|
| + os.environ.pop('ENDPOINTS_AUTH_DOMAIN')
|
| + users_id_token._maybe_set_current_user_vars(api_instance.method)
|
| + self.assertEqual(os.getenv('ENDPOINTS_AUTH_EMAIL'), 'test@gmail.com')
|
| +
|
| + # Test that it works using the api info from the API
|
| + os.environ.pop('ENDPOINTS_AUTH_EMAIL')
|
| + os.environ.pop('ENDPOINTS_AUTH_DOMAIN')
|
| + users_id_token._maybe_set_current_user_vars(api_instance.method.im_func,
|
| + api_info=api_instance.api_info)
|
| + self.assertEqual(os.getenv('ENDPOINTS_AUTH_EMAIL'), 'test@gmail.com')
|
| + self.mox.VerifyAll()
|
| +
|
| +
|
| +if __name__ == '__main__':
|
| + unittest.main()
|
|
|