Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(16)

Unified Diff: third_party/gsutil/third_party/boto/boto/kinesis/layer1.py

Issue 1377933002: [catapult] - Copy Telemetry's gsutilz over to third_party. (Closed) Base URL: https://github.com/catapult-project/catapult.git@master
Patch Set: Rename to gsutil. Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/gsutil/third_party/boto/boto/kinesis/layer1.py
diff --git a/third_party/gsutil/third_party/boto/boto/kinesis/layer1.py b/third_party/gsutil/third_party/boto/boto/kinesis/layer1.py
new file mode 100644
index 0000000000000000000000000000000000000000..f1910ff4e9e6c20a161f5cefd2c2170b778a254b
--- /dev/null
+++ b/third_party/gsutil/third_party/boto/boto/kinesis/layer1.py
@@ -0,0 +1,875 @@
+# Copyright (c) 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved
+#
+# Permission is hereby granted, free of charge, to any person obtaining a
+# copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish, dis-
+# tribute, sublicense, and/or sell copies of the Software, and to permit
+# persons to whom the Software is furnished to do so, subject to the fol-
+# lowing conditions:
+#
+# The above copyright notice and this permission notice shall be included
+# in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
+# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+#
+
+import base64
+import boto
+
+from boto.connection import AWSQueryConnection
+from boto.regioninfo import RegionInfo
+from boto.exception import JSONResponseError
+from boto.kinesis import exceptions
+from boto.compat import json
+from boto.compat import six
+
+
+class KinesisConnection(AWSQueryConnection):
+ """
+ Amazon Kinesis Service API Reference
+ Amazon Kinesis is a managed service that scales elastically for
+ real time processing of streaming big data.
+ """
+ APIVersion = "2013-12-02"
+ DefaultRegionName = "us-east-1"
+ DefaultRegionEndpoint = "kinesis.us-east-1.amazonaws.com"
+ ServiceName = "Kinesis"
+ TargetPrefix = "Kinesis_20131202"
+ ResponseError = JSONResponseError
+
+ _faults = {
+ "ProvisionedThroughputExceededException": exceptions.ProvisionedThroughputExceededException,
+ "LimitExceededException": exceptions.LimitExceededException,
+ "ExpiredIteratorException": exceptions.ExpiredIteratorException,
+ "ResourceInUseException": exceptions.ResourceInUseException,
+ "ResourceNotFoundException": exceptions.ResourceNotFoundException,
+ "InvalidArgumentException": exceptions.InvalidArgumentException,
+ "SubscriptionRequiredException": exceptions.SubscriptionRequiredException
+ }
+
+
+ def __init__(self, **kwargs):
+ region = kwargs.pop('region', None)
+ if not region:
+ region = RegionInfo(self, self.DefaultRegionName,
+ self.DefaultRegionEndpoint)
+ if 'host' not in kwargs:
+ kwargs['host'] = region.endpoint
+ super(KinesisConnection, self).__init__(**kwargs)
+ self.region = region
+
+ def _required_auth_capability(self):
+ return ['hmac-v4']
+
+ def add_tags_to_stream(self, stream_name, tags):
+ """
+ Adds or updates tags for the specified Amazon Kinesis stream.
+ Each stream can have up to 10 tags.
+
+ If tags have already been assigned to the stream,
+ `AddTagsToStream` overwrites any existing tags that correspond
+ to the specified tag keys.
+
+ :type stream_name: string
+ :param stream_name: The name of the stream.
+
+ :type tags: map
+ :param tags: The set of key-value pairs to use to create the tags.
+
+ """
+ params = {'StreamName': stream_name, 'Tags': tags, }
+ return self.make_request(action='AddTagsToStream',
+ body=json.dumps(params))
+
+ def create_stream(self, stream_name, shard_count):
+ """
+ Creates a Amazon Kinesis stream. A stream captures and
+ transports data records that are continuously emitted from
+ different data sources or producers . Scale-out within an
+ Amazon Kinesis stream is explicitly supported by means of
+ shards, which are uniquely identified groups of data records
+ in an Amazon Kinesis stream.
+
+ You specify and control the number of shards that a stream is
+ composed of. Each open shard can support up to 5 read
+ transactions per second, up to a maximum total of 2 MB of data
+ read per second. Each shard can support up to 1000 records
+ written per second, up to a maximum total of 1 MB data written
+ per second. You can add shards to a stream if the amount of
+ data input increases and you can remove shards if the amount
+ of data input decreases.
+
+ The stream name identifies the stream. The name is scoped to
+ the AWS account used by the application. It is also scoped by
+ region. That is, two streams in two different accounts can
+ have the same name, and two streams in the same account, but
+ in two different regions, can have the same name.
+
+ `CreateStream` is an asynchronous operation. Upon receiving a
+ `CreateStream` request, Amazon Kinesis immediately returns and
+ sets the stream status to `CREATING`. After the stream is
+ created, Amazon Kinesis sets the stream status to `ACTIVE`.
+ You should perform read and write operations only on an
+ `ACTIVE` stream.
+
+ You receive a `LimitExceededException` when making a
+ `CreateStream` request if you try to do one of the following:
+
+
+ + Have more than five streams in the `CREATING` state at any
+ point in time.
+ + Create more shards than are authorized for your account.
+
+
+ The default limit for an AWS account is 10 shards per stream.
+ If you need to create a stream with more than 10 shards,
+ `contact AWS Support`_ to increase the limit on your account.
+
+ You can use `DescribeStream` to check the stream status, which
+ is returned in `StreamStatus`.
+
+ `CreateStream` has a limit of 5 transactions per second per
+ account.
+
+ :type stream_name: string
+ :param stream_name: A name to identify the stream. The stream name is
+ scoped to the AWS account used by the application that creates the
+ stream. It is also scoped by region. That is, two streams in two
+ different AWS accounts can have the same name, and two streams in
+ the same AWS account, but in two different regions, can have the
+ same name.
+
+ :type shard_count: integer
+ :param shard_count: The number of shards that the stream will use. The
+ throughput of the stream is a function of the number of shards;
+ more shards are required for greater provisioned throughput.
+ **Note:** The default limit for an AWS account is 10 shards per stream.
+ If you need to create a stream with more than 10 shards, `contact
+ AWS Support`_ to increase the limit on your account.
+
+ """
+ params = {
+ 'StreamName': stream_name,
+ 'ShardCount': shard_count,
+ }
+ return self.make_request(action='CreateStream',
+ body=json.dumps(params))
+
+ def delete_stream(self, stream_name):
+ """
+ Deletes a stream and all its shards and data. You must shut
+ down any applications that are operating on the stream before
+ you delete the stream. If an application attempts to operate
+ on a deleted stream, it will receive the exception
+ `ResourceNotFoundException`.
+
+ If the stream is in the `ACTIVE` state, you can delete it.
+ After a `DeleteStream` request, the specified stream is in the
+ `DELETING` state until Amazon Kinesis completes the deletion.
+
+ **Note:** Amazon Kinesis might continue to accept data read
+ and write operations, such as PutRecord, PutRecords, and
+ GetRecords, on a stream in the `DELETING` state until the
+ stream deletion is complete.
+
+ When you delete a stream, any shards in that stream are also
+ deleted, and any tags are dissociated from the stream.
+
+ You can use the DescribeStream operation to check the state of
+ the stream, which is returned in `StreamStatus`.
+
+ `DeleteStream` has a limit of 5 transactions per second per
+ account.
+
+ :type stream_name: string
+ :param stream_name: The name of the stream to delete.
+
+ """
+ params = {'StreamName': stream_name, }
+ return self.make_request(action='DeleteStream',
+ body=json.dumps(params))
+
+ def describe_stream(self, stream_name, limit=None,
+ exclusive_start_shard_id=None):
+ """
+ Describes the specified stream.
+
+ The information about the stream includes its current status,
+ its Amazon Resource Name (ARN), and an array of shard objects.
+ For each shard object, there is information about the hash key
+ and sequence number ranges that the shard spans, and the IDs
+ of any earlier shards that played in a role in creating the
+ shard. A sequence number is the identifier associated with
+ every record ingested in the Amazon Kinesis stream. The
+ sequence number is assigned when a record is put into the
+ stream.
+
+ You can limit the number of returned shards using the `Limit`
+ parameter. The number of shards in a stream may be too large
+ to return from a single call to `DescribeStream`. You can
+ detect this by using the `HasMoreShards` flag in the returned
+ output. `HasMoreShards` is set to `True` when there is more
+ data available.
+
+ `DescribeStream` is a paginated operation. If there are more
+ shards available, you can request them using the shard ID of
+ the last shard returned. Specify this ID in the
+ `ExclusiveStartShardId` parameter in a subsequent request to
+ `DescribeStream`.
+
+ `DescribeStream` has a limit of 10 transactions per second per
+ account.
+
+ :type stream_name: string
+ :param stream_name: The name of the stream to describe.
+
+ :type limit: integer
+ :param limit: The maximum number of shards to return.
+
+ :type exclusive_start_shard_id: string
+ :param exclusive_start_shard_id: The shard ID of the shard to start
+ with.
+
+ """
+ params = {'StreamName': stream_name, }
+ if limit is not None:
+ params['Limit'] = limit
+ if exclusive_start_shard_id is not None:
+ params['ExclusiveStartShardId'] = exclusive_start_shard_id
+ return self.make_request(action='DescribeStream',
+ body=json.dumps(params))
+
+ def get_records(self, shard_iterator, limit=None, b64_decode=True):
+ """
+ Gets data records from a shard.
+
+ Specify a shard iterator using the `ShardIterator` parameter.
+ The shard iterator specifies the position in the shard from
+ which you want to start reading data records sequentially. If
+ there are no records available in the portion of the shard
+ that the iterator points to, `GetRecords` returns an empty
+ list. Note that it might take multiple calls to get to a
+ portion of the shard that contains records.
+
+ You can scale by provisioning multiple shards. Your
+ application should have one thread per shard, each reading
+ continuously from its stream. To read from a stream
+ continually, call `GetRecords` in a loop. Use GetShardIterator
+ to get the shard iterator to specify in the first `GetRecords`
+ call. `GetRecords` returns a new shard iterator in
+ `NextShardIterator`. Specify the shard iterator returned in
+ `NextShardIterator` in subsequent calls to `GetRecords`. Note
+ that if the shard has been closed, the shard iterator can't
+ return more data and `GetRecords` returns `null` in
+ `NextShardIterator`. You can terminate the loop when the shard
+ is closed, or when the shard iterator reaches the record with
+ the sequence number or other attribute that marks it as the
+ last record to process.
+
+ Each data record can be up to 50 KB in size, and each shard
+ can read up to 2 MB per second. You can ensure that your calls
+ don't exceed the maximum supported size or throughput by using
+ the `Limit` parameter to specify the maximum number of records
+ that `GetRecords` can return. Consider your average record
+ size when determining this limit. For example, if your average
+ record size is 40 KB, you can limit the data returned to about
+ 1 MB per call by specifying 25 as the limit.
+
+ The size of the data returned by `GetRecords` will vary
+ depending on the utilization of the shard. The maximum size of
+ data that `GetRecords` can return is 10 MB. If a call returns
+ 10 MB of data, subsequent calls made within the next 5 seconds
+ throw `ProvisionedThroughputExceededException`. If there is
+ insufficient provisioned throughput on the shard, subsequent
+ calls made within the next 1 second throw
+ `ProvisionedThroughputExceededException`. Note that
+ `GetRecords` won't return any data when it throws an
+ exception. For this reason, we recommend that you wait one
+ second between calls to `GetRecords`; however, it's possible
+ that the application will get exceptions for longer than 1
+ second.
+
+ To detect whether the application is falling behind in
+ processing, add a timestamp to your records and note how long
+ it takes to process them. You can also monitor how much data
+ is in a stream using the CloudWatch metrics for write
+ operations ( `PutRecord` and `PutRecords`). For more
+ information, see `Monitoring Amazon Kinesis with Amazon
+ CloudWatch`_ in the Amazon Kinesis Developer Guide .
+
+ :type shard_iterator: string
+ :param shard_iterator: The position in the shard from which you want to
+ start sequentially reading data records. A shard iterator specifies
+ this position using the sequence number of a data record in the
+ shard.
+
+ :type limit: integer
+ :param limit: The maximum number of records to return. Specify a value
+ of up to 10,000. If you specify a value that is greater than
+ 10,000, `GetRecords` throws `InvalidArgumentException`.
+
+ :type b64_decode: boolean
+ :param b64_decode: Decode the Base64-encoded ``Data`` field of records.
+
+ """
+ params = {'ShardIterator': shard_iterator, }
+ if limit is not None:
+ params['Limit'] = limit
+
+ response = self.make_request(action='GetRecords',
+ body=json.dumps(params))
+
+ # Base64 decode the data
+ if b64_decode:
+ for record in response.get('Records', []):
+ record['Data'] = base64.b64decode(
+ record['Data'].encode('utf-8')).decode('utf-8')
+
+ return response
+
+ def get_shard_iterator(self, stream_name, shard_id, shard_iterator_type,
+ starting_sequence_number=None):
+ """
+ Gets a shard iterator. A shard iterator expires five minutes
+ after it is returned to the requester.
+
+ A shard iterator specifies the position in the shard from
+ which to start reading data records sequentially. A shard
+ iterator specifies this position using the sequence number of
+ a data record in a shard. A sequence number is the identifier
+ associated with every record ingested in the Amazon Kinesis
+ stream. The sequence number is assigned when a record is put
+ into the stream.
+
+ You must specify the shard iterator type. For example, you can
+ set the `ShardIteratorType` parameter to read exactly from the
+ position denoted by a specific sequence number by using the
+ `AT_SEQUENCE_NUMBER` shard iterator type, or right after the
+ sequence number by using the `AFTER_SEQUENCE_NUMBER` shard
+ iterator type, using sequence numbers returned by earlier
+ calls to PutRecord, PutRecords, GetRecords, or DescribeStream.
+ You can specify the shard iterator type `TRIM_HORIZON` in the
+ request to cause `ShardIterator` to point to the last
+ untrimmed record in the shard in the system, which is the
+ oldest data record in the shard. Or you can point to just
+ after the most recent record in the shard, by using the shard
+ iterator type `LATEST`, so that you always read the most
+ recent data in the shard.
+
+ When you repeatedly read from an Amazon Kinesis stream use a
+ GetShardIterator request to get the first shard iterator to to
+ use in your first `GetRecords` request and then use the shard
+ iterator returned by the `GetRecords` request in
+ `NextShardIterator` for subsequent reads. A new shard iterator
+ is returned by every `GetRecords` request in
+ `NextShardIterator`, which you use in the `ShardIterator`
+ parameter of the next `GetRecords` request.
+
+ If a `GetShardIterator` request is made too often, you receive
+ a `ProvisionedThroughputExceededException`. For more
+ information about throughput limits, see GetRecords.
+
+ If the shard is closed, the iterator can't return more data,
+ and `GetShardIterator` returns `null` for its `ShardIterator`.
+ A shard can be closed using SplitShard or MergeShards.
+
+ `GetShardIterator` has a limit of 5 transactions per second
+ per account per open shard.
+
+ :type stream_name: string
+ :param stream_name: The name of the stream.
+
+ :type shard_id: string
+ :param shard_id: The shard ID of the shard to get the iterator for.
+
+ :type shard_iterator_type: string
+ :param shard_iterator_type:
+ Determines how the shard iterator is used to start reading data records
+ from the shard.
+
+ The following are the valid shard iterator types:
+
+
+ + AT_SEQUENCE_NUMBER - Start reading exactly from the position denoted
+ by a specific sequence number.
+ + AFTER_SEQUENCE_NUMBER - Start reading right after the position
+ denoted by a specific sequence number.
+ + TRIM_HORIZON - Start reading at the last untrimmed record in the
+ shard in the system, which is the oldest data record in the shard.
+ + LATEST - Start reading just after the most recent record in the
+ shard, so that you always read the most recent data in the shard.
+
+ :type starting_sequence_number: string
+ :param starting_sequence_number: The sequence number of the data record
+ in the shard from which to start reading from.
+
+ """
+ params = {
+ 'StreamName': stream_name,
+ 'ShardId': shard_id,
+ 'ShardIteratorType': shard_iterator_type,
+ }
+ if starting_sequence_number is not None:
+ params['StartingSequenceNumber'] = starting_sequence_number
+ return self.make_request(action='GetShardIterator',
+ body=json.dumps(params))
+
+ def list_streams(self, limit=None, exclusive_start_stream_name=None):
+ """
+ Lists your streams.
+
+ The number of streams may be too large to return from a single
+ call to `ListStreams`. You can limit the number of returned
+ streams using the `Limit` parameter. If you do not specify a
+ value for the `Limit` parameter, Amazon Kinesis uses the
+ default limit, which is currently 10.
+
+ You can detect if there are more streams available to list by
+ using the `HasMoreStreams` flag from the returned output. If
+ there are more streams available, you can request more streams
+ by using the name of the last stream returned by the
+ `ListStreams` request in the `ExclusiveStartStreamName`
+ parameter in a subsequent request to `ListStreams`. The group
+ of stream names returned by the subsequent request is then
+ added to the list. You can continue this process until all the
+ stream names have been collected in the list.
+
+ `ListStreams` has a limit of 5 transactions per second per
+ account.
+
+ :type limit: integer
+ :param limit: The maximum number of streams to list.
+
+ :type exclusive_start_stream_name: string
+ :param exclusive_start_stream_name: The name of the stream to start the
+ list with.
+
+ """
+ params = {}
+ if limit is not None:
+ params['Limit'] = limit
+ if exclusive_start_stream_name is not None:
+ params['ExclusiveStartStreamName'] = exclusive_start_stream_name
+ return self.make_request(action='ListStreams',
+ body=json.dumps(params))
+
+ def list_tags_for_stream(self, stream_name, exclusive_start_tag_key=None,
+ limit=None):
+ """
+ Lists the tags for the specified Amazon Kinesis stream.
+
+ :type stream_name: string
+ :param stream_name: The name of the stream.
+
+ :type exclusive_start_tag_key: string
+ :param exclusive_start_tag_key: The key to use as the starting point
+ for the list of tags. If this parameter is set, `ListTagsForStream`
+ gets all tags that occur after `ExclusiveStartTagKey`.
+
+ :type limit: integer
+ :param limit: The number of tags to return. If this number is less than
+ the total number of tags associated with the stream, `HasMoreTags`
+ is set to `True`. To list additional tags, set
+ `ExclusiveStartTagKey` to the last key in the response.
+
+ """
+ params = {'StreamName': stream_name, }
+ if exclusive_start_tag_key is not None:
+ params['ExclusiveStartTagKey'] = exclusive_start_tag_key
+ if limit is not None:
+ params['Limit'] = limit
+ return self.make_request(action='ListTagsForStream',
+ body=json.dumps(params))
+
+ def merge_shards(self, stream_name, shard_to_merge,
+ adjacent_shard_to_merge):
+ """
+ Merges two adjacent shards in a stream and combines them into
+ a single shard to reduce the stream's capacity to ingest and
+ transport data. Two shards are considered adjacent if the
+ union of the hash key ranges for the two shards form a
+ contiguous set with no gaps. For example, if you have two
+ shards, one with a hash key range of 276...381 and the other
+ with a hash key range of 382...454, then you could merge these
+ two shards into a single shard that would have a hash key
+ range of 276...454. After the merge, the single child shard
+ receives data for all hash key values covered by the two
+ parent shards.
+
+ `MergeShards` is called when there is a need to reduce the
+ overall capacity of a stream because of excess capacity that
+ is not being used. You must specify the shard to be merged and
+ the adjacent shard for a stream. For more information about
+ merging shards, see `Merge Two Shards`_ in the Amazon Kinesis
+ Developer Guide .
+
+ If the stream is in the `ACTIVE` state, you can call
+ `MergeShards`. If a stream is in the `CREATING`, `UPDATING`,
+ or `DELETING` state, `MergeShards` returns a
+ `ResourceInUseException`. If the specified stream does not
+ exist, `MergeShards` returns a `ResourceNotFoundException`.
+
+ You can use DescribeStream to check the state of the stream,
+ which is returned in `StreamStatus`.
+
+ `MergeShards` is an asynchronous operation. Upon receiving a
+ `MergeShards` request, Amazon Kinesis immediately returns a
+ response and sets the `StreamStatus` to `UPDATING`. After the
+ operation is completed, Amazon Kinesis sets the `StreamStatus`
+ to `ACTIVE`. Read and write operations continue to work while
+ the stream is in the `UPDATING` state.
+
+ You use DescribeStream to determine the shard IDs that are
+ specified in the `MergeShards` request.
+
+ If you try to operate on too many streams in parallel using
+ CreateStream, DeleteStream, `MergeShards` or SplitShard, you
+ will receive a `LimitExceededException`.
+
+ `MergeShards` has limit of 5 transactions per second per
+ account.
+
+ :type stream_name: string
+ :param stream_name: The name of the stream for the merge.
+
+ :type shard_to_merge: string
+ :param shard_to_merge: The shard ID of the shard to combine with the
+ adjacent shard for the merge.
+
+ :type adjacent_shard_to_merge: string
+ :param adjacent_shard_to_merge: The shard ID of the adjacent shard for
+ the merge.
+
+ """
+ params = {
+ 'StreamName': stream_name,
+ 'ShardToMerge': shard_to_merge,
+ 'AdjacentShardToMerge': adjacent_shard_to_merge,
+ }
+ return self.make_request(action='MergeShards',
+ body=json.dumps(params))
+
+ def put_record(self, stream_name, data, partition_key,
+ explicit_hash_key=None,
+ sequence_number_for_ordering=None,
+ exclusive_minimum_sequence_number=None,
+ b64_encode=True):
+ """
+ This operation puts a data record into an Amazon Kinesis
+ stream from a producer. This operation must be called to send
+ data from the producer into the Amazon Kinesis stream for
+ real-time ingestion and subsequent processing. The `PutRecord`
+ operation requires the name of the stream that captures,
+ stores, and transports the data; a partition key; and the data
+ blob itself. The data blob could be a segment from a log file,
+ geographic/location data, website clickstream data, or any
+ other data type.
+
+ The partition key is used to distribute data across shards.
+ Amazon Kinesis segregates the data records that belong to a
+ data stream into multiple shards, using the partition key
+ associated with each data record to determine which shard a
+ given data record belongs to.
+
+ Partition keys are Unicode strings, with a maximum length
+ limit of 256 bytes. An MD5 hash function is used to map
+ partition keys to 128-bit integer values and to map associated
+ data records to shards using the hash key ranges of the
+ shards. You can override hashing the partition key to
+ determine the shard by explicitly specifying a hash value
+ using the `ExplicitHashKey` parameter. For more information,
+ see the `Amazon Kinesis Developer Guide`_.
+
+ `PutRecord` returns the shard ID of where the data record was
+ placed and the sequence number that was assigned to the data
+ record.
+
+ Sequence numbers generally increase over time. To guarantee
+ strictly increasing ordering, use the
+ `SequenceNumberForOrdering` parameter. For more information,
+ see the `Amazon Kinesis Developer Guide`_.
+
+ If a `PutRecord` request cannot be processed because of
+ insufficient provisioned throughput on the shard involved in
+ the request, `PutRecord` throws
+ `ProvisionedThroughputExceededException`.
+
+ Data records are accessible for only 24 hours from the time
+ that they are added to an Amazon Kinesis stream.
+
+ :type stream_name: string
+ :param stream_name: The name of the stream to put the data record into.
+
+ :type data: blob
+ :param data: The data blob to put into the record, which is
+ Base64-encoded when the blob is serialized.
+ The maximum size of the data blob (the payload after
+ Base64-decoding) is 50 kilobytes (KB)
+ Set `b64_encode` to disable automatic Base64 encoding.
+
+ :type partition_key: string
+ :param partition_key: Determines which shard in the stream the data
+ record is assigned to. Partition keys are Unicode strings with a
+ maximum length limit of 256 bytes. Amazon Kinesis uses the
+ partition key as input to a hash function that maps the partition
+ key and associated data to a specific shard. Specifically, an MD5
+ hash function is used to map partition keys to 128-bit integer
+ values and to map associated data records to shards. As a result of
+ this hashing mechanism, all data records with the same partition
+ key will map to the same shard within the stream.
+
+ :type explicit_hash_key: string
+ :param explicit_hash_key: The hash value used to explicitly determine
+ the shard the data record is assigned to by overriding the
+ partition key hash.
+
+ :type sequence_number_for_ordering: string
+ :param sequence_number_for_ordering: Guarantees strictly increasing
+ sequence numbers, for puts from the same client and to the same
+ partition key. Usage: set the `SequenceNumberForOrdering` of record
+ n to the sequence number of record n-1 (as returned in the
+ PutRecordResult when putting record n-1 ). If this parameter is not
+ set, records will be coarsely ordered based on arrival time.
+
+ :type b64_encode: boolean
+ :param b64_encode: Whether to Base64 encode `data`. Can be set to
+ ``False`` if `data` is already encoded to prevent double encoding.
+
+ """
+ params = {
+ 'StreamName': stream_name,
+ 'Data': data,
+ 'PartitionKey': partition_key,
+ }
+ if explicit_hash_key is not None:
+ params['ExplicitHashKey'] = explicit_hash_key
+ if sequence_number_for_ordering is not None:
+ params['SequenceNumberForOrdering'] = sequence_number_for_ordering
+ if b64_encode:
+ if not isinstance(params['Data'], six.binary_type):
+ params['Data'] = params['Data'].encode('utf-8')
+ params['Data'] = base64.b64encode(params['Data']).decode('utf-8')
+ return self.make_request(action='PutRecord',
+ body=json.dumps(params))
+
+ def put_records(self, records, stream_name, b64_encode=True):
+ """
+ Puts (writes) multiple data records from a producer into an
+ Amazon Kinesis stream in a single call (also referred to as a
+ `PutRecords` request). Use this operation to send data from a
+ data producer into the Amazon Kinesis stream for real-time
+ ingestion and processing. Each shard can support up to 1000
+ records written per second, up to a maximum total of 1 MB data
+ written per second.
+
+ You must specify the name of the stream that captures, stores,
+ and transports the data; and an array of request `Records`,
+ with each record in the array requiring a partition key and
+ data blob.
+
+ The data blob can be any type of data; for example, a segment
+ from a log file, geographic/location data, website clickstream
+ data, and so on.
+
+ The partition key is used by Amazon Kinesis as input to a hash
+ function that maps the partition key and associated data to a
+ specific shard. An MD5 hash function is used to map partition
+ keys to 128-bit integer values and to map associated data
+ records to shards. As a result of this hashing mechanism, all
+ data records with the same partition key map to the same shard
+ within the stream. For more information, see `Partition Key`_
+ in the Amazon Kinesis Developer Guide .
+
+ Each record in the `Records` array may include an optional
+ parameter, `ExplicitHashKey`, which overrides the partition
+ key to shard mapping. This parameter allows a data producer to
+ determine explicitly the shard where the record is stored. For
+ more information, see `Adding Multiple Records with
+ PutRecords`_ in the Amazon Kinesis Developer Guide .
+
+ The `PutRecords` response includes an array of response
+ `Records`. Each record in the response array directly
+ correlates with a record in the request array using natural
+ ordering, from the top to the bottom of the request and
+ response. The response `Records` array always includes the
+ same number of records as the request array.
+
+ The response `Records` array includes both successfully and
+ unsuccessfully processed records. Amazon Kinesis attempts to
+ process all records in each `PutRecords` request. A single
+ record failure does not stop the processing of subsequent
+ records.
+
+ A successfully-processed record includes `ShardId` and
+ `SequenceNumber` values. The `ShardId` parameter identifies
+ the shard in the stream where the record is stored. The
+ `SequenceNumber` parameter is an identifier assigned to the
+ put record, unique to all records in the stream.
+
+ An unsuccessfully-processed record includes `ErrorCode` and
+ `ErrorMessage` values. `ErrorCode` reflects the type of error
+ and can be one of the following values:
+ `ProvisionedThroughputExceededException` or `InternalFailure`.
+ `ErrorMessage` provides more detailed information about the
+ `ProvisionedThroughputExceededException` exception including
+ the account ID, stream name, and shard ID of the record that
+ was throttled.
+
+ Data records are accessible for only 24 hours from the time
+ that they are added to an Amazon Kinesis stream.
+
+ :type records: list
+ :param records: The records associated with the request.
+
+ :type stream_name: string
+ :param stream_name: The stream name associated with the request.
+
+ :type b64_encode: boolean
+ :param b64_encode: Whether to Base64 encode `data`. Can be set to
+ ``False`` if `data` is already encoded to prevent double encoding.
+
+ """
+ params = {'Records': records, 'StreamName': stream_name, }
+ if b64_encode:
+ for i in range(len(params['Records'])):
+ data = params['Records'][i]['Data']
+ if not isinstance(data, six.binary_type):
+ data = data.encode('utf-8')
+ params['Records'][i]['Data'] = base64.b64encode(
+ data).decode('utf-8')
+ return self.make_request(action='PutRecords',
+ body=json.dumps(params))
+
+ def remove_tags_from_stream(self, stream_name, tag_keys):
+ """
+ Deletes tags from the specified Amazon Kinesis stream.
+
+ If you specify a tag that does not exist, it is ignored.
+
+ :type stream_name: string
+ :param stream_name: The name of the stream.
+
+ :type tag_keys: list
+ :param tag_keys: A list of tag keys. Each corresponding tag is removed
+ from the stream.
+
+ """
+ params = {'StreamName': stream_name, 'TagKeys': tag_keys, }
+ return self.make_request(action='RemoveTagsFromStream',
+ body=json.dumps(params))
+
+ def split_shard(self, stream_name, shard_to_split, new_starting_hash_key):
+ """
+ Splits a shard into two new shards in the stream, to increase
+ the stream's capacity to ingest and transport data.
+ `SplitShard` is called when there is a need to increase the
+ overall capacity of stream because of an expected increase in
+ the volume of data records being ingested.
+
+ You can also use `SplitShard` when a shard appears to be
+ approaching its maximum utilization, for example, when the set
+ of producers sending data into the specific shard are suddenly
+ sending more than previously anticipated. You can also call
+ `SplitShard` to increase stream capacity, so that more Amazon
+ Kinesis applications can simultaneously read data from the
+ stream for real-time processing.
+
+ You must specify the shard to be split and the new hash key,
+ which is the position in the shard where the shard gets split
+ in two. In many cases, the new hash key might simply be the
+ average of the beginning and ending hash key, but it can be
+ any hash key value in the range being mapped into the shard.
+ For more information about splitting shards, see `Split a
+ Shard`_ in the Amazon Kinesis Developer Guide .
+
+ You can use DescribeStream to determine the shard ID and hash
+ key values for the `ShardToSplit` and `NewStartingHashKey`
+ parameters that are specified in the `SplitShard` request.
+
+ `SplitShard` is an asynchronous operation. Upon receiving a
+ `SplitShard` request, Amazon Kinesis immediately returns a
+ response and sets the stream status to `UPDATING`. After the
+ operation is completed, Amazon Kinesis sets the stream status
+ to `ACTIVE`. Read and write operations continue to work while
+ the stream is in the `UPDATING` state.
+
+ You can use `DescribeStream` to check the status of the
+ stream, which is returned in `StreamStatus`. If the stream is
+ in the `ACTIVE` state, you can call `SplitShard`. If a stream
+ is in `CREATING` or `UPDATING` or `DELETING` states,
+ `DescribeStream` returns a `ResourceInUseException`.
+
+ If the specified stream does not exist, `DescribeStream`
+ returns a `ResourceNotFoundException`. If you try to create
+ more shards than are authorized for your account, you receive
+ a `LimitExceededException`.
+
+ The default limit for an AWS account is 10 shards per stream.
+ If you need to create a stream with more than 10 shards,
+ `contact AWS Support`_ to increase the limit on your account.
+
+ If you try to operate on too many streams in parallel using
+ CreateStream, DeleteStream, MergeShards or SplitShard, you
+ receive a `LimitExceededException`.
+
+ `SplitShard` has limit of 5 transactions per second per
+ account.
+
+ :type stream_name: string
+ :param stream_name: The name of the stream for the shard split.
+
+ :type shard_to_split: string
+ :param shard_to_split: The shard ID of the shard to split.
+
+ :type new_starting_hash_key: string
+ :param new_starting_hash_key: A hash key value for the starting hash
+ key of one of the child shards created by the split. The hash key
+ range for a given shard constitutes a set of ordered contiguous
+ positive integers. The value for `NewStartingHashKey` must be in
+ the range of hash keys being mapped into the shard. The
+ `NewStartingHashKey` hash key value and all higher hash key values
+ in hash key range are distributed to one of the child shards. All
+ the lower hash key values in the range are distributed to the other
+ child shard.
+
+ """
+ params = {
+ 'StreamName': stream_name,
+ 'ShardToSplit': shard_to_split,
+ 'NewStartingHashKey': new_starting_hash_key,
+ }
+ return self.make_request(action='SplitShard',
+ body=json.dumps(params))
+
+ def make_request(self, action, body):
+ headers = {
+ 'X-Amz-Target': '%s.%s' % (self.TargetPrefix, action),
+ 'Host': self.region.endpoint,
+ 'Content-Type': 'application/x-amz-json-1.1',
+ 'Content-Length': str(len(body)),
+ }
+ http_request = self.build_base_http_request(
+ method='POST', path='/', auth_path='/', params={},
+ headers=headers, data=body)
+ response = self._mexe(http_request, sender=None,
+ override_num_retries=10)
+ response_body = response.read().decode('utf-8')
+ boto.log.debug(response.getheaders())
+ boto.log.debug(response_body)
+ if response.status == 200:
+ if response_body:
+ return json.loads(response_body)
+ else:
+ json_body = json.loads(response_body)
+ fault_name = json_body.get('__type', None)
+ exception_class = self._faults.get(fault_name, self.ResponseError)
+ raise exception_class(response.status, response.reason,
+ body=json_body)
+

Powered by Google App Engine
This is Rietveld 408576698