| Index: client/third_party/infra_libs/ts_mon/common/metric_store.py
|
| diff --git a/client/third_party/infra_libs/ts_mon/common/metric_store.py b/client/third_party/infra_libs/ts_mon/common/metric_store.py
|
| index 7c887a5b5532ab1ba47bba194dcb9b450f64e847..8e202064a092c7d4fe258d28828faf1c2549d1d4 100644
|
| --- a/client/third_party/infra_libs/ts_mon/common/metric_store.py
|
| +++ b/client/third_party/infra_libs/ts_mon/common/metric_store.py
|
| @@ -11,19 +11,6 @@ import time
|
| from infra_libs.ts_mon.common import errors
|
|
|
|
|
| -"""A light-weight representation of a set or an incr.
|
| -
|
| -Args:
|
| - name: The metric name.
|
| - fields: The normalized field tuple.
|
| - mod_type: Either 'set' or 'incr'. Other values will raise
|
| - UnknownModificationTypeError when it's used.
|
| - args: (value, enforce_ge) for 'set' or (delta, modify_fn) for 'incr'.
|
| -""" # pylint: disable=pointless-string-statement
|
| -Modification = collections.namedtuple(
|
| - 'Modification', ['name', 'fields', 'mod_type', 'args'])
|
| -
|
| -
|
| def default_modify_fn(name):
|
| def _modify_fn(value, delta):
|
| if delta < 0:
|
| @@ -114,14 +101,6 @@ class MetricStore(object):
|
| """
|
| raise NotImplementedError
|
|
|
| - def modify_multi(self, modifications):
|
| - """Modifies multiple metrics in one go.
|
| -
|
| - Args:
|
| - modifications: an iterable of Modification objects.
|
| - """
|
| - raise NotImplementedError
|
| -
|
| def reset_for_unittest(self, name=None):
|
| """Clears the values metrics. Useful in unittests.
|
|
|
| @@ -131,14 +110,6 @@ class MetricStore(object):
|
| """
|
| raise NotImplementedError
|
|
|
| - def initialize_context(self):
|
| - """Opens a request-local context for deferring metric updates."""
|
| - pass # pragma: no cover
|
| -
|
| - def finalize_context(self):
|
| - """Closes a request-local context opened by initialize_context."""
|
| - pass # pragma: no cover
|
| -
|
| def _start_time(self, name):
|
| if name in self._state.metrics:
|
| ret = self._state.metrics[name].start_time
|
| @@ -147,87 +118,48 @@ class MetricStore(object):
|
|
|
| return self._time_fn()
|
|
|
| - @staticmethod
|
| - def _normalize_target_fields(target_fields):
|
| - """Converts target fields into a hashable tuple.
|
| -
|
| - Args:
|
| - target_fields (dict): target fields to override the default target.
|
| - """
|
| - if not target_fields:
|
| - target_fields = {}
|
| - return tuple(sorted(target_fields.iteritems()))
|
| -
|
| -
|
| -class MetricFieldsValues(object):
|
| - def __init__(self):
|
| - # Map normalized fields to single metric values.
|
| - self._values = {}
|
| - self._thread_lock = threading.Lock()
|
|
|
| - def get_value(self, fields, default=None):
|
| - return self._values.get(fields, default)
|
| +class _TargetFieldsValues(object):
|
| + """Holds all values for a single metric.
|
|
|
| - def set_value(self, fields, value):
|
| - self._values[fields] = value
|
| + Values are keyed by metric fields and target fields (which override the
|
| + default target fields configured globally for the process).
|
| + """
|
|
|
| - def iteritems(self):
|
| - # Make a copy of the metric values in case another thread (or this
|
| - # generator's consumer) modifies them while we're iterating.
|
| - with self._thread_lock:
|
| - values = copy.copy(self._values)
|
| - for fields, value in values.iteritems():
|
| - yield fields, value
|
| + def __init__(self, start_time):
|
| + self.start_time = start_time
|
|
|
| + # {normalized_target_fields: {normalized_metric_fields: value}}
|
| + self._values = collections.defaultdict(dict)
|
|
|
| -class TargetFieldsValues(object):
|
| - def __init__(self, store):
|
| - # Map normalized target fields to MetricFieldsValues.
|
| - self._values = collections.defaultdict(MetricFieldsValues)
|
| - self._store = store
|
| - self._thread_lock = threading.Lock()
|
| + def _get_target_values(self, target_fields):
|
| + # Normalize the target fields by converting them into a hashable tuple.
|
| + if not target_fields:
|
| + target_fields = {}
|
| + key = tuple(sorted(target_fields.iteritems()))
|
|
|
| - def get_target_values(self, target_fields):
|
| - key = self._store._normalize_target_fields(target_fields)
|
| return self._values[key]
|
|
|
| def get_value(self, fields, target_fields, default=None):
|
| - return self.get_target_values(target_fields).get_value(
|
| + return self._get_target_values(target_fields).get(
|
| fields, default)
|
|
|
| def set_value(self, fields, target_fields, value):
|
| - self.get_target_values(target_fields).set_value(fields, value)
|
| + self._get_target_values(target_fields)[fields] = value
|
|
|
| - def iter_targets(self):
|
| - # Make a copy of the values in case another thread (or this
|
| - # generator's consumer) modifies them while we're iterating.
|
| - with self._thread_lock:
|
| - values = copy.copy(self._values)
|
| - for target_fields, fields_values in values.iteritems():
|
| - target = copy.copy(self._store._state.target)
|
| + def iter_targets(self, default_target):
|
| + for target_fields, fields_values in self._values.iteritems():
|
| if target_fields:
|
| + target = copy.copy(default_target)
|
| target.update({k: v for k, v in target_fields})
|
| + else:
|
| + target = default_target
|
| yield target, fields_values
|
|
|
| -
|
| -class MetricValues(object):
|
| - def __init__(self, store, start_time):
|
| - self._start_time = start_time
|
| - self._values = TargetFieldsValues(store)
|
| -
|
| - @property
|
| - def start_time(self):
|
| - return self._start_time
|
| -
|
| - @property
|
| - def values(self):
|
| - return self._values
|
| -
|
| - def get_value(self, fields, target_fields, default=None):
|
| - return self.values.get_value(fields, target_fields, default)
|
| -
|
| - def set_value(self, fields, target_fields, value):
|
| - self.values.set_value(fields, target_fields, value)
|
| + def __deepcopy__(self, memo_dict):
|
| + ret = _TargetFieldsValues(self.start_time)
|
| + ret._values = copy.deepcopy(self._values, memo_dict)
|
| + return ret
|
|
|
|
|
| class InProcessMetricStore(MetricStore):
|
| @@ -250,20 +182,22 @@ class InProcessMetricStore(MetricStore):
|
|
|
| def iter_field_values(self, name):
|
| return itertools.chain.from_iterable(
|
| - x.iteritems() for _, x in self._entry(name).values.iter_targets())
|
| + x.iteritems() for _, x
|
| + in self._entry(name).iter_targets(self._state.target))
|
|
|
| def get_all(self):
|
| # Make a copy of the metric values in case another thread (or this
|
| # generator's consumer) modifies them while we're iterating.
|
| with self._thread_lock:
|
| - values = copy.copy(self._values)
|
| - end_time = self._time_fn()
|
| + values = copy.deepcopy(self._values)
|
| + end_time = self._time_fn()
|
|
|
| for name, metric_values in values.iteritems():
|
| if name not in self._state.metrics:
|
| continue
|
| start_time = metric_values.start_time
|
| - for target, fields_values in metric_values.values.iter_targets():
|
| + for target, fields_values in metric_values.iter_targets(
|
| + self._state.target):
|
| yield (target, self._state.metrics[name], start_time, end_time,
|
| fields_values)
|
|
|
| @@ -287,11 +221,6 @@ class InProcessMetricStore(MetricStore):
|
| self._entry(name).set_value(fields, target_fields, modify_fn(
|
| self.get(name, fields, target_fields, 0), delta))
|
|
|
| - def modify_multi(self, modifications):
|
| - # This is only used by DeferredMetricStore on top of MemcacheMetricStore,
|
| - # but could be implemented here if required in the future.
|
| - raise NotImplementedError
|
| -
|
| def reset_for_unittest(self, name=None):
|
| if name is not None:
|
| self._reset(name)
|
| @@ -300,4 +229,4 @@ class InProcessMetricStore(MetricStore):
|
| self._reset(name)
|
|
|
| def _reset(self, name):
|
| - self._values[name] = MetricValues(self, self._start_time(name))
|
| + self._values[name] = _TargetFieldsValues(self._start_time(name))
|
|
|