OLD | NEW |
| (Empty) |
1 # Copyright 2015 The Chromium Authors. All rights reserved. | |
2 # Use of this source code is governed by a BSD-style license that can be | |
3 # found in the LICENSE file. | |
4 | |
5 """Classes representing the monitoring interface for tasks or devices. | |
6 | |
7 Usage: | |
8 import argparse | |
9 from infra_libs import ts_mon | |
10 | |
11 p = argparse.ArgumentParser() | |
12 ts_mon.add_argparse_options(p) | |
13 args = p.parse_args() # Must contain info for Monitor (and optionally Target) | |
14 ts_mon.process_argparse_options(args) | |
15 | |
16 # Will use the default Target set up via command line args: | |
17 m = ts_mon.BooleanMetric('/my/metric/name', fields={'foo': 1, 'bar': 'baz'}) | |
18 m.set(True) | |
19 | |
20 # Use a custom Target: | |
21 t = ts_mon.TaskTarget('service', 'job', 'region', 'host') # or DeviceTarget | |
22 m2 = ts_mon.GaugeMetric('/my/metric/name2', fields={'asdf': 'qwer'}, target=t) | |
23 m2.set(5) | |
24 | |
25 Library usage: | |
26 from infra_libs.ts_mon import CounterMetric | |
27 # No need to set up Monitor or Target, assume calling code did that. | |
28 c = CounterMetric('/my/counter', fields={'source': 'mylibrary'}) | |
29 c.set(0) | |
30 for x in range(100): | |
31 c.increment() | |
32 """ | |
33 | |
34 import datetime | |
35 import logging | |
36 import random | |
37 import threading | |
38 import time | |
39 | |
40 from infra_libs.ts_mon.common import errors | |
41 from infra_libs.ts_mon.common import metric_store | |
42 from infra_libs.ts_mon.protos import metrics_pb2 | |
43 | |
44 # The maximum number of MetricsData messages to include in each HTTP request. | |
45 # MetricsCollections larger than this will be split into multiple requests. | |
46 METRICS_DATA_LENGTH_LIMIT = 1000 | |
47 | |
48 | |
49 class State(object): | |
50 """Package-level state is stored here so that it is easily accessible. | |
51 | |
52 Configuration is kept in this one object at the global level so that all | |
53 libraries in use by the same tool or service can all take advantage of the | |
54 same configuration. | |
55 """ | |
56 | |
57 def __init__(self, store_ctor=None, target=None): | |
58 """Optional arguments are for unit tests.""" | |
59 if store_ctor is None: # pragma: no branch | |
60 store_ctor = metric_store.InProcessMetricStore | |
61 # The Monitor object that will be used to send all metrics. | |
62 self.global_monitor = None | |
63 # The Target object that will be paired with all metrics that don't supply | |
64 # their own. | |
65 self.target = target | |
66 # The flush mode being used to control when metrics are pushed. | |
67 self.flush_mode = None | |
68 # A predicate to determine if metrics should be sent. | |
69 self.flush_enabled_fn = lambda: True | |
70 # The background thread that flushes metrics every | |
71 # --ts-mon-flush-interval-secs seconds. May be None if | |
72 # --ts-mon-flush != 'auto' or --ts-mon-flush-interval-secs == 0. | |
73 self.flush_thread = None | |
74 # All metrics created by this application. | |
75 self.metrics = {} | |
76 # The MetricStore object that holds the actual metric values. | |
77 self.store = store_ctor(self) | |
78 # Cached time of the last flush. Useful mostly in AppEngine apps. | |
79 self.last_flushed = datetime.datetime.utcfromtimestamp(0) | |
80 # Metric name prefix | |
81 self.metric_name_prefix = '/chrome/infra/' | |
82 | |
83 def reset_for_unittest(self): | |
84 self.metrics = {} | |
85 self.last_flushed = datetime.datetime.utcfromtimestamp(0) | |
86 self.store.reset_for_unittest() | |
87 | |
88 state = State() | |
89 | |
90 | |
91 def flush(): | |
92 """Send all metrics that are registered in the application.""" | |
93 | |
94 if not state.flush_enabled_fn(): | |
95 logging.debug('ts_mon: sending metrics is disabled.') | |
96 return | |
97 | |
98 if not state.global_monitor or not state.target: | |
99 raise errors.MonitoringNoConfiguredMonitorError(None) | |
100 | |
101 proto = metrics_pb2.MetricsCollection() | |
102 | |
103 for target, metric, start_time, fields_values in state.store.get_all(): | |
104 for fields, value in fields_values.iteritems(): | |
105 if len(proto.data) >= METRICS_DATA_LENGTH_LIMIT: | |
106 state.global_monitor.send(proto) | |
107 del proto.data[:] | |
108 | |
109 metric.serialize_to(proto, start_time, fields, value, target) | |
110 | |
111 state.global_monitor.send(proto) | |
112 state.last_flushed = datetime.datetime.utcnow() | |
113 | |
114 | |
115 def register(metric): | |
116 """Adds the metric to the list of metrics sent by flush(). | |
117 | |
118 This is called automatically by Metric's constructor. | |
119 """ | |
120 # If someone is registering the same metric object twice, that's okay, but | |
121 # registering two different metric objects with the same metric name is not. | |
122 for m in state.metrics.values(): | |
123 if metric == m: | |
124 state.metrics[metric.name] = metric | |
125 return | |
126 if metric.name in state.metrics: | |
127 raise errors.MonitoringDuplicateRegistrationError(metric.name) | |
128 | |
129 state.metrics[metric.name] = metric | |
130 | |
131 | |
132 def unregister(metric): | |
133 """Removes the metric from the list of metrics sent by flush().""" | |
134 del state.metrics[metric.name] | |
135 | |
136 | |
137 def close(): | |
138 """Stops any background threads and waits for them to exit.""" | |
139 if state.flush_thread is not None: | |
140 state.flush_thread.stop() | |
141 | |
142 | |
143 def reset_for_unittest(disable=False): | |
144 state.reset_for_unittest() | |
145 if disable: | |
146 state.flush_enabled_fn = lambda: False | |
147 | |
148 | |
149 class _FlushThread(threading.Thread): | |
150 """Background thread that flushes metrics on an interval.""" | |
151 | |
152 def __init__(self, interval_secs, stop_event=None): | |
153 super(_FlushThread, self).__init__(name='ts_mon') | |
154 | |
155 if stop_event is None: | |
156 stop_event = threading.Event() | |
157 | |
158 self.daemon = True | |
159 self.interval_secs = interval_secs | |
160 self.stop_event = stop_event | |
161 | |
162 def _flush_and_log_exceptions(self): | |
163 try: | |
164 flush() | |
165 except Exception: | |
166 logging.exception('Automatic monitoring flush failed.') | |
167 | |
168 def run(self): | |
169 # Jitter the first interval so tasks started at the same time (say, by cron) | |
170 # on different machines don't all send metrics simultaneously. | |
171 next_timeout = random.uniform(self.interval_secs / 2.0, self.interval_secs) | |
172 | |
173 while True: | |
174 if self.stop_event.wait(next_timeout): | |
175 self._flush_and_log_exceptions() | |
176 return | |
177 | |
178 # Try to flush every N seconds exactly so rate calculations are more | |
179 # consistent. | |
180 start = time.time() | |
181 self._flush_and_log_exceptions() | |
182 flush_duration = time.time() - start | |
183 next_timeout = self.interval_secs - flush_duration | |
184 | |
185 if next_timeout < 0: | |
186 logging.warning( | |
187 'Last monitoring flush took %f seconds (longer than ' | |
188 '--ts-mon-flush-interval-secs = %f seconds)', | |
189 flush_duration, self.interval_secs) | |
190 next_timeout = 0 | |
191 | |
192 def stop(self): | |
193 """Stops the background thread and performs a final flush.""" | |
194 | |
195 self.stop_event.set() | |
196 self.join() | |
OLD | NEW |