| Index: third_party/gsutil/third_party/boto/tests/integration/sns/test_sns_sqs_subscription.py
|
| diff --git a/third_party/gsutil/third_party/boto/tests/integration/sns/test_sns_sqs_subscription.py b/third_party/gsutil/third_party/boto/tests/integration/sns/test_sns_sqs_subscription.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..a3656b9658f294d098d96ab36c5722781ff7488b
|
| --- /dev/null
|
| +++ b/third_party/gsutil/third_party/boto/tests/integration/sns/test_sns_sqs_subscription.py
|
| @@ -0,0 +1,101 @@
|
| +# Copyright (c) 2006-2010 Mitch Garnaat http://garnaat.org/
|
| +# Copyright (c) 2010, Eucalyptus Systems, Inc.
|
| +# All rights reserved.
|
| +#
|
| +# Permission is hereby granted, free of charge, to any person obtaining a
|
| +# copy of this software and associated documentation files (the
|
| +# "Software"), to deal in the Software without restriction, including
|
| +# without limitation the rights to use, copy, modify, merge, publish, dis-
|
| +# tribute, sublicense, and/or sell copies of the Software, and to permit
|
| +# persons to whom the Software is furnished to do so, subject to the fol-
|
| +# lowing conditions:
|
| +#
|
| +# The above copyright notice and this permission notice shall be included
|
| +# in all copies or substantial portions of the Software.
|
| +#
|
| +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
| +# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
|
| +# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
|
| +# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
| +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
| +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
|
| +# IN THE SOFTWARE.
|
| +
|
| +"""
|
| +Unit tests for subscribing SQS queues to SNS topics.
|
| +"""
|
| +
|
| +import hashlib
|
| +import time
|
| +
|
| +from tests.unit import unittest
|
| +
|
| +from boto.compat import json
|
| +from boto.sqs.connection import SQSConnection
|
| +from boto.sns.connection import SNSConnection
|
| +
|
| +class SNSSubcribeSQSTest(unittest.TestCase):
|
| +
|
| + sqs = True
|
| + sns = True
|
| +
|
| + def setUp(self):
|
| + self.sqsc = SQSConnection()
|
| + self.snsc = SNSConnection()
|
| +
|
| + def get_policy_statements(self, queue):
|
| + attrs = queue.get_attributes('Policy')
|
| + policy = json.loads(attrs.get('Policy', "{}"))
|
| + return policy.get('Statement', {})
|
| +
|
| + def test_correct_sid(self):
|
| + now = time.time()
|
| + topic_name = queue_name = "test_correct_sid%d" % (now)
|
| +
|
| + timeout = 60
|
| + queue = self.sqsc.create_queue(queue_name, timeout)
|
| + self.addCleanup(self.sqsc.delete_queue, queue, True)
|
| + queue_arn = queue.arn
|
| +
|
| + topic = self.snsc.create_topic(topic_name)
|
| + topic_arn = topic['CreateTopicResponse']['CreateTopicResult']\
|
| + ['TopicArn']
|
| + self.addCleanup(self.snsc.delete_topic, topic_arn)
|
| +
|
| + expected_sid = hashlib.md5((topic_arn + queue_arn).encode('utf-8')).hexdigest()
|
| + resp = self.snsc.subscribe_sqs_queue(topic_arn, queue)
|
| +
|
| + found_expected_sid = False
|
| + statements = self.get_policy_statements(queue)
|
| + for statement in statements:
|
| + if statement['Sid'] == expected_sid:
|
| + found_expected_sid = True
|
| + break
|
| + self.assertTrue(found_expected_sid)
|
| +
|
| + def test_idempotent_subscribe(self):
|
| + now = time.time()
|
| + topic_name = queue_name = "test_idempotent_subscribe%d" % (now)
|
| +
|
| + timeout = 60
|
| + queue = self.sqsc.create_queue(queue_name, timeout)
|
| + self.addCleanup(self.sqsc.delete_queue, queue, True)
|
| + initial_statements = self.get_policy_statements(queue)
|
| + queue_arn = queue.arn
|
| +
|
| + topic = self.snsc.create_topic(topic_name)
|
| + topic_arn = topic['CreateTopicResponse']['CreateTopicResult']\
|
| + ['TopicArn']
|
| + self.addCleanup(self.snsc.delete_topic, topic_arn)
|
| +
|
| + resp = self.snsc.subscribe_sqs_queue(topic_arn, queue)
|
| + time.sleep(3)
|
| + first_subscribe_statements = self.get_policy_statements(queue)
|
| + self.assertEqual(len(first_subscribe_statements),
|
| + len(initial_statements) + 1)
|
| +
|
| + resp2 = self.snsc.subscribe_sqs_queue(topic_arn, queue)
|
| + time.sleep(3)
|
| + second_subscribe_statements = self.get_policy_statements(queue)
|
| + self.assertEqual(len(second_subscribe_statements),
|
| + len(first_subscribe_statements))
|
|
|