Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1214)

Unified Diff: infra_libs/ts_mon/common/metric_store.py

Issue 2213143002: Add infra_libs as a bootstrap dependency. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Removed the ugly import hack Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: infra_libs/ts_mon/common/metric_store.py
diff --git a/infra_libs/ts_mon/common/metric_store.py b/infra_libs/ts_mon/common/metric_store.py
deleted file mode 100644
index 4d21feeabd964d1a1a1354f0f77bbab14eb355d9..0000000000000000000000000000000000000000
--- a/infra_libs/ts_mon/common/metric_store.py
+++ /dev/null
@@ -1,301 +0,0 @@
-# Copyright 2015 The Chromium Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-import collections
-import copy
-import itertools
-import threading
-import time
-
-from infra_libs.ts_mon.common import errors
-
-
-"""A light-weight representation of a set or an incr.
-
-Args:
- name: The metric name.
- fields: The normalized field tuple.
- mod_type: Either 'set' or 'incr'. Other values will raise
- UnknownModificationTypeError when it's used.
- args: (value, enforce_ge) for 'set' or (delta, modify_fn) for 'incr'.
-""" # pylint: disable=pointless-string-statement
-Modification = collections.namedtuple(
- 'Modification', ['name', 'fields', 'mod_type', 'args'])
-
-
-def default_modify_fn(name):
- def _modify_fn(value, delta):
- if delta < 0:
- raise errors.MonitoringDecreasingValueError(name, None, delta)
- return value + delta
- return _modify_fn
-
-
-class MetricStore(object):
- """A place to store values for each metric.
-
- Several methods take "a normalized field tuple". This is a tuple of
- (key, value) tuples sorted by key. (The reason this is given as a tuple
- instead of a dict is because tuples are hashable and can be used as dict keys,
- dicts can not).
-
- The MetricStore is also responsible for keeping the start_time of each metric.
- This is what goes into the start_timestamp_us field in the MetricsData proto
- for cumulative metrics and distributions, and helps Monarch identify when a
- counter was reset. This is the MetricStore's job because an implementation
- might share counter values across multiple instances of a task (like on
- Appengine), so the start time must be associated with that value so that it
- can be reset for all tasks at once when the value is reset.
-
- External metric stores (like those backed by memcache) may be cleared (either
- wholly or partially) at any time. When this happens the MetricStore *must*
- generate a new start_time for all the affected metrics.
-
- Metrics can specify their own explicit start time if they are mirroring the
- value of some external counter that started counting at a known time.
-
- Otherwise the MetricStore's time_fn (defaults to time.time()) is called the
- first time a metric is set or incremented, or after it is cleared externally.
- """
-
- def __init__(self, state, time_fn=None):
- self._state = state
- self._time_fn = time_fn or time.time
-
- def get(self, name, fields, target_fields, default=None):
- """Fetches the current value for the metric.
-
- Args:
- name (string): the metric's name.
- fields (tuple): a normalized field tuple.
- target_fields (dict or None): target fields to override.
- default: the value to return if the metric has no value of this set of
- field values.
- """
- raise NotImplementedError
-
- def get_all(self):
- """Returns an iterator over all the metrics present in the store.
-
- The iterator yields 4-tuples:
- (target, metric, start_time, field_values)
- """
- raise NotImplementedError
-
- def set(self, name, fields, target_fields, value, enforce_ge=False):
- """Sets the metric's value.
-
- Args:
- name: the metric's name.
- fields: a normalized field tuple.
- target_fields (dict or None): target fields to override.
- value: the new value for the metric.
- enforce_ge: if this is True, raise an exception if the new value is
- less than the old value.
-
- Raises:
- MonitoringDecreasingValueError: if enforce_ge is True and the new value is
- smaller than the old value.
- """
- raise NotImplementedError
-
- def incr(self, name, fields, target_fields, delta, modify_fn=None):
- """Increments the metric's value.
-
- Args:
- name: the metric's name.
- fields: a normalized field tuple.
- target_fields (dict or None): target fields to override.
- delta: how much to increment the value by.
- modify_fn: this function is called with the original value and the delta
- as its arguments and is expected to return the new value. The
- function must be idempotent as it may be called multiple times.
- """
- raise NotImplementedError
-
- def modify_multi(self, modifications):
- """Modifies multiple metrics in one go.
-
- Args:
- modifications: an iterable of Modification objects.
- """
- raise NotImplementedError
-
- def reset_for_unittest(self, name=None):
- """Clears the values metrics. Useful in unittests.
-
- Args:
- name: the name of an individual metric to reset, or if None resets all
- metrics.
- """
- raise NotImplementedError
-
- def initialize_context(self):
- """Opens a request-local context for deferring metric updates."""
- pass # pragma: no cover
-
- def finalize_context(self):
- """Closes a request-local context opened by initialize_context."""
- pass # pragma: no cover
-
- def _start_time(self, name):
- if name in self._state.metrics:
- ret = self._state.metrics[name].start_time
- if ret is not None:
- return ret
-
- return self._time_fn()
-
- @staticmethod
- def _normalize_target_fields(target_fields):
- """Converts target fields into a hashable tuple.
-
- Args:
- target_fields (dict): target fields to override the default target.
- """
- if not target_fields:
- target_fields = {}
- return tuple(sorted(target_fields.iteritems()))
-
-
-class MetricFieldsValues(object):
- def __init__(self):
- # Map normalized fields to single metric values.
- self._values = {}
- self._thread_lock = threading.Lock()
-
- def get_value(self, fields, default=None):
- return self._values.get(fields, default)
-
- def set_value(self, fields, value):
- self._values[fields] = value
-
- def iteritems(self):
- # Make a copy of the metric values in case another thread (or this
- # generator's consumer) modifies them while we're iterating.
- with self._thread_lock:
- values = copy.copy(self._values)
- for fields, value in values.iteritems():
- yield fields, value
-
-
-class TargetFieldsValues(object):
- def __init__(self, store):
- # Map normalized target fields to MetricFieldsValues.
- self._values = collections.defaultdict(MetricFieldsValues)
- self._store = store
- self._thread_lock = threading.Lock()
-
- def get_target_values(self, target_fields):
- key = self._store._normalize_target_fields(target_fields)
- return self._values[key]
-
- def get_value(self, fields, target_fields, default=None):
- return self.get_target_values(target_fields).get_value(
- fields, default)
-
- def set_value(self, fields, target_fields, value):
- self.get_target_values(target_fields).set_value(fields, value)
-
- def iter_targets(self):
- # Make a copy of the values in case another thread (or this
- # generator's consumer) modifies them while we're iterating.
- with self._thread_lock:
- values = copy.copy(self._values)
- for target_fields, fields_values in values.iteritems():
- target = copy.copy(self._store._state.target)
- if target_fields:
- target.update({k: v for k, v in target_fields})
- yield target, fields_values
-
-
-class MetricValues(object):
- def __init__(self, store, start_time):
- self._start_time = start_time
- self._values = TargetFieldsValues(store)
-
- @property
- def start_time(self):
- return self._start_time
-
- @property
- def values(self):
- return self._values
-
- def get_value(self, fields, target_fields, default=None):
- return self.values.get_value(fields, target_fields, default)
-
- def set_value(self, fields, target_fields, value):
- self.values.set_value(fields, target_fields, value)
-
-
-class InProcessMetricStore(MetricStore):
- """A thread-safe metric store that keeps values in memory."""
-
- def __init__(self, state, time_fn=None):
- super(InProcessMetricStore, self).__init__(state, time_fn=time_fn)
-
- self._values = {}
- self._thread_lock = threading.Lock()
-
- def _entry(self, name):
- if name not in self._values:
- self._reset(name)
-
- return self._values[name]
-
- def get(self, name, fields, target_fields, default=None):
- return self._entry(name).get_value(fields, target_fields, default)
-
- def iter_field_values(self, name):
- return itertools.chain.from_iterable(
- x.iteritems() for _, x in self._entry(name).values.iter_targets())
-
- def get_all(self):
- # Make a copy of the metric values in case another thread (or this
- # generator's consumer) modifies them while we're iterating.
- with self._thread_lock:
- values = copy.copy(self._values)
-
- for name, metric_values in values.iteritems():
- if name not in self._state.metrics:
- continue
- start_time = metric_values.start_time
- for target, fields_values in metric_values.values.iter_targets():
- yield target, self._state.metrics[name], start_time, fields_values
-
- def set(self, name, fields, target_fields, value, enforce_ge=False):
- with self._thread_lock:
- if enforce_ge:
- old_value = self._entry(name).get_value(fields, target_fields, 0)
- if value < old_value:
- raise errors.MonitoringDecreasingValueError(name, old_value, value)
-
- self._entry(name).set_value(fields, target_fields, value)
-
- def incr(self, name, fields, target_fields, delta, modify_fn=None):
- if delta < 0:
- raise errors.MonitoringDecreasingValueError(name, None, delta)
-
- if modify_fn is None:
- modify_fn = default_modify_fn(name)
-
- with self._thread_lock:
- self._entry(name).set_value(fields, target_fields, modify_fn(
- self.get(name, fields, target_fields, 0), delta))
-
- def modify_multi(self, modifications):
- # This is only used by DeferredMetricStore on top of MemcacheMetricStore,
- # but could be implemented here if required in the future.
- raise NotImplementedError
-
- def reset_for_unittest(self, name=None):
- if name is not None:
- self._reset(name)
- else:
- for name in self._values.keys():
- self._reset(name)
-
- def _reset(self, name):
- self._values[name] = MetricValues(self, self._start_time(name))

Powered by Google App Engine
This is Rietveld 408576698