Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(640)

Side by Side Diff: appengine_module/gae_ts_mon/deferred_metric_store.py

Issue 1531573003: Handle multiple modifications to distribution metrics correctly. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2015 The Chromium Authors. All rights reserved. 1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 import logging 5 import logging
6 import operator 6 import operator
7 import threading 7 import threading
8 8
9 from infra_libs.ts_mon.common import metric_store 9 from infra_libs.ts_mon.common import metric_store
10 10
11 11
12 class FinalizeWithoutInitializeError(Exception): 12 class FinalizeWithoutInitializeError(Exception):
13 def __init__(self): 13 def __init__(self):
14 super(FinalizeWithoutInitializeError, self).__init__( 14 super(FinalizeWithoutInitializeError, self).__init__(
15 'finalize_context called before initialize_context in a thread') 15 'finalize_context called before initialize_context in a thread')
16 16
17 17
18 class DeferredMetricStore(metric_store.MetricStore): 18 class DeferredMetricStore(metric_store.MetricStore):
19 def __init__(self, state, base_store, time_fn=None): 19 def __init__(self, state, base_store, time_fn=None):
20 super(DeferredMetricStore, self).__init__(state, time_fn=time_fn) 20 super(DeferredMetricStore, self).__init__(state, time_fn=time_fn)
21 21
22 self._base_store = base_store 22 self._base_store = base_store
23 self._thread_local = threading.local() 23 self._thread_local = threading.local()
24 24
25 def initialize_context(self): 25 def initialize_context(self):
26 self._thread_local.deferred = {} 26 self._thread_local.deferred = []
27 27
28 def finalize_context(self): 28 def finalize_context(self):
29 try: 29 try:
30 deferred = self._thread_local.deferred 30 deferred = self._thread_local.deferred
31 except AttributeError: 31 except AttributeError:
32 raise FinalizeWithoutInitializeError() 32 raise FinalizeWithoutInitializeError()
33 else: 33 else:
34 del self._thread_local.deferred 34 del self._thread_local.deferred
35 self._thread_local.finalizing = True 35 self._thread_local.finalizing = True
36 try: 36 try:
37 self._base_store.modify_multi(deferred.itervalues()) 37 self._base_store.modify_multi(deferred)
38 finally: 38 finally:
39 self._thread_local.finalizing = False 39 self._thread_local.finalizing = False
40 40
41 def update_metric_index(self): 41 def update_metric_index(self):
42 self._base_store.update_metric_index() 42 self._base_store.update_metric_index()
43 43
44 def get(self, name, fields, default=None): 44 def get(self, name, fields, default=None):
45 return self._base_store.get(name, fields, default) 45 return self._base_store.get(name, fields, default)
46 46
47 def get_all(self): 47 def get_all(self):
48 return self._base_store.get_all() 48 return self._base_store.get_all()
49 49
50 def set(self, name, fields, value, enforce_ge=False): 50 def set(self, name, fields, value, enforce_ge=False):
51 try: 51 try:
52 deferred = self._thread_local.deferred 52 deferred = self._thread_local.deferred
53 except AttributeError: 53 except AttributeError:
54 if not getattr( 54 if not getattr(
55 self._thread_local, 'finalizing', False): # pragma: no cover 55 self._thread_local, 'finalizing', False): # pragma: no cover
56 logging.warning( 56 logging.warning(
57 'DeferredMetricStore is used without a context. Have you wrapped ' 57 'DeferredMetricStore is used without a context. Have you wrapped '
58 'your WSGIApplication with gae_ts_mon.initialize?') 58 'your WSGIApplication with gae_ts_mon.initialize?')
59 self._base_store.set(name, fields, value, enforce_ge) 59 self._base_store.set(name, fields, value, enforce_ge)
60 else: 60 else:
61 key = (name, fields) 61 deferred.append(
62 deferred[key] = metric_store.combine_modifications(
63 deferred.get(key),
64 metric_store.Modification(name, fields, 'set', (value, enforce_ge))) 62 metric_store.Modification(name, fields, 'set', (value, enforce_ge)))
65 63
66 def incr(self, name, fields, delta, modify_fn=operator.add): 64 def incr(self, name, fields, delta, modify_fn=operator.add):
67 try: 65 try:
68 deferred = self._thread_local.deferred 66 deferred = self._thread_local.deferred
69 except AttributeError: 67 except AttributeError:
70 if not getattr( 68 if not getattr(
71 self._thread_local, 'finalizing', False): # pragma: no cover 69 self._thread_local, 'finalizing', False): # pragma: no cover
72 logging.warning( 70 logging.warning(
73 'DeferredMetricStore is used without a context. Have you wrapped ' 71 'DeferredMetricStore is used without a context. Have you wrapped '
74 'your WSGIApplication with gae_ts_mon.initialize?') 72 'your WSGIApplication with gae_ts_mon.initialize?')
75 self._base_store.incr(name, fields, delta, modify_fn) 73 self._base_store.incr(name, fields, delta, modify_fn)
76 else: 74 else:
77 key = (name, fields) 75 deferred.append(
78 deferred[key] = metric_store.combine_modifications(
79 deferred.get(key),
80 metric_store.Modification(name, fields, 'incr', (delta, modify_fn))) 76 metric_store.Modification(name, fields, 'incr', (delta, modify_fn)))
81 77
82 def reset_for_unittest(self, name=None): 78 def reset_for_unittest(self, name=None):
83 self._base_store.reset_for_unittest(name) 79 self._base_store.reset_for_unittest(name)
84 80
85 try: 81 try:
86 deferred = self._thread_local.deferred 82 deferred = self._thread_local.deferred
87 except AttributeError: 83 except AttributeError:
88 pass 84 pass
89 else: 85 else:
90 for key in deferred.keys(): 86 if name is None:
91 if name is None or key[0] == name: 87 self._thread_local.deferred = []
92 del deferred[key] 88 else:
89 self._thread_local.deferred = [x for x in deferred if x.name != name]
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698