Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(321)

Side by Side Diff: appengine_module/gae_ts_mon/memcache_metric_store.py

Issue 1531573003: Handle multiple modifications to distribution metrics correctly. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Add a missing test for coverage Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2015 The Chromium Authors. All rights reserved. 1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 import collections 5 import collections
6 import copy 6 import copy
7 import functools 7 import functools
8 import logging 8 import logging
9 import operator
10 import random 9 import random
11 import threading 10 import threading
12 11
13 from google.appengine.api import memcache 12 from google.appengine.api import memcache
14 from google.appengine.api import modules 13 from google.appengine.api import modules
15 from google.appengine.ext import ndb 14 from google.appengine.ext import ndb
16 15
17 from infra_libs.ts_mon.common import errors 16 from infra_libs.ts_mon.common import errors
18 from infra_libs.ts_mon.common import metric_store 17 from infra_libs.ts_mon.common import metric_store
19 from infra_libs.ts_mon.common import metrics 18 from infra_libs.ts_mon.common import metrics
(...skipping 154 matching lines...) Expand 10 before | Expand all | Expand 10 after
174 if key in shard_map: 173 if key in shard_map:
175 # This row is one shard of a sharded metric - put the shard number 174 # This row is one shard of a sharded metric - put the shard number
176 # in the task number. 175 # in the task number.
177 target.task_num, metric_name = shard_map[key] 176 target.task_num, metric_name = shard_map[key]
178 else: 177 else:
179 target.task_num, metric_name = 0, key 178 target.task_num, metric_name = 0, key
180 179
181 yield (copy.copy(target), entities[metric_name].metric, start_time, 180 yield (copy.copy(target), entities[metric_name].metric, start_time,
182 fields_values) 181 fields_values)
183 182
184 def _apply_all(self, callables, arg): 183 def _apply_all(self, modifications, entry):
185 ret = arg 184 """Applies all the modifications, in order, to a memcache entry.
186 for fn in callables:
187 ret = fn(ret)
188 return ret
189 185
190 def _compare_and_set(self, metric_modify_fns, namespace): 186 All modifications must be for the same metric."""
187
188 if not modifications: # pragma: no cover
189 return entry
190
191 # All modifications should be for the same metric.
192 name = modifications[0].name
193
194 if entry is None:
195 entry = (self._start_time(name), {})
196
197 _, targets = entry
198
199 target_key = self._target_key()
200 values = targets.setdefault(target_key, {})
201
202 for mod in modifications:
203 value = values.get(mod.fields, 0)
204
205 if mod.mod_type == 'set':
206 new_value, enforce_ge = mod.args
207 if enforce_ge and new_value < value:
208 raise errors.MonitoringDecreasingValueError(name, value, new_value)
209 value = new_value
210 elif mod.mod_type == 'incr':
211 delta, modify_fn = mod.args
212 if modify_fn is None:
213 modify_fn = metric_store.default_modify_fn(name)
214 value = modify_fn(value, delta)
215 else:
216 raise errors.UnknownModificationTypeError(mod.mod_type)
217
218 values[mod.fields] = value
219
220 return entry
221
222 def _compare_and_set(self, modifications, namespace):
191 client = self._client() 223 client = self._client()
192 224
193 # Metrics that we haven't updated yet. Metrics are removed from this dict 225 # Metrics that we haven't updated yet. Metrics are removed from this dict
194 # when they're successfully updated - if there are any left they will be 226 # when they're successfully updated - if there are any left they will be
195 # retried 10 times until everything has been updated. 227 # retried 10 times until everything has been updated.
196 # We might have more than one modify_fn for a metric if different field 228 # We might have more than one modification for a metric if different field
197 # values were updated. 229 # values were updated.
198 metrics = {}
199 remaining = collections.defaultdict(list) 230 remaining = collections.defaultdict(list)
200 for metric, modify_fns in metric_modify_fns: 231 for modification in modifications:
201 metrics[metric.name] = metric 232 remaining[modification.name].append(modification)
202 remaining[metric.name].append(modify_fns)
203 233
204 failed_keys_count = 0 234 failed_keys_count = 0
205 235
206 for _ in xrange(self.CAS_RETRIES): 236 for _ in xrange(self.CAS_RETRIES):
207 keys = [] 237 keys = []
208 key_map = {} # key -> metric name (for sharded metrics) 238 key_map = {} # key -> metric name (for sharded metrics)
209 239
210 for name in remaining: 240 for name in remaining:
211 # Pick one of the shards to modify. 241 # Pick one of the shards to modify.
212 key = self._random_shard(metrics[name]) 242 key = self._random_shard(self._state.metrics[name])
213 keys.append(key) 243 keys.append(key)
214 key_map[key] = name 244 key_map[key] = name
215 245
216 # Get all existing entries. 246 # Get all existing entries.
217 cas_mapping = {} # key -> new value (for existing entries) 247 cas_mapping = {} # key -> new value (for existing entries)
218 add_mapping = {} # key -> new value (for new entries) 248 add_mapping = {} # key -> new value (for new entries)
219 249
220 entries = client.get_multi(keys, for_cas=True, namespace=namespace) 250 entries = client.get_multi(keys, for_cas=True, namespace=namespace)
221 for key, entry in entries.iteritems(): 251 for key, entry in entries.iteritems():
222 cas_mapping[key] = self._apply_all(remaining[key_map[key]], entry) 252 cas_mapping[key] = self._apply_all(remaining[key_map[key]], entry)
(...skipping 23 matching lines...) Expand all
246 remaining = still_remaining 276 remaining = still_remaining
247 else: 277 else:
248 logging.warning( 278 logging.warning(
249 'Memcache compare-and-set failed %d times for keys %s in ' 279 'Memcache compare-and-set failed %d times for keys %s in '
250 'namespace %s', 280 'namespace %s',
251 self.CAS_RETRIES, remaining.keys(), namespace) 281 self.CAS_RETRIES, remaining.keys(), namespace)
252 282
253 # Update the cas_failures metric with the number of failed keys, but don't 283 # Update the cas_failures metric with the number of failed keys, but don't
254 # do so recursively. 284 # do so recursively.
255 if (failed_keys_count and 285 if (failed_keys_count and
256 any(metric.name != cas_failures.name 286 any(modification.name != cas_failures.name
257 for metric, _ in metric_modify_fns)): 287 for modification in modifications)):
258 cas_failures.increment_by(failed_keys_count) 288 cas_failures.increment_by(failed_keys_count)
259 289
260 def _create_modify_metric_fn(self, name, fields, modify_value_fn, delta):
261 """Returns a function that modifies a memcache row value.
262
263 Calls modify_value_fn(old_value, delta) and puts the result back in the
264 memcache row.
265 """
266
267 def modify_fn(entry):
268 if entry is None:
269 entry = (self._start_time(name), {})
270
271 _, targets = entry
272
273 target_key = self._target_key()
274 if target_key not in targets:
275 targets[target_key] = {}
276 values = targets[target_key]
277
278 values[fields] = modify_value_fn(values.get(fields, 0), delta)
279
280 return entry
281 return modify_fn
282
283 def _compare_and_set_metrics(self, modifications): 290 def _compare_and_set_metrics(self, modifications):
284 if any(name in self.METRIC_NAMES_EXCLUDED_FROM_INDEX 291 if any(mod.name in self.METRIC_NAMES_EXCLUDED_FROM_INDEX
285 for name, fields, modify_value_fn, delta in modifications): 292 for mod in modifications):
286 raise errors.MonitoringError('Metric is magical, can\'t set it') 293 raise errors.MonitoringError('Metric is magical, can\'t set it')
287 294
288 metric_modify_fns = [ 295 self._compare_and_set(modifications, self._namespace_for_job())
289 (self._state.metrics[name],
290 self._create_modify_metric_fn(name, fields, modify_value_fn, delta))
291 for name, fields, modify_value_fn, delta in modifications]
292 self._compare_and_set(metric_modify_fns, self._namespace_for_job())
293
294 def _create_set_value_fn(self, name, value, enforce_ge):
295 def modify_fn(old_value, _delta):
296 if enforce_ge and old_value is not None and value < old_value:
297 raise errors.MonitoringDecreasingValueError(name, old_value, value)
298 return value
299 return modify_fn
300 296
301 def set(self, name, fields, value, enforce_ge=False): 297 def set(self, name, fields, value, enforce_ge=False):
302 modify_fn = self._create_set_value_fn(name, value, enforce_ge) 298 self._compare_and_set_metrics([metric_store.Modification(
303 self._compare_and_set_metrics([(name, fields, modify_fn, None)]) 299 name, fields, 'set', (value, enforce_ge))])
304 300
305 def incr(self, name, fields, delta, modify_fn=operator.add): 301 def incr(self, name, fields, delta, modify_fn=None):
306 if delta < 0: 302 self._compare_and_set_metrics([metric_store.Modification(
307 raise errors.MonitoringDecreasingValueError(name, None, delta) 303 name, fields, 'incr', (delta, modify_fn))])
308 self._compare_and_set_metrics([(name, fields, modify_fn, delta)])
309 304
310 def modify_multi(self, modifications): 305 def modify_multi(self, modifications):
311 mods = [] 306 self._compare_and_set_metrics(modifications)
312 for mod in modifications:
313 if mod.mod_type == 'set':
314 value, enforce_ge = mod.args
315 modify_fn = self._create_set_value_fn(mod.name, value, enforce_ge)
316 delta = None
317 elif mod.mod_type == 'incr':
318 delta, modify_fn = mod.args
319 if delta < 0:
320 raise errors.MonitoringDecreasingValueError(mod.name, None, delta)
321 else:
322 raise errors.UnknownModificationTypeError(mod.mod_type)
323
324 mods.append((mod.name, mod.fields, modify_fn, delta))
325
326 self._compare_and_set_metrics(mods)
327 307
328 def reset_for_unittest(self, name=None): 308 def reset_for_unittest(self, name=None):
329 if name is None: 309 if name is None:
330 self._client().delete_multi(self._state.metrics.keys(), 310 self._client().delete_multi(self._state.metrics.keys(),
331 namespace=self._namespace_for_job()) 311 namespace=self._namespace_for_job())
332 else: 312 else:
333 self._client().delete(name, namespace=self._namespace_for_job()) 313 self._client().delete(name, namespace=self._namespace_for_job())
OLDNEW
« no previous file with comments | « appengine_module/gae_ts_mon/deferred_metric_store.py ('k') | appengine_module/gae_ts_mon/test/deferred_metric_store_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698