Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2898)

Unified Diff: appengine/monorail/services/cachemanager_svc.py

Issue 1868553004: Open Source Monorail (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Rebase Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « appengine/monorail/services/api_svc_v1.py ('k') | appengine/monorail/services/caches.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: appengine/monorail/services/cachemanager_svc.py
diff --git a/appengine/monorail/services/cachemanager_svc.py b/appengine/monorail/services/cachemanager_svc.py
new file mode 100644
index 0000000000000000000000000000000000000000..9c6ddb8d37dbab81eb9d9262f9437a11be1a4dcb
--- /dev/null
+++ b/appengine/monorail/services/cachemanager_svc.py
@@ -0,0 +1,154 @@
+# Copyright 2016 The Chromium Authors. All rights reserved.
+# Use of this source code is govered by a BSD-style
+# license that can be found in the LICENSE file or at
+# https://developers.google.com/open-source/licenses/bsd
+
+"""A simple in-RAM cache with distributed invalidation.
+
+Here's how it works:
+ + Each frontend or backend job has one CacheManager which
+ owns a set of RamCache objects, which are basically dictionaries.
+ + Each job can put objects in its own local cache, and retrieve them.
+ + When an item is modified, the item at the corresponding cache key
+ is invalidated, which means two things: (a) it is dropped from the
+ local RAM cache, and (b) the key is written to the Invalidate table.
+ + On each incoming request, the job checks the Invalidate table for
+ any entries added since the last time that it checked. If it finds
+ any, it drops all RamCache entries for the corresponding key.
+ + There is also a cron task that truncates old Invalidate entries
+ when the table is too large. If a frontend job sees more than the
+ max Invalidate rows, it will drop everything from all caches,
+ because it does not know what it missed due to truncation.
+ + The special key 0 means to drop all cache entries.
+
+This approach makes jobs use cached values that are not stale at the
+time that processing of each request begins. There is no guarantee that
+an item will not be modified by some other job and that the cached entry
+could become stale during the lifetime of that same request.
+
+TODO(jrobbins): Listener hook so that client code can register its own
+handler for invalidation events. E.g., the sorting code has a cache that
+is correctly invalidated on each issue change, but needs to be completely
+dropped when a config is modified.
+
+TODO(jrobbins): If this part of the system becomes a bottleneck, consider
+some optimizations: (a) splitting the table into multiple tables by
+kind, or (b) sharding the table by cache_key. Or, maybe leverage memcache
+to avoid even hitting the DB in the frequent case where nothing has changed.
+"""
+
+import collections
+import logging
+
+from framework import jsonfeed
+from framework import sql
+from services import caches
+
+
+INVALIDATE_TABLE_NAME = 'Invalidate'
+INVALIDATE_COLS = ['timestep', 'kind', 'cache_key']
+INVALIDATE_ALL_KEYS = 0
+MAX_INVALIDATE_ROWS_TO_CONSIDER = 1000
+
+
+class CacheManager(object):
+ """Service class to manage RAM caches and shared Invalidate table."""
+
+ def __init__(self):
+ self.cache_registry = collections.defaultdict(list)
+ self.processed_invalidations_up_to = 0
+ self.invalidate_tbl = sql.SQLTableManager(INVALIDATE_TABLE_NAME)
+
+ def MakeCache(self, kind, max_size=None, use_value_centric_cache=False):
+ """Make a new cache and register it for future invalidations."""
+ if use_value_centric_cache:
+ cache = caches.ValueCentricRamCache(self, kind, max_size=max_size)
+ else:
+ cache = caches.RamCache(self, kind, max_size=max_size)
+ self.cache_registry[kind].append(cache)
+ return cache
+
+ def _InvalidateAllCaches(self):
+ """Invalidate all cache entries."""
+ for cache_list in self.cache_registry.values():
+ for cache in cache_list:
+ cache.LocalInvalidateAll()
+
+ def _ProcessInvalidationRows(self, rows):
+ """Invalidate cache entries indicated by database rows."""
+ for timestep, kind, key in rows:
+ self.processed_invalidations_up_to = max(
+ self.processed_invalidations_up_to, timestep)
+ for cache in self.cache_registry[kind]:
+ if key == INVALIDATE_ALL_KEYS:
+ cache.LocalInvalidateAll()
+ else:
+ cache.LocalInvalidate(key)
+
+ def DoDistributedInvalidation(self, cnxn):
+ """Drop any cache entries that were invalidated by other jobs."""
+ # Only consider a reasonable number of rows so that we can never
+ # get bogged down on this step. If there are too many rows to
+ # process, just invalidate all caches, and process the last group
+ # of rows to update processed_invalidations_up_to.
+ rows = self.invalidate_tbl.Select(
+ cnxn, cols=INVALIDATE_COLS,
+ where=[('timestep > %s', [self.processed_invalidations_up_to])],
+ order_by=[('timestep DESC', [])],
+ limit=MAX_INVALIDATE_ROWS_TO_CONSIDER)
+
+ if len(rows) == MAX_INVALIDATE_ROWS_TO_CONSIDER:
+ logging.info('Invaliditing all caches: there are too many invalidations')
+ self._InvalidateAllCaches()
+
+ logging.info('Saw %d invalidation rows', len(rows))
+ self._ProcessInvalidationRows(rows)
+
+ def StoreInvalidateRows(self, cnxn, kind, keys):
+ """Store rows to let all jobs know to invalidate the given keys."""
+ assert kind in caches.INVALIDATE_KIND_VALUES
+ self.invalidate_tbl.InsertRows(
+ cnxn, ['kind', 'cache_key'], [(kind, key) for key in keys])
+
+ def StoreInvalidateAll(self, cnxn, kind):
+ """Store a value to tell all jobs to invalidate all items of this kind."""
+ last_timestep = self.invalidate_tbl.InsertRow(
+ cnxn, kind=kind, cache_key=INVALIDATE_ALL_KEYS)
+ self.invalidate_tbl.Delete(
+ cnxn, kind=kind, where=[('timestep < %s', [last_timestep])])
+
+
+class RamCacheConsolidate(jsonfeed.InternalTask):
+ """Drop old Invalidate rows when there are too many of them."""
+
+ def HandleRequest(self, mr):
+ """Drop excessive rows in the Invalidate table and return some stats.
+
+ Args:
+ mr: common information parsed from the HTTP request.
+
+ Returns:
+ Results dictionary in JSON format. The stats are just for debugging,
+ they are not used by any other part of the system.
+ """
+ tbl = self.services.cache_manager.invalidate_tbl
+ old_count = tbl.SelectValue(mr.cnxn, 'COUNT(*)')
+
+ # Delete anything other than the last 1000 rows because we won't
+ # look at them anyway. If a job gets a request and sees 1000 new
+ # rows, it will drop all caches of all types, so it is as if there
+ # were
+ if old_count > MAX_INVALIDATE_ROWS_TO_CONSIDER:
+ kept_timesteps = tbl.Select(
+ mr.cnxn, ['timestep'],
+ order_by=[('timestep DESC', [])],
+ limit=MAX_INVALIDATE_ROWS_TO_CONSIDER)
+ earliest_kept = kept_timesteps[-1][0]
+ tbl.Delete(mr.cnxn, where=[('timestep < %s', [earliest_kept])])
+
+ new_count = tbl.SelectValue(mr.cnxn, 'COUNT(*)')
+
+ return {
+ 'old_count': old_count,
+ 'new_count': new_count,
+ }
« no previous file with comments | « appengine/monorail/services/api_svc_v1.py ('k') | appengine/monorail/services/caches.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698