Index: tools/telemetry/third_party/gsutilz/third_party/gcs-oauth2-boto-plugin/gcs_oauth2_boto_plugin/test_oauth2_client.py |
diff --git a/tools/telemetry/third_party/gsutilz/third_party/gcs-oauth2-boto-plugin/gcs_oauth2_boto_plugin/test_oauth2_client.py b/tools/telemetry/third_party/gsutilz/third_party/gcs-oauth2-boto-plugin/gcs_oauth2_boto_plugin/test_oauth2_client.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..3fe5ce0a8f44532477e3786eafe2d8495a5dde7d |
--- /dev/null |
+++ b/tools/telemetry/third_party/gsutilz/third_party/gcs-oauth2-boto-plugin/gcs_oauth2_boto_plugin/test_oauth2_client.py |
@@ -0,0 +1,385 @@ |
+# Copyright 2014 Google Inc. All Rights Reserved. |
+# |
+# Licensed under the Apache License, Version 2.0 (the "License"); |
+# you may not use this file except in compliance with the License. |
+# You may obtain a copy of the License at |
+# |
+# http://www.apache.org/licenses/LICENSE-2.0 |
+# |
+# Unless required by applicable law or agreed to in writing, software |
+# distributed under the License is distributed on an "AS IS" BASIS, |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
+# See the License for the specific language governing permissions and |
+# limitations under the License. |
+ |
+"""Unit tests for oauth2_client and related classes.""" |
+ |
+from __future__ import absolute_import |
+ |
+import datetime |
+import httplib2 |
+import logging |
+import mox |
+import os |
+import stat |
+import sys |
+import unittest |
+ |
+from freezegun import freeze_time |
+ |
+from gcs_oauth2_boto_plugin import oauth2_client |
+ |
+LOG = logging.getLogger('test_oauth2_client') |
+ |
+ACCESS_TOKEN = 'abc123' |
+TOKEN_URI = 'https://provider.example.com/oauth/provider?mode=token' |
+AUTH_URI = 'https://provider.example.com/oauth/provider?mode=authorize' |
+DEFAULT_CA_CERTS_FILE = os.path.abspath( |
+ os.path.join('gslib', 'data', 'cacerts.txt')) |
+ |
+IS_WINDOWS = 'win32' in str(sys.platform).lower() |
+ |
+ |
+class MockDateTime(object): |
+ def __init__(self): |
+ self.mock_now = None |
+ |
+ def utcnow(self): # pylint: disable=invalid-name |
+ return self.mock_now |
+ |
+ |
+class MockOAuth2ServiceAccountClient(oauth2_client.OAuth2ServiceAccountClient): |
+ """Mock service account client for testing OAuth2 with service accounts.""" |
+ |
+ def __init__(self, client_id, private_key, password, auth_uri, token_uri, |
+ datetime_strategy): |
+ super(MockOAuth2ServiceAccountClient, self).__init__( |
+ client_id, private_key, password, auth_uri=auth_uri, |
+ token_uri=token_uri, datetime_strategy=datetime_strategy, |
+ ca_certs_file=DEFAULT_CA_CERTS_FILE) |
+ self.Reset() |
+ |
+ def Reset(self): |
+ self.fetched_token = False |
+ |
+ def FetchAccessToken(self): |
+ self.fetched_token = True |
+ return oauth2_client.AccessToken( |
+ ACCESS_TOKEN, |
+ GetExpiry(self.datetime_strategy, 3600), |
+ datetime_strategy=self.datetime_strategy) |
+ |
+ |
+class MockOAuth2UserAccountClient(oauth2_client.OAuth2UserAccountClient): |
+ """Mock user account client for testing OAuth2 with user accounts.""" |
+ |
+ def __init__(self, token_uri, client_id, client_secret, refresh_token, |
+ auth_uri, datetime_strategy): |
+ super(MockOAuth2UserAccountClient, self).__init__( |
+ token_uri, client_id, client_secret, refresh_token, auth_uri=auth_uri, |
+ datetime_strategy=datetime_strategy, |
+ ca_certs_file=DEFAULT_CA_CERTS_FILE) |
+ self.Reset() |
+ |
+ def Reset(self): |
+ self.fetched_token = False |
+ |
+ def FetchAccessToken(self): |
+ self.fetched_token = True |
+ return oauth2_client.AccessToken( |
+ ACCESS_TOKEN, |
+ GetExpiry(self.datetime_strategy, 3600), |
+ datetime_strategy=self.datetime_strategy) |
+ |
+ |
+def GetExpiry(datetime_strategy, length_in_seconds): |
+ token_expiry = (datetime_strategy.utcnow() |
+ + datetime.timedelta(seconds=length_in_seconds)) |
+ return token_expiry |
+ |
+ |
+def CreateMockUserAccountClient(mock_datetime): |
+ return MockOAuth2UserAccountClient( |
+ TOKEN_URI, 'clid', 'clsecret', 'ref_token_abc123', AUTH_URI, |
+ mock_datetime) |
+ |
+ |
+def CreateMockServiceAccountClient(mock_datetime): |
+ return MockOAuth2ServiceAccountClient( |
+ 'clid', 'private_key', 'password', AUTH_URI, TOKEN_URI, mock_datetime) |
+ |
+ |
+class OAuth2AccountClientTest(unittest.TestCase): |
+ """Unit tests for OAuth2UserAccountClient and OAuth2ServiceAccountClient.""" |
+ |
+ def setUp(self): |
+ self.tempdirs = [] |
+ self.mock_datetime = MockDateTime() |
+ self.start_time = datetime.datetime(2011, 3, 1, 11, 25, 13, 300826) |
+ self.mock_datetime.mock_now = self.start_time |
+ |
+ def testGetAccessTokenUserAccount(self): |
+ self.client = CreateMockUserAccountClient(self.mock_datetime) |
+ self._RunGetAccessTokenTest() |
+ |
+ def testGetAccessTokenServiceAccount(self): |
+ self.client = CreateMockServiceAccountClient(self.mock_datetime) |
+ self._RunGetAccessTokenTest() |
+ |
+ def _RunGetAccessTokenTest(self): |
+ """Tests access token gets with self.client.""" |
+ access_token_1 = 'abc123' |
+ |
+ self.assertFalse(self.client.fetched_token) |
+ token_1 = self.client.GetAccessToken() |
+ |
+ # There's no access token in the cache; verify that we fetched a fresh |
+ # token. |
+ self.assertTrue(self.client.fetched_token) |
+ self.assertEquals(access_token_1, token_1.token) |
+ self.assertEquals(self.start_time + datetime.timedelta(minutes=60), |
+ token_1.expiry) |
+ |
+ # Advance time by less than expiry time, and fetch another token. |
+ self.client.Reset() |
+ self.mock_datetime.mock_now = ( |
+ self.start_time + datetime.timedelta(minutes=55)) |
+ token_2 = self.client.GetAccessToken() |
+ |
+ # Since the access token wasn't expired, we get the cache token, and there |
+ # was no refresh request. |
+ self.assertEquals(token_1, token_2) |
+ self.assertEquals(access_token_1, token_2.token) |
+ self.assertFalse(self.client.fetched_token) |
+ |
+ # Advance time past expiry time, and fetch another token. |
+ self.client.Reset() |
+ self.mock_datetime.mock_now = ( |
+ self.start_time + datetime.timedelta(minutes=55, seconds=1)) |
+ self.client.datetime_strategy = self.mock_datetime |
+ token_3 = self.client.GetAccessToken() |
+ |
+ # This should have resulted in a refresh request and a fresh access token. |
+ self.assertTrue(self.client.fetched_token) |
+ self.assertEquals( |
+ self.mock_datetime.mock_now + datetime.timedelta(minutes=60), |
+ token_3.expiry) |
+ |
+ |
+class AccessTokenTest(unittest.TestCase): |
+ """Unit tests for access token functions.""" |
+ |
+ def testShouldRefresh(self): |
+ """Tests that token.ShouldRefresh returns the correct value.""" |
+ mock_datetime = MockDateTime() |
+ start = datetime.datetime(2011, 3, 1, 11, 25, 13, 300826) |
+ expiry = start + datetime.timedelta(minutes=60) |
+ token = oauth2_client.AccessToken( |
+ 'foo', expiry, datetime_strategy=mock_datetime) |
+ |
+ mock_datetime.mock_now = start |
+ self.assertFalse(token.ShouldRefresh()) |
+ |
+ mock_datetime.mock_now = start + datetime.timedelta(minutes=54) |
+ self.assertFalse(token.ShouldRefresh()) |
+ |
+ mock_datetime.mock_now = start + datetime.timedelta(minutes=55) |
+ self.assertFalse(token.ShouldRefresh()) |
+ |
+ mock_datetime.mock_now = start + datetime.timedelta( |
+ minutes=55, seconds=1) |
+ self.assertTrue(token.ShouldRefresh()) |
+ |
+ mock_datetime.mock_now = start + datetime.timedelta( |
+ minutes=61) |
+ self.assertTrue(token.ShouldRefresh()) |
+ |
+ mock_datetime.mock_now = start + datetime.timedelta(minutes=58) |
+ self.assertFalse(token.ShouldRefresh(time_delta=120)) |
+ |
+ mock_datetime.mock_now = start + datetime.timedelta( |
+ minutes=58, seconds=1) |
+ self.assertTrue(token.ShouldRefresh(time_delta=120)) |
+ |
+ def testShouldRefreshNoExpiry(self): |
+ """Tests token.ShouldRefresh with no expiry time.""" |
+ mock_datetime = MockDateTime() |
+ start = datetime.datetime(2011, 3, 1, 11, 25, 13, 300826) |
+ token = oauth2_client.AccessToken( |
+ 'foo', None, datetime_strategy=mock_datetime) |
+ |
+ mock_datetime.mock_now = start |
+ self.assertFalse(token.ShouldRefresh()) |
+ |
+ mock_datetime.mock_now = start + datetime.timedelta( |
+ minutes=472) |
+ self.assertFalse(token.ShouldRefresh()) |
+ |
+ def testSerialization(self): |
+ """Tests token serialization.""" |
+ expiry = datetime.datetime(2011, 3, 1, 11, 25, 13, 300826) |
+ token = oauth2_client.AccessToken('foo', expiry) |
+ serialized_token = token.Serialize() |
+ LOG.debug('testSerialization: serialized_token=%s', serialized_token) |
+ |
+ token2 = oauth2_client.AccessToken.UnSerialize(serialized_token) |
+ self.assertEquals(token, token2) |
+ |
+ |
+class FileSystemTokenCacheTest(unittest.TestCase): |
+ """Unit tests for FileSystemTokenCache.""" |
+ |
+ def setUp(self): |
+ self.cache = oauth2_client.FileSystemTokenCache() |
+ self.start_time = datetime.datetime(2011, 3, 1, 10, 25, 13, 300826) |
+ self.token_1 = oauth2_client.AccessToken('token1', self.start_time) |
+ self.token_2 = oauth2_client.AccessToken( |
+ 'token2', self.start_time + datetime.timedelta(seconds=492)) |
+ self.key = 'token1key' |
+ |
+ def tearDown(self): |
+ try: |
+ os.unlink(self.cache.CacheFileName(self.key)) |
+ except: # pylint: disable=bare-except |
+ pass |
+ |
+ def testPut(self): |
+ self.cache.PutToken(self.key, self.token_1) |
+ # Assert that the cache file exists and has correct permissions. |
+ if not IS_WINDOWS: |
+ self.assertEquals( |
+ 0600, |
+ stat.S_IMODE(os.stat(self.cache.CacheFileName(self.key)).st_mode)) |
+ |
+ def testPutGet(self): |
+ """Tests putting and getting various tokens.""" |
+ # No cache file present. |
+ self.assertEquals(None, self.cache.GetToken(self.key)) |
+ |
+ # Put a token |
+ self.cache.PutToken(self.key, self.token_1) |
+ cached_token = self.cache.GetToken(self.key) |
+ self.assertEquals(self.token_1, cached_token) |
+ |
+ # Put a different token |
+ self.cache.PutToken(self.key, self.token_2) |
+ cached_token = self.cache.GetToken(self.key) |
+ self.assertEquals(self.token_2, cached_token) |
+ |
+ def testGetBadFile(self): |
+ f = open(self.cache.CacheFileName(self.key), 'w') |
+ f.write('blah') |
+ f.close() |
+ self.assertEquals(None, self.cache.GetToken(self.key)) |
+ |
+ def testCacheFileName(self): |
+ """Tests configuring the cache with a specific file name.""" |
+ cache = oauth2_client.FileSystemTokenCache( |
+ path_pattern='/var/run/ccache/token.%(uid)s.%(key)s') |
+ if IS_WINDOWS: |
+ uid = '_' |
+ else: |
+ uid = os.getuid() |
+ self.assertEquals('/var/run/ccache/token.%s.abc123' % uid, |
+ cache.CacheFileName('abc123')) |
+ |
+ cache = oauth2_client.FileSystemTokenCache( |
+ path_pattern='/var/run/ccache/token.%(key)s') |
+ self.assertEquals('/var/run/ccache/token.abc123', |
+ cache.CacheFileName('abc123')) |
+ |
+ |
+class RefreshTokenTest(unittest.TestCase): |
+ """Unit tests for refresh tokens.""" |
+ |
+ def setUp(self): |
+ self.mock_datetime = MockDateTime() |
+ self.start_time = datetime.datetime(2011, 3, 1, 10, 25, 13, 300826) |
+ self.mock_datetime.mock_now = self.start_time |
+ self.client = CreateMockUserAccountClient(self.mock_datetime) |
+ |
+ def testUniqeId(self): |
+ cred_id = self.client.CacheKey() |
+ self.assertEquals('0720afed6871f12761fbea3271f451e6ba184bf5', cred_id) |
+ |
+ def testGetAuthorizationHeader(self): |
+ self.assertEquals('Bearer %s' % ACCESS_TOKEN, |
+ self.client.GetAuthorizationHeader()) |
+ |
+ |
+class FakeResponse: |
+ def __init__(self, status): |
+ self._status = status |
+ |
+ @property |
+ def status(self): |
+ return self._status |
+ |
+ |
+class OAuth2GCEClientTest(unittest.TestCase): |
+ """Unit tests for OAuth2GCEClient.""" |
+ |
+ def setUp(self): |
+ self.mox = mox.Mox() |
+ self.mox.StubOutClassWithMocks(httplib2, 'Http') |
+ self.mock_http = httplib2.Http() |
+ |
+ def tearDown(self): |
+ self.mox.UnsetStubs() |
+ |
+ @freeze_time('2014-03-26 01:01:01') |
+ def testFetchAccessToken(self): |
+ token = 'my_token' |
+ |
+ self.mock_http.request( |
+ oauth2_client.META_TOKEN_URI, |
+ method='GET', |
+ body=None, |
+ headers=oauth2_client.META_HEADERS).AndReturn(( |
+ FakeResponse(200), |
+ '{"access_token":"%(TOKEN)s",' |
+ '"expires_in": %(EXPIRES_IN)d}' % { |
+ 'TOKEN': token, |
+ 'EXPIRES_IN': 42 |
+ })) |
+ |
+ self.mox.ReplayAll() |
+ |
+ client = oauth2_client.OAuth2GCEClient() |
+ |
+ self.assertEqual( |
+ str(client.FetchAccessToken()), |
+ 'AccessToken(token=%s, expiry=2014-03-26 01:01:43Z)' % token) |
+ |
+ self.mox.VerifyAll() |
+ |
+ def testIsGCENotFound(self): |
+ self.mock_http.request(oauth2_client.METADATA_SERVER).AndReturn(( |
+ FakeResponse(404), '')) |
+ |
+ self.mox.ReplayAll() |
+ self.assertFalse(oauth2_client._IsGCE()) |
+ |
+ self.mox.VerifyAll() |
+ |
+ def testIsGCEServerNotFound(self): |
+ self.mock_http.request(oauth2_client.METADATA_SERVER).AndRaise( |
+ httplib2.ServerNotFoundError) |
+ |
+ self.mox.ReplayAll() |
+ self.assertFalse(oauth2_client._IsGCE()) |
+ |
+ self.mox.VerifyAll() |
+ |
+ def testIsGCETrue(self): |
+ self.mock_http.request(oauth2_client.METADATA_SERVER).AndReturn(( |
+ FakeResponse(200), '')) |
+ |
+ self.mox.ReplayAll() |
+ self.assertTrue(oauth2_client._IsGCE()) |
+ |
+ self.mox.VerifyAll() |
+ |
+ |
+if __name__ == '__main__': |
+ unittest.main() |