| Index: appengine_module/gae_ts_mon/config.py
|
| diff --git a/appengine_module/gae_ts_mon/config.py b/appengine_module/gae_ts_mon/config.py
|
| index 8a9673f0c5eae55fa279dff86104f5e4162da99b..790897d83ea891de8dbe09172eb91f0606b49894 100644
|
| --- a/appengine_module/gae_ts_mon/config.py
|
| +++ b/appengine_module/gae_ts_mon/config.py
|
| @@ -4,14 +4,12 @@
|
|
|
| import copy
|
| import datetime
|
| -import functools
|
| import logging
|
| import os
|
| import sys
|
| import time
|
| import threading
|
|
|
| -import endpoints
|
| import webapp2
|
|
|
| from google.appengine.api import modules
|
| @@ -40,50 +38,33 @@
|
| _flush_metrics_lock = threading.Lock()
|
|
|
|
|
| -def need_to_flush_metrics(time_now):
|
| - """Check if metrics need flushing, and update the timestamp of last flush.
|
| -
|
| - Even though the caller of this function may not successfully flush the
|
| - metrics, we still update the last_flushed timestamp to prevent too much work
|
| - being done in user requests.
|
| -
|
| - Also, this check-and-update has to happen atomically, to ensure only one
|
| - thread can flush metrics at a time.
|
| - """
|
| - if not interface.state.flush_enabled_fn():
|
| - return False
|
| - datetime_now = datetime.datetime.utcfromtimestamp(time_now)
|
| - minute_ago = datetime_now - datetime.timedelta(seconds=60)
|
| +def flush_metrics_if_needed(time_fn=datetime.datetime.utcnow):
|
| + time_now = time_fn()
|
| + minute_ago = time_now - datetime.timedelta(seconds=60)
|
| with _flush_metrics_lock:
|
| if interface.state.last_flushed > minute_ago:
|
| return False
|
| - interface.state.last_flushed = datetime_now
|
| - return True
|
| -
|
| -
|
| -def flush_metrics_if_needed(time_now):
|
| - if not need_to_flush_metrics(time_now):
|
| - return False
|
| + interface.state.last_flushed = time_now
|
| +
|
| return _flush_metrics(time_now)
|
|
|
|
|
| def _flush_metrics(time_now):
|
| """Return True if metrics were actually sent."""
|
| - datetime_now = datetime.datetime.utcfromtimestamp(time_now)
|
| entity = shared.get_instance_entity()
|
| if entity.task_num < 0:
|
| if interface.state.target.task_num >= 0:
|
| _reset_cumulative_metrics()
|
| interface.state.target.task_num = -1
|
| interface.state.last_flushed = entity.last_updated
|
| - updated_sec_ago = (datetime_now - entity.last_updated).total_seconds()
|
| + updated_sec_ago = (time_now - entity.last_updated).total_seconds()
|
| if updated_sec_ago > shared.INSTANCE_EXPECTED_TO_HAVE_TASK_NUM_SEC:
|
| logging.warning('Instance %s is %d seconds old with no task_num.',
|
| shared.instance_key_id(), updated_sec_ago)
|
| return False
|
| interface.state.target.task_num = entity.task_num
|
|
|
| - entity.last_updated = datetime_now
|
| + entity.last_updated = time_now
|
| entity_deferred = entity.put_async()
|
|
|
| interface.flush()
|
| @@ -95,9 +76,9 @@
|
| return True
|
|
|
|
|
| -def _shutdown_hook(time_fn=time.time):
|
| +def _shutdown_hook():
|
| shared.shutdown_counter.increment()
|
| - if flush_metrics_if_needed(time_fn()):
|
| + if flush_metrics_if_needed():
|
| logging.info('Shutdown hook: deleting %s, metrics were flushed.',
|
| shared.instance_key_id())
|
| else:
|
| @@ -190,11 +171,8 @@
|
| start_time = time_fn()
|
| response_status = 0
|
| interface.state.store.initialize_context()
|
| - flush_thread = None
|
| - time_now = time_fn()
|
| - if need_to_flush_metrics(time_now):
|
| - flush_thread = threading.Thread(target=_flush_metrics, args=(time_now,))
|
| - flush_thread.start()
|
| + flush_thread = threading.Thread(target=flush_metrics_if_needed)
|
| + flush_thread.start()
|
| try:
|
| ret = dispatcher(request, response)
|
| except webapp2.HTTPException as ex:
|
| @@ -208,20 +186,30 @@
|
| response = ret
|
| response_status = response.status_int
|
| finally:
|
| - if flush_thread:
|
| - flush_thread.join()
|
| + flush_thread.join()
|
| elapsed_ms = int((time_fn() - start_time) * 1000)
|
|
|
| - # Use the route template regex, not the request path, to prevent an
|
| - # explosion in possible field values.
|
| - name = request.route.template if request.route is not None else ''
|
| -
|
| - http_metrics.update_http_server_metrics(
|
| - name, response_status, elapsed_ms,
|
| - request_size=request.content_length,
|
| - response_size=response.content_length,
|
| - user_agent=request.user_agent)
|
| -
|
| + fields = {'status': response_status, 'name': '', 'is_robot': False}
|
| + if request.route is not None:
|
| + # Use the route template regex, not the request path, to prevent an
|
| + # explosion in possible field values.
|
| + fields['name'] = request.route.template
|
| + if request.user_agent is not None:
|
| + # We must not log user agents, but we can store whether or not the
|
| + # user agent string indicates that the requester was a Google bot.
|
| + fields['is_robot'] = (
|
| + 'GoogleBot' in request.user_agent or
|
| + 'GoogleSecurityScanner' in request.user_agent or
|
| + request.user_agent == 'B3M/prober')
|
| +
|
| + http_metrics.server_durations.add(elapsed_ms, fields=fields)
|
| + http_metrics.server_response_status.increment(fields=fields)
|
| + if request.content_length is not None:
|
| + http_metrics.server_request_bytes.add(request.content_length,
|
| + fields=fields)
|
| + if response.content_length is not None: # pragma: no cover
|
| + http_metrics.server_response_bytes.add(response.content_length,
|
| + fields=fields)
|
| return ret
|
|
|
|
|
| @@ -240,43 +228,6 @@
|
| app.router.__instrumented_by_ts_mon = True
|
|
|
|
|
| -def instrument_endpoint(time_fn=time.time):
|
| - """Decorator to instrument Cloud Endpoint methods."""
|
| - def decorator(fn):
|
| - method_name = fn.__name__
|
| - assert method_name
|
| - @functools.wraps(fn)
|
| - def decorated(service, *args, **kwargs):
|
| - service_name = service.__class__.__name__
|
| - endpoint_name = '/_ah/spi/%s.%s' % (service_name, method_name)
|
| - start_time = time_fn()
|
| - response_status = 0
|
| - interface.state.store.initialize_context()
|
| - flush_thread = None
|
| - time_now = time_fn()
|
| - if need_to_flush_metrics(time_now):
|
| - flush_thread = threading.Thread(target=_flush_metrics, args=(time_now,))
|
| - flush_thread.start()
|
| - try:
|
| - ret = fn(service, *args, **kwargs)
|
| - response_status = 200
|
| - return ret
|
| - except endpoints.ServiceException as e:
|
| - response_status = e.http_status
|
| - raise
|
| - except Exception:
|
| - response_status = 500
|
| - raise
|
| - finally:
|
| - if flush_thread:
|
| - flush_thread.join()
|
| - elapsed_ms = int((time_fn() - start_time) * 1000)
|
| - http_metrics.update_http_server_metrics(
|
| - endpoint_name, response_status, elapsed_ms)
|
| - return decorated
|
| - return decorator
|
| -
|
| -
|
| -def reset_for_unittest(disable=False):
|
| +def reset_for_unittest():
|
| shared.reset_for_unittest()
|
| - interface.reset_for_unittest(disable=disable)
|
| + interface.reset_for_unittest()
|
|
|