| OLD | NEW |
| (Empty) |
| 1 # Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 # Use of this source code is governed by a BSD-style license that can be | |
| 3 # found in the LICENSE file. | |
| 4 | |
| 5 """Classes representing the monitoring interface for tasks or devices. | |
| 6 | |
| 7 Usage: | |
| 8 import argparse | |
| 9 from infra_libs import ts_mon | |
| 10 | |
| 11 p = argparse.ArgumentParser() | |
| 12 ts_mon.add_argparse_options(p) | |
| 13 args = p.parse_args() # Must contain info for Monitor (and optionally Target) | |
| 14 ts_mon.process_argparse_options(args) | |
| 15 | |
| 16 # Will use the default Target set up via command line args: | |
| 17 m = ts_mon.BooleanMetric('/my/metric/name', fields={'foo': 1, 'bar': 'baz'}) | |
| 18 m.set(True) | |
| 19 | |
| 20 # Use a custom Target: | |
| 21 t = ts_mon.TaskTarget('service', 'job', 'region', 'host') # or DeviceTarget | |
| 22 m2 = ts_mon.GaugeMetric('/my/metric/name2', fields={'asdf': 'qwer'}, target=t) | |
| 23 m2.set(5) | |
| 24 | |
| 25 Library usage: | |
| 26 from infra_libs.ts_mon import CounterMetric | |
| 27 # No need to set up Monitor or Target, assume calling code did that. | |
| 28 c = CounterMetric('/my/counter', fields={'source': 'mylibrary'}) | |
| 29 c.set(0) | |
| 30 for x in range(100): | |
| 31 c.increment() | |
| 32 """ | |
| 33 | |
| 34 import datetime | |
| 35 import logging | |
| 36 import random | |
| 37 import threading | |
| 38 import time | |
| 39 | |
| 40 from infra_libs.ts_mon.common import errors | |
| 41 from infra_libs.ts_mon.common import metric_store | |
| 42 from infra_libs.ts_mon.protos import metrics_pb2 | |
| 43 | |
# Upper bound on the number of MetricsData messages placed in one HTTP request.
# A MetricsCollection holding more than this is sent as several requests.
METRICS_DATA_LENGTH_LIMIT = 1000
| 47 | |
| 48 | |
class State(object):
    """Holder for the package-wide monitoring configuration.

    One instance is kept at module level so that every library used by the
    same tool or service shares a single configuration.
    """

    def __init__(self, store_ctor=None, target=None):
        """Initialise every field to its default value.

        Both arguments are optional and exist for unit tests.

        Args:
          store_ctor: callable invoked with this State instance to build the
              metric store. Defaults to metric_store.InProcessMetricStore.
          target: the Target paired with metrics that don't supply their own.
        """
        make_store = (
            metric_store.InProcessMetricStore if store_ctor is None
            else store_ctor)

        # Monitor used to send all metrics.
        self.global_monitor = None
        # Default Target for metrics that were created without one.
        self.target = target
        # Flush mode controlling when metrics are pushed.
        self.flush_mode = None
        # Predicate consulted by flush() to decide whether to send anything.
        self.flush_enabled_fn = lambda: True
        # Background thread flushing every --ts-mon-flush-interval-secs
        # seconds. May be None if --ts-mon-flush != 'auto' or
        # --ts-mon-flush-interval-secs == 0.
        self.flush_thread = None
        # Every metric created by this application, keyed by metric name.
        self.metrics = {}
        # MetricStore holding the actual metric values. Built only after the
        # attributes above exist, since the constructor receives this object.
        self.store = make_store(self)
        # Cached time of the last flush. Useful mostly in AppEngine apps.
        self.last_flushed = datetime.datetime.utcfromtimestamp(0)
        # Prefix for metric names.
        self.metric_name_prefix = '/chrome/infra/'

    def reset_for_unittest(self):
        """Forget registered metrics and the last flush time; reset the store."""
        self.metrics = {}
        self.last_flushed = datetime.datetime.utcfromtimestamp(0)
        self.store.reset_for_unittest()
