| Index: client/third_party/infra_libs/ts_mon/common/interface.py
|
| diff --git a/client/third_party/infra_libs/ts_mon/common/interface.py b/client/third_party/infra_libs/ts_mon/common/interface.py
|
| index df3f50286c7abadf96eb0f6d77d6b2a94555aea4..19168ea9ca242b750a14375ef311c8cbbcad8e52 100644
|
| --- a/client/third_party/infra_libs/ts_mon/common/interface.py
|
| +++ b/client/third_party/infra_libs/ts_mon/common/interface.py
|
| @@ -39,7 +39,8 @@ import time
|
|
|
| from infra_libs.ts_mon.common import errors
|
| from infra_libs.ts_mon.common import metric_store
|
| -from infra_libs.ts_mon.protos import metrics_pb2
|
| +from infra_libs.ts_mon.protos.current import metrics_pb2
|
| +from infra_libs.ts_mon.protos.new import metrics_pb2 as new_metrics_pb2
|
|
|
| # The maximum number of MetricsData messages to include in each HTTP request.
|
| # MetricsCollections larger than this will be split into multiple requests.
|
| @@ -79,18 +80,20 @@ class State(object):
|
| self.last_flushed = datetime.datetime.utcfromtimestamp(0)
|
| # Metric name prefix
|
| self.metric_name_prefix = '/chrome/infra/'
|
| + # Use the new proto schema
|
| + self.use_new_proto = False
|
|
|
| def reset_for_unittest(self):
|
| self.metrics = {}
|
| self.last_flushed = datetime.datetime.utcfromtimestamp(0)
|
| self.store.reset_for_unittest()
|
| + self.use_new_proto = False
|
|
|
| state = State()
|
|
|
|
|
| def flush():
|
| """Send all metrics that are registered in the application."""
|
| -
|
| if not state.flush_enabled_fn():
|
| logging.debug('ts_mon: sending metrics is disabled.')
|
| return
|
| @@ -98,18 +101,66 @@ def flush():
|
| if not state.global_monitor or not state.target:
|
| raise errors.MonitoringNoConfiguredMonitorError(None)
|
|
|
| + if state.use_new_proto:
|
| + generator = _generate_proto_new
|
| + else:
|
| + generator = _generate_proto
|
| +
|
| + for proto in generator():
|
| + state.global_monitor.send(proto)
|
| + state.last_flushed = datetime.datetime.utcnow()
|
| +
|
| +
|
| +def _generate_proto_new():
|
| + """Generate MetricsPayload for global_monitor.send()."""
|
| + proto = new_metrics_pb2.MetricsPayload()
|
| +
|
| + collection_by_target = {}
|
| + data_set_by_name = {}
|
| +
|
| + count = 0
|
| + for (target, metric, start_time, end_time, fields_values
|
| + ) in state.store.get_all():
|
| + for fields, value in fields_values.iteritems():
|
| + if count >= METRICS_DATA_LENGTH_LIMIT:
|
| + yield proto
|
| + proto = new_metrics_pb2.MetricsPayload()
|
| + collection_by_target.clear()
|
| + data_set_by_name.clear()
|
| + count = 0
|
| +
|
| + if target not in collection_by_target:
|
| + collection_by_target[target] = proto.metrics_collection.add()
|
| + target._populate_target_pb_new(collection_by_target[target])
|
| + collection_pb = collection_by_target[target]
|
| +
|
| + if metric.name not in data_set_by_name:
|
| + data_set_by_name[metric.name] = collection_pb.metrics_data_set.add()
|
| + metric._populate_data_set(data_set_by_name[metric.name], fields)
|
| +
|
| + metric._populate_data(data_set_by_name[metric.name], start_time,
|
| + end_time, fields, value)
|
| +
|
| + count += 1
|
| +
|
| + if count > 0:
|
| + yield proto
|
| +
|
| +
|
| +def _generate_proto():
|
| + """Generate MetricsCollection for global_monitor.send()."""
|
| proto = metrics_pb2.MetricsCollection()
|
|
|
| - for target, metric, start_time, fields_values in state.store.get_all():
|
| + for target, metric, start_time, _, fields_values in state.store.get_all():
|
| for fields, value in fields_values.iteritems():
|
| if len(proto.data) >= METRICS_DATA_LENGTH_LIMIT:
|
| - state.global_monitor.send(proto)
|
| - del proto.data[:]
|
| + yield proto
|
| + proto = metrics_pb2.MetricsCollection()
|
|
|
| metric.serialize_to(proto, start_time, fields, value, target)
|
|
|
| - state.global_monitor.send(proto)
|
| - state.last_flushed = datetime.datetime.utcnow()
|
| + if len(proto.data) > 0:
|
| + yield proto
|
|
|
|
|
| def register(metric):
|
|
|