| OLD | NEW |
| 1 # Copyright 2015 The Chromium Authors. All rights reserved. | 1 # Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
| 4 | 4 |
| 5 import logging | 5 import logging |
| 6 import operator | 6 import operator |
| 7 import threading | 7 import threading |
| 8 | 8 |
| 9 from infra_libs.ts_mon.common import metric_store | 9 from infra_libs.ts_mon.common import metric_store |
| 10 | 10 |
| 11 | 11 |
| 12 class FinalizeWithoutInitializeError(Exception): | 12 class FinalizeWithoutInitializeError(Exception): |
| 13 def __init__(self): | 13 def __init__(self): |
| 14 super(FinalizeWithoutInitializeError, self).__init__( | 14 super(FinalizeWithoutInitializeError, self).__init__( |
| 15 'finalize_context called before initialize_context in a thread') | 15 'finalize_context called before initialize_context in a thread') |
| 16 | 16 |
| 17 | 17 |
| 18 class DeferredMetricStore(metric_store.MetricStore): | 18 class DeferredMetricStore(metric_store.MetricStore): |
| 19 def __init__(self, state, base_store, time_fn=None): | 19 def __init__(self, state, base_store, time_fn=None): |
| 20 super(DeferredMetricStore, self).__init__(state, time_fn=time_fn) | 20 super(DeferredMetricStore, self).__init__(state, time_fn=time_fn) |
| 21 | 21 |
| 22 self._base_store = base_store | 22 self._base_store = base_store |
| 23 self._thread_local = threading.local() | 23 self._thread_local = threading.local() |
| 24 | 24 |
| 25 def initialize_context(self): | 25 def initialize_context(self): |
| 26 self._thread_local.deferred = {} | 26 self._thread_local.deferred = [] |
| 27 | 27 |
| 28 def finalize_context(self): | 28 def finalize_context(self): |
| 29 try: | 29 try: |
| 30 deferred = self._thread_local.deferred | 30 deferred = self._thread_local.deferred |
| 31 except AttributeError: | 31 except AttributeError: |
| 32 raise FinalizeWithoutInitializeError() | 32 raise FinalizeWithoutInitializeError() |
| 33 else: | 33 else: |
| 34 del self._thread_local.deferred | 34 del self._thread_local.deferred |
| 35 self._thread_local.finalizing = True | 35 self._thread_local.finalizing = True |
| 36 try: | 36 try: |
| 37 self._base_store.modify_multi(deferred.itervalues()) | 37 self._base_store.modify_multi(deferred) |
| 38 finally: | 38 finally: |
| 39 self._thread_local.finalizing = False | 39 self._thread_local.finalizing = False |
| 40 | 40 |
| 41 def update_metric_index(self): | 41 def update_metric_index(self): |
| 42 self._base_store.update_metric_index() | 42 self._base_store.update_metric_index() |
| 43 | 43 |
| 44 def get(self, name, fields, default=None): | 44 def get(self, name, fields, default=None): |
| 45 return self._base_store.get(name, fields, default) | 45 return self._base_store.get(name, fields, default) |
| 46 | 46 |
| 47 def get_all(self): | 47 def get_all(self): |
| 48 return self._base_store.get_all() | 48 return self._base_store.get_all() |
| 49 | 49 |
| 50 def set(self, name, fields, value, enforce_ge=False): | 50 def set(self, name, fields, value, enforce_ge=False): |
| 51 try: | 51 try: |
| 52 deferred = self._thread_local.deferred | 52 deferred = self._thread_local.deferred |
| 53 except AttributeError: | 53 except AttributeError: |
| 54 if not getattr( | 54 if not getattr( |
| 55 self._thread_local, 'finalizing', False): # pragma: no cover | 55 self._thread_local, 'finalizing', False): # pragma: no cover |
| 56 logging.warning( | 56 logging.warning( |
| 57 'DeferredMetricStore is used without a context. Have you wrapped ' | 57 'DeferredMetricStore is used without a context. Have you wrapped ' |
| 58 'your WSGIApplication with gae_ts_mon.initialize?') | 58 'your WSGIApplication with gae_ts_mon.initialize?') |
| 59 self._base_store.set(name, fields, value, enforce_ge) | 59 self._base_store.set(name, fields, value, enforce_ge) |
| 60 else: | 60 else: |
| 61 key = (name, fields) | 61 deferred.append( |
| 62 deferred[key] = metric_store.combine_modifications( | |
| 63 deferred.get(key), | |
| 64 metric_store.Modification(name, fields, 'set', (value, enforce_ge))) | 62 metric_store.Modification(name, fields, 'set', (value, enforce_ge))) |
| 65 | 63 |
| 66 def incr(self, name, fields, delta, modify_fn=operator.add): | 64 def incr(self, name, fields, delta, modify_fn=operator.add): |
| 67 try: | 65 try: |
| 68 deferred = self._thread_local.deferred | 66 deferred = self._thread_local.deferred |
| 69 except AttributeError: | 67 except AttributeError: |
| 70 if not getattr( | 68 if not getattr( |
| 71 self._thread_local, 'finalizing', False): # pragma: no cover | 69 self._thread_local, 'finalizing', False): # pragma: no cover |
| 72 logging.warning( | 70 logging.warning( |
| 73 'DeferredMetricStore is used without a context. Have you wrapped ' | 71 'DeferredMetricStore is used without a context. Have you wrapped ' |
| 74 'your WSGIApplication with gae_ts_mon.initialize?') | 72 'your WSGIApplication with gae_ts_mon.initialize?') |
| 75 self._base_store.incr(name, fields, delta, modify_fn) | 73 self._base_store.incr(name, fields, delta, modify_fn) |
| 76 else: | 74 else: |
| 77 key = (name, fields) | 75 deferred.append( |
| 78 deferred[key] = metric_store.combine_modifications( | |
| 79 deferred.get(key), | |
| 80 metric_store.Modification(name, fields, 'incr', (delta, modify_fn))) | 76 metric_store.Modification(name, fields, 'incr', (delta, modify_fn))) |
| 81 | 77 |
| 82 def reset_for_unittest(self, name=None): | 78 def reset_for_unittest(self, name=None): |
| 83 self._base_store.reset_for_unittest(name) | 79 self._base_store.reset_for_unittest(name) |
| 84 | 80 |
| 85 try: | 81 try: |
| 86 deferred = self._thread_local.deferred | 82 deferred = self._thread_local.deferred |
| 87 except AttributeError: | 83 except AttributeError: |
| 88 pass | 84 pass |
| 89 else: | 85 else: |
| 90 for key in deferred.keys(): | 86 if name is None: |
| 91 if name is None or key[0] == name: | 87 self._thread_local.deferred = [] |
| 92 del deferred[key] | 88 else: |
| 89 self._thread_local.deferred = [x for x in deferred if x.name != name] |
| OLD | NEW |