Chromium Code Reviews

Unified Diff: third_party/gsutil/third_party/boto/boto/dynamodb2/table.py

Issue 1377933002: [catapult] - Copy Telemetry's gsutilz over to third_party. (Closed) Base URL: https://github.com/catapult-project/catapult.git@master
Patch Set: Rename to gsutil. Created 5 years, 3 months ago
Index: third_party/gsutil/third_party/boto/boto/dynamodb2/table.py
diff --git a/third_party/gsutil/third_party/boto/boto/dynamodb2/table.py b/third_party/gsutil/third_party/boto/boto/dynamodb2/table.py
new file mode 100644
index 0000000000000000000000000000000000000000..d02ff5c7deb4acbb53925c85ab0e489649192a4d
--- /dev/null
+++ b/third_party/gsutil/third_party/boto/boto/dynamodb2/table.py
@@ -0,0 +1,1722 @@
+import boto
+from boto.dynamodb2 import exceptions
+from boto.dynamodb2.fields import (HashKey, RangeKey,
+ AllIndex, KeysOnlyIndex, IncludeIndex,
+ GlobalAllIndex, GlobalKeysOnlyIndex,
+ GlobalIncludeIndex)
+from boto.dynamodb2.items import Item
+from boto.dynamodb2.layer1 import DynamoDBConnection
+from boto.dynamodb2.results import ResultSet, BatchGetResultSet
+from boto.dynamodb2.types import (NonBooleanDynamizer, Dynamizer, FILTER_OPERATORS,
+ QUERY_OPERATORS, STRING)
+from boto.exception import JSONResponseError
+
+
+class Table(object):
+ """
+ Models & interacts with a DynamoDB table.
+
+ The ``Table`` object represents a set (or rough categorization) of
+ records within DynamoDB. The important part is that all records within the
+ table, while largely-schema-free, share the same schema & are essentially
+ namespaced for use in your application. For example, you might have a
+ ``users`` table or a ``forums`` table.
+ """
+ max_batch_get = 100
+
+ _PROJECTION_TYPE_TO_INDEX = dict(
+ global_indexes=dict(
+ ALL=GlobalAllIndex,
+ KEYS_ONLY=GlobalKeysOnlyIndex,
+ INCLUDE=GlobalIncludeIndex,
+ ), local_indexes=dict(
+ ALL=AllIndex,
+ KEYS_ONLY=KeysOnlyIndex,
+ INCLUDE=IncludeIndex,
+ )
+ )
+
+ def __init__(self, table_name, schema=None, throughput=None, indexes=None,
+ global_indexes=None, connection=None):
+ """
+ Sets up a new in-memory ``Table``.
+
+ This is useful if the table already exists within DynamoDB & you simply
+ want to use it for additional interactions. The only required parameter
+ is the ``table_name``. However, under the hood, the object will call
+ ``describe_table`` to determine the schema/indexes/throughput. You
+ can avoid this extra call by passing in ``schema`` & ``indexes``.
+
+ **IMPORTANT** - If you're creating a new ``Table`` for the first time,
+ you should use the ``Table.create`` method instead, as it will
+ persist the table structure to DynamoDB.
+
+ Requires a ``table_name`` parameter, which should be a simple string
+ of the name of the table.
+
+ Optionally accepts a ``schema`` parameter, which should be a list of
+ ``BaseSchemaField`` subclasses representing the desired schema.
+
+ Optionally accepts a ``throughput`` parameter, which should be a
+ dictionary. If provided, it should specify a ``read`` & ``write`` key,
+ both of which should have an integer value associated with them.
+
+ Optionally accepts an ``indexes`` parameter, which should be a list of
+ ``BaseIndexField`` subclasses representing the desired indexes.
+
+ Optionally accepts a ``global_indexes`` parameter, which should be a
+ list of ``GlobalBaseIndexField`` subclasses representing the desired
+ indexes.
+
+ Optionally accepts a ``connection`` parameter, which should be a
+ ``DynamoDBConnection`` instance (or subclass). This is primarily useful
+ for specifying alternate connection parameters.
+
+ Example::
+
+ # The simple, it-already-exists case.
+ >>> users = Table('users')
+
+ # The full, minimum-extra-calls case.
+ >>> from boto import dynamodb2
+ >>> users = Table('users', schema=[
+ ... HashKey('username'),
+ ... RangeKey('date_joined', data_type=NUMBER)
+ ... ], throughput={
+ ... 'read': 20,
+ ... 'write': 10,
+ ... }, indexes=[
+ ... KeysOnlyIndex('MostRecentlyJoined', parts=[
+ ... HashKey('username'),
+ ... RangeKey('date_joined')
+ ... ]),
+ ... ], global_indexes=[
+ ... GlobalAllIndex('UsersByZipcode', parts=[
+ ... HashKey('zipcode'),
+ ... RangeKey('username'),
+ ... ],
+ ... throughput={
+ ... 'read': 10,
+ ... 'write': 10,
+ ... }),
+ ... ], connection=dynamodb2.connect_to_region('us-west-2',
+ ... aws_access_key_id='key',
+ ... aws_secret_access_key='key',
+ ... ))
+
+ """
+ self.table_name = table_name
+ self.connection = connection
+ self.throughput = {
+ 'read': 5,
+ 'write': 5,
+ }
+ self.schema = schema
+ self.indexes = indexes
+ self.global_indexes = global_indexes
+
+ if self.connection is None:
+ self.connection = DynamoDBConnection()
+
+ if throughput is not None:
+ self.throughput = throughput
+
+ self._dynamizer = NonBooleanDynamizer()
+
+ def use_boolean(self):
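+ """
+ Switches the table to the full ``Dynamizer``, so that Python
+ booleans are stored as native DynamoDB ``BOOL`` values rather than
+ numbers.
+
+ Example (illustrative)::
+
+ >>> users = Table('users')
+ >>> users.use_boolean()
+ >>> users.put_item(data={'username': 'jane', 'is_admin': True})
+ """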
+ self._dynamizer = Dynamizer()
+
+ @classmethod
+ def create(cls, table_name, schema, throughput=None, indexes=None,
+ global_indexes=None, connection=None):
+ """
+ Creates a new table in DynamoDB & returns an in-memory ``Table`` object.
+
+ This will set up a brand new table within DynamoDB. The ``table_name``
+ must be unique for your AWS account. The ``schema`` is also required
+ to define the key structure of the table.
+
+ **IMPORTANT** - You should consider the usage pattern of your table
+ up-front, as the schema can **NOT** be modified once the table is
+ created, requiring the creation of a new table & migrating the data
+ should you wish to revise it.
+
+ **IMPORTANT** - If the table already exists in DynamoDB, additional
+ calls to this method will result in an error. If you just need
+ a ``Table`` object to interact with the existing table, you should
+ simply initialize a new ``Table`` object, which requires only the
+ ``table_name``.
+
+ Requires a ``table_name`` parameter, which should be a simple string
+ of the name of the table.
+
+ Requires a ``schema`` parameter, which should be a list of
+ ``BaseSchemaField`` subclasses representing the desired schema.
+
+ Optionally accepts a ``throughput`` parameter, which should be a
+ dictionary. If provided, it should specify a ``read`` & ``write`` key,
+ both of which should have an integer value associated with them.
+
+ Optionally accepts an ``indexes`` parameter, which should be a list of
+ ``BaseIndexField`` subclasses representing the desired indexes.
+
+ Optionally accepts a ``global_indexes`` parameter, which should be a
+ list of ``GlobalBaseIndexField`` subclasses representing the desired
+ indexes.
+
+ Optionally accepts a ``connection`` parameter, which should be a
+ ``DynamoDBConnection`` instance (or subclass). This is primarily useful
+ for specifying alternate connection parameters.
+
+ Example::
+
+ >>> users = Table.create('users', schema=[
+ ... HashKey('username'),
+ ... RangeKey('date_joined', data_type=NUMBER)
+ ... ], throughput={
+ ... 'read': 20,
+ ... 'write': 10,
+ ... }, indexes=[
+ ... KeysOnlyIndex('MostRecentlyJoined', parts=[
+ ... RangeKey('date_joined')
+ ... ]),
+ ... ], global_indexes=[
+ ... GlobalAllIndex('UsersByZipcode', parts=[
+ ... HashKey('zipcode'),
+ ... RangeKey('username'),
+ ... ],
+ ... throughput={
+ ... 'read': 10,
+ ... 'write': 10,
+ ... }),
+ ... ])
+
+ """
+ table = cls(table_name=table_name, connection=connection)
+ table.schema = schema
+
+ if throughput is not None:
+ table.throughput = throughput
+
+ if indexes is not None:
+ table.indexes = indexes
+
+ if global_indexes is not None:
+ table.global_indexes = global_indexes
+
+ # Prep the schema.
+ raw_schema = []
+ attr_defs = []
+ seen_attrs = set()
+
+ for field in table.schema:
+ raw_schema.append(field.schema())
+ # Build the attributes off what we know.
+ seen_attrs.add(field.name)
+ attr_defs.append(field.definition())
+
+ raw_throughput = {
+ 'ReadCapacityUnits': int(table.throughput['read']),
+ 'WriteCapacityUnits': int(table.throughput['write']),
+ }
+ kwargs = {}
+
+ kwarg_map = {
+ 'indexes': 'local_secondary_indexes',
+ 'global_indexes': 'global_secondary_indexes',
+ }
+ for index_attr in ('indexes', 'global_indexes'):
+ table_indexes = getattr(table, index_attr)
+ if table_indexes:
+ raw_indexes = []
+ for index_field in table_indexes:
+ raw_indexes.append(index_field.schema())
+ # Make sure all attributes specified in the indexes are
+ # added to the definition
+ for field in index_field.parts:
+ if field.name not in seen_attrs:
+ seen_attrs.add(field.name)
+ attr_defs.append(field.definition())
+
+ kwargs[kwarg_map[index_attr]] = raw_indexes
+
+ table.connection.create_table(
+ table_name=table.table_name,
+ attribute_definitions=attr_defs,
+ key_schema=raw_schema,
+ provisioned_throughput=raw_throughput,
+ **kwargs
+ )
+ return table
+
+ def _introspect_schema(self, raw_schema, raw_attributes=None):
+ """
+ Given a raw schema structure back from a DynamoDB response, parse
+ out & build the high-level Python objects that represent them.
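+
+ Example (illustrative; return value shown schematically)::
+
+ >>> table._introspect_schema(
+ ... [{'AttributeName': 'username', 'KeyType': 'HASH'}],
+ ... [{'AttributeName': 'username', 'AttributeType': 'S'}]
+ ... )
+ # -> [HashKey('username', data_type='S')]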
+ """
+ schema = []
+ sane_attributes = {}
+
+ if raw_attributes:
+ for field in raw_attributes:
+ sane_attributes[field['AttributeName']] = field['AttributeType']
+
+ for field in raw_schema:
+ data_type = sane_attributes.get(field['AttributeName'], STRING)
+
+ if field['KeyType'] == 'HASH':
+ schema.append(
+ HashKey(field['AttributeName'], data_type=data_type)
+ )
+ elif field['KeyType'] == 'RANGE':
+ schema.append(
+ RangeKey(field['AttributeName'], data_type=data_type)
+ )
+ else:
+ raise exceptions.UnknownSchemaFieldError(
+ "%s was seen, but is unknown. Please report this at "
+ "https://github.com/boto/boto/issues." % field['KeyType']
+ )
+
+ return schema
+
+ def _introspect_all_indexes(self, raw_indexes, map_indexes_projection):
+ """
+ Given a raw index/global index structure back from a DynamoDB response,
+ parse out & build the high-level Python objects that represent them.
+ """
+ indexes = []
+
+ for field in raw_indexes:
+ index_klass = map_indexes_projection.get('ALL')
+ kwargs = {
+ 'parts': []
+ }
+
+ if field['Projection']['ProjectionType'] == 'ALL':
+ index_klass = map_indexes_projection.get('ALL')
+ elif field['Projection']['ProjectionType'] == 'KEYS_ONLY':
+ index_klass = map_indexes_projection.get('KEYS_ONLY')
+ elif field['Projection']['ProjectionType'] == 'INCLUDE':
+ index_klass = map_indexes_projection.get('INCLUDE')
+ kwargs['includes'] = field['Projection']['NonKeyAttributes']
+ else:
+ raise exceptions.UnknownIndexFieldError(
+ "%s was seen, but is unknown. Please report this at "
+ "https://github.com/boto/boto/issues." % \
+ field['Projection']['ProjectionType']
+ )
+
+ name = field['IndexName']
+ kwargs['parts'] = self._introspect_schema(field['KeySchema'], None)
+ indexes.append(index_klass(name, **kwargs))
+
+ return indexes
+
+ def _introspect_indexes(self, raw_indexes):
+ """
+ Given a raw index structure back from a DynamoDB response, parse
+ out & build the high-level Python objects that represent them.
+ """
+ return self._introspect_all_indexes(
+ raw_indexes, self._PROJECTION_TYPE_TO_INDEX.get('local_indexes'))
+
+ def _introspect_global_indexes(self, raw_global_indexes):
+ """
+ Given a raw global index structure back from a DynamoDB response, parse
+ out & build the high-level Python objects that represent them.
+ """
+ return self._introspect_all_indexes(
+ raw_global_indexes,
+ self._PROJECTION_TYPE_TO_INDEX.get('global_indexes'))
+
+ def describe(self):
+ """
+ Describes the current structure of the table in DynamoDB.
+
+ This information will be used to update the ``schema``, ``indexes``,
+ ``global_indexes`` and ``throughput`` information on the ``Table``. Some
+ calls, such as those involving creating keys or querying, will require
+ this information to be populated.
+
+ It also returns the full raw data structure from DynamoDB, in the
+ event you'd like to parse out additional information (such as the
+ ``ItemCount`` or usage information).
+
+ Example::
+
+ >>> users.describe()
+ {
+ # Lots of keys here...
+ }
+ >>> len(users.schema)
+ 2
+
+ """
+ result = self.connection.describe_table(self.table_name)
+
+ # Blindly update throughput, since what's on DynamoDB's end is likely
+ # more correct.
+ raw_throughput = result['Table']['ProvisionedThroughput']
+ self.throughput['read'] = int(raw_throughput['ReadCapacityUnits'])
+ self.throughput['write'] = int(raw_throughput['WriteCapacityUnits'])
+
+ if not self.schema:
+ # Since we have the data, build the schema.
+ raw_schema = result['Table'].get('KeySchema', [])
+ raw_attributes = result['Table'].get('AttributeDefinitions', [])
+ self.schema = self._introspect_schema(raw_schema, raw_attributes)
+
+ if not self.indexes:
+ # Build the index information as well.
+ raw_indexes = result['Table'].get('LocalSecondaryIndexes', [])
+ self.indexes = self._introspect_indexes(raw_indexes)
+
+ # Build the global index information as well.
+ raw_global_indexes = result['Table'].get('GlobalSecondaryIndexes', [])
+ self.global_indexes = self._introspect_global_indexes(raw_global_indexes)
+
+ # This is leaky.
+ return result
+
+ def update(self, throughput=None, global_indexes=None):
+ """
+ Updates table attributes and global indexes in DynamoDB.
+
+ Optionally accepts a ``throughput`` parameter, which should be a
+ dictionary. If provided, it should specify a ``read`` & ``write`` key,
+ both of which should have an integer value associated with them.
+
+ Optionally accepts a ``global_indexes`` parameter, which should be a
+ dictionary. If provided, it should map each index name to a dict
+ containing ``read`` & ``write`` keys, both of which should have an
+ integer value associated with them. If you are writing new code,
+ please use ``Table.update_global_secondary_index``.
+
+ Returns ``True`` on success.
+
+ Example::
+
+ # For a read-heavier application...
+ >>> users.update(throughput={
+ ... 'read': 20,
+ ... 'write': 10,
+ ... })
+ True
+
+ # To also update the global indexes' throughput.
+ >>> users.update(throughput={
+ ... 'read': 20,
+ ... 'write': 10,
+ ... },
+ ... global_indexes={
+ ... 'TheIndexNameHere': {
+ ... 'read': 15,
+ ... 'write': 5,
+ ... }
+ ... })
+ True
+ """
+
+ data = None
+
+ if throughput:
+ self.throughput = throughput
+ data = {
+ 'ReadCapacityUnits': int(self.throughput['read']),
+ 'WriteCapacityUnits': int(self.throughput['write']),
+ }
+
+ gsi_data = None
+
+ if global_indexes:
+ gsi_data = []
+
+ for gsi_name, gsi_throughput in global_indexes.items():
+ gsi_data.append({
+ "Update": {
+ "IndexName": gsi_name,
+ "ProvisionedThroughput": {
+ "ReadCapacityUnits": int(gsi_throughput['read']),
+ "WriteCapacityUnits": int(gsi_throughput['write']),
+ },
+ },
+ })
+
+ if throughput or global_indexes:
+ self.connection.update_table(
+ self.table_name,
+ provisioned_throughput=data,
+ global_secondary_index_updates=gsi_data,
+ )
+
+ return True
+ else:
+ msg = 'You need to provide either the throughput or the ' \
+ 'global_indexes to update method'
+ boto.log.error(msg)
+
+ return False
+
+ def create_global_secondary_index(self, global_index):
+ """
+ Creates a global index in DynamoDB after the table has been created.
+
+ Requires a ``global_index`` parameter, which should be a
+ ``GlobalBaseIndexField`` subclass representing the desired index.
+
+ To update ``global_indexes`` information on the ``Table``, you'll need
+ to call ``Table.describe``.
+
+ Returns ``True`` on success.
+
+ Example::
+
+ # To create a global index
+ >>> users.create_global_secondary_index(
+ ... global_index=GlobalAllIndex(
+ ... 'TheIndexNameHere', parts=[
+ ... HashKey('requiredHashkey', data_type=STRING),
+ ... RangeKey('optionalRangeKey', data_type=STRING)
+ ... ],
+ ... throughput={
+ ... 'read': 2,
+ ... 'write': 1,
+ ... })
+ ... )
+ True
+
+ """
+
+ if global_index:
+ gsi_data = []
+ gsi_data_attr_def = []
+
+ gsi_data.append({
+ "Create": global_index.schema()
+ })
+
+ for attr_def in global_index.parts:
+ gsi_data_attr_def.append(attr_def.definition())
+
+ self.connection.update_table(
+ self.table_name,
+ global_secondary_index_updates=gsi_data,
+ attribute_definitions=gsi_data_attr_def
+ )
+
+ return True
+ else:
+ msg = 'You need to provide the global_index to ' \
+ 'create_global_secondary_index method'
+ boto.log.error(msg)
+
+ return False
+
+ def delete_global_secondary_index(self, global_index_name):
+ """
+ Deletes a global index in DynamoDB after the table has been created.
+
+ Requires a ``global_index_name`` parameter, which should be a simple
+ string of the name of the global secondary index.
+
+ To update ``global_indexes`` information on the ``Table``, you'll need
+ to call ``Table.describe``.
+
+ Returns ``True`` on success.
+
+ Example::
+
+ # To delete a global index
+ >>> users.delete_global_secondary_index('TheIndexNameHere')
+ True
+
+ """
+
+ if global_index_name:
+ gsi_data = [
+ {
+ "Delete": {
+ "IndexName": global_index_name
+ }
+ }
+ ]
+
+ self.connection.update_table(
+ self.table_name,
+ global_secondary_index_updates=gsi_data,
+ )
+
+ return True
+ else:
+ msg = 'You need to provide the global index name to ' \
+ 'delete_global_secondary_index method'
+ boto.log.error(msg)
+
+ return False
+
+ def update_global_secondary_index(self, global_indexes):
+ """
+ Updates one or more global indexes in DynamoDB after the table has been created.
+
+ Requires a ``global_indexes`` parameter, which should be a
+ dictionary mapping each index name to a dict containing ``read`` &
+ ``write`` keys, both of which should have an integer value
+ associated with them.
+
+ To update ``global_indexes`` information on the ``Table``, you'll need
+ to call ``Table.describe``.
+
+ Returns ``True`` on success.
+
+ Example::
+
+ # To update a global index
+ >>> users.update_global_secondary_index(global_indexes={
+ ... 'TheIndexNameHere': {
+ ... 'read': 15,
+ ... 'write': 5,
+ ... }
+ ... })
+ True
+
+ """
+
+ if global_indexes:
+ gsi_data = []
+
+ for gsi_name, gsi_throughput in global_indexes.items():
+ gsi_data.append({
+ "Update": {
+ "IndexName": gsi_name,
+ "ProvisionedThroughput": {
+ "ReadCapacityUnits": int(gsi_throughput['read']),
+ "WriteCapacityUnits": int(gsi_throughput['write']),
+ },
+ },
+ })
+
+ self.connection.update_table(
+ self.table_name,
+ global_secondary_index_updates=gsi_data,
+ )
+ return True
+ else:
+ msg = 'You need to provide the global indexes to ' \
+ 'update_global_secondary_index method'
+ boto.log.error(msg)
+
+ return False
+
+ def delete(self):
+ """
+ Deletes a table in DynamoDB.
+
+ **IMPORTANT** - Be careful when using this method, there is no undo.
+
+ Returns ``True`` on success.
+
+ Example::
+
+ >>> users.delete()
+ True
+
+ """
+ self.connection.delete_table(self.table_name)
+ return True
+
+ def _encode_keys(self, keys):
+ """
+ Given a flat Python dictionary of keys/values, converts it into the
+ nested dictionary DynamoDB expects.
+
+ Converts::
+
+ {
+ 'username': 'john',
+ 'tags': [1, 2, 5],
+ }
+
+ ...to...::
+
+ {
+ 'username': {'S': 'john'},
+ 'tags': {'NS': ['1', '2', '5']},
+ }
+
+ """
+ raw_key = {}
+
+ for key, value in keys.items():
+ raw_key[key] = self._dynamizer.encode(value)
+
+ return raw_key
+
+ def get_item(self, consistent=False, attributes=None, **kwargs):
+ """
+ Fetches an item (record) from a table in DynamoDB.
+
+ To specify the key of the item you'd like to get, you can specify the
+ key attributes as kwargs.
+
+ Optionally accepts a ``consistent`` parameter, which should be a
+ boolean. If you provide ``True``, it will perform
+ a consistent (but more expensive) read from DynamoDB.
+ (Default: ``False``)
+
+ Optionally accepts an ``attributes`` parameter, which should be a
+ list of fieldnames to fetch. (Default: ``None``, which means all fields
+ should be fetched)
+
+ Returns an ``Item`` instance containing all the data for that record.
+
+ Raises an ``ItemNotFound`` exception if the item is not found.
+
+ Example::
+
+ # A simple hash key.
+ >>> john = users.get_item(username='johndoe')
+ >>> john['first_name']
+ 'John'
+
+ # A complex hash+range key.
+ >>> john = users.get_item(username='johndoe', last_name='Doe')
+ >>> john['first_name']
+ 'John'
+
+ # A consistent read (assuming the data might have just changed).
+ >>> john = users.get_item(username='johndoe', consistent=True)
+ >>> john['first_name']
+ 'Johann'
+
+ # With a key that is an invalid variable name in Python.
+ # Also, assumes a different schema than previous examples.
+ >>> john = users.get_item(**{
+ ... 'date-joined': 127549192,
+ ... })
+ >>> john['first_name']
+ 'John'
+
+ """
+ raw_key = self._encode_keys(kwargs)
+ item_data = self.connection.get_item(
+ self.table_name,
+ raw_key,
+ attributes_to_get=attributes,
+ consistent_read=consistent
+ )
+ if 'Item' not in item_data:
+ raise exceptions.ItemNotFound("Item %s couldn't be found." % kwargs)
+ item = Item(self)
+ item.load(item_data)
+ return item
+
+ def has_item(self, **kwargs):
+ """
+ Return whether an item (record) exists within a table in DynamoDB.
+
+ To specify the key of the item you'd like to get, you can specify the
+ key attributes as kwargs.
+
+ Optionally accepts a ``consistent`` parameter, which should be a
+ boolean. If you provide ``True``, it will perform
+ a consistent (but more expensive) read from DynamoDB.
+ (Default: ``False``)
+
+ Optionally accepts an ``attributes`` parameter, which should be a
+ list of fieldnames to fetch. (Default: ``None``, which means all fields
+ should be fetched)
+
+ Returns ``True`` if an ``Item`` is present, ``False`` if not.
+
+ Example::
+
+ # Simple, just hash-key schema.
+ >>> users.has_item(username='johndoe')
+ True
+
+ # Complex schema, item not present.
+ >>> users.has_item(
+ ... username='johndoe',
+ ... date_joined='2014-01-07'
+ ... )
+ False
+
+ """
+ try:
+ self.get_item(**kwargs)
+ except (JSONResponseError, exceptions.ItemNotFound):
+ return False
+
+ return True
+
+ def lookup(self, *args, **kwargs):
+ """
+ Look up an entry in DynamoDB. This is mostly backwards compatible
+ with boto.dynamodb. Unlike get_item, it takes hash_key and range_key first,
+ although you may still specify keyword arguments instead.
+
+ Also unlike the get_item command, if the returned item has no keys
+ (i.e., it does not exist in DynamoDB), a None result is returned, instead
+ of an empty key object.
+
+ Example::
+ >>> user = users.lookup(username)
+ >>> user = users.lookup(username, consistent=True)
+ >>> app = apps.lookup('my_customer_id', 'my_app_id')
+
+ """
+ if not self.schema:
+ self.describe()
+ for x, arg in enumerate(args):
+ kwargs[self.schema[x].name] = arg
+ ret = self.get_item(**kwargs)
+ if not ret.keys():
+ return None
+ return ret
+
+ def new_item(self, *args):
+ """
+ Returns a new, blank ``Item``.
+
+ This is mostly for consistency with boto.dynamodb.
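+
+ Example (illustrative; assumes a hash-key-only schema)::
+
+ >>> user = users.new_item('johndoe')
+ >>> user['first_name'] = 'John'
+ >>> user.save()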
+ """
+ if not self.schema:
+ self.describe()
+ data = {}
+ for x, arg in enumerate(args):
+ data[self.schema[x].name] = arg
+ return Item(self, data=data)
+
+ def put_item(self, data, overwrite=False):
+ """
+ Saves an entire item to DynamoDB.
+
+ By default, if any part of the ``Item``'s original data doesn't match
+ what's currently in DynamoDB, this request will fail. This prevents
+ other processes from updating the data in between when you read the
+ item & when your request to update the item's data is processed, which
+ would typically result in some data loss.
+
+ Requires a ``data`` parameter, which should be a dictionary of the data
+ you'd like to store in DynamoDB.
+
+ Optionally accepts an ``overwrite`` parameter, which should be a
+ boolean. If you provide ``True``, this will tell DynamoDB to blindly
+ overwrite whatever data is present, if any.
+
+ Returns ``True`` on success.
+
+ Example::
+
+ >>> users.put_item(data={
+ ... 'username': 'jane',
+ ... 'first_name': 'Jane',
+ ... 'last_name': 'Doe',
+ ... 'date_joined': 126478915,
+ ... })
+ True
+
+ """
+ item = Item(self, data=data)
+ return item.save(overwrite=overwrite)
+
+ def _put_item(self, item_data, expects=None):
+ """
+ The internal variant of ``put_item`` (full data). This is used by the
+ ``Item`` objects, since that operation is represented at the
+ table-level by the API, but conceptually maps better to telling an
+ individual ``Item`` to save itself.
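+
+ The optional ``expects`` argument uses DynamoDB's raw ``Expected``
+ format, e.g. (illustrative)::
+
+ {'username': {'Exists': False}}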
+ """
+ kwargs = {}
+
+ if expects is not None:
+ kwargs['expected'] = expects
+
+ self.connection.put_item(self.table_name, item_data, **kwargs)
+ return True
+
+ def _update_item(self, key, item_data, expects=None):
+ """
+ The internal variant of ``put_item`` (partial data). This is used by the
+ ``Item`` objects, since that operation is represented at the
+ table-level by the API, but conceptually maps better to telling an
+ individual ``Item`` to save itself.
+ """
+ raw_key = self._encode_keys(key)
+ kwargs = {}
+
+ if expects is not None:
+ kwargs['expected'] = expects
+
+ self.connection.update_item(self.table_name, raw_key, item_data, **kwargs)
+ return True
+
+ def delete_item(self, expected=None, conditional_operator=None, **kwargs):
+ """
+ Deletes a single item. You can perform a conditional delete operation
+ that deletes the item if it exists, or if it has an expected attribute
+ value.
+
+ Conditional deletes are useful for only deleting items if specific
+ conditions are met. If those conditions are met, DynamoDB performs
+ the delete. Otherwise, the item is not deleted.
+
+ To specify the expected attribute values of the item, you can pass a
+ dictionary of conditions to ``expected``. Each condition should follow
+ the pattern ``<attributename>__<comparison_operator>=<value_to_expect>``.
+
+ **IMPORTANT** - Be careful when using this method, there is no undo.
+
+ To specify the key of the item you'd like to get, you can specify the
+ key attributes as kwargs.
+
+ Optionally accepts an ``expected`` parameter which is a dictionary of
+ expected attribute value conditions.
+
+ Optionally accepts a ``conditional_operator`` which applies to the
+ expected attribute value conditions:
+
+ + `AND` - True if all of the conditions evaluate to true (default)
+ + `OR` - True if at least one condition evaluates to true
+
+ Returns ``True`` on success, ``False`` on failed conditional delete.
+
+ Example::
+
+ # A simple hash key.
+ >>> users.delete_item(username='johndoe')
+ True
+
+ # A complex hash+range key.
+ >>> users.delete_item(username='jane', last_name='Doe')
+ True
+
+ # With a key that is an invalid variable name in Python.
+ # Also, assumes a different schema than previous examples.
+ >>> users.delete_item(**{
+ ... 'date-joined': 127549192,
+ ... })
+ True
+
+ # Conditional delete
+ >>> users.delete_item(username='johndoe',
+ ... expected={'balance__eq': 0})
+ True
+ """
+ expected = self._build_filters(expected, using=FILTER_OPERATORS)
+ raw_key = self._encode_keys(kwargs)
+
+ try:
+ self.connection.delete_item(self.table_name, raw_key,
+ expected=expected,
+ conditional_operator=conditional_operator)
+ except exceptions.ConditionalCheckFailedException:
+ return False
+
+ return True
+
+ def get_key_fields(self):
+ """
+ Returns the fields necessary to make a key for a table.
+
+ If the ``Table`` does not already have a populated ``schema``,
+ this will request it via a ``Table.describe`` call.
+
+ Returns a list of fieldnames (strings).
+
+ Example::
+
+ # A simple hash key.
+ >>> users.get_key_fields()
+ ['username']
+
+ # A complex hash+range key.
+ >>> users.get_key_fields()
+ ['username', 'last_name']
+
+ """
+ if not self.schema:
+ # We don't know the structure of the table. Get a description to
+ # populate the schema.
+ self.describe()
+
+ return [field.name for field in self.schema]
+
+ def batch_write(self):
+ """
+ Allows the batching of writes to DynamoDB.
+
+ Since each write/delete call to DynamoDB has a cost associated with it,
+ when loading lots of data, it makes sense to batch them, creating as
+ few calls as possible.
+
+ This returns a context manager that will transparently handle creating
+ these batches. The object you get back lightly resembles a ``Table``
+ object, sharing just the ``put_item`` & ``delete_item`` methods
+ (which are all that DynamoDB can batch in terms of writing data).
+
+ DynamoDB's maximum batch size is 25 items per request. If you attempt
+ to put/delete more than that, the context manager will batch as many
+ as it can up to that number, then flush them to DynamoDB & continue
+ batching as more calls come in.
+
+ Example::
+
+ # Assuming a table with one record...
+ >>> with users.batch_write() as batch:
+ ... batch.put_item(data={
+ ... 'username': 'johndoe',
+ ... 'first_name': 'John',
+ ... 'last_name': 'Doe',
+ ... 'owner': 1,
+ ... })
+ ... # Nothing across the wire yet.
+ ... batch.delete_item(username='bob')
+ ... # Still no requests sent.
+ ... batch.put_item(data={
+ ... 'username': 'jane',
+ ... 'first_name': 'Jane',
+ ... 'last_name': 'Doe',
+ ... 'date_joined': 127436192,
+ ... })
+ ... # Nothing yet, but once we leave the context, the
+ ... # put/deletes will be sent.
+
+ """
+ # PHENOMENAL COSMIC DOCS!!! itty-bitty code.
+ return BatchTable(self)
+
+ def _build_filters(self, filter_kwargs, using=QUERY_OPERATORS):
+ """
+ An internal method for taking query/scan-style ``**kwargs`` & turning
+ them into the raw structure DynamoDB expects for filtering.
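+
+ Example (illustrative)::
+
+ >>> table._build_filters({'username__eq': 'john'})
+ # -> {'username': {'AttributeValueList': [{'S': 'john'}],
+ # 'ComparisonOperator': 'EQ'}}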
+ """
+ if filter_kwargs is None:
+ return
+
+ filters = {}
+
+ for field_and_op, value in filter_kwargs.items():
+ field_bits = field_and_op.split('__')
+ fieldname = '__'.join(field_bits[:-1])
+
+ try:
+ op = using[field_bits[-1]]
+ except KeyError:
+ raise exceptions.UnknownFilterTypeError(
+ "Operator '%s' from '%s' is not recognized." % (
+ field_bits[-1],
+ field_and_op
+ )
+ )
+
+ lookup = {
+ 'AttributeValueList': [],
+ 'ComparisonOperator': op,
+ }
+
+ # Special-case the ``NULL/NOT_NULL`` case.
+ if field_bits[-1] == 'null':
+ del lookup['AttributeValueList']
+
+ if value is False:
+ lookup['ComparisonOperator'] = 'NOT_NULL'
+ else:
+ lookup['ComparisonOperator'] = 'NULL'
+ # Special-case the ``BETWEEN`` case.
+ elif field_bits[-1] == 'between':
+ if len(value) == 2 and isinstance(value, (list, tuple)):
+ lookup['AttributeValueList'].append(
+ self._dynamizer.encode(value[0])
+ )
+ lookup['AttributeValueList'].append(
+ self._dynamizer.encode(value[1])
+ )
+ # Special-case the ``IN`` case
+ elif field_bits[-1] == 'in':
+ for val in value:
+ lookup['AttributeValueList'].append(self._dynamizer.encode(val))
+ else:
+ # Fix up the value for encoding, because it was built to only work
+ # with ``set``s.
+ if isinstance(value, (list, tuple)):
+ value = set(value)
+ lookup['AttributeValueList'].append(
+ self._dynamizer.encode(value)
+ )
+
+ # Finally, insert it into the filters.
+ filters[fieldname] = lookup
+
+ return filters
+
+ def query(self, limit=None, index=None, reverse=False, consistent=False,
+ attributes=None, max_page_size=None, **filter_kwargs):
+ """
+ **WARNING:** This method is provided **strictly** for
+ backward-compatibility. It returns results in an incorrect order.
+
+ If you are writing new code, please use ``Table.query_2``.
+ """
+ reverse = not reverse
+ return self.query_2(limit=limit, index=index, reverse=reverse,
+ consistent=consistent, attributes=attributes,
+ max_page_size=max_page_size, **filter_kwargs)
+
+ def query_2(self, limit=None, index=None, reverse=False,
+ consistent=False, attributes=None, max_page_size=None,
+ query_filter=None, conditional_operator=None,
+ **filter_kwargs):
+ """
+ Queries for a set of matching items in a DynamoDB table.
+
+ Queries can be performed against a hash key, a hash+range key or
+ against any data stored in your local secondary indexes. Query filters
+ can be used to filter on arbitrary fields.
+
+ **Note** - You cannot query against arbitrary fields within the data
+ stored in DynamoDB unless you specify ``query_filter`` values.
+
+ To specify the filters of the items you'd like to get, you can specify
+ the filters as kwargs. Each filter kwarg should follow the pattern
+ ``<fieldname>__<filter_operation>=<value_to_look_for>``. Query filters
+ are specified in the same way.
+
+ Optionally accepts a ``limit`` parameter, which should be an integer
+ count of the total number of items to return. (Default: ``None`` -
+ all results)
+
+ Optionally accepts an ``index`` parameter, which should be a string of
+ the name of the local secondary index you want to query against.
+ (Default: ``None``)
+
+ Optionally accepts a ``reverse`` parameter, which will present the
+ results in reverse order. (Default: ``False`` - normal order)
+
+ Optionally accepts a ``consistent`` parameter, which should be a
+ boolean. If you provide ``True``, it will force a consistent read of
+ the data (more expensive). (Default: ``False`` - use eventually
+ consistent reads)
+
+ Optionally accepts an ``attributes`` parameter, which should be a
+ tuple. If provided, only these attributes will be fetched from
+ DynamoDB. This uses the ``AttributesToGet`` API parameter & sets
+ ``Select`` to ``SPECIFIC_ATTRIBUTES``.
+
+ Optionally accepts a ``max_page_size`` parameter, which should be an
+ integer count of the maximum number of items to retrieve
+ **per-request**. This is useful in making faster requests & preventing
+ the query from drowning out other queries. (Default: ``None`` -
+ fetch as many as DynamoDB will return)
+
+ Optionally accepts a ``query_filter`` which is a dictionary of filter
+ conditions against any arbitrary field in the returned data.
+
+ Optionally accepts a ``conditional_operator`` which applies to the
+ query filter conditions:
+
+ + `AND` - True if all filter conditions evaluate to true (default)
+ + `OR` - True if at least one filter condition evaluates to true
+
+ Returns a ``ResultSet``, which transparently handles the pagination of
+ results you get back.
+
+ Example::
+
+ # Look for last names equal to "Doe".
+ >>> results = users.query_2(last_name__eq='Doe')
+ >>> for res in results:
+ ... print res['first_name']
+ 'John'
+ 'Jane'
+
+ # Look for last names beginning with "D", in reverse order, limit 3.
+ >>> results = users.query_2(
+ ... last_name__beginswith='D',
+ ... reverse=True,
+ ... limit=3
+ ... )
+ >>> for res in results:
+ ... print res['first_name']
+ 'Alice'
+ 'Jane'
+ 'John'
+
+ # Use an LSI & a consistent read.
+ >>> results = users.query_2(
+ ... date_joined__gte=1236451000,
+ ... owner__eq=1,
+ ... index='DateJoinedIndex',
+ ... consistent=True
+ ... )
+ >>> for res in results:
+ ... print res['first_name']
+ 'Alice'
+ 'Bob'
+ 'John'
+ 'Fred'
+
+ # Filter by non-indexed field(s)
+ >>> results = users.query_2(
+ ... last_name__eq='Doe',
+ ... reverse=True,
+ ... query_filter={
+ ... 'first_name__beginswith': 'A'
+ ... }
+ ... )
+ >>> for res in results:
+ ... print res['first_name'] + ' ' + res['last_name']
+ 'Alice Doe'
+
+ """
+ if self.schema:
+ if len(self.schema) == 1:
+ if len(filter_kwargs) <= 1:
+ if not self.global_indexes or not len(self.global_indexes):
+ # If the schema only has one field, there's <= 1 filter
+ # param & no Global Secondary Indexes, this is user
+ # error. Bail early.
+ raise exceptions.QueryError(
+ "You must specify more than one key to filter on."
+ )
+
+ if attributes is not None:
+ select = 'SPECIFIC_ATTRIBUTES'
+ else:
+ select = None
+
+ results = ResultSet(
+ max_page_size=max_page_size
+ )
+ kwargs = filter_kwargs.copy()
+ kwargs.update({
+ 'limit': limit,
+ 'index': index,
+ 'reverse': reverse,
+ 'consistent': consistent,
+ 'select': select,
+ 'attributes_to_get': attributes,
+ 'query_filter': query_filter,
+ 'conditional_operator': conditional_operator,
+ })
+ results.to_call(self._query, **kwargs)
+ return results
+
+ def query_count(self, index=None, consistent=False, conditional_operator=None,
+ query_filter=None, scan_index_forward=True, limit=None,
+ exclusive_start_key=None, **filter_kwargs):
+ """
+ Queries the exact count of matching items in a DynamoDB table.
+
+ Queries can be performed against a hash key, a hash+range key or
+ against any data stored in your local secondary indexes. Query filters
+ can be used to filter on arbitrary fields.
+
+ To specify the filters of the items you'd like to get, you can specify
+ the filters as kwargs. Each filter kwarg should follow the pattern
+ ``<fieldname>__<filter_operation>=<value_to_look_for>``. Query filters
+ are specified in the same way.
+
+ Optionally accepts an ``index`` parameter, which should be a string of
+ the name of the local secondary index you want to query against.
+ (Default: ``None``)
+
+ Optionally accepts a ``consistent`` parameter, which should be a
+ boolean. If you provide ``True``, it will force a consistent read of
+ the data (more expensive). (Default: ``False`` - use eventually
+ consistent reads)
+
+ Optionally accepts a ``query_filter`` which is a dictionary of filter
+ conditions against any arbitrary field in the returned data.
+
+ Optionally accepts a ``conditional_operator`` which applies to the
+ query filter conditions:
+
+ + `AND` - True if all filter conditions evaluate to true (default)
+ + `OR` - True if at least one filter condition evaluates to true
+
+ Optionally accepts an ``exclusive_start_key``, which is used to get
+ the remaining items when a query cannot return the complete count.
+
+ Returns an integer which represents the exact number of matched
+ items.
+
+ :type scan_index_forward: boolean
+ :param scan_index_forward: Specifies ascending (true) or descending
+ (false) traversal of the index. DynamoDB returns results reflecting
+ the requested order determined by the range key. If the data type
+ is Number, the results are returned in numeric order. For String,
+ the results are returned in order of ASCII character code values.
+ For Binary, DynamoDB treats each byte of the binary data as
+ unsigned when it compares binary values.
+
+ If ScanIndexForward is not specified, the results are returned in
+ ascending order.
+
+ :type limit: integer
+ :param limit: The maximum number of items to evaluate (not necessarily
+ the number of matching items).
+
+ Example::
+
+ # Look for last names equal to "Doe".
+ >>> users.query_count(last_name__eq='Doe')
+ 5
+
+ # Use an LSI & a consistent read.
+ >>> users.query_count(
+ ... date_joined__gte=1236451000,
+ ... owner__eq=1,
+ ... index='DateJoinedIndex',
+ ... consistent=True
+ ... )
+ 2
+
+ """
+ key_conditions = self._build_filters(
+ filter_kwargs,
+ using=QUERY_OPERATORS
+ )
+
+ built_query_filter = self._build_filters(
+ query_filter,
+ using=FILTER_OPERATORS
+ )
+
+ count_buffer = 0
+ last_evaluated_key = exclusive_start_key
+
+ while True:
+ raw_results = self.connection.query(
+ self.table_name,
+ index_name=index,
+ consistent_read=consistent,
+ select='COUNT',
+ key_conditions=key_conditions,
+ query_filter=built_query_filter,
+ conditional_operator=conditional_operator,
+ limit=limit,
+ scan_index_forward=scan_index_forward,
+ exclusive_start_key=last_evaluated_key
+ )
+
+ count_buffer += int(raw_results.get('Count', 0))
+ last_evaluated_key = raw_results.get('LastEvaluatedKey')
+ if not last_evaluated_key or count_buffer < 1:
+ break
+
+ return count_buffer
+
+ def _query(self, limit=None, index=None, reverse=False, consistent=False,
+ exclusive_start_key=None, select=None, attributes_to_get=None,
+ query_filter=None, conditional_operator=None, **filter_kwargs):
+ """
+ The internal method that performs the actual queries. Used extensively
+ by ``ResultSet`` to perform each (paginated) request.
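+
+ Returns a dict with two keys: ``results`` (a list of ``Item``
+ instances) & ``last_key`` (the decoded ``LastEvaluatedKey``, or
+ ``None`` when pagination is complete).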
+ """
+ kwargs = {
+ 'limit': limit,
+ 'index_name': index,
+ 'consistent_read': consistent,
+ 'select': select,
+ 'attributes_to_get': attributes_to_get,
+ 'conditional_operator': conditional_operator,
+ }
+
+ if reverse:
+ kwargs['scan_index_forward'] = False
+
+ if exclusive_start_key:
+ kwargs['exclusive_start_key'] = {}
+
+ for key, value in exclusive_start_key.items():
+ kwargs['exclusive_start_key'][key] = \
+ self._dynamizer.encode(value)
+
+ # Convert the filters into something we can actually use.
+ kwargs['key_conditions'] = self._build_filters(
+ filter_kwargs,
+ using=QUERY_OPERATORS
+ )
+
+ kwargs['query_filter'] = self._build_filters(
+ query_filter,
+ using=FILTER_OPERATORS
+ )
+
+ raw_results = self.connection.query(
+ self.table_name,
+ **kwargs
+ )
+ results = []
+ last_key = None
+
+ for raw_item in raw_results.get('Items', []):
+ item = Item(self)
+ item.load({
+ 'Item': raw_item,
+ })
+ results.append(item)
+
+ if raw_results.get('LastEvaluatedKey', None):
+ last_key = {}
+
+ for key, value in raw_results['LastEvaluatedKey'].items():
+ last_key[key] = self._dynamizer.decode(value)
+
+ return {
+ 'results': results,
+ 'last_key': last_key,
+ }
+
+ def scan(self, limit=None, segment=None, total_segments=None,
+ max_page_size=None, attributes=None, conditional_operator=None,
+ **filter_kwargs):
+ """
+ Scans across all items within a DynamoDB table.
+
+ Scans can be performed against a hash key or a hash+range key. You can
+ additionally filter the results after the table has been read but
+ before the response is returned by using query filters.
+
+ To specify the filters of the items you'd like to get, you can specify
+ the filters as kwargs. Each filter kwarg should follow the pattern
+ ``<fieldname>__<filter_operation>=<value_to_look_for>``.
+
+ Optionally accepts a ``limit`` parameter, which should be an integer
+ count of the total number of items to return. (Default: ``None`` -
+ all results)
+
+ Optionally accepts a ``segment`` parameter, which should be an integer
+ number of the segment to scan. Please see the documentation about
+ Parallel Scans. (Default: ``None`` - no segments)
+
+ Optionally accepts a ``total_segments`` parameter, which should be an
+ integer count of the number of segments to divide the table into.
+ Please see the documentation about Parallel Scans. (Default: ``None`` -
+ no segments)
+
+ Optionally accepts a ``max_page_size`` parameter, which should be an
+ integer count of the maximum number of items to retrieve
+ **per-request**. This is useful in making faster requests & preventing
+ the scan from drowning out other queries. (Default: ``None`` -
+ fetch as many as DynamoDB will return)
+
+ Optionally accepts an ``attributes`` parameter, which should be a
+ tuple. If provided, only these attributes will be fetched from
+ DynamoDB. This uses the ``AttributesToGet`` API parameter & sets
+ ``Select`` to ``SPECIFIC_ATTRIBUTES``.
+
+ Returns a ``ResultSet``, which transparently handles the pagination of
+ results you get back.
+
+ Example::
+
+ # All results.
+ >>> everything = users.scan()
+
+ # Look for last names beginning with "D".
+ >>> results = users.scan(last_name__beginswith='D')
+ >>> for res in results:
+ ... print res['first_name']
+ 'Alice'
+ 'John'
+ 'Jane'
+
+ # Use an ``IN`` filter & limit.
+ >>> results = users.scan(
+ ... age__in=[25, 26, 27, 28, 29],
+ ... limit=1
+ ... )
+ >>> for res in results:
+ ... print res['first_name']
+ 'Alice'
+
+ """
+ results = ResultSet(
+ max_page_size=max_page_size
+ )
+ kwargs = filter_kwargs.copy()
+ kwargs.update({
+ 'limit': limit,
+ 'segment': segment,
+ 'total_segments': total_segments,
+ 'attributes': attributes,
+ 'conditional_operator': conditional_operator,
+ })
+ results.to_call(self._scan, **kwargs)
+ return results
+
+ def _scan(self, limit=None, exclusive_start_key=None, segment=None,
+ total_segments=None, attributes=None, conditional_operator=None,
+ **filter_kwargs):
+ """
+ The internal method that performs the actual scan. Used extensively
+ by ``ResultSet`` to perform each (paginated) request.
+ """
+ kwargs = {
+ 'limit': limit,
+ 'segment': segment,
+ 'total_segments': total_segments,
+ 'attributes_to_get': attributes,
+ 'conditional_operator': conditional_operator,
+ }
+
+ if exclusive_start_key:
+ kwargs['exclusive_start_key'] = {}
+
+ for key, value in exclusive_start_key.items():
+ kwargs['exclusive_start_key'][key] = \
+ self._dynamizer.encode(value)
+
+ # Convert the filters into something we can actually use.
+ kwargs['scan_filter'] = self._build_filters(
+ filter_kwargs,
+ using=FILTER_OPERATORS
+ )
+
+ raw_results = self.connection.scan(
+ self.table_name,
+ **kwargs
+ )
+ results = []
+ last_key = None
+
+ for raw_item in raw_results.get('Items', []):
+ item = Item(self)
+ item.load({
+ 'Item': raw_item,
+ })
+ results.append(item)
+
+ if raw_results.get('LastEvaluatedKey', None):
+ last_key = {}
+
+ for key, value in raw_results['LastEvaluatedKey'].items():
+ last_key[key] = self._dynamizer.decode(value)
+
+ return {
+ 'results': results,
+ 'last_key': last_key,
+ }
+
+ def batch_get(self, keys, consistent=False, attributes=None):
+ """
+ Fetches many specific items in batch from a table.
+
+ Requires a ``keys`` parameter, which should be a list of dictionaries.
+ Each dictionary should consist of the key attributes & values identifying an item.
+
+ Optionally accepts a ``consistent`` parameter, which should be a
+ boolean. If you provide ``True``, a strongly consistent read will be
+ used. (Default: False)
+
+ Optionally accepts an ``attributes`` parameter, which should be a
+ tuple. If you provide any attributes only these will be fetched
+ from DynamoDB.
+
+ Returns a ``ResultSet``, which transparently handles the pagination of
+ results you get back.
+
+ Example::
+
+ >>> results = users.batch_get(keys=[
+ ... {
+ ... 'username': 'johndoe',
+ ... },
+ ... {
+ ... 'username': 'jane',
+ ... },
+ ... {
+ ... 'username': 'fred',
+ ... },
+ ... ])
+ >>> for res in results:
+ ... print res['first_name']
+ 'John'
+ 'Jane'
+ 'Fred'
+
+ """
+ # We pass the keys to the constructor instead, so it can maintain its
+ # own internal state as to what keys have been processed.
+ results = BatchGetResultSet(keys=keys, max_batch_get=self.max_batch_get)
+ results.to_call(self._batch_get, consistent=consistent, attributes=attributes)
+ return results
+
+ def _batch_get(self, keys, consistent=False, attributes=None):
+ """
+ The internal method that performs the actual batch get. Used extensively
+ by ``BatchGetResultSet`` to perform each (paginated) request.
+ """
+ items = {
+ self.table_name: {
+ 'Keys': [],
+ },
+ }
+
+ if consistent:
+ items[self.table_name]['ConsistentRead'] = True
+
+ if attributes is not None:
+ items[self.table_name]['AttributesToGet'] = attributes
+
+ for key_data in keys:
+ raw_key = {}
+
+ for key, value in key_data.items():
+ raw_key[key] = self._dynamizer.encode(value)
+
+ items[self.table_name]['Keys'].append(raw_key)
+
+ raw_results = self.connection.batch_get_item(request_items=items)
+ results = []
+ unprocessed_keys = []
+
+ for raw_item in raw_results['Responses'].get(self.table_name, []):
+ item = Item(self)
+ item.load({
+ 'Item': raw_item,
+ })
+ results.append(item)
+
+ raw_unprocessed = raw_results.get('UnprocessedKeys', {})
+
+ for raw_key in raw_unprocessed.get('Keys', []):
+ py_key = {}
+
+ for key, value in raw_key.items():
+ py_key[key] = self._dynamizer.decode(value)
+
+ unprocessed_keys.append(py_key)
+
+ return {
+ 'results': results,
+ # NEVER return a ``last_key``. Just in case any part of
+ # ``ResultSet`` peeks through, since much of the
+ # original underlying implementation is based on this key.
+ 'last_key': None,
+ 'unprocessed_keys': unprocessed_keys,
+ }
+
+ def count(self):
+ """
+ Returns a (very) eventually consistent count of the number of items
+ in a table.
+
+ Lag time is about 6 hours, so don't expect a high degree of accuracy.
+
+ Example::
+
+ >>> users.count()
+ 6
+
+ """
+ info = self.describe()
+ return info['Table'].get('ItemCount', 0)
+
+
+class BatchTable(object):
+ """
+ Used by ``Table`` as the context manager for batch writes.
+
+ You likely don't want to try to use this object directly.
+ """
+ def __init__(self, table):
+ self.table = table
+ self._to_put = []
+ self._to_delete = []
+ self._unprocessed = []
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, type, value, traceback):
+ if self._to_put or self._to_delete:
+ # Flush anything that's left.
+ self.flush()
+
+ if self._unprocessed:
+ # Finally, handle anything that wasn't processed.
+ self.resend_unprocessed()
+
+ def put_item(self, data, overwrite=False):
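+ """
+ Queues an item for writing; the batch flushes automatically once 25
+ requests (DynamoDB's batch-write maximum) are pending.
+ """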
+ self._to_put.append(data)
+
+ if self.should_flush():
+ self.flush()
+
+ def delete_item(self, **kwargs):
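+ """
+ Queues a delete (specified by key kwargs) for batching; flushes
+ automatically at 25 pending requests.
+ """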
+ self._to_delete.append(kwargs)
+
+ if self.should_flush():
+ self.flush()
+
+ def should_flush(self):
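+ """
+ Returns ``True`` once 25 requests (DynamoDB's per-call batch-write
+ maximum) have been queued.
+ """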
+ if len(self._to_put) + len(self._to_delete) == 25:
+ return True
+
+ return False
+
+ def flush(self):
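+ """
+ Sends all queued put/delete requests in a single ``BatchWriteItem``
+ call, stows any unprocessed items & clears the queues.
+ """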
+ batch_data = {
+ self.table.table_name: [
+ # We'll insert data here shortly.
+ ],
+ }
+
+ for put in self._to_put:
+ item = Item(self.table, data=put)
+ batch_data[self.table.table_name].append({
+ 'PutRequest': {
+ 'Item': item.prepare_full(),
+ }
+ })
+
+ for delete in self._to_delete:
+ batch_data[self.table.table_name].append({
+ 'DeleteRequest': {
+ 'Key': self.table._encode_keys(delete),
+ }
+ })
+
+ resp = self.table.connection.batch_write_item(batch_data)
+ self.handle_unprocessed(resp)
+
+ self._to_put = []
+ self._to_delete = []
+ return True
+
+ def handle_unprocessed(self, resp):
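+ """
+ Stows any items DynamoDB reported as unprocessed (typically due to
+ throttling) so they can be retried on ``__exit__``.
+ """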
+ if len(resp.get('UnprocessedItems', [])):
+ table_name = self.table.table_name
+ unprocessed = resp['UnprocessedItems'].get(table_name, [])
+
+ # Some items have not been processed. Stow them for now &
+ # re-attempt processing on ``__exit__``.
+ msg = "%s items were unprocessed. Storing for later."
+ boto.log.info(msg % len(unprocessed))
+ self._unprocessed.extend(unprocessed)
+
+ def resend_unprocessed(self):
+ # If there are unprocessed records (for instance, the user was over
+ # their throughput limitations), iterate over them & send until they're
+ # all there.
+ boto.log.info(
+ "Re-sending %s unprocessed items." % len(self._unprocessed)
+ )
+
+ while len(self._unprocessed):
+ # Again, do 25 at a time.
+ to_resend = self._unprocessed[:25]
+ # Remove them from the list.
+ self._unprocessed = self._unprocessed[25:]
+ batch_data = {
+ self.table.table_name: to_resend
+ }
+ boto.log.info("Sending %s items" % len(to_resend))
+ resp = self.table.connection.batch_write_item(batch_data)
+ self.handle_unprocessed(resp)
+ boto.log.info(
+ "%s unprocessed items left" % len(self._unprocessed)
+ )
