Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(230)

Unified Diff: gslib/tests/test_oauth2_client.py

Issue 698893003: Update checked in version of gsutil to version 4.6 (Closed) Base URL: http://dart.googlecode.com/svn/third_party/gsutil/
Patch Set: Created 6 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « gslib/tests/test_notification.py ('k') | gslib/tests/test_parallel_cp.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: gslib/tests/test_oauth2_client.py
===================================================================
--- gslib/tests/test_oauth2_client.py (revision 33376)
+++ gslib/tests/test_oauth2_client.py (working copy)
@@ -1,273 +0,0 @@
-import datetime
-import logging
-import os
-import stat
-
-import gslib.tests.testcase as testcase
-from gslib.tests.util import unittest
-import gslib.util
-import gslib.third_party.oauth2_plugin.oauth2_client as oauth2_client
-
-LOG = logging.getLogger('test_oauth2_client')
-
-ACCESS_TOKEN = 'abc123'
-TOKEN_URI = 'https://provider.example.com/oauth/provider?mode=token'
-AUTH_URI = 'https://provider.example.com/oauth/provider?mode=authorize'
-DEFAULT_CA_CERTS_FILE = os.path.abspath(
- os.path.join('gslib', 'data', 'cacerts.txt'))
-
-class MockDateTime:
- def __init__(self):
- self.mock_now = None
-
- def utcnow(self):
- return self.mock_now
-
-class MockOAuth2ServiceAccountClient(oauth2_client.OAuth2ServiceAccountClient):
- def __init__(self, client_id, private_key, password, auth_uri, token_uri,
- datetime_strategy):
- super(MockOAuth2ServiceAccountClient, self).__init__(
- client_id, private_key, password, auth_uri=auth_uri,
- token_uri=token_uri, datetime_strategy=datetime_strategy,
- ca_certs_file=DEFAULT_CA_CERTS_FILE)
- self.Reset()
-
- def Reset(self):
- self.fetched_token = False
-
- def FetchAccessToken(self):
- self.fetched_token = True
- return oauth2_client.AccessToken(
- ACCESS_TOKEN,
- GetExpiry(self.datetime_strategy, 3600),
- datetime_strategy=self.datetime_strategy)
-
-
-class MockOAuth2UserAccountClient(oauth2_client.OAuth2UserAccountClient):
- def __init__(self, token_uri, client_id, client_secret, refresh_token,
- auth_uri, datetime_strategy):
- super(MockOAuth2UserAccountClient, self).__init__(
- token_uri, client_id, client_secret, refresh_token, auth_uri=auth_uri,
- datetime_strategy=datetime_strategy,
- ca_certs_file=DEFAULT_CA_CERTS_FILE)
- self.Reset()
-
- def Reset(self):
- self.fetched_token = False
-
- def FetchAccessToken(self):
- self.fetched_token = True
- return oauth2_client.AccessToken(
- ACCESS_TOKEN,
- GetExpiry(self.datetime_strategy, 3600),
- datetime_strategy=self.datetime_strategy)
-
-def GetExpiry(datetime_strategy, lengthInSeconds):
- token_expiry = (datetime_strategy.utcnow()
- + datetime.timedelta(seconds=lengthInSeconds))
- return token_expiry
-
-def CreateMockUserAccountClient(start_time, mock_datetime):
- return MockOAuth2UserAccountClient(
- TOKEN_URI, 'clid', 'clsecret', 'ref_token_abc123', AUTH_URI,
- mock_datetime)
-
-def CreateMockServiceAccountClient(start_time, mock_datetime):
- return MockOAuth2ServiceAccountClient(
- 'clid', 'private_key', 'password', AUTH_URI, TOKEN_URI,
- mock_datetime)
-
-
-class OAuth2UserAccountClientTest(testcase.GsUtilUnitTestCase):
-
- def setUp(self):
- self.tempdirs = []
- self.mock_datetime = MockDateTime()
- self.start_time = datetime.datetime(2011, 3, 1, 10, 25, 13, 300826)
- self.mock_datetime.mock_now = self.start_time
-
-
- def testGetAccessTokenUserAccount(self):
- self.client = CreateMockUserAccountClient(self.start_time,
- self.mock_datetime)
- self._RunGetAccessTokenTest()
-
-
- def testGetAccessTokenServiceAccount(self):
- self.client = CreateMockServiceAccountClient(self.start_time,
- self.mock_datetime)
- self._RunGetAccessTokenTest()
-
-
- def _RunGetAccessTokenTest(self):
- refresh_token = 'ref_token'
- access_token_1 = 'abc123'
-
- self.assertFalse(self.client.fetched_token)
- token_1 = self.client.GetAccessToken()
-
- # There's no access token in the cache; verify that we fetched a fresh
- # token.
- self.assertTrue(self.client.fetched_token)
- self.assertEquals(access_token_1, token_1.token)
- self.assertEquals(self.start_time + datetime.timedelta(minutes=60),
- token_1.expiry)
-
- # Advance time by less than expiry time, and fetch another token.
- self.client.Reset()
- self.mock_datetime.mock_now = (
- self.start_time + datetime.timedelta(minutes=55))
- token_2 = self.client.GetAccessToken()
-
- # Since the access token wasn't expired, we get the cache token, and there
- # was no refresh request.
- self.assertEquals(token_1, token_2)
- self.assertEquals(access_token_1, token_2.token)
- self.assertFalse(self.client.fetched_token)
-
- # Advance time past expiry time, and fetch another token.
- self.client.Reset()
- self.mock_datetime.mock_now = (
- self.start_time + datetime.timedelta(minutes=55, seconds=1))
- self.client.datetime_strategy = self.mock_datetime
- access_token_2 = 'zyx456'
- token_3 = self.client.GetAccessToken()
-
- # This should have resulted in a refresh request and a fresh access token.
- self.assertTrue(self.client.fetched_token)
- self.assertEquals(
- self.mock_datetime.mock_now + datetime.timedelta(minutes=60),
- token_3.expiry)
-
-
-class AccessTokenTest(unittest.TestCase):
-
- def testShouldRefresh(self):
- mock_datetime = MockDateTime()
- start = datetime.datetime(2011, 3, 1, 11, 25, 13, 300826)
- expiry = start + datetime.timedelta(minutes=60)
- token = oauth2_client.AccessToken(
- 'foo', expiry, datetime_strategy=mock_datetime)
-
- mock_datetime.mock_now = start
- self.assertFalse(token.ShouldRefresh())
-
- mock_datetime.mock_now = start + datetime.timedelta(minutes=54)
- self.assertFalse(token.ShouldRefresh())
-
- mock_datetime.mock_now = start + datetime.timedelta(minutes=55)
- self.assertFalse(token.ShouldRefresh())
-
- mock_datetime.mock_now = start + datetime.timedelta(
- minutes=55, seconds=1)
- self.assertTrue(token.ShouldRefresh())
-
- mock_datetime.mock_now = start + datetime.timedelta(
- minutes=61)
- self.assertTrue(token.ShouldRefresh())
-
- mock_datetime.mock_now = start + datetime.timedelta(minutes=58)
- self.assertFalse(token.ShouldRefresh(time_delta=120))
-
- mock_datetime.mock_now = start + datetime.timedelta(
- minutes=58, seconds=1)
- self.assertTrue(token.ShouldRefresh(time_delta=120))
-
- def testShouldRefreshNoExpiry(self):
- mock_datetime = MockDateTime()
- start = datetime.datetime(2011, 3, 1, 11, 25, 13, 300826)
- token = oauth2_client.AccessToken(
- 'foo', None, datetime_strategy=mock_datetime)
-
- mock_datetime.mock_now = start
- self.assertFalse(token.ShouldRefresh())
-
- mock_datetime.mock_now = start + datetime.timedelta(
- minutes=472)
- self.assertFalse(token.ShouldRefresh())
-
- def testSerialization(self):
- expiry = datetime.datetime(2011, 3, 1, 11, 25, 13, 300826)
- token = oauth2_client.AccessToken('foo', expiry)
- serialized_token = token.Serialize()
- LOG.debug('testSerialization: serialized_token=%s' % serialized_token)
-
- token2 = oauth2_client.AccessToken.UnSerialize(serialized_token)
- self.assertEquals(token, token2)
-
-class FileSystemTokenCacheTest(unittest.TestCase):
-
- def setUp(self):
- self.cache = oauth2_client.FileSystemTokenCache()
- self.start_time = datetime.datetime(2011, 3, 1, 10, 25, 13, 300826)
- self.token_1 = oauth2_client.AccessToken('token1', self.start_time)
- self.token_2 = oauth2_client.AccessToken(
- 'token2', self.start_time + datetime.timedelta(seconds=492))
- self.key = 'token1key'
-
- def tearDown(self):
- try:
- os.unlink(self.cache.CacheFileName(self.key))
- except:
- pass
-
- def testPut(self):
- self.cache.PutToken(self.key, self.token_1)
- # Assert that the cache file exists and has correct permissions.
- if not gslib.util.IS_WINDOWS:
- self.assertEquals(
- 0600,
- stat.S_IMODE(os.stat(self.cache.CacheFileName(self.key)).st_mode))
-
- def testPutGet(self):
- # No cache file present.
- self.assertEquals(None, self.cache.GetToken(self.key))
-
- # Put a token
- self.cache.PutToken(self.key, self.token_1)
- cached_token = self.cache.GetToken(self.key)
- self.assertEquals(self.token_1, cached_token)
-
- # Put a different token
- self.cache.PutToken(self.key, self.token_2)
- cached_token = self.cache.GetToken(self.key)
- self.assertEquals(self.token_2, cached_token)
-
- def testGetBadFile(self):
- f = open(self.cache.CacheFileName(self.key), 'w')
- f.write('blah')
- f.close()
- self.assertEquals(None, self.cache.GetToken(self.key))
-
- def testCacheFileName(self):
- cache = oauth2_client.FileSystemTokenCache(
- path_pattern='/var/run/ccache/token.%(uid)s.%(key)s')
- if gslib.util.IS_WINDOWS:
- uid = '_'
- else:
- uid = os.getuid()
- self.assertEquals('/var/run/ccache/token.%s.abc123' % uid,
- cache.CacheFileName('abc123'))
-
- cache = oauth2_client.FileSystemTokenCache(
- path_pattern='/var/run/ccache/token.%(key)s')
- self.assertEquals('/var/run/ccache/token.abc123',
- cache.CacheFileName('abc123'))
-
-
-class RefreshTokenTest(unittest.TestCase):
- def setUp(self):
- self.mock_datetime = MockDateTime()
- self.start_time = datetime.datetime(2011, 3, 1, 10, 25, 13, 300826)
- self.mock_datetime.mock_now = self.start_time
- self.client = CreateMockUserAccountClient(self.start_time,
- self.mock_datetime)
-
-
- def testUniqeId(self):
- cred_id = self.client.CacheKey()
- self.assertEquals('0720afed6871f12761fbea3271f451e6ba184bf5', cred_id)
-
- def testGetAuthorizationHeader(self):
- self.assertEquals('Bearer %s' % ACCESS_TOKEN,
- self.client.GetAuthorizationHeader())
« no previous file with comments | « gslib/tests/test_notification.py ('k') | gslib/tests/test_parallel_cp.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698