Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(513)

Side by Side Diff: third_party/gsutil/third_party/oauth2client/tests/test_oauth2client.py

Issue 1377933002: [catapult] - Copy Telemetry's gsutilz over to third_party. (Closed) Base URL: https://github.com/catapult-project/catapult.git@master
Patch Set: Rename to gsutil. Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 #!/usr/bin/python2.4
2 #
3 # Copyright 2014 Google Inc. All rights reserved.
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17
18 """Oauth2client tests
19
20 Unit tests for oauth2client.
21 """
22
23 __author__ = 'jcgregorio@google.com (Joe Gregorio)'
24
25 import base64
26 import contextlib
27 import datetime
28 import json
29 import os
30 import sys
31 import time
32 import unittest
33
34 import mock
35 import six
36 from six.moves import urllib
37
38 from .http_mock import HttpMock
39 from .http_mock import HttpMockSequence
40 from oauth2client import GOOGLE_REVOKE_URI
41 from oauth2client import GOOGLE_TOKEN_URI
42 from oauth2client import client
43 from oauth2client.client import AccessTokenCredentials
44 from oauth2client.client import AccessTokenCredentialsError
45 from oauth2client.client import AccessTokenRefreshError
46 from oauth2client.client import ADC_HELP_MSG
47 from oauth2client.client import AssertionCredentials
48 from oauth2client.client import AUTHORIZED_USER
49 from oauth2client.client import Credentials
50 from oauth2client.client import DEFAULT_ENV_NAME
51 from oauth2client.client import ApplicationDefaultCredentialsError
52 from oauth2client.client import FlowExchangeError
53 from oauth2client.client import GoogleCredentials
54 from oauth2client.client import GOOGLE_APPLICATION_CREDENTIALS
55 from oauth2client.client import MemoryCache
56 from oauth2client.client import NonAsciiHeaderError
57 from oauth2client.client import OAuth2Credentials
58 from oauth2client.client import OAuth2WebServerFlow
59 from oauth2client.client import OOB_CALLBACK_URN
60 from oauth2client.client import REFRESH_STATUS_CODES
61 from oauth2client.client import SERVICE_ACCOUNT
62 from oauth2client.client import Storage
63 from oauth2client.client import TokenRevokeError
64 from oauth2client.client import VerifyJwtTokenError
65 from oauth2client.client import _extract_id_token
66 from oauth2client.client import _get_application_default_credential_from_file
67 from oauth2client.client import _get_environment
68 from oauth2client.client import _get_environment_variable_file
69 from oauth2client.client import _get_well_known_file
70 from oauth2client.client import _raise_exception_for_missing_fields
71 from oauth2client.client import _raise_exception_for_reading_json
72 from oauth2client.client import _update_query_params
73 from oauth2client.client import credentials_from_clientsecrets_and_code
74 from oauth2client.client import credentials_from_code
75 from oauth2client.client import flow_from_clientsecrets
76 from oauth2client.client import save_to_well_known_file
77 from oauth2client.clientsecrets import _loadfile
78 from oauth2client.service_account import _ServiceAccountCredentials
79
80 DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')
81
82
83 # TODO(craigcitro): This is duplicated from
84 # googleapiclient.test_discovery; consolidate these definitions.
85 def assertUrisEqual(testcase, expected, actual):
86 """Test that URIs are the same, up to reordering of query parameters."""
87 expected = urllib.parse.urlparse(expected)
88 actual = urllib.parse.urlparse(actual)
89 testcase.assertEqual(expected.scheme, actual.scheme)
90 testcase.assertEqual(expected.netloc, actual.netloc)
91 testcase.assertEqual(expected.path, actual.path)
92 testcase.assertEqual(expected.params, actual.params)
93 testcase.assertEqual(expected.fragment, actual.fragment)
94 expected_query = urllib.parse.parse_qs(expected.query)
95 actual_query = urllib.parse.parse_qs(actual.query)
96 for name in expected_query.keys():
97 testcase.assertEqual(expected_query[name], actual_query[name])
98 for name in actual_query.keys():
99 testcase.assertEqual(expected_query[name], actual_query[name])
100
101
102 def datafile(filename):
103 return os.path.join(DATA_DIR, filename)
104
105
106 def load_and_cache(existing_file, fakename, cache_mock):
107 client_type, client_info = _loadfile(datafile(existing_file))
108 cache_mock.cache[fakename] = {client_type: client_info}
109
110
111 class CacheMock(object):
112 def __init__(self):
113 self.cache = {}
114
115 def get(self, key, namespace=''):
116 # ignoring namespace for easier testing
117 return self.cache.get(key, None)
118
119 def set(self, key, value, namespace=''):
120 # ignoring namespace for easier testing
121 self.cache[key] = value
122
123
124 class CredentialsTests(unittest.TestCase):
125
126 def test_to_from_json(self):
127 credentials = Credentials()
128 json = credentials.to_json()
129 restored = Credentials.new_from_json(json)
130
131
132 class MockResponse(object):
133 """Mock the response of urllib2.urlopen() call."""
134
135 def __init__(self, headers):
136 self._headers = headers
137
138 def info(self):
139 class Info:
140 def __init__(self, headers):
141 self.headers = headers
142
143 def get(self, key, default=None):
144 return self.headers.get(key, default)
145
146 return Info(self._headers)
147
148
149 @contextlib.contextmanager
150 def mock_module_import(module):
151 """Place a dummy objects in sys.modules to mock an import test."""
152 parts = module.split('.')
153 entries = ['.'.join(parts[:i+1]) for i in range(len(parts))]
154 for entry in entries:
155 sys.modules[entry] = object()
156
157 try:
158 yield
159
160 finally:
161 for entry in entries:
162 del sys.modules[entry]
163
164
165 class GoogleCredentialsTests(unittest.TestCase):
166
167 def setUp(self):
168 self.env_server_software = os.environ.get('SERVER_SOFTWARE', None)
169 self.env_google_application_credentials = (
170 os.environ.get(GOOGLE_APPLICATION_CREDENTIALS, None))
171 self.env_appdata = os.environ.get('APPDATA', None)
172 self.os_name = os.name
173 from oauth2client import client
174 client.SETTINGS.env_name = None
175
176 def tearDown(self):
177 self.reset_env('SERVER_SOFTWARE', self.env_server_software)
178 self.reset_env(GOOGLE_APPLICATION_CREDENTIALS,
179 self.env_google_application_credentials)
180 self.reset_env('APPDATA', self.env_appdata)
181 os.name = self.os_name
182
183 def reset_env(self, env, value):
184 """Set the environment variable 'env' to 'value'."""
185 if value is not None:
186 os.environ[env] = value
187 else:
188 os.environ.pop(env, '')
189
190 def validate_service_account_credentials(self, credentials):
191 self.assertTrue(isinstance(credentials, _ServiceAccountCredentials))
192 self.assertEqual('123', credentials._service_account_id)
193 self.assertEqual('dummy@google.com', credentials._service_account_email)
194 self.assertEqual('ABCDEF', credentials._private_key_id)
195 self.assertEqual('', credentials._scopes)
196
197 def validate_google_credentials(self, credentials):
198 self.assertTrue(isinstance(credentials, GoogleCredentials))
199 self.assertEqual(None, credentials.access_token)
200 self.assertEqual('123', credentials.client_id)
201 self.assertEqual('secret', credentials.client_secret)
202 self.assertEqual('alabalaportocala', credentials.refresh_token)
203 self.assertEqual(None, credentials.token_expiry)
204 self.assertEqual(GOOGLE_TOKEN_URI, credentials.token_uri)
205 self.assertEqual('Python client library', credentials.user_agent)
206
207 def get_a_google_credentials_object(self):
208 return GoogleCredentials(None, None, None, None, None, None, None, None)
209
210 def test_create_scoped_required(self):
211 self.assertFalse(
212 self.get_a_google_credentials_object().create_scoped_required())
213
214 def test_create_scoped(self):
215 credentials = self.get_a_google_credentials_object()
216 self.assertEqual(credentials, credentials.create_scoped(None))
217 self.assertEqual(credentials,
218 credentials.create_scoped(['dummy_scope']))
219
220 def test_get_environment_gae_production(self):
221 with mock_module_import('google.appengine'):
222 os.environ['SERVER_SOFTWARE'] = 'Google App Engine/XYZ'
223 self.assertEqual('GAE_PRODUCTION', _get_environment())
224
225 def test_get_environment_gae_local(self):
226 with mock_module_import('google.appengine'):
227 os.environ['SERVER_SOFTWARE'] = 'Development/XYZ'
228 self.assertEqual('GAE_LOCAL', _get_environment())
229
230 def test_get_environment_gce_production(self):
231 os.environ['SERVER_SOFTWARE'] = ''
232 response = MockResponse({'Metadata-Flavor': 'Google'})
233 with mock.patch.object(urllib.request, 'urlopen',
234 return_value=response,
235 autospec=True) as urlopen:
236 self.assertEqual('GCE_PRODUCTION', _get_environment())
237 urlopen.assert_called_once_with(
238 'http://169.254.169.254/', timeout=1)
239
240 def test_get_environment_unknown(self):
241 os.environ['SERVER_SOFTWARE'] = ''
242 with mock.patch.object(urllib.request, 'urlopen',
243 return_value=MockResponse({}),
244 autospec=True) as urlopen:
245 self.assertEqual(DEFAULT_ENV_NAME, _get_environment())
246 urlopen.assert_called_once_with(
247 'http://169.254.169.254/', timeout=1)
248
249 def test_get_environment_variable_file(self):
250 environment_variable_file = datafile(
251 os.path.join('gcloud', 'application_default_credentials.json'))
252 os.environ[GOOGLE_APPLICATION_CREDENTIALS] = environment_variable_file
253 self.assertEqual(environment_variable_file,
254 _get_environment_variable_file())
255
256 def test_get_environment_variable_file_error(self):
257 nonexistent_file = datafile('nonexistent')
258 os.environ[GOOGLE_APPLICATION_CREDENTIALS] = nonexistent_file
259 # we can't use self.assertRaisesRegexp() because it is only in Python 2.7+
260 try:
261 _get_environment_variable_file()
262 self.fail(nonexistent_file + ' should not exist.')
263 except ApplicationDefaultCredentialsError as error:
264 self.assertEqual('File ' + nonexistent_file +
265 ' (pointed by ' + GOOGLE_APPLICATION_CREDENTIALS +
266 ' environment variable) does not exist!',
267 str(error))
268
269 def test_get_well_known_file_on_windows(self):
270 ORIGINAL_ISDIR = os.path.isdir
271 try:
272 os.path.isdir = lambda path: True
273 well_known_file = datafile(
274 os.path.join(client._CLOUDSDK_CONFIG_DIRECTORY,
275 'application_default_credentials.json'))
276 os.name = 'nt'
277 os.environ['APPDATA'] = DATA_DIR
278 self.assertEqual(well_known_file, _get_well_known_file())
279 finally:
280 os.path.isdir = ORIGINAL_ISDIR
281
282 def test_get_well_known_file_with_custom_config_dir(self):
283 ORIGINAL_ENVIRON = os.environ
284 ORIGINAL_ISDIR = os.path.isdir
285 CUSTOM_DIR = 'CUSTOM_DIR'
286 EXPECTED_FILE = os.path.join(CUSTOM_DIR,
287 'application_default_credentials.json')
288 try:
289 os.environ = {client._CLOUDSDK_CONFIG_ENV_VAR: CUSTOM_DIR}
290 os.path.isdir = lambda path: True
291 well_known_file = _get_well_known_file()
292 self.assertEqual(well_known_file, EXPECTED_FILE)
293 finally:
294 os.environ = ORIGINAL_ENVIRON
295 os.path.isdir = ORIGINAL_ISDIR
296
297 def test_get_application_default_credential_from_file_service_account(self):
298 credentials_file = datafile(
299 os.path.join('gcloud', 'application_default_credentials.json'))
300 credentials = _get_application_default_credential_from_file(
301 credentials_file)
302 self.validate_service_account_credentials(credentials)
303
304 def test_save_to_well_known_file_service_account(self):
305 credential_file = datafile(
306 os.path.join('gcloud', 'application_default_credentials.json'))
307 credentials = _get_application_default_credential_from_file(
308 credential_file)
309 temp_credential_file = datafile(
310 os.path.join('gcloud', 'temp_well_known_file_service_account.json'))
311 save_to_well_known_file(credentials, temp_credential_file)
312 with open(temp_credential_file) as f:
313 d = json.load(f)
314 self.assertEqual('service_account', d['type'])
315 self.assertEqual('123', d['client_id'])
316 self.assertEqual('dummy@google.com', d['client_email'])
317 self.assertEqual('ABCDEF', d['private_key_id'])
318 os.remove(temp_credential_file)
319
320 def test_save_well_known_file_with_non_existent_config_dir(self):
321 credential_file = datafile(
322 os.path.join('gcloud', 'application_default_credentials.json'))
323 credentials = _get_application_default_credential_from_file(
324 credential_file)
325 ORIGINAL_ISDIR = os.path.isdir
326 try:
327 os.path.isdir = lambda path: False
328 self.assertRaises(OSError, save_to_well_known_file, credentials)
329 finally:
330 os.path.isdir = ORIGINAL_ISDIR
331
332 def test_get_application_default_credential_from_file_authorized_user(self):
333 credentials_file = datafile(
334 os.path.join('gcloud',
335 'application_default_credentials_authorized_user.json'))
336 credentials = _get_application_default_credential_from_file(
337 credentials_file)
338 self.validate_google_credentials(credentials)
339
340 def test_save_to_well_known_file_authorized_user(self):
341 credentials_file = datafile(
342 os.path.join('gcloud',
343 'application_default_credentials_authorized_user.json'))
344 credentials = _get_application_default_credential_from_file(
345 credentials_file)
346 temp_credential_file = datafile(
347 os.path.join('gcloud', 'temp_well_known_file_authorized_user.json'))
348 save_to_well_known_file(credentials, temp_credential_file)
349 with open(temp_credential_file) as f:
350 d = json.load(f)
351 self.assertEqual('authorized_user', d['type'])
352 self.assertEqual('123', d['client_id'])
353 self.assertEqual('secret', d['client_secret'])
354 self.assertEqual('alabalaportocala', d['refresh_token'])
355 os.remove(temp_credential_file)
356
357 def test_get_application_default_credential_from_malformed_file_1(self):
358 credentials_file = datafile(
359 os.path.join('gcloud',
360 'application_default_credentials_malformed_1.json'))
361 # we can't use self.assertRaisesRegexp() because it is only in Python 2.7+
362 try:
363 _get_application_default_credential_from_file(credentials_file)
364 self.fail('An exception was expected!')
365 except ApplicationDefaultCredentialsError as error:
366 self.assertEqual("'type' field should be defined "
367 "(and have one of the '" + AUTHORIZED_USER +
368 "' or '" + SERVICE_ACCOUNT + "' values)",
369 str(error))
370
371 def test_get_application_default_credential_from_malformed_file_2(self):
372 credentials_file = datafile(
373 os.path.join('gcloud',
374 'application_default_credentials_malformed_2.json'))
375 # we can't use self.assertRaisesRegexp() because it is only in Python 2.7+
376 try:
377 _get_application_default_credential_from_file(credentials_file)
378 self.fail('An exception was expected!')
379 except ApplicationDefaultCredentialsError as error:
380 self.assertEqual('The following field(s) must be defined: private_key_id',
381 str(error))
382
383 def test_get_application_default_credential_from_malformed_file_3(self):
384 credentials_file = datafile(
385 os.path.join('gcloud',
386 'application_default_credentials_malformed_3.json'))
387 self.assertRaises(ValueError, _get_application_default_credential_from_file,
388 credentials_file)
389
390 def test_raise_exception_for_missing_fields(self):
391 missing_fields = ['first', 'second', 'third']
392 # we can't use self.assertRaisesRegexp() because it is only in Python 2.7+
393 try:
394 _raise_exception_for_missing_fields(missing_fields)
395 self.fail('An exception was expected!')
396 except ApplicationDefaultCredentialsError as error:
397 self.assertEqual('The following field(s) must be defined: ' +
398 ', '.join(missing_fields),
399 str(error))
400
401 def test_raise_exception_for_reading_json(self):
402 credential_file = 'any_file'
403 extra_help = ' be good'
404 error = ApplicationDefaultCredentialsError('stuff happens')
405 # we can't use self.assertRaisesRegexp() because it is only in Python 2.7+
406 try:
407 _raise_exception_for_reading_json(credential_file, extra_help, error)
408 self.fail('An exception was expected!')
409 except ApplicationDefaultCredentialsError as ex:
410 self.assertEqual('An error was encountered while reading '
411 'json file: '+ credential_file +
412 extra_help + ': ' + str(error),
413 str(ex))
414
415 def test_get_application_default_from_environment_variable_service_account(
416 self):
417 os.environ['SERVER_SOFTWARE'] = ''
418 environment_variable_file = datafile(
419 os.path.join('gcloud', 'application_default_credentials.json'))
420 os.environ[GOOGLE_APPLICATION_CREDENTIALS] = environment_variable_file
421 self.validate_service_account_credentials(
422 GoogleCredentials.get_application_default())
423
424 def test_env_name(self):
425 from oauth2client import client
426 self.assertEqual(None, client.SETTINGS.env_name)
427 self.test_get_application_default_from_environment_variable_service_account( )
428 self.assertEqual(DEFAULT_ENV_NAME, client.SETTINGS.env_name)
429
430 def test_get_application_default_from_environment_variable_authorized_user(
431 self):
432 os.environ['SERVER_SOFTWARE'] = ''
433 environment_variable_file = datafile(
434 os.path.join('gcloud',
435 'application_default_credentials_authorized_user.json'))
436 os.environ[GOOGLE_APPLICATION_CREDENTIALS] = environment_variable_file
437 self.validate_google_credentials(
438 GoogleCredentials.get_application_default())
439
440 def test_get_application_default_from_environment_variable_malformed_file(
441 self):
442 os.environ['SERVER_SOFTWARE'] = ''
443 environment_variable_file = datafile(
444 os.path.join('gcloud',
445 'application_default_credentials_malformed_3.json'))
446 os.environ[GOOGLE_APPLICATION_CREDENTIALS] = environment_variable_file
447 # we can't use self.assertRaisesRegexp() because it is only in Python 2.7+
448 try:
449 GoogleCredentials.get_application_default()
450 self.fail('An exception was expected!')
451 except ApplicationDefaultCredentialsError as error:
452 self.assertTrue(str(error).startswith(
453 'An error was encountered while reading json file: ' +
454 environment_variable_file + ' (pointed to by ' +
455 GOOGLE_APPLICATION_CREDENTIALS + ' environment variable):'))
456
457 def test_get_application_default_environment_not_set_up(self):
458 # It is normal for this test to fail if run inside
459 # a Google Compute Engine VM or after 'gcloud auth login' command
460 # has been executed on a non Windows machine.
461 os.environ['SERVER_SOFTWARE'] = ''
462 os.environ[GOOGLE_APPLICATION_CREDENTIALS] = ''
463 os.environ['APPDATA'] = ''
464 # we can't use self.assertRaisesRegexp() because it is only in Python 2.7+
465 VALID_CONFIG_DIR = client._CLOUDSDK_CONFIG_DIRECTORY
466 ORIGINAL_ISDIR = os.path.isdir
467 try:
468 os.path.isdir = lambda path: True
469 client._CLOUDSDK_CONFIG_DIRECTORY = 'BOGUS_CONFIG_DIR'
470 GoogleCredentials.get_application_default()
471 self.fail('An exception was expected!')
472 except ApplicationDefaultCredentialsError as error:
473 self.assertEqual(ADC_HELP_MSG, str(error))
474 finally:
475 os.path.isdir = ORIGINAL_ISDIR
476 client._CLOUDSDK_CONFIG_DIRECTORY = VALID_CONFIG_DIR
477
478 def test_from_stream_service_account(self):
479 credentials_file = datafile(
480 os.path.join('gcloud', 'application_default_credentials.json'))
481 credentials = (
482 self.get_a_google_credentials_object().from_stream(credentials_file))
483 self.validate_service_account_credentials(credentials)
484
485 def test_from_stream_authorized_user(self):
486 credentials_file = datafile(
487 os.path.join('gcloud',
488 'application_default_credentials_authorized_user.json'))
489 credentials = (
490 self.get_a_google_credentials_object().from_stream(credentials_file))
491 self.validate_google_credentials(credentials)
492
493 def test_from_stream_malformed_file_1(self):
494 credentials_file = datafile(
495 os.path.join('gcloud',
496 'application_default_credentials_malformed_1.json'))
497 # we can't use self.assertRaisesRegexp() because it is only in Python 2.7+
498 try:
499 self.get_a_google_credentials_object().from_stream(credentials_file)
500 self.fail('An exception was expected!')
501 except ApplicationDefaultCredentialsError as error:
502 self.assertEqual("An error was encountered while reading json file: " +
503 credentials_file +
504 " (provided as parameter to the from_stream() method): "
505 "'type' field should be defined (and have one of the '" +
506 AUTHORIZED_USER + "' or '" + SERVICE_ACCOUNT +
507 "' values)",
508 str(error))
509
510 def test_from_stream_malformed_file_2(self):
511 credentials_file = datafile(
512 os.path.join('gcloud',
513 'application_default_credentials_malformed_2.json'))
514 # we can't use self.assertRaisesRegexp() because it is only in Python 2.7+
515 try:
516 self.get_a_google_credentials_object().from_stream(credentials_file)
517 self.fail('An exception was expected!')
518 except ApplicationDefaultCredentialsError as error:
519 self.assertEqual('An error was encountered while reading json file: ' +
520 credentials_file +
521 ' (provided as parameter to the from_stream() method): '
522 'The following field(s) must be defined: '
523 'private_key_id',
524 str(error))
525
526 def test_from_stream_malformed_file_3(self):
527 credentials_file = datafile(
528 os.path.join('gcloud',
529 'application_default_credentials_malformed_3.json'))
530 self.assertRaises(
531 ApplicationDefaultCredentialsError,
532 self.get_a_google_credentials_object().from_stream, credentials_file)
533
534
535 class DummyDeleteStorage(Storage):
536 delete_called = False
537
538 def locked_delete(self):
539 self.delete_called = True
540
541
542 def _token_revoke_test_helper(testcase, status, revoke_raise,
543 valid_bool_value, token_attr):
544 current_store = getattr(testcase.credentials, 'store', None)
545
546 dummy_store = DummyDeleteStorage()
547 testcase.credentials.set_store(dummy_store)
548
549 actual_do_revoke = testcase.credentials._do_revoke
550 testcase.token_from_revoke = None
551 def do_revoke_stub(http_request, token):
552 testcase.token_from_revoke = token
553 return actual_do_revoke(http_request, token)
554 testcase.credentials._do_revoke = do_revoke_stub
555
556 http = HttpMock(headers={'status': status})
557 if revoke_raise:
558 testcase.assertRaises(TokenRevokeError, testcase.credentials.revoke, http)
559 else:
560 testcase.credentials.revoke(http)
561
562 testcase.assertEqual(getattr(testcase.credentials, token_attr),
563 testcase.token_from_revoke)
564 testcase.assertEqual(valid_bool_value, testcase.credentials.invalid)
565 testcase.assertEqual(valid_bool_value, dummy_store.delete_called)
566
567 testcase.credentials.set_store(current_store)
568
569
570 class BasicCredentialsTests(unittest.TestCase):
571
572 def setUp(self):
573 access_token = 'foo'
574 client_id = 'some_client_id'
575 client_secret = 'cOuDdkfjxxnv+'
576 refresh_token = '1/0/a.df219fjls0'
577 token_expiry = datetime.datetime.utcnow()
578 user_agent = 'refresh_checker/1.0'
579 self.credentials = OAuth2Credentials(
580 access_token, client_id, client_secret,
581 refresh_token, token_expiry, GOOGLE_TOKEN_URI,
582 user_agent, revoke_uri=GOOGLE_REVOKE_URI)
583
584 def test_token_refresh_success(self):
585 for status_code in REFRESH_STATUS_CODES:
586 token_response = {'access_token': '1/3w', 'expires_in': 3600}
587 http = HttpMockSequence([
588 ({'status': status_code}, b''),
589 ({'status': '200'}, json.dumps(token_response).encode('utf-8')),
590 ({'status': '200'}, 'echo_request_headers'),
591 ])
592 http = self.credentials.authorize(http)
593 resp, content = http.request('http://example.com')
594 self.assertEqual(b'Bearer 1/3w', content[b'Authorization'])
595 self.assertFalse(self.credentials.access_token_expired)
596 self.assertEqual(token_response, self.credentials.token_response)
597
598 def test_token_refresh_failure(self):
599 for status_code in REFRESH_STATUS_CODES:
600 http = HttpMockSequence([
601 ({'status': status_code}, b''),
602 ({'status': '400'}, b'{"error":"access_denied"}'),
603 ])
604 http = self.credentials.authorize(http)
605 try:
606 http.request('http://example.com')
607 self.fail('should raise AccessTokenRefreshError exception')
608 except AccessTokenRefreshError:
609 pass
610 self.assertTrue(self.credentials.access_token_expired)
611 self.assertEqual(None, self.credentials.token_response)
612
613 def test_token_revoke_success(self):
614 _token_revoke_test_helper(
615 self, '200', revoke_raise=False,
616 valid_bool_value=True, token_attr='refresh_token')
617
618 def test_token_revoke_failure(self):
619 _token_revoke_test_helper(
620 self, '400', revoke_raise=True,
621 valid_bool_value=False, token_attr='refresh_token')
622
623 def test_token_revoke_fallback(self):
624 original_credentials = self.credentials.to_json()
625 self.credentials.refresh_token = None
626 _token_revoke_test_helper(
627 self, '200', revoke_raise=False,
628 valid_bool_value=True, token_attr='access_token')
629 self.credentials = self.credentials.from_json(original_credentials)
630
631 def test_non_401_error_response(self):
632 http = HttpMockSequence([
633 ({'status': '400'}, b''),
634 ])
635 http = self.credentials.authorize(http)
636 resp, content = http.request('http://example.com')
637 self.assertEqual(400, resp.status)
638 self.assertEqual(None, self.credentials.token_response)
639
640 def test_to_from_json(self):
641 json = self.credentials.to_json()
642 instance = OAuth2Credentials.from_json(json)
643 self.assertEqual(OAuth2Credentials, type(instance))
644 instance.token_expiry = None
645 self.credentials.token_expiry = None
646
647 self.assertEqual(instance.__dict__, self.credentials.__dict__)
648
649 def test_from_json_token_expiry(self):
650 data = json.loads(self.credentials.to_json())
651 data['token_expiry'] = None
652 instance = OAuth2Credentials.from_json(json.dumps(data))
653 self.assertTrue(isinstance(instance, OAuth2Credentials))
654
655 def test_unicode_header_checks(self):
656 access_token = u'foo'
657 client_id = u'some_client_id'
658 client_secret = u'cOuDdkfjxxnv+'
659 refresh_token = u'1/0/a.df219fjls0'
660 token_expiry = str(datetime.datetime.utcnow())
661 token_uri = str(GOOGLE_TOKEN_URI)
662 revoke_uri = str(GOOGLE_REVOKE_URI)
663 user_agent = u'refresh_checker/1.0'
664 credentials = OAuth2Credentials(access_token, client_id, client_secret,
665 refresh_token, token_expiry, token_uri,
666 user_agent, revoke_uri=revoke_uri)
667
668 # First, test that we correctly encode basic objects, making sure
669 # to include a bytes object. Note that oauth2client will normalize
670 # everything to bytes, no matter what python version we're in.
671 http = credentials.authorize(HttpMock(headers={'status': '200'}))
672 headers = {u'foo': 3, b'bar': True, 'baz': b'abc'}
673 cleaned_headers = {b'foo': b'3', b'bar': b'True', b'baz': b'abc'}
674 http.request(u'http://example.com', method=u'GET', headers=headers)
675 for k, v in cleaned_headers.items():
676 self.assertTrue(k in http.headers)
677 self.assertEqual(v, http.headers[k])
678
679 # Next, test that we do fail on unicode.
680 unicode_str = six.unichr(40960) + 'abcd'
681 self.assertRaises(
682 NonAsciiHeaderError,
683 http.request,
684 u'http://example.com', method=u'GET', headers={u'foo': unicode_str})
685
686 def test_no_unicode_in_request_params(self):
687 access_token = u'foo'
688 client_id = u'some_client_id'
689 client_secret = u'cOuDdkfjxxnv+'
690 refresh_token = u'1/0/a.df219fjls0'
691 token_expiry = str(datetime.datetime.utcnow())
692 token_uri = str(GOOGLE_TOKEN_URI)
693 revoke_uri = str(GOOGLE_REVOKE_URI)
694 user_agent = u'refresh_checker/1.0'
695 credentials = OAuth2Credentials(access_token, client_id, client_secret,
696 refresh_token, token_expiry, token_uri,
697 user_agent, revoke_uri=revoke_uri)
698
699 http = HttpMock(headers={'status': '200'})
700 http = credentials.authorize(http)
701 http.request(u'http://example.com', method=u'GET', headers={u'foo': u'bar'})
702 for k, v in six.iteritems(http.headers):
703 self.assertEqual(six.binary_type, type(k))
704 self.assertEqual(six.binary_type, type(v))
705
706 # Test again with unicode strings that can't simply be converted to ASCII.
707 try:
708 http.request(
709 u'http://example.com', method=u'GET', headers={u'foo': u'\N{COMET}'})
710 self.fail('Expected exception to be raised.')
711 except NonAsciiHeaderError:
712 pass
713
714 self.credentials.token_response = 'foobar'
715 instance = OAuth2Credentials.from_json(self.credentials.to_json())
716 self.assertEqual('foobar', instance.token_response)
717
718 def test_get_access_token(self):
719 S = 2 # number of seconds in which the token expires
720 token_response_first = {'access_token': 'first_token', 'expires_in': S}
721 token_response_second = {'access_token': 'second_token', 'expires_in': S}
722 http = HttpMockSequence([
723 ({'status': '200'}, json.dumps(token_response_first).encode('utf-8')),
724 ({'status': '200'}, json.dumps(token_response_second).encode('utf-8')),
725 ])
726
727 token = self.credentials.get_access_token(http=http)
728 self.assertEqual('first_token', token.access_token)
729 self.assertEqual(S - 1, token.expires_in)
730 self.assertFalse(self.credentials.access_token_expired)
731 self.assertEqual(token_response_first, self.credentials.token_response)
732
733 token = self.credentials.get_access_token(http=http)
734 self.assertEqual('first_token', token.access_token)
735 self.assertEqual(S - 1, token.expires_in)
736 self.assertFalse(self.credentials.access_token_expired)
737 self.assertEqual(token_response_first, self.credentials.token_response)
738
739 time.sleep(S + 0.5) # some margin to avoid flakiness
740 self.assertTrue(self.credentials.access_token_expired)
741
742 token = self.credentials.get_access_token(http=http)
743 self.assertEqual('second_token', token.access_token)
744 self.assertEqual(S - 1, token.expires_in)
745 self.assertFalse(self.credentials.access_token_expired)
746 self.assertEqual(token_response_second, self.credentials.token_response)
747
748
749 class AccessTokenCredentialsTests(unittest.TestCase):
750
751 def setUp(self):
752 access_token = 'foo'
753 user_agent = 'refresh_checker/1.0'
754 self.credentials = AccessTokenCredentials(access_token, user_agent,
755 revoke_uri=GOOGLE_REVOKE_URI)
756
757 def test_token_refresh_success(self):
758 for status_code in REFRESH_STATUS_CODES:
759 http = HttpMockSequence([
760 ({'status': status_code}, b''),
761 ])
762 http = self.credentials.authorize(http)
763 try:
764 resp, content = http.request('http://example.com')
765 self.fail('should throw exception if token expires')
766 except AccessTokenCredentialsError:
767 pass
768 except Exception:
769 self.fail('should only throw AccessTokenCredentialsError')
770
771 def test_token_revoke_success(self):
772 _token_revoke_test_helper(
773 self, '200', revoke_raise=False,
774 valid_bool_value=True, token_attr='access_token')
775
776 def test_token_revoke_failure(self):
777 _token_revoke_test_helper(
778 self, '400', revoke_raise=True,
779 valid_bool_value=False, token_attr='access_token')
780
781 def test_non_401_error_response(self):
782 http = HttpMockSequence([
783 ({'status': '400'}, b''),
784 ])
785 http = self.credentials.authorize(http)
786 resp, content = http.request('http://example.com')
787 self.assertEqual(400, resp.status)
788
789 def test_auth_header_sent(self):
790 http = HttpMockSequence([
791 ({'status': '200'}, 'echo_request_headers'),
792 ])
793 http = self.credentials.authorize(http)
794 resp, content = http.request('http://example.com')
795 self.assertEqual(b'Bearer foo', content[b'Authorization'])
796
797
798 class TestAssertionCredentials(unittest.TestCase):
799 assertion_text = 'This is the assertion'
800 assertion_type = 'http://www.google.com/assertionType'
801
802 class AssertionCredentialsTestImpl(AssertionCredentials):
803
804 def _generate_assertion(self):
805 return TestAssertionCredentials.assertion_text
806
807 def setUp(self):
808 user_agent = 'fun/2.0'
809 self.credentials = self.AssertionCredentialsTestImpl(self.assertion_type,
810 user_agent=user_agent)
811
812 def test_assertion_body(self):
813 body = urllib.parse.parse_qs(
814 self.credentials._generate_refresh_request_body())
815 self.assertEqual(self.assertion_text, body['assertion'][0])
816 self.assertEqual('urn:ietf:params:oauth:grant-type:jwt-bearer',
817 body['grant_type'][0])
818
819 def test_assertion_refresh(self):
820 http = HttpMockSequence([
821 ({'status': '200'}, b'{"access_token":"1/3w"}'),
822 ({'status': '200'}, 'echo_request_headers'),
823 ])
824 http = self.credentials.authorize(http)
825 resp, content = http.request('http://example.com')
826 self.assertEqual(b'Bearer 1/3w', content[b'Authorization'])
827
828 def test_token_revoke_success(self):
829 _token_revoke_test_helper(
830 self, '200', revoke_raise=False,
831 valid_bool_value=True, token_attr='access_token')
832
833 def test_token_revoke_failure(self):
834 _token_revoke_test_helper(
835 self, '400', revoke_raise=True,
836 valid_bool_value=False, token_attr='access_token')
837
838
839 class UpdateQueryParamsTest(unittest.TestCase):
840 def test_update_query_params_no_params(self):
841 uri = 'http://www.google.com'
842 updated = _update_query_params(uri, {'a': 'b'})
843 self.assertEqual(updated, uri + '?a=b')
844
845 def test_update_query_params_existing_params(self):
846 uri = 'http://www.google.com?x=y'
847 updated = _update_query_params(uri, {'a': 'b', 'c': 'd&'})
848 hardcoded_update = uri + '&a=b&c=d%26'
849 assertUrisEqual(self, updated, hardcoded_update)
850
851
852 class ExtractIdTokenTest(unittest.TestCase):
853 """Tests _extract_id_token()."""
854
855 def test_extract_success(self):
856 body = {'foo': 'bar'}
857 body_json = json.dumps(body).encode('ascii')
858 payload = base64.urlsafe_b64encode(body_json).strip(b'=')
859 jwt = b'stuff.' + payload + b'.signature'
860
861 extracted = _extract_id_token(jwt)
862 self.assertEqual(extracted, body)
863
864 def test_extract_failure(self):
865 body = {'foo': 'bar'}
866 body_json = json.dumps(body).encode('ascii')
867 payload = base64.urlsafe_b64encode(body_json).strip(b'=')
868 jwt = b'stuff.' + payload
869
870 self.assertRaises(VerifyJwtTokenError, _extract_id_token, jwt)
871
872
873 class OAuth2WebServerFlowTest(unittest.TestCase):
874
875 def setUp(self):
876 self.flow = OAuth2WebServerFlow(
877 client_id='client_id+1',
878 client_secret='secret+1',
879 scope='foo',
880 redirect_uri=OOB_CALLBACK_URN,
881 user_agent='unittest-sample/1.0',
882 revoke_uri='dummy_revoke_uri',
883 )
884
885 def test_construct_authorize_url(self):
886 authorize_url = self.flow.step1_get_authorize_url()
887
888 parsed = urllib.parse.urlparse(authorize_url)
889 q = urllib.parse.parse_qs(parsed[4])
890 self.assertEqual('client_id+1', q['client_id'][0])
891 self.assertEqual('code', q['response_type'][0])
892 self.assertEqual('foo', q['scope'][0])
893 self.assertEqual(OOB_CALLBACK_URN, q['redirect_uri'][0])
894 self.assertEqual('offline', q['access_type'][0])
895
896 def test_override_flow_via_kwargs(self):
897 """Passing kwargs to override defaults."""
898 flow = OAuth2WebServerFlow(
899 client_id='client_id+1',
900 client_secret='secret+1',
901 scope='foo',
902 redirect_uri=OOB_CALLBACK_URN,
903 user_agent='unittest-sample/1.0',
904 access_type='online',
905 response_type='token'
906 )
907 authorize_url = flow.step1_get_authorize_url()
908
909 parsed = urllib.parse.urlparse(authorize_url)
910 q = urllib.parse.parse_qs(parsed[4])
911 self.assertEqual('client_id+1', q['client_id'][0])
912 self.assertEqual('token', q['response_type'][0])
913 self.assertEqual('foo', q['scope'][0])
914 self.assertEqual(OOB_CALLBACK_URN, q['redirect_uri'][0])
915 self.assertEqual('online', q['access_type'][0])
916
917 def test_exchange_failure(self):
918 http = HttpMockSequence([
919 ({'status': '400'}, b'{"error":"invalid_request"}'),
920 ])
921
922 try:
923 credentials = self.flow.step2_exchange('some random code', http=http)
924 self.fail('should raise exception if exchange doesn\'t get 200')
925 except FlowExchangeError:
926 pass
927
928 def test_urlencoded_exchange_failure(self):
929 http = HttpMockSequence([
930 ({'status': '400'}, b'error=invalid_request'),
931 ])
932
933 try:
934 credentials = self.flow.step2_exchange('some random code', http=http)
935 self.fail('should raise exception if exchange doesn\'t get 200')
936 except FlowExchangeError as e:
937 self.assertEqual('invalid_request', str(e))
938
939 def test_exchange_failure_with_json_error(self):
940 # Some providers have 'error' attribute as a JSON object
941 # in place of regular string.
942 # This test makes sure no strange object-to-string coversion
943 # exceptions are being raised instead of FlowExchangeError.
944 http = HttpMockSequence([
945 ({'status': '400'},
946 b""" {"error": {
947 "type": "OAuthException",
948 "message": "Error validating verification code."} }"""),
949 ])
950
951 try:
952 credentials = self.flow.step2_exchange('some random code', http=http)
953 self.fail('should raise exception if exchange doesn\'t get 200')
954 except FlowExchangeError as e:
955 pass
956
957 def test_exchange_success(self):
958 http = HttpMockSequence([
959 ({'status': '200'},
960 b"""{ "access_token":"SlAV32hkKG",
961 "expires_in":3600,
962 "refresh_token":"8xLOxBtZp8" }"""),
963 ])
964
965 credentials = self.flow.step2_exchange('some random code', http=http)
966 self.assertEqual('SlAV32hkKG', credentials.access_token)
967 self.assertNotEqual(None, credentials.token_expiry)
968 self.assertEqual('8xLOxBtZp8', credentials.refresh_token)
969 self.assertEqual('dummy_revoke_uri', credentials.revoke_uri)
970
971 def test_exchange_dictlike(self):
972 class FakeDict(object):
973 def __init__(self, d):
974 self.d = d
975
976 def __getitem__(self, name):
977 return self.d[name]
978
979 def __contains__(self, name):
980 return name in self.d
981
982 code = 'some random code'
983 not_a_dict = FakeDict({'code': code})
984 payload = (b'{'
985 b' "access_token":"SlAV32hkKG",'
986 b' "expires_in":3600,'
987 b' "refresh_token":"8xLOxBtZp8"'
988 b'}')
989 http = HttpMockSequence([({'status': '200'}, payload),])
990
991 credentials = self.flow.step2_exchange(not_a_dict, http=http)
992 self.assertEqual('SlAV32hkKG', credentials.access_token)
993 self.assertNotEqual(None, credentials.token_expiry)
994 self.assertEqual('8xLOxBtZp8', credentials.refresh_token)
995 self.assertEqual('dummy_revoke_uri', credentials.revoke_uri)
996 request_code = urllib.parse.parse_qs(http.requests[0]['body'])['code'][0]
997 self.assertEqual(code, request_code)
998
999 def test_urlencoded_exchange_success(self):
1000 http = HttpMockSequence([
1001 ({'status': '200'}, b'access_token=SlAV32hkKG&expires_in=3600'),
1002 ])
1003
1004 credentials = self.flow.step2_exchange('some random code', http=http)
1005 self.assertEqual('SlAV32hkKG', credentials.access_token)
1006 self.assertNotEqual(None, credentials.token_expiry)
1007
1008 def test_urlencoded_expires_param(self):
1009 http = HttpMockSequence([
1010 # Note the 'expires=3600' where you'd normally
1011 # have if named 'expires_in'
1012 ({'status': '200'}, b'access_token=SlAV32hkKG&expires=3600'),
1013 ])
1014
1015 credentials = self.flow.step2_exchange('some random code', http=http)
1016 self.assertNotEqual(None, credentials.token_expiry)
1017
1018 def test_exchange_no_expires_in(self):
1019 http = HttpMockSequence([
1020 ({'status': '200'}, b"""{ "access_token":"SlAV32hkKG",
1021 "refresh_token":"8xLOxBtZp8" }"""),
1022 ])
1023
1024 credentials = self.flow.step2_exchange('some random code', http=http)
1025 self.assertEqual(None, credentials.token_expiry)
1026
1027 def test_urlencoded_exchange_no_expires_in(self):
1028 http = HttpMockSequence([
1029 # This might be redundant but just to make sure
1030 # urlencoded access_token gets parsed correctly
1031 ({'status': '200'}, b'access_token=SlAV32hkKG'),
1032 ])
1033
1034 credentials = self.flow.step2_exchange('some random code', http=http)
1035 self.assertEqual(None, credentials.token_expiry)
1036
1037 def test_exchange_fails_if_no_code(self):
1038 http = HttpMockSequence([
1039 ({'status': '200'}, b"""{ "access_token":"SlAV32hkKG",
1040 "refresh_token":"8xLOxBtZp8" }"""),
1041 ])
1042
1043 code = {'error': 'thou shall not pass'}
1044 try:
1045 credentials = self.flow.step2_exchange(code, http=http)
1046 self.fail('should raise exception if no code in dictionary.')
1047 except FlowExchangeError as e:
1048 self.assertTrue('shall not pass' in str(e))
1049
1050 def test_exchange_id_token_fail(self):
1051 http = HttpMockSequence([
1052 ({'status': '200'}, b"""{ "access_token":"SlAV32hkKG",
1053 "refresh_token":"8xLOxBtZp8",
1054 "id_token": "stuff.payload"}"""),
1055 ])
1056
1057 self.assertRaises(VerifyJwtTokenError, self.flow.step2_exchange,
1058 'some random code', http=http)
1059
1060 def test_exchange_id_token(self):
1061 body = {'foo': 'bar'}
1062 body_json = json.dumps(body).encode('ascii')
1063 payload = base64.urlsafe_b64encode(body_json).strip(b'=')
1064 jwt = (base64.urlsafe_b64encode(b'stuff') + b'.' + payload + b'.' +
1065 base64.urlsafe_b64encode(b'signature'))
1066
1067 http = HttpMockSequence([
1068 ({'status': '200'}, ("""{ "access_token":"SlAV32hkKG",
1069 "refresh_token":"8xLOxBtZp8",
1070 "id_token": "%s"}""" % jwt).encode('utf-8')),
1071 ])
1072
1073 credentials = self.flow.step2_exchange('some random code', http=http)
1074 self.assertEqual(credentials.id_token, body)
1075
1076
1077 class FlowFromCachedClientsecrets(unittest.TestCase):
1078
1079 def test_flow_from_clientsecrets_cached(self):
1080 cache_mock = CacheMock()
1081 load_and_cache('client_secrets.json', 'some_secrets', cache_mock)
1082
1083 flow = flow_from_clientsecrets(
1084 'some_secrets', '', redirect_uri='oob', cache=cache_mock)
1085 self.assertEqual('foo_client_secret', flow.client_secret)
1086
1087
1088 class CredentialsFromCodeTests(unittest.TestCase):
1089 def setUp(self):
1090 self.client_id = 'client_id_abc'
1091 self.client_secret = 'secret_use_code'
1092 self.scope = 'foo'
1093 self.code = '12345abcde'
1094 self.redirect_uri = 'postmessage'
1095
1096 def test_exchange_code_for_token(self):
1097 token = 'asdfghjkl'
1098 payload = json.dumps({'access_token': token, 'expires_in': 3600})
1099 http = HttpMockSequence([
1100 ({'status': '200'}, payload.encode('utf-8')),
1101 ])
1102 credentials = credentials_from_code(self.client_id, self.client_secret,
1103 self.scope, self.code, redirect_uri=self.redirect_uri,
1104 http=http)
1105 self.assertEqual(credentials.access_token, token)
1106 self.assertNotEqual(None, credentials.token_expiry)
1107
1108 def test_exchange_code_for_token_fail(self):
1109 http = HttpMockSequence([
1110 ({'status': '400'}, b'{"error":"invalid_request"}'),
1111 ])
1112
1113 try:
1114 credentials = credentials_from_code(self.client_id, self.client_secret,
1115 self.scope, self.code, redirect_uri=self.redirect_uri,
1116 http=http)
1117 self.fail('should raise exception if exchange doesn\'t get 200')
1118 except FlowExchangeError:
1119 pass
1120
1121 def test_exchange_code_and_file_for_token(self):
1122 http = HttpMockSequence([
1123 ({'status': '200'},
1124 b"""{ "access_token":"asdfghjkl",
1125 "expires_in":3600 }"""),
1126 ])
1127 credentials = credentials_from_clientsecrets_and_code(
1128 datafile('client_secrets.json'), self.scope,
1129 self.code, http=http)
1130 self.assertEqual(credentials.access_token, 'asdfghjkl')
1131 self.assertNotEqual(None, credentials.token_expiry)
1132
1133 def test_exchange_code_and_cached_file_for_token(self):
1134 http = HttpMockSequence([
1135 ({'status': '200'}, b'{ "access_token":"asdfghjkl"}'),
1136 ])
1137 cache_mock = CacheMock()
1138 load_and_cache('client_secrets.json', 'some_secrets', cache_mock)
1139
1140 credentials = credentials_from_clientsecrets_and_code(
1141 'some_secrets', self.scope,
1142 self.code, http=http, cache=cache_mock)
1143 self.assertEqual(credentials.access_token, 'asdfghjkl')
1144
1145 def test_exchange_code_and_file_for_token_fail(self):
1146 http = HttpMockSequence([
1147 ({'status': '400'}, b'{"error":"invalid_request"}'),
1148 ])
1149
1150 try:
1151 credentials = credentials_from_clientsecrets_and_code(
1152 datafile('client_secrets.json'), self.scope,
1153 self.code, http=http)
1154 self.fail('should raise exception if exchange doesn\'t get 200')
1155 except FlowExchangeError:
1156 pass
1157
1158
1159 class MemoryCacheTests(unittest.TestCase):
1160
1161 def test_get_set_delete(self):
1162 m = MemoryCache()
1163 self.assertEqual(None, m.get('foo'))
1164 self.assertEqual(None, m.delete('foo'))
1165 m.set('foo', 'bar')
1166 self.assertEqual('bar', m.get('foo'))
1167 m.delete('foo')
1168 self.assertEqual(None, m.get('foo'))
1169
1170
1171 class Test__save_private_file(unittest.TestCase):
1172
1173 def _save_helper(self, filename):
1174 contents = []
1175 contents_str = '[]'
1176 client._save_private_file(filename, contents)
1177 with open(filename, 'r') as f:
1178 stored_contents = f.read()
1179 self.assertEqual(stored_contents, contents_str)
1180
1181 stat_mode = os.stat(filename).st_mode
1182 # Octal 777, only last 3 positions matter for permissions mask.
1183 stat_mode &= 0o777
1184 self.assertEqual(stat_mode, 0o600)
1185
1186 def test_new(self):
1187 import tempfile
1188 filename = tempfile.mktemp()
1189 self.assertFalse(os.path.exists(filename))
1190 self._save_helper(filename)
1191
1192 def test_existing(self):
1193 import tempfile
1194 filename = tempfile.mktemp()
1195 with open(filename, 'w') as f:
1196 f.write('a bunch of nonsense longer than []')
1197 self.assertTrue(os.path.exists(filename))
1198 self._save_helper(filename)
1199
1200
1201 if __name__ == '__main__':
1202 unittest.main()
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698