| OLD | NEW |
| 1 # Copyright 2015 The Chromium Authors. All rights reserved. | 1 # Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
| 4 | 4 |
| 5 """Classes representing the monitoring interface for tasks or devices. | 5 """Classes representing the monitoring interface for tasks or devices. |
| 6 | 6 |
| 7 Usage: | 7 Usage: |
| 8 import argparse | 8 import argparse |
| 9 from infra_libs import ts_mon | 9 from infra_libs import ts_mon |
| 10 | 10 |
| (...skipping 18 matching lines...) Expand all Loading... |
| 29 c.set(0) | 29 c.set(0) |
| 30 for x in range(100): | 30 for x in range(100): |
| 31 c.increment() | 31 c.increment() |
| 32 """ | 32 """ |
| 33 | 33 |
| 34 import datetime | 34 import datetime |
| 35 import logging | 35 import logging |
| 36 import random | 36 import random |
| 37 import threading | 37 import threading |
| 38 import time | 38 import time |
| 39 import traceback |
| 39 | 40 |
| 40 from infra_libs.ts_mon.common import errors | 41 from infra_libs.ts_mon.common import errors |
| 41 from infra_libs.ts_mon.common import metric_store | 42 from infra_libs.ts_mon.common import metric_store |
| 42 from infra_libs.ts_mon.protos.current import metrics_pb2 | 43 from infra_libs.ts_mon.protos.current import metrics_pb2 |
| 43 from infra_libs.ts_mon.protos.new import metrics_pb2 as new_metrics_pb2 | 44 from infra_libs.ts_mon.protos.new import metrics_pb2 as new_metrics_pb2 |
| 44 | 45 |
| 45 # The maximum number of MetricsData messages to include in each HTTP request. | 46 # The maximum number of MetricsData messages to include in each HTTP request. |
| 46 # MetricsCollections larger than this will be split into multiple requests. | 47 # MetricsCollections larger than this will be split into multiple requests. |
| 47 METRICS_DATA_LENGTH_LIMIT = 1000 | 48 METRICS_DATA_LENGTH_LIMIT = 1000 |
| 48 | 49 |
| (...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 108 | 109 |
| 109 for proto in generator(): | 110 for proto in generator(): |
| 110 state.global_monitor.send(proto) | 111 state.global_monitor.send(proto) |
| 111 state.last_flushed = datetime.datetime.utcnow() | 112 state.last_flushed = datetime.datetime.utcnow() |
| 112 | 113 |
| 113 | 114 |
| 114 def _generate_proto_new(): | 115 def _generate_proto_new(): |
| 115 """Generate MetricsPayload for global_monitor.send().""" | 116 """Generate MetricsPayload for global_monitor.send().""" |
| 116 proto = new_metrics_pb2.MetricsPayload() | 117 proto = new_metrics_pb2.MetricsPayload() |
| 117 | 118 |
| 118 collection_by_target = {} | 119 # Key: Target, value: MetricsCollection. |
| 119 data_set_by_name = {} | 120 collections = {} |
| 121 |
| 122 # Key: (Target, metric name) tuple, value: MetricsDataSet. |
| 123 data_sets = {} |
| 120 | 124 |
| 121 count = 0 | 125 count = 0 |
| 126 error_count = 0 |
| 122 for (target, metric, start_time, end_time, fields_values | 127 for (target, metric, start_time, end_time, fields_values |
| 123 ) in state.store.get_all(): | 128 ) in state.store.get_all(): |
| 124 for fields, value in fields_values.iteritems(): | 129 for fields, value in fields_values.iteritems(): |
| 125 if count >= METRICS_DATA_LENGTH_LIMIT: | 130 if count >= METRICS_DATA_LENGTH_LIMIT: |
| 126 yield proto | 131 yield proto |
| 127 proto = new_metrics_pb2.MetricsPayload() | 132 proto = new_metrics_pb2.MetricsPayload() |
| 128 collection_by_target.clear() | 133 collections.clear() |
| 129 data_set_by_name.clear() | 134 data_sets.clear() |
| 130 count = 0 | 135 count = 0 |
| 131 | 136 |
| 132 if target not in collection_by_target: | 137 if target not in collections: |
| 133 collection_by_target[target] = proto.metrics_collection.add() | 138 collections[target] = proto.metrics_collection.add() |
| 134 target._populate_target_pb_new(collection_by_target[target]) | 139 target._populate_target_pb_new(collections[target]) |
| 135 collection_pb = collection_by_target[target] | 140 collection = collections[target] |
| 136 | 141 |
| 137 if metric.name not in data_set_by_name: | 142 key = (target, metric.name) |
| 138 data_set_by_name[metric.name] = collection_pb.metrics_data_set.add() | 143 new_data_set = None |
| 139 metric._populate_data_set(data_set_by_name[metric.name], fields) | 144 try: |
| 145 if key not in data_sets: |
| 146 new_data_set = new_metrics_pb2.MetricsDataSet() |
| 147 metric._populate_data_set(new_data_set, fields) |
| 140 | 148 |
| 141 metric._populate_data(data_set_by_name[metric.name], start_time, | 149 data = new_metrics_pb2.MetricsData() |
| 142 end_time, fields, value) | 150 metric._populate_data(data, start_time, end_time, fields, value) |
| 151 except errors.MonitoringError: |
| 152 logging.exception('Failed to serialize a metric.') |
| 153 error_count += 1 |
| 154 continue |
| 143 | 155 |
| 156 # All required data protos have been successfully populated. Now we can |
| 157 # insert them into the serialized proto and bookkeeping data structures. |
| 158 if new_data_set is not None: |
| 159 collection.metrics_data_set.add().CopyFrom(new_data_set) |
| 160 data_sets[key] = collection.metrics_data_set[-1] |
| 161 data_sets[key].data.add().CopyFrom(data) |
| 144 count += 1 | 162 count += 1 |
| 145 | 163 |
| 146 if count > 0: | 164 if count > 0: |
| 147 yield proto | 165 yield proto |
| 148 | 166 |
| 167 if error_count: |
| 168 raise errors.MonitoringFailedToFlushAllMetricsError(error_count) |
| 169 |
| 149 | 170 |
| 150 def _generate_proto(): | 171 def _generate_proto(): |
| 151 """Generate MetricsCollection for global_monitor.send().""" | 172 """Generate MetricsCollection for global_monitor.send().""" |
| 152 proto = metrics_pb2.MetricsCollection() | 173 proto = metrics_pb2.MetricsCollection() |
| 153 | 174 |
| 175 error_count = 0 |
| 154 for target, metric, start_time, _, fields_values in state.store.get_all(): | 176 for target, metric, start_time, _, fields_values in state.store.get_all(): |
| 155 for fields, value in fields_values.iteritems(): | 177 for fields, value in fields_values.iteritems(): |
| 156 if len(proto.data) >= METRICS_DATA_LENGTH_LIMIT: | 178 if len(proto.data) >= METRICS_DATA_LENGTH_LIMIT: |
| 157 yield proto | 179 yield proto |
| 158 proto = metrics_pb2.MetricsCollection() | 180 proto = metrics_pb2.MetricsCollection() |
| 159 | 181 |
| 160 metric.serialize_to(proto, start_time, fields, value, target) | 182 try: |
| 183 metrics_pb = metrics_pb2.MetricsData() |
| 184 metric.serialize_to(metrics_pb, start_time, fields, value, target) |
| 185 except errors.MonitoringError: |
| 186 error_count += 1 |
| 187 logging.exception('Failed to serialize a metric.') |
| 188 continue |
| 189 |
| 190 proto.data.add().CopyFrom(metrics_pb) |
| 161 | 191 |
| 162 if len(proto.data) > 0: | 192 if len(proto.data) > 0: |
| 163 yield proto | 193 yield proto |
| 164 | 194 |
| 195 if error_count: |
| 196 raise errors.MonitoringFailedToFlushAllMetricsError(error_count) |
| 197 |
| 165 | 198 |
| 166 def register(metric): | 199 def register(metric): |
| 167 """Adds the metric to the list of metrics sent by flush(). | 200 """Adds the metric to the list of metrics sent by flush(). |
| 168 | 201 |
| 169 This is called automatically by Metric's constructor. | 202 This is called automatically by Metric's constructor. |
| 170 """ | 203 """ |
| 171 # If someone is registering the same metric object twice, that's okay, but | 204 # If someone is registering the same metric object twice, that's okay, but |
| 172 # registering two different metric objects with the same metric name is not. | 205 # registering two different metric objects with the same metric name is not. |
| 173 for m in state.metrics.values(): | 206 for m in state.metrics.values(): |
| 174 if metric == m: | 207 if metric == m: |
| (...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 238 'Last monitoring flush took %f seconds (longer than ' | 271 'Last monitoring flush took %f seconds (longer than ' |
| 239 '--ts-mon-flush-interval-secs = %f seconds)', | 272 '--ts-mon-flush-interval-secs = %f seconds)', |
| 240 flush_duration, self.interval_secs) | 273 flush_duration, self.interval_secs) |
| 241 next_timeout = 0 | 274 next_timeout = 0 |
| 242 | 275 |
| 243 def stop(self): | 276 def stop(self): |
| 244 """Stops the background thread and performs a final flush.""" | 277 """Stops the background thread and performs a final flush.""" |
| 245 | 278 |
| 246 self.stop_event.set() | 279 self.stop_event.set() |
| 247 self.join() | 280 self.join() |
| OLD | NEW |