| OLD | NEW |
| 1 # Copyright 2015 The Chromium Authors. All rights reserved. | 1 # Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
| 4 | 4 |
| 5 """Classes representing the monitoring interface for tasks or devices. | 5 """Classes representing the monitoring interface for tasks or devices. |
| 6 | 6 |
| 7 Usage: | 7 Usage: |
| 8 import argparse | 8 import argparse |
| 9 from infra_libs import ts_mon | 9 from infra_libs import ts_mon |
| 10 | 10 |
| (...skipping 21 matching lines...) Expand all Loading... |
| 32 """ | 32 """ |
| 33 | 33 |
| 34 import datetime | 34 import datetime |
| 35 import logging | 35 import logging |
| 36 import random | 36 import random |
| 37 import threading | 37 import threading |
| 38 import time | 38 import time |
| 39 | 39 |
| 40 from infra_libs.ts_mon.common import errors | 40 from infra_libs.ts_mon.common import errors |
| 41 from infra_libs.ts_mon.common import metric_store | 41 from infra_libs.ts_mon.common import metric_store |
| 42 from infra_libs.ts_mon.protos import metrics_pb2 | 42 from infra_libs.ts_mon.protos.current import metrics_pb2 |
| 43 from infra_libs.ts_mon.protos.new import metrics_pb2 as new_metrics_pb2 |
| 43 | 44 |
| 44 # The maximum number of MetricsData messages to include in each HTTP request. | 45 # The maximum number of MetricsData messages to include in each HTTP request. |
| 45 # MetricsCollections larger than this will be split into multiple requests. | 46 # MetricsCollections larger than this will be split into multiple requests. |
| 46 METRICS_DATA_LENGTH_LIMIT = 1000 | 47 METRICS_DATA_LENGTH_LIMIT = 1000 |
| 47 | 48 |
| 48 | 49 |
| 49 class State(object): | 50 class State(object): |
| 50 """Package-level state is stored here so that it is easily accessible. | 51 """Package-level state is stored here so that it is easily accessible. |
| 51 | 52 |
| 52 Configuration is kept in this one object at the global level so that all | 53 Configuration is kept in this one object at the global level so that all |
| (...skipping 19 matching lines...) Expand all Loading... |
| 72 # --ts-mon-flush != 'auto' or --ts-mon-flush-interval-secs == 0. | 73 # --ts-mon-flush != 'auto' or --ts-mon-flush-interval-secs == 0. |
| 73 self.flush_thread = None | 74 self.flush_thread = None |
| 74 # All metrics created by this application. | 75 # All metrics created by this application. |
| 75 self.metrics = {} | 76 self.metrics = {} |
| 76 # The MetricStore object that holds the actual metric values. | 77 # The MetricStore object that holds the actual metric values. |
| 77 self.store = store_ctor(self) | 78 self.store = store_ctor(self) |
| 78 # Cached time of the last flush. Useful mostly in AppEngine apps. | 79 # Cached time of the last flush. Useful mostly in AppEngine apps. |
| 79 self.last_flushed = datetime.datetime.utcfromtimestamp(0) | 80 self.last_flushed = datetime.datetime.utcfromtimestamp(0) |
| 80 # Metric name prefix | 81 # Metric name prefix |
| 81 self.metric_name_prefix = '/chrome/infra/' | 82 self.metric_name_prefix = '/chrome/infra/' |
| 83 # Use the new proto schema |
| 84 self.use_new_proto = False |
| 82 | 85 |
| 83 def reset_for_unittest(self): | 86 def reset_for_unittest(self): |
| 84 self.metrics = {} | 87 self.metrics = {} |
| 85 self.last_flushed = datetime.datetime.utcfromtimestamp(0) | 88 self.last_flushed = datetime.datetime.utcfromtimestamp(0) |
| 86 self.store.reset_for_unittest() | 89 self.store.reset_for_unittest() |
| 90 self.use_new_proto = False |
| 87 | 91 |
| 88 state = State() | 92 state = State() |
| 89 | 93 |
| 90 | 94 |
def flush():
  """Push every registered metric to the configured global monitor.

  Does nothing beyond a debug log when state.flush_enabled_fn() returns a
  false value. Otherwise every message produced by the proto generator
  selected through state.use_new_proto is passed to
  state.global_monitor.send(), and state.last_flushed is then set to the
  current UTC time.

  Raises:
    errors.MonitoringNoConfiguredMonitorError: if state.global_monitor or
        state.target is not set.
  """
  if not state.flush_enabled_fn():
    logging.debug('ts_mon: sending metrics is disabled.')
    return

  if not (state.global_monitor and state.target):
    raise errors.MonitoringNoConfiguredMonitorError(None)

  # Both generators yield ready-to-send messages; only the schema differs.
  make_payloads = (
      _generate_proto_new if state.use_new_proto else _generate_proto)

  for payload in make_payloads():
    state.global_monitor.send(payload)

  state.last_flushed = datetime.datetime.utcnow()
