Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(427)

Side by Side Diff: appengine_module/gae_ts_mon/deferred_metric_store.py

Issue 1531573003: Handle multiple modifications to distribution metrics correctly. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Add a missing test for coverage Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | appengine_module/gae_ts_mon/memcache_metric_store.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # Copyright 2015 The Chromium Authors. All rights reserved. 1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 import logging 5 import logging
6 import operator
7 import threading 6 import threading
8 7
9 from infra_libs.ts_mon.common import metric_store 8 from infra_libs.ts_mon.common import metric_store
10 9
11 10
12 class FinalizeWithoutInitializeError(Exception): 11 class FinalizeWithoutInitializeError(Exception):
13 def __init__(self): 12 def __init__(self):
14 super(FinalizeWithoutInitializeError, self).__init__( 13 super(FinalizeWithoutInitializeError, self).__init__(
15 'finalize_context called before initialize_context in a thread') 14 'finalize_context called before initialize_context in a thread')
16 15
17 16
18 class DeferredMetricStore(metric_store.MetricStore): 17 class DeferredMetricStore(metric_store.MetricStore):
19 def __init__(self, state, base_store, time_fn=None): 18 def __init__(self, state, base_store, time_fn=None):
20 super(DeferredMetricStore, self).__init__(state, time_fn=time_fn) 19 super(DeferredMetricStore, self).__init__(state, time_fn=time_fn)
21 20
22 self._base_store = base_store 21 self._base_store = base_store
23 self._thread_local = threading.local() 22 self._thread_local = threading.local()
24 23
25 def initialize_context(self): 24 def initialize_context(self):
26 self._thread_local.deferred = {} 25 self._thread_local.deferred = []
27 26
28 def finalize_context(self): 27 def finalize_context(self):
29 try: 28 try:
30 deferred = self._thread_local.deferred 29 deferred = self._thread_local.deferred
31 except AttributeError: 30 except AttributeError:
32 raise FinalizeWithoutInitializeError() 31 raise FinalizeWithoutInitializeError()
33 else: 32 else:
34 del self._thread_local.deferred 33 del self._thread_local.deferred
35 self._thread_local.finalizing = True 34 self._thread_local.finalizing = True
36 try: 35 try:
37 self._base_store.modify_multi(deferred.itervalues()) 36 self._base_store.modify_multi(deferred)
38 finally: 37 finally:
39 self._thread_local.finalizing = False 38 self._thread_local.finalizing = False
40 39
41 def update_metric_index(self): 40 def update_metric_index(self):
42 self._base_store.update_metric_index() 41 self._base_store.update_metric_index()
43 42
44 def get(self, name, fields, default=None): 43 def get(self, name, fields, default=None):
45 return self._base_store.get(name, fields, default) 44 return self._base_store.get(name, fields, default)
46 45
47 def get_all(self): 46 def get_all(self):
48 return self._base_store.get_all() 47 return self._base_store.get_all()
49 48
50 def set(self, name, fields, value, enforce_ge=False): 49 def set(self, name, fields, value, enforce_ge=False):
51 try: 50 try:
52 deferred = self._thread_local.deferred 51 deferred = self._thread_local.deferred
53 except AttributeError: 52 except AttributeError:
54 if not getattr( 53 if not getattr(
55 self._thread_local, 'finalizing', False): # pragma: no cover 54 self._thread_local, 'finalizing', False): # pragma: no cover
56 logging.warning( 55 logging.warning(
57 'DeferredMetricStore is used without a context. Have you wrapped ' 56 'DeferredMetricStore is used without a context. Have you wrapped '
58 'your WSGIApplication with gae_ts_mon.initialize?') 57 'your WSGIApplication with gae_ts_mon.initialize?')
59 self._base_store.set(name, fields, value, enforce_ge) 58 self._base_store.set(name, fields, value, enforce_ge)
60 else: 59 else:
61 key = (name, fields) 60 deferred.append(
62 deferred[key] = metric_store.combine_modifications(
63 deferred.get(key),
64 metric_store.Modification(name, fields, 'set', (value, enforce_ge))) 61 metric_store.Modification(name, fields, 'set', (value, enforce_ge)))
65 62
66 def incr(self, name, fields, delta, modify_fn=operator.add): 63 def incr(self, name, fields, delta, modify_fn=None):
67 try: 64 try:
68 deferred = self._thread_local.deferred 65 deferred = self._thread_local.deferred
69 except AttributeError: 66 except AttributeError:
70 if not getattr( 67 if not getattr(
71 self._thread_local, 'finalizing', False): # pragma: no cover 68 self._thread_local, 'finalizing', False): # pragma: no cover
72 logging.warning( 69 logging.warning(
73 'DeferredMetricStore is used without a context. Have you wrapped ' 70 'DeferredMetricStore is used without a context. Have you wrapped '
74 'your WSGIApplication with gae_ts_mon.initialize?') 71 'your WSGIApplication with gae_ts_mon.initialize?')
75 self._base_store.incr(name, fields, delta, modify_fn) 72 self._base_store.incr(name, fields, delta, modify_fn)
76 else: 73 else:
77 key = (name, fields) 74 deferred.append(
78 deferred[key] = metric_store.combine_modifications(
79 deferred.get(key),
80 metric_store.Modification(name, fields, 'incr', (delta, modify_fn))) 75 metric_store.Modification(name, fields, 'incr', (delta, modify_fn)))
81 76
82 def reset_for_unittest(self, name=None): 77 def reset_for_unittest(self, name=None):
83 self._base_store.reset_for_unittest(name) 78 self._base_store.reset_for_unittest(name)
84 79
85 try: 80 try:
86 deferred = self._thread_local.deferred 81 deferred = self._thread_local.deferred
87 except AttributeError: 82 except AttributeError:
88 pass 83 pass
89 else: 84 else:
90 for key in deferred.keys(): 85 if name is None:
91 if name is None or key[0] == name: 86 self._thread_local.deferred = []
92 del deferred[key] 87 else:
88 self._thread_local.deferred = [x for x in deferred if x.name != name]
OLDNEW
« no previous file with comments | « no previous file | appengine_module/gae_ts_mon/memcache_metric_store.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698