Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(19)

Side by Side Diff: appengine_module/gae_ts_mon/config.py

Issue 1797103003: gae_ts_mon: instrument Cloud Endpoint methods (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Removed unnecessary 'no cover' Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2015 The Chromium Authors. All rights reserved. 1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 import copy 5 import copy
6 import datetime 6 import datetime
7 import functools
7 import logging 8 import logging
8 import os 9 import os
9 import sys 10 import sys
10 import time 11 import time
11 import threading 12 import threading
12 13
14 import endpoints
13 import webapp2 15 import webapp2
14 16
15 from google.appengine.api import modules 17 from google.appengine.api import modules
16 from google.appengine.api.app_identity import app_identity 18 from google.appengine.api.app_identity import app_identity
17 from google.appengine.api import runtime 19 from google.appengine.api import runtime
18 from google.appengine.ext import ndb 20 from google.appengine.ext import ndb
19 21
20 from infra_libs.ts_mon import handlers 22 from infra_libs.ts_mon import handlers
21 from infra_libs.ts_mon import shared 23 from infra_libs.ts_mon import shared
22 from infra_libs.ts_mon.common import http_metrics 24 from infra_libs.ts_mon.common import http_metrics
23 from infra_libs.ts_mon.common import interface 25 from infra_libs.ts_mon.common import interface
24 from infra_libs.ts_mon.common import metric_store 26 from infra_libs.ts_mon.common import metric_store
25 from infra_libs.ts_mon.common import monitors 27 from infra_libs.ts_mon.common import monitors
26 from infra_libs.ts_mon.common import targets 28 from infra_libs.ts_mon.common import targets
27 29
28 30
29 def _reset_cumulative_metrics(): 31 def _reset_cumulative_metrics():
30 """Clear the state when an instance loses its task_num assignment.""" 32 """Clear the state when an instance loses its task_num assignment."""
31 logging.warning('Instance %s got purged from Datastore, but is still alive. ' 33 logging.warning('Instance %s got purged from Datastore, but is still alive. '
32 'Clearing cumulative metrics.', shared.instance_key_id()) 34 'Clearing cumulative metrics.', shared.instance_key_id())
33 for _target, metric, start_time, _fields in interface.state.store.get_all(): 35 for _target, metric, start_time, _fields in interface.state.store.get_all():
34 if metric.is_cumulative(): 36 if metric.is_cumulative():
35 metric.reset() 37 metric.reset()
36 38
37 39
38 _flush_metrics_lock = threading.Lock() 40 _flush_metrics_lock = threading.Lock()
39 41
40 42
41 def flush_metrics_if_needed(time_fn=datetime.datetime.utcnow): 43 def need_to_flush_metrics(time_fn=time.time):
nodir 2016/03/18 17:38:02 I'd add a docstring that this function is not pure
Sergey Berezin 2016/03/21 18:49:32 Done.
42 time_now = time_fn() 44 time_now = datetime.datetime.utcfromtimestamp(time_fn())
43 minute_ago = time_now - datetime.timedelta(seconds=60) 45 minute_ago = time_now - datetime.timedelta(seconds=60)
44 with _flush_metrics_lock: 46 with _flush_metrics_lock:
45 if interface.state.last_flushed > minute_ago: 47 if interface.state.last_flushed > minute_ago:
46 return False 48 return False
47 interface.state.last_flushed = time_now 49 interface.state.last_flushed = time_now
nodir 2016/03/18 17:38:02 What happens if flushing fails after this value is updated?
Sergey Berezin 2016/03/21 18:49:32 We simply retry a minute later. This in fact is a reasonable trade-off.
50 return True
48 51
49 return _flush_metrics(time_now) 52
53 def flush_metrics_if_needed(time_fn=time.time):
54 if not need_to_flush_metrics(time_fn=time_fn):
55 return False
56 return _flush_metrics(interface.state.last_flushed)
nodir 2016/03/18 17:38:02 passing last_flushed as time_now seems to heavily couple these two functions.
Sergey Berezin 2016/03/21 18:49:32 Very good point, thanks. I refactored this part of the code.
50 57
51 58
52 def _flush_metrics(time_now): 59 def _flush_metrics(time_now):
53 """Return True if metrics were actually sent.""" 60 """Return True if metrics were actually sent."""
54 entity = shared.get_instance_entity() 61 entity = shared.get_instance_entity()
55 if entity.task_num < 0: 62 if entity.task_num < 0:
56 if interface.state.target.task_num >= 0: 63 if interface.state.target.task_num >= 0:
57 _reset_cumulative_metrics() 64 _reset_cumulative_metrics()
58 interface.state.target.task_num = -1 65 interface.state.target.task_num = -1
59 interface.state.last_flushed = entity.last_updated 66 interface.state.last_flushed = entity.last_updated
(...skipping 104 matching lines...) Expand 10 before | Expand all | Expand 10 after
164 shared.INTERNAL_CALLBACK_NAME, _internal_callback) 171 shared.INTERNAL_CALLBACK_NAME, _internal_callback)
165 172
166 logging.info('Initialized ts_mon with service_name=%s, job_name=%s, ' 173 logging.info('Initialized ts_mon with service_name=%s, job_name=%s, '
167 'hostname=%s', service_name, job_name, hostname) 174 'hostname=%s', service_name, job_name, hostname)
168 175
169 176
170 def _instrumented_dispatcher(dispatcher, request, response, time_fn=time.time): 177 def _instrumented_dispatcher(dispatcher, request, response, time_fn=time.time):
171 start_time = time_fn() 178 start_time = time_fn()
172 response_status = 0 179 response_status = 0
173 interface.state.store.initialize_context() 180 interface.state.store.initialize_context()
174 flush_thread = threading.Thread(target=flush_metrics_if_needed) 181 flush_thread = None
175 flush_thread.start() 182 if need_to_flush_metrics(time_fn=time_fn):
183 flush_thread = threading.Thread(
184 target=_flush_metrics, args=(interface.state.last_flushed,))
185 flush_thread.start()
176 try: 186 try:
177 ret = dispatcher(request, response) 187 ret = dispatcher(request, response)
178 except webapp2.HTTPException as ex: 188 except webapp2.HTTPException as ex:
179 response_status = ex.code 189 response_status = ex.code
180 raise 190 raise
181 except Exception: 191 except Exception:
182 response_status = 500 192 response_status = 500
183 raise 193 raise
184 else: 194 else:
185 if isinstance(ret, webapp2.Response): 195 if isinstance(ret, webapp2.Response):
186 response = ret 196 response = ret
187 response_status = response.status_int 197 response_status = response.status_int
188 finally: 198 finally:
189 flush_thread.join() 199 if flush_thread:
200 flush_thread.join()
nodir 2016/03/18 17:38:02 please join after elapsed_ms is computed
Sergey Berezin 2016/03/21 18:49:32 This is on purpose: I want to track the total request time, including the flush.
190 elapsed_ms = int((time_fn() - start_time) * 1000) 201 elapsed_ms = int((time_fn() - start_time) * 1000)
191 202
192 fields = {'status': response_status, 'name': '', 'is_robot': False} 203 # Use the route template regex, not the request path, to prevent an
193 if request.route is not None: 204 # explosion in possible field values.
194 # Use the route template regex, not the request path, to prevent an 205 name = request.route.template if request.route is not None else ''
195 # explosion in possible field values.
196 fields['name'] = request.route.template
197 if request.user_agent is not None:
198 # We must not log user agents, but we can store whether or not the
199 # user agent string indicates that the requester was a Google bot.
200 fields['is_robot'] = (
201 'GoogleBot' in request.user_agent or
202 'GoogleSecurityScanner' in request.user_agent)
203 206
204 http_metrics.server_durations.add(elapsed_ms, fields=fields) 207 http_metrics.update_http_server_metrics(
205 http_metrics.server_response_status.increment(fields=fields) 208 name, response_status, elapsed_ms,
206 if request.content_length is not None: 209 request_size=request.content_length,
207 http_metrics.server_request_bytes.add(request.content_length, 210 response_size=response.content_length,
208 fields=fields) 211 user_agent=request.user_agent)
209 if response.content_length is not None: # pragma: no cover 212
210 http_metrics.server_response_bytes.add(response.content_length,
211 fields=fields)
212 return ret 213 return ret
213 214
214 215
215 def instrument_wsgi_application(app, time_fn=time.time): 216 def instrument_wsgi_application(app, time_fn=time.time):
216 # Don't instrument the same router twice. 217 # Don't instrument the same router twice.
217 if hasattr(app.router, '__instrumented_by_ts_mon'): 218 if hasattr(app.router, '__instrumented_by_ts_mon'):
218 return 219 return
219 220
220 old_dispatcher = app.router.dispatch 221 old_dispatcher = app.router.dispatch
221 222
222 def dispatch(router, request, response): 223 def dispatch(router, request, response):
223 return _instrumented_dispatcher(old_dispatcher, request, response, 224 return _instrumented_dispatcher(old_dispatcher, request, response,
224 time_fn=time_fn) 225 time_fn=time_fn)
225 226
226 app.router.set_dispatcher(dispatch) 227 app.router.set_dispatcher(dispatch)
227 app.router.__instrumented_by_ts_mon = True 228 app.router.__instrumented_by_ts_mon = True
228 229
229 230
231 def instrument_endpoint(time_fn=time.time):
232 """Decorator to instrument Cloud Endpoint methods."""
233 def decorator(fn):
234 method_name = fn.__name__
235 assert method_name
236 @functools.wraps(fn)
237 def decorated(service, *args, **kwargs):
238 service_name = service.__class__.__name__
239 endpoint_name = '/_ah/spi/%s.%s' % (service_name, method_name)
240 start_time = time_fn()
241 response_status = 0
242 interface.state.store.initialize_context()
243 flush_thread = None
244 if need_to_flush_metrics(time_fn=time_fn):
245 flush_thread = threading.Thread(
246 target=_flush_metrics, args=(interface.state.last_flushed,))
247 flush_thread.start()
248 try:
249 ret = fn(service, *args, **kwargs)
250 response_status = 200
251 return ret
252 except endpoints.ServiceException as e:
253 response_status = e.http_status
254 raise
255 except Exception:
256 response_status = 500
257 raise
258 finally:
259 if flush_thread:
260 flush_thread.join()
nodir 2016/03/18 17:38:02 same here
Sergey Berezin 2016/03/21 18:49:32 Same response as above.
261 elapsed_ms = int((time_fn() - start_time) * 1000)
262 http_metrics.update_http_server_metrics(
263 endpoint_name, response_status, elapsed_ms)
264 return decorated
265 return decorator
266
267
230 def reset_for_unittest(): 268 def reset_for_unittest():
231 shared.reset_for_unittest() 269 shared.reset_for_unittest()
232 interface.reset_for_unittest() 270 interface.reset_for_unittest()
OLDNEW
« no previous file with comments | « appengine_module/gae_ts_mon/__init__.py ('k') | appengine_module/gae_ts_mon/test/config_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698