Index: infra_libs/ts_mon/common/metric_store.py |
diff --git a/infra_libs/ts_mon/common/metric_store.py b/infra_libs/ts_mon/common/metric_store.py |
deleted file mode 100644 |
index 4d21feeabd964d1a1a1354f0f77bbab14eb355d9..0000000000000000000000000000000000000000 |
--- a/infra_libs/ts_mon/common/metric_store.py |
+++ /dev/null |
@@ -1,301 +0,0 @@ |
-# Copyright 2015 The Chromium Authors. All rights reserved. |
-# Use of this source code is governed by a BSD-style license that can be |
-# found in the LICENSE file. |
- |
-import collections |
-import copy |
-import itertools |
-import threading |
-import time |
- |
-from infra_libs.ts_mon.common import errors |
- |
- |
-"""A light-weight representation of a set or an incr. |
- |
-Args: |
- name: The metric name. |
- fields: The normalized field tuple. |
- mod_type: Either 'set' or 'incr'. Other values will raise |
- UnknownModificationTypeError when it's used. |
- args: (value, enforce_ge) for 'set' or (delta, modify_fn) for 'incr'. |
-""" # pylint: disable=pointless-string-statement |
-Modification = collections.namedtuple( |
- 'Modification', ['name', 'fields', 'mod_type', 'args']) |
- |
- |
-def default_modify_fn(name): |
- def _modify_fn(value, delta): |
- if delta < 0: |
- raise errors.MonitoringDecreasingValueError(name, None, delta) |
- return value + delta |
- return _modify_fn |
- |
- |
-class MetricStore(object): |
- """A place to store values for each metric. |
- |
- Several methods take "a normalized field tuple". This is a tuple of |
- (key, value) tuples sorted by key. (The reason this is given as a tuple |
- instead of a dict is because tuples are hashable and can be used as dict keys, |
- dicts can not). |
- |
- The MetricStore is also responsible for keeping the start_time of each metric. |
- This is what goes into the start_timestamp_us field in the MetricsData proto |
- for cumulative metrics and distributions, and helps Monarch identify when a |
- counter was reset. This is the MetricStore's job because an implementation |
- might share counter values across multiple instances of a task (like on |
- Appengine), so the start time must be associated with that value so that it |
- can be reset for all tasks at once when the value is reset. |
- |
- External metric stores (like those backed by memcache) may be cleared (either |
- wholly or partially) at any time. When this happens the MetricStore *must* |
- generate a new start_time for all the affected metrics. |
- |
- Metrics can specify their own explicit start time if they are mirroring the |
- value of some external counter that started counting at a known time. |
- |
- Otherwise the MetricStore's time_fn (defaults to time.time()) is called the |
- first time a metric is set or incremented, or after it is cleared externally. |
- """ |
- |
- def __init__(self, state, time_fn=None): |
- self._state = state |
- self._time_fn = time_fn or time.time |
- |
- def get(self, name, fields, target_fields, default=None): |
- """Fetches the current value for the metric. |
- |
- Args: |
- name (string): the metric's name. |
- fields (tuple): a normalized field tuple. |
- target_fields (dict or None): target fields to override. |
- default: the value to return if the metric has no value of this set of |
- field values. |
- """ |
- raise NotImplementedError |
- |
- def get_all(self): |
- """Returns an iterator over all the metrics present in the store. |
- |
- The iterator yields 4-tuples: |
- (target, metric, start_time, field_values) |
- """ |
- raise NotImplementedError |
- |
- def set(self, name, fields, target_fields, value, enforce_ge=False): |
- """Sets the metric's value. |
- |
- Args: |
- name: the metric's name. |
- fields: a normalized field tuple. |
- target_fields (dict or None): target fields to override. |
- value: the new value for the metric. |
- enforce_ge: if this is True, raise an exception if the new value is |
- less than the old value. |
- |
- Raises: |
- MonitoringDecreasingValueError: if enforce_ge is True and the new value is |
- smaller than the old value. |
- """ |
- raise NotImplementedError |
- |
- def incr(self, name, fields, target_fields, delta, modify_fn=None): |
- """Increments the metric's value. |
- |
- Args: |
- name: the metric's name. |
- fields: a normalized field tuple. |
- target_fields (dict or None): target fields to override. |
- delta: how much to increment the value by. |
- modify_fn: this function is called with the original value and the delta |
- as its arguments and is expected to return the new value. The |
- function must be idempotent as it may be called multiple times. |
- """ |
- raise NotImplementedError |
- |
- def modify_multi(self, modifications): |
- """Modifies multiple metrics in one go. |
- |
- Args: |
- modifications: an iterable of Modification objects. |
- """ |
- raise NotImplementedError |
- |
- def reset_for_unittest(self, name=None): |
- """Clears the values metrics. Useful in unittests. |
- |
- Args: |
- name: the name of an individual metric to reset, or if None resets all |
- metrics. |
- """ |
- raise NotImplementedError |
- |
- def initialize_context(self): |
- """Opens a request-local context for deferring metric updates.""" |
- pass # pragma: no cover |
- |
- def finalize_context(self): |
- """Closes a request-local context opened by initialize_context.""" |
- pass # pragma: no cover |
- |
- def _start_time(self, name): |
- if name in self._state.metrics: |
- ret = self._state.metrics[name].start_time |
- if ret is not None: |
- return ret |
- |
- return self._time_fn() |
- |
- @staticmethod |
- def _normalize_target_fields(target_fields): |
- """Converts target fields into a hashable tuple. |
- |
- Args: |
- target_fields (dict): target fields to override the default target. |
- """ |
- if not target_fields: |
- target_fields = {} |
- return tuple(sorted(target_fields.iteritems())) |
- |
- |
-class MetricFieldsValues(object): |
- def __init__(self): |
- # Map normalized fields to single metric values. |
- self._values = {} |
- self._thread_lock = threading.Lock() |
- |
- def get_value(self, fields, default=None): |
- return self._values.get(fields, default) |
- |
- def set_value(self, fields, value): |
- self._values[fields] = value |
- |
- def iteritems(self): |
- # Make a copy of the metric values in case another thread (or this |
- # generator's consumer) modifies them while we're iterating. |
- with self._thread_lock: |
- values = copy.copy(self._values) |
- for fields, value in values.iteritems(): |
- yield fields, value |
- |
- |
-class TargetFieldsValues(object): |
- def __init__(self, store): |
- # Map normalized target fields to MetricFieldsValues. |
- self._values = collections.defaultdict(MetricFieldsValues) |
- self._store = store |
- self._thread_lock = threading.Lock() |
- |
- def get_target_values(self, target_fields): |
- key = self._store._normalize_target_fields(target_fields) |
- return self._values[key] |
- |
- def get_value(self, fields, target_fields, default=None): |
- return self.get_target_values(target_fields).get_value( |
- fields, default) |
- |
- def set_value(self, fields, target_fields, value): |
- self.get_target_values(target_fields).set_value(fields, value) |
- |
- def iter_targets(self): |
- # Make a copy of the values in case another thread (or this |
- # generator's consumer) modifies them while we're iterating. |
- with self._thread_lock: |
- values = copy.copy(self._values) |
- for target_fields, fields_values in values.iteritems(): |
- target = copy.copy(self._store._state.target) |
- if target_fields: |
- target.update({k: v for k, v in target_fields}) |
- yield target, fields_values |
- |
- |
-class MetricValues(object): |
- def __init__(self, store, start_time): |
- self._start_time = start_time |
- self._values = TargetFieldsValues(store) |
- |
- @property |
- def start_time(self): |
- return self._start_time |
- |
- @property |
- def values(self): |
- return self._values |
- |
- def get_value(self, fields, target_fields, default=None): |
- return self.values.get_value(fields, target_fields, default) |
- |
- def set_value(self, fields, target_fields, value): |
- self.values.set_value(fields, target_fields, value) |
- |
- |
-class InProcessMetricStore(MetricStore): |
- """A thread-safe metric store that keeps values in memory.""" |
- |
- def __init__(self, state, time_fn=None): |
- super(InProcessMetricStore, self).__init__(state, time_fn=time_fn) |
- |
- self._values = {} |
- self._thread_lock = threading.Lock() |
- |
- def _entry(self, name): |
- if name not in self._values: |
- self._reset(name) |
- |
- return self._values[name] |
- |
- def get(self, name, fields, target_fields, default=None): |
- return self._entry(name).get_value(fields, target_fields, default) |
- |
- def iter_field_values(self, name): |
- return itertools.chain.from_iterable( |
- x.iteritems() for _, x in self._entry(name).values.iter_targets()) |
- |
- def get_all(self): |
- # Make a copy of the metric values in case another thread (or this |
- # generator's consumer) modifies them while we're iterating. |
- with self._thread_lock: |
- values = copy.copy(self._values) |
- |
- for name, metric_values in values.iteritems(): |
- if name not in self._state.metrics: |
- continue |
- start_time = metric_values.start_time |
- for target, fields_values in metric_values.values.iter_targets(): |
- yield target, self._state.metrics[name], start_time, fields_values |
- |
- def set(self, name, fields, target_fields, value, enforce_ge=False): |
- with self._thread_lock: |
- if enforce_ge: |
- old_value = self._entry(name).get_value(fields, target_fields, 0) |
- if value < old_value: |
- raise errors.MonitoringDecreasingValueError(name, old_value, value) |
- |
- self._entry(name).set_value(fields, target_fields, value) |
- |
- def incr(self, name, fields, target_fields, delta, modify_fn=None): |
- if delta < 0: |
- raise errors.MonitoringDecreasingValueError(name, None, delta) |
- |
- if modify_fn is None: |
- modify_fn = default_modify_fn(name) |
- |
- with self._thread_lock: |
- self._entry(name).set_value(fields, target_fields, modify_fn( |
- self.get(name, fields, target_fields, 0), delta)) |
- |
- def modify_multi(self, modifications): |
- # This is only used by DeferredMetricStore on top of MemcacheMetricStore, |
- # but could be implemented here if required in the future. |
- raise NotImplementedError |
- |
- def reset_for_unittest(self, name=None): |
- if name is not None: |
- self._reset(name) |
- else: |
- for name in self._values.keys(): |
- self._reset(name) |
- |
- def _reset(self, name): |
- self._values[name] = MetricValues(self, self._start_time(name)) |