Index: appengine_module/gae_ts_mon/memcache_metric_store.py |
diff --git a/appengine_module/gae_ts_mon/memcache_metric_store.py b/appengine_module/gae_ts_mon/memcache_metric_store.py |
index e36bafee5a36fb71e6a49615179081932d7d4f26..748b6f4a42c8dd6a2ef0da5b69c1f99cfe4f6a3f 100644 |
--- a/appengine_module/gae_ts_mon/memcache_metric_store.py |
+++ b/appengine_module/gae_ts_mon/memcache_metric_store.py |
@@ -6,7 +6,6 @@ import collections |
import copy |
import functools |
import logging |
-import operator |
import random |
import threading |
@@ -181,25 +180,56 @@ class MemcacheMetricStore(metric_store.MetricStore): |
yield (copy.copy(target), entities[metric_name].metric, start_time, |
fields_values) |
- def _apply_all(self, callables, arg): |
- ret = arg |
- for fn in callables: |
- ret = fn(ret) |
- return ret |
+ def _apply_all(self, modifications, entry): |
+ """Applies all the modifications, in order, to a memcache entry. |
- def _compare_and_set(self, metric_modify_fns, namespace): |
+ All modifications must be for the same metric.""" |
+ |
+ if not modifications: # pragma: no cover |
+ return entry |
+ |
+ # All modifications should be for the same metric. |
+ name = modifications[0].name |
+ |
+ if entry is None: |
+ entry = (self._start_time(name), {}) |
+ |
+ _, targets = entry |
+ |
+ target_key = self._target_key() |
+ values = targets.setdefault(target_key, {}) |
+ |
+ for mod in modifications: |
+ value = values.get(mod.fields, 0) |
+ |
+ if mod.mod_type == 'set': |
+ new_value, enforce_ge = mod.args |
+ if enforce_ge and new_value < value: |
+ raise errors.MonitoringDecreasingValueError(name, value, new_value) |
+ value = new_value |
+ elif mod.mod_type == 'incr': |
+ delta, modify_fn = mod.args |
+ if modify_fn is None: |
+ modify_fn = metric_store.default_modify_fn(name) |
+ value = modify_fn(value, delta) |
+ else: |
+ raise errors.UnknownModificationTypeError(mod.mod_type) |
+ |
+ values[mod.fields] = value |
+ |
+ return entry |
+ |
+ def _compare_and_set(self, modifications, namespace): |
client = self._client() |
# Metrics that we haven't updated yet. Metrics are removed from this dict |
# when they're successfully updated - if there are any left they will be |
# retried 10 times until everything has been updated. |
- # We might have more than one modify_fn for a metric if different field |
+ # We might have more than one modification for a metric if different field |
# values were updated. |
- metrics = {} |
remaining = collections.defaultdict(list) |
- for metric, modify_fns in metric_modify_fns: |
- metrics[metric.name] = metric |
- remaining[metric.name].append(modify_fns) |
+ for modification in modifications: |
+ remaining[modification.name].append(modification) |
failed_keys_count = 0 |
@@ -209,7 +239,7 @@ class MemcacheMetricStore(metric_store.MetricStore): |
for name in remaining: |
# Pick one of the shards to modify. |
- key = self._random_shard(metrics[name]) |
+ key = self._random_shard(self._state.metrics[name]) |
keys.append(key) |
key_map[key] = name |
@@ -253,77 +283,27 @@ class MemcacheMetricStore(metric_store.MetricStore): |
# Update the cas_failures metric with the number of failed keys, but don't |
# do so recursively. |
if (failed_keys_count and |
- any(metric.name != cas_failures.name |
- for metric, _ in metric_modify_fns)): |
+ any(modification.name != cas_failures.name |
+ for modification in modifications)): |
cas_failures.increment_by(failed_keys_count) |
- def _create_modify_metric_fn(self, name, fields, modify_value_fn, delta): |
- """Returns a function that modifies a memcache row value. |
- |
- Calls modify_value_fn(old_value, delta) and puts the result back in the |
- memcache row. |
- """ |
- |
- def modify_fn(entry): |
- if entry is None: |
- entry = (self._start_time(name), {}) |
- |
- _, targets = entry |
- |
- target_key = self._target_key() |
- if target_key not in targets: |
- targets[target_key] = {} |
- values = targets[target_key] |
- |
- values[fields] = modify_value_fn(values.get(fields, 0), delta) |
- |
- return entry |
- return modify_fn |
- |
def _compare_and_set_metrics(self, modifications): |
- if any(name in self.METRIC_NAMES_EXCLUDED_FROM_INDEX |
- for name, fields, modify_value_fn, delta in modifications): |
+ if any(mod.name in self.METRIC_NAMES_EXCLUDED_FROM_INDEX |
+ for mod in modifications): |
raise errors.MonitoringError('Metric is magical, can\'t set it') |
- metric_modify_fns = [ |
- (self._state.metrics[name], |
- self._create_modify_metric_fn(name, fields, modify_value_fn, delta)) |
- for name, fields, modify_value_fn, delta in modifications] |
- self._compare_and_set(metric_modify_fns, self._namespace_for_job()) |
- |
- def _create_set_value_fn(self, name, value, enforce_ge): |
- def modify_fn(old_value, _delta): |
- if enforce_ge and old_value is not None and value < old_value: |
- raise errors.MonitoringDecreasingValueError(name, old_value, value) |
- return value |
- return modify_fn |
+ self._compare_and_set(modifications, self._namespace_for_job()) |
def set(self, name, fields, value, enforce_ge=False): |
- modify_fn = self._create_set_value_fn(name, value, enforce_ge) |
- self._compare_and_set_metrics([(name, fields, modify_fn, None)]) |
+ self._compare_and_set_metrics([metric_store.Modification( |
+ name, fields, 'set', (value, enforce_ge))]) |
- def incr(self, name, fields, delta, modify_fn=operator.add): |
- if delta < 0: |
- raise errors.MonitoringDecreasingValueError(name, None, delta) |
- self._compare_and_set_metrics([(name, fields, modify_fn, delta)]) |
+ def incr(self, name, fields, delta, modify_fn=None): |
+ self._compare_and_set_metrics([metric_store.Modification( |
+ name, fields, 'incr', (delta, modify_fn))]) |
def modify_multi(self, modifications): |
- mods = [] |
- for mod in modifications: |
- if mod.mod_type == 'set': |
- value, enforce_ge = mod.args |
- modify_fn = self._create_set_value_fn(mod.name, value, enforce_ge) |
- delta = None |
- elif mod.mod_type == 'incr': |
- delta, modify_fn = mod.args |
- if delta < 0: |
- raise errors.MonitoringDecreasingValueError(mod.name, None, delta) |
- else: |
- raise errors.UnknownModificationTypeError(mod.mod_type) |
- |
- mods.append((mod.name, mod.fields, modify_fn, delta)) |
- |
- self._compare_and_set_metrics(mods) |
+ self._compare_and_set_metrics(modifications) |
def reset_for_unittest(self, name=None): |
if name is None: |