| OLD | NEW |
| 1 # Copyright 2015 The Chromium Authors. All rights reserved. | 1 # Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
| 4 | 4 |
| 5 import copy | 5 import copy |
| 6 import datetime | 6 import datetime |
| 7 import functools | 7 import functools |
| 8 import logging | 8 import logging |
| 9 import os | 9 import os |
| 10 import sys | 10 import sys |
| (...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 73 return False | 73 return False |
| 74 return _flush_metrics(time_now) | 74 return _flush_metrics(time_now) |
| 75 | 75 |
| 76 | 76 |
| 77 def _flush_metrics(time_now): | 77 def _flush_metrics(time_now): |
| 78 """Return True if metrics were actually sent.""" | 78 """Return True if metrics were actually sent.""" |
| 79 if interface.state.target is None: | 79 if interface.state.target is None: |
| 80 # ts_mon is not configured. | 80 # ts_mon is not configured. |
| 81 return False | 81 return False |
| 82 | 82 |
| 83 logging.info('ts_mon: fetching TSMonInstance entity') | |
| 84 datetime_now = datetime.datetime.utcfromtimestamp(time_now) | 83 datetime_now = datetime.datetime.utcfromtimestamp(time_now) |
| 85 entity = shared.get_instance_entity() | 84 entity = shared.get_instance_entity() |
| 86 if entity.task_num < 0: | 85 if entity.task_num < 0: |
| 87 if interface.state.target.task_num >= 0: | 86 if interface.state.target.task_num >= 0: |
| 88 _reset_cumulative_metrics() | 87 _reset_cumulative_metrics() |
| 89 interface.state.target.task_num = -1 | 88 interface.state.target.task_num = -1 |
| 90 interface.state.last_flushed = entity.last_updated | 89 interface.state.last_flushed = entity.last_updated |
| 91 updated_sec_ago = (datetime_now - entity.last_updated).total_seconds() | 90 updated_sec_ago = (datetime_now - entity.last_updated).total_seconds() |
| 92 if updated_sec_ago > shared.INSTANCE_EXPECTED_TO_HAVE_TASK_NUM_SEC: | 91 if updated_sec_ago > shared.INSTANCE_EXPECTED_TO_HAVE_TASK_NUM_SEC: |
| 93 logging.warning('Instance %s is %d seconds old with no task_num.', | 92 logging.warning('Instance %s is %d seconds old with no task_num.', |
| 94 shared.instance_key_id(), updated_sec_ago) | 93 shared.instance_key_id(), updated_sec_ago) |
| 95 logging.info('ts_mon: skipping the flush, no task_num') | |
| 96 return False | 94 return False |
| 97 interface.state.target.task_num = entity.task_num | 95 interface.state.target.task_num = entity.task_num |
| 98 | 96 |
| 99 logging.info('ts_mon: initiating TSMonInstance entity update') | |
| 100 entity.last_updated = datetime_now | 97 entity.last_updated = datetime_now |
| 101 entity_deferred = entity.put_async() | 98 entity_deferred = entity.put_async() |
| 102 | 99 |
| 103 logging.info('ts_mon: doing the flush') | |
| 104 started = time.time() | |
| 105 interface.flush() | 100 interface.flush() |
| 106 logging.info( | |
| 107 'ts_mon: flush finished in %d ms', (time.time() - started) * 1000) | |
| 108 | 101 |
| 109 for metric in interface.state.global_metrics.itervalues(): | 102 for metric in interface.state.global_metrics.itervalues(): |
| 110 metric.reset() | 103 metric.reset() |
| 111 | 104 |
| 112 logging.info('ts_mon: waiting for TSMonInstance entity update to finish') | |
| 113 entity_deferred.get_result() | 105 entity_deferred.get_result() |
| 114 logging.info('ts_mon: flush is done') | |
| 115 return True | 106 return True |
| 116 | 107 |
| 117 | 108 |
| 118 def _shutdown_hook(time_fn=time.time): | 109 def _shutdown_hook(time_fn=time.time): |
| 119 shared.shutdown_counter.increment() | 110 shared.shutdown_counter.increment() |
| 120 if flush_metrics_if_needed(time_fn()): | 111 if flush_metrics_if_needed(time_fn()): |
| 121 logging.info('Shutdown hook: deleting %s, metrics were flushed.', | 112 logging.info('Shutdown hook: deleting %s, metrics were flushed.', |
| 122 shared.instance_key_id()) | 113 shared.instance_key_id()) |
| 123 else: | 114 else: |
| 124 logging.warning('Shutdown hook: deleting %s, metrics were NOT flushed.', | 115 logging.warning('Shutdown hook: deleting %s, metrics were NOT flushed.', |
| (...skipping 86 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 211 | 202 |
| 212 standard_metrics.init() | 203 standard_metrics.init() |
| 213 | 204 |
| 214 logging.info('Initialized ts_mon with service_name=%s, job_name=%s, ' | 205 logging.info('Initialized ts_mon with service_name=%s, job_name=%s, ' |
| 215 'hostname=%s', service_name, job_name, hostname) | 206 'hostname=%s', service_name, job_name, hostname) |
| 216 | 207 |
| 217 | 208 |
| 218 def _instrumented_dispatcher(dispatcher, request, response, time_fn=time.time): | 209 def _instrumented_dispatcher(dispatcher, request, response, time_fn=time.time): |
| 219 start_time = time_fn() | 210 start_time = time_fn() |
| 220 response_status = 0 | 211 response_status = 0 |
| 221 interface.state.store.initialize_context() | |
| 222 flush_thread = None | 212 flush_thread = None |
| 223 time_now = time_fn() | 213 time_now = time_fn() |
| 224 if need_to_flush_metrics(time_now): | 214 if need_to_flush_metrics(time_now): |
| 225 logging.info('ts_mon: starting flush thread') | |
| 226 flush_thread = threading.Thread(target=_flush_metrics, args=(time_now,)) | 215 flush_thread = threading.Thread(target=_flush_metrics, args=(time_now,)) |
| 227 flush_thread.start() | 216 flush_thread.start() |
| 228 try: | 217 try: |
| 229 ret = dispatcher(request, response) | 218 ret = dispatcher(request, response) |
| 230 except webapp2.HTTPException as ex: | 219 except webapp2.HTTPException as ex: |
| 231 response_status = ex.code | 220 response_status = ex.code |
| 232 raise | 221 raise |
| 233 except Exception: | 222 except Exception: |
| 234 response_status = 500 | 223 response_status = 500 |
| 235 raise | 224 raise |
| 236 else: | 225 else: |
| 237 if isinstance(ret, webapp2.Response): | 226 if isinstance(ret, webapp2.Response): |
| 238 response = ret | 227 response = ret |
| 239 response_status = response.status_int | 228 response_status = response.status_int |
| 240 finally: | 229 finally: |
| 241 if flush_thread: | 230 if flush_thread: |
| 242 start_waiting = time_fn() | |
| 243 logging.info('ts_mon: joining flush thread') | |
| 244 flush_thread.join() | 231 flush_thread.join() |
| 245 logging.info( | |
| 246 'ts_mon: flush threads is done, waited %d ms', | |
| 247 (time_fn() - start_waiting) * 1000) | |
| 248 elapsed_ms = int((time_fn() - start_time) * 1000) | 232 elapsed_ms = int((time_fn() - start_time) * 1000) |
| 249 | 233 |
| 250 # Use the route template regex, not the request path, to prevent an | 234 # Use the route template regex, not the request path, to prevent an |
| 251 # explosion in possible field values. | 235 # explosion in possible field values. |
| 252 name = request.route.template if request.route is not None else '' | 236 name = request.route.template if request.route is not None else '' |
| 253 | 237 |
| 254 http_metrics.update_http_server_metrics( | 238 http_metrics.update_http_server_metrics( |
| 255 name, response_status, elapsed_ms, | 239 name, response_status, elapsed_ms, |
| 256 request_size=request.content_length, | 240 request_size=request.content_length, |
| 257 response_size=response.content_length, | 241 response_size=response.content_length, |
| (...skipping 21 matching lines...) Expand all Loading... |
| 279 """Decorator to instrument Cloud Endpoint methods.""" | 263 """Decorator to instrument Cloud Endpoint methods.""" |
| 280 def decorator(fn): | 264 def decorator(fn): |
| 281 method_name = fn.__name__ | 265 method_name = fn.__name__ |
| 282 assert method_name | 266 assert method_name |
| 283 @functools.wraps(fn) | 267 @functools.wraps(fn) |
| 284 def decorated(service, *args, **kwargs): | 268 def decorated(service, *args, **kwargs): |
| 285 service_name = service.__class__.__name__ | 269 service_name = service.__class__.__name__ |
| 286 endpoint_name = '/_ah/spi/%s.%s' % (service_name, method_name) | 270 endpoint_name = '/_ah/spi/%s.%s' % (service_name, method_name) |
| 287 start_time = time_fn() | 271 start_time = time_fn() |
| 288 response_status = 0 | 272 response_status = 0 |
| 289 interface.state.store.initialize_context() | |
| 290 flush_thread = None | 273 flush_thread = None |
| 291 time_now = time_fn() | 274 time_now = time_fn() |
| 292 if need_to_flush_metrics(time_now): | 275 if need_to_flush_metrics(time_now): |
| 293 flush_thread = threading.Thread(target=_flush_metrics, args=(time_now,)) | 276 flush_thread = threading.Thread(target=_flush_metrics, args=(time_now,)) |
| 294 flush_thread.start() | 277 flush_thread.start() |
| 295 try: | 278 try: |
| 296 ret = fn(service, *args, **kwargs) | 279 ret = fn(service, *args, **kwargs) |
| 297 response_status = 200 | 280 response_status = 200 |
| 298 return ret | 281 return ret |
| 299 except endpoints.ServiceException as e: | 282 except endpoints.ServiceException as e: |
| (...skipping 22 matching lines...) Expand all Loading... |
| 322 if hasattr(fn, 'im_class') and hasattr(fn, 'im_func'): # Bound method. | 305 if hasattr(fn, 'im_class') and hasattr(fn, 'im_func'): # Bound method. |
| 323 return '.'.join([ | 306 return '.'.join([ |
| 324 fn.im_class.__module__, | 307 fn.im_class.__module__, |
| 325 fn.im_class.__name__, | 308 fn.im_class.__name__, |
| 326 fn.im_func.func_name]) | 309 fn.im_func.func_name]) |
| 327 if hasattr(fn, '__name__'): # Function. | 310 if hasattr(fn, '__name__'): # Function. |
| 328 return fn.__module__ + '.' + fn.__name__ | 311 return fn.__module__ + '.' + fn.__name__ |
| 329 return '<unknown>' # pragma: no cover | 312 return '<unknown>' # pragma: no cover |
| 330 | 313 |
| 331 def process_view(self, request, view_func, view_args, view_kwargs): | 314 def process_view(self, request, view_func, view_args, view_kwargs): |
| 332 interface.state.store.initialize_context() | |
| 333 | |
| 334 time_now = self._time_fn() | 315 time_now = self._time_fn() |
| 335 state = { | 316 state = { |
| 336 'flush_thread': None, | 317 'flush_thread': None, |
| 337 'name': self._callable_name(view_func), | 318 'name': self._callable_name(view_func), |
| 338 'start_time': time_now, | 319 'start_time': time_now, |
| 339 } | 320 } |
| 340 | 321 |
| 341 if need_to_flush_metrics(time_now): | 322 if need_to_flush_metrics(time_now): |
| 342 thread = threading.Thread(target=_flush_metrics, args=(time_now,)) | 323 thread = threading.Thread(target=_flush_metrics, args=(time_now,)) |
| 343 thread.start() | 324 thread.start() |
| (...skipping 26 matching lines...) Expand all Loading... |
| 370 response.status_code, | 351 response.status_code, |
| 371 duration_secs * 1000, | 352 duration_secs * 1000, |
| 372 request_size=request_size, | 353 request_size=request_size, |
| 373 response_size=response_size, | 354 response_size=response_size, |
| 374 user_agent=request.META.get('HTTP_USER_AGENT', None)) | 355 user_agent=request.META.get('HTTP_USER_AGENT', None)) |
| 375 return response | 356 return response |
| 376 | 357 |
| 377 | 358 |
| 378 def reset_for_unittest(disable=False): | 359 def reset_for_unittest(disable=False): |
| 379 interface.reset_for_unittest(disable=disable) | 360 interface.reset_for_unittest(disable=disable) |
| OLD | NEW |