Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(376)

Side by Side Diff: appengine_module/gae_ts_mon/config.py

Issue 1797103003: gae_ts_mon: instrument Cloud Endpoint methods (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Reduced test flakiness Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2015 The Chromium Authors. All rights reserved. 1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 import copy 5 import copy
6 import datetime 6 import datetime
7 import functools
7 import logging 8 import logging
8 import os 9 import os
9 import sys 10 import sys
10 import time 11 import time
11 import threading 12 import threading
12 13
14 import endpoints
13 import webapp2 15 import webapp2
14 16
15 from google.appengine.api import modules 17 from google.appengine.api import modules
16 from google.appengine.api.app_identity import app_identity 18 from google.appengine.api.app_identity import app_identity
17 from google.appengine.api import runtime 19 from google.appengine.api import runtime
18 from google.appengine.ext import ndb 20 from google.appengine.ext import ndb
19 21
20 from infra_libs.ts_mon import handlers 22 from infra_libs.ts_mon import handlers
21 from infra_libs.ts_mon import shared 23 from infra_libs.ts_mon import shared
22 from infra_libs.ts_mon.common import http_metrics 24 from infra_libs.ts_mon.common import http_metrics
23 from infra_libs.ts_mon.common import interface 25 from infra_libs.ts_mon.common import interface
24 from infra_libs.ts_mon.common import metric_store 26 from infra_libs.ts_mon.common import metric_store
25 from infra_libs.ts_mon.common import monitors 27 from infra_libs.ts_mon.common import monitors
26 from infra_libs.ts_mon.common import targets 28 from infra_libs.ts_mon.common import targets
27 29
28 30
29 def _reset_cumulative_metrics(): 31 def _reset_cumulative_metrics():
30 """Clear the state when an instance loses its task_num assignment.""" 32 """Clear the state when an instance loses its task_num assignment."""
31 logging.warning('Instance %s got purged from Datastore, but is still alive. ' 33 logging.warning('Instance %s got purged from Datastore, but is still alive. '
32 'Clearing cumulative metrics.', shared.instance_key_id()) 34 'Clearing cumulative metrics.', shared.instance_key_id())
33 for _target, metric, start_time, _fields in interface.state.store.get_all(): 35 for _target, metric, start_time, _fields in interface.state.store.get_all():
34 if metric.is_cumulative(): 36 if metric.is_cumulative():
35 metric.reset() 37 metric.reset()
36 38
37 39
38 _flush_metrics_lock = threading.Lock() 40 _flush_metrics_lock = threading.Lock()
39 41
40 42
41 def flush_metrics_if_needed(time_fn=datetime.datetime.utcnow): 43 def need_to_flush_metrics(time_now):
42 time_now = time_fn() 44 """Check if metrics need flushing, and update the timestamp of last flush.
43 minute_ago = time_now - datetime.timedelta(seconds=60) 45
46 Even though the caller of this function may not successfully flush the
47 metrics, we still update the last_flushed timestamp to prevent too much work
48 being done in user requests.
49
50 Also, this check-and-update has to happen atomically, to ensure only one
51 thread can flush metrics at a time.
52 """
53 if not interface.state.flush_enabled_fn():
54 return False
55 datetime_now = datetime.datetime.utcfromtimestamp(time_now)
56 minute_ago = datetime_now - datetime.timedelta(seconds=60)
44 with _flush_metrics_lock: 57 with _flush_metrics_lock:
45 if interface.state.last_flushed > minute_ago: 58 if interface.state.last_flushed > minute_ago:
46 return False 59 return False
47 interface.state.last_flushed = time_now 60 interface.state.last_flushed = datetime_now
61 return True
48 62
63
64 def flush_metrics_if_needed(time_now):
65 if not need_to_flush_metrics(time_now):
66 return False
49 return _flush_metrics(time_now) 67 return _flush_metrics(time_now)
50 68
51 69
52 def _flush_metrics(time_now): 70 def _flush_metrics(time_now):
53 """Return True if metrics were actually sent.""" 71 """Return True if metrics were actually sent."""
72 datetime_now = datetime.datetime.utcfromtimestamp(time_now)
54 entity = shared.get_instance_entity() 73 entity = shared.get_instance_entity()
55 if entity.task_num < 0: 74 if entity.task_num < 0:
56 if interface.state.target.task_num >= 0: 75 if interface.state.target.task_num >= 0:
57 _reset_cumulative_metrics() 76 _reset_cumulative_metrics()
58 interface.state.target.task_num = -1 77 interface.state.target.task_num = -1
59 interface.state.last_flushed = entity.last_updated 78 interface.state.last_flushed = entity.last_updated
60 updated_sec_ago = (time_now - entity.last_updated).total_seconds() 79 updated_sec_ago = (datetime_now - entity.last_updated).total_seconds()
61 if updated_sec_ago > shared.INSTANCE_EXPECTED_TO_HAVE_TASK_NUM_SEC: 80 if updated_sec_ago > shared.INSTANCE_EXPECTED_TO_HAVE_TASK_NUM_SEC:
62 logging.warning('Instance %s is %d seconds old with no task_num.', 81 logging.warning('Instance %s is %d seconds old with no task_num.',
63 shared.instance_key_id(), updated_sec_ago) 82 shared.instance_key_id(), updated_sec_ago)
64 return False 83 return False
65 interface.state.target.task_num = entity.task_num 84 interface.state.target.task_num = entity.task_num
66 85
67 entity.last_updated = time_now 86 entity.last_updated = datetime_now
68 entity_deferred = entity.put_async() 87 entity_deferred = entity.put_async()
69 88
70 interface.flush() 89 interface.flush()
71 90
72 for metric in shared.global_metrics.itervalues(): 91 for metric in shared.global_metrics.itervalues():
73 metric.reset() 92 metric.reset()
74 93
75 entity_deferred.get_result() 94 entity_deferred.get_result()
76 return True 95 return True
77 96
78 97
79 def _shutdown_hook(): 98 def _shutdown_hook(time_fn=time.time):
80 shared.shutdown_counter.increment() 99 shared.shutdown_counter.increment()
81 if flush_metrics_if_needed(): 100 if flush_metrics_if_needed(time_fn()):
82 logging.info('Shutdown hook: deleting %s, metrics were flushed.', 101 logging.info('Shutdown hook: deleting %s, metrics were flushed.',
83 shared.instance_key_id()) 102 shared.instance_key_id())
84 else: 103 else:
85 logging.warning('Shutdown hook: deleting %s, metrics were NOT flushed.', 104 logging.warning('Shutdown hook: deleting %s, metrics were NOT flushed.',
86 shared.instance_key_id()) 105 shared.instance_key_id())
87 with shared.instance_namespace_context(): 106 with shared.instance_namespace_context():
88 ndb.Key('Instance', shared.instance_key_id()).delete() 107 ndb.Key('Instance', shared.instance_key_id()).delete()
89 108
90 109
91 def _internal_callback(): 110 def _internal_callback():
(...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after
164 shared.INTERNAL_CALLBACK_NAME, _internal_callback) 183 shared.INTERNAL_CALLBACK_NAME, _internal_callback)
165 184
166 logging.info('Initialized ts_mon with service_name=%s, job_name=%s, ' 185 logging.info('Initialized ts_mon with service_name=%s, job_name=%s, '
167 'hostname=%s', service_name, job_name, hostname) 186 'hostname=%s', service_name, job_name, hostname)
168 187
169 188
170 def _instrumented_dispatcher(dispatcher, request, response, time_fn=time.time): 189 def _instrumented_dispatcher(dispatcher, request, response, time_fn=time.time):
171 start_time = time_fn() 190 start_time = time_fn()
172 response_status = 0 191 response_status = 0
173 interface.state.store.initialize_context() 192 interface.state.store.initialize_context()
174 flush_thread = threading.Thread(target=flush_metrics_if_needed) 193 flush_thread = None
175 flush_thread.start() 194 time_now = time_fn()
195 if need_to_flush_metrics(time_now):
196 flush_thread = threading.Thread(target=_flush_metrics, args=(time_now,))
197 flush_thread.start()
176 try: 198 try:
177 ret = dispatcher(request, response) 199 ret = dispatcher(request, response)
178 except webapp2.HTTPException as ex: 200 except webapp2.HTTPException as ex:
179 response_status = ex.code 201 response_status = ex.code
180 raise 202 raise
181 except Exception: 203 except Exception:
182 response_status = 500 204 response_status = 500
183 raise 205 raise
184 else: 206 else:
185 if isinstance(ret, webapp2.Response): 207 if isinstance(ret, webapp2.Response):
186 response = ret 208 response = ret
187 response_status = response.status_int 209 response_status = response.status_int
188 finally: 210 finally:
189 flush_thread.join() 211 if flush_thread:
212 flush_thread.join()
190 elapsed_ms = int((time_fn() - start_time) * 1000) 213 elapsed_ms = int((time_fn() - start_time) * 1000)
191 214
192 fields = {'status': response_status, 'name': '', 'is_robot': False} 215 # Use the route template regex, not the request path, to prevent an
193 if request.route is not None: 216 # explosion in possible field values.
194 # Use the route template regex, not the request path, to prevent an 217 name = request.route.template if request.route is not None else ''
195 # explosion in possible field values.
196 fields['name'] = request.route.template
197 if request.user_agent is not None:
198 # We must not log user agents, but we can store whether or not the
199 # user agent string indicates that the requester was a Google bot.
200 fields['is_robot'] = (
201 'GoogleBot' in request.user_agent or
202 'GoogleSecurityScanner' in request.user_agent or
203 request.user_agent == 'B3M/prober')
204 218
205 http_metrics.server_durations.add(elapsed_ms, fields=fields) 219 http_metrics.update_http_server_metrics(
206 http_metrics.server_response_status.increment(fields=fields) 220 name, response_status, elapsed_ms,
207 if request.content_length is not None: 221 request_size=request.content_length,
208 http_metrics.server_request_bytes.add(request.content_length, 222 response_size=response.content_length,
209 fields=fields) 223 user_agent=request.user_agent)
210 if response.content_length is not None: # pragma: no cover 224
211 http_metrics.server_response_bytes.add(response.content_length,
212 fields=fields)
213 return ret 225 return ret
214 226
215 227
216 def instrument_wsgi_application(app, time_fn=time.time): 228 def instrument_wsgi_application(app, time_fn=time.time):
217 # Don't instrument the same router twice. 229 # Don't instrument the same router twice.
218 if hasattr(app.router, '__instrumented_by_ts_mon'): 230 if hasattr(app.router, '__instrumented_by_ts_mon'):
219 return 231 return
220 232
221 old_dispatcher = app.router.dispatch 233 old_dispatcher = app.router.dispatch
222 234
223 def dispatch(router, request, response): 235 def dispatch(router, request, response):
224 return _instrumented_dispatcher(old_dispatcher, request, response, 236 return _instrumented_dispatcher(old_dispatcher, request, response,
225 time_fn=time_fn) 237 time_fn=time_fn)
226 238
227 app.router.set_dispatcher(dispatch) 239 app.router.set_dispatcher(dispatch)
228 app.router.__instrumented_by_ts_mon = True 240 app.router.__instrumented_by_ts_mon = True
229 241
230 242
231 def reset_for_unittest(): 243 def instrument_endpoint(time_fn=time.time):
244 """Decorator to instrument Cloud Endpoint methods."""
245 def decorator(fn):
246 method_name = fn.__name__
247 assert method_name
248 @functools.wraps(fn)
249 def decorated(service, *args, **kwargs):
250 service_name = service.__class__.__name__
251 endpoint_name = '/_ah/spi/%s.%s' % (service_name, method_name)
252 start_time = time_fn()
253 response_status = 0
254 interface.state.store.initialize_context()
255 flush_thread = None
256 time_now = time_fn()
257 if need_to_flush_metrics(time_now):
258 flush_thread = threading.Thread(target=_flush_metrics, args=(time_now,))
259 flush_thread.start()
260 try:
261 ret = fn(service, *args, **kwargs)
262 response_status = 200
263 return ret
264 except endpoints.ServiceException as e:
265 response_status = e.http_status
266 raise
267 except Exception:
268 response_status = 500
269 raise
270 finally:
271 if flush_thread:
272 flush_thread.join()
273 elapsed_ms = int((time_fn() - start_time) * 1000)
274 http_metrics.update_http_server_metrics(
275 endpoint_name, response_status, elapsed_ms)
276 return decorated
277 return decorator
278
279
280 def reset_for_unittest(disable=False):
232 shared.reset_for_unittest() 281 shared.reset_for_unittest()
233 interface.reset_for_unittest() 282 interface.reset_for_unittest(disable=disable)
OLDNEW
« no previous file with comments | « appengine_module/gae_ts_mon/__init__.py ('k') | appengine_module/gae_ts_mon/test/config_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698