OLD | NEW |
1 # Copyright 2015 The Chromium Authors. All rights reserved. | 1 # Copyright 2015 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 import logging | 5 import logging |
6 import operator | |
7 import threading | 6 import threading |
8 | 7 |
9 from infra_libs.ts_mon.common import metric_store | 8 from infra_libs.ts_mon.common import metric_store |
10 | 9 |
11 | 10 |
12 class FinalizeWithoutInitializeError(Exception): | 11 class FinalizeWithoutInitializeError(Exception): |
13 def __init__(self): | 12 def __init__(self): |
14 super(FinalizeWithoutInitializeError, self).__init__( | 13 super(FinalizeWithoutInitializeError, self).__init__( |
15 'finalize_context called before initialize_context in a thread') | 14 'finalize_context called before initialize_context in a thread') |
16 | 15 |
17 | 16 |
18 class DeferredMetricStore(metric_store.MetricStore): | 17 class DeferredMetricStore(metric_store.MetricStore): |
19 def __init__(self, state, base_store, time_fn=None): | 18 def __init__(self, state, base_store, time_fn=None): |
20 super(DeferredMetricStore, self).__init__(state, time_fn=time_fn) | 19 super(DeferredMetricStore, self).__init__(state, time_fn=time_fn) |
21 | 20 |
22 self._base_store = base_store | 21 self._base_store = base_store |
23 self._thread_local = threading.local() | 22 self._thread_local = threading.local() |
24 | 23 |
25 def initialize_context(self): | 24 def initialize_context(self): |
26 self._thread_local.deferred = {} | 25 self._thread_local.deferred = [] |
27 | 26 |
28 def finalize_context(self): | 27 def finalize_context(self): |
29 try: | 28 try: |
30 deferred = self._thread_local.deferred | 29 deferred = self._thread_local.deferred |
31 except AttributeError: | 30 except AttributeError: |
32 raise FinalizeWithoutInitializeError() | 31 raise FinalizeWithoutInitializeError() |
33 else: | 32 else: |
34 del self._thread_local.deferred | 33 del self._thread_local.deferred |
35 self._thread_local.finalizing = True | 34 self._thread_local.finalizing = True |
36 try: | 35 try: |
37 self._base_store.modify_multi(deferred.itervalues()) | 36 self._base_store.modify_multi(deferred) |
38 finally: | 37 finally: |
39 self._thread_local.finalizing = False | 38 self._thread_local.finalizing = False |
40 | 39 |
41 def update_metric_index(self): | 40 def update_metric_index(self): |
42 self._base_store.update_metric_index() | 41 self._base_store.update_metric_index() |
43 | 42 |
44 def get(self, name, fields, default=None): | 43 def get(self, name, fields, default=None): |
45 return self._base_store.get(name, fields, default) | 44 return self._base_store.get(name, fields, default) |
46 | 45 |
47 def get_all(self): | 46 def get_all(self): |
48 return self._base_store.get_all() | 47 return self._base_store.get_all() |
49 | 48 |
50 def set(self, name, fields, value, enforce_ge=False): | 49 def set(self, name, fields, value, enforce_ge=False): |
51 try: | 50 try: |
52 deferred = self._thread_local.deferred | 51 deferred = self._thread_local.deferred |
53 except AttributeError: | 52 except AttributeError: |
54 if not getattr( | 53 if not getattr( |
55 self._thread_local, 'finalizing', False): # pragma: no cover | 54 self._thread_local, 'finalizing', False): # pragma: no cover |
56 logging.warning( | 55 logging.warning( |
57 'DeferredMetricStore is used without a context. Have you wrapped ' | 56 'DeferredMetricStore is used without a context. Have you wrapped ' |
58 'your WSGIApplication with gae_ts_mon.initialize?') | 57 'your WSGIApplication with gae_ts_mon.initialize?') |
59 self._base_store.set(name, fields, value, enforce_ge) | 58 self._base_store.set(name, fields, value, enforce_ge) |
60 else: | 59 else: |
61 key = (name, fields) | 60 deferred.append( |
62 deferred[key] = metric_store.combine_modifications( | |
63 deferred.get(key), | |
64 metric_store.Modification(name, fields, 'set', (value, enforce_ge))) | 61 metric_store.Modification(name, fields, 'set', (value, enforce_ge))) |
65 | 62 |
66 def incr(self, name, fields, delta, modify_fn=operator.add): | 63 def incr(self, name, fields, delta, modify_fn=None): |
67 try: | 64 try: |
68 deferred = self._thread_local.deferred | 65 deferred = self._thread_local.deferred |
69 except AttributeError: | 66 except AttributeError: |
70 if not getattr( | 67 if not getattr( |
71 self._thread_local, 'finalizing', False): # pragma: no cover | 68 self._thread_local, 'finalizing', False): # pragma: no cover |
72 logging.warning( | 69 logging.warning( |
73 'DeferredMetricStore is used without a context. Have you wrapped ' | 70 'DeferredMetricStore is used without a context. Have you wrapped ' |
74 'your WSGIApplication with gae_ts_mon.initialize?') | 71 'your WSGIApplication with gae_ts_mon.initialize?') |
75 self._base_store.incr(name, fields, delta, modify_fn) | 72 self._base_store.incr(name, fields, delta, modify_fn) |
76 else: | 73 else: |
77 key = (name, fields) | 74 deferred.append( |
78 deferred[key] = metric_store.combine_modifications( | |
79 deferred.get(key), | |
80 metric_store.Modification(name, fields, 'incr', (delta, modify_fn))) | 75 metric_store.Modification(name, fields, 'incr', (delta, modify_fn))) |
81 | 76 |
82 def reset_for_unittest(self, name=None): | 77 def reset_for_unittest(self, name=None): |
83 self._base_store.reset_for_unittest(name) | 78 self._base_store.reset_for_unittest(name) |
84 | 79 |
85 try: | 80 try: |
86 deferred = self._thread_local.deferred | 81 deferred = self._thread_local.deferred |
87 except AttributeError: | 82 except AttributeError: |
88 pass | 83 pass |
89 else: | 84 else: |
90 for key in deferred.keys(): | 85 if name is None: |
91 if name is None or key[0] == name: | 86 self._thread_local.deferred = [] |
92 del deferred[key] | 87 else: |
| 88 self._thread_local.deferred = [x for x in deferred if x.name != name] |
OLD | NEW |