Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(11)

Side by Side Diff: third_party/google-endpoints/endpoints/test/users_id_token_test.py

Issue 2666783008: Add google-endpoints to third_party/. (Closed)
Patch Set: Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2016 Google Inc. All Rights Reserved.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Tests for users_id_token and validate_id_token."""
16
17 import base64
18 import json
19 import os
20 import string
21 import time
22 import unittest
23
24 import endpoints.api_config as api_config
25
26 import mox
27 from protorpc import message_types
28 from protorpc import messages
29 from protorpc import remote
30
31 import test_util
32 import endpoints.users_id_token as users_id_token
33
34 from google.appengine.api import memcache
35 from google.appengine.api import oauth
36 from google.appengine.api import urlfetch
37 from google.appengine.api import users
38
39
40 # The key response that allows the _SAMPLE_TOKEN to be verified. This key was
41 # retrieved from:
42 # http://www-googleapis-test.sandbox.google.com//oauth2/v1/raw_public_keys
43 # ...at the same time that _SAMPLE_TOKEN was generated.
44 # The first cert is too short, which caused an exception 'Plaintext too large'
45 # from RSA.encrypt (b/19127342); the second cert is the correct cert. Put both
46 # there to make sure the second cert is tried when the first failed.
47
48 _CACHED_CERT = {
49 'keyvalues': [
50 {
51 'algorithm': 'RSA',
52 'modulus': ('2bqhkZ+DZSuQvHX3rdoIni39gfl6zny0WZK6dLPP2lRmer1aEAP982'
53 'u2B1siXoXB8HN+pwCZMGV5kbHaG13InopeVNIMFl2IU4aql+hDS0+i'
54 'j+1Rrsa6wHWp4+3eKe9q+VqXMdulclegHjVtxDs76W1lpuP1e6Msc3'
55 'IuSXjR'),
56 'exponent': 'AQAB',
57 'keyid': '458790a80f9c9957e8df61332b9f06faa6472bad'
58 },
59 {
60 'algorithm': 'RSA',
61 'modulus': ('AL18Q+dq5ws4/V7KtgfhC6SwJH20GvUN5z3vf4SUSrpQG2/nySBvUh'
62 'Iv86Hkk4Uy7W+OTq2+csCGhjGnRxBx9BThT85G8F6IGNjcOyNHVtnR'
63 'ifX+T88sUB1l7jAISRMCrgHIRNmwDCmEe1fTqTUOdgDT8nB7pX7SA/'
64 'VH0q+t2xml'),
65 'exponent': 'AQAB',
66 'keyid': '7411abfccccb4c253cd3e75b4fa5887f49aa83d1'
67 },
68 ]
69 }
70
71
72 class ModuleInterfaceTest(test_util.ModuleInterfaceTest,
73 unittest.TestCase):
74
75 MODULE = users_id_token
76
77
78 class TestCache(object):
79 """Test stub to replace memcache for id_token verification."""
80
81 def __init__(self):
82 self._used_cached_value = False
83 self._value_was_set = False
84
85 @property
86 def used_cached_value(self):
87 return self._used_cached_value
88
89 @property
90 def value_was_set(self):
91 return self._value_was_set
92
93 # pylint: disable=g-bad-name
94 def get(self, key, *unused_args, **kwargs):
95 if (key == users_id_token._DEFAULT_CERT_URI and
96 kwargs.get('namespace', '') == users_id_token._CERT_NAMESPACE):
97 self._used_cached_value = True
98 return _CACHED_CERT
99 return None
100
101 def set(self, *unused_args, **unused_kwargs):
102 self._value_was_set = True
103
104
105 class UsersIdTokenTestBase(unittest.TestCase):
106 """A sample token based on JWT.
107
108 Sample token is based on a JWT with this body:
109 {
110 "iss":"accounts.google.com",
111 "email":"kevind@gmail.com",
112 "email_verified":"true",
113 "aud":"919214422084-c0jrodnkm7ntttjhhttilqjq5d7l7mu5.apps."
114 "googleusercontent.com",
115 "sub":"104564329451840817415",
116 "azp":"919214422084-c0jrodnkm7ntttjhhttilqjq5d7l7mu5.apps."
117 "googleusercontent.com",
118 "at_hash":"c9aVyHiathUC-pgRFjFWbw",
119 "iat":1360964700,
120 "exp":1360968600
121 }
122 """
123 _SAMPLE_TOKEN = ('eyJhbGciOiJSUzI1NiIsImtpZCI6Ijc0MTFhYmZjY2NjYjRjMjUzY2QzZTc'
124 '1YjRmYTU4ODdmNDlhYTgzZDEifQ.eyJpc3MiOiJhY2NvdW50cy5nb29nbGU'
125 'uY29tIiwiZW1haWwiOiJrZXZpbmRAZ21haWwuY29tIiwiZW1haWxfdmVyaW'
126 'ZpZWQiOiJ0cnVlIiwiYXVkIjoiOTE5MjE0NDIyMDg0LWMwanJvZG5rbTdud'
127 'HR0amhodHRpbHFqcTVkN2w3bXU1LmFwcHMuZ29vZ2xldXNlcmNvbnRlbnQu'
128 'Y29tIiwic3ViIjoiMTA0NTY0MzI5NDUxODQwODE3NDE1IiwiYXpwIjoiOTE'
129 '5MjE0NDIyMDg0LWMwanJvZG5rbTdudHR0amhodHRpbHFqcTVkN2w3bXU1Lm'
130 'FwcHMuZ29vZ2xldXNlcmNvbnRlbnQuY29tIiwiYXRfaGFzaCI6ImM5YVZ5S'
131 'GlhdGhVQy1wZ1JGakZXYnciLCJpYXQiOjEzNjA5NjQ3MDAsImV4cCI6MTM2'
132 'MDk2ODYwMH0.XwaGmw5n1XHJapwkn6pumK14l9Tiyn1q2C5VeYbvuScNS6Z'
133 '-kdb9mX87Hl2hbdUvHm6TNzabMVTgvHPATjuCAt2lXOpwm8iGnon6vTk5LM'
134 'm0tUAE25IAImvpSc59l0ySd4x2g3BvjauxwaYjkwYJRVczsVlTTB3iKlBhW'
135 'IT01vM')
136 _SAMPLE_AUDIENCES = ('919214422084-c0jrodnkm7ntttjhhttilqjq5d7l7mu5.apps.'
137 'googleusercontent.com',)
138 _SAMPLE_ALLOWED_CLIENT_IDS = ('919214422084-c0jrodnkm7ntttjhhttilqjq5d7l7mu5.'
139 'apps.googleusercontent.com',
140 '12345.apps.googleusercontent.com')
141 _SAMPLE_TIME_NOW = 1360964700
142 _SAMPLE_OAUTH_SCOPES = ['https://www.googleapis.com/auth/userinfo.email']
143 _SAMPLE_OAUTH_TOKEN_INFO = {
144 'issued_to': ('919214422084-c0jrodnkm7ntttjhhttilqjq5d7l7mu5.apps.'
145 'googleusercontent.com'),
146 'user_id': '108495933693426793887',
147 'expires_in': 3384,
148 'access_type': 'online',
149 'audience': ('919214422084-c0jrodnkm7ntttjhhttilqjq5d7l7mu5.apps.'
150 'googleusercontent.com'),
151 'scope': (
152 'https://www.googleapis.com/auth/userinfo.profile '
153 'https://www.googleapis.com/auth/userinfo.email'),
154 'email': 'kevind@gmail.com',
155 'verified_email': True
156 }
157
158 def setUp(self):
159 self.cache = TestCache()
160 self._saved_environ = os.environ.copy()
161 if 'AUTH_DOMAIN' not in os.environ:
162 os.environ['AUTH_DOMAIN'] = 'gmail.com'
163 self.mox = mox.Mox()
164
165 def tearDown(self):
166 self.mox.UnsetStubs()
167 os.environ = self._saved_environ
168
169 def GetSampleBody(self):
170 split_token = self._SAMPLE_TOKEN.split('.')
171 body = json.loads(users_id_token._urlsafe_b64decode(split_token[1]))
172 return body
173
174
175 class UsersIdTokenTest(UsersIdTokenTestBase):
176
177 def testSampleIdToken(self):
178 user = users_id_token._get_id_token_user(self._SAMPLE_TOKEN,
179 self._SAMPLE_AUDIENCES,
180 self._SAMPLE_ALLOWED_CLIENT_IDS,
181 self._SAMPLE_TIME_NOW, self.cache)
182 self.assertEqual(user.email(), 'kevind@gmail.com')
183 # User ID shouldn't be filled in. See notes in users_id_token.py.
184 self.assertIsNone(user.user_id())
185 self.assertTrue(self.cache.used_cached_value)
186
187 def testInvalidSignature(self):
188 """Verify that a body that doesn't match the signature fails."""
189 body = self.GetSampleBody()
190 # Modify the issued and expiration times.
191 body['iat'] += 60
192 body['exp'] += 60
193 encoded_body = base64.urlsafe_b64encode(json.dumps(body))
194
195 split_token = self._SAMPLE_TOKEN.split('.')
196 token = '.'.join((split_token[0], encoded_body, split_token[2]))
197
198 self.assertRaises(users_id_token._AppIdentityError,
199 users_id_token._verify_signed_jwt_with_certs,
200 token, self._SAMPLE_TIME_NOW, self.cache)
201
202 def testNoCertRaisesException(self):
203 """Verify that if we can't get certs, we fail."""
204 self.assertRaises(users_id_token._AppIdentityError,
205 users_id_token._verify_signed_jwt_with_certs,
206 self._SAMPLE_TOKEN, self._SAMPLE_TIME_NOW, self.cache,
207 'https://bad.url/not/in/test/cache')
208
209 def testGetCertExpirationTime(self):
210 """Test that we can correctly get cert expiration time from headers."""
211 tests = [({'Cache-Control': 'max-age=3600'}, 3600),
212 ({'Cache-Control': 'max-age=3600', 'Age': '1200'}, 2400),
213 ({}, 0),
214 ({'Age': '1'}, 0),
215 ({'Cache-Control': 'max-age=3600', 'Age': '3700'}, 0),
216 ({'Cache-Control': 'max-age=3600', 'Age': 'bad'}, 3600),
217 ({'Cache-Control': 'max-age=nomatch,max-age=1200'}, 1200),
218 ({'Cache-Control': 'max-age=invalid'}, 0)]
219 for headers, expected_result in tests:
220 result = users_id_token._get_cert_expiration_time(headers)
221 self.assertEqual(expected_result, result)
222
223 def testCertCacheControl(self):
224 """Test that cache control headers are respected."""
225 self.mox.StubOutWithMock(urlfetch, 'fetch')
226 tests = [({'Cache-Control': 'max-age=3600', 'Age': '1200'}, True),
227 ({'Cache-Control': 'max-age=100', 'Age': '100'}, False),
228 ({}, False)]
229 for test_headers, value_set in tests:
230
231 class DummyResponse(object):
232 status_code = 200
233 content = json.dumps(self._SAMPLE_OAUTH_TOKEN_INFO)
234 headers = test_headers
235
236 urlfetch.fetch(mox.IsA(basestring)).AndReturn(DummyResponse())
237 cache = TestCache()
238
239 self.mox.ReplayAll()
240 users_id_token._get_cached_certs('some_uri', cache)
241 self.mox.VerifyAll()
242 self.mox.ResetAll()
243
244 self.assertEqual(value_set, cache.value_was_set)
245
246 def testInvalidTokenExtraSections(self):
247 """Verify that a token with too many pieces fails."""
248 self.assertRaises(users_id_token._AppIdentityError,
249 users_id_token._verify_signed_jwt_with_certs,
250 self._SAMPLE_TOKEN + '.asdf', self._SAMPLE_TIME_NOW,
251 self.cache)
252
253 def testNoCrypto(self):
254 """Verify we throw an _AppIdentityError if the Crypto modules don't load."""
255 crypto_loaded = users_id_token._CRYPTO_LOADED
256 try:
257 users_id_token._CRYPTO_LOADED = False
258 self.assertRaises(users_id_token._AppIdentityError,
259 users_id_token._verify_signed_jwt_with_certs,
260 self._SAMPLE_TOKEN, self._SAMPLE_TIME_NOW,
261 self.cache)
262 finally:
263 users_id_token._CRYPTO_LOADED = crypto_loaded
264
265 def testExpiredToken(self):
266 """Verify that expired tokens will fail."""
267 expired_time_now = (self._SAMPLE_TIME_NOW +
268 users_id_token._MAX_TOKEN_LIFETIME_SECS + 1)
269 self.assertRaises(users_id_token._AppIdentityError,
270 users_id_token._verify_signed_jwt_with_certs,
271 self._SAMPLE_TOKEN, expired_time_now,
272 self.cache)
273 # Also verify that this doesn't return a user when called from
274 # users_id_token.
275 user = users_id_token._get_id_token_user(self._SAMPLE_TOKEN,
276 self._SAMPLE_AUDIENCES,
277 self._SAMPLE_ALLOWED_CLIENT_IDS,
278 expired_time_now, self.cache)
279 self.assertIsNone(user)
280
281 def testTimeTooEarly(self):
282 """Verify that we'll fail if the provided time_now is too early."""
283 early_time_now = (self._SAMPLE_TIME_NOW -
284 users_id_token._CLOCK_SKEW_SECS - 1)
285 self.assertRaises(users_id_token._AppIdentityError,
286 users_id_token._verify_signed_jwt_with_certs,
287 self._SAMPLE_TOKEN, early_time_now,
288 self.cache)
289
290 def CheckErrorLoggable(self, token):
291 """Verify that the error strings we log are valid, loggable strings."""
292 try:
293 users_id_token._verify_signed_jwt_with_certs(
294 token, self._SAMPLE_TIME_NOW, self.cache)
295 self.fail('Expected exception.')
296 except users_id_token._AppIdentityError, e:
297 # Make sure this works without an exception.
298 try:
299 str(e).decode('utf-8')
300 except UnicodeDecodeError:
301 printable = ''.join(c if c in string.printable
302 else '\\x%02x' % ord(c)
303 for c in str(e))
304 self.fail('Unsafe error sent to log: %s' % printable)
305
306 def testErrorStringLoggableWrongSegments(self):
307 """Check that the Wrong Segments error is loggable."""
308 self.CheckErrorLoggable('bad utf-8 \xff')
309
310 def testErrorStringLoggableBadHeader(self):
311 """Check that the Bad Header error is loggable."""
312 token_part = 'bad utf-8 \xff'
313 token = '.'.join([base64.urlsafe_b64encode(token_part)] * 3)
314 self.CheckErrorLoggable(token)
315
316 def testErrorStringLoggableBadBody(self):
317 """Check that the Unparseable Body error is loggable."""
318 token_body = 'bad utf-8 \xff'
319 token_parts = self._SAMPLE_TOKEN.split('.')
320 token = '.'.join([token_parts[0],
321 base64.urlsafe_b64encode(token_body),
322 token_parts[2]])
323 self.CheckErrorLoggable(token)
324
325 def CheckToken(self, field_update_dict, valid):
326 """Update the sample token and check if it's valid or invalid.
327
328 This updates the body of our sample token with the fields in
329 field_update_dict, then passes it to _verify_parsed_token. The result must
330 match the "valid" parameter.
331
332 Args:
333 field_update_dict: A dict of fields to update in the sample body.
334 valid: A boolean, compared against the result from _verify_parsed_token.
335 """
336 parsed_token = self.GetSampleBody()
337 parsed_token.update(field_update_dict)
338 result = users_id_token._verify_parsed_token(
339 parsed_token, self._SAMPLE_AUDIENCES, self._SAMPLE_ALLOWED_CLIENT_IDS)
340 self.assertEqual(valid, result)
341
342 def testInvalidIssuer(self):
343 self.CheckToken({'iss': 'invalid.issuer'}, False)
344
345 def testInvalidAudience(self):
346 self.CheckToken({'aud': 'invalid.audience'}, False)
347
348 def testInvalidClientId(self):
349 self.CheckToken({'azp': 'invalid.client.id'}, False)
350
351 def testSampleIdTokenWithOldFields(self):
352 self.CheckToken({'cid': 'Extra ignored field.'}, True)
353
354 def testSkipClientIdNotAllowedForIdTokens(self):
355 """Verify that SKIP_CLIENT_ID_CHECKS does not work for ID tokens."""
356 parsed_token = self.GetSampleBody()
357 result = users_id_token._verify_parsed_token(
358 parsed_token, self._SAMPLE_AUDIENCES,
359 users_id_token.SKIP_CLIENT_ID_CHECK)
360 self.assertEqual(False, result)
361
362 def testEmptyAudience(self):
363 parsed_token = self.GetSampleBody()
364 parsed_token.update({'aud': 'invalid.audience'})
365 result = users_id_token._verify_parsed_token(
366 parsed_token, [], self._SAMPLE_ALLOWED_CLIENT_IDS)
367 self.assertEqual(False, result)
368
369 def AttemptOauth(self, client_id, allowed_client_ids=None):
370 if allowed_client_ids is None:
371 allowed_client_ids = self._SAMPLE_ALLOWED_CLIENT_IDS
372 self.mox.StubOutWithMock(oauth, 'get_client_id')
373 # We have four cases:
374 # * no client ID is specified, so we raise for every scope.
375 # * the given client ID is in the whitelist or there is no
376 # whitelist, so we'll only be called once.
377 # * we have a client ID not on the whitelist, so we need a
378 # mock call for every scope.
379 if client_id is None:
380 for scope in self._SAMPLE_OAUTH_SCOPES:
381 oauth.get_client_id(scope).AndRaise(oauth.Error)
382 elif (list(allowed_client_ids) == users_id_token.SKIP_CLIENT_ID_CHECK or
383 client_id in allowed_client_ids):
384 scope = self._SAMPLE_OAUTH_SCOPES[0]
385 oauth.get_client_id(scope).AndReturn(client_id)
386 else:
387 for scope in self._SAMPLE_OAUTH_SCOPES:
388 oauth.get_client_id(scope).AndReturn(client_id)
389
390 self.mox.ReplayAll()
391 users_id_token._set_bearer_user_vars(allowed_client_ids,
392 self._SAMPLE_OAUTH_SCOPES)
393 self.mox.VerifyAll()
394
395 def assertOauthSucceeded(self, client_id):
396 self.AttemptOauth(client_id)
397 self.assertEqual(os.environ.get('ENDPOINTS_USE_OAUTH_SCOPE'),
398 self._SAMPLE_OAUTH_SCOPES[0])
399
400 def assertOauthFailed(self, client_id):
401 self.AttemptOauth(client_id)
402 self.assertNotIn('ENDPOINTS_USE_OAUTH_SCOPE', os.environ)
403
404 def testOauthInvalidClientId(self):
405 self.assertOauthFailed('abc.appspot.com')
406
407 def testOauthValidClientId(self):
408 self.assertOauthSucceeded(self._SAMPLE_ALLOWED_CLIENT_IDS[0])
409
410 def testOauthExplorerClientId(self):
411 self.assertOauthFailed(api_config.API_EXPLORER_CLIENT_ID)
412
413 def testOauthInvalidScope(self):
414 self.assertOauthFailed(None)
415
416 def testAllowAllClientIds(self):
417 client_id = 'clearly_fake_id'
418 self.AttemptOauth(client_id,
419 allowed_client_ids=users_id_token.SKIP_CLIENT_ID_CHECK)
420 self.assertEqual(os.environ.get('ENDPOINTS_USE_OAUTH_SCOPE'),
421 self._SAMPLE_OAUTH_SCOPES[0])
422
423 def AttemptOauthLocal(self, token_update=None):
424 token = self._SAMPLE_OAUTH_TOKEN_INFO.copy()
425 token.update(token_update or {})
426
427 class DummyResponse(object):
428 status_code = 200
429 content = json.dumps(token)
430
431 self.mox.StubOutWithMock(urlfetch, 'fetch')
432 urlfetch.fetch(mox.IsA(basestring)).AndReturn(DummyResponse())
433
434 self.mox.ReplayAll()
435 users_id_token._set_bearer_user_vars_local('unused_token',
436 self._SAMPLE_ALLOWED_CLIENT_IDS,
437 self._SAMPLE_OAUTH_SCOPES)
438 self.mox.VerifyAll()
439
440 def testOauthLocal(self):
441 self.AttemptOauthLocal()
442 self.assertNotIn('ENDPOINTS_USE_OAUTH_SCOPE', os.environ)
443 self.assertEqual('kevind@gmail.com',
444 os.environ.get('ENDPOINTS_AUTH_EMAIL'))
445 self.assertEqual('', os.environ.get('ENDPOINTS_AUTH_DOMAIN'))
446
447 def assertOauthLocalFailed(self, token_update):
448 self.AttemptOauthLocal(token_update)
449 self.assertNotIn('ENDPOINTS_USE_OAUTH_SCOPE', os.environ)
450 self.assertNotIn('ENDPOINTS_AUTH_EMAIL', os.environ)
451 self.assertNotIn('ENDPOINTS_AUTH_DOMAIN', os.environ)
452
453 def testOauthLocalBadEmail(self):
454 self.assertOauthLocalFailed({'verified_email': False})
455
456 def testOauthLocalBadClientId(self):
457 self.assertOauthLocalFailed({'issued_to': 'abc.appspot.com'})
458
459 def testOauthLocalBadScopes(self):
460 self.assertOauthLocalFailed({'scope': 'useless_scope and_another'})
461
462 def testGetCurrentUserNoAuthInfo(self):
463 self.assertRaises(users_id_token.InvalidGetUserCall,
464 users_id_token.get_current_user)
465
466 def testGetCurrentUserEmailOnly(self):
467 os.environ['ENDPOINTS_AUTH_EMAIL'] = 'test@gmail.com'
468 os.environ['ENDPOINTS_AUTH_DOMAIN'] = ''
469 user = users_id_token.get_current_user()
470 self.assertEqual(user.email(), 'test@gmail.com')
471 self.assertIsNone(user.user_id())
472
473 def testGetCurrentUserEmailAndAuth(self):
474 os.environ['ENDPOINTS_AUTH_EMAIL'] = 'test@gmail.com'
475 os.environ['ENDPOINTS_AUTH_DOMAIN'] = 'gmail.com'
476 user = users_id_token.get_current_user()
477 self.assertEqual(user.email(), 'test@gmail.com')
478 self.assertEqual(user.auth_domain(), 'gmail.com')
479 self.assertIsNone(user.user_id())
480
481 def testGetCurrentUserOauth(self):
482 self.mox.StubOutWithMock(oauth, 'get_current_user')
483 oauth.get_current_user('scope').AndReturn(users.User('test@gmail.com'))
484 self.mox.ReplayAll()
485
486 os.environ['ENDPOINTS_USE_OAUTH_SCOPE'] = 'scope'
487 user = users_id_token.get_current_user()
488 self.assertEqual(user.email(), 'test@gmail.com')
489 self.mox.VerifyAll()
490
491 def testGetTokenQueryParamOauthHeader(self):
492 os.environ['HTTP_AUTHORIZATION'] = 'OAuth ' + self._SAMPLE_TOKEN
493 token = users_id_token._get_token(None)
494 self.assertEqual(token, self._SAMPLE_TOKEN)
495
496 def testGetTokenQueryParamBearerHeader(self):
497 os.environ['HTTP_AUTHORIZATION'] = 'Bearer ' + self._SAMPLE_TOKEN
498 token = users_id_token._get_token(None)
499 self.assertEqual(token, self._SAMPLE_TOKEN)
500
501 def testGetTokenQueryParamInvalidBearerHeader(self):
502 # Capitalization matters. This should fail.
503 os.environ['HTTP_AUTHORIZATION'] = 'BEARER ' + self._SAMPLE_TOKEN
504 token = users_id_token._get_token(None)
505 self.assertIsNone(token)
506
507 def testGetTokenQueryParamInvalidHeader(self):
508 os.environ['HTTP_AUTHORIZATION'] = 'Invalid ' + self._SAMPLE_TOKEN
509 token = users_id_token._get_token(None)
510 self.assertIsNone(token)
511
512 def testGetTokenQueryParamBearer(self):
513 request = self.mox.CreateMock(messages.Message)
514 request.get_unrecognized_field_info('bearer_token').AndReturn(
515 (self._SAMPLE_TOKEN, messages.Variant.STRING))
516
517 self.mox.ReplayAll()
518 token = users_id_token._get_token(request)
519 self.mox.VerifyAll()
520 self.assertEqual(token, self._SAMPLE_TOKEN)
521
522 def testGetTokenQueryParamAccess(self):
523 request = self.mox.CreateMock(messages.Message)
524 request.get_unrecognized_field_info('bearer_token').AndReturn(
525 (None, None))
526 request.get_unrecognized_field_info('access_token').AndReturn(
527 (self._SAMPLE_TOKEN, messages.Variant.STRING))
528
529 self.mox.ReplayAll()
530 token = users_id_token._get_token(request)
531 self.mox.VerifyAll()
532 self.assertEqual(token, self._SAMPLE_TOKEN)
533
534 def testGetTokenNone(self):
535 request = self.mox.CreateMock(messages.Message)
536 request.get_unrecognized_field_info('bearer_token').AndReturn((None, None))
537 request.get_unrecognized_field_info('access_token').AndReturn((None, None))
538
539 self.mox.ReplayAll()
540 token = users_id_token._get_token(request)
541 self.mox.VerifyAll()
542 self.assertIsNone(token)
543
544
545 class UsersIdTokenTestWithSimpleApi(UsersIdTokenTestBase):
546
547 # pylint: disable=g-bad-name
548
549 @api_config.api('TestApi', 'v1')
550 class TestApiAnnotatedAtMethod(remote.Service):
551 """Describes TestApi."""
552
553 @api_config.method(
554 message_types.VoidMessage, message_types.VoidMessage,
555 audiences=UsersIdTokenTestBase._SAMPLE_AUDIENCES,
556 allowed_client_ids=UsersIdTokenTestBase._SAMPLE_ALLOWED_CLIENT_IDS,
557 scopes=UsersIdTokenTestBase._SAMPLE_OAUTH_SCOPES)
558 def method(self):
559 pass
560
561 @api_config.api(
562 'TestApi', 'v1', audiences=UsersIdTokenTestBase._SAMPLE_AUDIENCES,
563 allowed_client_ids=UsersIdTokenTestBase._SAMPLE_ALLOWED_CLIENT_IDS)
564 class TestApiAnnotatedAtApi(remote.Service):
565 """Describes TestApi."""
566
567 @api_config.method(message_types.VoidMessage, message_types.VoidMessage)
568 def method(self, request):
569 return request
570 # pylint: enable=g-bad-name
571
572 def testMaybeSetVarsAlreadySetOauth(self):
573 os.environ['ENDPOINTS_USE_OAUTH_SCOPE'] = (
574 'https://www.googleapis.com/auth/userinfo.email')
575 users_id_token._maybe_set_current_user_vars(
576 self.TestApiAnnotatedAtApi().method)
577 self.assertEqual('https://www.googleapis.com/auth/userinfo.email',
578 os.environ.get('ENDPOINTS_USE_OAUTH_SCOPE'))
579 self.assertNotIn('ENDPOINTS_AUTH_EMAIL', os.environ)
580 self.assertNotIn('ENDPOINTS_AUTH_DOMAIN', os.environ)
581
582 def testMaybeSetVarsAlreadySetIdToken(self):
583 os.environ['ENDPOINTS_AUTH_EMAIL'] = 'test@gmail.com'
584 os.environ['ENDPOINTS_AUTH_DOMAIN'] = 'gmail.com'
585 users_id_token._maybe_set_current_user_vars(
586 self.TestApiAnnotatedAtApi().method)
587 self.assertNotIn('ENDPOINTS_USE_OAUTH_SCOPE', os.environ)
588 self.assertEqual('test@gmail.com', os.environ.get('ENDPOINTS_AUTH_EMAIL'))
589 self.assertEqual('gmail.com', os.environ.get('ENDPOINTS_AUTH_DOMAIN'))
590
591 def testMaybeSetVarsAlreadySetIdTokenNoDomain(self):
592 os.environ['ENDPOINTS_AUTH_EMAIL'] = 'test@gmail.com'
593 os.environ['ENDPOINTS_AUTH_DOMAIN'] = ''
594 users_id_token._maybe_set_current_user_vars(
595 self.TestApiAnnotatedAtApi().method)
596 self.assertNotIn('ENDPOINTS_USE_OAUTH_SCOPE', os.environ)
597 self.assertEqual('test@gmail.com', os.environ.get('ENDPOINTS_AUTH_EMAIL'))
598 self.assertEqual('', os.environ.get('ENDPOINTS_AUTH_DOMAIN'))
599
600 def VerifyIdToken(self, cls, *args):
601 self.mox.StubOutWithMock(time, 'time')
602 self.mox.StubOutWithMock(users_id_token, '_get_id_token_user')
603 time.time().AndReturn(1001)
604 users_id_token._get_id_token_user(
605 self._SAMPLE_TOKEN,
606 self._SAMPLE_AUDIENCES,
607 self._SAMPLE_ALLOWED_CLIENT_IDS,
608 1001, memcache).AndReturn(users.User('test@gmail.com'))
609 self.mox.ReplayAll()
610
611 os.environ['HTTP_AUTHORIZATION'] = ('Bearer ' + self._SAMPLE_TOKEN)
612 if args:
613 cls.method(*args)
614 else:
615 users_id_token._maybe_set_current_user_vars(cls.method)
616 self.assertEqual(os.environ.get('ENDPOINTS_AUTH_EMAIL'), 'test@gmail.com')
617 self.mox.VerifyAll()
618
619 def testMaybeSetVarsIdTokenApiAnnotation(self):
620 self.VerifyIdToken(self.TestApiAnnotatedAtApi())
621
622 def testMaybeSetVarsIdTokenMethodAnnotation(self):
623 self.VerifyIdToken(self.TestApiAnnotatedAtMethod())
624
625 def testMethodCallParsesIdToken(self):
626 self.VerifyIdToken(self.TestApiAnnotatedAtApi(),
627 message_types.VoidMessage())
628
629 def testMaybeSetVarsWithActualRequestAccessToken(self):
630 dummy_scope = 'scope'
631 dummy_token = 'dummy_token'
632 dummy_email = 'test@gmail.com'
633 dummy_client_id = self._SAMPLE_ALLOWED_CLIENT_IDS[0]
634
635 @api_config.api('TestApi', 'v1',
636 allowed_client_ids=self._SAMPLE_ALLOWED_CLIENT_IDS,
637 scopes=[dummy_scope])
638 class TestApiScopes(remote.Service):
639 """Describes TestApiScopes."""
640
641 # pylint: disable=g-bad-name
642 @api_config.method(message_types.VoidMessage, message_types.VoidMessage)
643 def method(self, request):
644 return request
645
646 # users_id_token._get_id_token_user and time.time don't need to be stubbed
647 # because the scopes used will not be [EMAIL_SCOPE] hence _get_id_token_user
648 # will never be attempted
649
650 self.mox.StubOutWithMock(users_id_token, '_is_local_dev')
651 users_id_token._is_local_dev().AndReturn(False)
652
653 self.mox.StubOutWithMock(oauth, 'get_client_id')
654 oauth.get_client_id(dummy_scope).AndReturn(dummy_client_id)
655
656 self.mox.ReplayAll()
657
658 api_instance = TestApiScopes()
659 os.environ['HTTP_AUTHORIZATION'] = 'Bearer ' + dummy_token
660 api_instance.method(message_types.VoidMessage())
661 self.assertEqual(os.getenv('ENDPOINTS_USE_OAUTH_SCOPE'), dummy_scope)
662 self.mox.VerifyAll()
663
664 def testMaybeSetVarsFail(self):
665 self.mox.StubOutWithMock(time, 'time')
666 time.time().MultipleTimes().AndReturn(1001)
667 self.mox.StubOutWithMock(users_id_token, '_get_id_token_user')
668 users_id_token._get_id_token_user(
669 self._SAMPLE_TOKEN,
670 self._SAMPLE_AUDIENCES,
671 self._SAMPLE_ALLOWED_CLIENT_IDS,
672 1001, memcache).MultipleTimes().AndReturn(users.User('test@gmail.com'))
673 self.mox.ReplayAll()
674 # This token should correctly result in _get_id_token_user being called
675 os.environ['HTTP_AUTHORIZATION'] = ('Bearer ' + self._SAMPLE_TOKEN)
676 api_instance = self.TestApiAnnotatedAtApi()
677
678 # No im_self is present and no api_info can be used, so the method itself
679 # has no access to scopes, hence scopes will be null and neither of the
680 # token checks will occur
681 users_id_token._maybe_set_current_user_vars(api_instance.method.im_func)
682 self.assertNotIn('ENDPOINTS_USE_OAUTH_SCOPE', os.environ)
683 self.assertEqual(os.getenv('ENDPOINTS_AUTH_EMAIL'), '')
684 self.assertEqual(os.getenv('ENDPOINTS_AUTH_DOMAIN'), '')
685
686 # Test the same works when using the method and not im_func
687 os.environ.pop('ENDPOINTS_AUTH_EMAIL')
688 os.environ.pop('ENDPOINTS_AUTH_DOMAIN')
689 users_id_token._maybe_set_current_user_vars(api_instance.method)
690 self.assertEqual(os.getenv('ENDPOINTS_AUTH_EMAIL'), 'test@gmail.com')
691
692 # Test that it works using the api info from the API
693 os.environ.pop('ENDPOINTS_AUTH_EMAIL')
694 os.environ.pop('ENDPOINTS_AUTH_DOMAIN')
695 users_id_token._maybe_set_current_user_vars(api_instance.method.im_func,
696 api_info=api_instance.api_info)
697 self.assertEqual(os.getenv('ENDPOINTS_AUTH_EMAIL'), 'test@gmail.com')
698 self.mox.VerifyAll()
699
700
701 if __name__ == '__main__':
702 unittest.main()
OLDNEW
« no previous file with comments | « third_party/google-endpoints/endpoints/test/test_util.py ('k') | third_party/google-endpoints/endpoints/users_id_token.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698