Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(558)

Side by Side Diff: infra_libs/ts_mon/common/interface.py

Issue 2213143002: Add infra_libs as a bootstrap dependency. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Removed the ugly import hack Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 """Classes representing the monitoring interface for tasks or devices.
6
7 Usage:
8 import argparse
9 from infra_libs import ts_mon
10
11 p = argparse.ArgumentParser()
12 ts_mon.add_argparse_options(p)
13 args = p.parse_args() # Must contain info for Monitor (and optionally Target)
14 ts_mon.process_argparse_options(args)
15
16 # Will use the default Target set up via command line args:
17 m = ts_mon.BooleanMetric('/my/metric/name', fields={'foo': 1, 'bar': 'baz'})
18 m.set(True)
19
20 # Use a custom Target:
21 t = ts_mon.TaskTarget('service', 'job', 'region', 'host') # or DeviceTarget
22 m2 = ts_mon.GaugeMetric('/my/metric/name2', fields={'asdf': 'qwer'}, target=t)
23 m2.set(5)
24
25 Library usage:
26 from infra_libs.ts_mon import CounterMetric
27 # No need to set up Monitor or Target, assume calling code did that.
28 c = CounterMetric('/my/counter', fields={'source': 'mylibrary'})
29 c.set(0)
30 for x in range(100):
31 c.increment()
32 """
33
34 import datetime
35 import logging
36 import random
37 import threading
38 import time
39
40 from infra_libs.ts_mon.common import errors
41 from infra_libs.ts_mon.common import metric_store
42 from infra_libs.ts_mon.protos import metrics_pb2
43
44 # The maximum number of MetricsData messages to include in each HTTP request.
45 # MetricsCollections larger than this will be split into multiple requests.
46 METRICS_DATA_LENGTH_LIMIT = 1000
47
48
49 class State(object):
50 """Package-level state is stored here so that it is easily accessible.
51
52 Configuration is kept in this one object at the global level so that all
53 libraries in use by the same tool or service can all take advantage of the
54 same configuration.
55 """
56
57 def __init__(self, store_ctor=None, target=None):
58 """Optional arguments are for unit tests."""
59 if store_ctor is None: # pragma: no branch
60 store_ctor = metric_store.InProcessMetricStore
61 # The Monitor object that will be used to send all metrics.
62 self.global_monitor = None
63 # The Target object that will be paired with all metrics that don't supply
64 # their own.
65 self.target = target
66 # The flush mode being used to control when metrics are pushed.
67 self.flush_mode = None
68 # A predicate to determine if metrics should be sent.
69 self.flush_enabled_fn = lambda: True
70 # The background thread that flushes metrics every
71 # --ts-mon-flush-interval-secs seconds. May be None if
72 # --ts-mon-flush != 'auto' or --ts-mon-flush-interval-secs == 0.
73 self.flush_thread = None
74 # All metrics created by this application.
75 self.metrics = {}
76 # The MetricStore object that holds the actual metric values.
77 self.store = store_ctor(self)
78 # Cached time of the last flush. Useful mostly in AppEngine apps.
79 self.last_flushed = datetime.datetime.utcfromtimestamp(0)
80 # Metric name prefix
81 self.metric_name_prefix = '/chrome/infra/'
82
83 def reset_for_unittest(self):
84 self.metrics = {}
85 self.last_flushed = datetime.datetime.utcfromtimestamp(0)
86 self.store.reset_for_unittest()
87
88 state = State()
89
90
91 def flush():
92 """Send all metrics that are registered in the application."""
93
94 if not state.flush_enabled_fn():
95 logging.debug('ts_mon: sending metrics is disabled.')
96 return
97
98 if not state.global_monitor or not state.target:
99 raise errors.MonitoringNoConfiguredMonitorError(None)
100
101 proto = metrics_pb2.MetricsCollection()
102
103 for target, metric, start_time, fields_values in state.store.get_all():
104 for fields, value in fields_values.iteritems():
105 if len(proto.data) >= METRICS_DATA_LENGTH_LIMIT:
106 state.global_monitor.send(proto)
107 del proto.data[:]
108
109 metric.serialize_to(proto, start_time, fields, value, target)
110
111 state.global_monitor.send(proto)
112 state.last_flushed = datetime.datetime.utcnow()
113
114
115 def register(metric):
116 """Adds the metric to the list of metrics sent by flush().
117
118 This is called automatically by Metric's constructor.
119 """
120 # If someone is registering the same metric object twice, that's okay, but
121 # registering two different metric objects with the same metric name is not.
122 for m in state.metrics.values():
123 if metric == m:
124 state.metrics[metric.name] = metric
125 return
126 if metric.name in state.metrics:
127 raise errors.MonitoringDuplicateRegistrationError(metric.name)
128
129 state.metrics[metric.name] = metric
130
131
132 def unregister(metric):
133 """Removes the metric from the list of metrics sent by flush()."""
134 del state.metrics[metric.name]
135
136
137 def close():
138 """Stops any background threads and waits for them to exit."""
139 if state.flush_thread is not None:
140 state.flush_thread.stop()
141
142
143 def reset_for_unittest(disable=False):
144 state.reset_for_unittest()
145 if disable:
146 state.flush_enabled_fn = lambda: False
147
148
149 class _FlushThread(threading.Thread):
150 """Background thread that flushes metrics on an interval."""
151
152 def __init__(self, interval_secs, stop_event=None):
153 super(_FlushThread, self).__init__(name='ts_mon')
154
155 if stop_event is None:
156 stop_event = threading.Event()
157
158 self.daemon = True
159 self.interval_secs = interval_secs
160 self.stop_event = stop_event
161
162 def _flush_and_log_exceptions(self):
163 try:
164 flush()
165 except Exception:
166 logging.exception('Automatic monitoring flush failed.')
167
168 def run(self):
169 # Jitter the first interval so tasks started at the same time (say, by cron)
170 # on different machines don't all send metrics simultaneously.
171 next_timeout = random.uniform(self.interval_secs / 2.0, self.interval_secs)
172
173 while True:
174 if self.stop_event.wait(next_timeout):
175 self._flush_and_log_exceptions()
176 return
177
178 # Try to flush every N seconds exactly so rate calculations are more
179 # consistent.
180 start = time.time()
181 self._flush_and_log_exceptions()
182 flush_duration = time.time() - start
183 next_timeout = self.interval_secs - flush_duration
184
185 if next_timeout < 0:
186 logging.warning(
187 'Last monitoring flush took %f seconds (longer than '
188 '--ts-mon-flush-interval-secs = %f seconds)',
189 flush_duration, self.interval_secs)
190 next_timeout = 0
191
192 def stop(self):
193 """Stops the background thread and performs a final flush."""
194
195 self.stop_event.set()
196 self.join()
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698