Index: tools/telemetry/third_party/gsutil/third_party/boto/boto/dynamodb2/table.py |
diff --git a/tools/telemetry/third_party/gsutil/third_party/boto/boto/dynamodb2/table.py b/tools/telemetry/third_party/gsutil/third_party/boto/boto/dynamodb2/table.py |
deleted file mode 100644 |
index d02ff5c7deb4acbb53925c85ab0e489649192a4d..0000000000000000000000000000000000000000 |
--- a/tools/telemetry/third_party/gsutil/third_party/boto/boto/dynamodb2/table.py |
+++ /dev/null |
@@ -1,1722 +0,0 @@ |
-import boto |
-from boto.dynamodb2 import exceptions |
-from boto.dynamodb2.fields import (HashKey, RangeKey, |
- AllIndex, KeysOnlyIndex, IncludeIndex, |
- GlobalAllIndex, GlobalKeysOnlyIndex, |
- GlobalIncludeIndex) |
-from boto.dynamodb2.items import Item |
-from boto.dynamodb2.layer1 import DynamoDBConnection |
-from boto.dynamodb2.results import ResultSet, BatchGetResultSet |
-from boto.dynamodb2.types import (NonBooleanDynamizer, Dynamizer, FILTER_OPERATORS, |
- QUERY_OPERATORS, STRING) |
-from boto.exception import JSONResponseError |
- |
- |
-class Table(object): |
- """ |
-    Interacts with & models the behavior of a DynamoDB table. |
- |
- The ``Table`` object represents a set (or rough categorization) of |
- records within DynamoDB. The important part is that all records within the |
- table, while largely-schema-free, share the same schema & are essentially |
- namespaced for use in your application. For example, you might have a |
- ``users`` table or a ``forums`` table. |
- """ |
-    # DynamoDB's ``BatchGetItem`` accepts at most 100 keys per request. |
-    max_batch_get = 100 |
- |
- _PROJECTION_TYPE_TO_INDEX = dict( |
- global_indexes=dict( |
- ALL=GlobalAllIndex, |
- KEYS_ONLY=GlobalKeysOnlyIndex, |
- INCLUDE=GlobalIncludeIndex, |
- ), local_indexes=dict( |
- ALL=AllIndex, |
- KEYS_ONLY=KeysOnlyIndex, |
- INCLUDE=IncludeIndex, |
- ) |
- ) |
- |
- def __init__(self, table_name, schema=None, throughput=None, indexes=None, |
- global_indexes=None, connection=None): |
- """ |
- Sets up a new in-memory ``Table``. |
- |
- This is useful if the table already exists within DynamoDB & you simply |
- want to use it for additional interactions. The only required parameter |
- is the ``table_name``. However, under the hood, the object will call |
- ``describe_table`` to determine the schema/indexes/throughput. You |
- can avoid this extra call by passing in ``schema`` & ``indexes``. |
- |
- **IMPORTANT** - If you're creating a new ``Table`` for the first time, |
- you should use the ``Table.create`` method instead, as it will |
- persist the table structure to DynamoDB. |
- |
- Requires a ``table_name`` parameter, which should be a simple string |
- of the name of the table. |
- |
- Optionally accepts a ``schema`` parameter, which should be a list of |
- ``BaseSchemaField`` subclasses representing the desired schema. |
- |
- Optionally accepts a ``throughput`` parameter, which should be a |
- dictionary. If provided, it should specify a ``read`` & ``write`` key, |
- both of which should have an integer value associated with them. |
- |
-        Optionally accepts an ``indexes`` parameter, which should be a list of |
- ``BaseIndexField`` subclasses representing the desired indexes. |
- |
- Optionally accepts a ``global_indexes`` parameter, which should be a |
- list of ``GlobalBaseIndexField`` subclasses representing the desired |
- indexes. |
- |
- Optionally accepts a ``connection`` parameter, which should be a |
- ``DynamoDBConnection`` instance (or subclass). This is primarily useful |
- for specifying alternate connection parameters. |
- |
- Example:: |
- |
- # The simple, it-already-exists case. |
-            >>> users = Table('users') |
- |
- # The full, minimum-extra-calls case. |
- >>> from boto import dynamodb2 |
- >>> users = Table('users', schema=[ |
- ... HashKey('username'), |
- ... RangeKey('date_joined', data_type=NUMBER) |
- ... ], throughput={ |
- ... 'read':20, |
- ... 'write': 10, |
- ... }, indexes=[ |
- ... KeysOnlyIndex('MostRecentlyJoined', parts=[ |
-            ...         HashKey('username'), |
- ... RangeKey('date_joined') |
- ... ]), |
- ... ], global_indexes=[ |
- ... GlobalAllIndex('UsersByZipcode', parts=[ |
- ... HashKey('zipcode'), |
- ... RangeKey('username'), |
- ... ], |
- ... throughput={ |
- ... 'read':10, |
-            ...             'write': 10, |
- ... }), |
- ... ], connection=dynamodb2.connect_to_region('us-west-2', |
- ... aws_access_key_id='key', |
- ... aws_secret_access_key='key', |
- ... )) |
- |
- """ |
- self.table_name = table_name |
- self.connection = connection |
- self.throughput = { |
- 'read': 5, |
- 'write': 5, |
- } |
- self.schema = schema |
- self.indexes = indexes |
- self.global_indexes = global_indexes |
- |
- if self.connection is None: |
- self.connection = DynamoDBConnection() |
- |
- if throughput is not None: |
- self.throughput = throughput |
- |
- self._dynamizer = NonBooleanDynamizer() |
- |
-    def use_boolean(self): |
-        """ |
-        Switches the internal dynamizer to one that supports the native |
-        DynamoDB boolean type, rather than encoding booleans as numbers. |
-        """ |
-        self._dynamizer = Dynamizer() |
- |
- @classmethod |
- def create(cls, table_name, schema, throughput=None, indexes=None, |
- global_indexes=None, connection=None): |
- """ |
- Creates a new table in DynamoDB & returns an in-memory ``Table`` object. |
- |
- This will setup a brand new table within DynamoDB. The ``table_name`` |
- must be unique for your AWS account. The ``schema`` is also required |
- to define the key structure of the table. |
- |
- **IMPORTANT** - You should consider the usage pattern of your table |
- up-front, as the schema can **NOT** be modified once the table is |
- created, requiring the creation of a new table & migrating the data |
- should you wish to revise it. |
- |
- **IMPORTANT** - If the table already exists in DynamoDB, additional |
- calls to this method will result in an error. If you just need |
- a ``Table`` object to interact with the existing table, you should |
- just initialize a new ``Table`` object, which requires only the |
- ``table_name``. |
- |
- Requires a ``table_name`` parameter, which should be a simple string |
- of the name of the table. |
- |
- Requires a ``schema`` parameter, which should be a list of |
- ``BaseSchemaField`` subclasses representing the desired schema. |
- |
- Optionally accepts a ``throughput`` parameter, which should be a |
- dictionary. If provided, it should specify a ``read`` & ``write`` key, |
- both of which should have an integer value associated with them. |
- |
-        Optionally accepts an ``indexes`` parameter, which should be a list of |
- ``BaseIndexField`` subclasses representing the desired indexes. |
- |
- Optionally accepts a ``global_indexes`` parameter, which should be a |
- list of ``GlobalBaseIndexField`` subclasses representing the desired |
- indexes. |
- |
- Optionally accepts a ``connection`` parameter, which should be a |
- ``DynamoDBConnection`` instance (or subclass). This is primarily useful |
- for specifying alternate connection parameters. |
- |
- Example:: |
- |
- >>> users = Table.create('users', schema=[ |
- ... HashKey('username'), |
- ... RangeKey('date_joined', data_type=NUMBER) |
- ... ], throughput={ |
- ... 'read':20, |
- ... 'write': 10, |
- ... }, indexes=[ |
- ... KeysOnlyIndex('MostRecentlyJoined', parts=[ |
- ... RangeKey('date_joined') |
-            ...     ]), |
-            ... ], global_indexes=[ |
- ... GlobalAllIndex('UsersByZipcode', parts=[ |
- ... HashKey('zipcode'), |
- ... RangeKey('username'), |
- ... ], |
- ... throughput={ |
- ... 'read':10, |
- ... 'write':10, |
- ... }), |
- ... ]) |
- |
- """ |
- table = cls(table_name=table_name, connection=connection) |
- table.schema = schema |
- |
- if throughput is not None: |
- table.throughput = throughput |
- |
- if indexes is not None: |
- table.indexes = indexes |
- |
- if global_indexes is not None: |
- table.global_indexes = global_indexes |
- |
- # Prep the schema. |
- raw_schema = [] |
- attr_defs = [] |
- seen_attrs = set() |
- |
- for field in table.schema: |
- raw_schema.append(field.schema()) |
- # Build the attributes off what we know. |
- seen_attrs.add(field.name) |
- attr_defs.append(field.definition()) |
- |
- raw_throughput = { |
- 'ReadCapacityUnits': int(table.throughput['read']), |
- 'WriteCapacityUnits': int(table.throughput['write']), |
- } |
- kwargs = {} |
- |
- kwarg_map = { |
- 'indexes': 'local_secondary_indexes', |
- 'global_indexes': 'global_secondary_indexes', |
- } |
- for index_attr in ('indexes', 'global_indexes'): |
- table_indexes = getattr(table, index_attr) |
- if table_indexes: |
- raw_indexes = [] |
- for index_field in table_indexes: |
- raw_indexes.append(index_field.schema()) |
- # Make sure all attributes specified in the indexes are |
- # added to the definition |
- for field in index_field.parts: |
- if field.name not in seen_attrs: |
- seen_attrs.add(field.name) |
- attr_defs.append(field.definition()) |
- |
- kwargs[kwarg_map[index_attr]] = raw_indexes |
- |
- table.connection.create_table( |
- table_name=table.table_name, |
- attribute_definitions=attr_defs, |
- key_schema=raw_schema, |
- provisioned_throughput=raw_throughput, |
- **kwargs |
- ) |
- return table |
- |
- def _introspect_schema(self, raw_schema, raw_attributes=None): |
- """ |
- Given a raw schema structure back from a DynamoDB response, parse |
- out & build the high-level Python objects that represent them. |
- """ |
- schema = [] |
- sane_attributes = {} |
- |
- if raw_attributes: |
- for field in raw_attributes: |
- sane_attributes[field['AttributeName']] = field['AttributeType'] |
- |
- for field in raw_schema: |
- data_type = sane_attributes.get(field['AttributeName'], STRING) |
- |
- if field['KeyType'] == 'HASH': |
- schema.append( |
- HashKey(field['AttributeName'], data_type=data_type) |
- ) |
- elif field['KeyType'] == 'RANGE': |
- schema.append( |
- RangeKey(field['AttributeName'], data_type=data_type) |
- ) |
- else: |
- raise exceptions.UnknownSchemaFieldError( |
- "%s was seen, but is unknown. Please report this at " |
- "https://github.com/boto/boto/issues." % field['KeyType'] |
- ) |
- |
- return schema |
- |
- def _introspect_all_indexes(self, raw_indexes, map_indexes_projection): |
- """ |
- Given a raw index/global index structure back from a DynamoDB response, |
- parse out & build the high-level Python objects that represent them. |
- """ |
- indexes = [] |
- |
- for field in raw_indexes: |
- index_klass = map_indexes_projection.get('ALL') |
- kwargs = { |
- 'parts': [] |
- } |
- |
- if field['Projection']['ProjectionType'] == 'ALL': |
- index_klass = map_indexes_projection.get('ALL') |
- elif field['Projection']['ProjectionType'] == 'KEYS_ONLY': |
- index_klass = map_indexes_projection.get('KEYS_ONLY') |
- elif field['Projection']['ProjectionType'] == 'INCLUDE': |
- index_klass = map_indexes_projection.get('INCLUDE') |
- kwargs['includes'] = field['Projection']['NonKeyAttributes'] |
- else: |
- raise exceptions.UnknownIndexFieldError( |
- "%s was seen, but is unknown. Please report this at " |
- "https://github.com/boto/boto/issues." % \ |
- field['Projection']['ProjectionType'] |
- ) |
- |
- name = field['IndexName'] |
- kwargs['parts'] = self._introspect_schema(field['KeySchema'], None) |
- indexes.append(index_klass(name, **kwargs)) |
- |
- return indexes |
- |
- def _introspect_indexes(self, raw_indexes): |
- """ |
- Given a raw index structure back from a DynamoDB response, parse |
- out & build the high-level Python objects that represent them. |
- """ |
- return self._introspect_all_indexes( |
- raw_indexes, self._PROJECTION_TYPE_TO_INDEX.get('local_indexes')) |
- |
- def _introspect_global_indexes(self, raw_global_indexes): |
- """ |
- Given a raw global index structure back from a DynamoDB response, parse |
- out & build the high-level Python objects that represent them. |
- """ |
- return self._introspect_all_indexes( |
- raw_global_indexes, |
- self._PROJECTION_TYPE_TO_INDEX.get('global_indexes')) |
- |
- def describe(self): |
- """ |
- Describes the current structure of the table in DynamoDB. |
- |
- This information will be used to update the ``schema``, ``indexes``, |
- ``global_indexes`` and ``throughput`` information on the ``Table``. Some |
- calls, such as those involving creating keys or querying, will require |
- this information to be populated. |
- |
- It also returns the full raw data structure from DynamoDB, in the |
- event you'd like to parse out additional information (such as the |
- ``ItemCount`` or usage information). |
- |
- Example:: |
- |
- >>> users.describe() |
- { |
- # Lots of keys here... |
- } |
- >>> len(users.schema) |
- 2 |
- |
- """ |
- result = self.connection.describe_table(self.table_name) |
- |
- # Blindly update throughput, since what's on DynamoDB's end is likely |
- # more correct. |
- raw_throughput = result['Table']['ProvisionedThroughput'] |
- self.throughput['read'] = int(raw_throughput['ReadCapacityUnits']) |
- self.throughput['write'] = int(raw_throughput['WriteCapacityUnits']) |
- |
- if not self.schema: |
- # Since we have the data, build the schema. |
- raw_schema = result['Table'].get('KeySchema', []) |
- raw_attributes = result['Table'].get('AttributeDefinitions', []) |
- self.schema = self._introspect_schema(raw_schema, raw_attributes) |
- |
- if not self.indexes: |
- # Build the index information as well. |
- raw_indexes = result['Table'].get('LocalSecondaryIndexes', []) |
- self.indexes = self._introspect_indexes(raw_indexes) |
- |
- # Build the global index information as well. |
- raw_global_indexes = result['Table'].get('GlobalSecondaryIndexes', []) |
- self.global_indexes = self._introspect_global_indexes(raw_global_indexes) |
- |
- # This is leaky. |
- return result |
- |
- def update(self, throughput=None, global_indexes=None): |
- """ |
- Updates table attributes and global indexes in DynamoDB. |
- |
- Optionally accepts a ``throughput`` parameter, which should be a |
- dictionary. If provided, it should specify a ``read`` & ``write`` key, |
- both of which should have an integer value associated with them. |
- |
-        Optionally accepts a ``global_indexes`` parameter, which should be a |
-        dictionary mapping each index name to a dict containing a ``read`` & |
-        ``write`` key, both of which should have an integer value associated |
-        with them. If you are writing new code, please use |
-        ``Table.update_global_secondary_index``. |
- |
- Returns ``True`` on success. |
- |
- Example:: |
- |
- # For a read-heavier application... |
- >>> users.update(throughput={ |
- ... 'read': 20, |
- ... 'write': 10, |
- ... }) |
- True |
- |
- # To also update the global index(es) throughput. |
- >>> users.update(throughput={ |
- ... 'read': 20, |
- ... 'write': 10, |
- ... }, |
-            ...     global_indexes={ |
- ... 'TheIndexNameHere': { |
- ... 'read': 15, |
- ... 'write': 5, |
- ... } |
- ... }) |
- True |
- """ |
- |
- data = None |
- |
- if throughput: |
- self.throughput = throughput |
- data = { |
- 'ReadCapacityUnits': int(self.throughput['read']), |
- 'WriteCapacityUnits': int(self.throughput['write']), |
- } |
- |
- gsi_data = None |
- |
- if global_indexes: |
- gsi_data = [] |
- |
- for gsi_name, gsi_throughput in global_indexes.items(): |
- gsi_data.append({ |
- "Update": { |
- "IndexName": gsi_name, |
- "ProvisionedThroughput": { |
- "ReadCapacityUnits": int(gsi_throughput['read']), |
- "WriteCapacityUnits": int(gsi_throughput['write']), |
- }, |
- }, |
- }) |
- |
- if throughput or global_indexes: |
- self.connection.update_table( |
- self.table_name, |
- provisioned_throughput=data, |
- global_secondary_index_updates=gsi_data, |
- ) |
- |
- return True |
- else: |
- msg = 'You need to provide either the throughput or the ' \ |
-                  'global_indexes to the update method' |
- boto.log.error(msg) |
- |
- return False |
- |
- def create_global_secondary_index(self, global_index): |
- """ |
- Creates a global index in DynamoDB after the table has been created. |
- |
- Requires a ``global_indexes`` parameter, which should be a |
- ``GlobalBaseIndexField`` subclass representing the desired index. |
- |
- To update ``global_indexes`` information on the ``Table``, you'll need |
- to call ``Table.describe``. |
- |
- Returns ``True`` on success. |
- |
- Example:: |
- |
- # To create a global index |
- >>> users.create_global_secondary_index( |
- ... global_index=GlobalAllIndex( |
- ... 'TheIndexNameHere', parts=[ |
- ... HashKey('requiredHashkey', data_type=STRING), |
- ... RangeKey('optionalRangeKey', data_type=STRING) |
- ... ], |
- ... throughput={ |
- ... 'read': 2, |
- ... 'write': 1, |
- ... }) |
- ... ) |
- True |
- |
- """ |
- |
- if global_index: |
- gsi_data = [] |
- gsi_data_attr_def = [] |
- |
- gsi_data.append({ |
- "Create": global_index.schema() |
- }) |
- |
- for attr_def in global_index.parts: |
- gsi_data_attr_def.append(attr_def.definition()) |
- |
- self.connection.update_table( |
- self.table_name, |
- global_secondary_index_updates=gsi_data, |
- attribute_definitions=gsi_data_attr_def |
- ) |
- |
- return True |
- else: |
-            msg = 'You need to provide the global_index to the ' \ |
-                  'create_global_secondary_index method' |
- boto.log.error(msg) |
- |
- return False |
- |
- def delete_global_secondary_index(self, global_index_name): |
- """ |
- Deletes a global index in DynamoDB after the table has been created. |
- |
- Requires a ``global_index_name`` parameter, which should be a simple |
- string of the name of the global secondary index. |
- |
- To update ``global_indexes`` information on the ``Table``, you'll need |
- to call ``Table.describe``. |
- |
- Returns ``True`` on success. |
- |
- Example:: |
- |
- # To delete a global index |
- >>> users.delete_global_secondary_index('TheIndexNameHere') |
- True |
- |
- """ |
- |
- if global_index_name: |
- gsi_data = [ |
- { |
- "Delete": { |
- "IndexName": global_index_name |
- } |
- } |
- ] |
- |
- self.connection.update_table( |
- self.table_name, |
- global_secondary_index_updates=gsi_data, |
- ) |
- |
- return True |
- else: |
-            msg = 'You need to provide the global index name to the ' \ |
-                  'delete_global_secondary_index method' |
- boto.log.error(msg) |
- |
- return False |
- |
- def update_global_secondary_index(self, global_indexes): |
- """ |
-        Updates one or more global indexes in DynamoDB after the table has been created. |
- |
-        Requires a ``global_indexes`` parameter, which should be a |
-        dictionary mapping each index name to a dict containing a ``read`` & |
-        ``write`` key, both of which should have an integer value associated |
-        with them. |
- |
- To update ``global_indexes`` information on the ``Table``, you'll need |
- to call ``Table.describe``. |
- |
- Returns ``True`` on success. |
- |
- Example:: |
- |
- # To update a global index |
- >>> users.update_global_secondary_index(global_indexes={ |
- ... 'TheIndexNameHere': { |
- ... 'read': 15, |
- ... 'write': 5, |
- ... } |
- ... }) |
- True |
- |
- """ |
- |
- if global_indexes: |
- gsi_data = [] |
- |
- for gsi_name, gsi_throughput in global_indexes.items(): |
- gsi_data.append({ |
- "Update": { |
- "IndexName": gsi_name, |
- "ProvisionedThroughput": { |
- "ReadCapacityUnits": int(gsi_throughput['read']), |
- "WriteCapacityUnits": int(gsi_throughput['write']), |
- }, |
- }, |
- }) |
- |
- self.connection.update_table( |
- self.table_name, |
- global_secondary_index_updates=gsi_data, |
- ) |
- return True |
- else: |
-            msg = 'You need to provide the global indexes to the ' \ |
-                  'update_global_secondary_index method' |
- boto.log.error(msg) |
- |
- return False |
- |
- def delete(self): |
- """ |
- Deletes a table in DynamoDB. |
- |
- **IMPORTANT** - Be careful when using this method, there is no undo. |
- |
- Returns ``True`` on success. |
- |
- Example:: |
- |
- >>> users.delete() |
- True |
- |
- """ |
- self.connection.delete_table(self.table_name) |
- return True |
- |
- def _encode_keys(self, keys): |
- """ |
- Given a flat Python dictionary of keys/values, converts it into the |
- nested dictionary DynamoDB expects. |
- |
- Converts:: |
- |
- { |
- 'username': 'john', |
- 'tags': [1, 2, 5], |
- } |
- |
- ...to...:: |
- |
- { |
- 'username': {'S': 'john'}, |
- 'tags': {'NS': ['1', '2', '5']}, |
- } |
- |
- """ |
- raw_key = {} |
- |
- for key, value in keys.items(): |
- raw_key[key] = self._dynamizer.encode(value) |
- |
- return raw_key |
- |
- def get_item(self, consistent=False, attributes=None, **kwargs): |
- """ |
- Fetches an item (record) from a table in DynamoDB. |
- |
- To specify the key of the item you'd like to get, you can specify the |
- key attributes as kwargs. |
- |
- Optionally accepts a ``consistent`` parameter, which should be a |
- boolean. If you provide ``True``, it will perform |
- a consistent (but more expensive) read from DynamoDB. |
- (Default: ``False``) |
- |
- Optionally accepts an ``attributes`` parameter, which should be a |
-        list of fieldnames to fetch. (Default: ``None``, which means all fields |
- should be fetched) |
- |
- Returns an ``Item`` instance containing all the data for that record. |
- |
- Raises an ``ItemNotFound`` exception if the item is not found. |
- |
- Example:: |
- |
- # A simple hash key. |
- >>> john = users.get_item(username='johndoe') |
- >>> john['first_name'] |
- 'John' |
- |
- # A complex hash+range key. |
- >>> john = users.get_item(username='johndoe', last_name='Doe') |
- >>> john['first_name'] |
- 'John' |
- |
- # A consistent read (assuming the data might have just changed). |
- >>> john = users.get_item(username='johndoe', consistent=True) |
- >>> john['first_name'] |
- 'Johann' |
- |
- # With a key that is an invalid variable name in Python. |
- # Also, assumes a different schema than previous examples. |
- >>> john = users.get_item(**{ |
- ... 'date-joined': 127549192, |
- ... }) |
- >>> john['first_name'] |
- 'John' |
- |
- """ |
- raw_key = self._encode_keys(kwargs) |
- item_data = self.connection.get_item( |
- self.table_name, |
- raw_key, |
- attributes_to_get=attributes, |
- consistent_read=consistent |
- ) |
- if 'Item' not in item_data: |
- raise exceptions.ItemNotFound("Item %s couldn't be found." % kwargs) |
- item = Item(self) |
- item.load(item_data) |
- return item |
- |
- def has_item(self, **kwargs): |
- """ |
- Return whether an item (record) exists within a table in DynamoDB. |
- |
- To specify the key of the item you'd like to get, you can specify the |
- key attributes as kwargs. |
- |
- Optionally accepts a ``consistent`` parameter, which should be a |
- boolean. If you provide ``True``, it will perform |
- a consistent (but more expensive) read from DynamoDB. |
- (Default: ``False``) |
- |
- Optionally accepts an ``attributes`` parameter, which should be a |
- list of fieldnames to fetch. (Default: ``None``, which means all fields |
- should be fetched) |
- |
- Returns ``True`` if an ``Item`` is present, ``False`` if not. |
- |
- Example:: |
- |
- # Simple, just hash-key schema. |
- >>> users.has_item(username='johndoe') |
- True |
- |
- # Complex schema, item not present. |
- >>> users.has_item( |
- ... username='johndoe', |
- ... date_joined='2014-01-07' |
- ... ) |
- False |
- |
- """ |
- try: |
- self.get_item(**kwargs) |
- except (JSONResponseError, exceptions.ItemNotFound): |
- return False |
- |
- return True |
- |
- def lookup(self, *args, **kwargs): |
- """ |
- Look up an entry in DynamoDB. This is mostly backwards compatible |
- with boto.dynamodb. Unlike get_item, it takes hash_key and range_key first, |
- although you may still specify keyword arguments instead. |
- |
- Also unlike the get_item command, if the returned item has no keys |
- (i.e., it does not exist in DynamoDB), a None result is returned, instead |
- of an empty key object. |
- |
- Example:: |
- >>> user = users.lookup(username) |
- >>> user = users.lookup(username, consistent=True) |
- >>> app = apps.lookup('my_customer_id', 'my_app_id') |
- |
- """ |
- if not self.schema: |
- self.describe() |
- for x, arg in enumerate(args): |
- kwargs[self.schema[x].name] = arg |
- ret = self.get_item(**kwargs) |
- if not ret.keys(): |
- return None |
- return ret |
- |
- def new_item(self, *args): |
- """ |
-        Returns a new, blank ``Item``. |
- |
-        This is mostly for consistency with ``boto.dynamodb``. |
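- |
-        A sketch, assuming a ``users`` table whose schema is a single |
-        ``HashKey('username')``; positional arguments are matched to the |
-        schema fields in order:: |
- |
-            >>> item = users.new_item('johndoe') |
-            >>> item['first_name'] = 'John' |
-            >>> item.save() |
-            True |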
- """ |
- if not self.schema: |
- self.describe() |
- data = {} |
- for x, arg in enumerate(args): |
- data[self.schema[x].name] = arg |
- return Item(self, data=data) |
- |
- def put_item(self, data, overwrite=False): |
- """ |
- Saves an entire item to DynamoDB. |
- |
- By default, if any part of the ``Item``'s original data doesn't match |
- what's currently in DynamoDB, this request will fail. This prevents |
- other processes from updating the data in between when you read the |
- item & when your request to update the item's data is processed, which |
- would typically result in some data loss. |
- |
- Requires a ``data`` parameter, which should be a dictionary of the data |
- you'd like to store in DynamoDB. |
- |
- Optionally accepts an ``overwrite`` parameter, which should be a |
- boolean. If you provide ``True``, this will tell DynamoDB to blindly |
- overwrite whatever data is present, if any. |
- |
- Returns ``True`` on success. |
- |
- Example:: |
- |
- >>> users.put_item(data={ |
- ... 'username': 'jane', |
- ... 'first_name': 'Jane', |
- ... 'last_name': 'Doe', |
- ... 'date_joined': 126478915, |
- ... }) |
- True |
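- |
-            # A sketch of a blind overwrite of the record above (assumes |
-            # it already exists). |
-            >>> users.put_item(data={ |
-            ...     'username': 'jane', |
-            ...     'first_name': 'Janet', |
-            ...     'last_name': 'Doe', |
-            ...     'date_joined': 126478915, |
-            ... }, overwrite=True) |
-            True |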
- |
- """ |
- item = Item(self, data=data) |
- return item.save(overwrite=overwrite) |
- |
- def _put_item(self, item_data, expects=None): |
- """ |
- The internal variant of ``put_item`` (full data). This is used by the |
- ``Item`` objects, since that operation is represented at the |
- table-level by the API, but conceptually maps better to telling an |
- individual ``Item`` to save itself. |
- """ |
- kwargs = {} |
- |
- if expects is not None: |
- kwargs['expected'] = expects |
- |
- self.connection.put_item(self.table_name, item_data, **kwargs) |
- return True |
- |
- def _update_item(self, key, item_data, expects=None): |
- """ |
- The internal variant of ``put_item`` (partial data). This is used by the |
- ``Item`` objects, since that operation is represented at the |
- table-level by the API, but conceptually maps better to telling an |
- individual ``Item`` to save itself. |
- """ |
- raw_key = self._encode_keys(key) |
- kwargs = {} |
- |
- if expects is not None: |
- kwargs['expected'] = expects |
- |
- self.connection.update_item(self.table_name, raw_key, item_data, **kwargs) |
- return True |
- |
- def delete_item(self, expected=None, conditional_operator=None, **kwargs): |
- """ |
- Deletes a single item. You can perform a conditional delete operation |
- that deletes the item if it exists, or if it has an expected attribute |
- value. |
- |
- Conditional deletes are useful for only deleting items if specific |
- conditions are met. If those conditions are met, DynamoDB performs |
- the delete. Otherwise, the item is not deleted. |
- |
- To specify the expected attribute values of the item, you can pass a |
- dictionary of conditions to ``expected``. Each condition should follow |
- the pattern ``<attributename>__<comparison_operator>=<value_to_expect>``. |
- |
- **IMPORTANT** - Be careful when using this method, there is no undo. |
- |
- To specify the key of the item you'd like to get, you can specify the |
- key attributes as kwargs. |
- |
- Optionally accepts an ``expected`` parameter which is a dictionary of |
- expected attribute value conditions. |
- |
- Optionally accepts a ``conditional_operator`` which applies to the |
- expected attribute value conditions: |
- |
- + `AND` - If all of the conditions evaluate to true (default) |
- + `OR` - True if at least one condition evaluates to true |
- |
- Returns ``True`` on success, ``False`` on failed conditional delete. |
- |
- Example:: |
- |
- # A simple hash key. |
- >>> users.delete_item(username='johndoe') |
- True |
- |
- # A complex hash+range key. |
- >>> users.delete_item(username='jane', last_name='Doe') |
- True |
- |
- # With a key that is an invalid variable name in Python. |
- # Also, assumes a different schema than previous examples. |
- >>> users.delete_item(**{ |
- ... 'date-joined': 127549192, |
- ... }) |
- True |
- |
- # Conditional delete |
- >>> users.delete_item(username='johndoe', |
- ... expected={'balance__eq': 0}) |
- True |
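- |
-            # A sketch of ``conditional_operator``; the ``balance`` & |
-            # ``is_active`` attributes here are hypothetical. |
-            >>> users.delete_item(username='johndoe', |
-            ...                   expected={'balance__eq': 0, |
-            ...                             'is_active__eq': False}, |
-            ...                   conditional_operator='OR') |
-            True |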
- """ |
- expected = self._build_filters(expected, using=FILTER_OPERATORS) |
- raw_key = self._encode_keys(kwargs) |
- |
- try: |
- self.connection.delete_item(self.table_name, raw_key, |
- expected=expected, |
- conditional_operator=conditional_operator) |
- except exceptions.ConditionalCheckFailedException: |
- return False |
- |
- return True |
- |
- def get_key_fields(self): |
- """ |
- Returns the fields necessary to make a key for a table. |
- |
- If the ``Table`` does not already have a populated ``schema``, |
- this will request it via a ``Table.describe`` call. |
- |
- Returns a list of fieldnames (strings). |
- |
- Example:: |
- |
- # A simple hash key. |
- >>> users.get_key_fields() |
- ['username'] |
- |
- # A complex hash+range key. |
- >>> users.get_key_fields() |
- ['username', 'last_name'] |
- |
- """ |
- if not self.schema: |
- # We don't know the structure of the table. Get a description to |
- # populate the schema. |
- self.describe() |
- |
- return [field.name for field in self.schema] |
- |
- def batch_write(self): |
- """ |
- Allows the batching of writes to DynamoDB. |
- |
- Since each write/delete call to DynamoDB has a cost associated with it, |
- when loading lots of data, it makes sense to batch them, creating as |
- few calls as possible. |
- |
- This returns a context manager that will transparently handle creating |
- these batches. The object you get back lightly-resembles a ``Table`` |
- object, sharing just the ``put_item`` & ``delete_item`` methods |
- (which are all that DynamoDB can batch in terms of writing data). |
- |
- DynamoDB's maximum batch size is 25 items per request. If you attempt |
- to put/delete more than that, the context manager will batch as many |
- as it can up to that number, then flush them to DynamoDB & continue |
- batching as more calls come in. |
- |
- Example:: |
- |
- # Assuming a table with one record... |
- >>> with users.batch_write() as batch: |
- ... batch.put_item(data={ |
- ... 'username': 'johndoe', |
- ... 'first_name': 'John', |
- ... 'last_name': 'Doe', |
- ... 'owner': 1, |
- ... }) |
- ... # Nothing across the wire yet. |
- ... batch.delete_item(username='bob') |
- ... # Still no requests sent. |
- ... batch.put_item(data={ |
- ... 'username': 'jane', |
- ... 'first_name': 'Jane', |
- ... 'last_name': 'Doe', |
- ... 'date_joined': 127436192, |
- ... }) |
- ... # Nothing yet, but once we leave the context, the |
- ... # put/deletes will be sent. |
- |
- """ |
- # PHENOMENAL COSMIC DOCS!!! itty-bitty code. |
- return BatchTable(self) |
- |
- def _build_filters(self, filter_kwargs, using=QUERY_OPERATORS): |
- """ |
- An internal method for taking query/scan-style ``**kwargs`` & turning |
- them into the raw structure DynamoDB expects for filtering. |
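- |
-        A sketch of the conversion (the exact encoding depends on the |
-        dynamizer in use). Given:: |
- |
-            {'username__eq': 'john'} |
- |
-        ...this produces...:: |
- |
-            { |
-                'username': { |
-                    'AttributeValueList': [{'S': 'john'}], |
-                    'ComparisonOperator': 'EQ', |
-                } |
-            } |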
- """ |
- if filter_kwargs is None: |
- return |
- |
- filters = {} |
- |
- for field_and_op, value in filter_kwargs.items(): |
- field_bits = field_and_op.split('__') |
- fieldname = '__'.join(field_bits[:-1]) |
- |
- try: |
- op = using[field_bits[-1]] |
- except KeyError: |
- raise exceptions.UnknownFilterTypeError( |
- "Operator '%s' from '%s' is not recognized." % ( |
- field_bits[-1], |
- field_and_op |
- ) |
- ) |
- |
- lookup = { |
- 'AttributeValueList': [], |
- 'ComparisonOperator': op, |
- } |
- |
- # Special-case the ``NULL/NOT_NULL`` case. |
- if field_bits[-1] == 'null': |
- del lookup['AttributeValueList'] |
- |
- if value is False: |
- lookup['ComparisonOperator'] = 'NOT_NULL' |
- else: |
- lookup['ComparisonOperator'] = 'NULL' |
- # Special-case the ``BETWEEN`` case. |
- elif field_bits[-1] == 'between': |
-                if isinstance(value, (list, tuple)) and len(value) == 2: |
- lookup['AttributeValueList'].append( |
- self._dynamizer.encode(value[0]) |
- ) |
- lookup['AttributeValueList'].append( |
- self._dynamizer.encode(value[1]) |
- ) |
- # Special-case the ``IN`` case |
- elif field_bits[-1] == 'in': |
- for val in value: |
- lookup['AttributeValueList'].append(self._dynamizer.encode(val)) |
- else: |
- # Fix up the value for encoding, because it was built to only work |
- # with ``set``s. |
- if isinstance(value, (list, tuple)): |
- value = set(value) |
- lookup['AttributeValueList'].append( |
- self._dynamizer.encode(value) |
- ) |
- |
- # Finally, insert it into the filters. |
- filters[fieldname] = lookup |
- |
- return filters |
- |
- def query(self, limit=None, index=None, reverse=False, consistent=False, |
- attributes=None, max_page_size=None, **filter_kwargs): |
- """ |
- **WARNING:** This method is provided **strictly** for |
- backward-compatibility. It returns results in an incorrect order. |
- |
- If you are writing new code, please use ``Table.query_2``. |
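- |
-        A sketch of the equivalence (``reverse`` is inverted before |
-        delegating):: |
- |
-            # These two calls return the same results. |
-            >>> old = users.query(last_name__eq='Doe') |
-            >>> new = users.query_2(last_name__eq='Doe', reverse=True) |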
- """ |
-        # ``query`` historically returned results in the flipped order, so |
-        # invert ``reverse`` before delegating to ``query_2``. |
-        reverse = not reverse |
- return self.query_2(limit=limit, index=index, reverse=reverse, |
- consistent=consistent, attributes=attributes, |
- max_page_size=max_page_size, **filter_kwargs) |
- |
- def query_2(self, limit=None, index=None, reverse=False, |
- consistent=False, attributes=None, max_page_size=None, |
- query_filter=None, conditional_operator=None, |
- **filter_kwargs): |
- """ |
- Queries for a set of matching items in a DynamoDB table. |
- |
- Queries can be performed against a hash key, a hash+range key or |
- against any data stored in your local secondary indexes. Query filters |
- can be used to filter on arbitrary fields. |
- |
-        **Note** - You cannot query against arbitrary fields within the data |
- stored in DynamoDB unless you specify ``query_filter`` values. |
- |
- To specify the filters of the items you'd like to get, you can specify |
- the filters as kwargs. Each filter kwarg should follow the pattern |
- ``<fieldname>__<filter_operation>=<value_to_look_for>``. Query filters |
- are specified in the same way. |
- |
- Optionally accepts a ``limit`` parameter, which should be an integer |
- count of the total number of items to return. (Default: ``None`` - |
- all results) |
- |
-        Optionally accepts an ``index`` parameter, which should be a string |
-        of the name of the local secondary index you want to query against. |
- (Default: ``None``) |
- |
- Optionally accepts a ``reverse`` parameter, which will present the |
- results in reverse order. (Default: ``False`` - normal order) |
- |
- Optionally accepts a ``consistent`` parameter, which should be a |
- boolean. If you provide ``True``, it will force a consistent read of |
- the data (more expensive). (Default: ``False`` - use eventually |
- consistent reads) |
- |
-        Optionally accepts an ``attributes`` parameter, which should be a |
-        tuple. If provided, only these attributes will be fetched from |
-        DynamoDB. This uses the ``AttributesToGet`` API parameter & sets |
-        ``Select`` to ``SPECIFIC_ATTRIBUTES``. |
- |
- Optionally accepts a ``max_page_size`` parameter, which should be an |
- integer count of the maximum number of items to retrieve |
-        **per-request**. This is useful for making faster requests & |
-        preventing the query from drowning out other queries. (Default: |
-        ``None`` - fetch as many as DynamoDB will return) |
- |
- Optionally accepts a ``query_filter`` which is a dictionary of filter |
- conditions against any arbitrary field in the returned data. |
- |
- Optionally accepts a ``conditional_operator`` which applies to the |
- query filter conditions: |
- |
- + `AND` - True if all filter conditions evaluate to true (default) |
- + `OR` - True if at least one filter condition evaluates to true |
- |
- Returns a ``ResultSet``, which transparently handles the pagination of |
- results you get back. |
- |
- Example:: |
- |
- # Look for last names equal to "Doe". |
-            >>> results = users.query_2(last_name__eq='Doe') |
- >>> for res in results: |
- ... print res['first_name'] |
- 'John' |
- 'Jane' |
- |
- # Look for last names beginning with "D", in reverse order, limit 3. |
-            >>> results = users.query_2( |
- ... last_name__beginswith='D', |
- ... reverse=True, |
- ... limit=3 |
- ... ) |
- >>> for res in results: |
- ... print res['first_name'] |
- 'Alice' |
- 'Jane' |
- 'John' |
- |
- # Use an LSI & a consistent read. |
-            >>> results = users.query_2( |
- ... date_joined__gte=1236451000, |
- ... owner__eq=1, |
- ... index='DateJoinedIndex', |
- ... consistent=True |
- ... ) |
- >>> for res in results: |
- ... print res['first_name'] |
- 'Alice' |
- 'Bob' |
- 'John' |
- 'Fred' |
- |
- # Filter by non-indexed field(s) |
-            >>> results = users.query_2( |
- ... last_name__eq='Doe', |
- ... reverse=True, |
- ... query_filter={ |
- ... 'first_name__beginswith': 'A' |
- ... } |
- ... ) |
- >>> for res in results: |
- ... print res['first_name'] + ' ' + res['last_name'] |
- 'Alice Doe' |
- |
- """ |
- if self.schema: |
- if len(self.schema) == 1: |
- if len(filter_kwargs) <= 1: |
-                if not self.global_indexes: |
- # If the schema only has one field, there's <= 1 filter |
- # param & no Global Secondary Indexes, this is user |
- # error. Bail early. |
- raise exceptions.QueryError( |
- "You must specify more than one key to filter on." |
- ) |
- |
- if attributes is not None: |
- select = 'SPECIFIC_ATTRIBUTES' |
- else: |
- select = None |
- |
- results = ResultSet( |
- max_page_size=max_page_size |
- ) |
- kwargs = filter_kwargs.copy() |
- kwargs.update({ |
- 'limit': limit, |
- 'index': index, |
- 'reverse': reverse, |
- 'consistent': consistent, |
- 'select': select, |
- 'attributes_to_get': attributes, |
- 'query_filter': query_filter, |
- 'conditional_operator': conditional_operator, |
- }) |
- results.to_call(self._query, **kwargs) |
- return results |
- |
- def query_count(self, index=None, consistent=False, conditional_operator=None, |
- query_filter=None, scan_index_forward=True, limit=None, |
- exclusive_start_key=None, **filter_kwargs): |
- """ |
- Queries the exact count of matching items in a DynamoDB table. |
- |
- Queries can be performed against a hash key, a hash+range key or |
- against any data stored in your local secondary indexes. Query filters |
- can be used to filter on arbitrary fields. |
- |
- To specify the filters of the items you'd like to get, you can specify |
- the filters as kwargs. Each filter kwarg should follow the pattern |
- ``<fieldname>__<filter_operation>=<value_to_look_for>``. Query filters |
- are specified in the same way. |
- |
-        Optionally accepts an ``index`` parameter, which should be a string |
-        of the name of the local secondary index you want to query against. |
- (Default: ``None``) |
- |
- Optionally accepts a ``consistent`` parameter, which should be a |
- boolean. If you provide ``True``, it will force a consistent read of |
- the data (more expensive). (Default: ``False`` - use eventually |
- consistent reads) |
- |
- Optionally accepts a ``query_filter`` which is a dictionary of filter |
- conditions against any arbitrary field in the returned data. |
- |
- Optionally accepts a ``conditional_operator`` which applies to the |
- query filter conditions: |
- |
- + `AND` - True if all filter conditions evaluate to true (default) |
- + `OR` - True if at least one filter condition evaluates to true |
- |
-        Optionally accepts an ``exclusive_start_key`` which is used to get |
-        the remaining items when a query cannot return the complete count. |
- |
-        Returns an integer which represents the exact number of matched |
-        items. |
- |
- :type scan_index_forward: boolean |
- :param scan_index_forward: Specifies ascending (true) or descending |
- (false) traversal of the index. DynamoDB returns results reflecting |
- the requested order determined by the range key. If the data type |
- is Number, the results are returned in numeric order. For String, |
- the results are returned in order of ASCII character code values. |
- For Binary, DynamoDB treats each byte of the binary data as |
- unsigned when it compares binary values. |
- |
- If ScanIndexForward is not specified, the results are returned in |
- ascending order. |
- |
- :type limit: integer |
- :param limit: The maximum number of items to evaluate (not necessarily |
- the number of matching items). |
- |
- Example:: |
- |
- # Look for last names equal to "Doe". |
- >>> users.query_count(last_name__eq='Doe') |
- 5 |
- |
- # Use an LSI & a consistent read. |
- >>> users.query_count( |
- ... date_joined__gte=1236451000, |
- ... owner__eq=1, |
- ... index='DateJoinedIndex', |
- ... consistent=True |
- ... ) |
- 2 |
- |
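-            # A sketch of resuming a partial count; the key shape mirrors |
-            # a raw ``LastEvaluatedKey`` from a prior response & the value |
-            # here is illustrative. |
-            >>> users.query_count( |
-            ...     last_name__eq='Doe', |
-            ...     exclusive_start_key={ |
-            ...         'username': {'S': 'johndoe'}, |
-            ...     } |
-            ... ) |
-            3 |
- |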
- """ |
- key_conditions = self._build_filters( |
- filter_kwargs, |
- using=QUERY_OPERATORS |
- ) |
- |
- built_query_filter = self._build_filters( |
- query_filter, |
- using=FILTER_OPERATORS |
- ) |
- |
- count_buffer = 0 |
- last_evaluated_key = exclusive_start_key |
- |
- while True: |
- raw_results = self.connection.query( |
- self.table_name, |
- index_name=index, |
- consistent_read=consistent, |
- select='COUNT', |
- key_conditions=key_conditions, |
- query_filter=built_query_filter, |
- conditional_operator=conditional_operator, |
- limit=limit, |
- scan_index_forward=scan_index_forward, |
- exclusive_start_key=last_evaluated_key |
- ) |
- |
- count_buffer += int(raw_results.get('Count', 0)) |
- last_evaluated_key = raw_results.get('LastEvaluatedKey') |
-            if not last_evaluated_key: |
-                # No ``LastEvaluatedKey`` means there are no pages left. |
-                break |
- |
- return count_buffer |
- |
- def _query(self, limit=None, index=None, reverse=False, consistent=False, |
- exclusive_start_key=None, select=None, attributes_to_get=None, |
- query_filter=None, conditional_operator=None, **filter_kwargs): |
- """ |
- The internal method that performs the actual queries. Used extensively |
- by ``ResultSet`` to perform each (paginated) request. |
- """ |
- kwargs = { |
- 'limit': limit, |
- 'index_name': index, |
- 'consistent_read': consistent, |
- 'select': select, |
- 'attributes_to_get': attributes_to_get, |
- 'conditional_operator': conditional_operator, |
- } |
- |
- if reverse: |
- kwargs['scan_index_forward'] = False |
- |
- if exclusive_start_key: |
- kwargs['exclusive_start_key'] = {} |
- |
- for key, value in exclusive_start_key.items(): |
- kwargs['exclusive_start_key'][key] = \ |
- self._dynamizer.encode(value) |
- |
- # Convert the filters into something we can actually use. |
- kwargs['key_conditions'] = self._build_filters( |
- filter_kwargs, |
- using=QUERY_OPERATORS |
- ) |
- |
- kwargs['query_filter'] = self._build_filters( |
- query_filter, |
- using=FILTER_OPERATORS |
- ) |
- |
- raw_results = self.connection.query( |
- self.table_name, |
- **kwargs |
- ) |
- results = [] |
- last_key = None |
- |
- for raw_item in raw_results.get('Items', []): |
- item = Item(self) |
- item.load({ |
- 'Item': raw_item, |
- }) |
- results.append(item) |
- |
- if raw_results.get('LastEvaluatedKey', None): |
- last_key = {} |
- |
- for key, value in raw_results['LastEvaluatedKey'].items(): |
- last_key[key] = self._dynamizer.decode(value) |
- |
- return { |
- 'results': results, |
- 'last_key': last_key, |
- } |
- |
- def scan(self, limit=None, segment=None, total_segments=None, |
- max_page_size=None, attributes=None, conditional_operator=None, |
- **filter_kwargs): |
- """ |
- Scans across all items within a DynamoDB table. |
- |
- Scans can be performed against a hash key or a hash+range key. You can |
- additionally filter the results after the table has been read but |
- before the response is returned by using query filters. |
- |
- To specify the filters of the items you'd like to get, you can specify |
- the filters as kwargs. Each filter kwarg should follow the pattern |
- ``<fieldname>__<filter_operation>=<value_to_look_for>``. |
- |
- Optionally accepts a ``limit`` parameter, which should be an integer |
- count of the total number of items to return. (Default: ``None`` - |
- all results) |
- |
- Optionally accepts a ``segment`` parameter, which should be an integer |
- of the segment to retrieve on. Please see the documentation about |
- Parallel Scans (Default: ``None`` - no segments) |
- |
- Optionally accepts a ``total_segments`` parameter, which should be an |
- integer count of number of segments to divide the table into. |
- Please see the documentation about Parallel Scans (Default: ``None`` - |
- no segments) |
- |
- Optionally accepts a ``max_page_size`` parameter, which should be an |
- integer count of the maximum number of items to retrieve |
-        **per-request**. This is useful for making faster requests & |
-        preventing the scan from drowning out other queries. (Default: ``None`` - |
- fetch as many as DynamoDB will return) |
- |
-        Optionally accepts an ``attributes`` parameter, which should be a |
-        tuple. If provided, only these attributes will be fetched from |
-        DynamoDB, via the ``AttributesToGet`` API parameter. |
- |
-        Optionally accepts a ``conditional_operator`` which applies to the |
-        filter conditions: |
- |
-        + `AND` - True if all filter conditions evaluate to true (default) |
-        + `OR` - True if at least one filter condition evaluates to true |
- |
- Returns a ``ResultSet``, which transparently handles the pagination of |
- results you get back. |
- |
- Example:: |
- |
- # All results. |
- >>> everything = users.scan() |
- |
- # Look for last names beginning with "D". |
- >>> results = users.scan(last_name__beginswith='D') |
- >>> for res in results: |
- ... print res['first_name'] |
- 'Alice' |
- 'John' |
- 'Jane' |
- |
- # Use an ``IN`` filter & limit. |
- >>> results = users.scan( |
- ... age__in=[25, 26, 27, 28, 29], |
- ... limit=1 |
- ... ) |
- >>> for res in results: |
- ... print res['first_name'] |
- 'Alice' |
- |
- """ |
- results = ResultSet( |
- max_page_size=max_page_size |
- ) |
- kwargs = filter_kwargs.copy() |
- kwargs.update({ |
- 'limit': limit, |
- 'segment': segment, |
- 'total_segments': total_segments, |
- 'attributes': attributes, |
- 'conditional_operator': conditional_operator, |
- }) |
- results.to_call(self._scan, **kwargs) |
- return results |
- |
- def _scan(self, limit=None, exclusive_start_key=None, segment=None, |
- total_segments=None, attributes=None, conditional_operator=None, |
- **filter_kwargs): |
- """ |
- The internal method that performs the actual scan. Used extensively |
- by ``ResultSet`` to perform each (paginated) request. |
- """ |
- kwargs = { |
- 'limit': limit, |
- 'segment': segment, |
- 'total_segments': total_segments, |
- 'attributes_to_get': attributes, |
- 'conditional_operator': conditional_operator, |
- } |
- |
- if exclusive_start_key: |
- kwargs['exclusive_start_key'] = {} |
- |
- for key, value in exclusive_start_key.items(): |
- kwargs['exclusive_start_key'][key] = \ |
- self._dynamizer.encode(value) |
- |
- # Convert the filters into something we can actually use. |
- kwargs['scan_filter'] = self._build_filters( |
- filter_kwargs, |
- using=FILTER_OPERATORS |
- ) |
- |
- raw_results = self.connection.scan( |
- self.table_name, |
- **kwargs |
- ) |
- results = [] |
- last_key = None |
- |
- for raw_item in raw_results.get('Items', []): |
- item = Item(self) |
- item.load({ |
- 'Item': raw_item, |
- }) |
- results.append(item) |
- |
- if raw_results.get('LastEvaluatedKey', None): |
- last_key = {} |
- |
- for key, value in raw_results['LastEvaluatedKey'].items(): |
- last_key[key] = self._dynamizer.decode(value) |
- |
- return { |
- 'results': results, |
- 'last_key': last_key, |
- } |
- |
- def batch_get(self, keys, consistent=False, attributes=None): |
- """ |
- Fetches many specific items in batch from a table. |
- |
-        Requires a ``keys`` parameter, which should be a list of dictionaries. |
-        Each dictionary should contain the key field(s) & value(s) that |
-        identify an item to fetch. |
- |
- Optionally accepts a ``consistent`` parameter, which should be a |
- boolean. If you provide ``True``, a strongly consistent read will be |
- used. (Default: False) |
- |
- Optionally accepts an ``attributes`` parameter, which should be a |
-        tuple. If provided, only these attributes will be fetched from |
-        DynamoDB. |
- |
- Returns a ``ResultSet``, which transparently handles the pagination of |
- results you get back. |
- |
- Example:: |
- |
- >>> results = users.batch_get(keys=[ |
- ... { |
- ... 'username': 'johndoe', |
- ... }, |
- ... { |
- ... 'username': 'jane', |
- ... }, |
- ... { |
- ... 'username': 'fred', |
- ... }, |
- ... ]) |
- >>> for res in results: |
- ... print res['first_name'] |
- 'John' |
- 'Jane' |
- 'Fred' |
- |
- """ |
-        # We pass the keys to the constructor instead, so it can maintain its |
-        # own internal state as to what keys have been processed. |
- results = BatchGetResultSet(keys=keys, max_batch_get=self.max_batch_get) |
- results.to_call(self._batch_get, consistent=consistent, attributes=attributes) |
- return results |
- |
- def _batch_get(self, keys, consistent=False, attributes=None): |
- """ |
- The internal method that performs the actual batch get. Used extensively |
- by ``BatchGetResultSet`` to perform each (paginated) request. |
- """ |
- items = { |
- self.table_name: { |
- 'Keys': [], |
- }, |
- } |
- |
- if consistent: |
- items[self.table_name]['ConsistentRead'] = True |
- |
- if attributes is not None: |
- items[self.table_name]['AttributesToGet'] = attributes |
- |
- for key_data in keys: |
- raw_key = {} |
- |
- for key, value in key_data.items(): |
- raw_key[key] = self._dynamizer.encode(value) |
- |
- items[self.table_name]['Keys'].append(raw_key) |
- |
- raw_results = self.connection.batch_get_item(request_items=items) |
- results = [] |
- unprocessed_keys = [] |
- |
- for raw_item in raw_results['Responses'].get(self.table_name, []): |
- item = Item(self) |
- item.load({ |
- 'Item': raw_item, |
- }) |
- results.append(item) |
- |
-        # ``UnprocessedKeys`` maps table names to dicts containing a |
-        # ``Keys`` list, so drill down through our table's entry. |
-        raw_unprocessed = raw_results.get('UnprocessedKeys', {}) |
- |
-        for raw_key in raw_unprocessed.get(self.table_name, {}).get('Keys', []): |
- py_key = {} |
- |
- for key, value in raw_key.items(): |
- py_key[key] = self._dynamizer.decode(value) |
- |
- unprocessed_keys.append(py_key) |
- |
- return { |
- 'results': results, |
-            # NEVER return a ``last_key``, just in case any part of |
-            # ``ResultSet`` peeks through, since much of the |
-            # original underlying implementation is based on this key. |
- 'last_key': None, |
- 'unprocessed_keys': unprocessed_keys, |
- } |
- |
- def count(self): |
- """ |
- Returns a (very) eventually consistent count of the number of items |
- in a table. |
- |
- Lag time is about 6 hours, so don't expect a high degree of accuracy. |
- |
- Example:: |
- |
- >>> users.count() |
- 6 |
- |
- """ |
- info = self.describe() |
- return info['Table'].get('ItemCount', 0) |
- |
- |
-class BatchTable(object): |
- """ |
- Used by ``Table`` as the context manager for batch writes. |
- |
- You likely don't want to try to use this object directly. |
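- |
-    A sketch of the supported entry point (``Table.batch_write`` returns |
-    an instance of this class as a context manager):: |
- |
-        >>> with users.batch_write() as batch: |
-        ...     batch.put_item(data={'username': 'alice'}) |
-        ...     batch.delete_item(username='bob') |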
- """ |
- def __init__(self, table): |
- self.table = table |
- self._to_put = [] |
- self._to_delete = [] |
- self._unprocessed = [] |
- |
- def __enter__(self): |
- return self |
- |
-    def __exit__(self, exc_type, exc_value, traceback): |
- if self._to_put or self._to_delete: |
- # Flush anything that's left. |
- self.flush() |
- |
- if self._unprocessed: |
- # Finally, handle anything that wasn't processed. |
- self.resend_unprocessed() |
- |
-    def put_item(self, data, overwrite=False): |
-        # ``overwrite`` is accepted for interface parity with |
-        # ``Table.put_item``; batched puts always overwrite. |
-        self._to_put.append(data) |
- |
- if self.should_flush(): |
- self.flush() |
- |
- def delete_item(self, **kwargs): |
- self._to_delete.append(kwargs) |
- |
- if self.should_flush(): |
- self.flush() |
- |
-    def should_flush(self): |
-        # DynamoDB's ``BatchWriteItem`` accepts at most 25 items per request, |
-        # so flush once the pending puts & deletes reach that ceiling. |
-        if len(self._to_put) + len(self._to_delete) == 25: |
-            return True |
- |
-        return False |
- |
- def flush(self): |
- batch_data = { |
- self.table.table_name: [ |
- # We'll insert data here shortly. |
- ], |
- } |
- |
- for put in self._to_put: |
- item = Item(self.table, data=put) |
- batch_data[self.table.table_name].append({ |
- 'PutRequest': { |
- 'Item': item.prepare_full(), |
- } |
- }) |
- |
- for delete in self._to_delete: |
- batch_data[self.table.table_name].append({ |
- 'DeleteRequest': { |
- 'Key': self.table._encode_keys(delete), |
- } |
- }) |
- |
- resp = self.table.connection.batch_write_item(batch_data) |
- self.handle_unprocessed(resp) |
- |
- self._to_put = [] |
- self._to_delete = [] |
- return True |
- |
- def handle_unprocessed(self, resp): |
- if len(resp.get('UnprocessedItems', [])): |
- table_name = self.table.table_name |
- unprocessed = resp['UnprocessedItems'].get(table_name, []) |
- |
- # Some items have not been processed. Stow them for now & |
- # re-attempt processing on ``__exit__``. |
- msg = "%s items were unprocessed. Storing for later." |
- boto.log.info(msg % len(unprocessed)) |
- self._unprocessed.extend(unprocessed) |
- |
- def resend_unprocessed(self): |
- # If there are unprocessed records (for instance, the user was over |
- # their throughput limitations), iterate over them & send until they're |
- # all there. |
- boto.log.info( |
- "Re-sending %s unprocessed items." % len(self._unprocessed) |
- ) |
- |
- while len(self._unprocessed): |
- # Again, do 25 at a time. |
- to_resend = self._unprocessed[:25] |
- # Remove them from the list. |
- self._unprocessed = self._unprocessed[25:] |
- batch_data = { |
- self.table.table_name: to_resend |
- } |
- boto.log.info("Sending %s items" % len(to_resend)) |
- resp = self.table.connection.batch_write_item(batch_data) |
- self.handle_unprocessed(resp) |
- boto.log.info( |
- "%s unprocessed items left" % len(self._unprocessed) |
- ) |