Index: tools/telemetry/third_party/gsutilz/third_party/oauth2client/tests/test_service_account.py |
diff --git a/tools/telemetry/third_party/gsutilz/third_party/oauth2client/tests/test_service_account.py b/tools/telemetry/third_party/gsutilz/third_party/oauth2client/tests/test_service_account.py |
deleted file mode 100755 |
index 5d1a125b22dcfb3c21c1857c2dde24041ccf10a8..0000000000000000000000000000000000000000 |
--- a/tools/telemetry/third_party/gsutilz/third_party/oauth2client/tests/test_service_account.py |
+++ /dev/null |
@@ -1,124 +0,0 @@ |
-#!/usr/bin/python2.4 |
-# |
-# Copyright 2014 Google Inc. All rights reserved. |
-# |
-# Licensed under the Apache License, Version 2.0 (the "License"); |
-# you may not use this file except in compliance with the License. |
-# You may obtain a copy of the License at |
-# |
-# http://www.apache.org/licenses/LICENSE-2.0 |
-# |
-# Unless required by applicable law or agreed to in writing, software |
-# distributed under the License is distributed on an "AS IS" BASIS, |
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
-# See the License for the specific language governing permissions and |
-# limitations under the License. |
- |
- |
-"""Oauth2client tests. |
- |
-Unit tests for service account credentials implemented using RSA. |
-""" |
- |
-import json |
-import os |
-import rsa |
-import time |
-import unittest |
- |
-from .http_mock import HttpMockSequence |
-from oauth2client.service_account import _ServiceAccountCredentials |
- |
- |
-def datafile(filename): |
- # TODO(orestica): Refactor this using pkgutil.get_data |
- f = open(os.path.join(os.path.dirname(__file__), 'data', filename), 'rb') |
- data = f.read() |
- f.close() |
- return data |
- |
- |
-class ServiceAccountCredentialsTests(unittest.TestCase): |
- def setUp(self): |
- self.service_account_id = '123' |
- self.service_account_email = 'dummy@google.com' |
- self.private_key_id = 'ABCDEF' |
- self.private_key = datafile('pem_from_pkcs12.pem') |
- self.scopes = ['dummy_scope'] |
- self.credentials = _ServiceAccountCredentials(self.service_account_id, |
- self.service_account_email, |
- self.private_key_id, |
- self.private_key, |
- []) |
- |
- def test_sign_blob(self): |
- private_key_id, signature = self.credentials.sign_blob('Google') |
- self.assertEqual( self.private_key_id, private_key_id) |
- |
- pub_key = rsa.PublicKey.load_pkcs1_openssl_pem( |
- datafile('publickey_openssl.pem')) |
- |
- self.assertTrue(rsa.pkcs1.verify(b'Google', signature, pub_key)) |
- |
- try: |
- rsa.pkcs1.verify(b'Orest', signature, pub_key) |
- self.fail('Verification should have failed!') |
- except rsa.pkcs1.VerificationError: |
- pass # Expected |
- |
- try: |
- rsa.pkcs1.verify(b'Google', b'bad signature', pub_key) |
- self.fail('Verification should have failed!') |
- except rsa.pkcs1.VerificationError: |
- pass # Expected |
- |
- def test_service_account_email(self): |
- self.assertEqual(self.service_account_email, |
- self.credentials.service_account_email) |
- |
- def test_create_scoped_required_without_scopes(self): |
- self.assertTrue(self.credentials.create_scoped_required()) |
- |
- def test_create_scoped_required_with_scopes(self): |
- self.credentials = _ServiceAccountCredentials(self.service_account_id, |
- self.service_account_email, |
- self.private_key_id, |
- self.private_key, |
- self.scopes) |
- self.assertFalse(self.credentials.create_scoped_required()) |
- |
- def test_create_scoped(self): |
- new_credentials = self.credentials.create_scoped(self.scopes) |
- self.assertNotEqual(self.credentials, new_credentials) |
- self.assertTrue(isinstance(new_credentials, _ServiceAccountCredentials)) |
- self.assertEqual('dummy_scope', new_credentials._scopes) |
- |
- def test_access_token(self): |
- S = 2 # number of seconds in which the token expires |
- token_response_first = {'access_token': 'first_token', 'expires_in': S} |
- token_response_second = {'access_token': 'second_token', 'expires_in': S} |
- http = HttpMockSequence([ |
- ({'status': '200'}, json.dumps(token_response_first).encode('utf-8')), |
- ({'status': '200'}, json.dumps(token_response_second).encode('utf-8')), |
- ]) |
- |
- token = self.credentials.get_access_token(http=http) |
- self.assertEqual('first_token', token.access_token) |
- self.assertEqual(S - 1, token.expires_in) |
- self.assertFalse(self.credentials.access_token_expired) |
- self.assertEqual(token_response_first, self.credentials.token_response) |
- |
- token = self.credentials.get_access_token(http=http) |
- self.assertEqual('first_token', token.access_token) |
- self.assertEqual(S - 1, token.expires_in) |
- self.assertFalse(self.credentials.access_token_expired) |
- self.assertEqual(token_response_first, self.credentials.token_response) |
- |
- time.sleep(S + 0.5) # some margin to avoid flakiness |
- self.assertTrue(self.credentials.access_token_expired) |
- |
- token = self.credentials.get_access_token(http=http) |
- self.assertEqual('second_token', token.access_token) |
- self.assertEqual(S - 1, token.expires_in) |
- self.assertFalse(self.credentials.access_token_expired) |
- self.assertEqual(token_response_second, self.credentials.token_response) |