| OLD | NEW |
| (Empty) |
| 1 # Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 # Use of this source code is governed by a BSD-style license that can be | |
| 3 # found in the LICENSE file. | |
| 4 | |
| 5 import collections | |
| 6 import copy | |
| 7 import itertools | |
| 8 import threading | |
| 9 import time | |
| 10 | |
| 11 from infra_libs.ts_mon.common import errors | |
| 12 | |
| 13 | |
| 14 """A light-weight representation of a set or an incr. | |
| 15 | |
| 16 Args: | |
| 17 name: The metric name. | |
| 18 fields: The normalized field tuple. | |
| 19 mod_type: Either 'set' or 'incr'. Other values will raise | |
| 20 UnknownModificationTypeError when it's used. | |
| 21 args: (value, enforce_ge) for 'set' or (delta, modify_fn) for 'incr'. | |
| 22 """ # pylint: disable=pointless-string-statement | |
| 23 Modification = collections.namedtuple( | |
| 24 'Modification', ['name', 'fields', 'mod_type', 'args']) | |
| 25 | |
| 26 | |
| 27 def default_modify_fn(name): | |
| 28 def _modify_fn(value, delta): | |
| 29 if delta < 0: | |
| 30 raise errors.MonitoringDecreasingValueError(name, None, delta) | |
| 31 return value + delta | |
| 32 return _modify_fn | |
| 33 | |
| 34 | |
| 35 class MetricStore(object): | |
| 36 """A place to store values for each metric. | |
| 37 | |
| 38 Several methods take "a normalized field tuple". This is a tuple of | |
| 39 (key, value) tuples sorted by key. (The reason this is given as a tuple | |
| 40 instead of a dict is because tuples are hashable and can be used as dict keys, | |
| 41 dicts can not). | |
| 42 | |
| 43 The MetricStore is also responsible for keeping the start_time of each metric. | |
| 44 This is what goes into the start_timestamp_us field in the MetricsData proto | |
| 45 for cumulative metrics and distributions, and helps Monarch identify when a | |
| 46 counter was reset. This is the MetricStore's job because an implementation | |
| 47 might share counter values across multiple instances of a task (like on | |
| 48 Appengine), so the start time must be associated with that value so that it | |
| 49 can be reset for all tasks at once when the value is reset. | |
| 50 | |
| 51 External metric stores (like those backed by memcache) may be cleared (either | |
| 52 wholly or partially) at any time. When this happens the MetricStore *must* | |
| 53 generate a new start_time for all the affected metrics. | |
| 54 | |
| 55 Metrics can specify their own explicit start time if they are mirroring the | |
| 56 value of some external counter that started counting at a known time. | |
| 57 | |
| 58 Otherwise the MetricStore's time_fn (defaults to time.time()) is called the | |
| 59 first time a metric is set or incremented, or after it is cleared externally. | |
| 60 """ | |
| 61 | |
| 62 def __init__(self, state, time_fn=None): | |
| 63 self._state = state | |
| 64 self._time_fn = time_fn or time.time | |
| 65 | |
| 66 def get(self, name, fields, target_fields, default=None): | |
| 67 """Fetches the current value for the metric. | |
| 68 | |
| 69 Args: | |
| 70 name (string): the metric's name. | |
| 71 fields (tuple): a normalized field tuple. | |
| 72 target_fields (dict or None): target fields to override. | |
| 73 default: the value to return if the metric has no value of this set of | |
| 74 field values. | |
| 75 """ | |
| 76 raise NotImplementedError | |
| 77 | |
| 78 def get_all(self): | |
| 79 """Returns an iterator over all the metrics present in the store. | |
| 80 | |
| 81 The iterator yields 4-tuples: | |
| 82 (target, metric, start_time, field_values) | |
| 83 """ | |
| 84 raise NotImplementedError | |
| 85 | |
| 86 def set(self, name, fields, target_fields, value, enforce_ge=False): | |
| 87 """Sets the metric's value. | |
| 88 | |
| 89 Args: | |
| 90 name: the metric's name. | |
| 91 fields: a normalized field tuple. | |
| 92 target_fields (dict or None): target fields to override. | |
| 93 value: the new value for the metric. | |
| 94 enforce_ge: if this is True, raise an exception if the new value is | |
| 95 less than the old value. | |
| 96 | |
| 97 Raises: | |
| 98 MonitoringDecreasingValueError: if enforce_ge is True and the new value is | |
| 99 smaller than the old value. | |
| 100 """ | |
| 101 raise NotImplementedError | |
| 102 | |
| 103 def incr(self, name, fields, target_fields, delta, modify_fn=None): | |
| 104 """Increments the metric's value. | |
| 105 | |
| 106 Args: | |
| 107 name: the metric's name. | |
| 108 fields: a normalized field tuple. | |
| 109 target_fields (dict or None): target fields to override. | |
| 110 delta: how much to increment the value by. | |
| 111 modify_fn: this function is called with the original value and the delta | |
| 112 as its arguments and is expected to return the new value. The | |
| 113 function must be idempotent as it may be called multiple times. | |
| 114 """ | |
| 115 raise NotImplementedError | |
| 116 | |
| 117 def modify_multi(self, modifications): | |
| 118 """Modifies multiple metrics in one go. | |
| 119 | |
| 120 Args: | |
| 121 modifications: an iterable of Modification objects. | |
| 122 """ | |
| 123 raise NotImplementedError | |
| 124 | |
| 125 def reset_for_unittest(self, name=None): | |
| 126 """Clears the values metrics. Useful in unittests. | |
| 127 | |
| 128 Args: | |
| 129 name: the name of an individual metric to reset, or if None resets all | |
| 130 metrics. | |
| 131 """ | |
| 132 raise NotImplementedError | |
| 133 | |
| 134 def initialize_context(self): | |
| 135 """Opens a request-local context for deferring metric updates.""" | |
| 136 pass # pragma: no cover | |
| 137 | |
| 138 def finalize_context(self): | |
| 139 """Closes a request-local context opened by initialize_context.""" | |
| 140 pass # pragma: no cover | |
| 141 | |
| 142 def _start_time(self, name): | |
| 143 if name in self._state.metrics: | |
| 144 ret = self._state.metrics[name].start_time | |
| 145 if ret is not None: | |
| 146 return ret | |
| 147 | |
| 148 return self._time_fn() | |
| 149 | |
| 150 @staticmethod | |
| 151 def _normalize_target_fields(target_fields): | |
| 152 """Converts target fields into a hashable tuple. | |
| 153 | |
| 154 Args: | |
| 155 target_fields (dict): target fields to override the default target. | |
| 156 """ | |
| 157 if not target_fields: | |
| 158 target_fields = {} | |
| 159 return tuple(sorted(target_fields.iteritems())) | |
| 160 | |
| 161 | |
| 162 class MetricFieldsValues(object): | |
| 163 def __init__(self): | |
| 164 # Map normalized fields to single metric values. | |
| 165 self._values = {} | |
| 166 self._thread_lock = threading.Lock() | |
| 167 | |
| 168 def get_value(self, fields, default=None): | |
| 169 return self._values.get(fields, default) | |
| 170 | |
| 171 def set_value(self, fields, value): | |
| 172 self._values[fields] = value | |
| 173 | |
| 174 def iteritems(self): | |
| 175 # Make a copy of the metric values in case another thread (or this | |
| 176 # generator's consumer) modifies them while we're iterating. | |
| 177 with self._thread_lock: | |
| 178 values = copy.copy(self._values) | |
| 179 for fields, value in values.iteritems(): | |
| 180 yield fields, value | |
| 181 | |
| 182 | |
| 183 class TargetFieldsValues(object): | |
| 184 def __init__(self, store): | |
| 185 # Map normalized target fields to MetricFieldsValues. | |
| 186 self._values = collections.defaultdict(MetricFieldsValues) | |
| 187 self._store = store | |
| 188 self._thread_lock = threading.Lock() | |
| 189 | |
| 190 def get_target_values(self, target_fields): | |
| 191 key = self._store._normalize_target_fields(target_fields) | |
| 192 return self._values[key] | |
| 193 | |
| 194 def get_value(self, fields, target_fields, default=None): | |
| 195 return self.get_target_values(target_fields).get_value( | |
| 196 fields, default) | |
| 197 | |
| 198 def set_value(self, fields, target_fields, value): | |
| 199 self.get_target_values(target_fields).set_value(fields, value) | |
| 200 | |
| 201 def iter_targets(self): | |
| 202 # Make a copy of the values in case another thread (or this | |
| 203 # generator's consumer) modifies them while we're iterating. | |
| 204 with self._thread_lock: | |
| 205 values = copy.copy(self._values) | |
| 206 for target_fields, fields_values in values.iteritems(): | |
| 207 target = copy.copy(self._store._state.target) | |
| 208 if target_fields: | |
| 209 target.update({k: v for k, v in target_fields}) | |
| 210 yield target, fields_values | |
| 211 | |
| 212 | |
| 213 class MetricValues(object): | |
| 214 def __init__(self, store, start_time): | |
| 215 self._start_time = start_time | |
| 216 self._values = TargetFieldsValues(store) | |
| 217 | |
| 218 @property | |
| 219 def start_time(self): | |
| 220 return self._start_time | |
| 221 | |
| 222 @property | |
| 223 def values(self): | |
| 224 return self._values | |
| 225 | |
| 226 def get_value(self, fields, target_fields, default=None): | |
| 227 return self.values.get_value(fields, target_fields, default) | |
| 228 | |
| 229 def set_value(self, fields, target_fields, value): | |
| 230 self.values.set_value(fields, target_fields, value) | |
| 231 | |
| 232 | |
| 233 class InProcessMetricStore(MetricStore): | |
| 234 """A thread-safe metric store that keeps values in memory.""" | |
| 235 | |
| 236 def __init__(self, state, time_fn=None): | |
| 237 super(InProcessMetricStore, self).__init__(state, time_fn=time_fn) | |
| 238 | |
| 239 self._values = {} | |
| 240 self._thread_lock = threading.Lock() | |
| 241 | |
| 242 def _entry(self, name): | |
| 243 if name not in self._values: | |
| 244 self._reset(name) | |
| 245 | |
| 246 return self._values[name] | |
| 247 | |
| 248 def get(self, name, fields, target_fields, default=None): | |
| 249 return self._entry(name).get_value(fields, target_fields, default) | |
| 250 | |
| 251 def iter_field_values(self, name): | |
| 252 return itertools.chain.from_iterable( | |
| 253 x.iteritems() for _, x in self._entry(name).values.iter_targets()) | |
| 254 | |
| 255 def get_all(self): | |
| 256 # Make a copy of the metric values in case another thread (or this | |
| 257 # generator's consumer) modifies them while we're iterating. | |
| 258 with self._thread_lock: | |
| 259 values = copy.copy(self._values) | |
| 260 | |
| 261 for name, metric_values in values.iteritems(): | |
| 262 if name not in self._state.metrics: | |
| 263 continue | |
| 264 start_time = metric_values.start_time | |
| 265 for target, fields_values in metric_values.values.iter_targets(): | |
| 266 yield target, self._state.metrics[name], start_time, fields_values | |
| 267 | |
| 268 def set(self, name, fields, target_fields, value, enforce_ge=False): | |
| 269 with self._thread_lock: | |
| 270 if enforce_ge: | |
| 271 old_value = self._entry(name).get_value(fields, target_fields, 0) | |
| 272 if value < old_value: | |
| 273 raise errors.MonitoringDecreasingValueError(name, old_value, value) | |
| 274 | |
| 275 self._entry(name).set_value(fields, target_fields, value) | |
| 276 | |
| 277 def incr(self, name, fields, target_fields, delta, modify_fn=None): | |
| 278 if delta < 0: | |
| 279 raise errors.MonitoringDecreasingValueError(name, None, delta) | |
| 280 | |
| 281 if modify_fn is None: | |
| 282 modify_fn = default_modify_fn(name) | |
| 283 | |
| 284 with self._thread_lock: | |
| 285 self._entry(name).set_value(fields, target_fields, modify_fn( | |
| 286 self.get(name, fields, target_fields, 0), delta)) | |
| 287 | |
| 288 def modify_multi(self, modifications): | |
| 289 # This is only used by DeferredMetricStore on top of MemcacheMetricStore, | |
| 290 # but could be implemented here if required in the future. | |
| 291 raise NotImplementedError | |
| 292 | |
| 293 def reset_for_unittest(self, name=None): | |
| 294 if name is not None: | |
| 295 self._reset(name) | |
| 296 else: | |
| 297 for name in self._values.keys(): | |
| 298 self._reset(name) | |
| 299 | |
| 300 def _reset(self, name): | |
| 301 self._values[name] = MetricValues(self, self._start_time(name)) | |
| OLD | NEW |