Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(287)

Unified Diff: third_party/boto/tests/unit/provider/test_provider.py

Issue 698893003: Update checked in version of gsutil to version 4.6 (Closed) Base URL: http://dart.googlecode.com/svn/third_party/gsutil/
Patch Set: Created 6 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/boto/tests/unit/provider/test_provider.py
===================================================================
--- third_party/boto/tests/unit/provider/test_provider.py (revision 33376)
+++ third_party/boto/tests/unit/provider/test_provider.py (working copy)
@@ -3,8 +3,10 @@
from tests.unit import unittest
import mock
+import os
from boto import provider
+from boto.compat import expanduser
INSTANCE_CONFIG = {
@@ -24,17 +26,25 @@
def setUp(self):
self.environ = {}
self.config = {}
+ self.shared_config = {}
self.metadata_patch = mock.patch('boto.utils.get_instance_metadata')
self.config_patch = mock.patch('boto.provider.config.get',
self.get_config)
self.has_config_patch = mock.patch('boto.provider.config.has_option',
self.has_config)
+ self.config_object_patch = mock.patch.object(
+ provider.Config, 'get', self.get_shared_config)
+ self.has_config_object_patch = mock.patch.object(
+ provider.Config, 'has_option', self.has_shared_config)
self.environ_patch = mock.patch('os.environ', self.environ)
self.get_instance_metadata = self.metadata_patch.start()
+ self.get_instance_metadata.return_value = None
self.config_patch.start()
self.has_config_patch.start()
+ self.config_object_patch.start()
+ self.has_config_object_patch.start()
self.environ_patch.start()
@@ -42,6 +52,8 @@
self.metadata_patch.stop()
self.config_patch.stop()
self.has_config_patch.stop()
+ self.config_object_patch.stop()
+ self.has_config_object_patch.stop()
self.environ_patch.stop()
def has_config(self, section_name, key):
@@ -57,6 +69,19 @@
except KeyError:
return None
+ def has_shared_config(self, section_name, key):
+ try:
+ self.shared_config[section_name][key]
+ return True
+ except KeyError:
+ return False
+
+ def get_shared_config(self, section_name, key):
+ try:
+ return self.shared_config[section_name][key]
+ except KeyError:
+ return None
+
def test_passed_in_values_are_used(self):
p = provider.Provider('aws', 'access_key', 'secret_key', 'security_token')
self.assertEqual(p.access_key, 'access_key')
@@ -80,6 +105,43 @@
self.assertEqual(p.secret_key, 'env_secret_key')
self.assertEqual(p.security_token, 'env_security_token')
+ def test_config_profile_values_are_used(self):
+ self.config = {
+ 'profile dev': {
+ 'aws_access_key_id': 'dev_access_key',
+ 'aws_secret_access_key': 'dev_secret_key',
+ }, 'profile prod': {
+ 'aws_access_key_id': 'prod_access_key',
+ 'aws_secret_access_key': 'prod_secret_key',
+ }, 'Credentials': {
+ 'aws_access_key_id': 'default_access_key',
+ 'aws_secret_access_key': 'default_secret_key'
+ }
+ }
+ p = provider.Provider('aws', profile_name='prod')
+ self.assertEqual(p.access_key, 'prod_access_key')
+ self.assertEqual(p.secret_key, 'prod_secret_key')
+ q = provider.Provider('aws', profile_name='dev')
+ self.assertEqual(q.access_key, 'dev_access_key')
+ self.assertEqual(q.secret_key, 'dev_secret_key')
+
+ def test_config_missing_profile(self):
+ # None of these default profiles should be loaded!
+ self.shared_config = {
+ 'default': {
+ 'aws_access_key_id': 'shared_access_key',
+ 'aws_secret_access_key': 'shared_secret_key',
+ }
+ }
+ self.config = {
+ 'Credentials': {
+ 'aws_access_key_id': 'default_access_key',
+ 'aws_secret_access_key': 'default_secret_key'
+ }
+ }
+ with self.assertRaises(provider.ProfileNotFoundError):
+ provider.Provider('aws', profile_name='doesntexist')
+
def test_config_values_are_used(self):
self.config = {
'Credentials': {
@@ -132,9 +194,36 @@
if not imported:
del sys.modules['keyring']
- def test_env_vars_beat_config_values(self):
+ def test_passed_in_values_beat_env_vars(self):
self.environ['AWS_ACCESS_KEY_ID'] = 'env_access_key'
self.environ['AWS_SECRET_ACCESS_KEY'] = 'env_secret_key'
+ self.environ['AWS_SECURITY_TOKEN'] = 'env_security_token'
+ p = provider.Provider('aws', 'access_key', 'secret_key')
+ self.assertEqual(p.access_key, 'access_key')
+ self.assertEqual(p.secret_key, 'secret_key')
+ self.assertEqual(p.security_token, None)
+
+ def test_env_vars_beat_shared_creds_values(self):
+ self.environ['AWS_ACCESS_KEY_ID'] = 'env_access_key'
+ self.environ['AWS_SECRET_ACCESS_KEY'] = 'env_secret_key'
+ self.shared_config = {
+ 'default': {
+ 'aws_access_key_id': 'shared_access_key',
+ 'aws_secret_access_key': 'shared_secret_key',
+ }
+ }
+ p = provider.Provider('aws')
+ self.assertEqual(p.access_key, 'env_access_key')
+ self.assertEqual(p.secret_key, 'env_secret_key')
+ self.assertIsNone(p.security_token)
+
+ def test_shared_creds_beat_config_values(self):
+ self.shared_config = {
+ 'default': {
+ 'aws_access_key_id': 'shared_access_key',
+ 'aws_secret_access_key': 'shared_secret_key',
+ }
+ }
self.config = {
'Credentials': {
'aws_access_key_id': 'cfg_access_key',
@@ -142,14 +231,70 @@
}
}
p = provider.Provider('aws')
- self.assertEqual(p.access_key, 'env_access_key')
- self.assertEqual(p.secret_key, 'env_secret_key')
+ self.assertEqual(p.access_key, 'shared_access_key')
+ self.assertEqual(p.secret_key, 'shared_secret_key')
self.assertIsNone(p.security_token)
+ def test_shared_creds_profile_beats_defaults(self):
+ self.shared_config = {
+ 'default': {
+ 'aws_access_key_id': 'shared_access_key',
+ 'aws_secret_access_key': 'shared_secret_key',
+ },
+ 'foo': {
+ 'aws_access_key_id': 'foo_access_key',
+ 'aws_secret_access_key': 'foo_secret_key',
+ }
+ }
+ p = provider.Provider('aws', profile_name='foo')
+ self.assertEqual(p.access_key, 'foo_access_key')
+ self.assertEqual(p.secret_key, 'foo_secret_key')
+ self.assertIsNone(p.security_token)
+
+ def test_env_profile_loads_profile(self):
+ self.environ['AWS_PROFILE'] = 'foo'
+ self.shared_config = {
+ 'default': {
+ 'aws_access_key_id': 'shared_access_key',
+ 'aws_secret_access_key': 'shared_secret_key',
+ },
+ 'foo': {
+ 'aws_access_key_id': 'shared_access_key_foo',
+ 'aws_secret_access_key': 'shared_secret_key_foo',
+ }
+ }
+ self.config = {
+ 'profile foo': {
+ 'aws_access_key_id': 'cfg_access_key_foo',
+ 'aws_secret_access_key': 'cfg_secret_key_foo',
+ },
+ 'Credentials': {
+ 'aws_access_key_id': 'cfg_access_key',
+ 'aws_secret_access_key': 'cfg_secret_key',
+ }
+ }
+ p = provider.Provider('aws')
+ self.assertEqual(p.access_key, 'shared_access_key_foo')
+ self.assertEqual(p.secret_key, 'shared_secret_key_foo')
+ self.assertIsNone(p.security_token)
+
+ self.shared_config = {}
+ p = provider.Provider('aws')
+ self.assertEqual(p.access_key, 'cfg_access_key_foo')
+ self.assertEqual(p.secret_key, 'cfg_secret_key_foo')
+ self.assertIsNone(p.security_token)
+
def test_env_vars_security_token_beats_config_values(self):
self.environ['AWS_ACCESS_KEY_ID'] = 'env_access_key'
self.environ['AWS_SECRET_ACCESS_KEY'] = 'env_secret_key'
self.environ['AWS_SECURITY_TOKEN'] = 'env_security_token'
+ self.shared_config = {
+ 'default': {
+ 'aws_access_key_id': 'shared_access_key',
+ 'aws_secret_access_key': 'shared_secret_key',
+ 'aws_security_token': 'shared_security_token',
+ }
+ }
self.config = {
'Credentials': {
'aws_access_key_id': 'cfg_access_key',
@@ -162,6 +307,14 @@
self.assertEqual(p.secret_key, 'env_secret_key')
self.assertEqual(p.security_token, 'env_security_token')
+ self.environ.clear()
+ p = provider.Provider('aws')
+ self.assertEqual(p.security_token, 'shared_security_token')
+
+ self.shared_config.clear()
+ p = provider.Provider('aws')
+ self.assertEqual(p.security_token, 'cfg_security_token')
+
def test_metadata_server_credentials(self):
self.get_instance_metadata.return_value = INSTANCE_CONFIG
p = provider.Provider('aws')
@@ -173,7 +326,7 @@
'meta-data/iam/security-credentials/')
def test_refresh_credentials(self):
- now = datetime.now()
+ now = datetime.utcnow()
first_expiration = (now + timedelta(seconds=10)).strftime(
"%Y-%m-%dT%H:%M:%SZ")
credentials = {
@@ -220,6 +373,51 @@
timeout=4.0, num_retries=10,
data='meta-data/iam/security-credentials/')
+ def test_provider_google(self):
+ self.environ['GS_ACCESS_KEY_ID'] = 'env_access_key'
+ self.environ['GS_SECRET_ACCESS_KEY'] = 'env_secret_key'
+ self.shared_config = {
+ 'default': {
+ 'gs_access_key_id': 'shared_access_key',
+ 'gs_secret_access_key': 'shared_secret_key',
+ }
+ }
+ self.config = {
+ 'Credentials': {
+ 'gs_access_key_id': 'cfg_access_key',
+ 'gs_secret_access_key': 'cfg_secret_key',
+ }
+ }
+ p = provider.Provider('google')
+ self.assertEqual(p.access_key, 'env_access_key')
+ self.assertEqual(p.secret_key, 'env_secret_key')
+ self.environ.clear()
+ p = provider.Provider('google')
+ self.assertEqual(p.access_key, 'shared_access_key')
+ self.assertEqual(p.secret_key, 'shared_secret_key')
+
+ self.shared_config.clear()
+ p = provider.Provider('google')
+ self.assertEqual(p.access_key, 'cfg_access_key')
+ self.assertEqual(p.secret_key, 'cfg_secret_key')
+
+ @mock.patch('os.path.isfile', return_value=True)
+ @mock.patch.object(provider.Config, 'load_from_path')
+ def test_shared_config_loading(self, load_from_path, exists):
+ provider.Provider('aws')
+ path = os.path.join(expanduser('~'), '.aws', 'credentials')
+ exists.assert_called_once_with(path)
+ load_from_path.assert_called_once_with(path)
+
+ exists.reset_mock()
+ load_from_path.reset_mock()
+
+ provider.Provider('google')
+ path = os.path.join(expanduser('~'), '.google', 'credentials')
+ exists.assert_called_once_with(path)
+ load_from_path.assert_called_once_with(path)
+
+
if __name__ == '__main__':
unittest.main()
« no previous file with comments | « third_party/boto/tests/unit/mws/test_response.py ('k') | third_party/boto/tests/unit/rds/test_connection.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698