Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(10)

Unified Diff: third_party/google-endpoints/endpoints/test/users_id_token_test.py

Issue 2666783008: Add google-endpoints to third_party/. (Closed)
Patch Set: Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/google-endpoints/endpoints/test/users_id_token_test.py
diff --git a/third_party/google-endpoints/endpoints/test/users_id_token_test.py b/third_party/google-endpoints/endpoints/test/users_id_token_test.py
new file mode 100644
index 0000000000000000000000000000000000000000..038175289d5c422dfcfeacaca8ad5fdd6efa23e9
--- /dev/null
+++ b/third_party/google-endpoints/endpoints/test/users_id_token_test.py
@@ -0,0 +1,702 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Tests for users_id_token and validate_id_token."""
+
+import base64
+import json
+import os
+import string
+import time
+import unittest
+
+import endpoints.api_config as api_config
+
+import mox
+from protorpc import message_types
+from protorpc import messages
+from protorpc import remote
+
+import test_util
+import endpoints.users_id_token as users_id_token
+
+from google.appengine.api import memcache
+from google.appengine.api import oauth
+from google.appengine.api import urlfetch
+from google.appengine.api import users
+
+
+# The key response that allows the _SAMPLE_TOKEN to be verified. This key was
+# retrieved from:
+# http://www-googleapis-test.sandbox.google.com//oauth2/v1/raw_public_keys
+# ...at the same time that _SAMPLE_TOKEN was generated.
+# The first cert is too short, which caused an exception 'Plaintext too large'
+# from RSA.encrypt (b/19127342); the second cert is the correct cert. Put both
+# there to make sure the second cert is tried when the first failed.
+
+_CACHED_CERT = {
+ 'keyvalues': [
+ {
+ 'algorithm': 'RSA',
+ 'modulus': ('2bqhkZ+DZSuQvHX3rdoIni39gfl6zny0WZK6dLPP2lRmer1aEAP982'
+ 'u2B1siXoXB8HN+pwCZMGV5kbHaG13InopeVNIMFl2IU4aql+hDS0+i'
+ 'j+1Rrsa6wHWp4+3eKe9q+VqXMdulclegHjVtxDs76W1lpuP1e6Msc3'
+ 'IuSXjR'),
+ 'exponent': 'AQAB',
+ 'keyid': '458790a80f9c9957e8df61332b9f06faa6472bad'
+ },
+ {
+ 'algorithm': 'RSA',
+ 'modulus': ('AL18Q+dq5ws4/V7KtgfhC6SwJH20GvUN5z3vf4SUSrpQG2/nySBvUh'
+ 'Iv86Hkk4Uy7W+OTq2+csCGhjGnRxBx9BThT85G8F6IGNjcOyNHVtnR'
+ 'ifX+T88sUB1l7jAISRMCrgHIRNmwDCmEe1fTqTUOdgDT8nB7pX7SA/'
+ 'VH0q+t2xml'),
+ 'exponent': 'AQAB',
+ 'keyid': '7411abfccccb4c253cd3e75b4fa5887f49aa83d1'
+ },
+ ]
+}
+
+
+class ModuleInterfaceTest(test_util.ModuleInterfaceTest,
+ unittest.TestCase):
+
+ MODULE = users_id_token
+
+
+class TestCache(object):
+ """Test stub to replace memcache for id_token verification."""
+
+ def __init__(self):
+ self._used_cached_value = False
+ self._value_was_set = False
+
+ @property
+ def used_cached_value(self):
+ return self._used_cached_value
+
+ @property
+ def value_was_set(self):
+ return self._value_was_set
+
+ # pylint: disable=g-bad-name
+ def get(self, key, *unused_args, **kwargs):
+ if (key == users_id_token._DEFAULT_CERT_URI and
+ kwargs.get('namespace', '') == users_id_token._CERT_NAMESPACE):
+ self._used_cached_value = True
+ return _CACHED_CERT
+ return None
+
+ def set(self, *unused_args, **unused_kwargs):
+ self._value_was_set = True
+
+
+class UsersIdTokenTestBase(unittest.TestCase):
+ """A sample token based on JWT.
+
+ Sample token is based on a JWT with this body:
+ {
+ "iss":"accounts.google.com",
+ "email":"kevind@gmail.com",
+ "email_verified":"true",
+ "aud":"919214422084-c0jrodnkm7ntttjhhttilqjq5d7l7mu5.apps."
+ "googleusercontent.com",
+ "sub":"104564329451840817415",
+ "azp":"919214422084-c0jrodnkm7ntttjhhttilqjq5d7l7mu5.apps."
+ "googleusercontent.com",
+ "at_hash":"c9aVyHiathUC-pgRFjFWbw",
+ "iat":1360964700,
+ "exp":1360968600
+ }
+ """
+ _SAMPLE_TOKEN = ('eyJhbGciOiJSUzI1NiIsImtpZCI6Ijc0MTFhYmZjY2NjYjRjMjUzY2QzZTc'
+ '1YjRmYTU4ODdmNDlhYTgzZDEifQ.eyJpc3MiOiJhY2NvdW50cy5nb29nbGU'
+ 'uY29tIiwiZW1haWwiOiJrZXZpbmRAZ21haWwuY29tIiwiZW1haWxfdmVyaW'
+ 'ZpZWQiOiJ0cnVlIiwiYXVkIjoiOTE5MjE0NDIyMDg0LWMwanJvZG5rbTdud'
+ 'HR0amhodHRpbHFqcTVkN2w3bXU1LmFwcHMuZ29vZ2xldXNlcmNvbnRlbnQu'
+ 'Y29tIiwic3ViIjoiMTA0NTY0MzI5NDUxODQwODE3NDE1IiwiYXpwIjoiOTE'
+ '5MjE0NDIyMDg0LWMwanJvZG5rbTdudHR0amhodHRpbHFqcTVkN2w3bXU1Lm'
+ 'FwcHMuZ29vZ2xldXNlcmNvbnRlbnQuY29tIiwiYXRfaGFzaCI6ImM5YVZ5S'
+ 'GlhdGhVQy1wZ1JGakZXYnciLCJpYXQiOjEzNjA5NjQ3MDAsImV4cCI6MTM2'
+ 'MDk2ODYwMH0.XwaGmw5n1XHJapwkn6pumK14l9Tiyn1q2C5VeYbvuScNS6Z'
+ '-kdb9mX87Hl2hbdUvHm6TNzabMVTgvHPATjuCAt2lXOpwm8iGnon6vTk5LM'
+ 'm0tUAE25IAImvpSc59l0ySd4x2g3BvjauxwaYjkwYJRVczsVlTTB3iKlBhW'
+ 'IT01vM')
+ _SAMPLE_AUDIENCES = ('919214422084-c0jrodnkm7ntttjhhttilqjq5d7l7mu5.apps.'
+ 'googleusercontent.com',)
+ _SAMPLE_ALLOWED_CLIENT_IDS = ('919214422084-c0jrodnkm7ntttjhhttilqjq5d7l7mu5.'
+ 'apps.googleusercontent.com',
+ '12345.apps.googleusercontent.com')
+ _SAMPLE_TIME_NOW = 1360964700
+ _SAMPLE_OAUTH_SCOPES = ['https://www.googleapis.com/auth/userinfo.email']
+ _SAMPLE_OAUTH_TOKEN_INFO = {
+ 'issued_to': ('919214422084-c0jrodnkm7ntttjhhttilqjq5d7l7mu5.apps.'
+ 'googleusercontent.com'),
+ 'user_id': '108495933693426793887',
+ 'expires_in': 3384,
+ 'access_type': 'online',
+ 'audience': ('919214422084-c0jrodnkm7ntttjhhttilqjq5d7l7mu5.apps.'
+ 'googleusercontent.com'),
+ 'scope': (
+ 'https://www.googleapis.com/auth/userinfo.profile '
+ 'https://www.googleapis.com/auth/userinfo.email'),
+ 'email': 'kevind@gmail.com',
+ 'verified_email': True
+ }
+
+ def setUp(self):
+ self.cache = TestCache()
+ self._saved_environ = os.environ.copy()
+ if 'AUTH_DOMAIN' not in os.environ:
+ os.environ['AUTH_DOMAIN'] = 'gmail.com'
+ self.mox = mox.Mox()
+
+ def tearDown(self):
+ self.mox.UnsetStubs()
+ os.environ = self._saved_environ
+
+ def GetSampleBody(self):
+ split_token = self._SAMPLE_TOKEN.split('.')
+ body = json.loads(users_id_token._urlsafe_b64decode(split_token[1]))
+ return body
+
+
+class UsersIdTokenTest(UsersIdTokenTestBase):
+
+ def testSampleIdToken(self):
+ user = users_id_token._get_id_token_user(self._SAMPLE_TOKEN,
+ self._SAMPLE_AUDIENCES,
+ self._SAMPLE_ALLOWED_CLIENT_IDS,
+ self._SAMPLE_TIME_NOW, self.cache)
+ self.assertEqual(user.email(), 'kevind@gmail.com')
+ # User ID shouldn't be filled in. See notes in users_id_token.py.
+ self.assertIsNone(user.user_id())
+ self.assertTrue(self.cache.used_cached_value)
+
+ def testInvalidSignature(self):
+ """Verify that a body that doesn't match the signature fails."""
+ body = self.GetSampleBody()
+ # Modify the issued and expiration times.
+ body['iat'] += 60
+ body['exp'] += 60
+ encoded_body = base64.urlsafe_b64encode(json.dumps(body))
+
+ split_token = self._SAMPLE_TOKEN.split('.')
+ token = '.'.join((split_token[0], encoded_body, split_token[2]))
+
+ self.assertRaises(users_id_token._AppIdentityError,
+ users_id_token._verify_signed_jwt_with_certs,
+ token, self._SAMPLE_TIME_NOW, self.cache)
+
+ def testNoCertRaisesException(self):
+ """Verify that if we can't get certs, we fail."""
+ self.assertRaises(users_id_token._AppIdentityError,
+ users_id_token._verify_signed_jwt_with_certs,
+ self._SAMPLE_TOKEN, self._SAMPLE_TIME_NOW, self.cache,
+ 'https://bad.url/not/in/test/cache')
+
+ def testGetCertExpirationTime(self):
+ """Test that we can correctly get cert expiration time from headers."""
+ tests = [({'Cache-Control': 'max-age=3600'}, 3600),
+ ({'Cache-Control': 'max-age=3600', 'Age': '1200'}, 2400),
+ ({}, 0),
+ ({'Age': '1'}, 0),
+ ({'Cache-Control': 'max-age=3600', 'Age': '3700'}, 0),
+ ({'Cache-Control': 'max-age=3600', 'Age': 'bad'}, 3600),
+ ({'Cache-Control': 'max-age=nomatch,max-age=1200'}, 1200),
+ ({'Cache-Control': 'max-age=invalid'}, 0)]
+ for headers, expected_result in tests:
+ result = users_id_token._get_cert_expiration_time(headers)
+ self.assertEqual(expected_result, result)
+
+ def testCertCacheControl(self):
+ """Test that cache control headers are respected."""
+ self.mox.StubOutWithMock(urlfetch, 'fetch')
+ tests = [({'Cache-Control': 'max-age=3600', 'Age': '1200'}, True),
+ ({'Cache-Control': 'max-age=100', 'Age': '100'}, False),
+ ({}, False)]
+ for test_headers, value_set in tests:
+
+ class DummyResponse(object):
+ status_code = 200
+ content = json.dumps(self._SAMPLE_OAUTH_TOKEN_INFO)
+ headers = test_headers
+
+ urlfetch.fetch(mox.IsA(basestring)).AndReturn(DummyResponse())
+ cache = TestCache()
+
+ self.mox.ReplayAll()
+ users_id_token._get_cached_certs('some_uri', cache)
+ self.mox.VerifyAll()
+ self.mox.ResetAll()
+
+ self.assertEqual(value_set, cache.value_was_set)
+
+ def testInvalidTokenExtraSections(self):
+ """Verify that a token with too many pieces fails."""
+ self.assertRaises(users_id_token._AppIdentityError,
+ users_id_token._verify_signed_jwt_with_certs,
+ self._SAMPLE_TOKEN + '.asdf', self._SAMPLE_TIME_NOW,
+ self.cache)
+
+ def testNoCrypto(self):
+ """Verify we throw an _AppIdentityError if the Crypto modules don't load."""
+ crypto_loaded = users_id_token._CRYPTO_LOADED
+ try:
+ users_id_token._CRYPTO_LOADED = False
+ self.assertRaises(users_id_token._AppIdentityError,
+ users_id_token._verify_signed_jwt_with_certs,
+ self._SAMPLE_TOKEN, self._SAMPLE_TIME_NOW,
+ self.cache)
+ finally:
+ users_id_token._CRYPTO_LOADED = crypto_loaded
+
+ def testExpiredToken(self):
+ """Verify that expired tokens will fail."""
+ expired_time_now = (self._SAMPLE_TIME_NOW +
+ users_id_token._MAX_TOKEN_LIFETIME_SECS + 1)
+ self.assertRaises(users_id_token._AppIdentityError,
+ users_id_token._verify_signed_jwt_with_certs,
+ self._SAMPLE_TOKEN, expired_time_now,
+ self.cache)
+ # Also verify that this doesn't return a user when called from
+ # users_id_token.
+ user = users_id_token._get_id_token_user(self._SAMPLE_TOKEN,
+ self._SAMPLE_AUDIENCES,
+ self._SAMPLE_ALLOWED_CLIENT_IDS,
+ expired_time_now, self.cache)
+ self.assertIsNone(user)
+
+ def testTimeTooEarly(self):
+ """Verify that we'll fail if the provided time_now is too early."""
+ early_time_now = (self._SAMPLE_TIME_NOW -
+ users_id_token._CLOCK_SKEW_SECS - 1)
+ self.assertRaises(users_id_token._AppIdentityError,
+ users_id_token._verify_signed_jwt_with_certs,
+ self._SAMPLE_TOKEN, early_time_now,
+ self.cache)
+
+ def CheckErrorLoggable(self, token):
+ """Verify that the error strings we log are valid, loggable strings."""
+ try:
+ users_id_token._verify_signed_jwt_with_certs(
+ token, self._SAMPLE_TIME_NOW, self.cache)
+ self.fail('Expected exception.')
+ except users_id_token._AppIdentityError, e:
+ # Make sure this works without an exception.
+ try:
+ str(e).decode('utf-8')
+ except UnicodeDecodeError:
+ printable = ''.join(c if c in string.printable
+ else '\\x%02x' % ord(c)
+ for c in str(e))
+ self.fail('Unsafe error sent to log: %s' % printable)
+
+ def testErrorStringLoggableWrongSegments(self):
+ """Check that the Wrong Segments error is loggable."""
+ self.CheckErrorLoggable('bad utf-8 \xff')
+
+ def testErrorStringLoggableBadHeader(self):
+ """Check that the Bad Header error is loggable."""
+ token_part = 'bad utf-8 \xff'
+ token = '.'.join([base64.urlsafe_b64encode(token_part)] * 3)
+ self.CheckErrorLoggable(token)
+
+ def testErrorStringLoggableBadBody(self):
+ """Check that the Unparseable Body error is loggable."""
+ token_body = 'bad utf-8 \xff'
+ token_parts = self._SAMPLE_TOKEN.split('.')
+ token = '.'.join([token_parts[0],
+ base64.urlsafe_b64encode(token_body),
+ token_parts[2]])
+ self.CheckErrorLoggable(token)
+
+ def CheckToken(self, field_update_dict, valid):
+ """Update the sample token and check if it's valid or invalid.
+
+ This updates the body of our sample token with the fields in
+ field_update_dict, then passes it to _verify_parsed_token. The result must
+ match the "valid" parameter.
+
+ Args:
+ field_update_dict: A dict of fields to update in the sample body.
+ valid: A boolean, compared against the result from _verify_parsed_token.
+ """
+ parsed_token = self.GetSampleBody()
+ parsed_token.update(field_update_dict)
+ result = users_id_token._verify_parsed_token(
+ parsed_token, self._SAMPLE_AUDIENCES, self._SAMPLE_ALLOWED_CLIENT_IDS)
+ self.assertEqual(valid, result)
+
+ def testInvalidIssuer(self):
+ self.CheckToken({'iss': 'invalid.issuer'}, False)
+
+ def testInvalidAudience(self):
+ self.CheckToken({'aud': 'invalid.audience'}, False)
+
+ def testInvalidClientId(self):
+ self.CheckToken({'azp': 'invalid.client.id'}, False)
+
+ def testSampleIdTokenWithOldFields(self):
+ self.CheckToken({'cid': 'Extra ignored field.'}, True)
+
+ def testSkipClientIdNotAllowedForIdTokens(self):
+ """Verify that SKIP_CLIENT_ID_CHECKS does not work for ID tokens."""
+ parsed_token = self.GetSampleBody()
+ result = users_id_token._verify_parsed_token(
+ parsed_token, self._SAMPLE_AUDIENCES,
+ users_id_token.SKIP_CLIENT_ID_CHECK)
+ self.assertEqual(False, result)
+
+ def testEmptyAudience(self):
+ parsed_token = self.GetSampleBody()
+ parsed_token.update({'aud': 'invalid.audience'})
+ result = users_id_token._verify_parsed_token(
+ parsed_token, [], self._SAMPLE_ALLOWED_CLIENT_IDS)
+ self.assertEqual(False, result)
+
+ def AttemptOauth(self, client_id, allowed_client_ids=None):
+ if allowed_client_ids is None:
+ allowed_client_ids = self._SAMPLE_ALLOWED_CLIENT_IDS
+ self.mox.StubOutWithMock(oauth, 'get_client_id')
+ # We have four cases:
+ # * no client ID is specified, so we raise for every scope.
+ # * the given client ID is in the whitelist or there is no
+ # whitelist, so we'll only be called once.
+ # * we have a client ID not on the whitelist, so we need a
+ # mock call for every scope.
+ if client_id is None:
+ for scope in self._SAMPLE_OAUTH_SCOPES:
+ oauth.get_client_id(scope).AndRaise(oauth.Error)
+ elif (list(allowed_client_ids) == users_id_token.SKIP_CLIENT_ID_CHECK or
+ client_id in allowed_client_ids):
+ scope = self._SAMPLE_OAUTH_SCOPES[0]
+ oauth.get_client_id(scope).AndReturn(client_id)
+ else:
+ for scope in self._SAMPLE_OAUTH_SCOPES:
+ oauth.get_client_id(scope).AndReturn(client_id)
+
+ self.mox.ReplayAll()
+ users_id_token._set_bearer_user_vars(allowed_client_ids,
+ self._SAMPLE_OAUTH_SCOPES)
+ self.mox.VerifyAll()
+
+ def assertOauthSucceeded(self, client_id):
+ self.AttemptOauth(client_id)
+ self.assertEqual(os.environ.get('ENDPOINTS_USE_OAUTH_SCOPE'),
+ self._SAMPLE_OAUTH_SCOPES[0])
+
+ def assertOauthFailed(self, client_id):
+ self.AttemptOauth(client_id)
+ self.assertNotIn('ENDPOINTS_USE_OAUTH_SCOPE', os.environ)
+
+ def testOauthInvalidClientId(self):
+ self.assertOauthFailed('abc.appspot.com')
+
+ def testOauthValidClientId(self):
+ self.assertOauthSucceeded(self._SAMPLE_ALLOWED_CLIENT_IDS[0])
+
+ def testOauthExplorerClientId(self):
+ self.assertOauthFailed(api_config.API_EXPLORER_CLIENT_ID)
+
+ def testOauthInvalidScope(self):
+ self.assertOauthFailed(None)
+
+ def testAllowAllClientIds(self):
+ client_id = 'clearly_fake_id'
+ self.AttemptOauth(client_id,
+ allowed_client_ids=users_id_token.SKIP_CLIENT_ID_CHECK)
+ self.assertEqual(os.environ.get('ENDPOINTS_USE_OAUTH_SCOPE'),
+ self._SAMPLE_OAUTH_SCOPES[0])
+
+ def AttemptOauthLocal(self, token_update=None):
+ token = self._SAMPLE_OAUTH_TOKEN_INFO.copy()
+ token.update(token_update or {})
+
+ class DummyResponse(object):
+ status_code = 200
+ content = json.dumps(token)
+
+ self.mox.StubOutWithMock(urlfetch, 'fetch')
+ urlfetch.fetch(mox.IsA(basestring)).AndReturn(DummyResponse())
+
+ self.mox.ReplayAll()
+ users_id_token._set_bearer_user_vars_local('unused_token',
+ self._SAMPLE_ALLOWED_CLIENT_IDS,
+ self._SAMPLE_OAUTH_SCOPES)
+ self.mox.VerifyAll()
+
+ def testOauthLocal(self):
+ self.AttemptOauthLocal()
+ self.assertNotIn('ENDPOINTS_USE_OAUTH_SCOPE', os.environ)
+ self.assertEqual('kevind@gmail.com',
+ os.environ.get('ENDPOINTS_AUTH_EMAIL'))
+ self.assertEqual('', os.environ.get('ENDPOINTS_AUTH_DOMAIN'))
+
+ def assertOauthLocalFailed(self, token_update):
+ self.AttemptOauthLocal(token_update)
+ self.assertNotIn('ENDPOINTS_USE_OAUTH_SCOPE', os.environ)
+ self.assertNotIn('ENDPOINTS_AUTH_EMAIL', os.environ)
+ self.assertNotIn('ENDPOINTS_AUTH_DOMAIN', os.environ)
+
+ def testOauthLocalBadEmail(self):
+ self.assertOauthLocalFailed({'verified_email': False})
+
+ def testOauthLocalBadClientId(self):
+ self.assertOauthLocalFailed({'issued_to': 'abc.appspot.com'})
+
+ def testOauthLocalBadScopes(self):
+ self.assertOauthLocalFailed({'scope': 'useless_scope and_another'})
+
+ def testGetCurrentUserNoAuthInfo(self):
+ self.assertRaises(users_id_token.InvalidGetUserCall,
+ users_id_token.get_current_user)
+
+ def testGetCurrentUserEmailOnly(self):
+ os.environ['ENDPOINTS_AUTH_EMAIL'] = 'test@gmail.com'
+ os.environ['ENDPOINTS_AUTH_DOMAIN'] = ''
+ user = users_id_token.get_current_user()
+ self.assertEqual(user.email(), 'test@gmail.com')
+ self.assertIsNone(user.user_id())
+
+ def testGetCurrentUserEmailAndAuth(self):
+ os.environ['ENDPOINTS_AUTH_EMAIL'] = 'test@gmail.com'
+ os.environ['ENDPOINTS_AUTH_DOMAIN'] = 'gmail.com'
+ user = users_id_token.get_current_user()
+ self.assertEqual(user.email(), 'test@gmail.com')
+ self.assertEqual(user.auth_domain(), 'gmail.com')
+ self.assertIsNone(user.user_id())
+
+ def testGetCurrentUserOauth(self):
+ self.mox.StubOutWithMock(oauth, 'get_current_user')
+ oauth.get_current_user('scope').AndReturn(users.User('test@gmail.com'))
+ self.mox.ReplayAll()
+
+ os.environ['ENDPOINTS_USE_OAUTH_SCOPE'] = 'scope'
+ user = users_id_token.get_current_user()
+ self.assertEqual(user.email(), 'test@gmail.com')
+ self.mox.VerifyAll()
+
+ def testGetTokenQueryParamOauthHeader(self):
+ os.environ['HTTP_AUTHORIZATION'] = 'OAuth ' + self._SAMPLE_TOKEN
+ token = users_id_token._get_token(None)
+ self.assertEqual(token, self._SAMPLE_TOKEN)
+
+ def testGetTokenQueryParamBearerHeader(self):
+ os.environ['HTTP_AUTHORIZATION'] = 'Bearer ' + self._SAMPLE_TOKEN
+ token = users_id_token._get_token(None)
+ self.assertEqual(token, self._SAMPLE_TOKEN)
+
+ def testGetTokenQueryParamInvalidBearerHeader(self):
+ # Capitalization matters. This should fail.
+ os.environ['HTTP_AUTHORIZATION'] = 'BEARER ' + self._SAMPLE_TOKEN
+ token = users_id_token._get_token(None)
+ self.assertIsNone(token)
+
+ def testGetTokenQueryParamInvalidHeader(self):
+ os.environ['HTTP_AUTHORIZATION'] = 'Invalid ' + self._SAMPLE_TOKEN
+ token = users_id_token._get_token(None)
+ self.assertIsNone(token)
+
+ def testGetTokenQueryParamBearer(self):
+ request = self.mox.CreateMock(messages.Message)
+ request.get_unrecognized_field_info('bearer_token').AndReturn(
+ (self._SAMPLE_TOKEN, messages.Variant.STRING))
+
+ self.mox.ReplayAll()
+ token = users_id_token._get_token(request)
+ self.mox.VerifyAll()
+ self.assertEqual(token, self._SAMPLE_TOKEN)
+
+ def testGetTokenQueryParamAccess(self):
+ request = self.mox.CreateMock(messages.Message)
+ request.get_unrecognized_field_info('bearer_token').AndReturn(
+ (None, None))
+ request.get_unrecognized_field_info('access_token').AndReturn(
+ (self._SAMPLE_TOKEN, messages.Variant.STRING))
+
+ self.mox.ReplayAll()
+ token = users_id_token._get_token(request)
+ self.mox.VerifyAll()
+ self.assertEqual(token, self._SAMPLE_TOKEN)
+
+ def testGetTokenNone(self):
+ request = self.mox.CreateMock(messages.Message)
+ request.get_unrecognized_field_info('bearer_token').AndReturn((None, None))
+ request.get_unrecognized_field_info('access_token').AndReturn((None, None))
+
+ self.mox.ReplayAll()
+ token = users_id_token._get_token(request)
+ self.mox.VerifyAll()
+ self.assertIsNone(token)
+
+
+class UsersIdTokenTestWithSimpleApi(UsersIdTokenTestBase):
+
+ # pylint: disable=g-bad-name
+
+ @api_config.api('TestApi', 'v1')
+ class TestApiAnnotatedAtMethod(remote.Service):
+ """Describes TestApi."""
+
+ @api_config.method(
+ message_types.VoidMessage, message_types.VoidMessage,
+ audiences=UsersIdTokenTestBase._SAMPLE_AUDIENCES,
+ allowed_client_ids=UsersIdTokenTestBase._SAMPLE_ALLOWED_CLIENT_IDS,
+ scopes=UsersIdTokenTestBase._SAMPLE_OAUTH_SCOPES)
+ def method(self):
+ pass
+
+ @api_config.api(
+ 'TestApi', 'v1', audiences=UsersIdTokenTestBase._SAMPLE_AUDIENCES,
+ allowed_client_ids=UsersIdTokenTestBase._SAMPLE_ALLOWED_CLIENT_IDS)
+ class TestApiAnnotatedAtApi(remote.Service):
+ """Describes TestApi."""
+
+ @api_config.method(message_types.VoidMessage, message_types.VoidMessage)
+ def method(self, request):
+ return request
+ # pylint: enable=g-bad-name
+
+ def testMaybeSetVarsAlreadySetOauth(self):
+ os.environ['ENDPOINTS_USE_OAUTH_SCOPE'] = (
+ 'https://www.googleapis.com/auth/userinfo.email')
+ users_id_token._maybe_set_current_user_vars(
+ self.TestApiAnnotatedAtApi().method)
+ self.assertEqual('https://www.googleapis.com/auth/userinfo.email',
+ os.environ.get('ENDPOINTS_USE_OAUTH_SCOPE'))
+ self.assertNotIn('ENDPOINTS_AUTH_EMAIL', os.environ)
+ self.assertNotIn('ENDPOINTS_AUTH_DOMAIN', os.environ)
+
+ def testMaybeSetVarsAlreadySetIdToken(self):
+ os.environ['ENDPOINTS_AUTH_EMAIL'] = 'test@gmail.com'
+ os.environ['ENDPOINTS_AUTH_DOMAIN'] = 'gmail.com'
+ users_id_token._maybe_set_current_user_vars(
+ self.TestApiAnnotatedAtApi().method)
+ self.assertNotIn('ENDPOINTS_USE_OAUTH_SCOPE', os.environ)
+ self.assertEqual('test@gmail.com', os.environ.get('ENDPOINTS_AUTH_EMAIL'))
+ self.assertEqual('gmail.com', os.environ.get('ENDPOINTS_AUTH_DOMAIN'))
+
+ def testMaybeSetVarsAlreadySetIdTokenNoDomain(self):
+ os.environ['ENDPOINTS_AUTH_EMAIL'] = 'test@gmail.com'
+ os.environ['ENDPOINTS_AUTH_DOMAIN'] = ''
+ users_id_token._maybe_set_current_user_vars(
+ self.TestApiAnnotatedAtApi().method)
+ self.assertNotIn('ENDPOINTS_USE_OAUTH_SCOPE', os.environ)
+ self.assertEqual('test@gmail.com', os.environ.get('ENDPOINTS_AUTH_EMAIL'))
+ self.assertEqual('', os.environ.get('ENDPOINTS_AUTH_DOMAIN'))
+
+ def VerifyIdToken(self, cls, *args):
+ self.mox.StubOutWithMock(time, 'time')
+ self.mox.StubOutWithMock(users_id_token, '_get_id_token_user')
+ time.time().AndReturn(1001)
+ users_id_token._get_id_token_user(
+ self._SAMPLE_TOKEN,
+ self._SAMPLE_AUDIENCES,
+ self._SAMPLE_ALLOWED_CLIENT_IDS,
+ 1001, memcache).AndReturn(users.User('test@gmail.com'))
+ self.mox.ReplayAll()
+
+ os.environ['HTTP_AUTHORIZATION'] = ('Bearer ' + self._SAMPLE_TOKEN)
+ if args:
+ cls.method(*args)
+ else:
+ users_id_token._maybe_set_current_user_vars(cls.method)
+ self.assertEqual(os.environ.get('ENDPOINTS_AUTH_EMAIL'), 'test@gmail.com')
+ self.mox.VerifyAll()
+
+ def testMaybeSetVarsIdTokenApiAnnotation(self):
+ self.VerifyIdToken(self.TestApiAnnotatedAtApi())
+
+ def testMaybeSetVarsIdTokenMethodAnnotation(self):
+ self.VerifyIdToken(self.TestApiAnnotatedAtMethod())
+
+ def testMethodCallParsesIdToken(self):
+ self.VerifyIdToken(self.TestApiAnnotatedAtApi(),
+ message_types.VoidMessage())
+
+ def testMaybeSetVarsWithActualRequestAccessToken(self):
+ dummy_scope = 'scope'
+ dummy_token = 'dummy_token'
+ dummy_email = 'test@gmail.com'
+ dummy_client_id = self._SAMPLE_ALLOWED_CLIENT_IDS[0]
+
+ @api_config.api('TestApi', 'v1',
+ allowed_client_ids=self._SAMPLE_ALLOWED_CLIENT_IDS,
+ scopes=[dummy_scope])
+ class TestApiScopes(remote.Service):
+ """Describes TestApiScopes."""
+
+ # pylint: disable=g-bad-name
+ @api_config.method(message_types.VoidMessage, message_types.VoidMessage)
+ def method(self, request):
+ return request
+
+ # users_id_token._get_id_token_user and time.time don't need to be stubbed
+ # because the scopes used will not be [EMAIL_SCOPE] hence _get_id_token_user
+ # will never be attempted
+
+ self.mox.StubOutWithMock(users_id_token, '_is_local_dev')
+ users_id_token._is_local_dev().AndReturn(False)
+
+ self.mox.StubOutWithMock(oauth, 'get_client_id')
+ oauth.get_client_id(dummy_scope).AndReturn(dummy_client_id)
+
+ self.mox.ReplayAll()
+
+ api_instance = TestApiScopes()
+ os.environ['HTTP_AUTHORIZATION'] = 'Bearer ' + dummy_token
+ api_instance.method(message_types.VoidMessage())
+ self.assertEqual(os.getenv('ENDPOINTS_USE_OAUTH_SCOPE'), dummy_scope)
+ self.mox.VerifyAll()
+
+ def testMaybeSetVarsFail(self):
+ self.mox.StubOutWithMock(time, 'time')
+ time.time().MultipleTimes().AndReturn(1001)
+ self.mox.StubOutWithMock(users_id_token, '_get_id_token_user')
+ users_id_token._get_id_token_user(
+ self._SAMPLE_TOKEN,
+ self._SAMPLE_AUDIENCES,
+ self._SAMPLE_ALLOWED_CLIENT_IDS,
+ 1001, memcache).MultipleTimes().AndReturn(users.User('test@gmail.com'))
+ self.mox.ReplayAll()
+ # This token should correctly result in _get_id_token_user being called
+ os.environ['HTTP_AUTHORIZATION'] = ('Bearer ' + self._SAMPLE_TOKEN)
+ api_instance = self.TestApiAnnotatedAtApi()
+
+ # No im_self is present and no api_info can be used, so the method itself
+ # has no access to scopes, hence scopes will be null and neither of the
+ # token checks will occur
+ users_id_token._maybe_set_current_user_vars(api_instance.method.im_func)
+ self.assertNotIn('ENDPOINTS_USE_OAUTH_SCOPE', os.environ)
+ self.assertEqual(os.getenv('ENDPOINTS_AUTH_EMAIL'), '')
+ self.assertEqual(os.getenv('ENDPOINTS_AUTH_DOMAIN'), '')
+
+ # Test the same works when using the method and not im_func
+ os.environ.pop('ENDPOINTS_AUTH_EMAIL')
+ os.environ.pop('ENDPOINTS_AUTH_DOMAIN')
+ users_id_token._maybe_set_current_user_vars(api_instance.method)
+ self.assertEqual(os.getenv('ENDPOINTS_AUTH_EMAIL'), 'test@gmail.com')
+
+ # Test that it works using the api info from the API
+ os.environ.pop('ENDPOINTS_AUTH_EMAIL')
+ os.environ.pop('ENDPOINTS_AUTH_DOMAIN')
+ users_id_token._maybe_set_current_user_vars(api_instance.method.im_func,
+ api_info=api_instance.api_info)
+ self.assertEqual(os.getenv('ENDPOINTS_AUTH_EMAIL'), 'test@gmail.com')
+ self.mox.VerifyAll()
+
+
+if __name__ == '__main__':
+ unittest.main()
« no previous file with comments | « third_party/google-endpoints/endpoints/test/test_util.py ('k') | third_party/google-endpoints/endpoints/users_id_token.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698