OLD | NEW |
1 # Copyright 2015 The Chromium Authors. All rights reserved. | 1 # Copyright 2015 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 import collections | 5 import collections |
6 import logging | 6 import logging |
7 import operator | |
8 import threading | 7 import threading |
9 import time | 8 import time |
10 | 9 |
11 from infra_libs.ts_mon.common import errors | 10 from infra_libs.ts_mon.common import errors |
12 | 11 |
13 | 12 |
14 """A light-weight representation of a set or an incr. | 13 """A light-weight representation of a set or an incr. |
15 | 14 |
16 Args: | 15 Args: |
17 name: The metric name. | 16 name: The metric name. |
18 fields: The normalized field tuple. | 17 fields: The normalized field tuple. |
19 mod_type: Either 'set' or 'incr'. Other values will raise | 18 mod_type: Either 'set' or 'incr'. Other values will raise |
20 UnknownModificationTypeError when it's used. | 19 UnknownModificationTypeError when it's used. |
21 args: (value, enforce_ge) for 'set' or (delta, modify_fn) for 'incr'. | 20 args: (value, enforce_ge) for 'set' or (delta, modify_fn) for 'incr'. |
22 """ # pylint: disable=pointless-string-statement | 21 """ # pylint: disable=pointless-string-statement |
23 Modification = collections.namedtuple( | 22 Modification = collections.namedtuple( |
24 'Modification', ['name', 'fields', 'mod_type', 'args']) | 23 'Modification', ['name', 'fields', 'mod_type', 'args']) |
25 | 24 |
26 | 25 |
27 def combine_modifications(old, new): | 26 def default_modify_fn(name): |
28 """Combines two modifications into one. | 27 def _modify_fn(value, delta): |
29 | 28 if delta < 0: |
30 The returned modification will be the result as if the second modification had | 29 raise errors.MonitoringDecreasingValueError(name, None, delta) |
31 been applied after the first. | 30 return value + delta |
32 """ | 31 return _modify_fn |
33 | |
34 if old is None or new.mod_type == 'set': | |
35 # A 'set' will override any previous value. | |
36 return new | |
37 elif new.mod_type == 'incr': | |
38 # For two 'incr's sum their delta args, for an 'incr' on top of a 'set' add | |
39 # the delta to the set value. | |
40 return Modification( | |
41 old.name, old.fields, old.mod_type, | |
42 (old.args[0] + new.args[0], old.args[1])) | |
43 else: | |
44 raise errors.UnknownModificationTypeError(new.mod_type) | |
45 | 32 |
46 | 33 |
47 class MetricStore(object): | 34 class MetricStore(object): |
48 """A place to store values for each metric. | 35 """A place to store values for each metric. |
49 | 36 |
50 Several methods take "a normalized field tuple". This is a tuple of | 37 Several methods take "a normalized field tuple". This is a tuple of |
51 (key, value) tuples sorted by key. (The reason this is given as a tuple | 38 (key, value) tuples sorted by key. (The reason this is given as a tuple |
52 instead of a dict is because tuples are hashable and can be used as dict keys, | 39 instead of a dict is because tuples are hashable and can be used as dict keys, |
53 dicts can not). | 40 dicts can not). |
54 | 41 |
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
103 value: the new value for the metric. | 90 value: the new value for the metric. |
104 enforce_ge: if this is True, raise an exception if the new value is | 91 enforce_ge: if this is True, raise an exception if the new value is |
105 less than the old value. | 92 less than the old value. |
106 | 93 |
107 Raises: | 94 Raises: |
108 MonitoringDecreasingValueError: if enforce_ge is True and the new value is | 95 MonitoringDecreasingValueError: if enforce_ge is True and the new value is |
109 smaller than the old value. | 96 smaller than the old value. |
110 """ | 97 """ |
111 raise NotImplementedError | 98 raise NotImplementedError |
112 | 99 |
113 def incr(self, name, fields, delta, modify_fn=operator.add): | 100 def incr(self, name, fields, delta, modify_fn=None): |
114 """Increments the metric's value. | 101 """Increments the metric's value. |
115 | 102 |
116 Args: | 103 Args: |
117 name: the metric's name. | 104 name: the metric's name. |
118 fields: a normalized field tuple. | 105 fields: a normalized field tuple. |
119 delta: how much to increment the value by. | 106 delta: how much to increment the value by. |
120 modify_fn: this function is called with the original value and the delta | 107 modify_fn: this function is called with the original value and the delta |
121 as its arguments and is expected to return the new value. The | 108 as its arguments and is expected to return the new value. The |
122 function must be idempotent as it may be called multiple times. | 109 function must be idempotent as it may be called multiple times. |
123 """ | 110 """ |
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
183 | 170 |
184 def set(self, name, fields, value, enforce_ge=False): | 171 def set(self, name, fields, value, enforce_ge=False): |
185 with self._thread_lock: | 172 with self._thread_lock: |
186 if enforce_ge: | 173 if enforce_ge: |
187 old_value = self._entry(name)[1].get(fields, 0) | 174 old_value = self._entry(name)[1].get(fields, 0) |
188 if value < old_value: | 175 if value < old_value: |
189 raise errors.MonitoringDecreasingValueError(name, old_value, value) | 176 raise errors.MonitoringDecreasingValueError(name, old_value, value) |
190 | 177 |
191 self._entry(name)[1][fields] = value | 178 self._entry(name)[1][fields] = value |
192 | 179 |
193 def incr(self, name, fields, delta, modify_fn=operator.add): | 180 def incr(self, name, fields, delta, modify_fn=None): |
194 if delta < 0: | 181 if delta < 0: |
195 raise errors.MonitoringDecreasingValueError(name, None, delta) | 182 raise errors.MonitoringDecreasingValueError(name, None, delta) |
196 | 183 |
| 184 if modify_fn is None: |
| 185 modify_fn = default_modify_fn(name) |
| 186 |
197 with self._thread_lock: | 187 with self._thread_lock: |
198 self._entry(name)[1][fields] = modify_fn(self.get(name, fields, 0), delta) | 188 self._entry(name)[1][fields] = modify_fn(self.get(name, fields, 0), delta) |
199 | 189 |
200 def modify_multi(self, modifications): | 190 def modify_multi(self, modifications): |
201 # This is only used by DeferredMetricStore on top of MemcacheMetricStore, | 191 # This is only used by DeferredMetricStore on top of MemcacheMetricStore, |
202 # but could be implemented here if required in the future. | 192 # but could be implemented here if required in the future. |
203 raise NotImplementedError | 193 raise NotImplementedError |
204 | 194 |
205 def reset_for_unittest(self, name=None): | 195 def reset_for_unittest(self, name=None): |
206 if name is not None: | 196 if name is not None: |
207 self._reset(name) | 197 self._reset(name) |
208 else: | 198 else: |
209 for name in self._values.keys(): | 199 for name in self._values.keys(): |
210 self._reset(name) | 200 self._reset(name) |
211 | 201 |
212 def _reset(self, name): | 202 def _reset(self, name): |
213 self._values[name] = (self._start_time(name), {}) | 203 self._values[name] = (self._start_time(name), {}) |
OLD | NEW |