Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(319)

Side by Side Diff: client/third_party/infra_libs/ts_mon/common/interface.py

Issue 2991803002: Update infra_libs to 1.1.15 / 0b44aba87c1c6538439df6d24a409870810747ab (Closed)
Patch Set: fix Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2015 The Chromium Authors. All rights reserved. 1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 """Classes representing the monitoring interface for tasks or devices. 5 """Classes representing the monitoring interface for tasks or devices.
6 6
7 Usage: 7 Usage:
8 import argparse 8 import argparse
9 from infra_libs import ts_mon 9 from infra_libs import ts_mon
10 10
(...skipping 22 matching lines...) Expand all
33 33
34 import datetime 34 import datetime
35 import logging 35 import logging
36 import random 36 import random
37 import threading 37 import threading
38 import time 38 import time
39 import traceback 39 import traceback
40 40
41 from infra_libs.ts_mon.common import errors 41 from infra_libs.ts_mon.common import errors
42 from infra_libs.ts_mon.common import metric_store 42 from infra_libs.ts_mon.common import metric_store
43 from infra_libs.ts_mon.protos.current import metrics_pb2 43 from infra_libs.ts_mon.protos import metrics_pb2
44 from infra_libs.ts_mon.protos.new import metrics_pb2 as new_metrics_pb2
45 44
46 # The maximum number of MetricsData messages to include in each HTTP request. 45 # The maximum number of MetricsData messages to include in each HTTP request.
47 # MetricsCollections larger than this will be split into multiple requests. 46 # MetricsCollections larger than this will be split into multiple requests.
48 METRICS_DATA_LENGTH_LIMIT = 500 47 METRICS_DATA_LENGTH_LIMIT = 500
49 48
50 49
51 class State(object): 50 class State(object):
52 """Package-level state is stored here so that it is easily accessible. 51 """Package-level state is stored here so that it is easily accessible.
53 52
54 Configuration is kept in this one object at the global level so that all 53 Configuration is kept in this one object at the global level so that all
(...skipping 19 matching lines...) Expand all
74 # --ts-mon-flush != 'auto' or --ts-mon-flush-interval-secs == 0. 73 # --ts-mon-flush != 'auto' or --ts-mon-flush-interval-secs == 0.
75 self.flush_thread = None 74 self.flush_thread = None
76 # All metrics created by this application. 75 # All metrics created by this application.
77 self.metrics = {} 76 self.metrics = {}
78 # The MetricStore object that holds the actual metric values. 77 # The MetricStore object that holds the actual metric values.
79 self.store = store_ctor(self) 78 self.store = store_ctor(self)
80 # Cached time of the last flush. Useful mostly in AppEngine apps. 79 # Cached time of the last flush. Useful mostly in AppEngine apps.
81 self.last_flushed = datetime.datetime.utcfromtimestamp(0) 80 self.last_flushed = datetime.datetime.utcfromtimestamp(0)
82 # Metric name prefix 81 # Metric name prefix
83 self.metric_name_prefix = '/chrome/infra/' 82 self.metric_name_prefix = '/chrome/infra/'
84 # Use the new proto schema
85 self.use_new_proto = False
86 # Metrics registered with register_global_metrics. Keyed by metric name. 83 # Metrics registered with register_global_metrics. Keyed by metric name.
87 self.global_metrics = {} 84 self.global_metrics = {}
88 # Callbacks registered with register_global_metrics_callback. Keyed by the 85 # Callbacks registered with register_global_metrics_callback. Keyed by the
89 # arbitrary string provided by the user. Called before each flush. 86 # arbitrary string provided by the user. Called before each flush.
90 self.global_metrics_callbacks = {} 87 self.global_metrics_callbacks = {}
91 # Whether to call invoke_global_callbacks() on every flush(). Set to False 88 # Whether to call invoke_global_callbacks() on every flush(). Set to False
92 # on Appengine because it does its own thing. 89 # on Appengine because it does its own thing.
93 self.invoke_global_callbacks_on_flush = True 90 self.invoke_global_callbacks_on_flush = True
94 91
95 def reset_for_unittest(self): 92 def reset_for_unittest(self):
96 self.metrics = {} 93 self.metrics = {}
97 self.global_metrics = {} 94 self.global_metrics = {}
98 self.global_metrics_callbacks = {} 95 self.global_metrics_callbacks = {}
99 self.invoke_global_callbacks_on_flush = True 96 self.invoke_global_callbacks_on_flush = True
100 self.last_flushed = datetime.datetime.utcfromtimestamp(0) 97 self.last_flushed = datetime.datetime.utcfromtimestamp(0)
101 self.store.reset_for_unittest() 98 self.store.reset_for_unittest()
102 self.use_new_proto = False
103 99
104 state = State() 100 state = State()
105 101
106 102
107 def flush(): 103 def flush():
108 """Send all metrics that are registered in the application.""" 104 """Send all metrics that are registered in the application."""
109 if not state.flush_enabled_fn(): 105 if not state.flush_enabled_fn():
110 logging.debug('ts_mon: sending metrics is disabled.') 106 logging.debug('ts_mon: sending metrics is disabled.')
111 return 107 return
112 108
113 if not state.global_monitor or not state.target: 109 if not state.global_monitor or not state.target:
114 raise errors.MonitoringNoConfiguredMonitorError(None) 110 raise errors.MonitoringNoConfiguredMonitorError(None)
115 111
116 if state.invoke_global_callbacks_on_flush: 112 if state.invoke_global_callbacks_on_flush:
117 invoke_global_callbacks() 113 invoke_global_callbacks()
118 114
119 if state.use_new_proto: 115 rpcs = []
120 generator = _generate_proto_new 116 for proto in _generate_proto():
121 else: 117 rpcs.append(state.global_monitor.send(proto))
122 generator = _generate_proto 118 for rpc in rpcs:
123 119 if rpc is not None:
124 for proto in generator(): 120 state.global_monitor.wait(rpc)
125 state.global_monitor.send(proto)
126 state.last_flushed = datetime.datetime.utcnow() 121 state.last_flushed = datetime.datetime.utcnow()
127 122
128 123
129 def _generate_proto_new(): 124 def _generate_proto():
130 """Generate MetricsPayload for global_monitor.send().""" 125 """Generate MetricsPayload for global_monitor.send()."""
131 proto = new_metrics_pb2.MetricsPayload() 126 proto = metrics_pb2.MetricsPayload()
132 127
133 # Key: Target, value: MetricsCollection. 128 # Key: Target, value: MetricsCollection.
134 collections = {} 129 collections = {}
135 130
136 # Key: (Target, metric name) tuple, value: MetricsDataSet. 131 # Key: (Target, metric name) tuple, value: MetricsDataSet.
137 data_sets = {} 132 data_sets = {}
138 133
139 count = 0 134 count = 0
140 for (target, metric, start_time, end_time, fields_values 135 for (target, metric, start_time, end_time, fields_values
141 ) in state.store.get_all(): 136 ) in state.store.get_all():
142 for fields, value in fields_values.iteritems(): 137 for fields, value in fields_values.iteritems():
143 if count >= METRICS_DATA_LENGTH_LIMIT: 138 if count >= METRICS_DATA_LENGTH_LIMIT:
144 yield proto 139 yield proto
145 proto = new_metrics_pb2.MetricsPayload() 140 proto = metrics_pb2.MetricsPayload()
146 collections.clear() 141 collections.clear()
147 data_sets.clear() 142 data_sets.clear()
148 count = 0 143 count = 0
149 144
150 if target not in collections: 145 if target not in collections:
151 collections[target] = proto.metrics_collection.add() 146 collections[target] = proto.metrics_collection.add()
152 target._populate_target_pb_new(collections[target]) 147 target.populate_target_pb(collections[target])
153 collection = collections[target] 148 collection = collections[target]
154 149
155 key = (target, metric.name) 150 key = (target, metric.name)
156 new_data_set = None 151 new_data_set = None
157 if key not in data_sets: 152 if key not in data_sets:
158 new_data_set = new_metrics_pb2.MetricsDataSet() 153 new_data_set = metrics_pb2.MetricsDataSet()
159 metric._populate_data_set(new_data_set) 154 metric.populate_data_set(new_data_set)
160 155
161 data = new_metrics_pb2.MetricsData() 156 data = metrics_pb2.MetricsData()
162 metric._populate_data(data, start_time, end_time, fields, value) 157 metric.populate_data(data, start_time, end_time, fields, value)
163 158
164 # All required data protos have been successfully populated. Now we can 159 # All required data protos have been successfully populated. Now we can
165 # insert them in serialized proto and bookeeping data structures. 160 # insert them in serialized proto and bookeeping data structures.
166 if new_data_set is not None: 161 if new_data_set is not None:
167 collection.metrics_data_set.add().CopyFrom(new_data_set) 162 collection.metrics_data_set.add().CopyFrom(new_data_set)
168 data_sets[key] = collection.metrics_data_set[-1] 163 data_sets[key] = collection.metrics_data_set[-1]
169 data_sets[key].data.add().CopyFrom(data) 164 data_sets[key].data.add().CopyFrom(data)
170 count += 1 165 count += 1
171 166
172 if count > 0: 167 if count > 0:
173 yield proto 168 yield proto
174 169
175 170
176 def _generate_proto():
177 """Generate MetricsCollection for global_monitor.send()."""
178 proto = metrics_pb2.MetricsCollection()
179
180 for target, metric, start_time, _, fields_values in state.store.get_all():
181 for fields, value in fields_values.iteritems():
182 if len(proto.data) >= METRICS_DATA_LENGTH_LIMIT:
183 yield proto
184 proto = metrics_pb2.MetricsCollection()
185
186 metrics_pb = metrics_pb2.MetricsData()
187 metric.serialize_to(metrics_pb, start_time, fields, value, target)
188
189 proto.data.add().CopyFrom(metrics_pb)
190
191 if len(proto.data) > 0:
192 yield proto
193
194
195 def register(metric): 171 def register(metric):
196 """Adds the metric to the list of metrics sent by flush(). 172 """Adds the metric to the list of metrics sent by flush().
197 173
198 This is called automatically by Metric's constructor. 174 This is called automatically by Metric's constructor.
199 """ 175 """
200 # If someone is registering the same metric object twice, that's okay, but 176 # If someone is registering the same metric object twice, that's okay, but
201 # registering two different metric objects with the same metric name is not. 177 # registering two different metric objects with the same metric name is not.
202 for m in state.metrics.values(): 178 for m in state.metrics.values():
203 if metric == m: 179 if metric == m:
204 state.metrics[metric.name] = metric 180 state.metrics[metric.name] = metric
(...skipping 10 matching lines...) Expand all
215 191
216 192
217 def close(): 193 def close():
218 """Stops any background threads and waits for them to exit.""" 194 """Stops any background threads and waits for them to exit."""
219 if state.flush_thread is not None: 195 if state.flush_thread is not None:
220 state.flush_thread.stop() 196 state.flush_thread.stop()
221 197
222 198
223 def reset_for_unittest(disable=False): 199 def reset_for_unittest(disable=False):
224 state.reset_for_unittest() 200 state.reset_for_unittest()
225 if disable: 201 state.flush_enabled_fn = lambda: not disable
226 state.flush_enabled_fn = lambda: False
227 202
228 203
229 def register_global_metrics(metrics): 204 def register_global_metrics(metrics):
230 """Declare metrics as global. 205 """Declare metrics as global.
231 206
232 Outside Appengine this has no effect. 207 Outside Appengine this has no effect.
233 208
234 On Appengine, registering a metric as "global" simply means it will be reset 209 On Appengine, registering a metric as "global" simply means it will be reset
235 every time the metric is sent. This allows any instance to send such a metric 210 every time the metric is sent. This allows any instance to send such a metric
236 to a shared stream, e.g. by overriding target fields like task_num (instance 211 to a shared stream, e.g. by overriding target fields like task_num (instance
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after
315 'Last monitoring flush took %f seconds (longer than ' 290 'Last monitoring flush took %f seconds (longer than '
316 '--ts-mon-flush-interval-secs = %f seconds)', 291 '--ts-mon-flush-interval-secs = %f seconds)',
317 flush_duration, self.interval_secs) 292 flush_duration, self.interval_secs)
318 next_timeout = 0 293 next_timeout = 0
319 294
320 def stop(self): 295 def stop(self):
321 """Stops the background thread and performs a final flush.""" 296 """Stops the background thread and performs a final flush."""
322 297
323 self.stop_event.set() 298 self.stop_event.set()
324 self.join() 299 self.join()
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698