Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(5)

Unified Diff: third_party/gsutil/third_party/boto/boto/dynamodb2/results.py

Issue 1377933002: [catapult] - Copy Telemetry's gsutilz over to third_party. (Closed) Base URL: https://github.com/catapult-project/catapult.git@master
Patch Set: Rename to gsutil. Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/gsutil/third_party/boto/boto/dynamodb2/results.py
diff --git a/third_party/gsutil/third_party/boto/boto/dynamodb2/results.py b/third_party/gsutil/third_party/boto/boto/dynamodb2/results.py
new file mode 100644
index 0000000000000000000000000000000000000000..36f04d0a960908742ecdaeccf1eaa4ff4efd3409
--- /dev/null
+++ b/third_party/gsutil/third_party/boto/boto/dynamodb2/results.py
@@ -0,0 +1,204 @@
class ResultSet(object):
    """
    Lazily pages through a set of query/scan results.

    Exposes a plain iterator interface: consuming it in a ``for`` loop (or
    list comprehension, etc.) transparently fetches follow-up pages as the
    buffered ones run out.

    This is used by the ``Table.query`` & ``Table.scan`` methods.

    Example::

        >>> users = Table('users')
        >>> results = ResultSet()
        >>> results.to_call(users.query, username__gte='johndoe')
        # Now iterate. When it runs out of results, it'll fetch the next page.
        >>> for res in results:
        ...     print res['username']

    """
    def __init__(self, max_page_size=None):
        super(ResultSet, self).__init__()
        # The paginated callable plus the positional/keyword arguments to
        # re-invoke it with on every fetch.
        self.the_callable = None
        self.call_args = []
        self.call_kwargs = {}
        # Internal iteration state: buffered page, cursor into it, whether
        # the service may still have more data, and the continuation key.
        self._results = []
        self._offset = -1
        self._results_left = True
        self._last_key_seen = None
        self._fetches = 0
        self._max_page_size = max_page_size
        self._limit = None

    @property
    def first_key(self):
        # Name of the kwarg used to resume pagination from the last key seen.
        return 'exclusive_start_key'

    def _reset(self):
        """
        Resets the internal state of the ``ResultSet``.

        This prevents results from being cached long-term & consuming
        excess memory.

        Largely internal.
        """
        self._results = []
        self._offset = 0

    def __iter__(self):
        return self

    def __next__(self):
        self._offset += 1

        if self._offset >= len(self._results):
            # Buffer exhausted. If the service said there's nothing more,
            # stop; otherwise pull the next page.
            if not self._results_left:
                raise StopIteration()

            self.fetch_more()

            # A fetch may legitimately come back empty while further pages
            # remain, so keep fetching until data arrives or we run dry.
            while self._results_left and not self._results:
                self.fetch_more()

        if self._offset >= len(self._results):
            raise StopIteration()

        # Enforce the user-facing ``limit``, if one was supplied.
        if self._limit is not None:
            self._limit -= 1

            if self._limit < 0:
                raise StopIteration()

        return self._results[self._offset]

    # Python 2 iterator protocol compatibility.
    next = __next__

    def to_call(self, the_callable, *args, **kwargs):
        """
        Sets up the callable & any arguments to run it with.

        This is stored for subsequent calls so that those queries can be
        run without requiring user intervention.

        Example::

            # Just an example callable.
            >>> def squares_to(y):
            ...     for x in range(1, y):
            ...         yield x**2
            >>> rs = ResultSet()
            # Set up what to call & arguments.
            >>> rs.to_call(squares_to, y=3)

        """
        if not callable(the_callable):
            raise ValueError(
                'You must supply an object or function to be called.'
            )

        # ``limit`` here caps the total number of results handed back to
        # the user. It is distinct from the low-level DDB ``limit`` (which
        # only caps page size), so pop it out of the pass-through kwargs.
        user_limit = kwargs.pop('limit', None)

        # A negative limit makes no sense; treat it as "no limit".
        if user_limit is not None and user_limit < 0:
            user_limit = None

        self._limit = user_limit
        self.the_callable = the_callable
        self.call_args = args
        self.call_kwargs = kwargs

    def fetch_more(self):
        """
        When the iterator runs out of results, this method is run to re-execute
        the callable (& arguments) to fetch the next page.

        Largely internal.
        """
        self._reset()

        args = self.call_args[:]
        kwargs = self.call_kwargs.copy()

        if self._last_key_seen is not None:
            # Resume from where the previous page left off.
            kwargs[self.first_key] = self._last_key_seen

        # Never ask the service for more rows per page than the user wants
        # back in total.
        if self._limit and self._max_page_size and self._max_page_size > self._limit:
            self._max_page_size = self._limit

        # Pick the page size: an explicit max page size wins; otherwise the
        # remaining user limit (if any) doubles as the page size.
        page_size = self._max_page_size
        if page_size is None:
            page_size = self._limit

        if page_size is not None:
            kwargs['limit'] = page_size

        response = self.the_callable(*args, **kwargs)
        self._fetches += 1
        page = response.get('results', [])
        self._last_key_seen = response.get('last_key', None)

        if page:
            self._results.extend(page)

            # If this page already satisfied what's left of the user limit,
            # there's no point fetching any further.
            if self._limit is not None and self._limit >= 0:
                if len(page) >= self._limit:
                    self._results_left = False

        # No continuation key means the service is out of data.
        if self._last_key_seen is None:
            self._results_left = False
+
+
class BatchGetResultSet(ResultSet):
    """
    A ``ResultSet`` variant for batch-get calls: pages by slicing chunks off
    the requested key list rather than by following a continuation key.
    """
    def __init__(self, *args, **kwargs):
        # Keys not yet requested from the service & the maximum number of
        # keys a single batch-get call may carry.
        self._keys_left = kwargs.pop('keys', [])
        self._max_batch_get = kwargs.pop('max_batch_get', 100)
        super(BatchGetResultSet, self).__init__(*args, **kwargs)

    def fetch_more(self):
        self._reset()

        args = self.call_args[:]
        kwargs = self.call_kwargs.copy()

        # Carve the next batch off the front of the pending keys.
        batch = self._keys_left[:self._max_batch_get]
        self._keys_left = self._keys_left[self._max_batch_get:]
        kwargs['keys'] = batch

        if not self._keys_left:
            self._results_left = False

        response = self.the_callable(*args, **kwargs)

        new_results = response.get('results', [])

        if not new_results:
            return

        self._results.extend(new_results)

        # Re-queue any keys the service reported as unprocessed, keeping
        # their relative order at the front of the pending list.
        # DynamoDB only returns valid keys, so there should be no risk of
        # missing keys ever making it here.
        for offset, key_data in enumerate(response.get('unprocessed_keys', [])):
            self._keys_left.insert(offset, key_data)

        if self._keys_left:
            self._results_left = True

        # Shrink the low-level limit by how many rows we've now received.
        if self.call_kwargs.get('limit'):
            self.call_kwargs['limit'] -= len(new_results)

Powered by Google App Engine
This is Rietveld 408576698