| Index: appengine_module/gae_ts_mon/memcache_metric_store.py
|
| diff --git a/appengine_module/gae_ts_mon/memcache_metric_store.py b/appengine_module/gae_ts_mon/memcache_metric_store.py
|
| index e36bafee5a36fb71e6a49615179081932d7d4f26..748b6f4a42c8dd6a2ef0da5b69c1f99cfe4f6a3f 100644
|
| --- a/appengine_module/gae_ts_mon/memcache_metric_store.py
|
| +++ b/appengine_module/gae_ts_mon/memcache_metric_store.py
|
| @@ -6,7 +6,6 @@ import collections
|
| import copy
|
| import functools
|
| import logging
|
| -import operator
|
| import random
|
| import threading
|
|
|
| @@ -181,25 +180,56 @@ class MemcacheMetricStore(metric_store.MetricStore):
|
| yield (copy.copy(target), entities[metric_name].metric, start_time,
|
| fields_values)
|
|
|
| - def _apply_all(self, callables, arg):
|
| - ret = arg
|
| - for fn in callables:
|
| - ret = fn(ret)
|
| - return ret
|
| + def _apply_all(self, modifications, entry):
|
| + """Applies all the modifications, in order, to a memcache entry.
|
|
|
| - def _compare_and_set(self, metric_modify_fns, namespace):
|
| + All modifications must be for the same metric."""
|
| +
|
| + if not modifications: # pragma: no cover
|
| + return entry
|
| +
|
| + # All modifications should be for the same metric.
|
| + name = modifications[0].name
|
| +
|
| + if entry is None:
|
| + entry = (self._start_time(name), {})
|
| +
|
| + _, targets = entry
|
| +
|
| + target_key = self._target_key()
|
| + values = targets.setdefault(target_key, {})
|
| +
|
| + for mod in modifications:
|
| + value = values.get(mod.fields, 0)
|
| +
|
| + if mod.mod_type == 'set':
|
| + new_value, enforce_ge = mod.args
|
| + if enforce_ge and new_value < value:
|
| + raise errors.MonitoringDecreasingValueError(name, value, new_value)
|
| + value = new_value
|
| + elif mod.mod_type == 'incr':
|
| + delta, modify_fn = mod.args
|
| + if modify_fn is None:
|
| + modify_fn = metric_store.default_modify_fn(name)
|
| + value = modify_fn(value, delta)
|
| + else:
|
| + raise errors.UnknownModificationTypeError(mod.mod_type)
|
| +
|
| + values[mod.fields] = value
|
| +
|
| + return entry
|
| +
|
| + def _compare_and_set(self, modifications, namespace):
|
| client = self._client()
|
|
|
| # Metrics that we haven't updated yet. Metrics are removed from this dict
|
| # when they're successfully updated - if there are any left they will be
|
| # retried 10 times until everything has been updated.
|
| - # We might have more than one modify_fn for a metric if different field
|
| + # We might have more than one modification for a metric if different field
|
| # values were updated.
|
| - metrics = {}
|
| remaining = collections.defaultdict(list)
|
| - for metric, modify_fns in metric_modify_fns:
|
| - metrics[metric.name] = metric
|
| - remaining[metric.name].append(modify_fns)
|
| + for modification in modifications:
|
| + remaining[modification.name].append(modification)
|
|
|
| failed_keys_count = 0
|
|
|
| @@ -209,7 +239,7 @@ class MemcacheMetricStore(metric_store.MetricStore):
|
|
|
| for name in remaining:
|
| # Pick one of the shards to modify.
|
| - key = self._random_shard(metrics[name])
|
| + key = self._random_shard(self._state.metrics[name])
|
| keys.append(key)
|
| key_map[key] = name
|
|
|
| @@ -253,77 +283,27 @@ class MemcacheMetricStore(metric_store.MetricStore):
|
| # Update the cas_failures metric with the number of failed keys, but don't
|
| # do so recursively.
|
| if (failed_keys_count and
|
| - any(metric.name != cas_failures.name
|
| - for metric, _ in metric_modify_fns)):
|
| + any(modification.name != cas_failures.name
|
| + for modification in modifications)):
|
| cas_failures.increment_by(failed_keys_count)
|
|
|
| - def _create_modify_metric_fn(self, name, fields, modify_value_fn, delta):
|
| - """Returns a function that modifies a memcache row value.
|
| -
|
| - Calls modify_value_fn(old_value, delta) and puts the result back in the
|
| - memcache row.
|
| - """
|
| -
|
| - def modify_fn(entry):
|
| - if entry is None:
|
| - entry = (self._start_time(name), {})
|
| -
|
| - _, targets = entry
|
| -
|
| - target_key = self._target_key()
|
| - if target_key not in targets:
|
| - targets[target_key] = {}
|
| - values = targets[target_key]
|
| -
|
| - values[fields] = modify_value_fn(values.get(fields, 0), delta)
|
| -
|
| - return entry
|
| - return modify_fn
|
| -
|
| def _compare_and_set_metrics(self, modifications):
|
| - if any(name in self.METRIC_NAMES_EXCLUDED_FROM_INDEX
|
| - for name, fields, modify_value_fn, delta in modifications):
|
| + if any(mod.name in self.METRIC_NAMES_EXCLUDED_FROM_INDEX
|
| + for mod in modifications):
|
| raise errors.MonitoringError('Metric is magical, can\'t set it')
|
|
|
| - metric_modify_fns = [
|
| - (self._state.metrics[name],
|
| - self._create_modify_metric_fn(name, fields, modify_value_fn, delta))
|
| - for name, fields, modify_value_fn, delta in modifications]
|
| - self._compare_and_set(metric_modify_fns, self._namespace_for_job())
|
| -
|
| - def _create_set_value_fn(self, name, value, enforce_ge):
|
| - def modify_fn(old_value, _delta):
|
| - if enforce_ge and old_value is not None and value < old_value:
|
| - raise errors.MonitoringDecreasingValueError(name, old_value, value)
|
| - return value
|
| - return modify_fn
|
| + self._compare_and_set(modifications, self._namespace_for_job())
|
|
|
| def set(self, name, fields, value, enforce_ge=False):
|
| - modify_fn = self._create_set_value_fn(name, value, enforce_ge)
|
| - self._compare_and_set_metrics([(name, fields, modify_fn, None)])
|
| + self._compare_and_set_metrics([metric_store.Modification(
|
| + name, fields, 'set', (value, enforce_ge))])
|
|
|
| - def incr(self, name, fields, delta, modify_fn=operator.add):
|
| - if delta < 0:
|
| - raise errors.MonitoringDecreasingValueError(name, None, delta)
|
| - self._compare_and_set_metrics([(name, fields, modify_fn, delta)])
|
| + def incr(self, name, fields, delta, modify_fn=None):
|
| + self._compare_and_set_metrics([metric_store.Modification(
|
| + name, fields, 'incr', (delta, modify_fn))])
|
|
|
| def modify_multi(self, modifications):
|
| - mods = []
|
| - for mod in modifications:
|
| - if mod.mod_type == 'set':
|
| - value, enforce_ge = mod.args
|
| - modify_fn = self._create_set_value_fn(mod.name, value, enforce_ge)
|
| - delta = None
|
| - elif mod.mod_type == 'incr':
|
| - delta, modify_fn = mod.args
|
| - if delta < 0:
|
| - raise errors.MonitoringDecreasingValueError(mod.name, None, delta)
|
| - else:
|
| - raise errors.UnknownModificationTypeError(mod.mod_type)
|
| -
|
| - mods.append((mod.name, mod.fields, modify_fn, delta))
|
| -
|
| - self._compare_and_set_metrics(mods)
|
| + self._compare_and_set_metrics(modifications)
|
|
|
| def reset_for_unittest(self, name=None):
|
| if name is None:
|
|
|