Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(89)

Side by Side Diff: client/third_party/infra_libs/ts_mon/common/interface.py

Issue 2573343002: Roll infra_libs and its dependencies to 066f135 (Closed)
Patch Set: Created 4 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2015 The Chromium Authors. All rights reserved. 1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 """Classes representing the monitoring interface for tasks or devices. 5 """Classes representing the monitoring interface for tasks or devices.
6 6
7 Usage: 7 Usage:
8 import argparse 8 import argparse
9 from infra_libs import ts_mon 9 from infra_libs import ts_mon
10 10
(...skipping 18 matching lines...) Expand all
29 c.set(0) 29 c.set(0)
30 for x in range(100): 30 for x in range(100):
31 c.increment() 31 c.increment()
32 """ 32 """
33 33
34 import datetime 34 import datetime
35 import logging 35 import logging
36 import random 36 import random
37 import threading 37 import threading
38 import time 38 import time
39 import traceback
39 40
40 from infra_libs.ts_mon.common import errors 41 from infra_libs.ts_mon.common import errors
41 from infra_libs.ts_mon.common import metric_store 42 from infra_libs.ts_mon.common import metric_store
42 from infra_libs.ts_mon.protos.current import metrics_pb2 43 from infra_libs.ts_mon.protos.current import metrics_pb2
43 from infra_libs.ts_mon.protos.new import metrics_pb2 as new_metrics_pb2 44 from infra_libs.ts_mon.protos.new import metrics_pb2 as new_metrics_pb2
44 45
45 # The maximum number of MetricsData messages to include in each HTTP request. 46 # The maximum number of MetricsData messages to include in each HTTP request.
46 # MetricsCollections larger than this will be split into multiple requests. 47 # MetricsCollections larger than this will be split into multiple requests.
47 METRICS_DATA_LENGTH_LIMIT = 1000 48 METRICS_DATA_LENGTH_LIMIT = 1000
48 49
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after
108 109
109 for proto in generator(): 110 for proto in generator():
110 state.global_monitor.send(proto) 111 state.global_monitor.send(proto)
111 state.last_flushed = datetime.datetime.utcnow() 112 state.last_flushed = datetime.datetime.utcnow()
112 113
113 114
114 def _generate_proto_new(): 115 def _generate_proto_new():
115 """Generate MetricsPayload for global_monitor.send().""" 116 """Generate MetricsPayload for global_monitor.send()."""
116 proto = new_metrics_pb2.MetricsPayload() 117 proto = new_metrics_pb2.MetricsPayload()
117 118
118 collection_by_target = {} 119 # Key: Target, value: MetricsCollection.
119 data_set_by_name = {} 120 collections = {}
121
122 # Key: (Target, metric name) tuple, value: MetricsDataSet.
123 data_sets = {}
120 124
121 count = 0 125 count = 0
126 error_count = 0
122 for (target, metric, start_time, end_time, fields_values 127 for (target, metric, start_time, end_time, fields_values
123 ) in state.store.get_all(): 128 ) in state.store.get_all():
124 for fields, value in fields_values.iteritems(): 129 for fields, value in fields_values.iteritems():
125 if count >= METRICS_DATA_LENGTH_LIMIT: 130 if count >= METRICS_DATA_LENGTH_LIMIT:
126 yield proto 131 yield proto
127 proto = new_metrics_pb2.MetricsPayload() 132 proto = new_metrics_pb2.MetricsPayload()
128 collection_by_target.clear() 133 collections.clear()
129 data_set_by_name.clear() 134 data_sets.clear()
130 count = 0 135 count = 0
131 136
132 if target not in collection_by_target: 137 if target not in collections:
133 collection_by_target[target] = proto.metrics_collection.add() 138 collections[target] = proto.metrics_collection.add()
134 target._populate_target_pb_new(collection_by_target[target]) 139 target._populate_target_pb_new(collections[target])
135 collection_pb = collection_by_target[target] 140 collection = collections[target]
136 141
137 if metric.name not in data_set_by_name: 142 key = (target, metric.name)
138 data_set_by_name[metric.name] = collection_pb.metrics_data_set.add() 143 new_data_set = None
139 metric._populate_data_set(data_set_by_name[metric.name], fields) 144 try:
145 if key not in data_sets:
146 new_data_set = new_metrics_pb2.MetricsDataSet()
147 metric._populate_data_set(new_data_set, fields)
140 148
141 metric._populate_data(data_set_by_name[metric.name], start_time, 149 data = new_metrics_pb2.MetricsData()
142 end_time, fields, value) 150 metric._populate_data(data, start_time, end_time, fields, value)
151 except errors.MonitoringError:
152 logging.exception('Failed to serialize a metric.')
153 error_count += 1
154 continue
143 155
156 # All required data protos have been successfully populated. Now we can
157 # insert them in serialized proto and bookeeping data structures.
158 if new_data_set is not None:
159 collection.metrics_data_set.add().CopyFrom(new_data_set)
160 data_sets[key] = collection.metrics_data_set[-1]
161 data_sets[key].data.add().CopyFrom(data)
144 count += 1 162 count += 1
145 163
146 if count > 0: 164 if count > 0:
147 yield proto 165 yield proto
148 166
167 if error_count:
168 raise errors.MonitoringFailedToFlushAllMetricsError(error_count)
169
149 170
150 def _generate_proto(): 171 def _generate_proto():
151 """Generate MetricsCollection for global_monitor.send().""" 172 """Generate MetricsCollection for global_monitor.send()."""
152 proto = metrics_pb2.MetricsCollection() 173 proto = metrics_pb2.MetricsCollection()
153 174
175 error_count = 0
154 for target, metric, start_time, _, fields_values in state.store.get_all(): 176 for target, metric, start_time, _, fields_values in state.store.get_all():
155 for fields, value in fields_values.iteritems(): 177 for fields, value in fields_values.iteritems():
156 if len(proto.data) >= METRICS_DATA_LENGTH_LIMIT: 178 if len(proto.data) >= METRICS_DATA_LENGTH_LIMIT:
157 yield proto 179 yield proto
158 proto = metrics_pb2.MetricsCollection() 180 proto = metrics_pb2.MetricsCollection()
159 181
160 metric.serialize_to(proto, start_time, fields, value, target) 182 try:
183 metrics_pb = metrics_pb2.MetricsData()
184 metric.serialize_to(metrics_pb, start_time, fields, value, target)
185 except errors.MonitoringError:
186 error_count += 1
187 logging.exception('Failed to serialize a metric.')
188 continue
189
190 proto.data.add().CopyFrom(metrics_pb)
161 191
162 if len(proto.data) > 0: 192 if len(proto.data) > 0:
163 yield proto 193 yield proto
164 194
195 if error_count:
196 raise errors.MonitoringFailedToFlushAllMetricsError(error_count)
197
165 198
166 def register(metric): 199 def register(metric):
167 """Adds the metric to the list of metrics sent by flush(). 200 """Adds the metric to the list of metrics sent by flush().
168 201
169 This is called automatically by Metric's constructor. 202 This is called automatically by Metric's constructor.
170 """ 203 """
171 # If someone is registering the same metric object twice, that's okay, but 204 # If someone is registering the same metric object twice, that's okay, but
172 # registering two different metric objects with the same metric name is not. 205 # registering two different metric objects with the same metric name is not.
173 for m in state.metrics.values(): 206 for m in state.metrics.values():
174 if metric == m: 207 if metric == m:
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after
238 'Last monitoring flush took %f seconds (longer than ' 271 'Last monitoring flush took %f seconds (longer than '
239 '--ts-mon-flush-interval-secs = %f seconds)', 272 '--ts-mon-flush-interval-secs = %f seconds)',
240 flush_duration, self.interval_secs) 273 flush_duration, self.interval_secs)
241 next_timeout = 0 274 next_timeout = 0
242 275
243 def stop(self): 276 def stop(self):
244 """Stops the background thread and performs a final flush.""" 277 """Stops the background thread and performs a final flush."""
245 278
246 self.stop_event.set() 279 self.stop_event.set()
247 self.join() 280 self.join()
OLDNEW
« no previous file with comments | « client/third_party/infra_libs/ts_mon/common/errors.py ('k') | client/third_party/infra_libs/ts_mon/common/metric_store.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698