
Side by Side Diff: appengine/monorail/services/cachemanager_svc.py

Issue 1868553004: Open Source Monorail (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Rebase Created 4 years, 8 months ago
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file or at
# https://developers.google.com/open-source/licenses/bsd

"""A simple in-RAM cache with distributed invalidation.

Here's how it works:
 + Each frontend or backend job has one CacheManager which
   owns a set of RamCache objects, which are basically dictionaries.
 + Each job can put objects in its own local cache, and retrieve them.
 + When an item is modified, the item at the corresponding cache key
   is invalidated, which means two things: (a) it is dropped from the
   local RAM cache, and (b) the key is written to the Invalidate table.
 + On each incoming request, the job checks the Invalidate table for
   any entries added since the last time that it checked. If it finds
   any, it drops all RamCache entries for the corresponding key.
 + There is also a cron task that truncates old Invalidate entries
   when the table is too large. If a frontend job sees more than the
   max Invalidate rows, it will drop everything from all caches,
   because it does not know what it missed due to truncation.
 + The special key 0 means to drop all cache entries.

This approach ensures that jobs use cached values that are not stale at
the time that processing of each request begins. However, there is no
guarantee that an item will not be modified by some other job, so a
cached entry could still become stale during the lifetime of that same
request.

TODO(jrobbins): Listener hook so that client code can register its own
handler for invalidation events. E.g., the sorting code has a cache that
is correctly invalidated on each issue change, but needs to be completely
dropped when a config is modified.

TODO(jrobbins): If this part of the system becomes a bottleneck, consider
some optimizations: (a) splitting the table into multiple tables by
kind, or (b) sharding the table by cache_key. Or, maybe leverage memcache
to avoid even hitting the DB in the frequent case where nothing has changed.
"""

import collections
import logging

from framework import jsonfeed
from framework import sql
from services import caches


INVALIDATE_TABLE_NAME = 'Invalidate'
INVALIDATE_COLS = ['timestep', 'kind', 'cache_key']
INVALIDATE_ALL_KEYS = 0
MAX_INVALIDATE_ROWS_TO_CONSIDER = 1000


class CacheManager(object):
  """Service class to manage RAM caches and shared Invalidate table."""

  def __init__(self):
    self.cache_registry = collections.defaultdict(list)
    self.processed_invalidations_up_to = 0
    self.invalidate_tbl = sql.SQLTableManager(INVALIDATE_TABLE_NAME)

  def MakeCache(self, kind, max_size=None, use_value_centric_cache=False):
    """Make a new cache and register it for future invalidations."""
    if use_value_centric_cache:
      cache = caches.ValueCentricRamCache(self, kind, max_size=max_size)
    else:
      cache = caches.RamCache(self, kind, max_size=max_size)
    self.cache_registry[kind].append(cache)
    return cache
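
  # Example (illustrative): one plain cache and one value-centric cache.
  # Here 'project' is an assumed kind name and cache_manager an assumed
  # instance of the class above.
  #
  #   project_cache = cache_manager.MakeCache('project', max_size=1000)
  #   by_value_cache = cache_manager.MakeCache(
  #       'project', use_value_centric_cache=True)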

  def _InvalidateAllCaches(self):
    """Invalidate all cache entries."""
    for cache_list in self.cache_registry.values():
      for cache in cache_list:
        cache.LocalInvalidateAll()

  def _ProcessInvalidationRows(self, rows):
    """Invalidate cache entries indicated by database rows."""
    for timestep, kind, key in rows:
      self.processed_invalidations_up_to = max(
          self.processed_invalidations_up_to, timestep)
      for cache in self.cache_registry[kind]:
        if key == INVALIDATE_ALL_KEYS:
          cache.LocalInvalidateAll()
        else:
          cache.LocalInvalidate(key)
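
  # Worked example: given rows [(9, 'issue', 0), (7, 'issue', 3)] and
  # INVALIDATE_ALL_KEYS == 0 ('issue' being an assumed kind), every
  # 'issue' cache is emptied for the key-0 row, key 3 is dropped
  # individually, and processed_invalidations_up_to advances to 9 so
  # the same rows are not re-read on the next request.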

  def DoDistributedInvalidation(self, cnxn):
    """Drop any cache entries that were invalidated by other jobs."""
    # Only consider a reasonable number of rows so that we can never
    # get bogged down on this step. If there are too many rows to
    # process, just invalidate all caches, and process the last group
    # of rows to update processed_invalidations_up_to.
    rows = self.invalidate_tbl.Select(
        cnxn, cols=INVALIDATE_COLS,
        where=[('timestep > %s', [self.processed_invalidations_up_to])],
        order_by=[('timestep DESC', [])],
        limit=MAX_INVALIDATE_ROWS_TO_CONSIDER)

    if len(rows) == MAX_INVALIDATE_ROWS_TO_CONSIDER:
      logging.info('Invalidating all caches: there are too many invalidations')
      self._InvalidateAllCaches()

    logging.info('Saw %d invalidation rows', len(rows))
    self._ProcessInvalidationRows(rows)
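
  # Numeric illustration of the fallback above: with a limit of 1000 and
  # 1500 rows pending, Select returns only the newest 1000; the 500
  # older invalidations are never seen, so the safe response is to drop
  # everything. The fetched rows are still processed so that
  # processed_invalidations_up_to moves past the newest timestep.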

  def StoreInvalidateRows(self, cnxn, kind, keys):
    """Store rows to let all jobs know to invalidate the given keys."""
    assert kind in caches.INVALIDATE_KIND_VALUES
    self.invalidate_tbl.InsertRows(
        cnxn, ['kind', 'cache_key'], [(kind, key) for key in keys])

  def StoreInvalidateAll(self, cnxn, kind):
    """Store a value to tell all jobs to invalidate all items of this kind."""
    last_timestep = self.invalidate_tbl.InsertRow(
        cnxn, kind=kind, cache_key=INVALIDATE_ALL_KEYS)
    self.invalidate_tbl.Delete(
        cnxn, kind=kind, where=[('timestep < %s', [last_timestep])])
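
  # Design note: the Delete above prunes older Invalidate rows of the
  # same kind, since the new invalidate-all row supersedes them; this
  # keeps the table small without changing what any job will invalidate.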


class RamCacheConsolidate(jsonfeed.InternalTask):
  """Drop old Invalidate rows when there are too many of them."""

  def HandleRequest(self, mr):
    """Drop excessive rows in the Invalidate table and return some stats.

    Args:
      mr: common information parsed from the HTTP request.

    Returns:
      Results dictionary in JSON format. The stats are just for debugging;
      they are not used by any other part of the system.
    """
    tbl = self.services.cache_manager.invalidate_tbl
    old_count = tbl.SelectValue(mr.cnxn, 'COUNT(*)')

    # Delete anything other than the last 1000 rows because we won't
    # look at them anyway. If a job gets a request and sees 1000 new
    # rows, it will drop all caches of all types, so it is as if there
    # were an invalidation of all keys.
    if old_count > MAX_INVALIDATE_ROWS_TO_CONSIDER:
      kept_timesteps = tbl.Select(
          mr.cnxn, ['timestep'],
          order_by=[('timestep DESC', [])],
          limit=MAX_INVALIDATE_ROWS_TO_CONSIDER)
      earliest_kept = kept_timesteps[-1][0]
      tbl.Delete(mr.cnxn, where=[('timestep < %s', [earliest_kept])])

    new_count = tbl.SelectValue(mr.cnxn, 'COUNT(*)')

    return {
        'old_count': old_count,
        'new_count': new_count,
    }
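
# Illustration (values assumed): if the table held 1500 rows with
# distinct timesteps, the task would delete all but the newest 1000 and
# return {'old_count': 1500, 'new_count': 1000}.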