| OLD | NEW |
| 1 # Copyright 2015 The Chromium Authors. All rights reserved. | 1 # Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
| 4 | 4 |
| 5 """Classes representing the monitoring interface for tasks or devices. | 5 """Classes representing the monitoring interface for tasks or devices. |
| 6 | 6 |
| 7 Usage: | 7 Usage: |
| 8 import argparse | 8 import argparse |
| 9 from infra_libs import ts_mon | 9 from infra_libs import ts_mon |
| 10 | 10 |
| (...skipping 105 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 116 """Generate MetricsPayload for global_monitor.send().""" | 116 """Generate MetricsPayload for global_monitor.send().""" |
| 117 proto = new_metrics_pb2.MetricsPayload() | 117 proto = new_metrics_pb2.MetricsPayload() |
| 118 | 118 |
| 119 # Key: Target, value: MetricsCollection. | 119 # Key: Target, value: MetricsCollection. |
| 120 collections = {} | 120 collections = {} |
| 121 | 121 |
| 122 # Key: (Target, metric name) tuple, value: MetricsDataSet. | 122 # Key: (Target, metric name) tuple, value: MetricsDataSet. |
| 123 data_sets = {} | 123 data_sets = {} |
| 124 | 124 |
| 125 count = 0 | 125 count = 0 |
| 126 error_count = 0 |
| 126 for (target, metric, start_time, end_time, fields_values | 127 for (target, metric, start_time, end_time, fields_values |
| 127 ) in state.store.get_all(): | 128 ) in state.store.get_all(): |
| 128 for fields, value in fields_values.iteritems(): | 129 for fields, value in fields_values.iteritems(): |
| 129 if count >= METRICS_DATA_LENGTH_LIMIT: | 130 if count >= METRICS_DATA_LENGTH_LIMIT: |
| 130 yield proto | 131 yield proto |
| 131 proto = new_metrics_pb2.MetricsPayload() | 132 proto = new_metrics_pb2.MetricsPayload() |
| 132 collections.clear() | 133 collections.clear() |
| 133 data_sets.clear() | 134 data_sets.clear() |
| 134 count = 0 | 135 count = 0 |
| 135 | 136 |
| 136 if target not in collections: | 137 if target not in collections: |
| 137 collections[target] = proto.metrics_collection.add() | 138 collections[target] = proto.metrics_collection.add() |
| 138 target._populate_target_pb_new(collections[target]) | 139 target._populate_target_pb_new(collections[target]) |
| 139 collection = collections[target] | 140 collection = collections[target] |
| 140 | 141 |
| 141 key = (target, metric.name) | 142 key = (target, metric.name) |
| 142 new_data_set = None | 143 new_data_set = None |
| 143 if key not in data_sets: | 144 try: |
| 144 new_data_set = new_metrics_pb2.MetricsDataSet() | 145 if key not in data_sets: |
| 145 metric._populate_data_set(new_data_set) | 146 new_data_set = new_metrics_pb2.MetricsDataSet() |
| 147 metric._populate_data_set(new_data_set, fields) |
| 146 | 148 |
| 147 data = new_metrics_pb2.MetricsData() | 149 data = new_metrics_pb2.MetricsData() |
| 148 metric._populate_data(data, start_time, end_time, fields, value) | 150 metric._populate_data(data, start_time, end_time, fields, value) |
| 151 except errors.MonitoringError: |
| 152 logging.exception('Failed to serialize a metric.') |
| 153 error_count += 1 |
| 154 continue |
| 149 | 155 |
| 150 # All required data protos have been successfully populated. Now we can | 156 # All required data protos have been successfully populated. Now we can |
| 151 # insert them in serialized proto and bookeeping data structures. | 157 # insert them in serialized proto and bookeeping data structures. |
| 152 if new_data_set is not None: | 158 if new_data_set is not None: |
| 153 collection.metrics_data_set.add().CopyFrom(new_data_set) | 159 collection.metrics_data_set.add().CopyFrom(new_data_set) |
| 154 data_sets[key] = collection.metrics_data_set[-1] | 160 data_sets[key] = collection.metrics_data_set[-1] |
| 155 data_sets[key].data.add().CopyFrom(data) | 161 data_sets[key].data.add().CopyFrom(data) |
| 156 count += 1 | 162 count += 1 |
| 157 | 163 |
| 158 if count > 0: | 164 if count > 0: |
| 159 yield proto | 165 yield proto |
| 160 | 166 |
| 167 if error_count: |
| 168 raise errors.MonitoringFailedToFlushAllMetricsError(error_count) |
| 169 |
| 161 | 170 |
| 162 def _generate_proto(): | 171 def _generate_proto(): |
| 163 """Generate MetricsCollection for global_monitor.send().""" | 172 """Generate MetricsCollection for global_monitor.send().""" |
| 164 proto = metrics_pb2.MetricsCollection() | 173 proto = metrics_pb2.MetricsCollection() |
| 165 | 174 |
| 175 error_count = 0 |
| 166 for target, metric, start_time, _, fields_values in state.store.get_all(): | 176 for target, metric, start_time, _, fields_values in state.store.get_all(): |
| 167 for fields, value in fields_values.iteritems(): | 177 for fields, value in fields_values.iteritems(): |
| 168 if len(proto.data) >= METRICS_DATA_LENGTH_LIMIT: | 178 if len(proto.data) >= METRICS_DATA_LENGTH_LIMIT: |
| 169 yield proto | 179 yield proto |
| 170 proto = metrics_pb2.MetricsCollection() | 180 proto = metrics_pb2.MetricsCollection() |
| 171 | 181 |
| 172 metrics_pb = metrics_pb2.MetricsData() | 182 try: |
| 173 metric.serialize_to(metrics_pb, start_time, fields, value, target) | 183 metrics_pb = metrics_pb2.MetricsData() |
| 184 metric.serialize_to(metrics_pb, start_time, fields, value, target) |
| 185 except errors.MonitoringError: |
| 186 error_count += 1 |
| 187 logging.exception('Failed to serialize a metric.') |
| 188 continue |
| 174 | 189 |
| 175 proto.data.add().CopyFrom(metrics_pb) | 190 proto.data.add().CopyFrom(metrics_pb) |
| 176 | 191 |
| 177 if len(proto.data) > 0: | 192 if len(proto.data) > 0: |
| 178 yield proto | 193 yield proto |
| 179 | 194 |
| 195 if error_count: |
| 196 raise errors.MonitoringFailedToFlushAllMetricsError(error_count) |
| 197 |
| 180 | 198 |
| 181 def register(metric): | 199 def register(metric): |
| 182 """Adds the metric to the list of metrics sent by flush(). | 200 """Adds the metric to the list of metrics sent by flush(). |
| 183 | 201 |
| 184 This is called automatically by Metric's constructor. | 202 This is called automatically by Metric's constructor. |
| 185 """ | 203 """ |
| 186 # If someone is registering the same metric object twice, that's okay, but | 204 # If someone is registering the same metric object twice, that's okay, but |
| 187 # registering two different metric objects with the same metric name is not. | 205 # registering two different metric objects with the same metric name is not. |
| 188 for m in state.metrics.values(): | 206 for m in state.metrics.values(): |
| 189 if metric == m: | 207 if metric == m: |
| (...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 252 'Last monitoring flush took %f seconds (longer than ' | 270 'Last monitoring flush took %f seconds (longer than ' |
| 253 '--ts-mon-flush-interval-secs = %f seconds)', | 271 '--ts-mon-flush-interval-secs = %f seconds)', |
| 254 flush_duration, self.interval_secs) | 272 flush_duration, self.interval_secs) |
| 255 next_timeout = 0 | 273 next_timeout = 0 |
| 256 | 274 |
| 257 def stop(self): | 275 def stop(self): |
| 258 """Stops the background thread and performs a final flush.""" | 276 """Stops the background thread and performs a final flush.""" |
| 259 | 277 |
| 260 self.stop_event.set() | 278 self.stop_event.set() |
| 261 self.join() | 279 self.join() |
| OLD | NEW |