Index: appengine/monorail/services/cachemanager_svc.py
diff --git a/appengine/monorail/services/cachemanager_svc.py b/appengine/monorail/services/cachemanager_svc.py
new file mode 100644
index 0000000000000000000000000000000000000000..9c6ddb8d37dbab81eb9d9262f9437a11be1a4dcb
--- /dev/null
+++ b/appengine/monorail/services/cachemanager_svc.py
@@ -0,0 +1,186 @@
+# Copyright 2016 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style
+# license that can be found in the LICENSE file or at
+# https://developers.google.com/open-source/licenses/bsd
+
+"""A simple in-RAM cache with distributed invalidation.
+
+Here's how it works:
+ + Each frontend or backend job has one CacheManager which
+   owns a set of RamCache objects, which are basically dictionaries.
+ + Each job can put objects in its own local cache, and retrieve them.
+ + When an item is modified, the item at the corresponding cache key
+   is invalidated, which means two things: (a) it is dropped from the
+   local RAM cache, and (b) the key is written to the Invalidate table.
+ + On each incoming request, the job checks the Invalidate table for
+   any entries added since the last time that it checked. If it finds
+   any, it drops all RamCache entries for the corresponding key.
+ + There is also a cron task that truncates old Invalidate entries
+   when the table is too large. If a frontend job sees more than the
+   max Invalidate rows, it will drop everything from all caches,
+   because it does not know what it missed due to truncation.
+ + The special key 0 means to drop all cache entries.
+
+This approach ensures that jobs use cached values that are not stale at
+the time that processing of each request begins. There is no guarantee,
+however, that an item will not be modified by some other job, so a cached
+entry could become stale during the lifetime of that same request.
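+
+For illustration, typical usage looks roughly like this (CacheItem and
+GetItem are assumed to be the accessor methods offered by the caches
+module, and 'issue' is assumed to be in caches.INVALIDATE_KIND_VALUES;
+neither is defined in this file):
+
+  cache_manager = CacheManager()
+  issue_cache = cache_manager.MakeCache('issue', max_size=1000)
+  issue_cache.CacheItem(issue_id, issue)  # store in this job's RAM cache
+  issue = issue_cache.GetItem(issue_id)   # read back from the RAM cache
+  # Tell all other jobs to drop their cached copies of this issue.
+  cache_manager.StoreInvalidateRows(cnxn, 'issue', [issue_id])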
+
+TODO(jrobbins): Listener hook so that client code can register its own
+handler for invalidation events. E.g., the sorting code has a cache that
+is correctly invalidated on each issue change, but needs to be completely
+dropped when a config is modified.
+
+TODO(jrobbins): If this part of the system becomes a bottleneck, consider
+some optimizations: (a) splitting the table into multiple tables by
+kind, or (b) sharding the table by cache_key. Or, maybe leverage memcache
+to avoid even hitting the DB in the frequent case where nothing has changed.
+"""
+
+import collections
+import logging
+
+from framework import jsonfeed
+from framework import sql
+from services import caches
+
+
+INVALIDATE_TABLE_NAME = 'Invalidate'
+INVALIDATE_COLS = ['timestep', 'kind', 'cache_key']
+INVALIDATE_ALL_KEYS = 0
+MAX_INVALIDATE_ROWS_TO_CONSIDER = 1000
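+# MAX_INVALIDATE_ROWS_TO_CONSIDER is shared by DoDistributedInvalidation
+# and the RamCacheConsolidate cron task below: if a job has missed more
+# rows than the cron task keeps, it sees exactly this many rows and drops
+# all of its caches.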
+
+
+class CacheManager(object):
+  """Service class to manage RAM caches and shared Invalidate table."""
+
+  def __init__(self):
+    self.cache_registry = collections.defaultdict(list)
+    self.processed_invalidations_up_to = 0
+    self.invalidate_tbl = sql.SQLTableManager(INVALIDATE_TABLE_NAME)
+
+  def MakeCache(self, kind, max_size=None, use_value_centric_cache=False):
+    """Make a new cache and register it for future invalidations."""
+    if use_value_centric_cache:
+      cache = caches.ValueCentricRamCache(self, kind, max_size=max_size)
+    else:
+      cache = caches.RamCache(self, kind, max_size=max_size)
+    self.cache_registry[kind].append(cache)
+    return cache
+
+  def _InvalidateAllCaches(self):
+    """Invalidate all cache entries."""
+    for cache_list in self.cache_registry.values():
+      for cache in cache_list:
+        cache.LocalInvalidateAll()
+
+  def _ProcessInvalidationRows(self, rows):
+    """Invalidate cache entries indicated by database rows."""
+    for timestep, kind, key in rows:
+      self.processed_invalidations_up_to = max(
+          self.processed_invalidations_up_to, timestep)
+      for cache in self.cache_registry[kind]:
+        if key == INVALIDATE_ALL_KEYS:
+          cache.LocalInvalidateAll()
+        else:
+          cache.LocalInvalidate(key)
+
+  def DoDistributedInvalidation(self, cnxn):
+    """Drop any cache entries that were invalidated by other jobs."""
+    # Only consider a reasonable number of rows so that we can never
+    # get bogged down on this step. If there are too many rows to
+    # process, just invalidate all caches, and process the last group
+    # of rows to update processed_invalidations_up_to.
+    rows = self.invalidate_tbl.Select(
+        cnxn, cols=INVALIDATE_COLS,
+        where=[('timestep > %s', [self.processed_invalidations_up_to])],
+        order_by=[('timestep DESC', [])],
+        limit=MAX_INVALIDATE_ROWS_TO_CONSIDER)
+
+    if len(rows) == MAX_INVALIDATE_ROWS_TO_CONSIDER:
+      logging.info('Invalidating all caches: there are too many invalidations')
+      self._InvalidateAllCaches()
+
+    logging.info('Saw %d invalidation rows', len(rows))
+    self._ProcessInvalidationRows(rows)
+
+  def StoreInvalidateRows(self, cnxn, kind, keys):
+    """Store rows to let all jobs know to invalidate the given keys."""
+    assert kind in caches.INVALIDATE_KIND_VALUES
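+    # The timestep value is not supplied here; it is presumably assigned
+    # by the database (e.g., an AUTO_INCREMENT column), which gives all
+    # jobs a consistent ordering of invalidation events.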
+    self.invalidate_tbl.InsertRows(
+        cnxn, ['kind', 'cache_key'], [(kind, key) for key in keys])
+
+  def StoreInvalidateAll(self, cnxn, kind):
+    """Store a value to tell all jobs to invalidate all items of this kind."""
+    last_timestep = self.invalidate_tbl.InsertRow(
+        cnxn, kind=kind, cache_key=INVALIDATE_ALL_KEYS)
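+    # Older rows for this kind are now redundant: any job that processes
+    # the invalidate-all row above will drop every entry of this kind.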
+    self.invalidate_tbl.Delete(
+        cnxn, kind=kind, where=[('timestep < %s', [last_timestep])])
+
+
+class RamCacheConsolidate(jsonfeed.InternalTask):
+  """Drop old Invalidate rows when there are too many of them."""
+
+  def HandleRequest(self, mr):
+    """Drop excessive rows in the Invalidate table and return some stats.
+
+    Args:
+      mr: common information parsed from the HTTP request.
+
+    Returns:
+      Results dictionary in JSON format. The stats are just for debugging;
+      they are not used by any other part of the system.
+ """ |
+ tbl = self.services.cache_manager.invalidate_tbl |
+ old_count = tbl.SelectValue(mr.cnxn, 'COUNT(*)') |
+ |
+    # Delete anything other than the newest MAX_INVALIDATE_ROWS_TO_CONSIDER
+    # rows, because no job will look at more than that anyway. If a job
+    # gets a request and sees that many new rows, it drops all caches of
+    # all types, so it is as if there were an INVALIDATE_ALL_KEYS entry.
+    if old_count > MAX_INVALIDATE_ROWS_TO_CONSIDER:
+      kept_timesteps = tbl.Select(
+          mr.cnxn, cols=['timestep'],
+          order_by=[('timestep DESC', [])],
+          limit=MAX_INVALIDATE_ROWS_TO_CONSIDER)
+      earliest_kept = kept_timesteps[-1][0]
+      tbl.Delete(mr.cnxn, where=[('timestep < %s', [earliest_kept])])
+
+    new_count = tbl.SelectValue(mr.cnxn, 'COUNT(*)')
+
+    return {
+      'old_count': old_count,
+      'new_count': new_count,
+      }
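+
+
+# This handler is meant to be triggered periodically by cron. A minimal
+# sketch of the corresponding cron.yaml entry (the URL and schedule here
+# are illustrative assumptions, not taken from this change):
+#
+#   cron:
+#   - description: truncate the Invalidate table
+#     url: /_task/ramCacheConsolidate
+#     schedule: every 6 hours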