OLD | NEW |
| (Empty) |
1 # Copyright 2015 The Chromium Authors. All rights reserved. | |
2 # Use of this source code is governed by a BSD-style license that can be | |
3 # found in the LICENSE file. | |
4 | |
5 import collections | |
6 import copy | |
7 import itertools | |
8 import threading | |
9 import time | |
10 | |
11 from infra_libs.ts_mon.common import errors | |
12 | |
13 | |
14 """A light-weight representation of a set or an incr. | |
15 | |
16 Args: | |
17 name: The metric name. | |
18 fields: The normalized field tuple. | |
19 mod_type: Either 'set' or 'incr'. Other values will raise | |
20 UnknownModificationTypeError when it's used. | |
21 args: (value, enforce_ge) for 'set' or (delta, modify_fn) for 'incr'. | |
22 """ # pylint: disable=pointless-string-statement | |
23 Modification = collections.namedtuple( | |
24 'Modification', ['name', 'fields', 'mod_type', 'args']) | |
25 | |
26 | |
27 def default_modify_fn(name): | |
28 def _modify_fn(value, delta): | |
29 if delta < 0: | |
30 raise errors.MonitoringDecreasingValueError(name, None, delta) | |
31 return value + delta | |
32 return _modify_fn | |
33 | |
34 | |
35 class MetricStore(object): | |
36 """A place to store values for each metric. | |
37 | |
38 Several methods take "a normalized field tuple". This is a tuple of | |
39 (key, value) tuples sorted by key. (The reason this is given as a tuple | |
40 instead of a dict is because tuples are hashable and can be used as dict keys, | |
41 dicts can not). | |
42 | |
43 The MetricStore is also responsible for keeping the start_time of each metric. | |
44 This is what goes into the start_timestamp_us field in the MetricsData proto | |
45 for cumulative metrics and distributions, and helps Monarch identify when a | |
46 counter was reset. This is the MetricStore's job because an implementation | |
47 might share counter values across multiple instances of a task (like on | |
48 Appengine), so the start time must be associated with that value so that it | |
49 can be reset for all tasks at once when the value is reset. | |
50 | |
51 External metric stores (like those backed by memcache) may be cleared (either | |
52 wholly or partially) at any time. When this happens the MetricStore *must* | |
53 generate a new start_time for all the affected metrics. | |
54 | |
55 Metrics can specify their own explicit start time if they are mirroring the | |
56 value of some external counter that started counting at a known time. | |
57 | |
58 Otherwise the MetricStore's time_fn (defaults to time.time()) is called the | |
59 first time a metric is set or incremented, or after it is cleared externally. | |
60 """ | |
61 | |
62 def __init__(self, state, time_fn=None): | |
63 self._state = state | |
64 self._time_fn = time_fn or time.time | |
65 | |
66 def get(self, name, fields, target_fields, default=None): | |
67 """Fetches the current value for the metric. | |
68 | |
69 Args: | |
70 name (string): the metric's name. | |
71 fields (tuple): a normalized field tuple. | |
72 target_fields (dict or None): target fields to override. | |
73 default: the value to return if the metric has no value of this set of | |
74 field values. | |
75 """ | |
76 raise NotImplementedError | |
77 | |
78 def get_all(self): | |
79 """Returns an iterator over all the metrics present in the store. | |
80 | |
81 The iterator yields 4-tuples: | |
82 (target, metric, start_time, field_values) | |
83 """ | |
84 raise NotImplementedError | |
85 | |
86 def set(self, name, fields, target_fields, value, enforce_ge=False): | |
87 """Sets the metric's value. | |
88 | |
89 Args: | |
90 name: the metric's name. | |
91 fields: a normalized field tuple. | |
92 target_fields (dict or None): target fields to override. | |
93 value: the new value for the metric. | |
94 enforce_ge: if this is True, raise an exception if the new value is | |
95 less than the old value. | |
96 | |
97 Raises: | |
98 MonitoringDecreasingValueError: if enforce_ge is True and the new value is | |
99 smaller than the old value. | |
100 """ | |
101 raise NotImplementedError | |
102 | |
103 def incr(self, name, fields, target_fields, delta, modify_fn=None): | |
104 """Increments the metric's value. | |
105 | |
106 Args: | |
107 name: the metric's name. | |
108 fields: a normalized field tuple. | |
109 target_fields (dict or None): target fields to override. | |
110 delta: how much to increment the value by. | |
111 modify_fn: this function is called with the original value and the delta | |
112 as its arguments and is expected to return the new value. The | |
113 function must be idempotent as it may be called multiple times. | |
114 """ | |
115 raise NotImplementedError | |
116 | |
117 def modify_multi(self, modifications): | |
118 """Modifies multiple metrics in one go. | |
119 | |
120 Args: | |
121 modifications: an iterable of Modification objects. | |
122 """ | |
123 raise NotImplementedError | |
124 | |
125 def reset_for_unittest(self, name=None): | |
126 """Clears the values metrics. Useful in unittests. | |
127 | |
128 Args: | |
129 name: the name of an individual metric to reset, or if None resets all | |
130 metrics. | |
131 """ | |
132 raise NotImplementedError | |
133 | |
134 def initialize_context(self): | |
135 """Opens a request-local context for deferring metric updates.""" | |
136 pass # pragma: no cover | |
137 | |
138 def finalize_context(self): | |
139 """Closes a request-local context opened by initialize_context.""" | |
140 pass # pragma: no cover | |
141 | |
142 def _start_time(self, name): | |
143 if name in self._state.metrics: | |
144 ret = self._state.metrics[name].start_time | |
145 if ret is not None: | |
146 return ret | |
147 | |
148 return self._time_fn() | |
149 | |
150 @staticmethod | |
151 def _normalize_target_fields(target_fields): | |
152 """Converts target fields into a hashable tuple. | |
153 | |
154 Args: | |
155 target_fields (dict): target fields to override the default target. | |
156 """ | |
157 if not target_fields: | |
158 target_fields = {} | |
159 return tuple(sorted(target_fields.iteritems())) | |
160 | |
161 | |
162 class MetricFieldsValues(object): | |
163 def __init__(self): | |
164 # Map normalized fields to single metric values. | |
165 self._values = {} | |
166 self._thread_lock = threading.Lock() | |
167 | |
168 def get_value(self, fields, default=None): | |
169 return self._values.get(fields, default) | |
170 | |
171 def set_value(self, fields, value): | |
172 self._values[fields] = value | |
173 | |
174 def iteritems(self): | |
175 # Make a copy of the metric values in case another thread (or this | |
176 # generator's consumer) modifies them while we're iterating. | |
177 with self._thread_lock: | |
178 values = copy.copy(self._values) | |
179 for fields, value in values.iteritems(): | |
180 yield fields, value | |
181 | |
182 | |
183 class TargetFieldsValues(object): | |
184 def __init__(self, store): | |
185 # Map normalized target fields to MetricFieldsValues. | |
186 self._values = collections.defaultdict(MetricFieldsValues) | |
187 self._store = store | |
188 self._thread_lock = threading.Lock() | |
189 | |
190 def get_target_values(self, target_fields): | |
191 key = self._store._normalize_target_fields(target_fields) | |
192 return self._values[key] | |
193 | |
194 def get_value(self, fields, target_fields, default=None): | |
195 return self.get_target_values(target_fields).get_value( | |
196 fields, default) | |
197 | |
198 def set_value(self, fields, target_fields, value): | |
199 self.get_target_values(target_fields).set_value(fields, value) | |
200 | |
201 def iter_targets(self): | |
202 # Make a copy of the values in case another thread (or this | |
203 # generator's consumer) modifies them while we're iterating. | |
204 with self._thread_lock: | |
205 values = copy.copy(self._values) | |
206 for target_fields, fields_values in values.iteritems(): | |
207 target = copy.copy(self._store._state.target) | |
208 if target_fields: | |
209 target.update({k: v for k, v in target_fields}) | |
210 yield target, fields_values | |
211 | |
212 | |
213 class MetricValues(object): | |
214 def __init__(self, store, start_time): | |
215 self._start_time = start_time | |
216 self._values = TargetFieldsValues(store) | |
217 | |
218 @property | |
219 def start_time(self): | |
220 return self._start_time | |
221 | |
222 @property | |
223 def values(self): | |
224 return self._values | |
225 | |
226 def get_value(self, fields, target_fields, default=None): | |
227 return self.values.get_value(fields, target_fields, default) | |
228 | |
229 def set_value(self, fields, target_fields, value): | |
230 self.values.set_value(fields, target_fields, value) | |
231 | |
232 | |
233 class InProcessMetricStore(MetricStore): | |
234 """A thread-safe metric store that keeps values in memory.""" | |
235 | |
236 def __init__(self, state, time_fn=None): | |
237 super(InProcessMetricStore, self).__init__(state, time_fn=time_fn) | |
238 | |
239 self._values = {} | |
240 self._thread_lock = threading.Lock() | |
241 | |
242 def _entry(self, name): | |
243 if name not in self._values: | |
244 self._reset(name) | |
245 | |
246 return self._values[name] | |
247 | |
248 def get(self, name, fields, target_fields, default=None): | |
249 return self._entry(name).get_value(fields, target_fields, default) | |
250 | |
251 def iter_field_values(self, name): | |
252 return itertools.chain.from_iterable( | |
253 x.iteritems() for _, x in self._entry(name).values.iter_targets()) | |
254 | |
255 def get_all(self): | |
256 # Make a copy of the metric values in case another thread (or this | |
257 # generator's consumer) modifies them while we're iterating. | |
258 with self._thread_lock: | |
259 values = copy.copy(self._values) | |
260 | |
261 for name, metric_values in values.iteritems(): | |
262 if name not in self._state.metrics: | |
263 continue | |
264 start_time = metric_values.start_time | |
265 for target, fields_values in metric_values.values.iter_targets(): | |
266 yield target, self._state.metrics[name], start_time, fields_values | |
267 | |
268 def set(self, name, fields, target_fields, value, enforce_ge=False): | |
269 with self._thread_lock: | |
270 if enforce_ge: | |
271 old_value = self._entry(name).get_value(fields, target_fields, 0) | |
272 if value < old_value: | |
273 raise errors.MonitoringDecreasingValueError(name, old_value, value) | |
274 | |
275 self._entry(name).set_value(fields, target_fields, value) | |
276 | |
277 def incr(self, name, fields, target_fields, delta, modify_fn=None): | |
278 if delta < 0: | |
279 raise errors.MonitoringDecreasingValueError(name, None, delta) | |
280 | |
281 if modify_fn is None: | |
282 modify_fn = default_modify_fn(name) | |
283 | |
284 with self._thread_lock: | |
285 self._entry(name).set_value(fields, target_fields, modify_fn( | |
286 self.get(name, fields, target_fields, 0), delta)) | |
287 | |
288 def modify_multi(self, modifications): | |
289 # This is only used by DeferredMetricStore on top of MemcacheMetricStore, | |
290 # but could be implemented here if required in the future. | |
291 raise NotImplementedError | |
292 | |
293 def reset_for_unittest(self, name=None): | |
294 if name is not None: | |
295 self._reset(name) | |
296 else: | |
297 for name in self._values.keys(): | |
298 self._reset(name) | |
299 | |
300 def _reset(self, name): | |
301 self._values[name] = MetricValues(self, self._start_time(name)) | |
OLD | NEW |