| 112 |
| 113 |
def _generate_proto_new():
  """Yield MetricsPayload messages for global_monitor.send().

  Walks everything returned by state.store.get_all() and groups it into
  payloads: one metrics_collection entry per target, and inside each
  collection one metrics_data_set entry per metric name. A payload is
  yielded as soon as METRICS_DATA_LENGTH_LIMIT (fields, value) pairs have
  been added to it, and a new payload is started; a trailing payload that
  holds at least one value is yielded at the end.

  Yields:
    new_metrics_pb2.MetricsPayload messages.
  """
  payload = new_metrics_pb2.MetricsPayload()

  # Key: target, value: the collection added to the current payload for it.
  collection_by_target = {}
  # Key: (target, metric name), value: the data set added to that target's
  # collection. The target has to be part of the key: keyed on the metric
  # name alone, a metric reported for several targets would have all of its
  # data appended to the data set living in the first target's collection.
  data_set_by_key = {}

  count = 0
  for (target, metric, start_time, end_time, fields_values
       ) in state.store.get_all():
    for fields, value in fields_values.iteritems():
      if count >= METRICS_DATA_LENGTH_LIMIT:
        yield payload
        # The cached sub-messages belong to the payload just yielded, so
        # they must not be reused for the next one.
        payload = new_metrics_pb2.MetricsPayload()
        collection_by_target.clear()
        data_set_by_key.clear()
        count = 0

      if target not in collection_by_target:
        collection_by_target[target] = payload.metrics_collection.add()
        target._populate_target_pb_new(collection_by_target[target])
      collection_pb = collection_by_target[target]

      data_set_key = (target, metric.name)
      if data_set_key not in data_set_by_key:
        data_set_by_key[data_set_key] = collection_pb.metrics_data_set.add()
        metric._populate_data_set(data_set_by_key[data_set_key], fields)

      metric._populate_data(data_set_by_key[data_set_key], start_time,
                            end_time, fields, value)

      count += 1

  if count > 0:
    yield payload
| 148 |
| 149 |
def _generate_proto():
  """Yield MetricsCollection messages for global_monitor.send().

  Each (fields, value) pair returned by state.store.get_all() is serialized
  into a MetricsCollection. Once a collection holds
  METRICS_DATA_LENGTH_LIMIT data entries it is yielded and a fresh one is
  started; a trailing non-empty collection is yielded at the end.

  Yields:
    metrics_pb2.MetricsCollection messages.
  """
  collection = metrics_pb2.MetricsCollection()

  # The fourth element of each tuple is ignored by this schema.
  for target, metric, start_time, _, fields_values in state.store.get_all():
    for fields, value in fields_values.iteritems():
      if len(collection.data) >= METRICS_DATA_LENGTH_LIMIT:
        # Full: hand this one off and continue in a new message.
        yield collection
        collection = metrics_pb2.MetricsCollection()

      metric.serialize_to(collection, start_time, fields, value, target)

  if len(collection.data) > 0:
    yield collection
| 113 | 164 |
| 114 | 165 |
| 115 def register(metric): | 166 def register(metric): |
| 116 """Adds the metric to the list of metrics sent by flush(). | 167 """Adds the metric to the list of metrics sent by flush(). |
| 117 | 168 |
| 118 This is called automatically by Metric's constructor. | 169 This is called automatically by Metric's constructor. |
| 119 """ | 170 """ |
| 120 # If someone is registering the same metric object twice, that's okay, but | 171 # If someone is registering the same metric object twice, that's okay, but |
| 121 # registering two different metric objects with the same metric name is not. | 172 # registering two different metric objects with the same metric name is not. |
| 122 for m in state.metrics.values(): | 173 for m in state.metrics.values(): |
| (...skipping 64 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 187 'Last monitoring flush took %f seconds (longer than ' | 238 'Last monitoring flush took %f seconds (longer than ' |
| 188 '--ts-mon-flush-interval-secs = %f seconds)', | 239 '--ts-mon-flush-interval-secs = %f seconds)', |
| 189 flush_duration, self.interval_secs) | 240 flush_duration, self.interval_secs) |
| 190 next_timeout = 0 | 241 next_timeout = 0 |
| 191 | 242 |
| 192 def stop(self): | 243 def stop(self): |
| 193 """Stops the background thread and performs a final flush.""" | 244 """Stops the background thread and performs a final flush.""" |
| 194 | 245 |
| 195 self.stop_event.set() | 246 self.stop_event.set() |
| 196 self.join() | 247 self.join() |
| OLD | NEW |