OLD | NEW |
(Empty) | |
| 1 #!/usr/bin/python2.4 |
| 2 # |
| 3 # Copyright 2014 Google Inc. All rights reserved. |
| 4 # |
| 5 # Licensed under the Apache License, Version 2.0 (the "License"); |
| 6 # you may not use this file except in compliance with the License. |
| 7 # You may obtain a copy of the License at |
| 8 # |
| 9 # http://www.apache.org/licenses/LICENSE-2.0 |
| 10 # |
| 11 # Unless required by applicable law or agreed to in writing, software |
| 12 # distributed under the License is distributed on an "AS IS" BASIS, |
| 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 # See the License for the specific language governing permissions and |
| 15 # limitations under the License. |
| 16 |
| 17 |
| 18 """Oauth2client tests |
| 19 |
| 20 Unit tests for oauth2client. |
| 21 """ |
| 22 |
| 23 __author__ = 'jcgregorio@google.com (Joe Gregorio)' |
| 24 |
| 25 import base64 |
| 26 import contextlib |
| 27 import datetime |
| 28 import json |
| 29 import os |
| 30 import sys |
| 31 import time |
| 32 import unittest |
| 33 |
| 34 import mock |
| 35 import six |
| 36 from six.moves import urllib |
| 37 |
| 38 from .http_mock import HttpMock |
| 39 from .http_mock import HttpMockSequence |
| 40 from oauth2client import GOOGLE_REVOKE_URI |
| 41 from oauth2client import GOOGLE_TOKEN_URI |
| 42 from oauth2client import client |
| 43 from oauth2client.client import AccessTokenCredentials |
| 44 from oauth2client.client import AccessTokenCredentialsError |
| 45 from oauth2client.client import AccessTokenRefreshError |
| 46 from oauth2client.client import ADC_HELP_MSG |
| 47 from oauth2client.client import AssertionCredentials |
| 48 from oauth2client.client import AUTHORIZED_USER |
| 49 from oauth2client.client import Credentials |
| 50 from oauth2client.client import DEFAULT_ENV_NAME |
| 51 from oauth2client.client import ApplicationDefaultCredentialsError |
| 52 from oauth2client.client import FlowExchangeError |
| 53 from oauth2client.client import GoogleCredentials |
| 54 from oauth2client.client import GOOGLE_APPLICATION_CREDENTIALS |
| 55 from oauth2client.client import MemoryCache |
| 56 from oauth2client.client import NonAsciiHeaderError |
| 57 from oauth2client.client import OAuth2Credentials |
| 58 from oauth2client.client import OAuth2WebServerFlow |
| 59 from oauth2client.client import OOB_CALLBACK_URN |
| 60 from oauth2client.client import REFRESH_STATUS_CODES |
| 61 from oauth2client.client import SERVICE_ACCOUNT |
| 62 from oauth2client.client import Storage |
| 63 from oauth2client.client import TokenRevokeError |
| 64 from oauth2client.client import VerifyJwtTokenError |
| 65 from oauth2client.client import _extract_id_token |
| 66 from oauth2client.client import _get_application_default_credential_from_file |
| 67 from oauth2client.client import _get_environment |
| 68 from oauth2client.client import _get_environment_variable_file |
| 69 from oauth2client.client import _get_well_known_file |
| 70 from oauth2client.client import _raise_exception_for_missing_fields |
| 71 from oauth2client.client import _raise_exception_for_reading_json |
| 72 from oauth2client.client import _update_query_params |
| 73 from oauth2client.client import credentials_from_clientsecrets_and_code |
| 74 from oauth2client.client import credentials_from_code |
| 75 from oauth2client.client import flow_from_clientsecrets |
| 76 from oauth2client.client import save_to_well_known_file |
| 77 from oauth2client.clientsecrets import _loadfile |
| 78 from oauth2client.service_account import _ServiceAccountCredentials |
| 79 |
| 80 DATA_DIR = os.path.join(os.path.dirname(__file__), 'data') |
| 81 |
| 82 |
| 83 # TODO(craigcitro): This is duplicated from |
| 84 # googleapiclient.test_discovery; consolidate these definitions. |
| 85 def assertUrisEqual(testcase, expected, actual): |
| 86 """Test that URIs are the same, up to reordering of query parameters.""" |
| 87 expected = urllib.parse.urlparse(expected) |
| 88 actual = urllib.parse.urlparse(actual) |
| 89 testcase.assertEqual(expected.scheme, actual.scheme) |
| 90 testcase.assertEqual(expected.netloc, actual.netloc) |
| 91 testcase.assertEqual(expected.path, actual.path) |
| 92 testcase.assertEqual(expected.params, actual.params) |
| 93 testcase.assertEqual(expected.fragment, actual.fragment) |
| 94 expected_query = urllib.parse.parse_qs(expected.query) |
| 95 actual_query = urllib.parse.parse_qs(actual.query) |
| 96 for name in expected_query.keys(): |
| 97 testcase.assertEqual(expected_query[name], actual_query[name]) |
| 98 for name in actual_query.keys(): |
| 99 testcase.assertEqual(expected_query[name], actual_query[name]) |
| 100 |
| 101 |
| 102 def datafile(filename): |
| 103 return os.path.join(DATA_DIR, filename) |
| 104 |
| 105 |
| 106 def load_and_cache(existing_file, fakename, cache_mock): |
| 107 client_type, client_info = _loadfile(datafile(existing_file)) |
| 108 cache_mock.cache[fakename] = {client_type: client_info} |
| 109 |
| 110 |
| 111 class CacheMock(object): |
| 112 def __init__(self): |
| 113 self.cache = {} |
| 114 |
| 115 def get(self, key, namespace=''): |
| 116 # ignoring namespace for easier testing |
| 117 return self.cache.get(key, None) |
| 118 |
| 119 def set(self, key, value, namespace=''): |
| 120 # ignoring namespace for easier testing |
| 121 self.cache[key] = value |
| 122 |
| 123 |
| 124 class CredentialsTests(unittest.TestCase): |
| 125 |
| 126 def test_to_from_json(self): |
| 127 credentials = Credentials() |
| 128 json = credentials.to_json() |
| 129 restored = Credentials.new_from_json(json) |
| 130 |
| 131 |
| 132 class MockResponse(object): |
| 133 """Mock the response of urllib2.urlopen() call.""" |
| 134 |
| 135 def __init__(self, headers): |
| 136 self._headers = headers |
| 137 |
| 138 def info(self): |
| 139 class Info: |
| 140 def __init__(self, headers): |
| 141 self.headers = headers |
| 142 |
| 143 def get(self, key, default=None): |
| 144 return self.headers.get(key, default) |
| 145 |
| 146 return Info(self._headers) |
| 147 |
| 148 |
| 149 @contextlib.contextmanager |
| 150 def mock_module_import(module): |
| 151 """Place a dummy objects in sys.modules to mock an import test.""" |
| 152 parts = module.split('.') |
| 153 entries = ['.'.join(parts[:i+1]) for i in range(len(parts))] |
| 154 for entry in entries: |
| 155 sys.modules[entry] = object() |
| 156 |
| 157 try: |
| 158 yield |
| 159 |
| 160 finally: |
| 161 for entry in entries: |
| 162 del sys.modules[entry] |
| 163 |
| 164 |
| 165 class GoogleCredentialsTests(unittest.TestCase): |
| 166 |
| 167 def setUp(self): |
| 168 self.env_server_software = os.environ.get('SERVER_SOFTWARE', None) |
| 169 self.env_google_application_credentials = ( |
| 170 os.environ.get(GOOGLE_APPLICATION_CREDENTIALS, None)) |
| 171 self.env_appdata = os.environ.get('APPDATA', None) |
| 172 self.os_name = os.name |
| 173 from oauth2client import client |
| 174 client.SETTINGS.env_name = None |
| 175 |
| 176 def tearDown(self): |
| 177 self.reset_env('SERVER_SOFTWARE', self.env_server_software) |
| 178 self.reset_env(GOOGLE_APPLICATION_CREDENTIALS, |
| 179 self.env_google_application_credentials) |
| 180 self.reset_env('APPDATA', self.env_appdata) |
| 181 os.name = self.os_name |
| 182 |
| 183 def reset_env(self, env, value): |
| 184 """Set the environment variable 'env' to 'value'.""" |
| 185 if value is not None: |
| 186 os.environ[env] = value |
| 187 else: |
| 188 os.environ.pop(env, '') |
| 189 |
| 190 def validate_service_account_credentials(self, credentials): |
| 191 self.assertTrue(isinstance(credentials, _ServiceAccountCredentials)) |
| 192 self.assertEqual('123', credentials._service_account_id) |
| 193 self.assertEqual('dummy@google.com', credentials._service_account_email) |
| 194 self.assertEqual('ABCDEF', credentials._private_key_id) |
| 195 self.assertEqual('', credentials._scopes) |
| 196 |
| 197 def validate_google_credentials(self, credentials): |
| 198 self.assertTrue(isinstance(credentials, GoogleCredentials)) |
| 199 self.assertEqual(None, credentials.access_token) |
| 200 self.assertEqual('123', credentials.client_id) |
| 201 self.assertEqual('secret', credentials.client_secret) |
| 202 self.assertEqual('alabalaportocala', credentials.refresh_token) |
| 203 self.assertEqual(None, credentials.token_expiry) |
| 204 self.assertEqual(GOOGLE_TOKEN_URI, credentials.token_uri) |
| 205 self.assertEqual('Python client library', credentials.user_agent) |
| 206 |
| 207 def get_a_google_credentials_object(self): |
| 208 return GoogleCredentials(None, None, None, None, None, None, None, None) |
| 209 |
| 210 def test_create_scoped_required(self): |
| 211 self.assertFalse( |
| 212 self.get_a_google_credentials_object().create_scoped_required()) |
| 213 |
| 214 def test_create_scoped(self): |
| 215 credentials = self.get_a_google_credentials_object() |
| 216 self.assertEqual(credentials, credentials.create_scoped(None)) |
| 217 self.assertEqual(credentials, |
| 218 credentials.create_scoped(['dummy_scope'])) |
| 219 |
| 220 def test_get_environment_gae_production(self): |
| 221 with mock_module_import('google.appengine'): |
| 222 os.environ['SERVER_SOFTWARE'] = 'Google App Engine/XYZ' |
| 223 self.assertEqual('GAE_PRODUCTION', _get_environment()) |
| 224 |
| 225 def test_get_environment_gae_local(self): |
| 226 with mock_module_import('google.appengine'): |
| 227 os.environ['SERVER_SOFTWARE'] = 'Development/XYZ' |
| 228 self.assertEqual('GAE_LOCAL', _get_environment()) |
| 229 |
| 230 def test_get_environment_gce_production(self): |
| 231 os.environ['SERVER_SOFTWARE'] = '' |
| 232 response = MockResponse({'Metadata-Flavor': 'Google'}) |
| 233 with mock.patch.object(urllib.request, 'urlopen', |
| 234 return_value=response, |
| 235 autospec=True) as urlopen: |
| 236 self.assertEqual('GCE_PRODUCTION', _get_environment()) |
| 237 urlopen.assert_called_once_with( |
| 238 'http://169.254.169.254/', timeout=1) |
| 239 |
| 240 def test_get_environment_unknown(self): |
| 241 os.environ['SERVER_SOFTWARE'] = '' |
| 242 with mock.patch.object(urllib.request, 'urlopen', |
| 243 return_value=MockResponse({}), |
| 244 autospec=True) as urlopen: |
| 245 self.assertEqual(DEFAULT_ENV_NAME, _get_environment()) |
| 246 urlopen.assert_called_once_with( |
| 247 'http://169.254.169.254/', timeout=1) |
| 248 |
| 249 def test_get_environment_variable_file(self): |
| 250 environment_variable_file = datafile( |
| 251 os.path.join('gcloud', 'application_default_credentials.json')) |
| 252 os.environ[GOOGLE_APPLICATION_CREDENTIALS] = environment_variable_file |
| 253 self.assertEqual(environment_variable_file, |
| 254 _get_environment_variable_file()) |
| 255 |
| 256 def test_get_environment_variable_file_error(self): |
| 257 nonexistent_file = datafile('nonexistent') |
| 258 os.environ[GOOGLE_APPLICATION_CREDENTIALS] = nonexistent_file |
| 259 # we can't use self.assertRaisesRegexp() because it is only in Python 2.7+ |
| 260 try: |
| 261 _get_environment_variable_file() |
| 262 self.fail(nonexistent_file + ' should not exist.') |
| 263 except ApplicationDefaultCredentialsError as error: |
| 264 self.assertEqual('File ' + nonexistent_file + |
| 265 ' (pointed by ' + GOOGLE_APPLICATION_CREDENTIALS + |
| 266 ' environment variable) does not exist!', |
| 267 str(error)) |
| 268 |
| 269 def test_get_well_known_file_on_windows(self): |
| 270 ORIGINAL_ISDIR = os.path.isdir |
| 271 try: |
| 272 os.path.isdir = lambda path: True |
| 273 well_known_file = datafile( |
| 274 os.path.join(client._CLOUDSDK_CONFIG_DIRECTORY, |
| 275 'application_default_credentials.json')) |
| 276 os.name = 'nt' |
| 277 os.environ['APPDATA'] = DATA_DIR |
| 278 self.assertEqual(well_known_file, _get_well_known_file()) |
| 279 finally: |
| 280 os.path.isdir = ORIGINAL_ISDIR |
| 281 |
| 282 def test_get_well_known_file_with_custom_config_dir(self): |
| 283 ORIGINAL_ENVIRON = os.environ |
| 284 ORIGINAL_ISDIR = os.path.isdir |
| 285 CUSTOM_DIR = 'CUSTOM_DIR' |
| 286 EXPECTED_FILE = os.path.join(CUSTOM_DIR, |
| 287 'application_default_credentials.json') |
| 288 try: |
| 289 os.environ = {client._CLOUDSDK_CONFIG_ENV_VAR: CUSTOM_DIR} |
| 290 os.path.isdir = lambda path: True |
| 291 well_known_file = _get_well_known_file() |
| 292 self.assertEqual(well_known_file, EXPECTED_FILE) |
| 293 finally: |
| 294 os.environ = ORIGINAL_ENVIRON |
| 295 os.path.isdir = ORIGINAL_ISDIR |
| 296 |
| 297 def test_get_application_default_credential_from_file_service_account(self): |
| 298 credentials_file = datafile( |
| 299 os.path.join('gcloud', 'application_default_credentials.json')) |
| 300 credentials = _get_application_default_credential_from_file( |
| 301 credentials_file) |
| 302 self.validate_service_account_credentials(credentials) |
| 303 |
| 304 def test_save_to_well_known_file_service_account(self): |
| 305 credential_file = datafile( |
| 306 os.path.join('gcloud', 'application_default_credentials.json')) |
| 307 credentials = _get_application_default_credential_from_file( |
| 308 credential_file) |
| 309 temp_credential_file = datafile( |
| 310 os.path.join('gcloud', 'temp_well_known_file_service_account.json')) |
| 311 save_to_well_known_file(credentials, temp_credential_file) |
| 312 with open(temp_credential_file) as f: |
| 313 d = json.load(f) |
| 314 self.assertEqual('service_account', d['type']) |
| 315 self.assertEqual('123', d['client_id']) |
| 316 self.assertEqual('dummy@google.com', d['client_email']) |
| 317 self.assertEqual('ABCDEF', d['private_key_id']) |
| 318 os.remove(temp_credential_file) |
| 319 |
| 320 def test_save_well_known_file_with_non_existent_config_dir(self): |
| 321 credential_file = datafile( |
| 322 os.path.join('gcloud', 'application_default_credentials.json')) |
| 323 credentials = _get_application_default_credential_from_file( |
| 324 credential_file) |
| 325 ORIGINAL_ISDIR = os.path.isdir |
| 326 try: |
| 327 os.path.isdir = lambda path: False |
| 328 self.assertRaises(OSError, save_to_well_known_file, credentials) |
| 329 finally: |
| 330 os.path.isdir = ORIGINAL_ISDIR |
| 331 |
| 332 def test_get_application_default_credential_from_file_authorized_user(self): |
| 333 credentials_file = datafile( |
| 334 os.path.join('gcloud', |
| 335 'application_default_credentials_authorized_user.json')) |
| 336 credentials = _get_application_default_credential_from_file( |
| 337 credentials_file) |
| 338 self.validate_google_credentials(credentials) |
| 339 |
| 340 def test_save_to_well_known_file_authorized_user(self): |
| 341 credentials_file = datafile( |
| 342 os.path.join('gcloud', |
| 343 'application_default_credentials_authorized_user.json')) |
| 344 credentials = _get_application_default_credential_from_file( |
| 345 credentials_file) |
| 346 temp_credential_file = datafile( |
| 347 os.path.join('gcloud', 'temp_well_known_file_authorized_user.json')) |
| 348 save_to_well_known_file(credentials, temp_credential_file) |
| 349 with open(temp_credential_file) as f: |
| 350 d = json.load(f) |
| 351 self.assertEqual('authorized_user', d['type']) |
| 352 self.assertEqual('123', d['client_id']) |
| 353 self.assertEqual('secret', d['client_secret']) |
| 354 self.assertEqual('alabalaportocala', d['refresh_token']) |
| 355 os.remove(temp_credential_file) |
| 356 |
| 357 def test_get_application_default_credential_from_malformed_file_1(self): |
| 358 credentials_file = datafile( |
| 359 os.path.join('gcloud', |
| 360 'application_default_credentials_malformed_1.json')) |
| 361 # we can't use self.assertRaisesRegexp() because it is only in Python 2.7+ |
| 362 try: |
| 363 _get_application_default_credential_from_file(credentials_file) |
| 364 self.fail('An exception was expected!') |
| 365 except ApplicationDefaultCredentialsError as error: |
| 366 self.assertEqual("'type' field should be defined " |
| 367 "(and have one of the '" + AUTHORIZED_USER + |
| 368 "' or '" + SERVICE_ACCOUNT + "' values)", |
| 369 str(error)) |
| 370 |
| 371 def test_get_application_default_credential_from_malformed_file_2(self): |
| 372 credentials_file = datafile( |
| 373 os.path.join('gcloud', |
| 374 'application_default_credentials_malformed_2.json')) |
| 375 # we can't use self.assertRaisesRegexp() because it is only in Python 2.7+ |
| 376 try: |
| 377 _get_application_default_credential_from_file(credentials_file) |
| 378 self.fail('An exception was expected!') |
| 379 except ApplicationDefaultCredentialsError as error: |
| 380 self.assertEqual('The following field(s) must be defined: private_key_id', |
| 381 str(error)) |
| 382 |
| 383 def test_get_application_default_credential_from_malformed_file_3(self): |
| 384 credentials_file = datafile( |
| 385 os.path.join('gcloud', |
| 386 'application_default_credentials_malformed_3.json')) |
| 387 self.assertRaises(ValueError, _get_application_default_credential_from_file, |
| 388 credentials_file) |
| 389 |
| 390 def test_raise_exception_for_missing_fields(self): |
| 391 missing_fields = ['first', 'second', 'third'] |
| 392 # we can't use self.assertRaisesRegexp() because it is only in Python 2.7+ |
| 393 try: |
| 394 _raise_exception_for_missing_fields(missing_fields) |
| 395 self.fail('An exception was expected!') |
| 396 except ApplicationDefaultCredentialsError as error: |
| 397 self.assertEqual('The following field(s) must be defined: ' + |
| 398 ', '.join(missing_fields), |
| 399 str(error)) |
| 400 |
| 401 def test_raise_exception_for_reading_json(self): |
| 402 credential_file = 'any_file' |
| 403 extra_help = ' be good' |
| 404 error = ApplicationDefaultCredentialsError('stuff happens') |
| 405 # we can't use self.assertRaisesRegexp() because it is only in Python 2.7+ |
| 406 try: |
| 407 _raise_exception_for_reading_json(credential_file, extra_help, error) |
| 408 self.fail('An exception was expected!') |
| 409 except ApplicationDefaultCredentialsError as ex: |
| 410 self.assertEqual('An error was encountered while reading ' |
| 411 'json file: '+ credential_file + |
| 412 extra_help + ': ' + str(error), |
| 413 str(ex)) |
| 414 |
| 415 def test_get_application_default_from_environment_variable_service_account( |
| 416 self): |
| 417 os.environ['SERVER_SOFTWARE'] = '' |
| 418 environment_variable_file = datafile( |
| 419 os.path.join('gcloud', 'application_default_credentials.json')) |
| 420 os.environ[GOOGLE_APPLICATION_CREDENTIALS] = environment_variable_file |
| 421 self.validate_service_account_credentials( |
| 422 GoogleCredentials.get_application_default()) |
| 423 |
| 424 def test_env_name(self): |
| 425 from oauth2client import client |
| 426 self.assertEqual(None, client.SETTINGS.env_name) |
| 427 self.test_get_application_default_from_environment_variable_service_account(
) |
| 428 self.assertEqual(DEFAULT_ENV_NAME, client.SETTINGS.env_name) |
| 429 |
| 430 def test_get_application_default_from_environment_variable_authorized_user( |
| 431 self): |
| 432 os.environ['SERVER_SOFTWARE'] = '' |
| 433 environment_variable_file = datafile( |
| 434 os.path.join('gcloud', |
| 435 'application_default_credentials_authorized_user.json')) |
| 436 os.environ[GOOGLE_APPLICATION_CREDENTIALS] = environment_variable_file |
| 437 self.validate_google_credentials( |
| 438 GoogleCredentials.get_application_default()) |
| 439 |
| 440 def test_get_application_default_from_environment_variable_malformed_file( |
| 441 self): |
| 442 os.environ['SERVER_SOFTWARE'] = '' |
| 443 environment_variable_file = datafile( |
| 444 os.path.join('gcloud', |
| 445 'application_default_credentials_malformed_3.json')) |
| 446 os.environ[GOOGLE_APPLICATION_CREDENTIALS] = environment_variable_file |
| 447 # we can't use self.assertRaisesRegexp() because it is only in Python 2.7+ |
| 448 try: |
| 449 GoogleCredentials.get_application_default() |
| 450 self.fail('An exception was expected!') |
| 451 except ApplicationDefaultCredentialsError as error: |
| 452 self.assertTrue(str(error).startswith( |
| 453 'An error was encountered while reading json file: ' + |
| 454 environment_variable_file + ' (pointed to by ' + |
| 455 GOOGLE_APPLICATION_CREDENTIALS + ' environment variable):')) |
| 456 |
| 457 def test_get_application_default_environment_not_set_up(self): |
| 458 # It is normal for this test to fail if run inside |
| 459 # a Google Compute Engine VM or after 'gcloud auth login' command |
| 460 # has been executed on a non Windows machine. |
| 461 os.environ['SERVER_SOFTWARE'] = '' |
| 462 os.environ[GOOGLE_APPLICATION_CREDENTIALS] = '' |
| 463 os.environ['APPDATA'] = '' |
| 464 # we can't use self.assertRaisesRegexp() because it is only in Python 2.7+ |
| 465 VALID_CONFIG_DIR = client._CLOUDSDK_CONFIG_DIRECTORY |
| 466 ORIGINAL_ISDIR = os.path.isdir |
| 467 try: |
| 468 os.path.isdir = lambda path: True |
| 469 client._CLOUDSDK_CONFIG_DIRECTORY = 'BOGUS_CONFIG_DIR' |
| 470 GoogleCredentials.get_application_default() |
| 471 self.fail('An exception was expected!') |
| 472 except ApplicationDefaultCredentialsError as error: |
| 473 self.assertEqual(ADC_HELP_MSG, str(error)) |
| 474 finally: |
| 475 os.path.isdir = ORIGINAL_ISDIR |
| 476 client._CLOUDSDK_CONFIG_DIRECTORY = VALID_CONFIG_DIR |
| 477 |
| 478 def test_from_stream_service_account(self): |
| 479 credentials_file = datafile( |
| 480 os.path.join('gcloud', 'application_default_credentials.json')) |
| 481 credentials = ( |
| 482 self.get_a_google_credentials_object().from_stream(credentials_file)) |
| 483 self.validate_service_account_credentials(credentials) |
| 484 |
| 485 def test_from_stream_authorized_user(self): |
| 486 credentials_file = datafile( |
| 487 os.path.join('gcloud', |
| 488 'application_default_credentials_authorized_user.json')) |
| 489 credentials = ( |
| 490 self.get_a_google_credentials_object().from_stream(credentials_file)) |
| 491 self.validate_google_credentials(credentials) |
| 492 |
| 493 def test_from_stream_malformed_file_1(self): |
| 494 credentials_file = datafile( |
| 495 os.path.join('gcloud', |
| 496 'application_default_credentials_malformed_1.json')) |
| 497 # we can't use self.assertRaisesRegexp() because it is only in Python 2.7+ |
| 498 try: |
| 499 self.get_a_google_credentials_object().from_stream(credentials_file) |
| 500 self.fail('An exception was expected!') |
| 501 except ApplicationDefaultCredentialsError as error: |
| 502 self.assertEqual("An error was encountered while reading json file: " + |
| 503 credentials_file + |
| 504 " (provided as parameter to the from_stream() method): " |
| 505 "'type' field should be defined (and have one of the '" + |
| 506 AUTHORIZED_USER + "' or '" + SERVICE_ACCOUNT + |
| 507 "' values)", |
| 508 str(error)) |
| 509 |
| 510 def test_from_stream_malformed_file_2(self): |
| 511 credentials_file = datafile( |
| 512 os.path.join('gcloud', |
| 513 'application_default_credentials_malformed_2.json')) |
| 514 # we can't use self.assertRaisesRegexp() because it is only in Python 2.7+ |
| 515 try: |
| 516 self.get_a_google_credentials_object().from_stream(credentials_file) |
| 517 self.fail('An exception was expected!') |
| 518 except ApplicationDefaultCredentialsError as error: |
| 519 self.assertEqual('An error was encountered while reading json file: ' + |
| 520 credentials_file + |
| 521 ' (provided as parameter to the from_stream() method): ' |
| 522 'The following field(s) must be defined: ' |
| 523 'private_key_id', |
| 524 str(error)) |
| 525 |
| 526 def test_from_stream_malformed_file_3(self): |
| 527 credentials_file = datafile( |
| 528 os.path.join('gcloud', |
| 529 'application_default_credentials_malformed_3.json')) |
| 530 self.assertRaises( |
| 531 ApplicationDefaultCredentialsError, |
| 532 self.get_a_google_credentials_object().from_stream, credentials_file) |
| 533 |
| 534 |
| 535 class DummyDeleteStorage(Storage): |
| 536 delete_called = False |
| 537 |
| 538 def locked_delete(self): |
| 539 self.delete_called = True |
| 540 |
| 541 |
| 542 def _token_revoke_test_helper(testcase, status, revoke_raise, |
| 543 valid_bool_value, token_attr): |
| 544 current_store = getattr(testcase.credentials, 'store', None) |
| 545 |
| 546 dummy_store = DummyDeleteStorage() |
| 547 testcase.credentials.set_store(dummy_store) |
| 548 |
| 549 actual_do_revoke = testcase.credentials._do_revoke |
| 550 testcase.token_from_revoke = None |
| 551 def do_revoke_stub(http_request, token): |
| 552 testcase.token_from_revoke = token |
| 553 return actual_do_revoke(http_request, token) |
| 554 testcase.credentials._do_revoke = do_revoke_stub |
| 555 |
| 556 http = HttpMock(headers={'status': status}) |
| 557 if revoke_raise: |
| 558 testcase.assertRaises(TokenRevokeError, testcase.credentials.revoke, http) |
| 559 else: |
| 560 testcase.credentials.revoke(http) |
| 561 |
| 562 testcase.assertEqual(getattr(testcase.credentials, token_attr), |
| 563 testcase.token_from_revoke) |
| 564 testcase.assertEqual(valid_bool_value, testcase.credentials.invalid) |
| 565 testcase.assertEqual(valid_bool_value, dummy_store.delete_called) |
| 566 |
| 567 testcase.credentials.set_store(current_store) |
| 568 |
| 569 |
| 570 class BasicCredentialsTests(unittest.TestCase): |
| 571 |
| 572 def setUp(self): |
| 573 access_token = 'foo' |
| 574 client_id = 'some_client_id' |
| 575 client_secret = 'cOuDdkfjxxnv+' |
| 576 refresh_token = '1/0/a.df219fjls0' |
| 577 token_expiry = datetime.datetime.utcnow() |
| 578 user_agent = 'refresh_checker/1.0' |
| 579 self.credentials = OAuth2Credentials( |
| 580 access_token, client_id, client_secret, |
| 581 refresh_token, token_expiry, GOOGLE_TOKEN_URI, |
| 582 user_agent, revoke_uri=GOOGLE_REVOKE_URI) |
| 583 |
| 584 def test_token_refresh_success(self): |
| 585 for status_code in REFRESH_STATUS_CODES: |
| 586 token_response = {'access_token': '1/3w', 'expires_in': 3600} |
| 587 http = HttpMockSequence([ |
| 588 ({'status': status_code}, b''), |
| 589 ({'status': '200'}, json.dumps(token_response).encode('utf-8')), |
| 590 ({'status': '200'}, 'echo_request_headers'), |
| 591 ]) |
| 592 http = self.credentials.authorize(http) |
| 593 resp, content = http.request('http://example.com') |
| 594 self.assertEqual(b'Bearer 1/3w', content[b'Authorization']) |
| 595 self.assertFalse(self.credentials.access_token_expired) |
| 596 self.assertEqual(token_response, self.credentials.token_response) |
| 597 |
| 598 def test_token_refresh_failure(self): |
| 599 for status_code in REFRESH_STATUS_CODES: |
| 600 http = HttpMockSequence([ |
| 601 ({'status': status_code}, b''), |
| 602 ({'status': '400'}, b'{"error":"access_denied"}'), |
| 603 ]) |
| 604 http = self.credentials.authorize(http) |
| 605 try: |
| 606 http.request('http://example.com') |
| 607 self.fail('should raise AccessTokenRefreshError exception') |
| 608 except AccessTokenRefreshError: |
| 609 pass |
| 610 self.assertTrue(self.credentials.access_token_expired) |
| 611 self.assertEqual(None, self.credentials.token_response) |
| 612 |
| 613 def test_token_revoke_success(self): |
| 614 _token_revoke_test_helper( |
| 615 self, '200', revoke_raise=False, |
| 616 valid_bool_value=True, token_attr='refresh_token') |
| 617 |
| 618 def test_token_revoke_failure(self): |
| 619 _token_revoke_test_helper( |
| 620 self, '400', revoke_raise=True, |
| 621 valid_bool_value=False, token_attr='refresh_token') |
| 622 |
| 623 def test_token_revoke_fallback(self): |
| 624 original_credentials = self.credentials.to_json() |
| 625 self.credentials.refresh_token = None |
| 626 _token_revoke_test_helper( |
| 627 self, '200', revoke_raise=False, |
| 628 valid_bool_value=True, token_attr='access_token') |
| 629 self.credentials = self.credentials.from_json(original_credentials) |
| 630 |
| 631 def test_non_401_error_response(self): |
| 632 http = HttpMockSequence([ |
| 633 ({'status': '400'}, b''), |
| 634 ]) |
| 635 http = self.credentials.authorize(http) |
| 636 resp, content = http.request('http://example.com') |
| 637 self.assertEqual(400, resp.status) |
| 638 self.assertEqual(None, self.credentials.token_response) |
| 639 |
| 640 def test_to_from_json(self): |
| 641 json = self.credentials.to_json() |
| 642 instance = OAuth2Credentials.from_json(json) |
| 643 self.assertEqual(OAuth2Credentials, type(instance)) |
| 644 instance.token_expiry = None |
| 645 self.credentials.token_expiry = None |
| 646 |
| 647 self.assertEqual(instance.__dict__, self.credentials.__dict__) |
| 648 |
| 649 def test_from_json_token_expiry(self): |
| 650 data = json.loads(self.credentials.to_json()) |
| 651 data['token_expiry'] = None |
| 652 instance = OAuth2Credentials.from_json(json.dumps(data)) |
| 653 self.assertTrue(isinstance(instance, OAuth2Credentials)) |
| 654 |
| 655 def test_unicode_header_checks(self): |
| 656 access_token = u'foo' |
| 657 client_id = u'some_client_id' |
| 658 client_secret = u'cOuDdkfjxxnv+' |
| 659 refresh_token = u'1/0/a.df219fjls0' |
| 660 token_expiry = str(datetime.datetime.utcnow()) |
| 661 token_uri = str(GOOGLE_TOKEN_URI) |
| 662 revoke_uri = str(GOOGLE_REVOKE_URI) |
| 663 user_agent = u'refresh_checker/1.0' |
| 664 credentials = OAuth2Credentials(access_token, client_id, client_secret, |
| 665 refresh_token, token_expiry, token_uri, |
| 666 user_agent, revoke_uri=revoke_uri) |
| 667 |
| 668 # First, test that we correctly encode basic objects, making sure |
| 669 # to include a bytes object. Note that oauth2client will normalize |
| 670 # everything to bytes, no matter what python version we're in. |
| 671 http = credentials.authorize(HttpMock(headers={'status': '200'})) |
| 672 headers = {u'foo': 3, b'bar': True, 'baz': b'abc'} |
| 673 cleaned_headers = {b'foo': b'3', b'bar': b'True', b'baz': b'abc'} |
| 674 http.request(u'http://example.com', method=u'GET', headers=headers) |
| 675 for k, v in cleaned_headers.items(): |
| 676 self.assertTrue(k in http.headers) |
| 677 self.assertEqual(v, http.headers[k]) |
| 678 |
| 679 # Next, test that we do fail on unicode. |
| 680 unicode_str = six.unichr(40960) + 'abcd' |
| 681 self.assertRaises( |
| 682 NonAsciiHeaderError, |
| 683 http.request, |
| 684 u'http://example.com', method=u'GET', headers={u'foo': unicode_str}) |
| 685 |
| 686 def test_no_unicode_in_request_params(self): |
| 687 access_token = u'foo' |
| 688 client_id = u'some_client_id' |
| 689 client_secret = u'cOuDdkfjxxnv+' |
| 690 refresh_token = u'1/0/a.df219fjls0' |
| 691 token_expiry = str(datetime.datetime.utcnow()) |
| 692 token_uri = str(GOOGLE_TOKEN_URI) |
| 693 revoke_uri = str(GOOGLE_REVOKE_URI) |
| 694 user_agent = u'refresh_checker/1.0' |
| 695 credentials = OAuth2Credentials(access_token, client_id, client_secret, |
| 696 refresh_token, token_expiry, token_uri, |
| 697 user_agent, revoke_uri=revoke_uri) |
| 698 |
| 699 http = HttpMock(headers={'status': '200'}) |
| 700 http = credentials.authorize(http) |
| 701 http.request(u'http://example.com', method=u'GET', headers={u'foo': u'bar'}) |
| 702 for k, v in six.iteritems(http.headers): |
| 703 self.assertEqual(six.binary_type, type(k)) |
| 704 self.assertEqual(six.binary_type, type(v)) |
| 705 |
| 706 # Test again with unicode strings that can't simply be converted to ASCII. |
| 707 try: |
| 708 http.request( |
| 709 u'http://example.com', method=u'GET', headers={u'foo': u'\N{COMET}'}) |
| 710 self.fail('Expected exception to be raised.') |
| 711 except NonAsciiHeaderError: |
| 712 pass |
| 713 |
| 714 self.credentials.token_response = 'foobar' |
| 715 instance = OAuth2Credentials.from_json(self.credentials.to_json()) |
| 716 self.assertEqual('foobar', instance.token_response) |
| 717 |
| 718 def test_get_access_token(self): |
| 719 S = 2 # number of seconds in which the token expires |
| 720 token_response_first = {'access_token': 'first_token', 'expires_in': S} |
| 721 token_response_second = {'access_token': 'second_token', 'expires_in': S} |
| 722 http = HttpMockSequence([ |
| 723 ({'status': '200'}, json.dumps(token_response_first).encode('utf-8')), |
| 724 ({'status': '200'}, json.dumps(token_response_second).encode('utf-8')), |
| 725 ]) |
| 726 |
| 727 token = self.credentials.get_access_token(http=http) |
| 728 self.assertEqual('first_token', token.access_token) |
| 729 self.assertEqual(S - 1, token.expires_in) |
| 730 self.assertFalse(self.credentials.access_token_expired) |
| 731 self.assertEqual(token_response_first, self.credentials.token_response) |
| 732 |
| 733 token = self.credentials.get_access_token(http=http) |
| 734 self.assertEqual('first_token', token.access_token) |
| 735 self.assertEqual(S - 1, token.expires_in) |
| 736 self.assertFalse(self.credentials.access_token_expired) |
| 737 self.assertEqual(token_response_first, self.credentials.token_response) |
| 738 |
| 739 time.sleep(S + 0.5) # some margin to avoid flakiness |
| 740 self.assertTrue(self.credentials.access_token_expired) |
| 741 |
| 742 token = self.credentials.get_access_token(http=http) |
| 743 self.assertEqual('second_token', token.access_token) |
| 744 self.assertEqual(S - 1, token.expires_in) |
| 745 self.assertFalse(self.credentials.access_token_expired) |
| 746 self.assertEqual(token_response_second, self.credentials.token_response) |
| 747 |
| 748 |
| 749 class AccessTokenCredentialsTests(unittest.TestCase): |
| 750 |
| 751 def setUp(self): |
| 752 access_token = 'foo' |
| 753 user_agent = 'refresh_checker/1.0' |
| 754 self.credentials = AccessTokenCredentials(access_token, user_agent, |
| 755 revoke_uri=GOOGLE_REVOKE_URI) |
| 756 |
| 757 def test_token_refresh_success(self): |
| 758 for status_code in REFRESH_STATUS_CODES: |
| 759 http = HttpMockSequence([ |
| 760 ({'status': status_code}, b''), |
| 761 ]) |
| 762 http = self.credentials.authorize(http) |
| 763 try: |
| 764 resp, content = http.request('http://example.com') |
| 765 self.fail('should throw exception if token expires') |
| 766 except AccessTokenCredentialsError: |
| 767 pass |
| 768 except Exception: |
| 769 self.fail('should only throw AccessTokenCredentialsError') |
| 770 |
| 771 def test_token_revoke_success(self): |
| 772 _token_revoke_test_helper( |
| 773 self, '200', revoke_raise=False, |
| 774 valid_bool_value=True, token_attr='access_token') |
| 775 |
| 776 def test_token_revoke_failure(self): |
| 777 _token_revoke_test_helper( |
| 778 self, '400', revoke_raise=True, |
| 779 valid_bool_value=False, token_attr='access_token') |
| 780 |
| 781 def test_non_401_error_response(self): |
| 782 http = HttpMockSequence([ |
| 783 ({'status': '400'}, b''), |
| 784 ]) |
| 785 http = self.credentials.authorize(http) |
| 786 resp, content = http.request('http://example.com') |
| 787 self.assertEqual(400, resp.status) |
| 788 |
| 789 def test_auth_header_sent(self): |
| 790 http = HttpMockSequence([ |
| 791 ({'status': '200'}, 'echo_request_headers'), |
| 792 ]) |
| 793 http = self.credentials.authorize(http) |
| 794 resp, content = http.request('http://example.com') |
| 795 self.assertEqual(b'Bearer foo', content[b'Authorization']) |
| 796 |
| 797 |
| 798 class TestAssertionCredentials(unittest.TestCase): |
| 799 assertion_text = 'This is the assertion' |
| 800 assertion_type = 'http://www.google.com/assertionType' |
| 801 |
| 802 class AssertionCredentialsTestImpl(AssertionCredentials): |
| 803 |
| 804 def _generate_assertion(self): |
| 805 return TestAssertionCredentials.assertion_text |
| 806 |
| 807 def setUp(self): |
| 808 user_agent = 'fun/2.0' |
| 809 self.credentials = self.AssertionCredentialsTestImpl(self.assertion_type, |
| 810 user_agent=user_agent) |
| 811 |
| 812 def test_assertion_body(self): |
| 813 body = urllib.parse.parse_qs( |
| 814 self.credentials._generate_refresh_request_body()) |
| 815 self.assertEqual(self.assertion_text, body['assertion'][0]) |
| 816 self.assertEqual('urn:ietf:params:oauth:grant-type:jwt-bearer', |
| 817 body['grant_type'][0]) |
| 818 |
| 819 def test_assertion_refresh(self): |
| 820 http = HttpMockSequence([ |
| 821 ({'status': '200'}, b'{"access_token":"1/3w"}'), |
| 822 ({'status': '200'}, 'echo_request_headers'), |
| 823 ]) |
| 824 http = self.credentials.authorize(http) |
| 825 resp, content = http.request('http://example.com') |
| 826 self.assertEqual(b'Bearer 1/3w', content[b'Authorization']) |
| 827 |
| 828 def test_token_revoke_success(self): |
| 829 _token_revoke_test_helper( |
| 830 self, '200', revoke_raise=False, |
| 831 valid_bool_value=True, token_attr='access_token') |
| 832 |
| 833 def test_token_revoke_failure(self): |
| 834 _token_revoke_test_helper( |
| 835 self, '400', revoke_raise=True, |
| 836 valid_bool_value=False, token_attr='access_token') |
| 837 |
| 838 |
| 839 class UpdateQueryParamsTest(unittest.TestCase): |
| 840 def test_update_query_params_no_params(self): |
| 841 uri = 'http://www.google.com' |
| 842 updated = _update_query_params(uri, {'a': 'b'}) |
| 843 self.assertEqual(updated, uri + '?a=b') |
| 844 |
| 845 def test_update_query_params_existing_params(self): |
| 846 uri = 'http://www.google.com?x=y' |
| 847 updated = _update_query_params(uri, {'a': 'b', 'c': 'd&'}) |
| 848 hardcoded_update = uri + '&a=b&c=d%26' |
| 849 assertUrisEqual(self, updated, hardcoded_update) |
| 850 |
| 851 |
| 852 class ExtractIdTokenTest(unittest.TestCase): |
| 853 """Tests _extract_id_token().""" |
| 854 |
| 855 def test_extract_success(self): |
| 856 body = {'foo': 'bar'} |
| 857 body_json = json.dumps(body).encode('ascii') |
| 858 payload = base64.urlsafe_b64encode(body_json).strip(b'=') |
| 859 jwt = b'stuff.' + payload + b'.signature' |
| 860 |
| 861 extracted = _extract_id_token(jwt) |
| 862 self.assertEqual(extracted, body) |
| 863 |
| 864 def test_extract_failure(self): |
| 865 body = {'foo': 'bar'} |
| 866 body_json = json.dumps(body).encode('ascii') |
| 867 payload = base64.urlsafe_b64encode(body_json).strip(b'=') |
| 868 jwt = b'stuff.' + payload |
| 869 |
| 870 self.assertRaises(VerifyJwtTokenError, _extract_id_token, jwt) |
| 871 |
| 872 |
| 873 class OAuth2WebServerFlowTest(unittest.TestCase): |
| 874 |
| 875 def setUp(self): |
| 876 self.flow = OAuth2WebServerFlow( |
| 877 client_id='client_id+1', |
| 878 client_secret='secret+1', |
| 879 scope='foo', |
| 880 redirect_uri=OOB_CALLBACK_URN, |
| 881 user_agent='unittest-sample/1.0', |
| 882 revoke_uri='dummy_revoke_uri', |
| 883 ) |
| 884 |
| 885 def test_construct_authorize_url(self): |
| 886 authorize_url = self.flow.step1_get_authorize_url() |
| 887 |
| 888 parsed = urllib.parse.urlparse(authorize_url) |
| 889 q = urllib.parse.parse_qs(parsed[4]) |
| 890 self.assertEqual('client_id+1', q['client_id'][0]) |
| 891 self.assertEqual('code', q['response_type'][0]) |
| 892 self.assertEqual('foo', q['scope'][0]) |
| 893 self.assertEqual(OOB_CALLBACK_URN, q['redirect_uri'][0]) |
| 894 self.assertEqual('offline', q['access_type'][0]) |
| 895 |
| 896 def test_override_flow_via_kwargs(self): |
| 897 """Passing kwargs to override defaults.""" |
| 898 flow = OAuth2WebServerFlow( |
| 899 client_id='client_id+1', |
| 900 client_secret='secret+1', |
| 901 scope='foo', |
| 902 redirect_uri=OOB_CALLBACK_URN, |
| 903 user_agent='unittest-sample/1.0', |
| 904 access_type='online', |
| 905 response_type='token' |
| 906 ) |
| 907 authorize_url = flow.step1_get_authorize_url() |
| 908 |
| 909 parsed = urllib.parse.urlparse(authorize_url) |
| 910 q = urllib.parse.parse_qs(parsed[4]) |
| 911 self.assertEqual('client_id+1', q['client_id'][0]) |
| 912 self.assertEqual('token', q['response_type'][0]) |
| 913 self.assertEqual('foo', q['scope'][0]) |
| 914 self.assertEqual(OOB_CALLBACK_URN, q['redirect_uri'][0]) |
| 915 self.assertEqual('online', q['access_type'][0]) |
| 916 |
| 917 def test_exchange_failure(self): |
| 918 http = HttpMockSequence([ |
| 919 ({'status': '400'}, b'{"error":"invalid_request"}'), |
| 920 ]) |
| 921 |
| 922 try: |
| 923 credentials = self.flow.step2_exchange('some random code', http=http) |
| 924 self.fail('should raise exception if exchange doesn\'t get 200') |
| 925 except FlowExchangeError: |
| 926 pass |
| 927 |
| 928 def test_urlencoded_exchange_failure(self): |
| 929 http = HttpMockSequence([ |
| 930 ({'status': '400'}, b'error=invalid_request'), |
| 931 ]) |
| 932 |
| 933 try: |
| 934 credentials = self.flow.step2_exchange('some random code', http=http) |
| 935 self.fail('should raise exception if exchange doesn\'t get 200') |
| 936 except FlowExchangeError as e: |
| 937 self.assertEqual('invalid_request', str(e)) |
| 938 |
| 939 def test_exchange_failure_with_json_error(self): |
| 940 # Some providers have 'error' attribute as a JSON object |
| 941 # in place of regular string. |
| 942 # This test makes sure no strange object-to-string coversion |
| 943 # exceptions are being raised instead of FlowExchangeError. |
| 944 http = HttpMockSequence([ |
| 945 ({'status': '400'}, |
| 946 b""" {"error": { |
| 947 "type": "OAuthException", |
| 948 "message": "Error validating verification code."} }"""), |
| 949 ]) |
| 950 |
| 951 try: |
| 952 credentials = self.flow.step2_exchange('some random code', http=http) |
| 953 self.fail('should raise exception if exchange doesn\'t get 200') |
| 954 except FlowExchangeError as e: |
| 955 pass |
| 956 |
| 957 def test_exchange_success(self): |
| 958 http = HttpMockSequence([ |
| 959 ({'status': '200'}, |
| 960 b"""{ "access_token":"SlAV32hkKG", |
| 961 "expires_in":3600, |
| 962 "refresh_token":"8xLOxBtZp8" }"""), |
| 963 ]) |
| 964 |
| 965 credentials = self.flow.step2_exchange('some random code', http=http) |
| 966 self.assertEqual('SlAV32hkKG', credentials.access_token) |
| 967 self.assertNotEqual(None, credentials.token_expiry) |
| 968 self.assertEqual('8xLOxBtZp8', credentials.refresh_token) |
| 969 self.assertEqual('dummy_revoke_uri', credentials.revoke_uri) |
| 970 |
| 971 def test_exchange_dictlike(self): |
| 972 class FakeDict(object): |
| 973 def __init__(self, d): |
| 974 self.d = d |
| 975 |
| 976 def __getitem__(self, name): |
| 977 return self.d[name] |
| 978 |
| 979 def __contains__(self, name): |
| 980 return name in self.d |
| 981 |
| 982 code = 'some random code' |
| 983 not_a_dict = FakeDict({'code': code}) |
| 984 payload = (b'{' |
| 985 b' "access_token":"SlAV32hkKG",' |
| 986 b' "expires_in":3600,' |
| 987 b' "refresh_token":"8xLOxBtZp8"' |
| 988 b'}') |
| 989 http = HttpMockSequence([({'status': '200'}, payload),]) |
| 990 |
| 991 credentials = self.flow.step2_exchange(not_a_dict, http=http) |
| 992 self.assertEqual('SlAV32hkKG', credentials.access_token) |
| 993 self.assertNotEqual(None, credentials.token_expiry) |
| 994 self.assertEqual('8xLOxBtZp8', credentials.refresh_token) |
| 995 self.assertEqual('dummy_revoke_uri', credentials.revoke_uri) |
| 996 request_code = urllib.parse.parse_qs(http.requests[0]['body'])['code'][0] |
| 997 self.assertEqual(code, request_code) |
| 998 |
| 999 def test_urlencoded_exchange_success(self): |
| 1000 http = HttpMockSequence([ |
| 1001 ({'status': '200'}, b'access_token=SlAV32hkKG&expires_in=3600'), |
| 1002 ]) |
| 1003 |
| 1004 credentials = self.flow.step2_exchange('some random code', http=http) |
| 1005 self.assertEqual('SlAV32hkKG', credentials.access_token) |
| 1006 self.assertNotEqual(None, credentials.token_expiry) |
| 1007 |
| 1008 def test_urlencoded_expires_param(self): |
| 1009 http = HttpMockSequence([ |
| 1010 # Note the 'expires=3600' where you'd normally |
| 1011 # have if named 'expires_in' |
| 1012 ({'status': '200'}, b'access_token=SlAV32hkKG&expires=3600'), |
| 1013 ]) |
| 1014 |
| 1015 credentials = self.flow.step2_exchange('some random code', http=http) |
| 1016 self.assertNotEqual(None, credentials.token_expiry) |
| 1017 |
| 1018 def test_exchange_no_expires_in(self): |
| 1019 http = HttpMockSequence([ |
| 1020 ({'status': '200'}, b"""{ "access_token":"SlAV32hkKG", |
| 1021 "refresh_token":"8xLOxBtZp8" }"""), |
| 1022 ]) |
| 1023 |
| 1024 credentials = self.flow.step2_exchange('some random code', http=http) |
| 1025 self.assertEqual(None, credentials.token_expiry) |
| 1026 |
| 1027 def test_urlencoded_exchange_no_expires_in(self): |
| 1028 http = HttpMockSequence([ |
| 1029 # This might be redundant but just to make sure |
| 1030 # urlencoded access_token gets parsed correctly |
| 1031 ({'status': '200'}, b'access_token=SlAV32hkKG'), |
| 1032 ]) |
| 1033 |
| 1034 credentials = self.flow.step2_exchange('some random code', http=http) |
| 1035 self.assertEqual(None, credentials.token_expiry) |
| 1036 |
| 1037 def test_exchange_fails_if_no_code(self): |
| 1038 http = HttpMockSequence([ |
| 1039 ({'status': '200'}, b"""{ "access_token":"SlAV32hkKG", |
| 1040 "refresh_token":"8xLOxBtZp8" }"""), |
| 1041 ]) |
| 1042 |
| 1043 code = {'error': 'thou shall not pass'} |
| 1044 try: |
| 1045 credentials = self.flow.step2_exchange(code, http=http) |
| 1046 self.fail('should raise exception if no code in dictionary.') |
| 1047 except FlowExchangeError as e: |
| 1048 self.assertTrue('shall not pass' in str(e)) |
| 1049 |
| 1050 def test_exchange_id_token_fail(self): |
| 1051 http = HttpMockSequence([ |
| 1052 ({'status': '200'}, b"""{ "access_token":"SlAV32hkKG", |
| 1053 "refresh_token":"8xLOxBtZp8", |
| 1054 "id_token": "stuff.payload"}"""), |
| 1055 ]) |
| 1056 |
| 1057 self.assertRaises(VerifyJwtTokenError, self.flow.step2_exchange, |
| 1058 'some random code', http=http) |
| 1059 |
| 1060 def test_exchange_id_token(self): |
| 1061 body = {'foo': 'bar'} |
| 1062 body_json = json.dumps(body).encode('ascii') |
| 1063 payload = base64.urlsafe_b64encode(body_json).strip(b'=') |
| 1064 jwt = (base64.urlsafe_b64encode(b'stuff') + b'.' + payload + b'.' + |
| 1065 base64.urlsafe_b64encode(b'signature')) |
| 1066 |
| 1067 http = HttpMockSequence([ |
| 1068 ({'status': '200'}, ("""{ "access_token":"SlAV32hkKG", |
| 1069 "refresh_token":"8xLOxBtZp8", |
| 1070 "id_token": "%s"}""" % jwt).encode('utf-8')), |
| 1071 ]) |
| 1072 |
| 1073 credentials = self.flow.step2_exchange('some random code', http=http) |
| 1074 self.assertEqual(credentials.id_token, body) |
| 1075 |
| 1076 |
| 1077 class FlowFromCachedClientsecrets(unittest.TestCase): |
| 1078 |
| 1079 def test_flow_from_clientsecrets_cached(self): |
| 1080 cache_mock = CacheMock() |
| 1081 load_and_cache('client_secrets.json', 'some_secrets', cache_mock) |
| 1082 |
| 1083 flow = flow_from_clientsecrets( |
| 1084 'some_secrets', '', redirect_uri='oob', cache=cache_mock) |
| 1085 self.assertEqual('foo_client_secret', flow.client_secret) |
| 1086 |
| 1087 |
| 1088 class CredentialsFromCodeTests(unittest.TestCase): |
| 1089 def setUp(self): |
| 1090 self.client_id = 'client_id_abc' |
| 1091 self.client_secret = 'secret_use_code' |
| 1092 self.scope = 'foo' |
| 1093 self.code = '12345abcde' |
| 1094 self.redirect_uri = 'postmessage' |
| 1095 |
| 1096 def test_exchange_code_for_token(self): |
| 1097 token = 'asdfghjkl' |
| 1098 payload = json.dumps({'access_token': token, 'expires_in': 3600}) |
| 1099 http = HttpMockSequence([ |
| 1100 ({'status': '200'}, payload.encode('utf-8')), |
| 1101 ]) |
| 1102 credentials = credentials_from_code(self.client_id, self.client_secret, |
| 1103 self.scope, self.code, redirect_uri=self.redirect_uri, |
| 1104 http=http) |
| 1105 self.assertEqual(credentials.access_token, token) |
| 1106 self.assertNotEqual(None, credentials.token_expiry) |
| 1107 |
| 1108 def test_exchange_code_for_token_fail(self): |
| 1109 http = HttpMockSequence([ |
| 1110 ({'status': '400'}, b'{"error":"invalid_request"}'), |
| 1111 ]) |
| 1112 |
| 1113 try: |
| 1114 credentials = credentials_from_code(self.client_id, self.client_secret, |
| 1115 self.scope, self.code, redirect_uri=self.redirect_uri, |
| 1116 http=http) |
| 1117 self.fail('should raise exception if exchange doesn\'t get 200') |
| 1118 except FlowExchangeError: |
| 1119 pass |
| 1120 |
| 1121 def test_exchange_code_and_file_for_token(self): |
| 1122 http = HttpMockSequence([ |
| 1123 ({'status': '200'}, |
| 1124 b"""{ "access_token":"asdfghjkl", |
| 1125 "expires_in":3600 }"""), |
| 1126 ]) |
| 1127 credentials = credentials_from_clientsecrets_and_code( |
| 1128 datafile('client_secrets.json'), self.scope, |
| 1129 self.code, http=http) |
| 1130 self.assertEqual(credentials.access_token, 'asdfghjkl') |
| 1131 self.assertNotEqual(None, credentials.token_expiry) |
| 1132 |
| 1133 def test_exchange_code_and_cached_file_for_token(self): |
| 1134 http = HttpMockSequence([ |
| 1135 ({'status': '200'}, b'{ "access_token":"asdfghjkl"}'), |
| 1136 ]) |
| 1137 cache_mock = CacheMock() |
| 1138 load_and_cache('client_secrets.json', 'some_secrets', cache_mock) |
| 1139 |
| 1140 credentials = credentials_from_clientsecrets_and_code( |
| 1141 'some_secrets', self.scope, |
| 1142 self.code, http=http, cache=cache_mock) |
| 1143 self.assertEqual(credentials.access_token, 'asdfghjkl') |
| 1144 |
| 1145 def test_exchange_code_and_file_for_token_fail(self): |
| 1146 http = HttpMockSequence([ |
| 1147 ({'status': '400'}, b'{"error":"invalid_request"}'), |
| 1148 ]) |
| 1149 |
| 1150 try: |
| 1151 credentials = credentials_from_clientsecrets_and_code( |
| 1152 datafile('client_secrets.json'), self.scope, |
| 1153 self.code, http=http) |
| 1154 self.fail('should raise exception if exchange doesn\'t get 200') |
| 1155 except FlowExchangeError: |
| 1156 pass |
| 1157 |
| 1158 |
| 1159 class MemoryCacheTests(unittest.TestCase): |
| 1160 |
| 1161 def test_get_set_delete(self): |
| 1162 m = MemoryCache() |
| 1163 self.assertEqual(None, m.get('foo')) |
| 1164 self.assertEqual(None, m.delete('foo')) |
| 1165 m.set('foo', 'bar') |
| 1166 self.assertEqual('bar', m.get('foo')) |
| 1167 m.delete('foo') |
| 1168 self.assertEqual(None, m.get('foo')) |
| 1169 |
| 1170 |
| 1171 class Test__save_private_file(unittest.TestCase): |
| 1172 |
| 1173 def _save_helper(self, filename): |
| 1174 contents = [] |
| 1175 contents_str = '[]' |
| 1176 client._save_private_file(filename, contents) |
| 1177 with open(filename, 'r') as f: |
| 1178 stored_contents = f.read() |
| 1179 self.assertEqual(stored_contents, contents_str) |
| 1180 |
| 1181 stat_mode = os.stat(filename).st_mode |
| 1182 # Octal 777, only last 3 positions matter for permissions mask. |
| 1183 stat_mode &= 0o777 |
| 1184 self.assertEqual(stat_mode, 0o600) |
| 1185 |
| 1186 def test_new(self): |
| 1187 import tempfile |
| 1188 filename = tempfile.mktemp() |
| 1189 self.assertFalse(os.path.exists(filename)) |
| 1190 self._save_helper(filename) |
| 1191 |
| 1192 def test_existing(self): |
| 1193 import tempfile |
| 1194 filename = tempfile.mktemp() |
| 1195 with open(filename, 'w') as f: |
| 1196 f.write('a bunch of nonsense longer than []') |
| 1197 self.assertTrue(os.path.exists(filename)) |
| 1198 self._save_helper(filename) |
| 1199 |
| 1200 |
| 1201 if __name__ == '__main__': |
| 1202 unittest.main() |
OLD | NEW |