Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(384)

Unified Diff: appengine_module/gae_ts_mon/memcache_metric_store.py

Issue 1531573003: Handle multiple modifications to distribution metrics correctly. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Add a missing test for coverage Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine_module/gae_ts_mon/memcache_metric_store.py
diff --git a/appengine_module/gae_ts_mon/memcache_metric_store.py b/appengine_module/gae_ts_mon/memcache_metric_store.py
index e36bafee5a36fb71e6a49615179081932d7d4f26..748b6f4a42c8dd6a2ef0da5b69c1f99cfe4f6a3f 100644
--- a/appengine_module/gae_ts_mon/memcache_metric_store.py
+++ b/appengine_module/gae_ts_mon/memcache_metric_store.py
@@ -6,7 +6,6 @@ import collections
import copy
import functools
import logging
-import operator
import random
import threading
@@ -181,25 +180,56 @@ class MemcacheMetricStore(metric_store.MetricStore):
yield (copy.copy(target), entities[metric_name].metric, start_time,
fields_values)
- def _apply_all(self, callables, arg):
- ret = arg
- for fn in callables:
- ret = fn(ret)
- return ret
+ def _apply_all(self, modifications, entry):
+ """Applies all the modifications, in order, to a memcache entry.
- def _compare_and_set(self, metric_modify_fns, namespace):
+ All modifications must be for the same metric."""
+
+ if not modifications: # pragma: no cover
+ return entry
+
+ # All modifications should be for the same metric.
+ name = modifications[0].name
+
+ if entry is None:
+ entry = (self._start_time(name), {})
+
+ _, targets = entry
+
+ target_key = self._target_key()
+ values = targets.setdefault(target_key, {})
+
+ for mod in modifications:
+ value = values.get(mod.fields, 0)
+
+ if mod.mod_type == 'set':
+ new_value, enforce_ge = mod.args
+ if enforce_ge and new_value < value:
+ raise errors.MonitoringDecreasingValueError(name, value, new_value)
+ value = new_value
+ elif mod.mod_type == 'incr':
+ delta, modify_fn = mod.args
+ if modify_fn is None:
+ modify_fn = metric_store.default_modify_fn(name)
+ value = modify_fn(value, delta)
+ else:
+ raise errors.UnknownModificationTypeError(mod.mod_type)
+
+ values[mod.fields] = value
+
+ return entry
+
+ def _compare_and_set(self, modifications, namespace):
client = self._client()
# Metrics that we haven't updated yet. Metrics are removed from this dict
# when they're successfully updated - if there are any left they will be
# retried 10 times until everything has been updated.
- # We might have more than one modify_fn for a metric if different field
+ # We might have more than one modification for a metric if different field
# values were updated.
- metrics = {}
remaining = collections.defaultdict(list)
- for metric, modify_fns in metric_modify_fns:
- metrics[metric.name] = metric
- remaining[metric.name].append(modify_fns)
+ for modification in modifications:
+ remaining[modification.name].append(modification)
failed_keys_count = 0
@@ -209,7 +239,7 @@ class MemcacheMetricStore(metric_store.MetricStore):
for name in remaining:
# Pick one of the shards to modify.
- key = self._random_shard(metrics[name])
+ key = self._random_shard(self._state.metrics[name])
keys.append(key)
key_map[key] = name
@@ -253,77 +283,27 @@ class MemcacheMetricStore(metric_store.MetricStore):
# Update the cas_failures metric with the number of failed keys, but don't
# do so recursively.
if (failed_keys_count and
- any(metric.name != cas_failures.name
- for metric, _ in metric_modify_fns)):
+ any(modification.name != cas_failures.name
+ for modification in modifications)):
cas_failures.increment_by(failed_keys_count)
- def _create_modify_metric_fn(self, name, fields, modify_value_fn, delta):
- """Returns a function that modifies a memcache row value.
-
- Calls modify_value_fn(old_value, delta) and puts the result back in the
- memcache row.
- """
-
- def modify_fn(entry):
- if entry is None:
- entry = (self._start_time(name), {})
-
- _, targets = entry
-
- target_key = self._target_key()
- if target_key not in targets:
- targets[target_key] = {}
- values = targets[target_key]
-
- values[fields] = modify_value_fn(values.get(fields, 0), delta)
-
- return entry
- return modify_fn
-
def _compare_and_set_metrics(self, modifications):
- if any(name in self.METRIC_NAMES_EXCLUDED_FROM_INDEX
- for name, fields, modify_value_fn, delta in modifications):
+ if any(mod.name in self.METRIC_NAMES_EXCLUDED_FROM_INDEX
+ for mod in modifications):
raise errors.MonitoringError('Metric is magical, can\'t set it')
- metric_modify_fns = [
- (self._state.metrics[name],
- self._create_modify_metric_fn(name, fields, modify_value_fn, delta))
- for name, fields, modify_value_fn, delta in modifications]
- self._compare_and_set(metric_modify_fns, self._namespace_for_job())
-
- def _create_set_value_fn(self, name, value, enforce_ge):
- def modify_fn(old_value, _delta):
- if enforce_ge and old_value is not None and value < old_value:
- raise errors.MonitoringDecreasingValueError(name, old_value, value)
- return value
- return modify_fn
+ self._compare_and_set(modifications, self._namespace_for_job())
def set(self, name, fields, value, enforce_ge=False):
- modify_fn = self._create_set_value_fn(name, value, enforce_ge)
- self._compare_and_set_metrics([(name, fields, modify_fn, None)])
+ self._compare_and_set_metrics([metric_store.Modification(
+ name, fields, 'set', (value, enforce_ge))])
- def incr(self, name, fields, delta, modify_fn=operator.add):
- if delta < 0:
- raise errors.MonitoringDecreasingValueError(name, None, delta)
- self._compare_and_set_metrics([(name, fields, modify_fn, delta)])
+ def incr(self, name, fields, delta, modify_fn=None):
+ self._compare_and_set_metrics([metric_store.Modification(
+ name, fields, 'incr', (delta, modify_fn))])
def modify_multi(self, modifications):
- mods = []
- for mod in modifications:
- if mod.mod_type == 'set':
- value, enforce_ge = mod.args
- modify_fn = self._create_set_value_fn(mod.name, value, enforce_ge)
- delta = None
- elif mod.mod_type == 'incr':
- delta, modify_fn = mod.args
- if delta < 0:
- raise errors.MonitoringDecreasingValueError(mod.name, None, delta)
- else:
- raise errors.UnknownModificationTypeError(mod.mod_type)
-
- mods.append((mod.name, mod.fields, modify_fn, delta))
-
- self._compare_and_set_metrics(mods)
+ self._compare_and_set_metrics(modifications)
def reset_for_unittest(self, name=None):
if name is None:
« no previous file with comments | « appengine_module/gae_ts_mon/deferred_metric_store.py ('k') | appengine_module/gae_ts_mon/test/deferred_metric_store_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698