Index: tools/telemetry/third_party/gsutil/third_party/boto/tests/unit/sns/test_connection.py |
diff --git a/tools/telemetry/third_party/gsutil/third_party/boto/tests/unit/sns/test_connection.py b/tools/telemetry/third_party/gsutil/third_party/boto/tests/unit/sns/test_connection.py |
deleted file mode 100644 |
index 2cce22115ea41bba3f1b8ce1e15d7f520fd27c06..0000000000000000000000000000000000000000 |
--- a/tools/telemetry/third_party/gsutil/third_party/boto/tests/unit/sns/test_connection.py |
+++ /dev/null |
@@ -1,281 +0,0 @@ |
-#!/usr/bin/env python |
-# Copyright (c) 2013 Amazon.com, Inc. or its affiliates. All Rights Reserved |
-# |
-# Permission is hereby granted, free of charge, to any person obtaining a |
-# copy of this software and associated documentation files (the |
-# "Software"), to deal in the Software without restriction, including |
-# without limitation the rights to use, copy, modify, merge, publish, dis- |
-# tribute, sublicense, and/or sell copies of the Software, and to permit |
-# persons to whom the Software is furnished to do so, subject to the fol- |
-# lowing conditions: |
-# |
-# The above copyright notice and this permission notice shall be included |
-# in all copies or substantial portions of the Software. |
-# |
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS |
-# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- |
-# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT |
-# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, |
-# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS |
-# IN THE SOFTWARE. |
-# |
-import json |
-from tests.unit import unittest |
-from tests.unit import AWSMockServiceTestCase |
-from mock import Mock |
- |
-from boto.sns.connection import SNSConnection |
- |
-QUEUE_POLICY = { |
- u'Policy': |
- (u'{"Version":"2008-10-17","Id":"arn:aws:sqs:us-east-1:' |
- 'idnum:testqueuepolicy/SQSDefaultPolicy","Statement":' |
- '[{"Sid":"sidnum","Effect":"Allow","Principal":{"AWS":"*"},' |
- '"Action":"SQS:GetQueueUrl","Resource":' |
- '"arn:aws:sqs:us-east-1:idnum:testqueuepolicy"}]}')} |
- |
- |
-class TestSNSConnection(AWSMockServiceTestCase): |
- connection_class = SNSConnection |
- |
- def setUp(self): |
- super(TestSNSConnection, self).setUp() |
- |
- def default_body(self): |
- return b"{}" |
- |
- def test_sqs_with_existing_policy(self): |
- self.set_http_response(status_code=200) |
- |
- queue = Mock() |
- queue.get_attributes.return_value = QUEUE_POLICY |
- queue.arn = 'arn:aws:sqs:us-east-1:idnum:queuename' |
- |
- self.service_connection.subscribe_sqs_queue('topic_arn', queue) |
- self.assert_request_parameters({ |
- 'Action': 'Subscribe', |
- 'ContentType': 'JSON', |
- 'Endpoint': 'arn:aws:sqs:us-east-1:idnum:queuename', |
- 'Protocol': 'sqs', |
- 'TopicArn': 'topic_arn', |
- 'Version': '2010-03-31', |
- }, ignore_params_values=[]) |
- |
- # Verify that the queue policy was properly updated. |
- actual_policy = json.loads(queue.set_attribute.call_args[0][1]) |
- self.assertEqual(actual_policy['Version'], '2008-10-17') |
- # A new statement should be appended to the end of the statement list. |
- self.assertEqual(len(actual_policy['Statement']), 2) |
- self.assertEqual(actual_policy['Statement'][1]['Action'], |
- 'SQS:SendMessage') |
- |
- def test_sqs_with_no_previous_policy(self): |
- self.set_http_response(status_code=200) |
- |
- queue = Mock() |
- queue.get_attributes.return_value = {} |
- queue.arn = 'arn:aws:sqs:us-east-1:idnum:queuename' |
- |
- self.service_connection.subscribe_sqs_queue('topic_arn', queue) |
- self.assert_request_parameters({ |
- 'Action': 'Subscribe', |
- 'ContentType': 'JSON', |
- 'Endpoint': 'arn:aws:sqs:us-east-1:idnum:queuename', |
- 'Protocol': 'sqs', |
- 'TopicArn': 'topic_arn', |
- 'Version': '2010-03-31', |
- }, ignore_params_values=[]) |
- actual_policy = json.loads(queue.set_attribute.call_args[0][1]) |
- # Only a single statement should be part of the policy. |
- self.assertEqual(len(actual_policy['Statement']), 1) |
- |
- def test_publish_with_positional_args(self): |
- self.set_http_response(status_code=200) |
- |
- self.service_connection.publish('topic', 'message', 'subject') |
- self.assert_request_parameters({ |
- 'Action': 'Publish', |
- 'TopicArn': 'topic', |
- 'Subject': 'subject', |
- 'Message': 'message', |
- }, ignore_params_values=['Version', 'ContentType']) |
- |
- def test_publish_with_kwargs(self): |
- self.set_http_response(status_code=200) |
- |
- self.service_connection.publish(topic='topic', |
- message='message', |
- subject='subject') |
- self.assert_request_parameters({ |
- 'Action': 'Publish', |
- 'TopicArn': 'topic', |
- 'Subject': 'subject', |
- 'Message': 'message', |
- }, ignore_params_values=['Version', 'ContentType']) |
- |
- def test_publish_with_target_arn(self): |
- self.set_http_response(status_code=200) |
- |
- self.service_connection.publish(target_arn='target_arn', |
- message='message', |
- subject='subject') |
- self.assert_request_parameters({ |
- 'Action': 'Publish', |
- 'TargetArn': 'target_arn', |
- 'Subject': 'subject', |
- 'Message': 'message', |
- }, ignore_params_values=['Version', 'ContentType']) |
- |
- def test_create_platform_application(self): |
- self.set_http_response(status_code=200) |
- |
- self.service_connection.create_platform_application( |
- name='MyApp', |
- platform='APNS', |
- attributes={ |
- 'PlatformPrincipal': 'a ssl certificate', |
- 'PlatformCredential': 'a private key' |
- } |
- ) |
- self.assert_request_parameters({ |
- 'Action': 'CreatePlatformApplication', |
- 'Name': 'MyApp', |
- 'Platform': 'APNS', |
- 'Attributes.entry.1.key': 'PlatformCredential', |
- 'Attributes.entry.1.value': 'a private key', |
- 'Attributes.entry.2.key': 'PlatformPrincipal', |
- 'Attributes.entry.2.value': 'a ssl certificate', |
- }, ignore_params_values=['Version', 'ContentType']) |
- |
- def test_set_platform_application_attributes(self): |
- self.set_http_response(status_code=200) |
- |
- self.service_connection.set_platform_application_attributes( |
- platform_application_arn='arn:myapp', |
- attributes={'PlatformPrincipal': 'a ssl certificate', |
- 'PlatformCredential': 'a private key'}) |
- self.assert_request_parameters({ |
- 'Action': 'SetPlatformApplicationAttributes', |
- 'PlatformApplicationArn': 'arn:myapp', |
- 'Attributes.entry.1.key': 'PlatformCredential', |
- 'Attributes.entry.1.value': 'a private key', |
- 'Attributes.entry.2.key': 'PlatformPrincipal', |
- 'Attributes.entry.2.value': 'a ssl certificate', |
- }, ignore_params_values=['Version', 'ContentType']) |
- |
- def test_create_platform_endpoint(self): |
- self.set_http_response(status_code=200) |
- |
- self.service_connection.create_platform_endpoint( |
- platform_application_arn='arn:myapp', |
- token='abcde12345', |
- custom_user_data='john', |
- attributes={'Enabled': False}) |
- self.assert_request_parameters({ |
- 'Action': 'CreatePlatformEndpoint', |
- 'PlatformApplicationArn': 'arn:myapp', |
- 'Token': 'abcde12345', |
- 'CustomUserData': 'john', |
- 'Attributes.entry.1.key': 'Enabled', |
- 'Attributes.entry.1.value': False, |
- }, ignore_params_values=['Version', 'ContentType']) |
- |
- def test_set_endpoint_attributes(self): |
- self.set_http_response(status_code=200) |
- |
- self.service_connection.set_endpoint_attributes( |
- endpoint_arn='arn:myendpoint', |
- attributes={'CustomUserData': 'john', |
- 'Enabled': False}) |
- self.assert_request_parameters({ |
- 'Action': 'SetEndpointAttributes', |
- 'EndpointArn': 'arn:myendpoint', |
- 'Attributes.entry.1.key': 'CustomUserData', |
- 'Attributes.entry.1.value': 'john', |
- 'Attributes.entry.2.key': 'Enabled', |
- 'Attributes.entry.2.value': False, |
- }, ignore_params_values=['Version', 'ContentType']) |
- |
- def test_message_is_required(self): |
- self.set_http_response(status_code=200) |
- |
- with self.assertRaises(TypeError): |
- self.service_connection.publish(topic='topic', subject='subject') |
- |
- def test_publish_with_json(self): |
- self.set_http_response(status_code=200) |
- |
- self.service_connection.publish( |
- message=json.dumps({ |
- 'default': 'Ignored.', |
- 'GCM': { |
- 'data': 'goes here', |
- } |
- }), |
- message_structure='json', |
- subject='subject', |
- target_arn='target_arn' |
- ) |
- self.assert_request_parameters({ |
- 'Action': 'Publish', |
- 'TargetArn': 'target_arn', |
- 'Subject': 'subject', |
- 'MessageStructure': 'json', |
- }, ignore_params_values=['Version', 'ContentType', 'Message']) |
- self.assertDictEqual( |
- json.loads(self.actual_request.params["Message"]), |
- {"default": "Ignored.", "GCM": {"data": "goes here"}}) |
- |
- def test_publish_with_utf8_message(self): |
- self.set_http_response(status_code=200) |
- subject = message = u'We \u2665 utf-8'.encode('utf-8') |
- self.service_connection.publish('topic', message, subject) |
- self.assert_request_parameters({ |
- 'Action': 'Publish', |
- 'TopicArn': 'topic', |
- 'Subject': subject, |
- 'Message': message, |
- }, ignore_params_values=['Version', 'ContentType']) |
- |
- def test_publish_with_attributes(self): |
- self.set_http_response(status_code=200) |
- |
- self.service_connection.publish( |
- message=json.dumps({ |
- 'default': 'Ignored.', |
- 'GCM': { |
- 'data': 'goes here', |
- } |
- }, sort_keys=True), |
- message_structure='json', |
- subject='subject', |
- target_arn='target_arn', |
- message_attributes={ |
- 'name1': { |
- 'data_type': 'Number', |
- 'string_value': '42' |
- }, |
- 'name2': { |
- 'data_type': 'String', |
- 'string_value': 'Bob' |
- }, |
- }, |
- ) |
- self.assert_request_parameters({ |
- 'Action': 'Publish', |
- 'TargetArn': 'target_arn', |
- 'Subject': 'subject', |
- 'Message': '{"GCM": {"data": "goes here"}, "default": "Ignored."}', |
- 'MessageStructure': 'json', |
- 'MessageAttributes.entry.1.Name': 'name1', |
- 'MessageAttributes.entry.1.Value.DataType': 'Number', |
- 'MessageAttributes.entry.1.Value.StringValue': '42', |
- 'MessageAttributes.entry.2.Name': 'name2', |
- 'MessageAttributes.entry.2.Value.DataType': 'String', |
- 'MessageAttributes.entry.2.Value.StringValue': 'Bob', |
- }, ignore_params_values=['Version', 'ContentType']) |
- |
- |
-if __name__ == '__main__': |
- unittest.main() |