Chromium Code Reviews| Index: appengine_module/gae_ts_mon/memcache_metric_store.py |
| diff --git a/appengine_module/gae_ts_mon/memcache_metric_store.py b/appengine_module/gae_ts_mon/memcache_metric_store.py |
| index e36bafee5a36fb71e6a49615179081932d7d4f26..27814e9cc7b2c46a1037001c01081d6ab99ae6da 100644 |
| --- a/appengine_module/gae_ts_mon/memcache_metric_store.py |
| +++ b/appengine_module/gae_ts_mon/memcache_metric_store.py |
| @@ -181,25 +181,57 @@ class MemcacheMetricStore(metric_store.MetricStore): |
| yield (copy.copy(target), entities[metric_name].metric, start_time, |
| fields_values) |
| - def _apply_all(self, callables, arg): |
| - ret = arg |
| - for fn in callables: |
| - ret = fn(ret) |
| - return ret |
| + def _apply_all(self, modifications, entry): |
| + """Applies all the modifications, in order, to a memcache entry. |
| - def _compare_and_set(self, metric_modify_fns, namespace): |
| + All modifications must be for the same metric.""" |
| + |
| + assert len(modifications) > 0 |
|
Sergey Berezin
2015/12/17 22:28:50
While useful for debugging, I'm generally afraid o
dsansome
2015/12/18 00:54:33
Done.
|
| + |
| + # All modifications should be for the same metric. |
| + name = modifications[0].name |
| + |
| + if entry is None: |
| + entry = (self._start_time(name), {}) |
| + |
| + _, targets = entry |
| + |
| + target_key = self._target_key() |
| + if target_key not in targets: |
| + targets[target_key] = {} |
| + values = targets[target_key] |
|
Sergey Berezin
2015/12/17 22:28:50
nit: values = targets.setdefault(target_key, {})
dsansome
2015/12/18 00:54:33
Done.
|
| + |
| + for mod in modifications: |
| + value = values.get(mod.fields, 0) |
| + |
| + if mod.mod_type == 'set': |
| + new_value, enforce_ge = mod.args |
| + if enforce_ge and new_value < value: |
| + raise errors.MonitoringDecreasingValueError(name, value, new_value) |
| + value = new_value |
| + elif mod.mod_type == 'incr': |
| + delta, modify_fn = mod.args |
| + if delta < 0: |
|
Sergey Berezin
2015/12/17 22:28:50
Technically, you can add a negative number to a di
dsansome
2015/12/18 00:54:33
Done.
|
| + raise errors.MonitoringDecreasingValueError(name, None, delta) |
| + value = modify_fn(value, delta) |
| + else: |
| + raise errors.UnknownModificationTypeError(mod.mod_type) |
| + |
| + values[mod.fields] = value |
| + |
| + return entry |
| + |
| + def _compare_and_set(self, modifications, namespace): |
| client = self._client() |
| # Metrics that we haven't updated yet. Metrics are removed from this dict |
| # when they're successfully updated - if there are any left they will be |
| # retried 10 times until everything has been updated. |
| - # We might have more than one modify_fn for a metric if different field |
| + # We might have more than one modification for a metric if different field |
| # values were updated. |
| - metrics = {} |
| remaining = collections.defaultdict(list) |
| - for metric, modify_fns in metric_modify_fns: |
| - metrics[metric.name] = metric |
| - remaining[metric.name].append(modify_fns) |
| + for modification in modifications: |
| + remaining[modification.name].append(modification) |
| failed_keys_count = 0 |
| @@ -209,7 +241,7 @@ class MemcacheMetricStore(metric_store.MetricStore): |
| for name in remaining: |
| # Pick one of the shards to modify. |
| - key = self._random_shard(metrics[name]) |
| + key = self._random_shard(self._state.metrics[name]) |
| keys.append(key) |
| key_map[key] = name |
| @@ -253,77 +285,27 @@ class MemcacheMetricStore(metric_store.MetricStore): |
| # Update the cas_failures metric with the number of failed keys, but don't |
| # do so recursively. |
| if (failed_keys_count and |
| - any(metric.name != cas_failures.name |
| - for metric, _ in metric_modify_fns)): |
| + any(modification.name != cas_failures.name |
| + for modification in modifications)): |
| cas_failures.increment_by(failed_keys_count) |
| - def _create_modify_metric_fn(self, name, fields, modify_value_fn, delta): |
| - """Returns a function that modifies a memcache row value. |
| - |
| - Calls modify_value_fn(old_value, delta) and puts the result back in the |
| - memcache row. |
| - """ |
| - |
| - def modify_fn(entry): |
| - if entry is None: |
| - entry = (self._start_time(name), {}) |
| - |
| - _, targets = entry |
| - |
| - target_key = self._target_key() |
| - if target_key not in targets: |
| - targets[target_key] = {} |
| - values = targets[target_key] |
| - |
| - values[fields] = modify_value_fn(values.get(fields, 0), delta) |
| - |
| - return entry |
| - return modify_fn |
| - |
| def _compare_and_set_metrics(self, modifications): |
| - if any(name in self.METRIC_NAMES_EXCLUDED_FROM_INDEX |
| - for name, fields, modify_value_fn, delta in modifications): |
| + if any(mod.name in self.METRIC_NAMES_EXCLUDED_FROM_INDEX |
| + for mod in modifications): |
| raise errors.MonitoringError('Metric is magical, can\'t set it') |
| - metric_modify_fns = [ |
| - (self._state.metrics[name], |
| - self._create_modify_metric_fn(name, fields, modify_value_fn, delta)) |
| - for name, fields, modify_value_fn, delta in modifications] |
| - self._compare_and_set(metric_modify_fns, self._namespace_for_job()) |
| - |
| - def _create_set_value_fn(self, name, value, enforce_ge): |
| - def modify_fn(old_value, _delta): |
| - if enforce_ge and old_value is not None and value < old_value: |
| - raise errors.MonitoringDecreasingValueError(name, old_value, value) |
| - return value |
| - return modify_fn |
| + self._compare_and_set(modifications, self._namespace_for_job()) |
| def set(self, name, fields, value, enforce_ge=False): |
| - modify_fn = self._create_set_value_fn(name, value, enforce_ge) |
| - self._compare_and_set_metrics([(name, fields, modify_fn, None)]) |
| + self._compare_and_set_metrics([metric_store.Modification( |
| + name, fields, 'set', (value, enforce_ge))]) |
| def incr(self, name, fields, delta, modify_fn=operator.add): |
| - if delta < 0: |
| - raise errors.MonitoringDecreasingValueError(name, None, delta) |
| - self._compare_and_set_metrics([(name, fields, modify_fn, delta)]) |
| + self._compare_and_set_metrics([metric_store.Modification( |
| + name, fields, 'incr', (delta, modify_fn))]) |
| def modify_multi(self, modifications): |
| - mods = [] |
| - for mod in modifications: |
| - if mod.mod_type == 'set': |
| - value, enforce_ge = mod.args |
| - modify_fn = self._create_set_value_fn(mod.name, value, enforce_ge) |
| - delta = None |
| - elif mod.mod_type == 'incr': |
| - delta, modify_fn = mod.args |
| - if delta < 0: |
| - raise errors.MonitoringDecreasingValueError(mod.name, None, delta) |
| - else: |
| - raise errors.UnknownModificationTypeError(mod.mod_type) |
| - |
| - mods.append((mod.name, mod.fields, modify_fn, delta)) |
| - |
| - self._compare_and_set_metrics(mods) |
| + self._compare_and_set_metrics(modifications) |
| def reset_for_unittest(self, name=None): |
| if name is None: |