| Index: third_party/gsutil/third_party/boto/tests/integration/kinesis/test_kinesis.py
|
| diff --git a/third_party/gsutil/third_party/boto/tests/integration/kinesis/test_kinesis.py b/third_party/gsutil/third_party/boto/tests/integration/kinesis/test_kinesis.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..3c61feeba53ab72662273d7b347f0168750275cc
|
| --- /dev/null
|
| +++ b/third_party/gsutil/third_party/boto/tests/integration/kinesis/test_kinesis.py
|
| @@ -0,0 +1,116 @@
|
| +# Copyright (c) 2013 Amazon.com, Inc. or its affiliates.
|
| +# All rights reserved.
|
| +#
|
| +# Permission is hereby granted, free of charge, to any person obtaining a
|
| +# copy of this software and associated documentation files (the
|
| +# "Software"), to deal in the Software without restriction, including
|
| +# without limitation the rights to use, copy, modify, merge, publish, dis-
|
| +# tribute, sublicense, and/or sell copies of the Software, and to permit
|
| +# persons to whom the Software is furnished to do so, subject to the fol-
|
| +# lowing conditions:
|
| +#
|
| +# The above copyright notice and this permission notice shall be included
|
| +# in all copies or substantial portions of the Software.
|
| +#
|
| +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
| +# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
|
| +# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
|
| +# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
| +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
| +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
|
| +# IN THE SOFTWARE.
|
| +
|
| +import time
|
| +
|
| +import boto
|
| +from tests.compat import unittest
|
| +from boto.kinesis.exceptions import ResourceNotFoundException
|
| +
|
| +
|
| +class TimeoutError(Exception):
|
| + pass
|
| +
|
| +
|
| +class TestKinesis(unittest.TestCase):
|
| + def setUp(self):
|
| + self.kinesis = boto.connect_kinesis()
|
| +
|
| + def test_kinesis(self):
|
| + kinesis = self.kinesis
|
| +
|
| + # Create a new stream
|
| + kinesis.create_stream('test', 1)
|
| + self.addCleanup(self.kinesis.delete_stream, 'test')
|
| +
|
| + # Wait for the stream to be ready
|
| + tries = 0
|
| + while tries < 10:
|
| + tries += 1
|
| + time.sleep(15)
|
| + response = kinesis.describe_stream('test')
|
| +
|
| + if response['StreamDescription']['StreamStatus'] == 'ACTIVE':
|
| + shard_id = response['StreamDescription']['Shards'][0]['ShardId']
|
| + break
|
| + else:
|
| + raise TimeoutError('Stream is still not active, aborting...')
|
| +
|
| + # Make a tag.
|
| + kinesis.add_tags_to_stream(stream_name='test', tags={'foo': 'bar'})
|
| +
|
| + # Check that the correct tag is there.
|
| + response = kinesis.list_tags_for_stream(stream_name='test')
|
| + self.assertEqual(len(response['Tags']), 1)
|
| + self.assertEqual(response['Tags'][0],
|
| + {'Key':'foo', 'Value': 'bar'})
|
| +
|
| + # Remove the tag and ensure it is removed.
|
| + kinesis.remove_tags_from_stream(stream_name='test', tag_keys=['foo'])
|
| + response = kinesis.list_tags_for_stream(stream_name='test')
|
| + self.assertEqual(len(response['Tags']), 0)
|
| +
|
| + # Get ready to process some data from the stream
|
| + response = kinesis.get_shard_iterator('test', shard_id, 'TRIM_HORIZON')
|
| + shard_iterator = response['ShardIterator']
|
| +
|
| + # Write some data to the stream
|
| + data = 'Some data ...'
|
| + record = {
|
| + 'Data': data,
|
| + 'PartitionKey': data,
|
| + }
|
| + response = kinesis.put_record('test', data, data)
|
| + response = kinesis.put_records([record, record.copy()], 'test')
|
| +
|
| + # Wait for the data to show up
|
| + tries = 0
|
| + num_collected = 0
|
| + num_expected_records = 3
|
| + collected_records = []
|
| + while tries < 100:
|
| + tries += 1
|
| + time.sleep(1)
|
| +
|
| + response = kinesis.get_records(shard_iterator)
|
| + shard_iterator = response['NextShardIterator']
|
| + for record in response['Records']:
|
| + if 'Data' in record:
|
| + collected_records.append(record['Data'])
|
| + num_collected += 1
|
| + if num_collected >= num_expected_records:
|
| + self.assertEqual(num_expected_records, num_collected)
|
| + break
|
| + else:
|
| + raise TimeoutError('No records found, aborting...')
|
| +
|
| + # Read the data, which should be the same as what we wrote
|
| + for record in collected_records:
|
| + self.assertEqual(data, record)
|
| +
|
| + def test_describe_non_existent_stream(self):
|
| + with self.assertRaises(ResourceNotFoundException) as cm:
|
| + self.kinesis.describe_stream('this-stream-shouldnt-exist')
|
| +
|
| + # Assert things about the data we passed along.
|
| + self.assertEqual(cm.exception.error_code, None)
|
| + self.assertTrue('not found' in cm.exception.message)
|
|
|