| Index: client/third_party/infra_libs/ts_mon/common/interface.py
|
| diff --git a/client/third_party/infra_libs/ts_mon/common/interface.py b/client/third_party/infra_libs/ts_mon/common/interface.py
|
| index 19168ea9ca242b750a14375ef311c8cbbcad8e52..ef0c5ec635db1cd42bdf4c6bcb284e6cb54f0c0e 100644
|
| --- a/client/third_party/infra_libs/ts_mon/common/interface.py
|
| +++ b/client/third_party/infra_libs/ts_mon/common/interface.py
|
| @@ -36,6 +36,7 @@ import logging
|
| import random
|
| import threading
|
| import time
|
| +import traceback
|
|
|
| from infra_libs.ts_mon.common import errors
|
| from infra_libs.ts_mon.common import metric_store
|
| @@ -115,53 +116,85 @@ def _generate_proto_new():
|
| """Generate MetricsPayload for global_monitor.send()."""
|
| proto = new_metrics_pb2.MetricsPayload()
|
|
|
| - collection_by_target = {}
|
| - data_set_by_name = {}
|
| + # Key: Target, value: MetricsCollection.
|
| + collections = {}
|
| +
|
| + # Key: (Target, metric name) tuple, value: MetricsDataSet.
|
| + data_sets = {}
|
|
|
| count = 0
|
| + error_count = 0
|
| for (target, metric, start_time, end_time, fields_values
|
| ) in state.store.get_all():
|
| for fields, value in fields_values.iteritems():
|
| if count >= METRICS_DATA_LENGTH_LIMIT:
|
| yield proto
|
| proto = new_metrics_pb2.MetricsPayload()
|
| - collection_by_target.clear()
|
| - data_set_by_name.clear()
|
| + collections.clear()
|
| + data_sets.clear()
|
| count = 0
|
|
|
| - if target not in collection_by_target:
|
| - collection_by_target[target] = proto.metrics_collection.add()
|
| - target._populate_target_pb_new(collection_by_target[target])
|
| - collection_pb = collection_by_target[target]
|
| -
|
| - if metric.name not in data_set_by_name:
|
| - data_set_by_name[metric.name] = collection_pb.metrics_data_set.add()
|
| - metric._populate_data_set(data_set_by_name[metric.name], fields)
|
| -
|
| - metric._populate_data(data_set_by_name[metric.name], start_time,
|
| - end_time, fields, value)
|
| -
|
| + if target not in collections:
|
| + collections[target] = proto.metrics_collection.add()
|
| + target._populate_target_pb_new(collections[target])
|
| + collection = collections[target]
|
| +
|
| + key = (target, metric.name)
|
| + new_data_set = None
|
| + try:
|
| + if key not in data_sets:
|
| + new_data_set = new_metrics_pb2.MetricsDataSet()
|
| + metric._populate_data_set(new_data_set, fields)
|
| +
|
| + data = new_metrics_pb2.MetricsData()
|
| + metric._populate_data(data, start_time, end_time, fields, value)
|
| + except errors.MonitoringError:
|
| + logging.exception('Failed to serialize a metric.')
|
| + error_count += 1
|
| + continue
|
| +
|
| + # All required data protos have been successfully populated. Now we can
|
| + # insert them in serialized proto and bookeeping data structures.
|
| + if new_data_set is not None:
|
| + collection.metrics_data_set.add().CopyFrom(new_data_set)
|
| + data_sets[key] = collection.metrics_data_set[-1]
|
| + data_sets[key].data.add().CopyFrom(data)
|
| count += 1
|
|
|
| if count > 0:
|
| yield proto
|
|
|
| + if error_count:
|
| + raise errors.MonitoringFailedToFlushAllMetricsError(error_count)
|
| +
|
|
|
| def _generate_proto():
|
| """Generate MetricsCollection for global_monitor.send()."""
|
| proto = metrics_pb2.MetricsCollection()
|
|
|
| + error_count = 0
|
| for target, metric, start_time, _, fields_values in state.store.get_all():
|
| for fields, value in fields_values.iteritems():
|
| if len(proto.data) >= METRICS_DATA_LENGTH_LIMIT:
|
| yield proto
|
| proto = metrics_pb2.MetricsCollection()
|
|
|
| - metric.serialize_to(proto, start_time, fields, value, target)
|
| + try:
|
| + metrics_pb = metrics_pb2.MetricsData()
|
| + metric.serialize_to(metrics_pb, start_time, fields, value, target)
|
| + except errors.MonitoringError:
|
| + error_count += 1
|
| + logging.exception('Failed to serialize a metric.')
|
| + continue
|
| +
|
| + proto.data.add().CopyFrom(metrics_pb)
|
|
|
| if len(proto.data) > 0:
|
| yield proto
|
|
|
| + if error_count:
|
| + raise errors.MonitoringFailedToFlushAllMetricsError(error_count)
|
| +
|
|
|
| def register(metric):
|
| """Adds the metric to the list of metrics sent by flush().
|
|
|