Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(376)

Side by Side Diff: appengine_module/gae_ts_mon/memcache_metric_store.py

Issue 1531573003: Handle multiple modifications to distribution metrics correctly. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2015 The Chromium Authors. All rights reserved. 1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 import collections 5 import collections
6 import copy 6 import copy
7 import functools 7 import functools
8 import logging 8 import logging
9 import operator 9 import operator
10 import random 10 import random
(...skipping 163 matching lines...) Expand 10 before | Expand all | Expand 10 after
174 if key in shard_map: 174 if key in shard_map:
175 # This row is one shard of a sharded metric - put the shard number 175 # This row is one shard of a sharded metric - put the shard number
176 # in the task number. 176 # in the task number.
177 target.task_num, metric_name = shard_map[key] 177 target.task_num, metric_name = shard_map[key]
178 else: 178 else:
179 target.task_num, metric_name = 0, key 179 target.task_num, metric_name = 0, key
180 180
181 yield (copy.copy(target), entities[metric_name].metric, start_time, 181 yield (copy.copy(target), entities[metric_name].metric, start_time,
182 fields_values) 182 fields_values)
183 183
184 def _apply_all(self, callables, arg): 184 def _apply_all(self, modifications, entry):
185 ret = arg 185 """Applies all the modifications, in order, to a memcache entry.
186 for fn in callables:
187 ret = fn(ret)
188 return ret
189 186
190 def _compare_and_set(self, metric_modify_fns, namespace): 187 All modifications must be for the same metric."""
188
189 assert len(modifications) > 0
Sergey Berezin 2015/12/17 22:28:50 While useful for debugging, I'm generally afraid o
dsansome 2015/12/18 00:54:33 Done.
190
191 # All modifications should be for the same metric.
192 name = modifications[0].name
193
194 if entry is None:
195 entry = (self._start_time(name), {})
196
197 _, targets = entry
198
199 target_key = self._target_key()
200 if target_key not in targets:
201 targets[target_key] = {}
202 values = targets[target_key]
Sergey Berezin 2015/12/17 22:28:50 nit: values = targets.setdefault(target_key, {})
dsansome 2015/12/18 00:54:33 Done.
203
204 for mod in modifications:
205 value = values.get(mod.fields, 0)
206
207 if mod.mod_type == 'set':
208 new_value, enforce_ge = mod.args
209 if enforce_ge and new_value < value:
210 raise errors.MonitoringDecreasingValueError(name, value, new_value)
211 value = new_value
212 elif mod.mod_type == 'incr':
213 delta, modify_fn = mod.args
214 if delta < 0:
Sergey Berezin 2015/12/17 22:28:50 Technically, you can add a negative number to a di
dsansome 2015/12/18 00:54:33 Done.
215 raise errors.MonitoringDecreasingValueError(name, None, delta)
216 value = modify_fn(value, delta)
217 else:
218 raise errors.UnknownModificationTypeError(mod.mod_type)
219
220 values[mod.fields] = value
221
222 return entry
223
224 def _compare_and_set(self, modifications, namespace):
191 client = self._client() 225 client = self._client()
192 226
193 # Metrics that we haven't updated yet. Metrics are removed from this dict 227 # Metrics that we haven't updated yet. Metrics are removed from this dict
194 # when they're successfully updated - if there are any left they will be 228 # when they're successfully updated - if there are any left they will be
195 # retried 10 times until everything has been updated. 229 # retried 10 times until everything has been updated.
196 # We might have more than one modify_fn for a metric if different field 230 # We might have more than one modification for a metric if different field
197 # values were updated. 231 # values were updated.
198 metrics = {}
199 remaining = collections.defaultdict(list) 232 remaining = collections.defaultdict(list)
200 for metric, modify_fns in metric_modify_fns: 233 for modification in modifications:
201 metrics[metric.name] = metric 234 remaining[modification.name].append(modification)
202 remaining[metric.name].append(modify_fns)
203 235
204 failed_keys_count = 0 236 failed_keys_count = 0
205 237
206 for _ in xrange(self.CAS_RETRIES): 238 for _ in xrange(self.CAS_RETRIES):
207 keys = [] 239 keys = []
208 key_map = {} # key -> metric name (for sharded metrics) 240 key_map = {} # key -> metric name (for sharded metrics)
209 241
210 for name in remaining: 242 for name in remaining:
211 # Pick one of the shards to modify. 243 # Pick one of the shards to modify.
212 key = self._random_shard(metrics[name]) 244 key = self._random_shard(self._state.metrics[name])
213 keys.append(key) 245 keys.append(key)
214 key_map[key] = name 246 key_map[key] = name
215 247
216 # Get all existing entries. 248 # Get all existing entries.
217 cas_mapping = {} # key -> new value (for existing entries) 249 cas_mapping = {} # key -> new value (for existing entries)
218 add_mapping = {} # key -> new value (for new entries) 250 add_mapping = {} # key -> new value (for new entries)
219 251
220 entries = client.get_multi(keys, for_cas=True, namespace=namespace) 252 entries = client.get_multi(keys, for_cas=True, namespace=namespace)
221 for key, entry in entries.iteritems(): 253 for key, entry in entries.iteritems():
222 cas_mapping[key] = self._apply_all(remaining[key_map[key]], entry) 254 cas_mapping[key] = self._apply_all(remaining[key_map[key]], entry)
(...skipping 23 matching lines...) Expand all
246 remaining = still_remaining 278 remaining = still_remaining
247 else: 279 else:
248 logging.warning( 280 logging.warning(
249 'Memcache compare-and-set failed %d times for keys %s in ' 281 'Memcache compare-and-set failed %d times for keys %s in '
250 'namespace %s', 282 'namespace %s',
251 self.CAS_RETRIES, remaining.keys(), namespace) 283 self.CAS_RETRIES, remaining.keys(), namespace)
252 284
253 # Update the cas_failures metric with the number of failed keys, but don't 285 # Update the cas_failures metric with the number of failed keys, but don't
254 # do so recursively. 286 # do so recursively.
255 if (failed_keys_count and 287 if (failed_keys_count and
256 any(metric.name != cas_failures.name 288 any(modification.name != cas_failures.name
257 for metric, _ in metric_modify_fns)): 289 for modification in modifications)):
258 cas_failures.increment_by(failed_keys_count) 290 cas_failures.increment_by(failed_keys_count)
259 291
260 def _create_modify_metric_fn(self, name, fields, modify_value_fn, delta):
261 """Returns a function that modifies a memcache row value.
262
263 Calls modify_value_fn(old_value, delta) and puts the result back in the
264 memcache row.
265 """
266
267 def modify_fn(entry):
268 if entry is None:
269 entry = (self._start_time(name), {})
270
271 _, targets = entry
272
273 target_key = self._target_key()
274 if target_key not in targets:
275 targets[target_key] = {}
276 values = targets[target_key]
277
278 values[fields] = modify_value_fn(values.get(fields, 0), delta)
279
280 return entry
281 return modify_fn
282
283 def _compare_and_set_metrics(self, modifications): 292 def _compare_and_set_metrics(self, modifications):
284 if any(name in self.METRIC_NAMES_EXCLUDED_FROM_INDEX 293 if any(mod.name in self.METRIC_NAMES_EXCLUDED_FROM_INDEX
285 for name, fields, modify_value_fn, delta in modifications): 294 for mod in modifications):
286 raise errors.MonitoringError('Metric is magical, can\'t set it') 295 raise errors.MonitoringError('Metric is magical, can\'t set it')
287 296
288 metric_modify_fns = [ 297 self._compare_and_set(modifications, self._namespace_for_job())
289 (self._state.metrics[name],
290 self._create_modify_metric_fn(name, fields, modify_value_fn, delta))
291 for name, fields, modify_value_fn, delta in modifications]
292 self._compare_and_set(metric_modify_fns, self._namespace_for_job())
293
294 def _create_set_value_fn(self, name, value, enforce_ge):
295 def modify_fn(old_value, _delta):
296 if enforce_ge and old_value is not None and value < old_value:
297 raise errors.MonitoringDecreasingValueError(name, old_value, value)
298 return value
299 return modify_fn
300 298
301 def set(self, name, fields, value, enforce_ge=False): 299 def set(self, name, fields, value, enforce_ge=False):
302 modify_fn = self._create_set_value_fn(name, value, enforce_ge) 300 self._compare_and_set_metrics([metric_store.Modification(
303 self._compare_and_set_metrics([(name, fields, modify_fn, None)]) 301 name, fields, 'set', (value, enforce_ge))])
304 302
305 def incr(self, name, fields, delta, modify_fn=operator.add): 303 def incr(self, name, fields, delta, modify_fn=operator.add):
306 if delta < 0: 304 self._compare_and_set_metrics([metric_store.Modification(
307 raise errors.MonitoringDecreasingValueError(name, None, delta) 305 name, fields, 'incr', (delta, modify_fn))])
308 self._compare_and_set_metrics([(name, fields, modify_fn, delta)])
309 306
310 def modify_multi(self, modifications): 307 def modify_multi(self, modifications):
311 mods = [] 308 self._compare_and_set_metrics(modifications)
312 for mod in modifications:
313 if mod.mod_type == 'set':
314 value, enforce_ge = mod.args
315 modify_fn = self._create_set_value_fn(mod.name, value, enforce_ge)
316 delta = None
317 elif mod.mod_type == 'incr':
318 delta, modify_fn = mod.args
319 if delta < 0:
320 raise errors.MonitoringDecreasingValueError(mod.name, None, delta)
321 else:
322 raise errors.UnknownModificationTypeError(mod.mod_type)
323
324 mods.append((mod.name, mod.fields, modify_fn, delta))
325
326 self._compare_and_set_metrics(mods)
327 309
328 def reset_for_unittest(self, name=None): 310 def reset_for_unittest(self, name=None):
329 if name is None: 311 if name is None:
330 self._client().delete_multi(self._state.metrics.keys(), 312 self._client().delete_multi(self._state.metrics.keys(),
331 namespace=self._namespace_for_job()) 313 namespace=self._namespace_for_job())
332 else: 314 else:
333 self._client().delete(name, namespace=self._namespace_for_job()) 315 self._client().delete(name, namespace=self._namespace_for_job())
OLDNEW
« no previous file with comments | « appengine_module/gae_ts_mon/deferred_metric_store.py ('k') | appengine_module/gae_ts_mon/test/deferred_metric_store_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698