| Index: infra_libs/ts_mon/common/metric_store.py
|
| diff --git a/infra_libs/ts_mon/common/metric_store.py b/infra_libs/ts_mon/common/metric_store.py
|
| deleted file mode 100644
|
| index 4d21feeabd964d1a1a1354f0f77bbab14eb355d9..0000000000000000000000000000000000000000
|
| --- a/infra_libs/ts_mon/common/metric_store.py
|
| +++ /dev/null
|
| @@ -1,301 +0,0 @@
|
| -# Copyright 2015 The Chromium Authors. All rights reserved.
|
| -# Use of this source code is governed by a BSD-style license that can be
|
| -# found in the LICENSE file.
|
| -
|
| -import collections
|
| -import copy
|
| -import itertools
|
| -import threading
|
| -import time
|
| -
|
| -from infra_libs.ts_mon.common import errors
|
| -
|
| -
|
| -"""A light-weight representation of a set or an incr.
|
| -
|
| -Args:
|
| - name: The metric name.
|
| - fields: The normalized field tuple.
|
| - mod_type: Either 'set' or 'incr'. Other values will raise
|
| - UnknownModificationTypeError when it's used.
|
| - args: (value, enforce_ge) for 'set' or (delta, modify_fn) for 'incr'.
|
| -""" # pylint: disable=pointless-string-statement
|
| -Modification = collections.namedtuple(
|
| - 'Modification', ['name', 'fields', 'mod_type', 'args'])
|
| -
|
| -
|
| -def default_modify_fn(name):
|
| - def _modify_fn(value, delta):
|
| - if delta < 0:
|
| - raise errors.MonitoringDecreasingValueError(name, None, delta)
|
| - return value + delta
|
| - return _modify_fn
|
| -
|
| -
|
| -class MetricStore(object):
|
| - """A place to store values for each metric.
|
| -
|
| - Several methods take "a normalized field tuple". This is a tuple of
|
| - (key, value) tuples sorted by key. (The reason this is given as a tuple
|
| - instead of a dict is because tuples are hashable and can be used as dict keys,
|
| - dicts can not).
|
| -
|
| - The MetricStore is also responsible for keeping the start_time of each metric.
|
| - This is what goes into the start_timestamp_us field in the MetricsData proto
|
| - for cumulative metrics and distributions, and helps Monarch identify when a
|
| - counter was reset. This is the MetricStore's job because an implementation
|
| - might share counter values across multiple instances of a task (like on
|
| - Appengine), so the start time must be associated with that value so that it
|
| - can be reset for all tasks at once when the value is reset.
|
| -
|
| - External metric stores (like those backed by memcache) may be cleared (either
|
| - wholly or partially) at any time. When this happens the MetricStore *must*
|
| - generate a new start_time for all the affected metrics.
|
| -
|
| - Metrics can specify their own explicit start time if they are mirroring the
|
| - value of some external counter that started counting at a known time.
|
| -
|
| - Otherwise the MetricStore's time_fn (defaults to time.time()) is called the
|
| - first time a metric is set or incremented, or after it is cleared externally.
|
| - """
|
| -
|
| - def __init__(self, state, time_fn=None):
|
| - self._state = state
|
| - self._time_fn = time_fn or time.time
|
| -
|
| - def get(self, name, fields, target_fields, default=None):
|
| - """Fetches the current value for the metric.
|
| -
|
| - Args:
|
| - name (string): the metric's name.
|
| - fields (tuple): a normalized field tuple.
|
| - target_fields (dict or None): target fields to override.
|
| - default: the value to return if the metric has no value of this set of
|
| - field values.
|
| - """
|
| - raise NotImplementedError
|
| -
|
| - def get_all(self):
|
| - """Returns an iterator over all the metrics present in the store.
|
| -
|
| - The iterator yields 4-tuples:
|
| - (target, metric, start_time, field_values)
|
| - """
|
| - raise NotImplementedError
|
| -
|
| - def set(self, name, fields, target_fields, value, enforce_ge=False):
|
| - """Sets the metric's value.
|
| -
|
| - Args:
|
| - name: the metric's name.
|
| - fields: a normalized field tuple.
|
| - target_fields (dict or None): target fields to override.
|
| - value: the new value for the metric.
|
| - enforce_ge: if this is True, raise an exception if the new value is
|
| - less than the old value.
|
| -
|
| - Raises:
|
| - MonitoringDecreasingValueError: if enforce_ge is True and the new value is
|
| - smaller than the old value.
|
| - """
|
| - raise NotImplementedError
|
| -
|
| - def incr(self, name, fields, target_fields, delta, modify_fn=None):
|
| - """Increments the metric's value.
|
| -
|
| - Args:
|
| - name: the metric's name.
|
| - fields: a normalized field tuple.
|
| - target_fields (dict or None): target fields to override.
|
| - delta: how much to increment the value by.
|
| - modify_fn: this function is called with the original value and the delta
|
| - as its arguments and is expected to return the new value. The
|
| - function must be idempotent as it may be called multiple times.
|
| - """
|
| - raise NotImplementedError
|
| -
|
| - def modify_multi(self, modifications):
|
| - """Modifies multiple metrics in one go.
|
| -
|
| - Args:
|
| - modifications: an iterable of Modification objects.
|
| - """
|
| - raise NotImplementedError
|
| -
|
| - def reset_for_unittest(self, name=None):
|
| - """Clears the values metrics. Useful in unittests.
|
| -
|
| - Args:
|
| - name: the name of an individual metric to reset, or if None resets all
|
| - metrics.
|
| - """
|
| - raise NotImplementedError
|
| -
|
| - def initialize_context(self):
|
| - """Opens a request-local context for deferring metric updates."""
|
| - pass # pragma: no cover
|
| -
|
| - def finalize_context(self):
|
| - """Closes a request-local context opened by initialize_context."""
|
| - pass # pragma: no cover
|
| -
|
| - def _start_time(self, name):
|
| - if name in self._state.metrics:
|
| - ret = self._state.metrics[name].start_time
|
| - if ret is not None:
|
| - return ret
|
| -
|
| - return self._time_fn()
|
| -
|
| - @staticmethod
|
| - def _normalize_target_fields(target_fields):
|
| - """Converts target fields into a hashable tuple.
|
| -
|
| - Args:
|
| - target_fields (dict): target fields to override the default target.
|
| - """
|
| - if not target_fields:
|
| - target_fields = {}
|
| - return tuple(sorted(target_fields.iteritems()))
|
| -
|
| -
|
| -class MetricFieldsValues(object):
|
| - def __init__(self):
|
| - # Map normalized fields to single metric values.
|
| - self._values = {}
|
| - self._thread_lock = threading.Lock()
|
| -
|
| - def get_value(self, fields, default=None):
|
| - return self._values.get(fields, default)
|
| -
|
| - def set_value(self, fields, value):
|
| - self._values[fields] = value
|
| -
|
| - def iteritems(self):
|
| - # Make a copy of the metric values in case another thread (or this
|
| - # generator's consumer) modifies them while we're iterating.
|
| - with self._thread_lock:
|
| - values = copy.copy(self._values)
|
| - for fields, value in values.iteritems():
|
| - yield fields, value
|
| -
|
| -
|
| -class TargetFieldsValues(object):
|
| - def __init__(self, store):
|
| - # Map normalized target fields to MetricFieldsValues.
|
| - self._values = collections.defaultdict(MetricFieldsValues)
|
| - self._store = store
|
| - self._thread_lock = threading.Lock()
|
| -
|
| - def get_target_values(self, target_fields):
|
| - key = self._store._normalize_target_fields(target_fields)
|
| - return self._values[key]
|
| -
|
| - def get_value(self, fields, target_fields, default=None):
|
| - return self.get_target_values(target_fields).get_value(
|
| - fields, default)
|
| -
|
| - def set_value(self, fields, target_fields, value):
|
| - self.get_target_values(target_fields).set_value(fields, value)
|
| -
|
| - def iter_targets(self):
|
| - # Make a copy of the values in case another thread (or this
|
| - # generator's consumer) modifies them while we're iterating.
|
| - with self._thread_lock:
|
| - values = copy.copy(self._values)
|
| - for target_fields, fields_values in values.iteritems():
|
| - target = copy.copy(self._store._state.target)
|
| - if target_fields:
|
| - target.update({k: v for k, v in target_fields})
|
| - yield target, fields_values
|
| -
|
| -
|
| -class MetricValues(object):
|
| - def __init__(self, store, start_time):
|
| - self._start_time = start_time
|
| - self._values = TargetFieldsValues(store)
|
| -
|
| - @property
|
| - def start_time(self):
|
| - return self._start_time
|
| -
|
| - @property
|
| - def values(self):
|
| - return self._values
|
| -
|
| - def get_value(self, fields, target_fields, default=None):
|
| - return self.values.get_value(fields, target_fields, default)
|
| -
|
| - def set_value(self, fields, target_fields, value):
|
| - self.values.set_value(fields, target_fields, value)
|
| -
|
| -
|
| -class InProcessMetricStore(MetricStore):
|
| - """A thread-safe metric store that keeps values in memory."""
|
| -
|
| - def __init__(self, state, time_fn=None):
|
| - super(InProcessMetricStore, self).__init__(state, time_fn=time_fn)
|
| -
|
| - self._values = {}
|
| - self._thread_lock = threading.Lock()
|
| -
|
| - def _entry(self, name):
|
| - if name not in self._values:
|
| - self._reset(name)
|
| -
|
| - return self._values[name]
|
| -
|
| - def get(self, name, fields, target_fields, default=None):
|
| - return self._entry(name).get_value(fields, target_fields, default)
|
| -
|
| - def iter_field_values(self, name):
|
| - return itertools.chain.from_iterable(
|
| - x.iteritems() for _, x in self._entry(name).values.iter_targets())
|
| -
|
| - def get_all(self):
|
| - # Make a copy of the metric values in case another thread (or this
|
| - # generator's consumer) modifies them while we're iterating.
|
| - with self._thread_lock:
|
| - values = copy.copy(self._values)
|
| -
|
| - for name, metric_values in values.iteritems():
|
| - if name not in self._state.metrics:
|
| - continue
|
| - start_time = metric_values.start_time
|
| - for target, fields_values in metric_values.values.iter_targets():
|
| - yield target, self._state.metrics[name], start_time, fields_values
|
| -
|
| - def set(self, name, fields, target_fields, value, enforce_ge=False):
|
| - with self._thread_lock:
|
| - if enforce_ge:
|
| - old_value = self._entry(name).get_value(fields, target_fields, 0)
|
| - if value < old_value:
|
| - raise errors.MonitoringDecreasingValueError(name, old_value, value)
|
| -
|
| - self._entry(name).set_value(fields, target_fields, value)
|
| -
|
| - def incr(self, name, fields, target_fields, delta, modify_fn=None):
|
| - if delta < 0:
|
| - raise errors.MonitoringDecreasingValueError(name, None, delta)
|
| -
|
| - if modify_fn is None:
|
| - modify_fn = default_modify_fn(name)
|
| -
|
| - with self._thread_lock:
|
| - self._entry(name).set_value(fields, target_fields, modify_fn(
|
| - self.get(name, fields, target_fields, 0), delta))
|
| -
|
| - def modify_multi(self, modifications):
|
| - # This is only used by DeferredMetricStore on top of MemcacheMetricStore,
|
| - # but could be implemented here if required in the future.
|
| - raise NotImplementedError
|
| -
|
| - def reset_for_unittest(self, name=None):
|
| - if name is not None:
|
| - self._reset(name)
|
| - else:
|
| - for name in self._values.keys():
|
| - self._reset(name)
|
| -
|
| - def _reset(self, name):
|
| - self._values[name] = MetricValues(self, self._start_time(name))
|
|
|