OLD | NEW |
1 # Copyright 2015 The Chromium Authors. All rights reserved. | 1 # Copyright 2015 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 import logging | 5 import logging |
6 import operator | 6 import operator |
7 import threading | 7 import threading |
8 | 8 |
9 from infra_libs.ts_mon.common import metric_store | 9 from infra_libs.ts_mon.common import metric_store |
10 | 10 |
11 | 11 |
12 class FinalizeWithoutInitializeError(Exception): | 12 class FinalizeWithoutInitializeError(Exception): |
13 def __init__(self): | 13 def __init__(self): |
14 super(FinalizeWithoutInitializeError, self).__init__( | 14 super(FinalizeWithoutInitializeError, self).__init__( |
15 'finalize_context called before initialize_context in a thread') | 15 'finalize_context called before initialize_context in a thread') |
16 | 16 |
17 | 17 |
18 class DeferredMetricStore(metric_store.MetricStore): | 18 class DeferredMetricStore(metric_store.MetricStore): |
19 def __init__(self, state, base_store, time_fn=None): | 19 def __init__(self, state, base_store, time_fn=None): |
20 super(DeferredMetricStore, self).__init__(state, time_fn=time_fn) | 20 super(DeferredMetricStore, self).__init__(state, time_fn=time_fn) |
21 | 21 |
22 self._base_store = base_store | 22 self._base_store = base_store |
23 self._thread_local = threading.local() | 23 self._thread_local = threading.local() |
24 | 24 |
25 def initialize_context(self): | 25 def initialize_context(self): |
26 self._thread_local.deferred = {} | 26 self._thread_local.deferred = [] |
27 | 27 |
28 def finalize_context(self): | 28 def finalize_context(self): |
29 try: | 29 try: |
30 deferred = self._thread_local.deferred | 30 deferred = self._thread_local.deferred |
31 except AttributeError: | 31 except AttributeError: |
32 raise FinalizeWithoutInitializeError() | 32 raise FinalizeWithoutInitializeError() |
33 else: | 33 else: |
34 del self._thread_local.deferred | 34 del self._thread_local.deferred |
35 self._thread_local.finalizing = True | 35 self._thread_local.finalizing = True |
36 try: | 36 try: |
37 self._base_store.modify_multi(deferred.itervalues()) | 37 self._base_store.modify_multi(deferred) |
38 finally: | 38 finally: |
39 self._thread_local.finalizing = False | 39 self._thread_local.finalizing = False |
40 | 40 |
41 def update_metric_index(self): | 41 def update_metric_index(self): |
42 self._base_store.update_metric_index() | 42 self._base_store.update_metric_index() |
43 | 43 |
44 def get(self, name, fields, default=None): | 44 def get(self, name, fields, default=None): |
45 return self._base_store.get(name, fields, default) | 45 return self._base_store.get(name, fields, default) |
46 | 46 |
47 def get_all(self): | 47 def get_all(self): |
48 return self._base_store.get_all() | 48 return self._base_store.get_all() |
49 | 49 |
50 def set(self, name, fields, value, enforce_ge=False): | 50 def set(self, name, fields, value, enforce_ge=False): |
51 try: | 51 try: |
52 deferred = self._thread_local.deferred | 52 deferred = self._thread_local.deferred |
53 except AttributeError: | 53 except AttributeError: |
54 if not getattr( | 54 if not getattr( |
55 self._thread_local, 'finalizing', False): # pragma: no cover | 55 self._thread_local, 'finalizing', False): # pragma: no cover |
56 logging.warning( | 56 logging.warning( |
57 'DeferredMetricStore is used without a context. Have you wrapped ' | 57 'DeferredMetricStore is used without a context. Have you wrapped ' |
58 'your WSGIApplication with gae_ts_mon.initialize?') | 58 'your WSGIApplication with gae_ts_mon.initialize?') |
59 self._base_store.set(name, fields, value, enforce_ge) | 59 self._base_store.set(name, fields, value, enforce_ge) |
60 else: | 60 else: |
61 key = (name, fields) | 61 deferred.append( |
62 deferred[key] = metric_store.combine_modifications( | |
63 deferred.get(key), | |
64 metric_store.Modification(name, fields, 'set', (value, enforce_ge))) | 62 metric_store.Modification(name, fields, 'set', (value, enforce_ge))) |
65 | 63 |
66 def incr(self, name, fields, delta, modify_fn=operator.add): | 64 def incr(self, name, fields, delta, modify_fn=operator.add): |
67 try: | 65 try: |
68 deferred = self._thread_local.deferred | 66 deferred = self._thread_local.deferred |
69 except AttributeError: | 67 except AttributeError: |
70 if not getattr( | 68 if not getattr( |
71 self._thread_local, 'finalizing', False): # pragma: no cover | 69 self._thread_local, 'finalizing', False): # pragma: no cover |
72 logging.warning( | 70 logging.warning( |
73 'DeferredMetricStore is used without a context. Have you wrapped ' | 71 'DeferredMetricStore is used without a context. Have you wrapped ' |
74 'your WSGIApplication with gae_ts_mon.initialize?') | 72 'your WSGIApplication with gae_ts_mon.initialize?') |
75 self._base_store.incr(name, fields, delta, modify_fn) | 73 self._base_store.incr(name, fields, delta, modify_fn) |
76 else: | 74 else: |
77 key = (name, fields) | 75 deferred.append( |
78 deferred[key] = metric_store.combine_modifications( | |
79 deferred.get(key), | |
80 metric_store.Modification(name, fields, 'incr', (delta, modify_fn))) | 76 metric_store.Modification(name, fields, 'incr', (delta, modify_fn))) |
81 | 77 |
82 def reset_for_unittest(self, name=None): | 78 def reset_for_unittest(self, name=None): |
83 self._base_store.reset_for_unittest(name) | 79 self._base_store.reset_for_unittest(name) |
84 | 80 |
85 try: | 81 try: |
86 deferred = self._thread_local.deferred | 82 deferred = self._thread_local.deferred |
87 except AttributeError: | 83 except AttributeError: |
88 pass | 84 pass |
89 else: | 85 else: |
90 for key in deferred.keys(): | 86 if name is None: |
91 if name is None or key[0] == name: | 87 self._thread_local.deferred = [] |
92 del deferred[key] | 88 else: |
| 89 self._thread_local.deferred = [x for x in deferred if x.name != name] |
OLD | NEW |