Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(247)

Side by Side Diff: appengine/third_party/gae_ts_mon/config.py

Issue 2991803002: Update infra_libs to 1.1.15 / 0b44aba87c1c6538439df6d24a409870810747ab (Closed)
Patch Set: fix Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2015 The Chromium Authors. All rights reserved. 1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 import copy 5 import copy
6 import datetime 6 import datetime
7 import functools 7 import functools
8 import logging 8 import logging
9 import os 9 import os
10 import sys 10 import sys
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after
73 return False 73 return False
74 return _flush_metrics(time_now) 74 return _flush_metrics(time_now)
75 75
76 76
77 def _flush_metrics(time_now): 77 def _flush_metrics(time_now):
78 """Return True if metrics were actually sent.""" 78 """Return True if metrics were actually sent."""
79 if interface.state.target is None: 79 if interface.state.target is None:
80 # ts_mon is not configured. 80 # ts_mon is not configured.
81 return False 81 return False
82 82
83 logging.info('ts_mon: fetching TSMonInstance entity')
84 datetime_now = datetime.datetime.utcfromtimestamp(time_now) 83 datetime_now = datetime.datetime.utcfromtimestamp(time_now)
85 entity = shared.get_instance_entity() 84 entity = shared.get_instance_entity()
86 if entity.task_num < 0: 85 if entity.task_num < 0:
87 if interface.state.target.task_num >= 0: 86 if interface.state.target.task_num >= 0:
88 _reset_cumulative_metrics() 87 _reset_cumulative_metrics()
89 interface.state.target.task_num = -1 88 interface.state.target.task_num = -1
90 interface.state.last_flushed = entity.last_updated 89 interface.state.last_flushed = entity.last_updated
91 updated_sec_ago = (datetime_now - entity.last_updated).total_seconds() 90 updated_sec_ago = (datetime_now - entity.last_updated).total_seconds()
92 if updated_sec_ago > shared.INSTANCE_EXPECTED_TO_HAVE_TASK_NUM_SEC: 91 if updated_sec_ago > shared.INSTANCE_EXPECTED_TO_HAVE_TASK_NUM_SEC:
93 logging.warning('Instance %s is %d seconds old with no task_num.', 92 logging.warning('Instance %s is %d seconds old with no task_num.',
94 shared.instance_key_id(), updated_sec_ago) 93 shared.instance_key_id(), updated_sec_ago)
95 logging.info('ts_mon: skipping the flush, no task_num')
96 return False 94 return False
97 interface.state.target.task_num = entity.task_num 95 interface.state.target.task_num = entity.task_num
98 96
99 logging.info('ts_mon: initiating TSMonInstance entity update')
100 entity.last_updated = datetime_now 97 entity.last_updated = datetime_now
101 entity_deferred = entity.put_async() 98 entity_deferred = entity.put_async()
102 99
103 logging.info('ts_mon: doing the flush')
104 started = time.time()
105 interface.flush() 100 interface.flush()
106 logging.info(
107 'ts_mon: flush finished in %d ms', (time.time() - started) * 1000)
108 101
109 for metric in interface.state.global_metrics.itervalues(): 102 for metric in interface.state.global_metrics.itervalues():
110 metric.reset() 103 metric.reset()
111 104
112 logging.info('ts_mon: waiting for TSMonInstance entity update to finish')
113 entity_deferred.get_result() 105 entity_deferred.get_result()
114 logging.info('ts_mon: flush is done')
115 return True 106 return True
116 107
117 108
118 def _shutdown_hook(time_fn=time.time): 109 def _shutdown_hook(time_fn=time.time):
119 shared.shutdown_counter.increment() 110 shared.shutdown_counter.increment()
120 if flush_metrics_if_needed(time_fn()): 111 if flush_metrics_if_needed(time_fn()):
121 logging.info('Shutdown hook: deleting %s, metrics were flushed.', 112 logging.info('Shutdown hook: deleting %s, metrics were flushed.',
122 shared.instance_key_id()) 113 shared.instance_key_id())
123 else: 114 else:
124 logging.warning('Shutdown hook: deleting %s, metrics were NOT flushed.', 115 logging.warning('Shutdown hook: deleting %s, metrics were NOT flushed.',
(...skipping 86 matching lines...) Expand 10 before | Expand all | Expand 10 after
211 202
212 standard_metrics.init() 203 standard_metrics.init()
213 204
214 logging.info('Initialized ts_mon with service_name=%s, job_name=%s, ' 205 logging.info('Initialized ts_mon with service_name=%s, job_name=%s, '
215 'hostname=%s', service_name, job_name, hostname) 206 'hostname=%s', service_name, job_name, hostname)
216 207
217 208
218 def _instrumented_dispatcher(dispatcher, request, response, time_fn=time.time): 209 def _instrumented_dispatcher(dispatcher, request, response, time_fn=time.time):
219 start_time = time_fn() 210 start_time = time_fn()
220 response_status = 0 211 response_status = 0
221 interface.state.store.initialize_context()
222 flush_thread = None 212 flush_thread = None
223 time_now = time_fn() 213 time_now = time_fn()
224 if need_to_flush_metrics(time_now): 214 if need_to_flush_metrics(time_now):
225 logging.info('ts_mon: starting flush thread')
226 flush_thread = threading.Thread(target=_flush_metrics, args=(time_now,)) 215 flush_thread = threading.Thread(target=_flush_metrics, args=(time_now,))
227 flush_thread.start() 216 flush_thread.start()
228 try: 217 try:
229 ret = dispatcher(request, response) 218 ret = dispatcher(request, response)
230 except webapp2.HTTPException as ex: 219 except webapp2.HTTPException as ex:
231 response_status = ex.code 220 response_status = ex.code
232 raise 221 raise
233 except Exception: 222 except Exception:
234 response_status = 500 223 response_status = 500
235 raise 224 raise
236 else: 225 else:
237 if isinstance(ret, webapp2.Response): 226 if isinstance(ret, webapp2.Response):
238 response = ret 227 response = ret
239 response_status = response.status_int 228 response_status = response.status_int
240 finally: 229 finally:
241 if flush_thread: 230 if flush_thread:
242 start_waiting = time_fn()
243 logging.info('ts_mon: joining flush thread')
244 flush_thread.join() 231 flush_thread.join()
245 logging.info(
246 'ts_mon: flush threads is done, waited %d ms',
247 (time_fn() - start_waiting) * 1000)
248 elapsed_ms = int((time_fn() - start_time) * 1000) 232 elapsed_ms = int((time_fn() - start_time) * 1000)
249 233
250 # Use the route template regex, not the request path, to prevent an 234 # Use the route template regex, not the request path, to prevent an
251 # explosion in possible field values. 235 # explosion in possible field values.
252 name = request.route.template if request.route is not None else '' 236 name = request.route.template if request.route is not None else ''
253 237
254 http_metrics.update_http_server_metrics( 238 http_metrics.update_http_server_metrics(
255 name, response_status, elapsed_ms, 239 name, response_status, elapsed_ms,
256 request_size=request.content_length, 240 request_size=request.content_length,
257 response_size=response.content_length, 241 response_size=response.content_length,
(...skipping 21 matching lines...) Expand all
279 """Decorator to instrument Cloud Endpoint methods.""" 263 """Decorator to instrument Cloud Endpoint methods."""
280 def decorator(fn): 264 def decorator(fn):
281 method_name = fn.__name__ 265 method_name = fn.__name__
282 assert method_name 266 assert method_name
283 @functools.wraps(fn) 267 @functools.wraps(fn)
284 def decorated(service, *args, **kwargs): 268 def decorated(service, *args, **kwargs):
285 service_name = service.__class__.__name__ 269 service_name = service.__class__.__name__
286 endpoint_name = '/_ah/spi/%s.%s' % (service_name, method_name) 270 endpoint_name = '/_ah/spi/%s.%s' % (service_name, method_name)
287 start_time = time_fn() 271 start_time = time_fn()
288 response_status = 0 272 response_status = 0
289 interface.state.store.initialize_context()
290 flush_thread = None 273 flush_thread = None
291 time_now = time_fn() 274 time_now = time_fn()
292 if need_to_flush_metrics(time_now): 275 if need_to_flush_metrics(time_now):
293 flush_thread = threading.Thread(target=_flush_metrics, args=(time_now,)) 276 flush_thread = threading.Thread(target=_flush_metrics, args=(time_now,))
294 flush_thread.start() 277 flush_thread.start()
295 try: 278 try:
296 ret = fn(service, *args, **kwargs) 279 ret = fn(service, *args, **kwargs)
297 response_status = 200 280 response_status = 200
298 return ret 281 return ret
299 except endpoints.ServiceException as e: 282 except endpoints.ServiceException as e:
(...skipping 22 matching lines...) Expand all
322 if hasattr(fn, 'im_class') and hasattr(fn, 'im_func'): # Bound method. 305 if hasattr(fn, 'im_class') and hasattr(fn, 'im_func'): # Bound method.
323 return '.'.join([ 306 return '.'.join([
324 fn.im_class.__module__, 307 fn.im_class.__module__,
325 fn.im_class.__name__, 308 fn.im_class.__name__,
326 fn.im_func.func_name]) 309 fn.im_func.func_name])
327 if hasattr(fn, '__name__'): # Function. 310 if hasattr(fn, '__name__'): # Function.
328 return fn.__module__ + '.' + fn.__name__ 311 return fn.__module__ + '.' + fn.__name__
329 return '<unknown>' # pragma: no cover 312 return '<unknown>' # pragma: no cover
330 313
331 def process_view(self, request, view_func, view_args, view_kwargs): 314 def process_view(self, request, view_func, view_args, view_kwargs):
332 interface.state.store.initialize_context()
333
334 time_now = self._time_fn() 315 time_now = self._time_fn()
335 state = { 316 state = {
336 'flush_thread': None, 317 'flush_thread': None,
337 'name': self._callable_name(view_func), 318 'name': self._callable_name(view_func),
338 'start_time': time_now, 319 'start_time': time_now,
339 } 320 }
340 321
341 if need_to_flush_metrics(time_now): 322 if need_to_flush_metrics(time_now):
342 thread = threading.Thread(target=_flush_metrics, args=(time_now,)) 323 thread = threading.Thread(target=_flush_metrics, args=(time_now,))
343 thread.start() 324 thread.start()
(...skipping 26 matching lines...) Expand all
370 response.status_code, 351 response.status_code,
371 duration_secs * 1000, 352 duration_secs * 1000,
372 request_size=request_size, 353 request_size=request_size,
373 response_size=response_size, 354 response_size=response_size,
374 user_agent=request.META.get('HTTP_USER_AGENT', None)) 355 user_agent=request.META.get('HTTP_USER_AGENT', None))
375 return response 356 return response
376 357
377 358
378 def reset_for_unittest(disable=False): 359 def reset_for_unittest(disable=False):
379 interface.reset_for_unittest(disable=disable) 360 interface.reset_for_unittest(disable=disable)
OLDNEW
« no previous file with comments | « appengine/third_party/gae_ts_mon/__init__.py ('k') | client/third_party/infra_libs/README.swarming » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698