| Index: client/third_party/infra_libs/ts_mon/common/interface.py
|
| diff --git a/client/third_party/infra_libs/ts_mon/common/interface.py b/client/third_party/infra_libs/ts_mon/common/interface.py
|
| index b012812d92d0074fd867a66ae20884c3b8913495..64a2e4b05334c8b95be167b7e8d8cae522e83e31 100644
|
| --- a/client/third_party/infra_libs/ts_mon/common/interface.py
|
| +++ b/client/third_party/infra_libs/ts_mon/common/interface.py
|
| @@ -40,8 +40,7 @@ import traceback
|
|
|
| from infra_libs.ts_mon.common import errors
|
| from infra_libs.ts_mon.common import metric_store
|
| -from infra_libs.ts_mon.protos.current import metrics_pb2
|
| -from infra_libs.ts_mon.protos.new import metrics_pb2 as new_metrics_pb2
|
| +from infra_libs.ts_mon.protos import metrics_pb2
|
|
|
| # The maximum number of MetricsData messages to include in each HTTP request.
|
| # MetricsCollections larger than this will be split into multiple requests.
|
| @@ -81,8 +80,6 @@ class State(object):
|
| self.last_flushed = datetime.datetime.utcfromtimestamp(0)
|
| # Metric name prefix
|
| self.metric_name_prefix = '/chrome/infra/'
|
| - # Use the new proto schema
|
| - self.use_new_proto = False
|
| # Metrics registered with register_global_metrics. Keyed by metric name.
|
| self.global_metrics = {}
|
| # Callbacks registered with register_global_metrics_callback. Keyed by the
|
| @@ -99,7 +96,6 @@ class State(object):
|
| self.invoke_global_callbacks_on_flush = True
|
| self.last_flushed = datetime.datetime.utcfromtimestamp(0)
|
| self.store.reset_for_unittest()
|
| - self.use_new_proto = False
|
|
|
| state = State()
|
|
|
| @@ -116,19 +112,18 @@ def flush():
|
| if state.invoke_global_callbacks_on_flush:
|
| invoke_global_callbacks()
|
|
|
| - if state.use_new_proto:
|
| - generator = _generate_proto_new
|
| - else:
|
| - generator = _generate_proto
|
| -
|
| - for proto in generator():
|
| - state.global_monitor.send(proto)
|
| + rpcs = []
|
| + for proto in _generate_proto():
|
| + rpcs.append(state.global_monitor.send(proto))
|
| + for rpc in rpcs:
|
| + if rpc is not None:
|
| + state.global_monitor.wait(rpc)
|
| state.last_flushed = datetime.datetime.utcnow()
|
|
|
|
|
| -def _generate_proto_new():
|
| +def _generate_proto():
|
| """Generate MetricsPayload for global_monitor.send()."""
|
| - proto = new_metrics_pb2.MetricsPayload()
|
| + proto = metrics_pb2.MetricsPayload()
|
|
|
| # Key: Target, value: MetricsCollection.
|
| collections = {}
|
| @@ -142,24 +137,24 @@ def _generate_proto_new():
|
| for fields, value in fields_values.iteritems():
|
| if count >= METRICS_DATA_LENGTH_LIMIT:
|
| yield proto
|
| - proto = new_metrics_pb2.MetricsPayload()
|
| + proto = metrics_pb2.MetricsPayload()
|
| collections.clear()
|
| data_sets.clear()
|
| count = 0
|
|
|
| if target not in collections:
|
| collections[target] = proto.metrics_collection.add()
|
| - target._populate_target_pb_new(collections[target])
|
| + target.populate_target_pb(collections[target])
|
| collection = collections[target]
|
|
|
| key = (target, metric.name)
|
| new_data_set = None
|
| if key not in data_sets:
|
| - new_data_set = new_metrics_pb2.MetricsDataSet()
|
| - metric._populate_data_set(new_data_set)
|
| + new_data_set = metrics_pb2.MetricsDataSet()
|
| + metric.populate_data_set(new_data_set)
|
|
|
| - data = new_metrics_pb2.MetricsData()
|
| - metric._populate_data(data, start_time, end_time, fields, value)
|
| + data = metrics_pb2.MetricsData()
|
| + metric.populate_data(data, start_time, end_time, fields, value)
|
|
|
| # All required data protos have been successfully populated. Now we can
|
| # insert them in serialized proto and bookeeping data structures.
|
| @@ -173,25 +168,6 @@ def _generate_proto_new():
|
| yield proto
|
|
|
|
|
| -def _generate_proto():
|
| - """Generate MetricsCollection for global_monitor.send()."""
|
| - proto = metrics_pb2.MetricsCollection()
|
| -
|
| - for target, metric, start_time, _, fields_values in state.store.get_all():
|
| - for fields, value in fields_values.iteritems():
|
| - if len(proto.data) >= METRICS_DATA_LENGTH_LIMIT:
|
| - yield proto
|
| - proto = metrics_pb2.MetricsCollection()
|
| -
|
| - metrics_pb = metrics_pb2.MetricsData()
|
| - metric.serialize_to(metrics_pb, start_time, fields, value, target)
|
| -
|
| - proto.data.add().CopyFrom(metrics_pb)
|
| -
|
| - if len(proto.data) > 0:
|
| - yield proto
|
| -
|
| -
|
| def register(metric):
|
| """Adds the metric to the list of metrics sent by flush().
|
|
|
| @@ -222,8 +198,7 @@ def close():
|
|
|
| def reset_for_unittest(disable=False):
|
| state.reset_for_unittest()
|
| - if disable:
|
| - state.flush_enabled_fn = lambda: False
|
| + state.flush_enabled_fn = lambda: not disable
|
|
|
|
|
| def register_global_metrics(metrics):
|
|
|