| 87 | |
# The single package-level State instance used by the functions in this module.
state = State()
| 89 | |
| 90 | |
def flush():
    """Send all metrics that are registered in the application.

    Does nothing (apart from a debug log line) when state.flush_enabled_fn()
    returns a false value. Otherwise every (fields, value) pair returned by
    state.store.get_all() is serialized into a MetricsCollection and handed to
    state.global_monitor.send(), in batches of at most
    METRICS_DATA_LENGTH_LIMIT entries. state.last_flushed is updated at the
    end.

    Raises:
      errors.MonitoringNoConfiguredMonitorError: if no global monitor or no
          default target has been configured.
    """
    if not state.flush_enabled_fn():
        logging.debug('ts_mon: sending metrics is disabled.')
        return

    if not state.global_monitor or not state.target:
        raise errors.MonitoringNoConfiguredMonitorError(None)

    proto = metrics_pb2.MetricsCollection()

    for target, metric, start_time, fields_values in state.store.get_all():
        # items() instead of the Python 2-only iteritems(), so this loop runs
        # unchanged on both Python 2 and Python 3.
        for fields, value in fields_values.items():
            if len(proto.data) >= METRICS_DATA_LENGTH_LIMIT:
                # The current batch is full: send it and reuse the message.
                state.global_monitor.send(proto)
                del proto.data[:]

            metric.serialize_to(proto, start_time, fields, value, target)

    # Send whatever is left; this happens even if the store yielded nothing.
    state.global_monitor.send(proto)
    state.last_flushed = datetime.datetime.utcnow()
| 113 | |
| 114 | |
def register(metric):
    """Adds the metric to the list of metrics sent by flush().

    This is called automatically by Metric's constructor.

    A metric that compares equal to an already-registered one is accepted
    again. A metric that equals none of them, but whose name is already
    taken, is rejected.

    Raises:
      errors.MonitoringDuplicateRegistrationError: if a different metric is
          already registered under metric.name.
    """
    registry = state.metrics
    already_known = any(metric == existing for existing in registry.values())

    if not already_known and metric.name in registry:
        raise errors.MonitoringDuplicateRegistrationError(metric.name)

    registry[metric.name] = metric
| 130 | |
| 131 | |
def unregister(metric):
    """Removes the metric from the list of metrics sent by flush().

    Raises:
      KeyError: if nothing is registered under metric.name.
    """
    state.metrics.pop(metric.name)
| 135 | |
| 136 | |
def close():
    """Stops the background flush thread, if any, and waits for it to exit."""
    worker = state.flush_thread
    if worker is None:
        return
    worker.stop()
| 141 | |
| 142 | |
def reset_for_unittest(disable=False):
    """Resets the package-level state for use in unit tests.

    Args:
      disable: when true, additionally replaces state.flush_enabled_fn with a
          predicate that always returns False.
    """
    state.reset_for_unittest()
    if not disable:
        return
    state.flush_enabled_fn = lambda: False
| 147 | |
| 148 | |
class _FlushThread(threading.Thread):
    """Daemon thread that calls flush() repeatedly on a fixed interval."""

    def __init__(self, interval_secs, stop_event=None):
        """Creates (but does not start) the thread.

        Args:
          interval_secs: desired number of seconds between flushes.
          stop_event: optional Event-like object used to signal shutdown; a
              fresh threading.Event is created when omitted.
        """
        super(_FlushThread, self).__init__(name='ts_mon')

        self.daemon = True
        self.interval_secs = interval_secs
        self.stop_event = (
            threading.Event() if stop_event is None else stop_event)

    def _flush_and_log_exceptions(self):
        """Calls flush(), logging any exception instead of propagating it."""
        try:
            flush()
        except Exception:
            logging.exception('Automatic monitoring flush failed.')

    def run(self):
        """Flushes periodically until the stop event is set, then once more."""
        # The first wait is jittered so that tasks started at the same moment
        # (say, by cron) on different machines don't all send at once.
        delay = random.uniform(self.interval_secs / 2.0, self.interval_secs)

        # wait() returning false means the timeout elapsed without a stop
        # request, so another periodic flush is due.
        while not self.stop_event.wait(delay):
            # Subtract the time spent flushing from the next wait, so flushes
            # start every N seconds and rate calculations stay consistent.
            began = time.time()
            self._flush_and_log_exceptions()
            flush_duration = time.time() - began
            delay = self.interval_secs - flush_duration

            if delay < 0:
                logging.warning(
                    'Last monitoring flush took %f seconds (longer than '
                    '--ts-mon-flush-interval-secs = %f seconds)',
                    flush_duration, self.interval_secs)
                delay = 0

        # Stop was requested: perform the final flush before exiting.
        self._flush_and_log_exceptions()

    def stop(self):
        """Stops the background thread and performs a final flush."""
        self.stop_event.set()
        self.join()
| OLD | NEW |