| OLD | NEW |
| 1 # Copyright 2015 The Chromium Authors. All rights reserved. | 1 # Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
| 4 | 4 |
| 5 """Classes representing the monitoring interface for tasks or devices. | 5 """Classes representing the monitoring interface for tasks or devices. |
| 6 | 6 |
| 7 Usage: | 7 Usage: |
| 8 import argparse | 8 import argparse |
| 9 from infra_libs import ts_mon | 9 from infra_libs import ts_mon |
| 10 | 10 |
| (...skipping 22 matching lines...) Expand all Loading... |
| 33 | 33 |
| 34 import datetime | 34 import datetime |
| 35 import logging | 35 import logging |
| 36 import random | 36 import random |
| 37 import threading | 37 import threading |
| 38 import time | 38 import time |
| 39 import traceback | 39 import traceback |
| 40 | 40 |
| 41 from infra_libs.ts_mon.common import errors | 41 from infra_libs.ts_mon.common import errors |
| 42 from infra_libs.ts_mon.common import metric_store | 42 from infra_libs.ts_mon.common import metric_store |
| 43 from infra_libs.ts_mon.protos.current import metrics_pb2 | 43 from infra_libs.ts_mon.protos import metrics_pb2 |
| 44 from infra_libs.ts_mon.protos.new import metrics_pb2 as new_metrics_pb2 | |
| 45 | 44 |
| 46 # The maximum number of MetricsData messages to include in each HTTP request. | 45 # The maximum number of MetricsData messages to include in each HTTP request. |
| 47 # MetricsCollections larger than this will be split into multiple requests. | 46 # MetricsCollections larger than this will be split into multiple requests. |
| 48 METRICS_DATA_LENGTH_LIMIT = 500 | 47 METRICS_DATA_LENGTH_LIMIT = 500 |
| 49 | 48 |
| 50 | 49 |
| 51 class State(object): | 50 class State(object): |
| 52 """Package-level state is stored here so that it is easily accessible. | 51 """Package-level state is stored here so that it is easily accessible. |
| 53 | 52 |
| 54 Configuration is kept in this one object at the global level so that all | 53 Configuration is kept in this one object at the global level so that all |
| (...skipping 19 matching lines...) Expand all Loading... |
| 74 # --ts-mon-flush != 'auto' or --ts-mon-flush-interval-secs == 0. | 73 # --ts-mon-flush != 'auto' or --ts-mon-flush-interval-secs == 0. |
| 75 self.flush_thread = None | 74 self.flush_thread = None |
| 76 # All metrics created by this application. | 75 # All metrics created by this application. |
| 77 self.metrics = {} | 76 self.metrics = {} |
| 78 # The MetricStore object that holds the actual metric values. | 77 # The MetricStore object that holds the actual metric values. |
| 79 self.store = store_ctor(self) | 78 self.store = store_ctor(self) |
| 80 # Cached time of the last flush. Useful mostly in AppEngine apps. | 79 # Cached time of the last flush. Useful mostly in AppEngine apps. |
| 81 self.last_flushed = datetime.datetime.utcfromtimestamp(0) | 80 self.last_flushed = datetime.datetime.utcfromtimestamp(0) |
| 82 # Metric name prefix | 81 # Metric name prefix |
| 83 self.metric_name_prefix = '/chrome/infra/' | 82 self.metric_name_prefix = '/chrome/infra/' |
| 84 # Use the new proto schema | |
| 85 self.use_new_proto = False | |
| 86 # Metrics registered with register_global_metrics. Keyed by metric name. | 83 # Metrics registered with register_global_metrics. Keyed by metric name. |
| 87 self.global_metrics = {} | 84 self.global_metrics = {} |
| 88 # Callbacks registered with register_global_metrics_callback. Keyed by the | 85 # Callbacks registered with register_global_metrics_callback. Keyed by the |
| 89 # arbitrary string provided by the user. Called before each flush. | 86 # arbitrary string provided by the user. Called before each flush. |
| 90 self.global_metrics_callbacks = {} | 87 self.global_metrics_callbacks = {} |
| 91 # Whether to call invoke_global_callbacks() on every flush(). Set to False | 88 # Whether to call invoke_global_callbacks() on every flush(). Set to False |
| 92 # on Appengine because it does its own thing. | 89 # on Appengine because it does its own thing. |
| 93 self.invoke_global_callbacks_on_flush = True | 90 self.invoke_global_callbacks_on_flush = True |
| 94 | 91 |
| 95 def reset_for_unittest(self): | 92 def reset_for_unittest(self): |
| 96 self.metrics = {} | 93 self.metrics = {} |
| 97 self.global_metrics = {} | 94 self.global_metrics = {} |
| 98 self.global_metrics_callbacks = {} | 95 self.global_metrics_callbacks = {} |
| 99 self.invoke_global_callbacks_on_flush = True | 96 self.invoke_global_callbacks_on_flush = True |
| 100 self.last_flushed = datetime.datetime.utcfromtimestamp(0) | 97 self.last_flushed = datetime.datetime.utcfromtimestamp(0) |
| 101 self.store.reset_for_unittest() | 98 self.store.reset_for_unittest() |
| 102 self.use_new_proto = False | |
| 103 | 99 |
| 104 state = State() | 100 state = State() |
| 105 | 101 |
| 106 | 102 |
| 107 def flush(): | 103 def flush(): |
| 108 """Send all metrics that are registered in the application.""" | 104 """Send all metrics that are registered in the application.""" |
| 109 if not state.flush_enabled_fn(): | 105 if not state.flush_enabled_fn(): |
| 110 logging.debug('ts_mon: sending metrics is disabled.') | 106 logging.debug('ts_mon: sending metrics is disabled.') |
| 111 return | 107 return |
| 112 | 108 |
| 113 if not state.global_monitor or not state.target: | 109 if not state.global_monitor or not state.target: |
| 114 raise errors.MonitoringNoConfiguredMonitorError(None) | 110 raise errors.MonitoringNoConfiguredMonitorError(None) |
| 115 | 111 |
| 116 if state.invoke_global_callbacks_on_flush: | 112 if state.invoke_global_callbacks_on_flush: |
| 117 invoke_global_callbacks() | 113 invoke_global_callbacks() |
| 118 | 114 |
| 119 if state.use_new_proto: | 115 rpcs = [] |
| 120 generator = _generate_proto_new | 116 for proto in _generate_proto(): |
| 121 else: | 117 rpcs.append(state.global_monitor.send(proto)) |
| 122 generator = _generate_proto | 118 for rpc in rpcs: |
| 123 | 119 if rpc is not None: |
| 124 for proto in generator(): | 120 state.global_monitor.wait(rpc) |
| 125 state.global_monitor.send(proto) | |
| 126 state.last_flushed = datetime.datetime.utcnow() | 121 state.last_flushed = datetime.datetime.utcnow() |
| 127 | 122 |
| 128 | 123 |
| 129 def _generate_proto_new(): | 124 def _generate_proto(): |
| 130 """Generate MetricsPayload for global_monitor.send().""" | 125 """Generate MetricsPayload for global_monitor.send().""" |
| 131 proto = new_metrics_pb2.MetricsPayload() | 126 proto = metrics_pb2.MetricsPayload() |
| 132 | 127 |
| 133 # Key: Target, value: MetricsCollection. | 128 # Key: Target, value: MetricsCollection. |
| 134 collections = {} | 129 collections = {} |
| 135 | 130 |
| 136 # Key: (Target, metric name) tuple, value: MetricsDataSet. | 131 # Key: (Target, metric name) tuple, value: MetricsDataSet. |
| 137 data_sets = {} | 132 data_sets = {} |
| 138 | 133 |
| 139 count = 0 | 134 count = 0 |
| 140 for (target, metric, start_time, end_time, fields_values | 135 for (target, metric, start_time, end_time, fields_values |
| 141 ) in state.store.get_all(): | 136 ) in state.store.get_all(): |
| 142 for fields, value in fields_values.iteritems(): | 137 for fields, value in fields_values.iteritems(): |
| 143 if count >= METRICS_DATA_LENGTH_LIMIT: | 138 if count >= METRICS_DATA_LENGTH_LIMIT: |
| 144 yield proto | 139 yield proto |
| 145 proto = new_metrics_pb2.MetricsPayload() | 140 proto = metrics_pb2.MetricsPayload() |
| 146 collections.clear() | 141 collections.clear() |
| 147 data_sets.clear() | 142 data_sets.clear() |
| 148 count = 0 | 143 count = 0 |
| 149 | 144 |
| 150 if target not in collections: | 145 if target not in collections: |
| 151 collections[target] = proto.metrics_collection.add() | 146 collections[target] = proto.metrics_collection.add() |
| 152 target._populate_target_pb_new(collections[target]) | 147 target.populate_target_pb(collections[target]) |
| 153 collection = collections[target] | 148 collection = collections[target] |
| 154 | 149 |
| 155 key = (target, metric.name) | 150 key = (target, metric.name) |
| 156 new_data_set = None | 151 new_data_set = None |
| 157 if key not in data_sets: | 152 if key not in data_sets: |
| 158 new_data_set = new_metrics_pb2.MetricsDataSet() | 153 new_data_set = metrics_pb2.MetricsDataSet() |
| 159 metric._populate_data_set(new_data_set) | 154 metric.populate_data_set(new_data_set) |
| 160 | 155 |
| 161 data = new_metrics_pb2.MetricsData() | 156 data = metrics_pb2.MetricsData() |
| 162 metric._populate_data(data, start_time, end_time, fields, value) | 157 metric.populate_data(data, start_time, end_time, fields, value) |
| 163 | 158 |
| 164 # All required data protos have been successfully populated. Now we can | 159 # All required data protos have been successfully populated. Now we can |
| 165 # insert them in serialized proto and bookeeping data structures. | 160 # insert them in serialized proto and bookeeping data structures. |
| 166 if new_data_set is not None: | 161 if new_data_set is not None: |
| 167 collection.metrics_data_set.add().CopyFrom(new_data_set) | 162 collection.metrics_data_set.add().CopyFrom(new_data_set) |
| 168 data_sets[key] = collection.metrics_data_set[-1] | 163 data_sets[key] = collection.metrics_data_set[-1] |
| 169 data_sets[key].data.add().CopyFrom(data) | 164 data_sets[key].data.add().CopyFrom(data) |
| 170 count += 1 | 165 count += 1 |
| 171 | 166 |
| 172 if count > 0: | 167 if count > 0: |
| 173 yield proto | 168 yield proto |
| 174 | 169 |
| 175 | 170 |
| 176 def _generate_proto(): | |
| 177 """Generate MetricsCollection for global_monitor.send().""" | |
| 178 proto = metrics_pb2.MetricsCollection() | |
| 179 | |
| 180 for target, metric, start_time, _, fields_values in state.store.get_all(): | |
| 181 for fields, value in fields_values.iteritems(): | |
| 182 if len(proto.data) >= METRICS_DATA_LENGTH_LIMIT: | |
| 183 yield proto | |
| 184 proto = metrics_pb2.MetricsCollection() | |
| 185 | |
| 186 metrics_pb = metrics_pb2.MetricsData() | |
| 187 metric.serialize_to(metrics_pb, start_time, fields, value, target) | |
| 188 | |
| 189 proto.data.add().CopyFrom(metrics_pb) | |
| 190 | |
| 191 if len(proto.data) > 0: | |
| 192 yield proto | |
| 193 | |
| 194 | |
| 195 def register(metric): | 171 def register(metric): |
| 196 """Adds the metric to the list of metrics sent by flush(). | 172 """Adds the metric to the list of metrics sent by flush(). |
| 197 | 173 |
| 198 This is called automatically by Metric's constructor. | 174 This is called automatically by Metric's constructor. |
| 199 """ | 175 """ |
| 200 # If someone is registering the same metric object twice, that's okay, but | 176 # If someone is registering the same metric object twice, that's okay, but |
| 201 # registering two different metric objects with the same metric name is not. | 177 # registering two different metric objects with the same metric name is not. |
| 202 for m in state.metrics.values(): | 178 for m in state.metrics.values(): |
| 203 if metric == m: | 179 if metric == m: |
| 204 state.metrics[metric.name] = metric | 180 state.metrics[metric.name] = metric |
| (...skipping 10 matching lines...) Expand all Loading... |
| 215 | 191 |
| 216 | 192 |
| 217 def close(): | 193 def close(): |
| 218 """Stops any background threads and waits for them to exit.""" | 194 """Stops any background threads and waits for them to exit.""" |
| 219 if state.flush_thread is not None: | 195 if state.flush_thread is not None: |
| 220 state.flush_thread.stop() | 196 state.flush_thread.stop() |
| 221 | 197 |
| 222 | 198 |
| 223 def reset_for_unittest(disable=False): | 199 def reset_for_unittest(disable=False): |
| 224 state.reset_for_unittest() | 200 state.reset_for_unittest() |
| 225 if disable: | 201 state.flush_enabled_fn = lambda: not disable |
| 226 state.flush_enabled_fn = lambda: False | |
| 227 | 202 |
| 228 | 203 |
| 229 def register_global_metrics(metrics): | 204 def register_global_metrics(metrics): |
| 230 """Declare metrics as global. | 205 """Declare metrics as global. |
| 231 | 206 |
| 232 Outside Appengine this has no effect. | 207 Outside Appengine this has no effect. |
| 233 | 208 |
| 234 On Appengine, registering a metric as "global" simply means it will be reset | 209 On Appengine, registering a metric as "global" simply means it will be reset |
| 235 every time the metric is sent. This allows any instance to send such a metric | 210 every time the metric is sent. This allows any instance to send such a metric |
| 236 to a shared stream, e.g. by overriding target fields like task_num (instance | 211 to a shared stream, e.g. by overriding target fields like task_num (instance |
| (...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 315 'Last monitoring flush took %f seconds (longer than ' | 290 'Last monitoring flush took %f seconds (longer than ' |
| 316 '--ts-mon-flush-interval-secs = %f seconds)', | 291 '--ts-mon-flush-interval-secs = %f seconds)', |
| 317 flush_duration, self.interval_secs) | 292 flush_duration, self.interval_secs) |
| 318 next_timeout = 0 | 293 next_timeout = 0 |
| 319 | 294 |
| 320 def stop(self): | 295 def stop(self): |
| 321 """Stops the background thread and performs a final flush.""" | 296 """Stops the background thread and performs a final flush.""" |
| 322 | 297 |
| 323 self.stop_event.set() | 298 self.stop_event.set() |
| 324 self.join() | 299 self.join() |
| OLD | NEW |