Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(746)

Side by Side Diff: infra_libs/ts_mon/common/metric_store.py

Issue 2213143002: Add infra_libs as a bootstrap dependency. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Removed the ugly import hack Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 import collections
6 import copy
7 import itertools
8 import threading
9 import time
10
11 from infra_libs.ts_mon.common import errors
12
13
14 """A light-weight representation of a set or an incr.
15
16 Args:
17 name: The metric name.
18 fields: The normalized field tuple.
19 mod_type: Either 'set' or 'incr'. Other values will raise
20 UnknownModificationTypeError when it's used.
21 args: (value, enforce_ge) for 'set' or (delta, modify_fn) for 'incr'.
22 """ # pylint: disable=pointless-string-statement
23 Modification = collections.namedtuple(
24 'Modification', ['name', 'fields', 'mod_type', 'args'])
25
26
27 def default_modify_fn(name):
28 def _modify_fn(value, delta):
29 if delta < 0:
30 raise errors.MonitoringDecreasingValueError(name, None, delta)
31 return value + delta
32 return _modify_fn
33
34
35 class MetricStore(object):
36 """A place to store values for each metric.
37
38 Several methods take "a normalized field tuple". This is a tuple of
39 (key, value) tuples sorted by key. (The reason this is given as a tuple
40 instead of a dict is because tuples are hashable and can be used as dict keys,
41 dicts can not).
42
43 The MetricStore is also responsible for keeping the start_time of each metric.
44 This is what goes into the start_timestamp_us field in the MetricsData proto
45 for cumulative metrics and distributions, and helps Monarch identify when a
46 counter was reset. This is the MetricStore's job because an implementation
47 might share counter values across multiple instances of a task (like on
48 Appengine), so the start time must be associated with that value so that it
49 can be reset for all tasks at once when the value is reset.
50
51 External metric stores (like those backed by memcache) may be cleared (either
52 wholly or partially) at any time. When this happens the MetricStore *must*
53 generate a new start_time for all the affected metrics.
54
55 Metrics can specify their own explicit start time if they are mirroring the
56 value of some external counter that started counting at a known time.
57
58 Otherwise the MetricStore's time_fn (defaults to time.time()) is called the
59 first time a metric is set or incremented, or after it is cleared externally.
60 """
61
62 def __init__(self, state, time_fn=None):
63 self._state = state
64 self._time_fn = time_fn or time.time
65
66 def get(self, name, fields, target_fields, default=None):
67 """Fetches the current value for the metric.
68
69 Args:
70 name (string): the metric's name.
71 fields (tuple): a normalized field tuple.
72 target_fields (dict or None): target fields to override.
73 default: the value to return if the metric has no value of this set of
74 field values.
75 """
76 raise NotImplementedError
77
78 def get_all(self):
79 """Returns an iterator over all the metrics present in the store.
80
81 The iterator yields 4-tuples:
82 (target, metric, start_time, field_values)
83 """
84 raise NotImplementedError
85
86 def set(self, name, fields, target_fields, value, enforce_ge=False):
87 """Sets the metric's value.
88
89 Args:
90 name: the metric's name.
91 fields: a normalized field tuple.
92 target_fields (dict or None): target fields to override.
93 value: the new value for the metric.
94 enforce_ge: if this is True, raise an exception if the new value is
95 less than the old value.
96
97 Raises:
98 MonitoringDecreasingValueError: if enforce_ge is True and the new value is
99 smaller than the old value.
100 """
101 raise NotImplementedError
102
103 def incr(self, name, fields, target_fields, delta, modify_fn=None):
104 """Increments the metric's value.
105
106 Args:
107 name: the metric's name.
108 fields: a normalized field tuple.
109 target_fields (dict or None): target fields to override.
110 delta: how much to increment the value by.
111 modify_fn: this function is called with the original value and the delta
112 as its arguments and is expected to return the new value. The
113 function must be idempotent as it may be called multiple times.
114 """
115 raise NotImplementedError
116
117 def modify_multi(self, modifications):
118 """Modifies multiple metrics in one go.
119
120 Args:
121 modifications: an iterable of Modification objects.
122 """
123 raise NotImplementedError
124
125 def reset_for_unittest(self, name=None):
126 """Clears the values metrics. Useful in unittests.
127
128 Args:
129 name: the name of an individual metric to reset, or if None resets all
130 metrics.
131 """
132 raise NotImplementedError
133
134 def initialize_context(self):
135 """Opens a request-local context for deferring metric updates."""
136 pass # pragma: no cover
137
138 def finalize_context(self):
139 """Closes a request-local context opened by initialize_context."""
140 pass # pragma: no cover
141
142 def _start_time(self, name):
143 if name in self._state.metrics:
144 ret = self._state.metrics[name].start_time
145 if ret is not None:
146 return ret
147
148 return self._time_fn()
149
150 @staticmethod
151 def _normalize_target_fields(target_fields):
152 """Converts target fields into a hashable tuple.
153
154 Args:
155 target_fields (dict): target fields to override the default target.
156 """
157 if not target_fields:
158 target_fields = {}
159 return tuple(sorted(target_fields.iteritems()))
160
161
162 class MetricFieldsValues(object):
163 def __init__(self):
164 # Map normalized fields to single metric values.
165 self._values = {}
166 self._thread_lock = threading.Lock()
167
168 def get_value(self, fields, default=None):
169 return self._values.get(fields, default)
170
171 def set_value(self, fields, value):
172 self._values[fields] = value
173
174 def iteritems(self):
175 # Make a copy of the metric values in case another thread (or this
176 # generator's consumer) modifies them while we're iterating.
177 with self._thread_lock:
178 values = copy.copy(self._values)
179 for fields, value in values.iteritems():
180 yield fields, value
181
182
183 class TargetFieldsValues(object):
184 def __init__(self, store):
185 # Map normalized target fields to MetricFieldsValues.
186 self._values = collections.defaultdict(MetricFieldsValues)
187 self._store = store
188 self._thread_lock = threading.Lock()
189
190 def get_target_values(self, target_fields):
191 key = self._store._normalize_target_fields(target_fields)
192 return self._values[key]
193
194 def get_value(self, fields, target_fields, default=None):
195 return self.get_target_values(target_fields).get_value(
196 fields, default)
197
198 def set_value(self, fields, target_fields, value):
199 self.get_target_values(target_fields).set_value(fields, value)
200
201 def iter_targets(self):
202 # Make a copy of the values in case another thread (or this
203 # generator's consumer) modifies them while we're iterating.
204 with self._thread_lock:
205 values = copy.copy(self._values)
206 for target_fields, fields_values in values.iteritems():
207 target = copy.copy(self._store._state.target)
208 if target_fields:
209 target.update({k: v for k, v in target_fields})
210 yield target, fields_values
211
212
213 class MetricValues(object):
214 def __init__(self, store, start_time):
215 self._start_time = start_time
216 self._values = TargetFieldsValues(store)
217
218 @property
219 def start_time(self):
220 return self._start_time
221
222 @property
223 def values(self):
224 return self._values
225
226 def get_value(self, fields, target_fields, default=None):
227 return self.values.get_value(fields, target_fields, default)
228
229 def set_value(self, fields, target_fields, value):
230 self.values.set_value(fields, target_fields, value)
231
232
233 class InProcessMetricStore(MetricStore):
234 """A thread-safe metric store that keeps values in memory."""
235
236 def __init__(self, state, time_fn=None):
237 super(InProcessMetricStore, self).__init__(state, time_fn=time_fn)
238
239 self._values = {}
240 self._thread_lock = threading.Lock()
241
242 def _entry(self, name):
243 if name not in self._values:
244 self._reset(name)
245
246 return self._values[name]
247
248 def get(self, name, fields, target_fields, default=None):
249 return self._entry(name).get_value(fields, target_fields, default)
250
251 def iter_field_values(self, name):
252 return itertools.chain.from_iterable(
253 x.iteritems() for _, x in self._entry(name).values.iter_targets())
254
255 def get_all(self):
256 # Make a copy of the metric values in case another thread (or this
257 # generator's consumer) modifies them while we're iterating.
258 with self._thread_lock:
259 values = copy.copy(self._values)
260
261 for name, metric_values in values.iteritems():
262 if name not in self._state.metrics:
263 continue
264 start_time = metric_values.start_time
265 for target, fields_values in metric_values.values.iter_targets():
266 yield target, self._state.metrics[name], start_time, fields_values
267
268 def set(self, name, fields, target_fields, value, enforce_ge=False):
269 with self._thread_lock:
270 if enforce_ge:
271 old_value = self._entry(name).get_value(fields, target_fields, 0)
272 if value < old_value:
273 raise errors.MonitoringDecreasingValueError(name, old_value, value)
274
275 self._entry(name).set_value(fields, target_fields, value)
276
277 def incr(self, name, fields, target_fields, delta, modify_fn=None):
278 if delta < 0:
279 raise errors.MonitoringDecreasingValueError(name, None, delta)
280
281 if modify_fn is None:
282 modify_fn = default_modify_fn(name)
283
284 with self._thread_lock:
285 self._entry(name).set_value(fields, target_fields, modify_fn(
286 self.get(name, fields, target_fields, 0), delta))
287
288 def modify_multi(self, modifications):
289 # This is only used by DeferredMetricStore on top of MemcacheMetricStore,
290 # but could be implemented here if required in the future.
291 raise NotImplementedError
292
293 def reset_for_unittest(self, name=None):
294 if name is not None:
295 self._reset(name)
296 else:
297 for name in self._values.keys():
298 self._reset(name)
299
300 def _reset(self, name):
301 self._values[name] = MetricValues(self, self._start_time(name))
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698