# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file or at
# https://developers.google.com/open-source/licenses/bsd

"""Classes to manage cached values.

Monorail makes full use of the RAM of GAE frontends to reduce latency
and load on the database.
"""

import logging

from protorpc import protobuf

from google.appengine.api import memcache

from framework import framework_constants


INVALIDATE_KIND_VALUES = ['user', 'project', 'issue', 'issue_id']
DEFAULT_MAX_SIZE = 10000


class RamCache(object):
  """An in-RAM cache with distributed invalidation."""

  def __init__(self, cache_manager, kind, max_size=None):
    assert kind in INVALIDATE_KIND_VALUES
    self.cache_manager = cache_manager
    self.kind = kind
    self.cache = {}
    self.max_size = max_size or DEFAULT_MAX_SIZE

  def CacheItem(self, key, item):
    """Store item at key in this cache, discarding a random item if needed."""
    if len(self.cache) >= self.max_size:
      self.cache.popitem()

    self.cache[key] = item

  def CacheAll(self, new_item_dict):
    """Cache all items in the given dict, dropping old items if needed."""
    if len(new_item_dict) >= self.max_size:
      logging.warning('Dumping the entire cache! %s', self.kind)
      self.cache = {}
    else:
      while len(self.cache) + len(new_item_dict) > self.max_size:
        self.cache.popitem()

    self.cache.update(new_item_dict)

  def GetItem(self, key):
    """Return the cached item if present, otherwise None."""
    return self.cache.get(key)

  def HasItem(self, key):
    """Return True if there is a value cached at the given key."""
    return key in self.cache

  def GetAll(self, keys):
    """Look up the given keys.

    Args:
      keys: a list of cache keys to look up.

    Returns:
      A pair: (hits_dict, misses_list) where hits_dict is a dictionary of
      all the given keys and the values that were found in the cache, and
      misses_list is a list of given keys that were not in the cache.
    """
    hits, misses = {}, []
    for key in keys:
      try:
        hits[key] = self.cache[key]
      except KeyError:
        misses.append(key)

    return hits, misses

  def LocalInvalidate(self, key):
    """Drop the given key from this cache, without distributed notification."""
    # logging.info('Locally invalidating %r in kind=%r', key, self.kind)
    self.cache.pop(key, None)

  def Invalidate(self, cnxn, key):
    """Drop key locally, and append it to the Invalidate DB table."""
    self.InvalidateKeys(cnxn, [key])

  def InvalidateKeys(self, cnxn, keys):
    """Drop keys locally, and append them to the Invalidate DB table."""
    for key in keys:
      self.LocalInvalidate(key)
    if self.cache_manager:
      self.cache_manager.StoreInvalidateRows(cnxn, self.kind, keys)

  def LocalInvalidateAll(self):
    """Invalidate all keys locally: just start over with an empty dict."""
    logging.info('Locally invalidating all in kind=%r', self.kind)
    self.cache = {}

  def InvalidateAll(self, cnxn):
    """Invalidate all keys in this cache."""
    self.LocalInvalidateAll()
    if self.cache_manager:
      self.cache_manager.StoreInvalidateAll(cnxn, self.kind)

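# --- Illustrative usage sketch (editorial addition, not original code) ---
# A minimal demonstration of how RamCache is typically driven. The
# _FakeCacheManager below is a hypothetical stand-in for the real
# CacheManager service, which records invalidation rows in the DB so that
# other GAE instances drop their stale copies.

def _ExampleRamCacheUsage(cnxn):
  """Hedged sketch: cache two projects, then invalidate one of them."""

  class _FakeCacheManager(object):
    # Hypothetical stub: the real manager writes to the Invalidate table.
    def StoreInvalidateRows(self, cnxn, kind, keys):
      logging.info('would store invalidate rows: %s %r', kind, keys)

    def StoreInvalidateAll(self, cnxn, kind):
      logging.info('would store invalidate-all row: %s', kind)

  cache = RamCache(_FakeCacheManager(), 'project', max_size=100)
  cache.CacheItem(1, 'proj-a')
  cache.CacheAll({2: 'proj-b'})
  hits, misses = cache.GetAll([1, 2, 3])  # hits has keys 1, 2; misses == [3]
  cache.Invalidate(cnxn, 1)  # drops key 1 locally and records a DB row
  return hits, misses
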

class ValueCentricRamCache(RamCache):
  """Specialized version of RamCache that stores values in InvalidateTable.

  This is useful for caches that have non-integer keys.
  """

  def LocalInvalidate(self, value):
    """Use the specified value to drop entries from the local cache."""
    keys_to_drop = []
    # Loop through and collect all keys with the specified value.
    for k, v in self.cache.iteritems():
      if v == value:
        keys_to_drop.append(k)
    for k in keys_to_drop:
      self.cache.pop(k, None)

  def InvalidateKeys(self, cnxn, keys):
    """Drop keys locally, and append their values to the Invalidate DB table."""
    # Find values to invalidate.
    values = [self.cache[key] for key in keys if key in self.cache]
    if len(values) == len(keys):
      for value in values:
        self.LocalInvalidate(value)
      if self.cache_manager:
        self.cache_manager.StoreInvalidateRows(cnxn, self.kind, values)
    else:
      # If a value is not found in the cache then invalidate the whole cache.
      # This is done to ensure that we are not in an inconsistent state or in a
      # race condition.
      self.InvalidateAll(cnxn)

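# --- Illustrative usage sketch (editorial addition, not original code) ---
# ValueCentricRamCache is meant for caches whose keys are not integers,
# e.g. a hypothetical {email_address: user_id} mapping. Invalidation is
# keyed on the cached *value*, which is what gets written to the
# Invalidate table.

def _ExampleValueCentricUsage(cache_manager, cnxn):
  """Hedged sketch: map string keys to int IDs and invalidate by value."""
  cache = ValueCentricRamCache(cache_manager, 'user')
  cache.CacheAll({'a@example.com': 111, 'b@example.com': 222})
  # LocalInvalidate takes a value (a user ID here), not a key, and drops
  # every key that maps to it.
  cache.LocalInvalidate(111)
  hits, misses = cache.GetAll(['a@example.com', 'b@example.com'])
  return hits, misses  # hits == {'b@example.com': 222}
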

class AbstractTwoLevelCache(object):
  """A class to manage both RAM and memcache to retrieve objects.

  Subclasses must implement the FetchItems() method to get objects from
  the database when both caches miss.
  """

  # When loading a huge number of issues from the database, do it in chunks
  # so as to avoid timeouts.
  _FETCH_BATCH_SIZE = 10000

  def __init__(
      self, cache_manager, kind, memcache_prefix, pb_class, max_size=None,
      use_value_centric_cache=False):
    self.cache = cache_manager.MakeCache(
        kind, max_size=max_size,
        use_value_centric_cache=use_value_centric_cache)
    self.memcache_prefix = memcache_prefix
    self.pb_class = pb_class

  def CacheItem(self, key, value):
    """Add the given key-value pair to RAM and memcache."""
    self.cache.CacheItem(key, value)
    self._WriteToMemcache({key: value})

  def HasItem(self, key):
    """Return True if the given key is in the RAM cache."""
    return self.cache.HasItem(key)

  def GetAnyOnHandItem(self, keys, start=None, end=None):
    """Try to find one of the specified items in RAM."""
    if start is None:
      start = 0
    if end is None:
      end = len(keys)
    for i in xrange(start, end):
      key = keys[i]
      if self.cache.HasItem(key):
        return self.cache.GetItem(key)

    # Note: We could check memcache here too, but the round-trips to memcache
    # are kind of slow. And, getting too many hits from memcache actually
    # fills our RAM cache too quickly and could lead to thrashing.

    return None

  def GetAll(self, cnxn, keys, use_cache=True, **kwargs):
    """Get values for the given keys from RAM, memcache, or the DB.

    Args:
      cnxn: connection to the database.
      keys: list of integer keys to look up.
      use_cache: set to False to always hit the database.
      **kwargs: any additional keywords are passed to FetchItems().

    Returns:
      A pair: hits, misses. Where hits is {key: value} and misses is
      a list of any keys that were not found anywhere.
    """
    if use_cache:
      result_dict, missed_keys = self.cache.GetAll(keys)
    else:
      result_dict, missed_keys = {}, list(keys)

    if missed_keys and use_cache:
      memcache_hits, missed_keys = self._ReadFromMemcache(missed_keys)
      result_dict.update(memcache_hits)

    while missed_keys:
      missed_batch = missed_keys[:self._FETCH_BATCH_SIZE]
      missed_keys = missed_keys[self._FETCH_BATCH_SIZE:]
      retrieved_dict = self.FetchItems(cnxn, missed_batch, **kwargs)
      result_dict.update(retrieved_dict)
      if use_cache:
        self.cache.CacheAll(retrieved_dict)
        self._WriteToMemcache(retrieved_dict)

    still_missing_keys = [key for key in keys if key not in result_dict]
    return result_dict, still_missing_keys

  def _ReadFromMemcache(self, keys):
    """Read the given keys from memcache, return {key: value}, missing_keys."""
    memcache_hits = {}
    cached_dict = memcache.get_multi(
        [self._KeyToStr(key) for key in keys], key_prefix=self.memcache_prefix)

    for key_str, serialized_value in cached_dict.iteritems():
      value = self._StrToValue(serialized_value)
      key = self._StrToKey(key_str)
      memcache_hits[key] = value
      self.cache.CacheItem(key, value)

    still_missing_keys = [key for key in keys if key not in memcache_hits]
    logging.info(
        'decoded %d values from memcache %s, missing %d',
        len(memcache_hits), self.memcache_prefix, len(still_missing_keys))
    return memcache_hits, still_missing_keys

  def _WriteToMemcache(self, retrieved_dict):
    """Write entries for each key-value pair to memcache. Encode PBs."""
    strs_to_cache = {
        self._KeyToStr(key): self._ValueToStr(value)
        for key, value in retrieved_dict.iteritems()}
    memcache.set_multi(
        strs_to_cache, key_prefix=self.memcache_prefix,
        time=framework_constants.MEMCACHE_EXPIRATION)
    logging.info(
        'cached batch of %d values in memcache %s',
        len(retrieved_dict), self.memcache_prefix)

  def _KeyToStr(self, key):
    """Convert our int IDs to strings for use as memcache keys."""
    return str(key)

  def _StrToKey(self, key_str):
    """Convert memcache keys back to the ints that we use as IDs."""
    return int(key_str)

  def _ValueToStr(self, value):
    """Serialize an application object so that it can be stored in memcache."""
    if not self.pb_class:
      return value
    elif self.pb_class == int:
      return str(value)
    else:
      return protobuf.encode_message(value)

  def _StrToValue(self, serialized_value):
    """Deserialize an application object that was stored in memcache."""
    if not self.pb_class:
      return serialized_value
    elif self.pb_class == int:
      return int(serialized_value)
    else:
      return protobuf.decode_message(self.pb_class, serialized_value)

  def InvalidateKeys(self, cnxn, keys):
    """Drop the given keys from both RAM and memcache."""
    self.cache.InvalidateKeys(cnxn, keys)
    memcache.delete_multi(
        [self._KeyToStr(key) for key in keys], key_prefix=self.memcache_prefix)

  def InvalidateAllKeys(self, cnxn, keys):
    """Drop the given keys from memcache and invalidate all keys in RAM.

    Useful for avoiding inserting many rows into the Invalidate table when
    invalidating a large group of keys all at once. Only use when necessary.
    """
    self.cache.InvalidateAll(cnxn)
    memcache.delete_multi(
        [self._KeyToStr(key) for key in keys], key_prefix=self.memcache_prefix)

  def GetAllAlreadyInRam(self, keys):
    """Look only in RAM to return {key: values}, missed_keys."""
    result_dict, missed_keys = self.cache.GetAll(keys)
    return result_dict, missed_keys

  def InvalidateAllRamEntries(self, cnxn):
    """Drop all RAM cache entries. It will refill as needed from memcache."""
    self.cache.InvalidateAll(cnxn)

  def FetchItems(self, cnxn, keys, **kwargs):
    """On RAM and memcache miss, hit the database."""
    raise NotImplementedError()
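
# --- Illustrative subclass sketch (editorial addition, not original code) ---
# The minimal contract for a concrete two-level cache: implement FetchItems()
# to return {key: value} for the keys that missed both RAM and memcache.
# The kind 'user', prefix 'example:', and fetch_fn callable are hypothetical
# choices for this sketch; real subclasses query their own DB tables.

class _ExampleTwoLevelCache(AbstractTwoLevelCache):
  """Hedged sketch of a concrete AbstractTwoLevelCache subclass."""

  def __init__(self, cache_manager, fetch_fn):
    # pb_class=None means values are cached in memcache as-is; passing a
    # protorpc message class would turn on PB (de)serialization instead.
    super(_ExampleTwoLevelCache, self).__init__(
        cache_manager, 'user', 'example:', None)
    self.fetch_fn = fetch_fn

  def FetchItems(self, cnxn, keys, **kwargs):
    # Called by GetAll() in batches of at most _FETCH_BATCH_SIZE keys;
    # must return a dict covering every key it could retrieve.
    return self.fetch_fn(cnxn, keys)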