Unified Diff: appengine/monorail/services/

Issue 1868553004: Open Source Monorail (Closed) Base URL:
Patch Set: Rebase Created 4 years, 8 months ago
« no previous file with comments | « appengine/monorail/services/ ('k') | appengine/monorail/services/ » ('j') | no next file with comments »
Index: appengine/monorail/services/
diff --git a/appengine/monorail/services/ b/appengine/monorail/services/
new file mode 100644
index 0000000000000000000000000000000000000000..9c6ddb8d37dbab81eb9d9262f9437a11be1a4dcb
--- /dev/null
+++ b/appengine/monorail/services/
@@ -0,0 +1,154 @@
+# Copyright 2016 The Chromium Authors. All rights reserved.
+# Use of this source code is govered by a BSD-style
+# license that can be found in the LICENSE file or at
+"""A simple in-RAM cache with distributed invalidation.
+Here's how it works:
+ + Each frontend or backend job has one CacheManager which
+ owns a set of RamCache objects, which are basically dictionaries.
+ + Each job can put objects in its own local cache, and retrieve them.
+ + When an item is modified, the item at the corresponding cache key
+ is invalidated, which means two things: (a) it is dropped from the
+ local RAM cache, and (b) the key is written to the Invalidate table.
+ + On each incoming request, the job checks the Invalidate table for
+ any entries added since the last time that it checked. If it finds
+ any, it drops all RamCache entries for the corresponding key.
+ + There is also a cron task that truncates old Invalidate entries
+ when the table is too large. If a frontend job sees more than the
+ max Invalidate rows, it will drop everything from all caches,
+ because it does not know what it missed due to truncation.
+ + The special key 0 means to drop all cache entries.
+This approach makes jobs use cached values that are not stale at the
+time that processing of each request begins. There is no guarantee that
+an item will not be modified by some other job and that the cached entry
+could become stale during the lifetime of that same request.
+TODO(jrobbins): Listener hook so that client code can register its own
+handler for invalidation events. E.g., the sorting code has a cache that
+is correctly invalidated on each issue change, but needs to be completely
+dropped when a config is modified.
+TODO(jrobbins): If this part of the system becomes a bottleneck, consider
+some optimizations: (a) splitting the table into multiple tables by
+kind, or (b) sharding the table by cache_key. Or, maybe leverage memcache
+to avoid even hitting the DB in the frequent case where nothing has changed.
+import collections
+import logging
+from framework import jsonfeed
+from framework import sql
+from services import caches
+INVALIDATE_COLS = ['timestep', 'kind', 'cache_key']
+class CacheManager(object):
+ """Service class to manage RAM caches and shared Invalidate table."""
+ def __init__(self):
+ self.cache_registry = collections.defaultdict(list)
+ self.processed_invalidations_up_to = 0
+ self.invalidate_tbl = sql.SQLTableManager(INVALIDATE_TABLE_NAME)
+ def MakeCache(self, kind, max_size=None, use_value_centric_cache=False):
+ """Make a new cache and register it for future invalidations."""
+ if use_value_centric_cache:
+ cache = caches.ValueCentricRamCache(self, kind, max_size=max_size)
+ else:
+ cache = caches.RamCache(self, kind, max_size=max_size)
+ self.cache_registry[kind].append(cache)
+ return cache
+ def _InvalidateAllCaches(self):
+ """Invalidate all cache entries."""
+ for cache_list in self.cache_registry.values():
+ for cache in cache_list:
+ cache.LocalInvalidateAll()
+ def _ProcessInvalidationRows(self, rows):
+ """Invalidate cache entries indicated by database rows."""
+ for timestep, kind, key in rows:
+ self.processed_invalidations_up_to = max(
+ self.processed_invalidations_up_to, timestep)
+ for cache in self.cache_registry[kind]:
+ cache.LocalInvalidateAll()
+ else:
+ cache.LocalInvalidate(key)
+ def DoDistributedInvalidation(self, cnxn):
+ """Drop any cache entries that were invalidated by other jobs."""
+ # Only consider a reasonable number of rows so that we can never
+ # get bogged down on this step. If there are too many rows to
+ # process, just invalidate all caches, and process the last group
+ # of rows to update processed_invalidations_up_to.
+ rows = self.invalidate_tbl.Select(
+ cnxn, cols=INVALIDATE_COLS,
+ where=[('timestep > %s', [self.processed_invalidations_up_to])],
+ order_by=[('timestep DESC', [])],
+'Invaliditing all caches: there are too many invalidations')
+ self._InvalidateAllCaches()
+'Saw %d invalidation rows', len(rows))
+ self._ProcessInvalidationRows(rows)
+ def StoreInvalidateRows(self, cnxn, kind, keys):
+ """Store rows to let all jobs know to invalidate the given keys."""
+ assert kind in caches.INVALIDATE_KIND_VALUES
+ self.invalidate_tbl.InsertRows(
+ cnxn, ['kind', 'cache_key'], [(kind, key) for key in keys])
+ def StoreInvalidateAll(self, cnxn, kind):
+ """Store a value to tell all jobs to invalidate all items of this kind."""
+ last_timestep = self.invalidate_tbl.InsertRow(
+ cnxn, kind=kind, cache_key=INVALIDATE_ALL_KEYS)
+ self.invalidate_tbl.Delete(
+ cnxn, kind=kind, where=[('timestep < %s', [last_timestep])])
+class RamCacheConsolidate(jsonfeed.InternalTask):
+ """Drop old Invalidate rows when there are too many of them."""
+ def HandleRequest(self, mr):
+ """Drop excessive rows in the Invalidate table and return some stats.
+ Args:
+ mr: common information parsed from the HTTP request.
+ Returns:
+ Results dictionary in JSON format. The stats are just for debugging,
+ they are not used by any other part of the system.
+ """
+ tbl =
+ old_count = tbl.SelectValue(mr.cnxn, 'COUNT(*)')
+ # Delete anything other than the last 1000 rows because we won't
+ # look at them anyway. If a job gets a request and sees 1000 new
+ # rows, it will drop all caches of all types, so it is as if there
+ # were
+ kept_timesteps = tbl.Select(
+ mr.cnxn, ['timestep'],
+ order_by=[('timestep DESC', [])],
+ earliest_kept = kept_timesteps[-1][0]
+ tbl.Delete(mr.cnxn, where=[('timestep < %s', [earliest_kept])])
+ new_count = tbl.SelectValue(mr.cnxn, 'COUNT(*)')
+ return {
+ 'old_count': old_count,
+ 'new_count': new_count,
+ }
