| Index: third_party/boto/tests/integration/kinesis/test_kinesis.py
|
| ===================================================================
|
| --- third_party/boto/tests/integration/kinesis/test_kinesis.py (revision 33376)
|
| +++ third_party/boto/tests/integration/kinesis/test_kinesis.py (working copy)
|
| @@ -20,9 +20,11 @@
|
| # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
|
| # IN THE SOFTWARE.
|
|
|
| -import boto
|
| import time
|
|
|
| +import boto
|
| +from boto.kinesis.exceptions import ResourceNotFoundException
|
| +
|
| from unittest import TestCase
|
|
|
|
|
| @@ -34,15 +36,12 @@
|
| def setUp(self):
|
| self.kinesis = boto.connect_kinesis()
|
|
|
| - def tearDown(self):
|
| - # Delete the stream even if there is a failure
|
| - self.kinesis.delete_stream('test')
|
| -
|
| def test_kinesis(self):
|
| kinesis = self.kinesis
|
|
|
| # Create a new stream
|
| kinesis.create_stream('test', 1)
|
| + self.addCleanup(self.kinesis.delete_stream, 'test')
|
|
|
| # Wait for the stream to be ready
|
| tries = 0
|
| @@ -70,7 +69,7 @@
|
| while tries < 100:
|
| tries += 1
|
| time.sleep(1)
|
| -
|
| +
|
| response = kinesis.get_records(shard_iterator)
|
| shard_iterator = response['NextShardIterator']
|
|
|
| @@ -82,3 +81,11 @@
|
| # Read the data, which should be the same as what we wrote
|
| self.assertEqual(1, len(response['Records']))
|
| self.assertEqual(data, response['Records'][0]['Data'])
|
| +
|
| + def test_describe_non_existent_stream(self):
|
| + with self.assertRaises(ResourceNotFoundException) as cm:
|
| + self.kinesis.describe_stream('this-stream-shouldnt-exist')
|
| +
|
| + # Assert things about the data we passed along.
|
| + self.assertEqual(cm.exception.error_code, None)
|
| + self.assertTrue('not found' in cm.exception.message)
|
|
|