Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(88)

Side by Side Diff: client/third_party/infra_libs/ts_mon/common/metric_store.py

Issue 2991803002: Update infra_libs to 1.1.15 / 0b44aba87c1c6538439df6d24a409870810747ab (Closed)
Patch Set: fix Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2015 The Chromium Authors. All rights reserved. 1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 import collections 5 import collections
6 import copy 6 import copy
7 import itertools 7 import itertools
8 import threading 8 import threading
9 import time 9 import time
10 10
11 from infra_libs.ts_mon.common import errors 11 from infra_libs.ts_mon.common import errors
12 12
13 13
14 """A light-weight representation of a set or an incr.
15
16 Args:
17 name: The metric name.
18 fields: The normalized field tuple.
19 mod_type: Either 'set' or 'incr'. Other values will raise
20 UnknownModificationTypeError when it's used.
21 args: (value, enforce_ge) for 'set' or (delta, modify_fn) for 'incr'.
22 """ # pylint: disable=pointless-string-statement
23 Modification = collections.namedtuple(
24 'Modification', ['name', 'fields', 'mod_type', 'args'])
25
26
27 def default_modify_fn(name): 14 def default_modify_fn(name):
28 def _modify_fn(value, delta): 15 def _modify_fn(value, delta):
29 if delta < 0: 16 if delta < 0:
30 raise errors.MonitoringDecreasingValueError(name, None, delta) 17 raise errors.MonitoringDecreasingValueError(name, None, delta)
31 return value + delta 18 return value + delta
32 return _modify_fn 19 return _modify_fn
33 20
34 21
35 class MetricStore(object): 22 class MetricStore(object):
36 """A place to store values for each metric. 23 """A place to store values for each metric.
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after
107 name: the metric's name. 94 name: the metric's name.
108 fields: a normalized field tuple. 95 fields: a normalized field tuple.
109 target_fields (dict or None): target fields to override. 96 target_fields (dict or None): target fields to override.
110 delta: how much to increment the value by. 97 delta: how much to increment the value by.
111 modify_fn: this function is called with the original value and the delta 98 modify_fn: this function is called with the original value and the delta
112 as its arguments and is expected to return the new value. The 99 as its arguments and is expected to return the new value. The
113 function must be idempotent as it may be called multiple times. 100 function must be idempotent as it may be called multiple times.
114 """ 101 """
115 raise NotImplementedError 102 raise NotImplementedError
116 103
117 def modify_multi(self, modifications):
118 """Modifies multiple metrics in one go.
119
120 Args:
121 modifications: an iterable of Modification objects.
122 """
123 raise NotImplementedError
124
125 def reset_for_unittest(self, name=None): 104 def reset_for_unittest(self, name=None):
126 """Clears the values metrics. Useful in unittests. 105 """Clears the values metrics. Useful in unittests.
127 106
128 Args: 107 Args:
129 name: the name of an individual metric to reset, or if None resets all 108 name: the name of an individual metric to reset, or if None resets all
130 metrics. 109 metrics.
131 """ 110 """
132 raise NotImplementedError 111 raise NotImplementedError
133 112
134 def initialize_context(self):
135 """Opens a request-local context for deferring metric updates."""
136 pass # pragma: no cover
137
138 def finalize_context(self):
139 """Closes a request-local context opened by initialize_context."""
140 pass # pragma: no cover
141
142 def _start_time(self, name): 113 def _start_time(self, name):
143 if name in self._state.metrics: 114 if name in self._state.metrics:
144 ret = self._state.metrics[name].start_time 115 ret = self._state.metrics[name].start_time
145 if ret is not None: 116 if ret is not None:
146 return ret 117 return ret
147 118
148 return self._time_fn() 119 return self._time_fn()
149 120
150 @staticmethod
151 def _normalize_target_fields(target_fields):
152 """Converts target fields into a hashable tuple.
153 121
154 Args: 122 class _TargetFieldsValues(object):
155 target_fields (dict): target fields to override the default target. 123 """Holds all values for a single metric.
156 """ 124
125 Values are keyed by metric fields and target fields (which override the
126 default target fields configured globally for the process).
127 """
128
129 def __init__(self, start_time):
130 self.start_time = start_time
131
132 # {normalized_target_fields: {normalized_metric_fields: value}}
133 self._values = collections.defaultdict(dict)
134
135 def _get_target_values(self, target_fields):
136 # Normalize the target fields by converting them into a hashable tuple.
157 if not target_fields: 137 if not target_fields:
158 target_fields = {} 138 target_fields = {}
159 return tuple(sorted(target_fields.iteritems())) 139 key = tuple(sorted(target_fields.iteritems()))
160 140
161
162 class MetricFieldsValues(object):
163 def __init__(self):
164 # Map normalized fields to single metric values.
165 self._values = {}
166 self._thread_lock = threading.Lock()
167
168 def get_value(self, fields, default=None):
169 return self._values.get(fields, default)
170
171 def set_value(self, fields, value):
172 self._values[fields] = value
173
174 def iteritems(self):
175 # Make a copy of the metric values in case another thread (or this
176 # generator's consumer) modifies them while we're iterating.
177 with self._thread_lock:
178 values = copy.copy(self._values)
179 for fields, value in values.iteritems():
180 yield fields, value
181
182
183 class TargetFieldsValues(object):
184 def __init__(self, store):
185 # Map normalized target fields to MetricFieldsValues.
186 self._values = collections.defaultdict(MetricFieldsValues)
187 self._store = store
188 self._thread_lock = threading.Lock()
189
190 def get_target_values(self, target_fields):
191 key = self._store._normalize_target_fields(target_fields)
192 return self._values[key] 141 return self._values[key]
193 142
194 def get_value(self, fields, target_fields, default=None): 143 def get_value(self, fields, target_fields, default=None):
195 return self.get_target_values(target_fields).get_value( 144 return self._get_target_values(target_fields).get(
196 fields, default) 145 fields, default)
197 146
198 def set_value(self, fields, target_fields, value): 147 def set_value(self, fields, target_fields, value):
199 self.get_target_values(target_fields).set_value(fields, value) 148 self._get_target_values(target_fields)[fields] = value
200 149
201 def iter_targets(self): 150 def iter_targets(self, default_target):
202 # Make a copy of the values in case another thread (or this 151 for target_fields, fields_values in self._values.iteritems():
203 # generator's consumer) modifies them while we're iterating.
204 with self._thread_lock:
205 values = copy.copy(self._values)
206 for target_fields, fields_values in values.iteritems():
207 target = copy.copy(self._store._state.target)
208 if target_fields: 152 if target_fields:
153 target = copy.copy(default_target)
209 target.update({k: v for k, v in target_fields}) 154 target.update({k: v for k, v in target_fields})
155 else:
156 target = default_target
210 yield target, fields_values 157 yield target, fields_values
211 158
212 159 def __deepcopy__(self, memo_dict):
213 class MetricValues(object): 160 ret = _TargetFieldsValues(self.start_time)
214 def __init__(self, store, start_time): 161 ret._values = copy.deepcopy(self._values, memo_dict)
215 self._start_time = start_time 162 return ret
216 self._values = TargetFieldsValues(store)
217
218 @property
219 def start_time(self):
220 return self._start_time
221
222 @property
223 def values(self):
224 return self._values
225
226 def get_value(self, fields, target_fields, default=None):
227 return self.values.get_value(fields, target_fields, default)
228
229 def set_value(self, fields, target_fields, value):
230 self.values.set_value(fields, target_fields, value)
231 163
232 164
233 class InProcessMetricStore(MetricStore): 165 class InProcessMetricStore(MetricStore):
234 """A thread-safe metric store that keeps values in memory.""" 166 """A thread-safe metric store that keeps values in memory."""
235 167
236 def __init__(self, state, time_fn=None): 168 def __init__(self, state, time_fn=None):
237 super(InProcessMetricStore, self).__init__(state, time_fn=time_fn) 169 super(InProcessMetricStore, self).__init__(state, time_fn=time_fn)
238 170
239 self._values = {} 171 self._values = {}
240 self._thread_lock = threading.Lock() 172 self._thread_lock = threading.Lock()
241 173
242 def _entry(self, name): 174 def _entry(self, name):
243 if name not in self._values: 175 if name not in self._values:
244 self._reset(name) 176 self._reset(name)
245 177
246 return self._values[name] 178 return self._values[name]
247 179
248 def get(self, name, fields, target_fields, default=None): 180 def get(self, name, fields, target_fields, default=None):
249 return self._entry(name).get_value(fields, target_fields, default) 181 return self._entry(name).get_value(fields, target_fields, default)
250 182
251 def iter_field_values(self, name): 183 def iter_field_values(self, name):
252 return itertools.chain.from_iterable( 184 return itertools.chain.from_iterable(
253 x.iteritems() for _, x in self._entry(name).values.iter_targets()) 185 x.iteritems() for _, x
186 in self._entry(name).iter_targets(self._state.target))
254 187
255 def get_all(self): 188 def get_all(self):
256 # Make a copy of the metric values in case another thread (or this 189 # Make a copy of the metric values in case another thread (or this
257 # generator's consumer) modifies them while we're iterating. 190 # generator's consumer) modifies them while we're iterating.
258 with self._thread_lock: 191 with self._thread_lock:
259 values = copy.copy(self._values) 192 values = copy.deepcopy(self._values)
260 end_time = self._time_fn() 193 end_time = self._time_fn()
261 194
262 for name, metric_values in values.iteritems(): 195 for name, metric_values in values.iteritems():
263 if name not in self._state.metrics: 196 if name not in self._state.metrics:
264 continue 197 continue
265 start_time = metric_values.start_time 198 start_time = metric_values.start_time
266 for target, fields_values in metric_values.values.iter_targets(): 199 for target, fields_values in metric_values.iter_targets(
200 self._state.target):
267 yield (target, self._state.metrics[name], start_time, end_time, 201 yield (target, self._state.metrics[name], start_time, end_time,
268 fields_values) 202 fields_values)
269 203
270 def set(self, name, fields, target_fields, value, enforce_ge=False): 204 def set(self, name, fields, target_fields, value, enforce_ge=False):
271 with self._thread_lock: 205 with self._thread_lock:
272 if enforce_ge: 206 if enforce_ge:
273 old_value = self._entry(name).get_value(fields, target_fields, 0) 207 old_value = self._entry(name).get_value(fields, target_fields, 0)
274 if value < old_value: 208 if value < old_value:
275 raise errors.MonitoringDecreasingValueError(name, old_value, value) 209 raise errors.MonitoringDecreasingValueError(name, old_value, value)
276 210
277 self._entry(name).set_value(fields, target_fields, value) 211 self._entry(name).set_value(fields, target_fields, value)
278 212
279 def incr(self, name, fields, target_fields, delta, modify_fn=None): 213 def incr(self, name, fields, target_fields, delta, modify_fn=None):
280 if delta < 0: 214 if delta < 0:
281 raise errors.MonitoringDecreasingValueError(name, None, delta) 215 raise errors.MonitoringDecreasingValueError(name, None, delta)
282 216
283 if modify_fn is None: 217 if modify_fn is None:
284 modify_fn = default_modify_fn(name) 218 modify_fn = default_modify_fn(name)
285 219
286 with self._thread_lock: 220 with self._thread_lock:
287 self._entry(name).set_value(fields, target_fields, modify_fn( 221 self._entry(name).set_value(fields, target_fields, modify_fn(
288 self.get(name, fields, target_fields, 0), delta)) 222 self.get(name, fields, target_fields, 0), delta))
289 223
290 def modify_multi(self, modifications):
291 # This is only used by DeferredMetricStore on top of MemcacheMetricStore,
292 # but could be implemented here if required in the future.
293 raise NotImplementedError
294
295 def reset_for_unittest(self, name=None): 224 def reset_for_unittest(self, name=None):
296 if name is not None: 225 if name is not None:
297 self._reset(name) 226 self._reset(name)
298 else: 227 else:
299 for name in self._values.keys(): 228 for name in self._values.keys():
300 self._reset(name) 229 self._reset(name)
301 230
302 def _reset(self, name): 231 def _reset(self, name):
303 self._values[name] = MetricValues(self, self._start_time(name)) 232 self._values[name] = _TargetFieldsValues(self._start_time(name))
OLDNEW
« no previous file with comments | « client/third_party/infra_libs/ts_mon/common/interface.py ('k') | client/third_party/infra_libs/ts_mon/common/metrics.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698