Chromium Code Reviews

Side by Side Diff: third_party/gsutil/third_party/boto/boto/kinesis/layer1.py

Issue 1377933002: [catapult] - Copy Telemetry's gsutilz over to third_party. (Closed) Base URL: https://github.com/catapult-project/catapult.git@master
Patch Set: Rename to gsutil. Created 5 years, 2 months ago
(Empty)
1 # Copyright (c) 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved
2 #
3 # Permission is hereby granted, free of charge, to any person obtaining a
4 # copy of this software and associated documentation files (the
5 # "Software"), to deal in the Software without restriction, including
6 # without limitation the rights to use, copy, modify, merge, publish, dis-
7 # tribute, sublicense, and/or sell copies of the Software, and to permit
8 # persons to whom the Software is furnished to do so, subject to the fol-
9 # lowing conditions:
10 #
11 # The above copyright notice and this permission notice shall be included
12 # in all copies or substantial portions of the Software.
13 #
14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
16 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
17 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
18 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20 # IN THE SOFTWARE.
21 #
22
23 import base64
24 import boto
25
26 from boto.connection import AWSQueryConnection
27 from boto.regioninfo import RegionInfo
28 from boto.exception import JSONResponseError
29 from boto.kinesis import exceptions
30 from boto.compat import json
31 from boto.compat import six
32
33
34 class KinesisConnection(AWSQueryConnection):
35 """
36 Amazon Kinesis Service API Reference
37 Amazon Kinesis is a managed service that scales elastically for
38 real time processing of streaming big data.
39 """
40 APIVersion = "2013-12-02"
41 DefaultRegionName = "us-east-1"
42 DefaultRegionEndpoint = "kinesis.us-east-1.amazonaws.com"
43 ServiceName = "Kinesis"
44 TargetPrefix = "Kinesis_20131202"
45 ResponseError = JSONResponseError
46
47 _faults = {
48 "ProvisionedThroughputExceededException": exceptions.ProvisionedThroughp utExceededException,
49 "LimitExceededException": exceptions.LimitExceededException,
50 "ExpiredIteratorException": exceptions.ExpiredIteratorException,
51 "ResourceInUseException": exceptions.ResourceInUseException,
52 "ResourceNotFoundException": exceptions.ResourceNotFoundException,
53 "InvalidArgumentException": exceptions.InvalidArgumentException,
54 "SubscriptionRequiredException": exceptions.SubscriptionRequiredExceptio n
55 }
56
57
58 def __init__(self, **kwargs):
59 region = kwargs.pop('region', None)
60 if not region:
61 region = RegionInfo(self, self.DefaultRegionName,
62 self.DefaultRegionEndpoint)
63 if 'host' not in kwargs:
64 kwargs['host'] = region.endpoint
65 super(KinesisConnection, self).__init__(**kwargs)
66 self.region = region
67
68 def _required_auth_capability(self):
69 return ['hmac-v4']
70
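For context, a minimal sketch of constructing this client. Extra keyword arguments (credentials, proxy settings, and so on) are forwarded to AWSQueryConnection; the eu-west-1 endpoint string is an assumption for illustration, not something this file defines.

    from boto.kinesis.layer1 import KinesisConnection
    from boto.regioninfo import RegionInfo

    # Default construction targets us-east-1; credentials come from the usual
    # boto configuration or can be passed as keyword arguments.
    conn = KinesisConnection()

    # To target another region, pass a RegionInfo (endpoint assumed here).
    eu_west = RegionInfo(name='eu-west-1',
                         endpoint='kinesis.eu-west-1.amazonaws.com')
    conn_eu = KinesisConnection(region=eu_west)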
71 def add_tags_to_stream(self, stream_name, tags):
72 """
73 Adds or updates tags for the specified Amazon Kinesis stream.
74 Each stream can have up to 10 tags.
75
76 If tags have already been assigned to the stream,
77 `AddTagsToStream` overwrites any existing tags that correspond
78 to the specified tag keys.
79
80 :type stream_name: string
81 :param stream_name: The name of the stream.
82
83 :type tags: map
84 :param tags: The set of key-value pairs to use to create the tags.
85
86 """
87 params = {'StreamName': stream_name, 'Tags': tags, }
88 return self.make_request(action='AddTagsToStream',
89 body=json.dumps(params))
90
91 def create_stream(self, stream_name, shard_count):
92 """
93 Creates an Amazon Kinesis stream. A stream captures and
94 transports data records that are continuously emitted from
95 different data sources or producers. Scale-out within an
96 Amazon Kinesis stream is explicitly supported by means of
97 shards, which are uniquely identified groups of data records
98 in an Amazon Kinesis stream.
99
100 You specify and control the number of shards that a stream is
101 composed of. Each open shard can support up to 5 read
102 transactions per second, up to a maximum total of 2 MB of data
103 read per second. Each shard can support up to 1000 records
104 written per second, up to a maximum total of 1 MB data written
105 per second. You can add shards to a stream if the amount of
106 data input increases and you can remove shards if the amount
107 of data input decreases.
108
109 The stream name identifies the stream. The name is scoped to
110 the AWS account used by the application. It is also scoped by
111 region. That is, two streams in two different accounts can
112 have the same name, and two streams in the same account, but
113 in two different regions, can have the same name.
114
115 `CreateStream` is an asynchronous operation. Upon receiving a
116 `CreateStream` request, Amazon Kinesis immediately returns and
117 sets the stream status to `CREATING`. After the stream is
118 created, Amazon Kinesis sets the stream status to `ACTIVE`.
119 You should perform read and write operations only on an
120 `ACTIVE` stream.
121
122 You receive a `LimitExceededException` when making a
123 `CreateStream` request if you try to do one of the following:
124
125
126 + Have more than five streams in the `CREATING` state at any
127 point in time.
128 + Create more shards than are authorized for your account.
129
130
131 The default limit for an AWS account is 10 shards per stream.
132 If you need to create a stream with more than 10 shards,
133 `contact AWS Support`_ to increase the limit on your account.
134
135 You can use `DescribeStream` to check the stream status, which
136 is returned in `StreamStatus`.
137
138 `CreateStream` has a limit of 5 transactions per second per
139 account.
140
141 :type stream_name: string
142 :param stream_name: A name to identify the stream. The stream name is
143 scoped to the AWS account used by the application that creates the
144 stream. It is also scoped by region. That is, two streams in two
145 different AWS accounts can have the same name, and two streams in
146 the same AWS account, but in two different regions, can have the
147 same name.
148
149 :type shard_count: integer
150 :param shard_count: The number of shards that the stream will use. The
151 throughput of the stream is a function of the number of shards;
152 more shards are required for greater provisioned throughput.
153 **Note:** The default limit for an AWS account is 10 shards per stream.
154 If you need to create a stream with more than 10 shards, `contact
155 AWS Support`_ to increase the limit on your account.
156
157 """
158 params = {
159 'StreamName': stream_name,
160 'ShardCount': shard_count,
161 }
162 return self.make_request(action='CreateStream',
163 body=json.dumps(params))
164
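A minimal sketch of the asynchronous pattern described above, assuming a `conn` KinesisConnection as constructed earlier and a hypothetical stream name; the 'StreamDescription'/'StreamStatus' keys mirror the Kinesis JSON responses that make_request() deserializes.

    import time

    conn.create_stream('example-stream', shard_count=2)

    # CreateStream returns immediately with the stream in CREATING; poll
    # DescribeStream until the status reaches ACTIVE before reading or writing.
    while True:
        status = conn.describe_stream(
            'example-stream')['StreamDescription']['StreamStatus']
        if status == 'ACTIVE':
            break
        time.sleep(5)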
165 def delete_stream(self, stream_name):
166 """
167 Deletes a stream and all its shards and data. You must shut
168 down any applications that are operating on the stream before
169 you delete the stream. If an application attempts to operate
170 on a deleted stream, it will receive the exception
171 `ResourceNotFoundException`.
172
173 If the stream is in the `ACTIVE` state, you can delete it.
174 After a `DeleteStream` request, the specified stream is in the
175 `DELETING` state until Amazon Kinesis completes the deletion.
176
177 **Note:** Amazon Kinesis might continue to accept data read
178 and write operations, such as PutRecord, PutRecords, and
179 GetRecords, on a stream in the `DELETING` state until the
180 stream deletion is complete.
181
182 When you delete a stream, any shards in that stream are also
183 deleted, and any tags are dissociated from the stream.
184
185 You can use the DescribeStream operation to check the state of
186 the stream, which is returned in `StreamStatus`.
187
188 `DeleteStream` has a limit of 5 transactions per second per
189 account.
190
191 :type stream_name: string
192 :param stream_name: The name of the stream to delete.
193
194 """
195 params = {'StreamName': stream_name, }
196 return self.make_request(action='DeleteStream',
197 body=json.dumps(params))
198
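A short sketch of deletion with the same hypothetical stream name; the ResourceNotFoundException caught afterwards is the one mapped in _faults above.

    from boto.kinesis import exceptions

    conn.delete_stream('example-stream')

    # Once Amazon Kinesis finishes the DELETING phase, DescribeStream raises
    # ResourceNotFoundException (translated by make_request via _faults).
    try:
        conn.describe_stream('example-stream')
    except exceptions.ResourceNotFoundException:
        pass  # deletion complete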
199 def describe_stream(self, stream_name, limit=None,
200 exclusive_start_shard_id=None):
201 """
202 Describes the specified stream.
203
204 The information about the stream includes its current status,
205 its Amazon Resource Name (ARN), and an array of shard objects.
206 For each shard object, there is information about the hash key
207 and sequence number ranges that the shard spans, and the IDs
208 of any earlier shards that played a role in creating the
209 shard. A sequence number is the identifier associated with
210 every record ingested in the Amazon Kinesis stream. The
211 sequence number is assigned when a record is put into the
212 stream.
213
214 You can limit the number of returned shards using the `Limit`
215 parameter. The number of shards in a stream may be too large
216 to return from a single call to `DescribeStream`. You can
217 detect this by using the `HasMoreShards` flag in the returned
218 output. `HasMoreShards` is set to `True` when there is more
219 data available.
220
221 `DescribeStream` is a paginated operation. If there are more
222 shards available, you can request them using the shard ID of
223 the last shard returned. Specify this ID in the
224 `ExclusiveStartShardId` parameter in a subsequent request to
225 `DescribeStream`.
226
227 `DescribeStream` has a limit of 10 transactions per second per
228 account.
229
230 :type stream_name: string
231 :param stream_name: The name of the stream to describe.
232
233 :type limit: integer
234 :param limit: The maximum number of shards to return.
235
236 :type exclusive_start_shard_id: string
237 :param exclusive_start_shard_id: The shard ID of the shard to start
238 with.
239
240 """
241 params = {'StreamName': stream_name, }
242 if limit is not None:
243 params['Limit'] = limit
244 if exclusive_start_shard_id is not None:
245 params['ExclusiveStartShardId'] = exclusive_start_shard_id
246 return self.make_request(action='DescribeStream',
247 body=json.dumps(params))
248
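A sketch of the pagination loop described above; the response keys ('StreamDescription', 'Shards', 'HasMoreShards', 'ShardId') mirror the Kinesis JSON API, and the stream name is hypothetical.

    # Collect every shard, passing the last shard ID seen back in as
    # ExclusiveStartShardId until HasMoreShards is false.
    shards = []
    last_shard_id = None
    while True:
        desc = conn.describe_stream(
            'example-stream',
            exclusive_start_shard_id=last_shard_id)['StreamDescription']
        shards.extend(desc['Shards'])
        if not desc['HasMoreShards']:
            break
        last_shard_id = shards[-1]['ShardId']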
249 def get_records(self, shard_iterator, limit=None, b64_decode=True):
250 """
251 Gets data records from a shard.
252
253 Specify a shard iterator using the `ShardIterator` parameter.
254 The shard iterator specifies the position in the shard from
255 which you want to start reading data records sequentially. If
256 there are no records available in the portion of the shard
257 that the iterator points to, `GetRecords` returns an empty
258 list. Note that it might take multiple calls to get to a
259 portion of the shard that contains records.
260
261 You can scale by provisioning multiple shards. Your
262 application should have one thread per shard, each reading
263 continuously from its stream. To read from a stream
264 continually, call `GetRecords` in a loop. Use GetShardIterator
265 to get the shard iterator to specify in the first `GetRecords`
266 call. `GetRecords` returns a new shard iterator in
267 `NextShardIterator`. Specify the shard iterator returned in
268 `NextShardIterator` in subsequent calls to `GetRecords`. Note
269 that if the shard has been closed, the shard iterator can't
270 return more data and `GetRecords` returns `null` in
271 `NextShardIterator`. You can terminate the loop when the shard
272 is closed, or when the shard iterator reaches the record with
273 the sequence number or other attribute that marks it as the
274 last record to process.
275
276 Each data record can be up to 50 KB in size, and each shard
277 can read up to 2 MB per second. You can ensure that your calls
278 don't exceed the maximum supported size or throughput by using
279 the `Limit` parameter to specify the maximum number of records
280 that `GetRecords` can return. Consider your average record
281 size when determining this limit. For example, if your average
282 record size is 40 KB, you can limit the data returned to about
283 1 MB per call by specifying 25 as the limit.
284
285 The size of the data returned by `GetRecords` will vary
286 depending on the utilization of the shard. The maximum size of
287 data that `GetRecords` can return is 10 MB. If a call returns
288 10 MB of data, subsequent calls made within the next 5 seconds
289 throw `ProvisionedThroughputExceededException`. If there is
290 insufficient provisioned throughput on the shard, subsequent
291 calls made within the next 1 second throw
292 `ProvisionedThroughputExceededException`. Note that
293 `GetRecords` won't return any data when it throws an
294 exception. For this reason, we recommend that you wait one
295 second between calls to `GetRecords`; however, it's possible
296 that the application will get exceptions for longer than 1
297 second.
298
299 To detect whether the application is falling behind in
300 processing, add a timestamp to your records and note how long
301 it takes to process them. You can also monitor how much data
302 is in a stream using the CloudWatch metrics for write
303 operations ( `PutRecord` and `PutRecords`). For more
304 information, see `Monitoring Amazon Kinesis with Amazon
305 CloudWatch`_ in the Amazon Kinesis Developer Guide .
306
307 :type shard_iterator: string
308 :param shard_iterator: The position in the shard from which you want to
309 start sequentially reading data records. A shard iterator specifies
310 this position using the sequence number of a data record in the
311 shard.
312
313 :type limit: integer
314 :param limit: The maximum number of records to return. Specify a value
315 of up to 10,000. If you specify a value that is greater than
316 10,000, `GetRecords` throws `InvalidArgumentException`.
317
318 :type b64_decode: boolean
319 :param b64_decode: Decode the Base64-encoded ``Data`` field of records.
320
321 """
322 params = {'ShardIterator': shard_iterator, }
323 if limit is not None:
324 params['Limit'] = limit
325
326 response = self.make_request(action='GetRecords',
327 body=json.dumps(params))
328
329 # Base64 decode the data
330 if b64_decode:
331 for record in response.get('Records', []):
332 record['Data'] = base64.b64decode(
333 record['Data'].encode('utf-8')).decode('utf-8')
334
335 return response
336
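A sketch of the read loop the docstring describes, assuming the `conn` and stream from the earlier examples; the response keys ('ShardIterator', 'Records', 'Data', 'NextShardIterator') mirror the Kinesis JSON API.

    import time

    # Start at the oldest untrimmed record of the first shard and follow
    # NextShardIterator, pausing about a second between calls as recommended.
    shard_id = conn.describe_stream(
        'example-stream')['StreamDescription']['Shards'][0]['ShardId']
    iterator = conn.get_shard_iterator(
        'example-stream', shard_id, 'TRIM_HORIZON')['ShardIterator']
    while iterator is not None:
        out = conn.get_records(iterator, limit=100)
        for record in out['Records']:
            print(record['Data'])  # already Base64-decoded (b64_decode=True)
        iterator = out.get('NextShardIterator')
        time.sleep(1)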
337 def get_shard_iterator(self, stream_name, shard_id, shard_iterator_type,
338 starting_sequence_number=None):
339 """
340 Gets a shard iterator. A shard iterator expires five minutes
341 after it is returned to the requester.
342
343 A shard iterator specifies the position in the shard from
344 which to start reading data records sequentially. A shard
345 iterator specifies this position using the sequence number of
346 a data record in a shard. A sequence number is the identifier
347 associated with every record ingested in the Amazon Kinesis
348 stream. The sequence number is assigned when a record is put
349 into the stream.
350
351 You must specify the shard iterator type. For example, you can
352 set the `ShardIteratorType` parameter to read exactly from the
353 position denoted by a specific sequence number by using the
354 `AT_SEQUENCE_NUMBER` shard iterator type, or right after the
355 sequence number by using the `AFTER_SEQUENCE_NUMBER` shard
356 iterator type, using sequence numbers returned by earlier
357 calls to PutRecord, PutRecords, GetRecords, or DescribeStream.
358 You can specify the shard iterator type `TRIM_HORIZON` in the
359 request to cause `ShardIterator` to point to the last
360 untrimmed record in the shard in the system, which is the
361 oldest data record in the shard. Or you can point to just
362 after the most recent record in the shard, by using the shard
363 iterator type `LATEST`, so that you always read the most
364 recent data in the shard.
365
366 When you repeatedly read from an Amazon Kinesis stream, use a
367 GetShardIterator request to get the first shard iterator to
368 use in your first `GetRecords` request and then use the shard
369 iterator returned by the `GetRecords` request in
370 `NextShardIterator` for subsequent reads. A new shard iterator
371 is returned by every `GetRecords` request in
372 `NextShardIterator`, which you use in the `ShardIterator`
373 parameter of the next `GetRecords` request.
374
375 If a `GetShardIterator` request is made too often, you receive
376 a `ProvisionedThroughputExceededException`. For more
377 information about throughput limits, see GetRecords.
378
379 If the shard is closed, the iterator can't return more data,
380 and `GetShardIterator` returns `null` for its `ShardIterator`.
381 A shard can be closed using SplitShard or MergeShards.
382
383 `GetShardIterator` has a limit of 5 transactions per second
384 per account per open shard.
385
386 :type stream_name: string
387 :param stream_name: The name of the stream.
388
389 :type shard_id: string
390 :param shard_id: The shard ID of the shard to get the iterator for.
391
392 :type shard_iterator_type: string
393 :param shard_iterator_type:
394 Determines how the shard iterator is used to start reading data records
395 from the shard.
396
397 The following are the valid shard iterator types:
398
399
400 + AT_SEQUENCE_NUMBER - Start reading exactly from the position denoted
401 by a specific sequence number.
402 + AFTER_SEQUENCE_NUMBER - Start reading right after the position
403 denoted by a specific sequence number.
404 + TRIM_HORIZON - Start reading at the last untrimmed record in the
405 shard in the system, which is the oldest data record in the shard.
406 + LATEST - Start reading just after the most recent record in the
407 shard, so that you always read the most recent data in the shard.
408
409 :type starting_sequence_number: string
410 :param starting_sequence_number: The sequence number of the data record
411 in the shard from which to start reading.
412
413 """
414 params = {
415 'StreamName': stream_name,
416 'ShardId': shard_id,
417 'ShardIteratorType': shard_iterator_type,
418 }
419 if starting_sequence_number is not None:
420 params['StartingSequenceNumber'] = starting_sequence_number
421 return self.make_request(action='GetShardIterator',
422 body=json.dumps(params))
423
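A short sketch of the iterator types listed above; `shard_id` is the one taken from DescribeStream in the earlier read-loop sketch, and the sequence number shown is a placeholder.

    # Read from the oldest retained record, or only records written from now on.
    oldest = conn.get_shard_iterator('example-stream', shard_id, 'TRIM_HORIZON')
    latest = conn.get_shard_iterator('example-stream', shard_id, 'LATEST')

    # Resume just after a sequence number returned earlier by PutRecord or
    # GetRecords (the value below is a placeholder).
    resume = conn.get_shard_iterator(
        'example-stream', shard_id, 'AFTER_SEQUENCE_NUMBER',
        starting_sequence_number='49540000000000000000000000000000000000000000')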
424 def list_streams(self, limit=None, exclusive_start_stream_name=None):
425 """
426 Lists your streams.
427
428 The number of streams may be too large to return from a single
429 call to `ListStreams`. You can limit the number of returned
430 streams using the `Limit` parameter. If you do not specify a
431 value for the `Limit` parameter, Amazon Kinesis uses the
432 default limit, which is currently 10.
433
434 You can detect if there are more streams available to list by
435 using the `HasMoreStreams` flag from the returned output. If
436 there are more streams available, you can request more streams
437 by using the name of the last stream returned by the
438 `ListStreams` request in the `ExclusiveStartStreamName`
439 parameter in a subsequent request to `ListStreams`. The group
440 of stream names returned by the subsequent request is then
441 added to the list. You can continue this process until all the
442 stream names have been collected in the list.
443
444 `ListStreams` has a limit of 5 transactions per second per
445 account.
446
447 :type limit: integer
448 :param limit: The maximum number of streams to list.
449
450 :type exclusive_start_stream_name: string
451 :param exclusive_start_stream_name: The name of the stream to start the
452 list with.
453
454 """
455 params = {}
456 if limit is not None:
457 params['Limit'] = limit
458 if exclusive_start_stream_name is not None:
459 params['ExclusiveStartStreamName'] = exclusive_start_stream_name
460 return self.make_request(action='ListStreams',
461 body=json.dumps(params))
462
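A sketch of the stream-listing pagination described above; 'StreamNames' and 'HasMoreStreams' mirror the Kinesis JSON API.

    # List every stream name visible to the account, page by page.
    names = []
    last_name = None
    while True:
        out = conn.list_streams(exclusive_start_stream_name=last_name)
        names.extend(out['StreamNames'])
        if not out['HasMoreStreams']:
            break
        last_name = names[-1]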
463 def list_tags_for_stream(self, stream_name, exclusive_start_tag_key=None,
464 limit=None):
465 """
466 Lists the tags for the specified Amazon Kinesis stream.
467
468 :type stream_name: string
469 :param stream_name: The name of the stream.
470
471 :type exclusive_start_tag_key: string
472 :param exclusive_start_tag_key: The key to use as the starting point
473 for the list of tags. If this parameter is set, `ListTagsForStream`
474 gets all tags that occur after `ExclusiveStartTagKey`.
475
476 :type limit: integer
477 :param limit: The number of tags to return. If this number is less than
478 the total number of tags associated with the stream, `HasMoreTags`
479 is set to `True`. To list additional tags, set
480 `ExclusiveStartTagKey` to the last key in the response.
481
482 """
483 params = {'StreamName': stream_name, }
484 if exclusive_start_tag_key is not None:
485 params['ExclusiveStartTagKey'] = exclusive_start_tag_key
486 if limit is not None:
487 params['Limit'] = limit
488 return self.make_request(action='ListTagsForStream',
489 body=json.dumps(params))
490
491 def merge_shards(self, stream_name, shard_to_merge,
492 adjacent_shard_to_merge):
493 """
494 Merges two adjacent shards in a stream and combines them into
495 a single shard to reduce the stream's capacity to ingest and
496 transport data. Two shards are considered adjacent if the
497 union of the hash key ranges for the two shards form a
498 contiguous set with no gaps. For example, if you have two
499 shards, one with a hash key range of 276...381 and the other
500 with a hash key range of 382...454, then you could merge these
501 two shards into a single shard that would have a hash key
502 range of 276...454. After the merge, the single child shard
503 receives data for all hash key values covered by the two
504 parent shards.
505
506 `MergeShards` is called when there is a need to reduce the
507 overall capacity of a stream because of excess capacity that
508 is not being used. You must specify the shard to be merged and
509 the adjacent shard for a stream. For more information about
510 merging shards, see `Merge Two Shards`_ in the Amazon Kinesis
511 Developer Guide .
512
513 If the stream is in the `ACTIVE` state, you can call
514 `MergeShards`. If a stream is in the `CREATING`, `UPDATING`,
515 or `DELETING` state, `MergeShards` returns a
516 `ResourceInUseException`. If the specified stream does not
517 exist, `MergeShards` returns a `ResourceNotFoundException`.
518
519 You can use DescribeStream to check the state of the stream,
520 which is returned in `StreamStatus`.
521
522 `MergeShards` is an asynchronous operation. Upon receiving a
523 `MergeShards` request, Amazon Kinesis immediately returns a
524 response and sets the `StreamStatus` to `UPDATING`. After the
525 operation is completed, Amazon Kinesis sets the `StreamStatus`
526 to `ACTIVE`. Read and write operations continue to work while
527 the stream is in the `UPDATING` state.
528
529 You use DescribeStream to determine the shard IDs that are
530 specified in the `MergeShards` request.
531
532 If you try to operate on too many streams in parallel using
533 CreateStream, DeleteStream, `MergeShards` or SplitShard, you
534 will receive a `LimitExceededException`.
535
536 `MergeShards` has a limit of 5 transactions per second per
537 account.
538
539 :type stream_name: string
540 :param stream_name: The name of the stream for the merge.
541
542 :type shard_to_merge: string
543 :param shard_to_merge: The shard ID of the shard to combine with the
544 adjacent shard for the merge.
545
546 :type adjacent_shard_to_merge: string
547 :param adjacent_shard_to_merge: The shard ID of the adjacent shard for
548 the merge.
549
550 """
551 params = {
552 'StreamName': stream_name,
553 'ShardToMerge': shard_to_merge,
554 'AdjacentShardToMerge': adjacent_shard_to_merge,
555 }
556 return self.make_request(action='MergeShards',
557 body=json.dumps(params))
558
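A minimal sketch of a merge, reading the two shard IDs from DescribeStream rather than hard-coding them; the first two shards are assumed adjacent here, and in practice you would confirm their hash key ranges are contiguous. The stream must be ACTIVE and moves to UPDATING until the merge finishes.

    shards = conn.describe_stream(
        'example-stream')['StreamDescription']['Shards']

    # Merge the first two shards (assumed adjacent for this sketch).
    conn.merge_shards('example-stream',
                      shard_to_merge=shards[0]['ShardId'],
                      adjacent_shard_to_merge=shards[1]['ShardId'])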
559 def put_record(self, stream_name, data, partition_key,
560 explicit_hash_key=None,
561 sequence_number_for_ordering=None,
562 exclusive_minimum_sequence_number=None,
563 b64_encode=True):
564 """
565 This operation puts a data record into an Amazon Kinesis
566 stream from a producer. This operation must be called to send
567 data from the producer into the Amazon Kinesis stream for
568 real-time ingestion and subsequent processing. The `PutRecord`
569 operation requires the name of the stream that captures,
570 stores, and transports the data; a partition key; and the data
571 blob itself. The data blob could be a segment from a log file,
572 geographic/location data, website clickstream data, or any
573 other data type.
574
575 The partition key is used to distribute data across shards.
576 Amazon Kinesis segregates the data records that belong to a
577 data stream into multiple shards, using the partition key
578 associated with each data record to determine which shard a
579 given data record belongs to.
580
581 Partition keys are Unicode strings, with a maximum length
582 limit of 256 bytes. An MD5 hash function is used to map
583 partition keys to 128-bit integer values and to map associated
584 data records to shards using the hash key ranges of the
585 shards. You can override hashing the partition key to
586 determine the shard by explicitly specifying a hash value
587 using the `ExplicitHashKey` parameter. For more information,
588 see the `Amazon Kinesis Developer Guide`_.
589
590 `PutRecord` returns the shard ID of where the data record was
591 placed and the sequence number that was assigned to the data
592 record.
593
594 Sequence numbers generally increase over time. To guarantee
595 strictly increasing ordering, use the
596 `SequenceNumberForOrdering` parameter. For more information,
597 see the `Amazon Kinesis Developer Guide`_.
598
599 If a `PutRecord` request cannot be processed because of
600 insufficient provisioned throughput on the shard involved in
601 the request, `PutRecord` throws
602 `ProvisionedThroughputExceededException`.
603
604 Data records are accessible for only 24 hours from the time
605 that they are added to an Amazon Kinesis stream.
606
607 :type stream_name: string
608 :param stream_name: The name of the stream to put the data record into.
609
610 :type data: blob
611 :param data: The data blob to put into the record, which is
612 Base64-encoded when the blob is serialized.
613 The maximum size of the data blob (the payload after
614 Base64-decoding) is 50 kilobytes (KB).
615 Set `b64_encode` to disable automatic Base64 encoding.
616
617 :type partition_key: string
618 :param partition_key: Determines which shard in the stream the data
619 record is assigned to. Partition keys are Unicode strings with a
620 maximum length limit of 256 bytes. Amazon Kinesis uses the
621 partition key as input to a hash function that maps the partition
622 key and associated data to a specific shard. Specifically, an MD5
623 hash function is used to map partition keys to 128-bit integer
624 values and to map associated data records to shards. As a result of
625 this hashing mechanism, all data records with the same partition
626 key will map to the same shard within the stream.
627
628 :type explicit_hash_key: string
629 :param explicit_hash_key: The hash value used to explicitly determine
630 the shard the data record is assigned to by overriding the
631 partition key hash.
632
633 :type sequence_number_for_ordering: string
634 :param sequence_number_for_ordering: Guarantees strictly increasing
635 sequence numbers, for puts from the same client and to the same
636 partition key. Usage: set the `SequenceNumberForOrdering` of record
637 n to the sequence number of record n-1 (as returned in the
638 PutRecordResult when putting record n-1 ). If this parameter is not
639 set, records will be coarsely ordered based on arrival time.
640
641 :type b64_encode: boolean
642 :param b64_encode: Whether to Base64 encode `data`. Can be set to
643 ``False`` if `data` is already encoded to prevent double encoding.
644
645 """
646 params = {
647 'StreamName': stream_name,
648 'Data': data,
649 'PartitionKey': partition_key,
650 }
651 if explicit_hash_key is not None:
652 params['ExplicitHashKey'] = explicit_hash_key
653 if sequence_number_for_ordering is not None:
654 params['SequenceNumberForOrdering'] = sequence_number_for_ordering
655 if b64_encode:
656 if not isinstance(params['Data'], six.binary_type):
657 params['Data'] = params['Data'].encode('utf-8')
658 params['Data'] = base64.b64encode(params['Data']).decode('utf-8')
659 return self.make_request(action='PutRecord',
660 body=json.dumps(params))
661
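A minimal sketch of a single put, with a hypothetical payload and partition key; the response keys ('ShardId', 'SequenceNumber') mirror the Kinesis JSON API, and the data is Base64-encoded by the client because b64_encode defaults to True.

    result = conn.put_record('example-stream',
                             data='{"sensor": 17, "reading": 42}',
                             partition_key='sensor-17')

    # Where the record landed and the sequence number it was assigned.
    shard_id = result['ShardId']
    sequence_number = result['SequenceNumber']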
662 def put_records(self, records, stream_name, b64_encode=True):
663 """
664 Puts (writes) multiple data records from a producer into an
665 Amazon Kinesis stream in a single call (also referred to as a
666 `PutRecords` request). Use this operation to send data from a
667 data producer into the Amazon Kinesis stream for real-time
668 ingestion and processing. Each shard can support up to 1000
669 records written per second, up to a maximum total of 1 MB data
670 written per second.
671
672 You must specify the name of the stream that captures, stores,
673 and transports the data; and an array of request `Records`,
674 with each record in the array requiring a partition key and
675 data blob.
676
677 The data blob can be any type of data; for example, a segment
678 from a log file, geographic/location data, website clickstream
679 data, and so on.
680
681 The partition key is used by Amazon Kinesis as input to a hash
682 function that maps the partition key and associated data to a
683 specific shard. An MD5 hash function is used to map partition
684 keys to 128-bit integer values and to map associated data
685 records to shards. As a result of this hashing mechanism, all
686 data records with the same partition key map to the same shard
687 within the stream. For more information, see `Partition Key`_
688 in the Amazon Kinesis Developer Guide .
689
690 Each record in the `Records` array may include an optional
691 parameter, `ExplicitHashKey`, which overrides the partition
692 key to shard mapping. This parameter allows a data producer to
693 determine explicitly the shard where the record is stored. For
694 more information, see `Adding Multiple Records with
695 PutRecords`_ in the Amazon Kinesis Developer Guide .
696
697 The `PutRecords` response includes an array of response
698 `Records`. Each record in the response array directly
699 correlates with a record in the request array using natural
700 ordering, from the top to the bottom of the request and
701 response. The response `Records` array always includes the
702 same number of records as the request array.
703
704 The response `Records` array includes both successfully and
705 unsuccessfully processed records. Amazon Kinesis attempts to
706 process all records in each `PutRecords` request. A single
707 record failure does not stop the processing of subsequent
708 records.
709
710 A successfully-processed record includes `ShardId` and
711 `SequenceNumber` values. The `ShardId` parameter identifies
712 the shard in the stream where the record is stored. The
713 `SequenceNumber` parameter is an identifier assigned to the
714 put record, unique to all records in the stream.
715
716 An unsuccessfully-processed record includes `ErrorCode` and
717 `ErrorMessage` values. `ErrorCode` reflects the type of error
718 and can be one of the following values:
719 `ProvisionedThroughputExceededException` or `InternalFailure`.
720 `ErrorMessage` provides more detailed information about the
721 `ProvisionedThroughputExceededException` exception including
722 the account ID, stream name, and shard ID of the record that
723 was throttled.
724
725 Data records are accessible for only 24 hours from the time
726 that they are added to an Amazon Kinesis stream.
727
728 :type records: list
729 :param records: The records associated with the request.
730
731 :type stream_name: string
732 :param stream_name: The stream name associated with the request.
733
734 :type b64_encode: boolean
735 :param b64_encode: Whether to Base64 encode `data`. Can be set to
736 ``False`` if `data` is already encoded to prevent double encoding.
737
738 """
739 params = {'Records': records, 'StreamName': stream_name, }
740 if b64_encode:
741 for i in range(len(params['Records'])):
742 data = params['Records'][i]['Data']
743 if not isinstance(data, six.binary_type):
744 data = data.encode('utf-8')
745 params['Records'][i]['Data'] = base64.b64encode(
746 data).decode('utf-8')
747 return self.make_request(action='PutRecords',
748 body=json.dumps(params))
749
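A sketch of a batch put; each request entry needs 'Data' and 'PartitionKey' (with optional 'ExplicitHashKey'), and per-record failures are reported in the response rather than raised, so they are filtered out here for the caller to retry.

    records = [{'Data': 'payload-%d' % i, 'PartitionKey': 'key-%d' % i}
               for i in range(10)]
    result = conn.put_records(records, 'example-stream')

    # Records that were throttled or failed carry ErrorCode / ErrorMessage.
    failed = [r for r in result['Records'] if 'ErrorCode' in r]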
750 def remove_tags_from_stream(self, stream_name, tag_keys):
751 """
752 Deletes tags from the specified Amazon Kinesis stream.
753
754 If you specify a tag that does not exist, it is ignored.
755
756 :type stream_name: string
757 :param stream_name: The name of the stream.
758
759 :type tag_keys: list
760 :param tag_keys: A list of tag keys. Each corresponding tag is removed
761 from the stream.
762
763 """
764 params = {'StreamName': stream_name, 'TagKeys': tag_keys, }
765 return self.make_request(action='RemoveTagsFromStream',
766 body=json.dumps(params))
767
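A short sketch tying together the three tag operations above; the tag keys and values are illustrative, and the 'Tags' response key (a list of {'Key': ..., 'Value': ...} pairs) mirrors the Kinesis JSON API.

    conn.add_tags_to_stream('example-stream',
                            {'project': 'catapult', 'env': 'dev'})

    # Each entry in the response is a {'Key': ..., 'Value': ...} pair.
    tags = conn.list_tags_for_stream('example-stream')['Tags']

    conn.remove_tags_from_stream('example-stream', ['env'])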
768 def split_shard(self, stream_name, shard_to_split, new_starting_hash_key):
769 """
770 Splits a shard into two new shards in the stream, to increase
771 the stream's capacity to ingest and transport data.
772 `SplitShard` is called when there is a need to increase the
773 overall capacity of stream because of an expected increase in
774 the volume of data records being ingested.
775
776 You can also use `SplitShard` when a shard appears to be
777 approaching its maximum utilization, for example, when the set
778 of producers sending data into the specific shard are suddenly
779 sending more than previously anticipated. You can also call
780 `SplitShard` to increase stream capacity, so that more Amazon
781 Kinesis applications can simultaneously read data from the
782 stream for real-time processing.
783
784 You must specify the shard to be split and the new hash key,
785 which is the position in the shard where the shard gets split
786 in two. In many cases, the new hash key might simply be the
787 average of the beginning and ending hash key, but it can be
788 any hash key value in the range being mapped into the shard.
789 For more information about splitting shards, see `Split a
790 Shard`_ in the Amazon Kinesis Developer Guide .
791
792 You can use DescribeStream to determine the shard ID and hash
793 key values for the `ShardToSplit` and `NewStartingHashKey`
794 parameters that are specified in the `SplitShard` request.
795
796 `SplitShard` is an asynchronous operation. Upon receiving a
797 `SplitShard` request, Amazon Kinesis immediately returns a
798 response and sets the stream status to `UPDATING`. After the
799 operation is completed, Amazon Kinesis sets the stream status
800 to `ACTIVE`. Read and write operations continue to work while
801 the stream is in the `UPDATING` state.
802
803 You can use `DescribeStream` to check the status of the
804 stream, which is returned in `StreamStatus`. If the stream is
805 in the `ACTIVE` state, you can call `SplitShard`. If a stream
806 is in `CREATING` or `UPDATING` or `DELETING` states,
807 `DescribeStream` returns a `ResourceInUseException`.
808
809 If the specified stream does not exist, `DescribeStream`
810 returns a `ResourceNotFoundException`. If you try to create
811 more shards than are authorized for your account, you receive
812 a `LimitExceededException`.
813
814 The default limit for an AWS account is 10 shards per stream.
815 If you need to create a stream with more than 10 shards,
816 `contact AWS Support`_ to increase the limit on your account.
817
818 If you try to operate on too many streams in parallel using
819 CreateStream, DeleteStream, MergeShards or SplitShard, you
820 receive a `LimitExceededException`.
821
822 `SplitShard` has a limit of 5 transactions per second per
823 account.
824
825 :type stream_name: string
826 :param stream_name: The name of the stream for the shard split.
827
828 :type shard_to_split: string
829 :param shard_to_split: The shard ID of the shard to split.
830
831 :type new_starting_hash_key: string
832 :param new_starting_hash_key: A hash key value for the starting hash
833 key of one of the child shards created by the split. The hash key
834 range for a given shard constitutes a set of ordered contiguous
835 positive integers. The value for `NewStartingHashKey` must be in
836 the range of hash keys being mapped into the shard. The
837 `NewStartingHashKey` hash key value and all higher hash key values
838 in hash key range are distributed to one of the child shards. All
839 the lower hash key values in the range are distributed to the other
840 child shard.
841
842 """
843 params = {
844 'StreamName': stream_name,
845 'ShardToSplit': shard_to_split,
846 'NewStartingHashKey': new_starting_hash_key,
847 }
848 return self.make_request(action='SplitShard',
849 body=json.dumps(params))
850
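A sketch of splitting the first shard at the midpoint of its hash key range, using the bounds that DescribeStream reports; the 'HashKeyRange' keys mirror the Kinesis JSON API and the stream name is hypothetical.

    shard = conn.describe_stream(
        'example-stream')['StreamDescription']['Shards'][0]
    low = int(shard['HashKeyRange']['StartingHashKey'])
    high = int(shard['HashKeyRange']['EndingHashKey'])

    # Split roughly in half; any hash key inside the shard's range is valid.
    conn.split_shard('example-stream',
                     shard_to_split=shard['ShardId'],
                     new_starting_hash_key=str((low + high) // 2))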
851 def make_request(self, action, body):
852 headers = {
853 'X-Amz-Target': '%s.%s' % (self.TargetPrefix, action),
854 'Host': self.region.endpoint,
855 'Content-Type': 'application/x-amz-json-1.1',
856 'Content-Length': str(len(body)),
857 }
858 http_request = self.build_base_http_request(
859 method='POST', path='/', auth_path='/', params={},
860 headers=headers, data=body)
861 response = self._mexe(http_request, sender=None,
862 override_num_retries=10)
863 response_body = response.read().decode('utf-8')
864 boto.log.debug(response.getheaders())
865 boto.log.debug(response_body)
866 if response.status == 200:
867 if response_body:
868 return json.loads(response_body)
869 else:
870 json_body = json.loads(response_body)
871 fault_name = json_body.get('__type', None)
872 exception_class = self._faults.get(fault_name, self.ResponseError)
873 raise exception_class(response.status, response.reason,
874 body=json_body)
875