Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname in Settings | Help | Chromium Project | Gerrit Changes | Sign out
(62)

Unified Diff: appengine_module/gae_ts_mon/config.py

Issue 1797103003: gae_ts_mon: instrument Cloud Endpoint methods (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Reduced test flakiness Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « appengine_module/gae_ts_mon/__init__.py ('k') | appengine_module/gae_ts_mon/test/config_test.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments / Hide Comments ('s')
Index: appengine_module/gae_ts_mon/config.py
diff --git a/appengine_module/gae_ts_mon/config.py b/appengine_module/gae_ts_mon/config.py
index 790897d83ea891de8dbe09172eb91f0606b49894..8a9673f0c5eae55fa279dff86104f5e4162da99b 100644
--- a/appengine_module/gae_ts_mon/config.py
+++ b/appengine_module/gae_ts_mon/config.py
@@ -4,12 +4,14 @@
import copy
import datetime
+import functools
import logging
import os
import sys
import time
import threading
+import endpoints
import webapp2
from google.appengine.api import modules
@@ -38,33 +40,50 @@ def _reset_cumulative_metrics():
_flush_metrics_lock = threading.Lock()
-def flush_metrics_if_needed(time_fn=datetime.datetime.utcnow):
- time_now = time_fn()
- minute_ago = time_now - datetime.timedelta(seconds=60)
+def need_to_flush_metrics(time_now):
+ """Check if metrics need flushing, and update the timestamp of last flush.
+
+ Even though the caller of this function may not successfully flush the
+ metrics, we still update the last_flushed timestamp to prevent too much work
+ being done in user requests.
+
+ Also, this check-and-update has to happen atomically, to ensure only one
+ thread can flush metrics at a time.
+ """
+ if not interface.state.flush_enabled_fn():
+ return False
+ datetime_now = datetime.datetime.utcfromtimestamp(time_now)
+ minute_ago = datetime_now - datetime.timedelta(seconds=60)
with _flush_metrics_lock:
if interface.state.last_flushed > minute_ago:
return False
- interface.state.last_flushed = time_now
+ interface.state.last_flushed = datetime_now
+ return True
+
+def flush_metrics_if_needed(time_now):
+ if not need_to_flush_metrics(time_now):
+ return False
return _flush_metrics(time_now)
def _flush_metrics(time_now):
"""Return True if metrics were actually sent."""
+ datetime_now = datetime.datetime.utcfromtimestamp(time_now)
entity = shared.get_instance_entity()
if entity.task_num < 0:
if interface.state.target.task_num >= 0:
_reset_cumulative_metrics()
interface.state.target.task_num = -1
interface.state.last_flushed = entity.last_updated
- updated_sec_ago = (time_now - entity.last_updated).total_seconds()
+ updated_sec_ago = (datetime_now - entity.last_updated).total_seconds()
if updated_sec_ago > shared.INSTANCE_EXPECTED_TO_HAVE_TASK_NUM_SEC:
logging.warning('Instance %s is %d seconds old with no task_num.',
shared.instance_key_id(), updated_sec_ago)
return False
interface.state.target.task_num = entity.task_num
- entity.last_updated = time_now
+ entity.last_updated = datetime_now
entity_deferred = entity.put_async()
interface.flush()
@@ -76,9 +95,9 @@ def _flush_metrics(time_now):
return True
-def _shutdown_hook():
+def _shutdown_hook(time_fn=time.time):
shared.shutdown_counter.increment()
- if flush_metrics_if_needed():
+ if flush_metrics_if_needed(time_fn()):
logging.info('Shutdown hook: deleting %s, metrics were flushed.',
shared.instance_key_id())
else:
@@ -171,8 +190,11 @@ def _instrumented_dispatcher(dispatcher, request, response, time_fn=time.time):
start_time = time_fn()
response_status = 0
interface.state.store.initialize_context()
- flush_thread = threading.Thread(target=flush_metrics_if_needed)
- flush_thread.start()
+ flush_thread = None
+ time_now = time_fn()
+ if need_to_flush_metrics(time_now):
+ flush_thread = threading.Thread(target=_flush_metrics, args=(time_now,))
+ flush_thread.start()
try:
ret = dispatcher(request, response)
except webapp2.HTTPException as ex:
@@ -186,30 +208,20 @@ def _instrumented_dispatcher(dispatcher, request, response, time_fn=time.time):
response = ret
response_status = response.status_int
finally:
- flush_thread.join()
+ if flush_thread:
+ flush_thread.join()
elapsed_ms = int((time_fn() - start_time) * 1000)
- fields = {'status': response_status, 'name': '', 'is_robot': False}
- if request.route is not None:
- # Use the route template regex, not the request path, to prevent an
- # explosion in possible field values.
- fields['name'] = request.route.template
- if request.user_agent is not None:
- # We must not log user agents, but we can store whether or not the
- # user agent string indicates that the requester was a Google bot.
- fields['is_robot'] = (
- 'GoogleBot' in request.user_agent or
- 'GoogleSecurityScanner' in request.user_agent or
- request.user_agent == 'B3M/prober')
-
- http_metrics.server_durations.add(elapsed_ms, fields=fields)
- http_metrics.server_response_status.increment(fields=fields)
- if request.content_length is not None:
- http_metrics.server_request_bytes.add(request.content_length,
- fields=fields)
- if response.content_length is not None: # pragma: no cover
- http_metrics.server_response_bytes.add(response.content_length,
- fields=fields)
+ # Use the route template regex, not the request path, to prevent an
+ # explosion in possible field values.
+ name = request.route.template if request.route is not None else ''
+
+ http_metrics.update_http_server_metrics(
+ name, response_status, elapsed_ms,
+ request_size=request.content_length,
+ response_size=response.content_length,
+ user_agent=request.user_agent)
+
return ret
@@ -228,6 +240,43 @@ def instrument_wsgi_application(app, time_fn=time.time):
app.router.__instrumented_by_ts_mon = True
-def reset_for_unittest():
+def instrument_endpoint(time_fn=time.time):
+ """Decorator to instrument Cloud Endpoint methods."""
+ def decorator(fn):
+ method_name = fn.__name__
+ assert method_name
+ @functools.wraps(fn)
+ def decorated(service, *args, **kwargs):
+ service_name = service.__class__.__name__
+ endpoint_name = '/_ah/spi/%s.%s' % (service_name, method_name)
+ start_time = time_fn()
+ response_status = 0
+ interface.state.store.initialize_context()
+ flush_thread = None
+ time_now = time_fn()
+ if need_to_flush_metrics(time_now):
+ flush_thread = threading.Thread(target=_flush_metrics, args=(time_now,))
+ flush_thread.start()
+ try:
+ ret = fn(service, *args, **kwargs)
+ response_status = 200
+ return ret
+ except endpoints.ServiceException as e:
+ response_status = e.http_status
+ raise
+ except Exception:
+ response_status = 500
+ raise
+ finally:
+ if flush_thread:
+ flush_thread.join()
+ elapsed_ms = int((time_fn() - start_time) * 1000)
+ http_metrics.update_http_server_metrics(
+ endpoint_name, response_status, elapsed_ms)
+ return decorated
+ return decorator
+
+
+def reset_for_unittest(disable=False):
shared.reset_for_unittest()
- interface.reset_for_unittest()
+ interface.reset_for_unittest(disable=disable)
« no previous file with comments | « appengine_module/gae_ts_mon/__init__.py ('k') | appengine_module/gae_ts_mon/test/config_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698