Chromium Code Reviews| Index: appengine_module/gae_ts_mon/config.py |
| diff --git a/appengine_module/gae_ts_mon/config.py b/appengine_module/gae_ts_mon/config.py |
| index 2cbe0a344d95538c079dc8fba7c7c115efbdbbcf..24c607ec75bf250d76424a51856547cc40a28bd8 100644 |
| --- a/appengine_module/gae_ts_mon/config.py |
| +++ b/appengine_module/gae_ts_mon/config.py |
| @@ -4,12 +4,14 @@ |
| import copy |
| import datetime |
| +import functools |
| import logging |
| import os |
| import sys |
| import time |
| import threading |
| +import endpoints |
| import webapp2 |
| from google.appengine.api import modules |
| @@ -38,15 +40,20 @@ def _reset_cumulative_metrics(): |
| _flush_metrics_lock = threading.Lock() |
| -def flush_metrics_if_needed(time_fn=datetime.datetime.utcnow): |
| - time_now = time_fn() |
| +def need_to_flush_metrics(time_fn=time.time): |
|
nodir
2016/03/18 17:38:02
I'd add a docstring that this function is not pure
Sergey Berezin
2016/03/21 18:49:32
Done.
|
| + time_now = datetime.datetime.utcfromtimestamp(time_fn()) |
| minute_ago = time_now - datetime.timedelta(seconds=60) |
| with _flush_metrics_lock: |
| if interface.state.last_flushed > minute_ago: |
| return False |
| interface.state.last_flushed = time_now |
|
nodir
2016/03/18 17:38:02
What happens if flushing fails after this value is set?
Sergey Berezin
2016/03/21 18:49:32
We simply retry a minute later. This in fact is a…
|
| + return True |
| + |
| - return _flush_metrics(time_now) |
| +def flush_metrics_if_needed(time_fn=time.time): |
| + if not need_to_flush_metrics(time_fn=time_fn): |
| + return False |
| + return _flush_metrics(interface.state.last_flushed) |
|
nodir
2016/03/18 17:38:02
Passing last_flushed as time_now seems to heavily…
Sergey Berezin
2016/03/21 18:49:32
Very good point, thanks. I refactored this part of…
|
| def _flush_metrics(time_now): |
| @@ -171,8 +178,11 @@ def _instrumented_dispatcher(dispatcher, request, response, time_fn=time.time): |
| start_time = time_fn() |
| response_status = 0 |
| interface.state.store.initialize_context() |
| - flush_thread = threading.Thread(target=flush_metrics_if_needed) |
| - flush_thread.start() |
| + flush_thread = None |
| + if need_to_flush_metrics(time_fn=time_fn): |
| + flush_thread = threading.Thread( |
| + target=_flush_metrics, args=(interface.state.last_flushed,)) |
| + flush_thread.start() |
| try: |
| ret = dispatcher(request, response) |
| except webapp2.HTTPException as ex: |
| @@ -186,29 +196,20 @@ def _instrumented_dispatcher(dispatcher, request, response, time_fn=time.time): |
| response = ret |
| response_status = response.status_int |
| finally: |
| - flush_thread.join() |
| + if flush_thread: |
| + flush_thread.join() |
|
nodir
2016/03/18 17:38:02
Please join after elapsed_ms is computed.
Sergey Berezin
2016/03/21 18:49:32
This is on purpose: I want to track the total request time, including the flush.
|
| elapsed_ms = int((time_fn() - start_time) * 1000) |
| - fields = {'status': response_status, 'name': '', 'is_robot': False} |
| - if request.route is not None: |
| - # Use the route template regex, not the request path, to prevent an |
| - # explosion in possible field values. |
| - fields['name'] = request.route.template |
| - if request.user_agent is not None: |
| - # We must not log user agents, but we can store whether or not the |
| - # user agent string indicates that the requester was a Google bot. |
| - fields['is_robot'] = ( |
| - 'GoogleBot' in request.user_agent or |
| - 'GoogleSecurityScanner' in request.user_agent) |
| - |
| - http_metrics.server_durations.add(elapsed_ms, fields=fields) |
| - http_metrics.server_response_status.increment(fields=fields) |
| - if request.content_length is not None: |
| - http_metrics.server_request_bytes.add(request.content_length, |
| - fields=fields) |
| - if response.content_length is not None: # pragma: no cover |
| - http_metrics.server_response_bytes.add(response.content_length, |
| - fields=fields) |
| + # Use the route template regex, not the request path, to prevent an |
| + # explosion in possible field values. |
| + name = request.route.template if request.route is not None else '' |
| + |
| + http_metrics.update_http_server_metrics( |
| + name, response_status, elapsed_ms, |
| + request_size=request.content_length, |
| + response_size=response.content_length, |
| + user_agent=request.user_agent) |
| + |
| return ret |
| @@ -227,6 +228,43 @@ def instrument_wsgi_application(app, time_fn=time.time): |
| app.router.__instrumented_by_ts_mon = True |
| +def instrument_endpoint(time_fn=time.time): |
| + """Decorator to instrument Cloud Endpoint methods.""" |
| + def decorator(fn): |
| + method_name = fn.__name__ |
| + assert method_name |
| + @functools.wraps(fn) |
| + def decorated(service, *args, **kwargs): |
| + service_name = service.__class__.__name__ |
| + endpoint_name = '/_ah/spi/%s.%s' % (service_name, method_name) |
| + start_time = time_fn() |
| + response_status = 0 |
| + interface.state.store.initialize_context() |
| + flush_thread = None |
| + if need_to_flush_metrics(time_fn=time_fn): |
| + flush_thread = threading.Thread( |
| + target=_flush_metrics, args=(interface.state.last_flushed,)) |
| + flush_thread.start() |
| + try: |
| + ret = fn(service, *args, **kwargs) |
| + response_status = 200 |
| + return ret |
| + except endpoints.ServiceException as e: |
| + response_status = e.http_status |
| + raise |
| + except Exception: |
| + response_status = 500 |
| + raise |
| + finally: |
| + if flush_thread: |
| + flush_thread.join() |
|
nodir
2016/03/18 17:38:02
Same here: please join after elapsed_ms is computed.
Sergey Berezin
2016/03/21 18:49:32
Same response as above.
|
| + elapsed_ms = int((time_fn() - start_time) * 1000) |
| + http_metrics.update_http_server_metrics( |
| + endpoint_name, response_status, elapsed_ms) |
| + return decorated |
| + return decorator |
| + |
| + |
| def reset_for_unittest(): |
| shared.reset_for_unittest() |
| interface.reset_for_unittest() |