| OLD | NEW |
| 1 # Copyright 2015 The Chromium Authors. All rights reserved. | 1 # Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
| 4 | 4 |
| 5 import collections | 5 import collections |
| 6 import copy | 6 import copy |
| 7 import itertools | 7 import itertools |
| 8 import threading | 8 import threading |
| 9 import time | 9 import time |
| 10 | 10 |
| 11 from infra_libs.ts_mon.common import errors | 11 from infra_libs.ts_mon.common import errors |
| 12 | 12 |
| 13 | 13 |
| 14 """A light-weight representation of a set or an incr. | |
| 15 | |
| 16 Args: | |
| 17 name: The metric name. | |
| 18 fields: The normalized field tuple. | |
| 19 mod_type: Either 'set' or 'incr'. Other values will raise | |
| 20 UnknownModificationTypeError when it's used. | |
| 21 args: (value, enforce_ge) for 'set' or (delta, modify_fn) for 'incr'. | |
| 22 """ # pylint: disable=pointless-string-statement | |
| 23 Modification = collections.namedtuple( | |
| 24 'Modification', ['name', 'fields', 'mod_type', 'args']) | |
| 25 | |
| 26 | |
| 27 def default_modify_fn(name): | 14 def default_modify_fn(name): |
| 28 def _modify_fn(value, delta): | 15 def _modify_fn(value, delta): |
| 29 if delta < 0: | 16 if delta < 0: |
| 30 raise errors.MonitoringDecreasingValueError(name, None, delta) | 17 raise errors.MonitoringDecreasingValueError(name, None, delta) |
| 31 return value + delta | 18 return value + delta |
| 32 return _modify_fn | 19 return _modify_fn |
| 33 | 20 |
| 34 | 21 |
| 35 class MetricStore(object): | 22 class MetricStore(object): |
| 36 """A place to store values for each metric. | 23 """A place to store values for each metric. |
| (...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 107 name: the metric's name. | 94 name: the metric's name. |
| 108 fields: a normalized field tuple. | 95 fields: a normalized field tuple. |
| 109 target_fields (dict or None): target fields to override. | 96 target_fields (dict or None): target fields to override. |
| 110 delta: how much to increment the value by. | 97 delta: how much to increment the value by. |
| 111 modify_fn: this function is called with the original value and the delta | 98 modify_fn: this function is called with the original value and the delta |
| 112 as its arguments and is expected to return the new value. The | 99 as its arguments and is expected to return the new value. The |
| 113 function must be idempotent as it may be called multiple times. | 100 function must be idempotent as it may be called multiple times. |
| 114 """ | 101 """ |
| 115 raise NotImplementedError | 102 raise NotImplementedError |
| 116 | 103 |
| 117 def modify_multi(self, modifications): | |
| 118 """Modifies multiple metrics in one go. | |
| 119 | |
| 120 Args: | |
| 121 modifications: an iterable of Modification objects. | |
| 122 """ | |
| 123 raise NotImplementedError | |
| 124 | |
| 125 def reset_for_unittest(self, name=None): | 104 def reset_for_unittest(self, name=None): |
| 126 """Clears the values metrics. Useful in unittests. | 105 """Clears the values metrics. Useful in unittests. |
| 127 | 106 |
| 128 Args: | 107 Args: |
| 129 name: the name of an individual metric to reset, or if None resets all | 108 name: the name of an individual metric to reset, or if None resets all |
| 130 metrics. | 109 metrics. |
| 131 """ | 110 """ |
| 132 raise NotImplementedError | 111 raise NotImplementedError |
| 133 | 112 |
| 134 def initialize_context(self): | |
| 135 """Opens a request-local context for deferring metric updates.""" | |
| 136 pass # pragma: no cover | |
| 137 | |
| 138 def finalize_context(self): | |
| 139 """Closes a request-local context opened by initialize_context.""" | |
| 140 pass # pragma: no cover | |
| 141 | |
| 142 def _start_time(self, name): | 113 def _start_time(self, name): |
| 143 if name in self._state.metrics: | 114 if name in self._state.metrics: |
| 144 ret = self._state.metrics[name].start_time | 115 ret = self._state.metrics[name].start_time |
| 145 if ret is not None: | 116 if ret is not None: |
| 146 return ret | 117 return ret |
| 147 | 118 |
| 148 return self._time_fn() | 119 return self._time_fn() |
| 149 | 120 |
| 150 @staticmethod | |
| 151 def _normalize_target_fields(target_fields): | |
| 152 """Converts target fields into a hashable tuple. | |
| 153 | 121 |
| 154 Args: | 122 class _TargetFieldsValues(object): |
| 155 target_fields (dict): target fields to override the default target. | 123 """Holds all values for a single metric. |
| 156 """ | 124 |
| 125 Values are keyed by metric fields and target fields (which override the |
| 126 default target fields configured globally for the process). |
| 127 """ |
| 128 |
| 129 def __init__(self, start_time): |
| 130 self.start_time = start_time |
| 131 |
| 132 # {normalized_target_fields: {normalized_metric_fields: value}} |
| 133 self._values = collections.defaultdict(dict) |
| 134 |
| 135 def _get_target_values(self, target_fields): |
| 136 # Normalize the target fields by converting them into a hashable tuple. |
| 157 if not target_fields: | 137 if not target_fields: |
| 158 target_fields = {} | 138 target_fields = {} |
| 159 return tuple(sorted(target_fields.iteritems())) | 139 key = tuple(sorted(target_fields.iteritems())) |
| 160 | 140 |
| 161 | |
| 162 class MetricFieldsValues(object): | |
| 163 def __init__(self): | |
| 164 # Map normalized fields to single metric values. | |
| 165 self._values = {} | |
| 166 self._thread_lock = threading.Lock() | |
| 167 | |
| 168 def get_value(self, fields, default=None): | |
| 169 return self._values.get(fields, default) | |
| 170 | |
| 171 def set_value(self, fields, value): | |
| 172 self._values[fields] = value | |
| 173 | |
| 174 def iteritems(self): | |
| 175 # Make a copy of the metric values in case another thread (or this | |
| 176 # generator's consumer) modifies them while we're iterating. | |
| 177 with self._thread_lock: | |
| 178 values = copy.copy(self._values) | |
| 179 for fields, value in values.iteritems(): | |
| 180 yield fields, value | |
| 181 | |
| 182 | |
| 183 class TargetFieldsValues(object): | |
| 184 def __init__(self, store): | |
| 185 # Map normalized target fields to MetricFieldsValues. | |
| 186 self._values = collections.defaultdict(MetricFieldsValues) | |
| 187 self._store = store | |
| 188 self._thread_lock = threading.Lock() | |
| 189 | |
| 190 def get_target_values(self, target_fields): | |
| 191 key = self._store._normalize_target_fields(target_fields) | |
| 192 return self._values[key] | 141 return self._values[key] |
| 193 | 142 |
| 194 def get_value(self, fields, target_fields, default=None): | 143 def get_value(self, fields, target_fields, default=None): |
| 195 return self.get_target_values(target_fields).get_value( | 144 return self._get_target_values(target_fields).get( |
| 196 fields, default) | 145 fields, default) |
| 197 | 146 |
| 198 def set_value(self, fields, target_fields, value): | 147 def set_value(self, fields, target_fields, value): |
| 199 self.get_target_values(target_fields).set_value(fields, value) | 148 self._get_target_values(target_fields)[fields] = value |
| 200 | 149 |
| 201 def iter_targets(self): | 150 def iter_targets(self, default_target): |
| 202 # Make a copy of the values in case another thread (or this | 151 for target_fields, fields_values in self._values.iteritems(): |
| 203 # generator's consumer) modifies them while we're iterating. | |
| 204 with self._thread_lock: | |
| 205 values = copy.copy(self._values) | |
| 206 for target_fields, fields_values in values.iteritems(): | |
| 207 target = copy.copy(self._store._state.target) | |
| 208 if target_fields: | 152 if target_fields: |
| 153 target = copy.copy(default_target) |
| 209 target.update({k: v for k, v in target_fields}) | 154 target.update({k: v for k, v in target_fields}) |
| 155 else: |
| 156 target = default_target |
| 210 yield target, fields_values | 157 yield target, fields_values |
| 211 | 158 |
| 212 | 159 def __deepcopy__(self, memo_dict): |
| 213 class MetricValues(object): | 160 ret = _TargetFieldsValues(self.start_time) |
| 214 def __init__(self, store, start_time): | 161 ret._values = copy.deepcopy(self._values, memo_dict) |
| 215 self._start_time = start_time | 162 return ret |
| 216 self._values = TargetFieldsValues(store) | |
| 217 | |
| 218 @property | |
| 219 def start_time(self): | |
| 220 return self._start_time | |
| 221 | |
| 222 @property | |
| 223 def values(self): | |
| 224 return self._values | |
| 225 | |
| 226 def get_value(self, fields, target_fields, default=None): | |
| 227 return self.values.get_value(fields, target_fields, default) | |
| 228 | |
| 229 def set_value(self, fields, target_fields, value): | |
| 230 self.values.set_value(fields, target_fields, value) | |
| 231 | 163 |
| 232 | 164 |
| 233 class InProcessMetricStore(MetricStore): | 165 class InProcessMetricStore(MetricStore): |
| 234 """A thread-safe metric store that keeps values in memory.""" | 166 """A thread-safe metric store that keeps values in memory.""" |
| 235 | 167 |
| 236 def __init__(self, state, time_fn=None): | 168 def __init__(self, state, time_fn=None): |
| 237 super(InProcessMetricStore, self).__init__(state, time_fn=time_fn) | 169 super(InProcessMetricStore, self).__init__(state, time_fn=time_fn) |
| 238 | 170 |
| 239 self._values = {} | 171 self._values = {} |
| 240 self._thread_lock = threading.Lock() | 172 self._thread_lock = threading.Lock() |
| 241 | 173 |
| 242 def _entry(self, name): | 174 def _entry(self, name): |
| 243 if name not in self._values: | 175 if name not in self._values: |
| 244 self._reset(name) | 176 self._reset(name) |
| 245 | 177 |
| 246 return self._values[name] | 178 return self._values[name] |
| 247 | 179 |
| 248 def get(self, name, fields, target_fields, default=None): | 180 def get(self, name, fields, target_fields, default=None): |
| 249 return self._entry(name).get_value(fields, target_fields, default) | 181 return self._entry(name).get_value(fields, target_fields, default) |
| 250 | 182 |
| 251 def iter_field_values(self, name): | 183 def iter_field_values(self, name): |
| 252 return itertools.chain.from_iterable( | 184 return itertools.chain.from_iterable( |
| 253 x.iteritems() for _, x in self._entry(name).values.iter_targets()) | 185 x.iteritems() for _, x |
| 186 in self._entry(name).iter_targets(self._state.target)) |
| 254 | 187 |
| 255 def get_all(self): | 188 def get_all(self): |
| 256 # Make a copy of the metric values in case another thread (or this | 189 # Make a copy of the metric values in case another thread (or this |
| 257 # generator's consumer) modifies them while we're iterating. | 190 # generator's consumer) modifies them while we're iterating. |
| 258 with self._thread_lock: | 191 with self._thread_lock: |
| 259 values = copy.copy(self._values) | 192 values = copy.deepcopy(self._values) |
| 260 end_time = self._time_fn() | 193 end_time = self._time_fn() |
| 261 | 194 |
| 262 for name, metric_values in values.iteritems(): | 195 for name, metric_values in values.iteritems(): |
| 263 if name not in self._state.metrics: | 196 if name not in self._state.metrics: |
| 264 continue | 197 continue |
| 265 start_time = metric_values.start_time | 198 start_time = metric_values.start_time |
| 266 for target, fields_values in metric_values.values.iter_targets(): | 199 for target, fields_values in metric_values.iter_targets( |
| 200 self._state.target): |
| 267 yield (target, self._state.metrics[name], start_time, end_time, | 201 yield (target, self._state.metrics[name], start_time, end_time, |
| 268 fields_values) | 202 fields_values) |
| 269 | 203 |
| 270 def set(self, name, fields, target_fields, value, enforce_ge=False): | 204 def set(self, name, fields, target_fields, value, enforce_ge=False): |
| 271 with self._thread_lock: | 205 with self._thread_lock: |
| 272 if enforce_ge: | 206 if enforce_ge: |
| 273 old_value = self._entry(name).get_value(fields, target_fields, 0) | 207 old_value = self._entry(name).get_value(fields, target_fields, 0) |
| 274 if value < old_value: | 208 if value < old_value: |
| 275 raise errors.MonitoringDecreasingValueError(name, old_value, value) | 209 raise errors.MonitoringDecreasingValueError(name, old_value, value) |
| 276 | 210 |
| 277 self._entry(name).set_value(fields, target_fields, value) | 211 self._entry(name).set_value(fields, target_fields, value) |
| 278 | 212 |
| 279 def incr(self, name, fields, target_fields, delta, modify_fn=None): | 213 def incr(self, name, fields, target_fields, delta, modify_fn=None): |
| 280 if delta < 0: | 214 if delta < 0: |
| 281 raise errors.MonitoringDecreasingValueError(name, None, delta) | 215 raise errors.MonitoringDecreasingValueError(name, None, delta) |
| 282 | 216 |
| 283 if modify_fn is None: | 217 if modify_fn is None: |
| 284 modify_fn = default_modify_fn(name) | 218 modify_fn = default_modify_fn(name) |
| 285 | 219 |
| 286 with self._thread_lock: | 220 with self._thread_lock: |
| 287 self._entry(name).set_value(fields, target_fields, modify_fn( | 221 self._entry(name).set_value(fields, target_fields, modify_fn( |
| 288 self.get(name, fields, target_fields, 0), delta)) | 222 self.get(name, fields, target_fields, 0), delta)) |
| 289 | 223 |
| 290 def modify_multi(self, modifications): | |
| 291 # This is only used by DeferredMetricStore on top of MemcacheMetricStore, | |
| 292 # but could be implemented here if required in the future. | |
| 293 raise NotImplementedError | |
| 294 | |
| 295 def reset_for_unittest(self, name=None): | 224 def reset_for_unittest(self, name=None): |
| 296 if name is not None: | 225 if name is not None: |
| 297 self._reset(name) | 226 self._reset(name) |
| 298 else: | 227 else: |
| 299 for name in self._values.keys(): | 228 for name in self._values.keys(): |
| 300 self._reset(name) | 229 self._reset(name) |
| 301 | 230 |
| 302 def _reset(self, name): | 231 def _reset(self, name): |
| 303 self._values[name] = MetricValues(self, self._start_time(name)) | 232 self._values[name] = _TargetFieldsValues(self._start_time(name)) |
| OLD | NEW |