| Index: appengine_module/gae_ts_mon/config.py
|
| diff --git a/appengine_module/gae_ts_mon/config.py b/appengine_module/gae_ts_mon/config.py
|
| index 790897d83ea891de8dbe09172eb91f0606b49894..8a9673f0c5eae55fa279dff86104f5e4162da99b 100644
|
| --- a/appengine_module/gae_ts_mon/config.py
|
| +++ b/appengine_module/gae_ts_mon/config.py
|
| @@ -4,12 +4,14 @@
|
|
|
| import copy
|
| import datetime
|
| +import functools
|
| import logging
|
| import os
|
| import sys
|
| import time
|
| import threading
|
|
|
| +import endpoints
|
| import webapp2
|
|
|
| from google.appengine.api import modules
|
| @@ -38,33 +40,50 @@ def _reset_cumulative_metrics():
|
| _flush_metrics_lock = threading.Lock()
|
|
|
|
|
| -def flush_metrics_if_needed(time_fn=datetime.datetime.utcnow):
|
| - time_now = time_fn()
|
| - minute_ago = time_now - datetime.timedelta(seconds=60)
|
| +def need_to_flush_metrics(time_now):
|
| + """Check if metrics need flushing, and update the timestamp of last flush.
|
| +
|
| + Even though the caller of this function may not successfully flush the
|
| + metrics, we still update the last_flushed timestamp to prevent too much work
|
| + being done in user requests.
|
| +
|
| + Also, this check-and-update has to happen atomically, to ensure only one
|
| + thread can flush metrics at a time.
|
| + """
|
| + if not interface.state.flush_enabled_fn():
|
| + return False
|
| + datetime_now = datetime.datetime.utcfromtimestamp(time_now)
|
| + minute_ago = datetime_now - datetime.timedelta(seconds=60)
|
| with _flush_metrics_lock:
|
| if interface.state.last_flushed > minute_ago:
|
| return False
|
| - interface.state.last_flushed = time_now
|
| + interface.state.last_flushed = datetime_now
|
| + return True
|
|
|
| +
|
| +def flush_metrics_if_needed(time_now):
|
| + if not need_to_flush_metrics(time_now):
|
| + return False
|
| return _flush_metrics(time_now)
|
|
|
|
|
| def _flush_metrics(time_now):
|
| """Return True if metrics were actually sent."""
|
| + datetime_now = datetime.datetime.utcfromtimestamp(time_now)
|
| entity = shared.get_instance_entity()
|
| if entity.task_num < 0:
|
| if interface.state.target.task_num >= 0:
|
| _reset_cumulative_metrics()
|
| interface.state.target.task_num = -1
|
| interface.state.last_flushed = entity.last_updated
|
| - updated_sec_ago = (time_now - entity.last_updated).total_seconds()
|
| + updated_sec_ago = (datetime_now - entity.last_updated).total_seconds()
|
| if updated_sec_ago > shared.INSTANCE_EXPECTED_TO_HAVE_TASK_NUM_SEC:
|
| logging.warning('Instance %s is %d seconds old with no task_num.',
|
| shared.instance_key_id(), updated_sec_ago)
|
| return False
|
| interface.state.target.task_num = entity.task_num
|
|
|
| - entity.last_updated = time_now
|
| + entity.last_updated = datetime_now
|
| entity_deferred = entity.put_async()
|
|
|
| interface.flush()
|
| @@ -76,9 +95,9 @@ def _flush_metrics(time_now):
|
| return True
|
|
|
|
|
| -def _shutdown_hook():
|
| +def _shutdown_hook(time_fn=time.time):
|
| shared.shutdown_counter.increment()
|
| - if flush_metrics_if_needed():
|
| + if flush_metrics_if_needed(time_fn()):
|
| logging.info('Shutdown hook: deleting %s, metrics were flushed.',
|
| shared.instance_key_id())
|
| else:
|
| @@ -171,8 +190,11 @@ def _instrumented_dispatcher(dispatcher, request, response, time_fn=time.time):
|
| start_time = time_fn()
|
| response_status = 0
|
| interface.state.store.initialize_context()
|
| - flush_thread = threading.Thread(target=flush_metrics_if_needed)
|
| - flush_thread.start()
|
| + flush_thread = None
|
| + time_now = time_fn()
|
| + if need_to_flush_metrics(time_now):
|
| + flush_thread = threading.Thread(target=_flush_metrics, args=(time_now,))
|
| + flush_thread.start()
|
| try:
|
| ret = dispatcher(request, response)
|
| except webapp2.HTTPException as ex:
|
| @@ -186,30 +208,20 @@ def _instrumented_dispatcher(dispatcher, request, response, time_fn=time.time):
|
| response = ret
|
| response_status = response.status_int
|
| finally:
|
| - flush_thread.join()
|
| + if flush_thread:
|
| + flush_thread.join()
|
| elapsed_ms = int((time_fn() - start_time) * 1000)
|
|
|
| - fields = {'status': response_status, 'name': '', 'is_robot': False}
|
| - if request.route is not None:
|
| - # Use the route template regex, not the request path, to prevent an
|
| - # explosion in possible field values.
|
| - fields['name'] = request.route.template
|
| - if request.user_agent is not None:
|
| - # We must not log user agents, but we can store whether or not the
|
| - # user agent string indicates that the requester was a Google bot.
|
| - fields['is_robot'] = (
|
| - 'GoogleBot' in request.user_agent or
|
| - 'GoogleSecurityScanner' in request.user_agent or
|
| - request.user_agent == 'B3M/prober')
|
| -
|
| - http_metrics.server_durations.add(elapsed_ms, fields=fields)
|
| - http_metrics.server_response_status.increment(fields=fields)
|
| - if request.content_length is not None:
|
| - http_metrics.server_request_bytes.add(request.content_length,
|
| - fields=fields)
|
| - if response.content_length is not None: # pragma: no cover
|
| - http_metrics.server_response_bytes.add(response.content_length,
|
| - fields=fields)
|
| + # Use the route template regex, not the request path, to prevent an
|
| + # explosion in possible field values.
|
| + name = request.route.template if request.route is not None else ''
|
| +
|
| + http_metrics.update_http_server_metrics(
|
| + name, response_status, elapsed_ms,
|
| + request_size=request.content_length,
|
| + response_size=response.content_length,
|
| + user_agent=request.user_agent)
|
| +
|
| return ret
|
|
|
|
|
| @@ -228,6 +240,43 @@ def instrument_wsgi_application(app, time_fn=time.time):
|
| app.router.__instrumented_by_ts_mon = True
|
|
|
|
|
| -def reset_for_unittest():
|
| +def instrument_endpoint(time_fn=time.time):
|
| + """Decorator to instrument Cloud Endpoint methods."""
|
| + def decorator(fn):
|
| + method_name = fn.__name__
|
| + assert method_name
|
| + @functools.wraps(fn)
|
| + def decorated(service, *args, **kwargs):
|
| + service_name = service.__class__.__name__
|
| + endpoint_name = '/_ah/spi/%s.%s' % (service_name, method_name)
|
| + start_time = time_fn()
|
| + response_status = 0
|
| + interface.state.store.initialize_context()
|
| + flush_thread = None
|
| + time_now = time_fn()
|
| + if need_to_flush_metrics(time_now):
|
| + flush_thread = threading.Thread(target=_flush_metrics, args=(time_now,))
|
| + flush_thread.start()
|
| + try:
|
| + ret = fn(service, *args, **kwargs)
|
| + response_status = 200
|
| + return ret
|
| + except endpoints.ServiceException as e:
|
| + response_status = e.http_status
|
| + raise
|
| + except Exception:
|
| + response_status = 500
|
| + raise
|
| + finally:
|
| + if flush_thread:
|
| + flush_thread.join()
|
| + elapsed_ms = int((time_fn() - start_time) * 1000)
|
| + http_metrics.update_http_server_metrics(
|
| + endpoint_name, response_status, elapsed_ms)
|
| + return decorated
|
| + return decorator
|
| +
|
| +
|
| +def reset_for_unittest(disable=False):
|
| shared.reset_for_unittest()
|
| - interface.reset_for_unittest()
|
| + interface.reset_for_unittest(disable=disable)
|
|
|