Index: appengine/monorail/services/caches.py
diff --git a/appengine/monorail/services/caches.py b/appengine/monorail/services/caches.py
new file mode 100644
index 0000000000000000000000000000000000000000..ccb82c87b00f55833d5075cf3bfd80fcf7645be1
--- /dev/null
+++ b/appengine/monorail/services/caches.py
@@ -0,0 +1,342 @@
+# Copyright 2016 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style
+# license that can be found in the LICENSE file or at
+# https://developers.google.com/open-source/licenses/bsd
+
+"""Classes to manage cached values.
+
+Monorail makes full use of the RAM of GAE frontends to reduce latency
+and load on the database.
+"""
+
+import logging
+
+from protorpc import protobuf
+
+from google.appengine.api import memcache
+
+from framework import framework_constants
+
+
+INVALIDATE_KIND_VALUES = ['user', 'project', 'issue', 'issue_id']
+DEFAULT_MAX_SIZE = 10000
+
+
+class RamCache(object):
+  """An in-RAM cache with distributed invalidation."""
+
+  def __init__(self, cache_manager, kind, max_size=None):
+    assert kind in INVALIDATE_KIND_VALUES
+    self.cache_manager = cache_manager
+    self.kind = kind
+    self.cache = {}
+    self.max_size = max_size or DEFAULT_MAX_SIZE
+
+  def CacheItem(self, key, item):
+    """Store item at key in this cache, discarding a random item if needed."""
+    if len(self.cache) >= self.max_size:
+      self.cache.popitem()
+
+    self.cache[key] = item
+
+  def CacheAll(self, new_item_dict):
+    """Cache all items in the given dict, dropping old items if needed."""
+    if len(new_item_dict) >= self.max_size:
+      logging.warning('Dumping the entire cache! %s', self.kind)
+      self.cache = {}
+    else:
+      while len(self.cache) + len(new_item_dict) > self.max_size:
+        self.cache.popitem()
+
+    self.cache.update(new_item_dict)
+
+  def GetItem(self, key):
+    """Return the cached item if present, otherwise None."""
+    return self.cache.get(key)
+
+  def HasItem(self, key):
+    """Return True if there is a value cached at the given key."""
+    return key in self.cache
+
+  def GetAll(self, keys):
+    """Look up the given keys.
+
+    Args:
+      keys: a list of cache keys to look up.
+
+    Returns:
+      A pair (hits_dict, misses_list), where hits_dict maps each given key
+      found in the cache to its value, and misses_list is a list of the
+      given keys that were not in the cache.
+    """
+    hits, misses = {}, []
+    for key in keys:
+      try:
+        hits[key] = self.cache[key]
+      except KeyError:
+        misses.append(key)
+
+    return hits, misses
+
+  def LocalInvalidate(self, key):
+    """Drop the given key from this cache, without distributed notification."""
+    # logging.info('Locally invalidating %r in kind=%r', key, self.kind)
+    self.cache.pop(key, None)
+
+  def Invalidate(self, cnxn, key):
+    """Drop key locally, and append it to the Invalidate DB table."""
+    self.InvalidateKeys(cnxn, [key])
+
+  def InvalidateKeys(self, cnxn, keys):
+    """Drop keys locally, and append them to the Invalidate DB table."""
+    for key in keys:
+      self.LocalInvalidate(key)
+    if self.cache_manager:
+      self.cache_manager.StoreInvalidateRows(cnxn, self.kind, keys)
+
+  def LocalInvalidateAll(self):
+    """Invalidate all keys locally: just start over with an empty dict."""
+    logging.info('Locally invalidating all in kind=%r', self.kind)
+    self.cache = {}
+
+  def InvalidateAll(self, cnxn):
+    """Invalidate all keys in this cache."""
+    self.LocalInvalidateAll()
+    if self.cache_manager:
+      self.cache_manager.StoreInvalidateAll(cnxn, self.kind)
+
+
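+# Example usage (hypothetical sketch; mgr is assumed to be a cache manager
+# implementing StoreInvalidateRows(), cnxn a DB connection, user_pb a value):
+#
+#   user_cache = RamCache(mgr, 'user', max_size=1000)
+#   user_cache.CacheItem(111, user_pb)
+#   hits, misses = user_cache.GetAll([111, 222])
+#   # hits == {111: user_pb}, misses == [222], if 222 was never cached.
+#   user_cache.InvalidateKeys(cnxn, [111])
+
+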
+class ValueCentricRamCache(RamCache):
+  """Specialized version of RamCache that stores values in InvalidateTable.
+
+  This is useful for caches that have non-integer keys.
+  """
+
+  def LocalInvalidate(self, value):
+    """Use the specified value to drop entries from the local cache."""
+    keys_to_drop = []
+    # Loop through and collect all keys with the specified value.
+    for k, v in self.cache.iteritems():
+      if v == value:
+        keys_to_drop.append(k)
+    for k in keys_to_drop:
+      self.cache.pop(k, None)
+
+  def InvalidateKeys(self, cnxn, keys):
+    """Drop keys locally, and append their values to the Invalidate DB table."""
+    # Find values to invalidate.
+    values = [self.cache[key] for key in keys if key in self.cache]
+    if len(values) == len(keys):
+      for value in values:
+        self.LocalInvalidate(value)
+      if self.cache_manager:
+        self.cache_manager.StoreInvalidateRows(cnxn, self.kind, values)
+    else:
+      # If a value is not found in the cache, then invalidate the whole
+      # cache. This is done to ensure that we are not in an inconsistent
+      # state or in a race condition.
+      self.InvalidateAll(cnxn)
+
+
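+# Example usage (hypothetical sketch): a cache keyed by email address where
+# each value is an integer user ID, so the rows appended to the Invalidate
+# DB table record the integer values rather than the string keys.
+#
+#   email_cache = ValueCentricRamCache(mgr, 'user')
+#   email_cache.CacheItem('alice@example.com', 111)
+#   email_cache.InvalidateKeys(cnxn, ['alice@example.com'])
+#   # The appended Invalidate row records the value 111, not the email key.
+
+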
+class AbstractTwoLevelCache(object):
+  """A class to manage both RAM and memcache to retrieve objects.
+
+  Subclasses must implement the FetchItems() method to get objects from
+  the database when both caches miss; see the example sketch at the end
+  of this module.
+  """
+
+  # When loading a huge number of issues from the database, do it in chunks
+  # so as to avoid timeouts.
+  _FETCH_BATCH_SIZE = 10000
+
+  def __init__(
+      self, cache_manager, kind, memcache_prefix, pb_class, max_size=None,
+      use_value_centric_cache=False):
+    self.cache = cache_manager.MakeCache(
+        kind, max_size=max_size,
+        use_value_centric_cache=use_value_centric_cache)
+    self.memcache_prefix = memcache_prefix
+    self.pb_class = pb_class
+
+  def CacheItem(self, key, value):
+    """Add the given key-value pair to RAM and memcache."""
+    self.cache.CacheItem(key, value)
+    self._WriteToMemcache({key: value})
+
+  def HasItem(self, key):
+    """Return True if the given key is in the RAM cache."""
+    return self.cache.HasItem(key)
+
+  def GetAnyOnHandItem(self, keys, start=None, end=None):
+    """Try to find one of the specified items in RAM."""
+    if start is None:
+      start = 0
+    if end is None:
+      end = len(keys)
+    for i in xrange(start, end):
+      key = keys[i]
+      if self.cache.HasItem(key):
+        return self.cache.GetItem(key)
+
+    # Note: We could check memcache here too, but the round-trips to memcache
+    # are kind of slow. And, getting too many hits from memcache actually
+    # fills our RAM cache too quickly and could lead to thrashing.
+
+    return None
+
+  def GetAll(self, cnxn, keys, use_cache=True, **kwargs):
+    """Get values for the given keys from RAM, memcache, or the DB.
+
+    Args:
+      cnxn: connection to the database.
+      keys: list of integer keys to look up.
+      use_cache: set to False to always hit the database.
+      **kwargs: any additional keywords are passed to FetchItems().
+
+    Returns:
+      A pair (hits, misses), where hits is {key: value} and misses is
+      a list of any keys that were not found anywhere.
+    """
+    if use_cache:
+      result_dict, missed_keys = self.cache.GetAll(keys)
+    else:
+      result_dict, missed_keys = {}, list(keys)
+
+    if missed_keys and use_cache:
+      memcache_hits, missed_keys = self._ReadFromMemcache(missed_keys)
+      result_dict.update(memcache_hits)
+
+    while missed_keys:
+      missed_batch = missed_keys[:self._FETCH_BATCH_SIZE]
+      missed_keys = missed_keys[self._FETCH_BATCH_SIZE:]
+      retrieved_dict = self.FetchItems(cnxn, missed_batch, **kwargs)
+      result_dict.update(retrieved_dict)
+      if use_cache:
+        self.cache.CacheAll(retrieved_dict)
+        self._WriteToMemcache(retrieved_dict)
+
+    still_missing_keys = [key for key in keys if key not in result_dict]
+    return result_dict, still_missing_keys
+
+  def _ReadFromMemcache(self, keys):
+    """Read the given keys from memcache, return {key: value}, missing_keys."""
+    memcache_hits = {}
+    cached_dict = memcache.get_multi(
+        [self._KeyToStr(key) for key in keys], key_prefix=self.memcache_prefix)
+
+    for key_str, serialized_value in cached_dict.iteritems():
+      value = self._StrToValue(serialized_value)
+      key = self._StrToKey(key_str)
+      memcache_hits[key] = value
+      self.cache.CacheItem(key, value)
+
+    still_missing_keys = [key for key in keys if key not in memcache_hits]
+    logging.info(
+        'decoded %d values from memcache %s, missing %d',
+        len(memcache_hits), self.memcache_prefix, len(still_missing_keys))
+    return memcache_hits, still_missing_keys
+
+  def _WriteToMemcache(self, retrieved_dict):
+    """Write entries for each key-value pair to memcache. Encode PBs."""
+    strs_to_cache = {
+        self._KeyToStr(key): self._ValueToStr(value)
+        for key, value in retrieved_dict.iteritems()}
+    memcache.set_multi(
+        strs_to_cache, key_prefix=self.memcache_prefix,
+        time=framework_constants.MEMCACHE_EXPIRATION)
+    logging.info('cached batch of %d values in memcache %s',
+                 len(retrieved_dict), self.memcache_prefix)
+
+  def _KeyToStr(self, key):
+    """Convert our int IDs to strings for use as memcache keys."""
+    return str(key)
+
+  def _StrToKey(self, key_str):
+    """Convert memcache keys back to the ints that we use as IDs."""
+    return int(key_str)
+
+  def _ValueToStr(self, value):
+    """Serialize an application object so that it can be stored in memcache."""
+    if not self.pb_class:
+      return value
+    elif self.pb_class == int:
+      return str(value)
+    else:
+      return protobuf.encode_message(value)
+
+  def _StrToValue(self, serialized_value):
+    """Deserialize an application object that was stored in memcache."""
+    if not self.pb_class:
+      return serialized_value
+    elif self.pb_class == int:
+      return int(serialized_value)
+    else:
+      return protobuf.decode_message(self.pb_class, serialized_value)
+
+  def InvalidateKeys(self, cnxn, keys):
+    """Drop the given keys from both RAM and memcache."""
+    self.cache.InvalidateKeys(cnxn, keys)
+    memcache.delete_multi(
+        [self._KeyToStr(key) for key in keys], key_prefix=self.memcache_prefix)
+
+  def InvalidateAllKeys(self, cnxn, keys):
+    """Drop the given keys from memcache and invalidate all keys in RAM.
+
+    Useful for avoiding inserting many rows into the Invalidate table when
+    invalidating a large group of keys all at once. Only use when necessary.
+    """
+    self.cache.InvalidateAll(cnxn)
+    memcache.delete_multi(
+        [self._KeyToStr(key) for key in keys], key_prefix=self.memcache_prefix)
+
+  def GetAllAlreadyInRam(self, keys):
+    """Look only in RAM to return {key: value}, missed_keys."""
+    result_dict, missed_keys = self.cache.GetAll(keys)
+    return result_dict, missed_keys
+
+  def InvalidateAllRamEntries(self, cnxn):
+    """Drop all RAM cache entries. It will refill as needed from memcache."""
+    self.cache.InvalidateAll(cnxn)
+
+  def FetchItems(self, cnxn, keys, **kwargs):
+    """On RAM and memcache miss, hit the database."""
+    raise NotImplementedError()
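+
+
+# Example subclass (hypothetical sketch): a two-level cache of user PBs.
+# user_pb2.User and user_service.LookupUsersByIDs() are illustrative names
+# for a PB class and a DB lookup that returns {user_id: user_pb}.
+#
+#   class UserTwoLevelCache(AbstractTwoLevelCache):
+#
+#     def __init__(self, cache_manager, user_service):
+#       super(UserTwoLevelCache, self).__init__(
+#           cache_manager, 'user', 'user:', user_pb2.User)
+#       self.user_service = user_service
+#
+#     def FetchItems(self, cnxn, keys):
+#       return self.user_service.LookupUsersByIDs(cnxn, keys)
+#
+#   user_2lc = UserTwoLevelCache(mgr, user_service)
+#   users_dict, missed = user_2lc.GetAll(cnxn, [111, 222])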