OLD | NEW |
(Empty) | |
| 1 # Copyright (c) 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved |
| 2 # |
| 3 # Permission is hereby granted, free of charge, to any person obtaining a |
| 4 # copy of this software and associated documentation files (the |
| 5 # "Software"), to deal in the Software without restriction, including |
| 6 # without limitation the rights to use, copy, modify, merge, publish, dis- |
| 7 # tribute, sublicense, and/or sell copies of the Software, and to permit |
| 8 # persons to whom the Software is furnished to do so, subject to the fol- |
| 9 # lowing conditions: |
| 10 # |
| 11 # The above copyright notice and this permission notice shall be included |
| 12 # in all copies or substantial portions of the Software. |
| 13 # |
| 14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS |
| 15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- |
| 16 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT |
| 17 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, |
| 18 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| 19 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS |
| 20 # IN THE SOFTWARE. |
| 21 # |
| 22 |
| 23 import base64 |
| 24 import boto |
| 25 |
| 26 from boto.connection import AWSQueryConnection |
| 27 from boto.regioninfo import RegionInfo |
| 28 from boto.exception import JSONResponseError |
| 29 from boto.kinesis import exceptions |
| 30 from boto.compat import json |
| 31 from boto.compat import six |
| 32 |
| 33 |
| 34 class KinesisConnection(AWSQueryConnection): |
| 35 """ |
| 36 Amazon Kinesis Service API Reference |
| 37 Amazon Kinesis is a managed service that scales elastically for |
| 38     real-time processing of streaming big data.
| 39 """ |
| 40 APIVersion = "2013-12-02" |
| 41 DefaultRegionName = "us-east-1" |
| 42 DefaultRegionEndpoint = "kinesis.us-east-1.amazonaws.com" |
| 43 ServiceName = "Kinesis" |
| 44 TargetPrefix = "Kinesis_20131202" |
| 45 ResponseError = JSONResponseError |
| 46 |
| 47 _faults = { |
| 48         "ProvisionedThroughputExceededException": exceptions.ProvisionedThroughputExceededException,
| 49 "LimitExceededException": exceptions.LimitExceededException, |
| 50 "ExpiredIteratorException": exceptions.ExpiredIteratorException, |
| 51 "ResourceInUseException": exceptions.ResourceInUseException, |
| 52 "ResourceNotFoundException": exceptions.ResourceNotFoundException, |
| 53 "InvalidArgumentException": exceptions.InvalidArgumentException, |
| 54         "SubscriptionRequiredException": exceptions.SubscriptionRequiredException
| 55 } |
| 56 |
| 57 |
| 58 def __init__(self, **kwargs): |
| 59 region = kwargs.pop('region', None) |
| 60 if not region: |
| 61 region = RegionInfo(self, self.DefaultRegionName, |
| 62 self.DefaultRegionEndpoint) |
| 63 if 'host' not in kwargs: |
| 64 kwargs['host'] = region.endpoint |
| 65 super(KinesisConnection, self).__init__(**kwargs) |
| 66 self.region = region |
| 67 |
| 68 def _required_auth_capability(self): |
| 69 return ['hmac-v4'] |
| 70 |
| 71 def add_tags_to_stream(self, stream_name, tags): |
| 72 """ |
| 73 Adds or updates tags for the specified Amazon Kinesis stream. |
| 74 Each stream can have up to 10 tags. |
| 75 |
| 76 If tags have already been assigned to the stream, |
| 77 `AddTagsToStream` overwrites any existing tags that correspond |
| 78 to the specified tag keys. |
| 79 |
| 80 :type stream_name: string |
| 81 :param stream_name: The name of the stream. |
| 82 |
| 83 :type tags: map |
| 84 :param tags: The set of key-value pairs to use to create the tags. |
| 85 |
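|         A minimal sketch (the connection `conn` and the stream name
|         are illustrative)::
|
|             conn.add_tags_to_stream('my-stream',
|                                     {'environment': 'test',
|                                      'owner': 'data-team'})
|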
| 86 """ |
| 87 params = {'StreamName': stream_name, 'Tags': tags, } |
| 88 return self.make_request(action='AddTagsToStream', |
| 89 body=json.dumps(params)) |
| 90 |
| 91 def create_stream(self, stream_name, shard_count): |
| 92 """ |
| 93         Creates an Amazon Kinesis stream. A stream captures and
| 94 transports data records that are continuously emitted from |
| 95         different data sources or producers. Scale-out within an
| 96 Amazon Kinesis stream is explicitly supported by means of |
| 97 shards, which are uniquely identified groups of data records |
| 98 in an Amazon Kinesis stream. |
| 99 |
| 100 You specify and control the number of shards that a stream is |
| 101 composed of. Each open shard can support up to 5 read |
| 102 transactions per second, up to a maximum total of 2 MB of data |
| 103 read per second. Each shard can support up to 1000 records |
| 104         written per second, up to a maximum total of 1 MB of data written
| 105 per second. You can add shards to a stream if the amount of |
| 106 data input increases and you can remove shards if the amount |
| 107 of data input decreases. |
| 108 |
| 109 The stream name identifies the stream. The name is scoped to |
| 110 the AWS account used by the application. It is also scoped by |
| 111 region. That is, two streams in two different accounts can |
| 112 have the same name, and two streams in the same account, but |
| 113 in two different regions, can have the same name. |
| 114 |
| 115 `CreateStream` is an asynchronous operation. Upon receiving a |
| 116 `CreateStream` request, Amazon Kinesis immediately returns and |
| 117 sets the stream status to `CREATING`. After the stream is |
| 118 created, Amazon Kinesis sets the stream status to `ACTIVE`. |
| 119 You should perform read and write operations only on an |
| 120 `ACTIVE` stream. |
| 121 |
| 122 You receive a `LimitExceededException` when making a |
| 123 `CreateStream` request if you try to do one of the following: |
| 124 |
| 125 |
| 126 + Have more than five streams in the `CREATING` state at any |
| 127 point in time. |
| 128 + Create more shards than are authorized for your account. |
| 129 |
| 130 |
| 131 The default limit for an AWS account is 10 shards per stream. |
| 132 If you need to create a stream with more than 10 shards, |
| 133 `contact AWS Support`_ to increase the limit on your account. |
| 134 |
| 135 You can use `DescribeStream` to check the stream status, which |
| 136 is returned in `StreamStatus`. |
| 137 |
| 138 `CreateStream` has a limit of 5 transactions per second per |
| 139 account. |
| 140 |
| 141 :type stream_name: string |
| 142 :param stream_name: A name to identify the stream. The stream name is |
| 143 scoped to the AWS account used by the application that creates the |
| 144 stream. It is also scoped by region. That is, two streams in two |
| 145 different AWS accounts can have the same name, and two streams in |
| 146 the same AWS account, but in two different regions, can have the |
| 147 same name. |
| 148 |
| 149 :type shard_count: integer |
| 150 :param shard_count: The number of shards that the stream will use. The |
| 151 throughput of the stream is a function of the number of shards; |
| 152 more shards are required for greater provisioned throughput. |
| 153 **Note:** The default limit for an AWS account is 10 shards per stream. |
| 154 If you need to create a stream with more than 10 shards, `contact |
| 155 AWS Support`_ to increase the limit on your account. |
| 156 |
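|         A minimal sketch (assumes a `KinesisConnection` instance `conn`;
|         the stream name and shard count are illustrative)::
|
|             import time
|
|             conn.create_stream('my-stream', 2)
|             # CreateStream is asynchronous; poll DescribeStream until
|             # the stream reaches ACTIVE before reading or writing.
|             while True:
|                 status = conn.describe_stream(
|                     'my-stream')['StreamDescription']['StreamStatus']
|                 if status == 'ACTIVE':
|                     break
|                 time.sleep(1)
|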
| 157 """ |
| 158 params = { |
| 159 'StreamName': stream_name, |
| 160 'ShardCount': shard_count, |
| 161 } |
| 162 return self.make_request(action='CreateStream', |
| 163 body=json.dumps(params)) |
| 164 |
| 165 def delete_stream(self, stream_name): |
| 166 """ |
| 167 Deletes a stream and all its shards and data. You must shut |
| 168 down any applications that are operating on the stream before |
| 169 you delete the stream. If an application attempts to operate |
| 170 on a deleted stream, it will receive the exception |
| 171 `ResourceNotFoundException`. |
| 172 |
| 173 If the stream is in the `ACTIVE` state, you can delete it. |
| 174 After a `DeleteStream` request, the specified stream is in the |
| 175 `DELETING` state until Amazon Kinesis completes the deletion. |
| 176 |
| 177 **Note:** Amazon Kinesis might continue to accept data read |
| 178 and write operations, such as PutRecord, PutRecords, and |
| 179 GetRecords, on a stream in the `DELETING` state until the |
| 180 stream deletion is complete. |
| 181 |
| 182 When you delete a stream, any shards in that stream are also |
| 183 deleted, and any tags are dissociated from the stream. |
| 184 |
| 185 You can use the DescribeStream operation to check the state of |
| 186 the stream, which is returned in `StreamStatus`. |
| 187 |
| 188 `DeleteStream` has a limit of 5 transactions per second per |
| 189 account. |
| 190 |
| 191 :type stream_name: string |
| 192 :param stream_name: The name of the stream to delete. |
| 193 |
| 194 """ |
| 195 params = {'StreamName': stream_name, } |
| 196 return self.make_request(action='DeleteStream', |
| 197 body=json.dumps(params)) |
| 198 |
| 199 def describe_stream(self, stream_name, limit=None, |
| 200 exclusive_start_shard_id=None): |
| 201 """ |
| 202 Describes the specified stream. |
| 203 |
| 204 The information about the stream includes its current status, |
| 205 its Amazon Resource Name (ARN), and an array of shard objects. |
| 206 For each shard object, there is information about the hash key |
| 207 and sequence number ranges that the shard spans, and the IDs |
| 208         of any earlier shards that played a role in creating the
| 209 shard. A sequence number is the identifier associated with |
| 210 every record ingested in the Amazon Kinesis stream. The |
| 211 sequence number is assigned when a record is put into the |
| 212 stream. |
| 213 |
| 214 You can limit the number of returned shards using the `Limit` |
| 215 parameter. The number of shards in a stream may be too large |
| 216 to return from a single call to `DescribeStream`. You can |
| 217 detect this by using the `HasMoreShards` flag in the returned |
| 218 output. `HasMoreShards` is set to `True` when there is more |
| 219 data available. |
| 220 |
| 221 `DescribeStream` is a paginated operation. If there are more |
| 222 shards available, you can request them using the shard ID of |
| 223 the last shard returned. Specify this ID in the |
| 224 `ExclusiveStartShardId` parameter in a subsequent request to |
| 225 `DescribeStream`. |
| 226 |
| 227 `DescribeStream` has a limit of 10 transactions per second per |
| 228 account. |
| 229 |
| 230 :type stream_name: string |
| 231 :param stream_name: The name of the stream to describe. |
| 232 |
| 233 :type limit: integer |
| 234 :param limit: The maximum number of shards to return. |
| 235 |
| 236 :type exclusive_start_shard_id: string |
| 237 :param exclusive_start_shard_id: The shard ID of the shard to start |
| 238 with. |
| 239 |
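|         A sketch of collecting every shard across pages (assumes `conn`
|         and an existing stream named 'my-stream')::
|
|             shards = []
|             start_shard_id = None
|             while True:
|                 description = conn.describe_stream(
|                     'my-stream',
|                     exclusive_start_shard_id=start_shard_id,
|                 )['StreamDescription']
|                 shards.extend(description['Shards'])
|                 if not description['HasMoreShards']:
|                     break
|                 # Page from the last shard ID returned so far.
|                 start_shard_id = shards[-1]['ShardId']
|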
| 240 """ |
| 241 params = {'StreamName': stream_name, } |
| 242 if limit is not None: |
| 243 params['Limit'] = limit |
| 244 if exclusive_start_shard_id is not None: |
| 245 params['ExclusiveStartShardId'] = exclusive_start_shard_id |
| 246 return self.make_request(action='DescribeStream', |
| 247 body=json.dumps(params)) |
| 248 |
| 249 def get_records(self, shard_iterator, limit=None, b64_decode=True): |
| 250 """ |
| 251 Gets data records from a shard. |
| 252 |
| 253 Specify a shard iterator using the `ShardIterator` parameter. |
| 254 The shard iterator specifies the position in the shard from |
| 255 which you want to start reading data records sequentially. If |
| 256 there are no records available in the portion of the shard |
| 257 that the iterator points to, `GetRecords` returns an empty |
| 258 list. Note that it might take multiple calls to get to a |
| 259 portion of the shard that contains records. |
| 260 |
| 261 You can scale by provisioning multiple shards. Your |
| 262 application should have one thread per shard, each reading |
| 263 continuously from its stream. To read from a stream |
| 264 continually, call `GetRecords` in a loop. Use GetShardIterator |
| 265 to get the shard iterator to specify in the first `GetRecords` |
| 266 call. `GetRecords` returns a new shard iterator in |
| 267 `NextShardIterator`. Specify the shard iterator returned in |
| 268 `NextShardIterator` in subsequent calls to `GetRecords`. Note |
| 269 that if the shard has been closed, the shard iterator can't |
| 270 return more data and `GetRecords` returns `null` in |
| 271 `NextShardIterator`. You can terminate the loop when the shard |
| 272 is closed, or when the shard iterator reaches the record with |
| 273 the sequence number or other attribute that marks it as the |
| 274 last record to process. |
| 275 |
| 276 Each data record can be up to 50 KB in size, and each shard |
| 277 can read up to 2 MB per second. You can ensure that your calls |
| 278 don't exceed the maximum supported size or throughput by using |
| 279 the `Limit` parameter to specify the maximum number of records |
| 280 that `GetRecords` can return. Consider your average record |
| 281 size when determining this limit. For example, if your average |
| 282 record size is 40 KB, you can limit the data returned to about |
| 283 1 MB per call by specifying 25 as the limit. |
| 284 |
| 285 The size of the data returned by `GetRecords` will vary |
| 286 depending on the utilization of the shard. The maximum size of |
| 287 data that `GetRecords` can return is 10 MB. If a call returns |
| 288 10 MB of data, subsequent calls made within the next 5 seconds |
| 289 throw `ProvisionedThroughputExceededException`. If there is |
| 290 insufficient provisioned throughput on the shard, subsequent |
| 291 calls made within the next 1 second throw |
| 292 `ProvisionedThroughputExceededException`. Note that |
| 293 `GetRecords` won't return any data when it throws an |
| 294 exception. For this reason, we recommend that you wait one |
| 295 second between calls to `GetRecords`; however, it's possible |
| 296 that the application will get exceptions for longer than 1 |
| 297 second. |
| 298 |
| 299 To detect whether the application is falling behind in |
| 300 processing, add a timestamp to your records and note how long |
| 301 it takes to process them. You can also monitor how much data |
| 302 is in a stream using the CloudWatch metrics for write |
| 303 operations ( `PutRecord` and `PutRecords`). For more |
| 304 information, see `Monitoring Amazon Kinesis with Amazon |
| 305 CloudWatch`_ in the Amazon Kinesis Developer Guide . |
| 306 |
| 307 :type shard_iterator: string |
| 308 :param shard_iterator: The position in the shard from which you want to |
| 309 start sequentially reading data records. A shard iterator specifies |
| 310 this position using the sequence number of a data record in the |
| 311 shard. |
| 312 |
| 313 :type limit: integer |
| 314 :param limit: The maximum number of records to return. Specify a value |
| 315 of up to 10,000. If you specify a value that is greater than |
| 316 10,000, `GetRecords` throws `InvalidArgumentException`. |
| 317 |
| 318 :type b64_decode: boolean |
| 319 :param b64_decode: Decode the Base64-encoded ``Data`` field of records. |
| 320 |
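|         A minimal polling sketch (assumes `conn`, a stream named
|         'my-stream', and a shard ID taken from a `describe_stream`
|         response)::
|
|             import time
|
|             iterator = conn.get_shard_iterator(
|                 'my-stream', 'shardId-000000000000',
|                 'TRIM_HORIZON')['ShardIterator']
|             while iterator is not None:
|                 result = conn.get_records(iterator, limit=25)
|                 for record in result['Records']:
|                     print(record['Data'])
|                 # A closed shard yields a NextShardIterator of None.
|                 iterator = result['NextShardIterator']
|                 # Wait between calls, per the throughput guidance above.
|                 time.sleep(1)
|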
| 321 """ |
| 322 params = {'ShardIterator': shard_iterator, } |
| 323 if limit is not None: |
| 324 params['Limit'] = limit |
| 325 |
| 326 response = self.make_request(action='GetRecords', |
| 327 body=json.dumps(params)) |
| 328 |
| 329 # Base64 decode the data |
| 330 if b64_decode: |
| 331 for record in response.get('Records', []): |
| 332 record['Data'] = base64.b64decode( |
| 333 record['Data'].encode('utf-8')).decode('utf-8') |
| 334 |
| 335 return response |
| 336 |
| 337 def get_shard_iterator(self, stream_name, shard_id, shard_iterator_type, |
| 338 starting_sequence_number=None): |
| 339 """ |
| 340 Gets a shard iterator. A shard iterator expires five minutes |
| 341 after it is returned to the requester. |
| 342 |
| 343 A shard iterator specifies the position in the shard from |
| 344 which to start reading data records sequentially. A shard |
| 345 iterator specifies this position using the sequence number of |
| 346 a data record in a shard. A sequence number is the identifier |
| 347 associated with every record ingested in the Amazon Kinesis |
| 348 stream. The sequence number is assigned when a record is put |
| 349 into the stream. |
| 350 |
| 351 You must specify the shard iterator type. For example, you can |
| 352 set the `ShardIteratorType` parameter to read exactly from the |
| 353 position denoted by a specific sequence number by using the |
| 354 `AT_SEQUENCE_NUMBER` shard iterator type, or right after the |
| 355 sequence number by using the `AFTER_SEQUENCE_NUMBER` shard |
| 356 iterator type, using sequence numbers returned by earlier |
| 357 calls to PutRecord, PutRecords, GetRecords, or DescribeStream. |
| 358 You can specify the shard iterator type `TRIM_HORIZON` in the |
| 359 request to cause `ShardIterator` to point to the last |
| 360 untrimmed record in the shard in the system, which is the |
| 361 oldest data record in the shard. Or you can point to just |
| 362 after the most recent record in the shard, by using the shard |
| 363 iterator type `LATEST`, so that you always read the most |
| 364 recent data in the shard. |
| 365 |
| 366         When you repeatedly read from an Amazon Kinesis stream, use a
| 367         GetShardIterator request to get the first shard iterator to
| 368 use in your first `GetRecords` request and then use the shard |
| 369 iterator returned by the `GetRecords` request in |
| 370 `NextShardIterator` for subsequent reads. A new shard iterator |
| 371 is returned by every `GetRecords` request in |
| 372 `NextShardIterator`, which you use in the `ShardIterator` |
| 373 parameter of the next `GetRecords` request. |
| 374 |
| 375 If a `GetShardIterator` request is made too often, you receive |
| 376 a `ProvisionedThroughputExceededException`. For more |
| 377 information about throughput limits, see GetRecords. |
| 378 |
| 379 If the shard is closed, the iterator can't return more data, |
| 380 and `GetShardIterator` returns `null` for its `ShardIterator`. |
| 381 A shard can be closed using SplitShard or MergeShards. |
| 382 |
| 383 `GetShardIterator` has a limit of 5 transactions per second |
| 384 per account per open shard. |
| 385 |
| 386 :type stream_name: string |
| 387 :param stream_name: The name of the stream. |
| 388 |
| 389 :type shard_id: string |
| 390 :param shard_id: The shard ID of the shard to get the iterator for. |
| 391 |
| 392 :type shard_iterator_type: string |
| 393 :param shard_iterator_type: |
| 394 Determines how the shard iterator is used to start reading data records |
| 395 from the shard. |
| 396 |
| 397 The following are the valid shard iterator types: |
| 398 |
| 399 |
| 400 + AT_SEQUENCE_NUMBER - Start reading exactly from the position denoted |
| 401 by a specific sequence number. |
| 402 + AFTER_SEQUENCE_NUMBER - Start reading right after the position |
| 403 denoted by a specific sequence number. |
| 404 + TRIM_HORIZON - Start reading at the last untrimmed record in the |
| 405 shard in the system, which is the oldest data record in the shard. |
| 406 + LATEST - Start reading just after the most recent record in the |
| 407 shard, so that you always read the most recent data in the shard. |
| 408 |
| 409 :type starting_sequence_number: string |
| 410 :param starting_sequence_number: The sequence number of the data record |
| 411         in the shard from which to start reading.
| 412 |
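|         For example, reading from the oldest available record in a
|         shard (a sketch; the stream name and shard ID are
|         illustrative)::
|
|             result = conn.get_shard_iterator('my-stream',
|                                              'shardId-000000000000',
|                                              'TRIM_HORIZON')
|             shard_iterator = result['ShardIterator']
|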
| 413 """ |
| 414 params = { |
| 415 'StreamName': stream_name, |
| 416 'ShardId': shard_id, |
| 417 'ShardIteratorType': shard_iterator_type, |
| 418 } |
| 419 if starting_sequence_number is not None: |
| 420 params['StartingSequenceNumber'] = starting_sequence_number |
| 421 return self.make_request(action='GetShardIterator', |
| 422 body=json.dumps(params)) |
| 423 |
| 424 def list_streams(self, limit=None, exclusive_start_stream_name=None): |
| 425 """ |
| 426 Lists your streams. |
| 427 |
| 428 The number of streams may be too large to return from a single |
| 429 call to `ListStreams`. You can limit the number of returned |
| 430 streams using the `Limit` parameter. If you do not specify a |
| 431 value for the `Limit` parameter, Amazon Kinesis uses the |
| 432 default limit, which is currently 10. |
| 433 |
| 434 You can detect if there are more streams available to list by |
| 435 using the `HasMoreStreams` flag from the returned output. If |
| 436 there are more streams available, you can request more streams |
| 437 by using the name of the last stream returned by the |
| 438 `ListStreams` request in the `ExclusiveStartStreamName` |
| 439 parameter in a subsequent request to `ListStreams`. The group |
| 440 of stream names returned by the subsequent request is then |
| 441 added to the list. You can continue this process until all the |
| 442 stream names have been collected in the list. |
| 443 |
| 444 `ListStreams` has a limit of 5 transactions per second per |
| 445 account. |
| 446 |
| 447 :type limit: integer |
| 448 :param limit: The maximum number of streams to list. |
| 449 |
| 450 :type exclusive_start_stream_name: string |
| 451 :param exclusive_start_stream_name: The name of the stream to start the |
| 452 list with. |
| 453 |
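|         A sketch of collecting every stream name across pages (assumes
|         a `KinesisConnection` instance `conn`)::
|
|             stream_names = []
|             last_name = None
|             while True:
|                 result = conn.list_streams(
|                     exclusive_start_stream_name=last_name)
|                 stream_names.extend(result['StreamNames'])
|                 if not result['HasMoreStreams']:
|                     break
|                 last_name = stream_names[-1]
|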
| 454 """ |
| 455 params = {} |
| 456 if limit is not None: |
| 457 params['Limit'] = limit |
| 458 if exclusive_start_stream_name is not None: |
| 459 params['ExclusiveStartStreamName'] = exclusive_start_stream_name |
| 460 return self.make_request(action='ListStreams', |
| 461 body=json.dumps(params)) |
| 462 |
| 463 def list_tags_for_stream(self, stream_name, exclusive_start_tag_key=None, |
| 464 limit=None): |
| 465 """ |
| 466 Lists the tags for the specified Amazon Kinesis stream. |
| 467 |
| 468 :type stream_name: string |
| 469 :param stream_name: The name of the stream. |
| 470 |
| 471 :type exclusive_start_tag_key: string |
| 472 :param exclusive_start_tag_key: The key to use as the starting point |
| 473 for the list of tags. If this parameter is set, `ListTagsForStream` |
| 474 gets all tags that occur after `ExclusiveStartTagKey`. |
| 475 |
| 476 :type limit: integer |
| 477 :param limit: The number of tags to return. If this number is less than |
| 478 the total number of tags associated with the stream, `HasMoreTags` |
| 479 is set to `True`. To list additional tags, set |
| 480 `ExclusiveStartTagKey` to the last key in the response. |
| 481 |
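|         A sketch of paging through all tags (assumes `conn` and a
|         stream named 'my-stream')::
|
|             tags = []
|             last_key = None
|             while True:
|                 result = conn.list_tags_for_stream(
|                     'my-stream', exclusive_start_tag_key=last_key)
|                 tags.extend(result['Tags'])
|                 if not result['HasMoreTags']:
|                     break
|                 last_key = tags[-1]['Key']
|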
| 482 """ |
| 483 params = {'StreamName': stream_name, } |
| 484 if exclusive_start_tag_key is not None: |
| 485 params['ExclusiveStartTagKey'] = exclusive_start_tag_key |
| 486 if limit is not None: |
| 487 params['Limit'] = limit |
| 488 return self.make_request(action='ListTagsForStream', |
| 489 body=json.dumps(params)) |
| 490 |
| 491 def merge_shards(self, stream_name, shard_to_merge, |
| 492 adjacent_shard_to_merge): |
| 493 """ |
| 494 Merges two adjacent shards in a stream and combines them into |
| 495 a single shard to reduce the stream's capacity to ingest and |
| 496 transport data. Two shards are considered adjacent if the |
| 497         union of the hash key ranges for the two shards forms a
| 498 contiguous set with no gaps. For example, if you have two |
| 499 shards, one with a hash key range of 276...381 and the other |
| 500 with a hash key range of 382...454, then you could merge these |
| 501 two shards into a single shard that would have a hash key |
| 502 range of 276...454. After the merge, the single child shard |
| 503 receives data for all hash key values covered by the two |
| 504 parent shards. |
| 505 |
| 506 `MergeShards` is called when there is a need to reduce the |
| 507 overall capacity of a stream because of excess capacity that |
| 508 is not being used. You must specify the shard to be merged and |
| 509 the adjacent shard for a stream. For more information about |
| 510 merging shards, see `Merge Two Shards`_ in the Amazon Kinesis |
| 511 Developer Guide . |
| 512 |
| 513 If the stream is in the `ACTIVE` state, you can call |
| 514 `MergeShards`. If a stream is in the `CREATING`, `UPDATING`, |
| 515 or `DELETING` state, `MergeShards` returns a |
| 516 `ResourceInUseException`. If the specified stream does not |
| 517 exist, `MergeShards` returns a `ResourceNotFoundException`. |
| 518 |
| 519 You can use DescribeStream to check the state of the stream, |
| 520 which is returned in `StreamStatus`. |
| 521 |
| 522 `MergeShards` is an asynchronous operation. Upon receiving a |
| 523 `MergeShards` request, Amazon Kinesis immediately returns a |
| 524 response and sets the `StreamStatus` to `UPDATING`. After the |
| 525 operation is completed, Amazon Kinesis sets the `StreamStatus` |
| 526 to `ACTIVE`. Read and write operations continue to work while |
| 527 the stream is in the `UPDATING` state. |
| 528 |
| 529 You use DescribeStream to determine the shard IDs that are |
| 530 specified in the `MergeShards` request. |
| 531 |
| 532 If you try to operate on too many streams in parallel using |
| 533 CreateStream, DeleteStream, `MergeShards` or SplitShard, you |
| 534 will receive a `LimitExceededException`. |
| 535 |
| 536         `MergeShards` has a limit of 5 transactions per second per
| 537 account. |
| 538 |
| 539 :type stream_name: string |
| 540 :param stream_name: The name of the stream for the merge. |
| 541 |
| 542 :type shard_to_merge: string |
| 543 :param shard_to_merge: The shard ID of the shard to combine with the |
| 544 adjacent shard for the merge. |
| 545 |
| 546 :type adjacent_shard_to_merge: string |
| 547 :param adjacent_shard_to_merge: The shard ID of the adjacent shard for |
| 548 the merge. |
| 549 |
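|         A minimal sketch (the stream name and shard IDs are
|         illustrative; the two shards must be adjacent per
|         `describe_stream`)::
|
|             conn.merge_shards('my-stream',
|                               'shardId-000000000000',
|                               'shardId-000000000001')
|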
| 550 """ |
| 551 params = { |
| 552 'StreamName': stream_name, |
| 553 'ShardToMerge': shard_to_merge, |
| 554 'AdjacentShardToMerge': adjacent_shard_to_merge, |
| 555 } |
| 556 return self.make_request(action='MergeShards', |
| 557 body=json.dumps(params)) |
| 558 |
| 559 def put_record(self, stream_name, data, partition_key, |
| 560 explicit_hash_key=None, |
| 561 sequence_number_for_ordering=None, |
| 562 exclusive_minimum_sequence_number=None, |
| 563 b64_encode=True): |
| 564 """ |
| 565 This operation puts a data record into an Amazon Kinesis |
| 566 stream from a producer. This operation must be called to send |
| 567 data from the producer into the Amazon Kinesis stream for |
| 568 real-time ingestion and subsequent processing. The `PutRecord` |
| 569 operation requires the name of the stream that captures, |
| 570 stores, and transports the data; a partition key; and the data |
| 571 blob itself. The data blob could be a segment from a log file, |
| 572 geographic/location data, website clickstream data, or any |
| 573 other data type. |
| 574 |
| 575 The partition key is used to distribute data across shards. |
| 576 Amazon Kinesis segregates the data records that belong to a |
| 577 data stream into multiple shards, using the partition key |
| 578 associated with each data record to determine which shard a |
| 579 given data record belongs to. |
| 580 |
| 581 Partition keys are Unicode strings, with a maximum length |
| 582 limit of 256 bytes. An MD5 hash function is used to map |
| 583 partition keys to 128-bit integer values and to map associated |
| 584 data records to shards using the hash key ranges of the |
| 585 shards. You can override hashing the partition key to |
| 586 determine the shard by explicitly specifying a hash value |
| 587 using the `ExplicitHashKey` parameter. For more information, |
| 588 see the `Amazon Kinesis Developer Guide`_. |
| 589 |
| 590 `PutRecord` returns the shard ID of where the data record was |
| 591 placed and the sequence number that was assigned to the data |
| 592 record. |
| 593 |
| 594 Sequence numbers generally increase over time. To guarantee |
| 595 strictly increasing ordering, use the |
| 596 `SequenceNumberForOrdering` parameter. For more information, |
| 597 see the `Amazon Kinesis Developer Guide`_. |
| 598 |
| 599 If a `PutRecord` request cannot be processed because of |
| 600 insufficient provisioned throughput on the shard involved in |
| 601 the request, `PutRecord` throws |
| 602 `ProvisionedThroughputExceededException`. |
| 603 |
| 604 Data records are accessible for only 24 hours from the time |
| 605 that they are added to an Amazon Kinesis stream. |
| 606 |
| 607 :type stream_name: string |
| 608 :param stream_name: The name of the stream to put the data record into. |
| 609 |
| 610 :type data: blob |
| 611 :param data: The data blob to put into the record, which is |
| 612 Base64-encoded when the blob is serialized. |
| 613 The maximum size of the data blob (the payload after |
| 614         Base64-decoding) is 50 kilobytes (KB).
| 615         Set `b64_encode` to ``False`` to disable automatic Base64 encoding.
| 616 |
| 617 :type partition_key: string |
| 618 :param partition_key: Determines which shard in the stream the data |
| 619 record is assigned to. Partition keys are Unicode strings with a |
| 620 maximum length limit of 256 bytes. Amazon Kinesis uses the |
| 621 partition key as input to a hash function that maps the partition |
| 622 key and associated data to a specific shard. Specifically, an MD5 |
| 623 hash function is used to map partition keys to 128-bit integer |
| 624 values and to map associated data records to shards. As a result of |
| 625 this hashing mechanism, all data records with the same partition |
| 626 key will map to the same shard within the stream. |
| 627 |
| 628 :type explicit_hash_key: string |
| 629 :param explicit_hash_key: The hash value used to explicitly determine |
| 630 the shard the data record is assigned to by overriding the |
| 631 partition key hash. |
| 632 |
| 633 :type sequence_number_for_ordering: string |
| 634 :param sequence_number_for_ordering: Guarantees strictly increasing |
| 635 sequence numbers, for puts from the same client and to the same |
| 636 partition key. Usage: set the `SequenceNumberForOrdering` of record |
| 637 n to the sequence number of record n-1 (as returned in the |
| 638 PutRecordResult when putting record n-1 ). If this parameter is not |
| 639 set, records will be coarsely ordered based on arrival time. |
| 640 |
| 641 :type b64_encode: boolean |
| 642 :param b64_encode: Whether to Base64 encode `data`. Can be set to |
| 643 ``False`` if `data` is already encoded to prevent double encoding. |
| 644 |
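|         A minimal sketch (assumes `conn` and an `ACTIVE` stream named
|         'my-stream'; the data and partition key are illustrative)::
|
|             result = conn.put_record('my-stream', 'some raw data',
|                                      'user-42')
|             # The response carries the assigned 'ShardId' and
|             # 'SequenceNumber'; pass the latter back via
|             # `sequence_number_for_ordering` on the next put to get
|             # strictly increasing sequence numbers.
|             sequence_number = result['SequenceNumber']
|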
| 645 """ |
| 646 params = { |
| 647 'StreamName': stream_name, |
| 648 'Data': data, |
| 649 'PartitionKey': partition_key, |
| 650 } |
| 651 if explicit_hash_key is not None: |
| 652 params['ExplicitHashKey'] = explicit_hash_key |
| 653 if sequence_number_for_ordering is not None: |
| 654 params['SequenceNumberForOrdering'] = sequence_number_for_ordering |
| 655 if b64_encode: |
| 656 if not isinstance(params['Data'], six.binary_type): |
| 657 params['Data'] = params['Data'].encode('utf-8') |
| 658 params['Data'] = base64.b64encode(params['Data']).decode('utf-8') |
| 659 return self.make_request(action='PutRecord', |
| 660 body=json.dumps(params)) |
| 661 |
| 662 def put_records(self, records, stream_name, b64_encode=True): |
| 663 """ |
| 664 Puts (writes) multiple data records from a producer into an |
| 665 Amazon Kinesis stream in a single call (also referred to as a |
| 666 `PutRecords` request). Use this operation to send data from a |
| 667 data producer into the Amazon Kinesis stream for real-time |
| 668 ingestion and processing. Each shard can support up to 1000 |
| 669         records written per second, up to a maximum total of 1 MB of
| 670         data written per second.
| 671 |
| 672 You must specify the name of the stream that captures, stores, |
| 673 and transports the data; and an array of request `Records`, |
| 674 with each record in the array requiring a partition key and |
| 675 data blob. |
| 676 |
| 677 The data blob can be any type of data; for example, a segment |
| 678 from a log file, geographic/location data, website clickstream |
| 679 data, and so on. |
| 680 |
| 681 The partition key is used by Amazon Kinesis as input to a hash |
| 682 function that maps the partition key and associated data to a |
| 683 specific shard. An MD5 hash function is used to map partition |
| 684 keys to 128-bit integer values and to map associated data |
| 685 records to shards. As a result of this hashing mechanism, all |
| 686 data records with the same partition key map to the same shard |
| 687 within the stream. For more information, see `Partition Key`_ |
| 688 in the Amazon Kinesis Developer Guide . |
| 689 |
| 690 Each record in the `Records` array may include an optional |
| 691 parameter, `ExplicitHashKey`, which overrides the partition |
| 692 key to shard mapping. This parameter allows a data producer to |
| 693 determine explicitly the shard where the record is stored. For |
| 694 more information, see `Adding Multiple Records with |
| 695 PutRecords`_ in the Amazon Kinesis Developer Guide . |
| 696 |
| 697 The `PutRecords` response includes an array of response |
| 698 `Records`. Each record in the response array directly |
| 699 correlates with a record in the request array using natural |
| 700 ordering, from the top to the bottom of the request and |
| 701 response. The response `Records` array always includes the |
| 702 same number of records as the request array. |
| 703 |
| 704 The response `Records` array includes both successfully and |
| 705 unsuccessfully processed records. Amazon Kinesis attempts to |
| 706 process all records in each `PutRecords` request. A single |
| 707 record failure does not stop the processing of subsequent |
| 708 records. |
| 709 |
| 710 A successfully-processed record includes `ShardId` and |
| 711 `SequenceNumber` values. The `ShardId` parameter identifies |
| 712 the shard in the stream where the record is stored. The |
| 713 `SequenceNumber` parameter is an identifier assigned to the |
| 714 put record, unique to all records in the stream. |
| 715 |
| 716 An unsuccessfully-processed record includes `ErrorCode` and |
| 717 `ErrorMessage` values. `ErrorCode` reflects the type of error |
| 718 and can be one of the following values: |
| 719 `ProvisionedThroughputExceededException` or `InternalFailure`. |
| 720 `ErrorMessage` provides more detailed information about the |
| 721 `ProvisionedThroughputExceededException` exception including |
| 722 the account ID, stream name, and shard ID of the record that |
| 723 was throttled. |
| 724 |
| 725 Data records are accessible for only 24 hours from the time |
| 726 that they are added to an Amazon Kinesis stream. |
| 727 |
| 728 :type records: list |
| 729 :param records: The records associated with the request. |
| 730 |
| 731 :type stream_name: string |
| 732 :param stream_name: The stream name associated with the request. |
| 733 |
| 734 :type b64_encode: boolean |
| 735 :param b64_encode: Whether to Base64 encode `data`. Can be set to |
| 736 ``False`` if `data` is already encoded to prevent double encoding. |
| 737 |
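|         Each request record is a dict carrying a `Data` blob and a
|         `PartitionKey` (a sketch; assumes `conn` and an `ACTIVE`
|         stream named 'my-stream')::
|
|             records = [
|                 {'Data': 'first event', 'PartitionKey': 'user-1'},
|                 {'Data': 'second event', 'PartitionKey': 'user-2'},
|             ]
|             result = conn.put_records(records, 'my-stream')
|             # Inspect per-record results; failures carry ErrorCode and
|             # ErrorMessage instead of ShardId and SequenceNumber.
|             failed = result['FailedRecordCount']
|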
| 738 """ |
| 739 params = {'Records': records, 'StreamName': stream_name, } |
| 740 if b64_encode: |
| 741 for i in range(len(params['Records'])): |
| 742 data = params['Records'][i]['Data'] |
| 743 if not isinstance(data, six.binary_type): |
| 744 data = data.encode('utf-8') |
| 745 params['Records'][i]['Data'] = base64.b64encode( |
| 746 data).decode('utf-8') |
| 747 return self.make_request(action='PutRecords', |
| 748 body=json.dumps(params)) |
| 749 |
| 750 def remove_tags_from_stream(self, stream_name, tag_keys): |
| 751 """ |
| 752 Deletes tags from the specified Amazon Kinesis stream. |
| 753 |
| 754 If you specify a tag that does not exist, it is ignored. |
| 755 |
| 756 :type stream_name: string |
| 757 :param stream_name: The name of the stream. |
| 758 |
| 759 :type tag_keys: list |
| 760 :param tag_keys: A list of tag keys. Each corresponding tag is removed |
| 761 from the stream. |
| 762 |
| 763 """ |
| 764 params = {'StreamName': stream_name, 'TagKeys': tag_keys, } |
| 765 return self.make_request(action='RemoveTagsFromStream', |
| 766 body=json.dumps(params)) |
| 767 |
| 768 def split_shard(self, stream_name, shard_to_split, new_starting_hash_key): |
| 769 """ |
| 770 Splits a shard into two new shards in the stream, to increase |
| 771 the stream's capacity to ingest and transport data. |
| 772 `SplitShard` is called when there is a need to increase the |
| 773 overall capacity of stream because of an expected increase in |
| 774 the volume of data records being ingested. |
| 775 |
| 776 You can also use `SplitShard` when a shard appears to be |
| 777 approaching its maximum utilization, for example, when the set |
| 778 of producers sending data into the specific shard are suddenly |
| 779 sending more than previously anticipated. You can also call |
| 780 `SplitShard` to increase stream capacity, so that more Amazon |
| 781 Kinesis applications can simultaneously read data from the |
| 782 stream for real-time processing. |
| 783 |
| 784 You must specify the shard to be split and the new hash key, |
| 785 which is the position in the shard where the shard gets split |
| 786 in two. In many cases, the new hash key might simply be the |
| 787 average of the beginning and ending hash key, but it can be |
| 788 any hash key value in the range being mapped into the shard. |
| 789 For more information about splitting shards, see `Split a |
| 790 Shard`_ in the Amazon Kinesis Developer Guide . |
| 791 |
| 792 You can use DescribeStream to determine the shard ID and hash |
| 793 key values for the `ShardToSplit` and `NewStartingHashKey` |
| 794 parameters that are specified in the `SplitShard` request. |
| 795 |
| 796 `SplitShard` is an asynchronous operation. Upon receiving a |
| 797 `SplitShard` request, Amazon Kinesis immediately returns a |
| 798 response and sets the stream status to `UPDATING`. After the |
| 799 operation is completed, Amazon Kinesis sets the stream status |
| 800 to `ACTIVE`. Read and write operations continue to work while |
| 801 the stream is in the `UPDATING` state. |
| 802 |
| 803 You can use `DescribeStream` to check the status of the |
| 804 stream, which is returned in `StreamStatus`. If the stream is |
| 805 in the `ACTIVE` state, you can call `SplitShard`. If a stream |
| 806         is in the `CREATING`, `UPDATING`, or `DELETING` state,
| 807         `SplitShard` returns a `ResourceInUseException`.
| 808 |
| 809         If the specified stream does not exist, `SplitShard`
| 810 returns a `ResourceNotFoundException`. If you try to create |
| 811 more shards than are authorized for your account, you receive |
| 812 a `LimitExceededException`. |
| 813 |
| 814 The default limit for an AWS account is 10 shards per stream. |
| 815 If you need to create a stream with more than 10 shards, |
| 816 `contact AWS Support`_ to increase the limit on your account. |
| 817 |
| 818 If you try to operate on too many streams in parallel using |
| 819 CreateStream, DeleteStream, MergeShards or SplitShard, you |
| 820 receive a `LimitExceededException`. |
| 821 |
| 822         `SplitShard` has a limit of 5 transactions per second per
| 823 account. |
| 824 |
| 825 :type stream_name: string |
| 826 :param stream_name: The name of the stream for the shard split. |
| 827 |
| 828 :type shard_to_split: string |
| 829 :param shard_to_split: The shard ID of the shard to split. |
| 830 |
| 831 :type new_starting_hash_key: string |
| 832 :param new_starting_hash_key: A hash key value for the starting hash |
| 833 key of one of the child shards created by the split. The hash key |
| 834 range for a given shard constitutes a set of ordered contiguous |
| 835 positive integers. The value for `NewStartingHashKey` must be in |
| 836 the range of hash keys being mapped into the shard. The |
| 837 `NewStartingHashKey` hash key value and all higher hash key values |
| 838         in the hash key range are distributed to one of the child shards. All
| 839 the lower hash key values in the range are distributed to the other |
| 840 child shard. |
| 841 |
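|         A sketch of splitting a shard at the midpoint of its hash key
|         range (assumes `conn` and an existing stream named
|         'my-stream')::
|
|             shards = conn.describe_stream(
|                 'my-stream')['StreamDescription']['Shards']
|             shard = shards[0]
|             hash_range = shard['HashKeyRange']
|             midpoint = (int(hash_range['StartingHashKey']) +
|                         int(hash_range['EndingHashKey'])) // 2
|             conn.split_shard('my-stream', shard['ShardId'],
|                              str(midpoint))
|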
| 842 """ |
| 843 params = { |
| 844 'StreamName': stream_name, |
| 845 'ShardToSplit': shard_to_split, |
| 846 'NewStartingHashKey': new_starting_hash_key, |
| 847 } |
| 848 return self.make_request(action='SplitShard', |
| 849 body=json.dumps(params)) |
| 850 |
| 851 def make_request(self, action, body): |
| 852 headers = { |
| 853 'X-Amz-Target': '%s.%s' % (self.TargetPrefix, action), |
| 854 'Host': self.region.endpoint, |
| 855 'Content-Type': 'application/x-amz-json-1.1', |
| 856 'Content-Length': str(len(body)), |
| 857 } |
| 858 http_request = self.build_base_http_request( |
| 859 method='POST', path='/', auth_path='/', params={}, |
| 860 headers=headers, data=body) |
| 861 response = self._mexe(http_request, sender=None, |
| 862 override_num_retries=10) |
| 863 response_body = response.read().decode('utf-8') |
| 864 boto.log.debug(response.getheaders()) |
| 865 boto.log.debug(response_body) |
| 866 if response.status == 200: |
| 867 if response_body: |
| 868 return json.loads(response_body) |
| 869 else: |
| 870 json_body = json.loads(response_body) |
| 871 fault_name = json_body.get('__type', None) |
| 872 exception_class = self._faults.get(fault_name, self.ResponseError) |
| 873 raise exception_class(response.status, response.reason, |
| 874 body=json_body) |
| 875 |