Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(135)

Side by Side Diff: appengine/monorail/services/caches.py

Issue 1868553004: Open Source Monorail (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Rebase Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2016 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style
3 # license that can be found in the LICENSE file or at
4 # https://developers.google.com/open-source/licenses/bsd
5
6 """Classes to manage cached values.
7
8 Monorail makes full use of the RAM of GAE frontends to reduce latency
9 and load on the database.
10 """
11
12 import logging
13
14 from protorpc import protobuf
15
16 from google.appengine.api import memcache
17
18 from framework import framework_constants
19
20
21 INVALIDATE_KIND_VALUES = ['user', 'project', 'issue', 'issue_id']
22 DEFAULT_MAX_SIZE = 10000
23
24
class RamCache(object):
  """An in-RAM cache with distributed invalidation.

  Items live in a plain dict.  When an item is invalidated, the key is
  also recorded (via cache_manager) in the Invalidate DB table so that
  other frontend instances drop their stale copies.

  Attributes:
    cache_manager: object used to write invalidation rows to the DB, or
        None to skip distributed invalidation entirely.
    kind: string kind of item cached; must be in INVALIDATE_KIND_VALUES.
    cache: dict {key: item} of currently cached items.
    max_size: int maximum number of items held in RAM.
  """

  def __init__(self, cache_manager, kind, max_size=None):
    """Initialize an empty cache of the given kind.

    Args:
      cache_manager: object with StoreInvalidateRows()/StoreInvalidateAll()
          methods, or None.
      kind: string kind name used in distributed invalidation rows.
      max_size: optional int size limit; defaults to DEFAULT_MAX_SIZE.
    """
    assert kind in INVALIDATE_KIND_VALUES
    self.cache_manager = cache_manager
    self.kind = kind
    self.cache = {}
    self.max_size = max_size or DEFAULT_MAX_SIZE

  def CacheItem(self, key, item):
    """Store item at key in this cache, discarding a random item if needed."""
    if len(self.cache) >= self.max_size:
      # popitem() removes an arbitrary entry, which is an acceptable
      # (cheap) eviction policy for this cache.
      self.cache.popitem()

    self.cache[key] = item

  def CacheAll(self, new_item_dict):
    """Cache all items in the given dict, dropping old items if needed."""
    if len(new_item_dict) >= self.max_size:
      # The new items alone would fill (or overflow) the cache, so evicting
      # old entries one at a time is pointless: start over from empty.
      # Note: logging.warn is a deprecated alias, use logging.warning.
      logging.warning('Dumping the entire cache! %s', self.kind)
      self.cache = {}
    else:
      while len(self.cache) + len(new_item_dict) > self.max_size:
        self.cache.popitem()

    self.cache.update(new_item_dict)

  def GetItem(self, key):
    """Return the cached item if present, otherwise None."""
    return self.cache.get(key)

  def HasItem(self, key):
    """Return True if there is a value cached at the given key."""
    return key in self.cache

  def GetAll(self, keys):
    """Look up the given keys.

    Args:
      keys: a list of cache keys to look up.

    Returns:
      A pair: (hits_dict, misses_list) where hits_dict is a dictionary of
      all the given keys and the values that were found in the cache, and
      misses_list is a list of given keys that were not in the cache.
    """
    hits, misses = {}, []
    for key in keys:
      try:
        hits[key] = self.cache[key]
      except KeyError:
        misses.append(key)

    return hits, misses

  def LocalInvalidate(self, key):
    """Drop the given key from this cache, without distributed notification."""
    # logging.info('Locally invalidating %r in kind=%r', key, self.kind)
    self.cache.pop(key, None)

  def Invalidate(self, cnxn, key):
    """Drop key locally, and append it to the Invalidate DB table."""
    self.InvalidateKeys(cnxn, [key])

  def InvalidateKeys(self, cnxn, keys):
    """Drop keys locally, and append them to the Invalidate DB table."""
    for key in keys:
      self.LocalInvalidate(key)
    if self.cache_manager:
      self.cache_manager.StoreInvalidateRows(cnxn, self.kind, keys)

  def LocalInvalidateAll(self):
    """Invalidate all keys locally: just start over with an empty dict."""
    logging.info('Locally invalidating all in kind=%r', self.kind)
    self.cache = {}

  def InvalidateAll(self, cnxn):
    """Invalidate all keys in this cache."""
    self.LocalInvalidateAll()
    if self.cache_manager:
      self.cache_manager.StoreInvalidateAll(cnxn, self.kind)
107
108
class ValueCentricRamCache(RamCache):
  """Specialized version of RamCache that stores values in InvalidateTable.

  This is useful for caches that have non integer keys.
  """

  def LocalInvalidate(self, value):
    """Use the specified value to drop entries from the local cache."""
    # Collect matching keys first so we never mutate the dict while
    # iterating over it.
    keys_to_drop = []
    for k, v in self.cache.iteritems():
      if v == value:
        keys_to_drop.append(k)
    for k in keys_to_drop:
      self.cache.pop(k, None)

  def InvalidateKeys(self, cnxn, keys):
    """Drop keys locally, and append their values to the Invalidate DB table."""
    # Find values to invalidate.  Use the `in` operator rather than the
    # deprecated dict.has_key() (removed in Python 3).
    values = [self.cache[key] for key in keys if key in self.cache]
    if len(values) == len(keys):
      for value in values:
        self.LocalInvalidate(value)
      if self.cache_manager:
        self.cache_manager.StoreInvalidateRows(cnxn, self.kind, values)
    else:
      # If a value is not found in the cache then invalidate the whole cache.
      # This is done to ensure that we are not in an inconsistent state or in a
      # race condition.
      self.InvalidateAll(cnxn)
139
140
class AbstractTwoLevelCache(object):
  """A class to manage both RAM and memcache to retrieve objects.

  Lookups are satisfied from the RAM cache first, then from memcache, and
  finally from the database via FetchItems().  Values found at a lower
  level are copied back into the level(s) above it.

  Subclasses must implement the FetchItems() method to get objects from
  the database when both caches miss.
  """

  # When loading a huge number of issues from the database, do it in chunks
  # so as to avoid timeouts.
  _FETCH_BATCH_SIZE = 10000

  def __init__(
      self, cache_manager, kind, memcache_prefix, pb_class, max_size=None,
      use_value_centric_cache=False):
    """Initialize the two-level cache.

    Args:
      cache_manager: factory object whose MakeCache() constructs the
          RAM-level cache for this kind.
      kind: string kind of business object cached, used for invalidation.
      memcache_prefix: string prefix prepended to stringified keys to form
          memcache keys, which also namespaces this cache within memcache.
      pb_class: protorpc message class used to serialize values for
          memcache, or int for plain integer values, or None/falsy for
          values stored as-is.
      max_size: optional int max size of the RAM-level cache.
      use_value_centric_cache: presumably selects a ValueCentricRamCache
          for the RAM level -- depends on MakeCache(), defined elsewhere.
    """
    self.cache = cache_manager.MakeCache(
        kind, max_size=max_size,
        use_value_centric_cache=use_value_centric_cache)
    self.memcache_prefix = memcache_prefix
    self.pb_class = pb_class

  def CacheItem(self, key, value):
    """Add the given key-value pair to RAM and memcache."""
    self.cache.CacheItem(key, value)
    self._WriteToMemcache({key: value})

  def HasItem(self, key):
    """Return True if the given key is in the RAM cache."""
    return self.cache.HasItem(key)

  def GetAnyOnHandItem(self, keys, start=None, end=None):
    """Try to find one of the specified items in RAM.

    Args:
      keys: list of candidate keys to probe.
      start: optional int index of the first key to consider.
      end: optional int index one past the last key to consider.

    Returns:
      The first cached value found in keys[start:end], or None if none of
      those keys is in the RAM cache.
    """
    if start is None:
      start = 0
    if end is None:
      end = len(keys)
    for i in xrange(start, end):
      key = keys[i]
      if self.cache.HasItem(key):
        return self.cache.GetItem(key)

    # Note: We could check memcache here too, but the round-trips to memcache
    # are kind of slow. And, getting too many hits from memcache actually
    # fills our RAM cache too quickly and could lead to thrashing.

    return None

  def GetAll(self, cnxn, keys, use_cache=True, **kwargs):
    """Get values for the given keys from RAM, memcache, or the DB.

    Args:
      cnxn: connection to the database.
      keys: list of integer keys to look up.
      use_cache: set to False to always hit the database.
      **kwargs: any additional keywords are passed to FetchItems().

    Returns:
      A pair: hits, misses. Where hits is {key: value} and misses is
      a list of any keys that were not found anywhere.
    """
    if use_cache:
      result_dict, missed_keys = self.cache.GetAll(keys)
    else:
      result_dict, missed_keys = {}, list(keys)

    if missed_keys and use_cache:
      memcache_hits, missed_keys = self._ReadFromMemcache(missed_keys)
      result_dict.update(memcache_hits)

    # Hit the DB in batches to avoid request timeouts on huge key lists.
    while missed_keys:
      missed_batch = missed_keys[:self._FETCH_BATCH_SIZE]
      missed_keys = missed_keys[self._FETCH_BATCH_SIZE:]
      retrieved_dict = self.FetchItems(cnxn, missed_batch, **kwargs)
      result_dict.update(retrieved_dict)
      if use_cache:
        self.cache.CacheAll(retrieved_dict)
        self._WriteToMemcache(retrieved_dict)

    # Keys that even FetchItems() did not return are reported as misses.
    still_missing_keys = [key for key in keys if key not in result_dict]
    return result_dict, still_missing_keys

  def _ReadFromMemcache(self, keys):
    """Read the given keys from memcache, return {key: value}, missing_keys."""
    memcache_hits = {}
    cached_dict = memcache.get_multi(
        [self._KeyToStr(key) for key in keys], key_prefix=self.memcache_prefix)

    for key_str, serialized_value in cached_dict.iteritems():
      value = self._StrToValue(serialized_value)
      key = self._StrToKey(key_str)
      memcache_hits[key] = value
      # Promote memcache hits into the RAM cache for faster future lookups.
      self.cache.CacheItem(key, value)

    still_missing_keys = [key for key in keys if key not in memcache_hits]
    logging.info(
        'decoded %d values from memcache %s, missing %d',
        len(memcache_hits), self.memcache_prefix, len(still_missing_keys))
    return memcache_hits, still_missing_keys

  def _WriteToMemcache(self, retrieved_dict):
    """Write entries for each key-value pair to memcache. Encode PBs."""
    strs_to_cache = {
        self._KeyToStr(key): self._ValueToStr(value)
        for key, value in retrieved_dict.iteritems()}
    memcache.set_multi(
        strs_to_cache, key_prefix=self.memcache_prefix,
        time=framework_constants.MEMCACHE_EXPIRATION)
    logging.info('cached batch of %d values in memcache %s',
                 len(retrieved_dict), self.memcache_prefix)

  def _KeyToStr(self, key):
    """Convert our int IDs to strings for use as memcache keys."""
    return str(key)

  def _StrToKey(self, key_str):
    """Convert memcache keys back to the ints that we use as IDs."""
    return int(key_str)

  def _ValueToStr(self, value):
    """Serialize an application object so that it can be stored in memcache."""
    if not self.pb_class:
      return value
    elif self.pb_class == int:
      return str(value)
    else:
      return protobuf.encode_message(value)

  def _StrToValue(self, serialized_value):
    """Deserialize an application object that was stored in memcache."""
    if not self.pb_class:
      return serialized_value
    elif self.pb_class == int:
      return int(serialized_value)
    else:
      return protobuf.decode_message(self.pb_class, serialized_value)

  def InvalidateKeys(self, cnxn, keys):
    """Drop the given keys from both RAM and memcache."""
    self.cache.InvalidateKeys(cnxn, keys)
    memcache.delete_multi(
        [self._KeyToStr(key) for key in keys], key_prefix=self.memcache_prefix)

  def InvalidateAllKeys(self, cnxn, keys):
    """Drop the given keys from memcache and invalidate all keys in RAM.

    Useful for avoiding inserting many rows into the Invalidate table when
    invalidating a large group of keys all at once. Only use when necessary.
    """
    self.cache.InvalidateAll(cnxn)
    memcache.delete_multi(
        [self._KeyToStr(key) for key in keys], key_prefix=self.memcache_prefix)

  def GetAllAlreadyInRam(self, keys):
    """Look only in RAM to return {key: values}, missed_keys."""
    result_dict, missed_keys = self.cache.GetAll(keys)
    return result_dict, missed_keys

  def InvalidateAllRamEntries(self, cnxn):
    """Drop all RAM cache entries. It will refill as needed from memcache."""
    self.cache.InvalidateAll(cnxn)

  def FetchItems(self, cnxn, keys, **kwargs):
    """On RAM and memcache miss, hit the database.

    Subclasses must override this to return {key: value} for the keys
    they could retrieve.

    Raises:
      NotImplementedError: always, in this abstract base class.
    """
    raise NotImplementedError()
OLDNEW
« no previous file with comments | « appengine/monorail/services/cachemanager_svc.py ('k') | appengine/monorail/services/client_config_svc.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698