Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(302)

Side by Side Diff: appengine_module/gae_ts_mon/config.py

Issue 1829023003: Revert of gae_ts_mon: instrument Cloud Endpoint methods (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2015 The Chromium Authors. All rights reserved. 1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 import copy 5 import copy
6 import datetime 6 import datetime
7 import functools
8 import logging 7 import logging
9 import os 8 import os
10 import sys 9 import sys
11 import time 10 import time
12 import threading 11 import threading
13 12
14 import endpoints
15 import webapp2 13 import webapp2
16 14
17 from google.appengine.api import modules 15 from google.appengine.api import modules
18 from google.appengine.api.app_identity import app_identity 16 from google.appengine.api.app_identity import app_identity
19 from google.appengine.api import runtime 17 from google.appengine.api import runtime
20 from google.appengine.ext import ndb 18 from google.appengine.ext import ndb
21 19
22 from infra_libs.ts_mon import handlers 20 from infra_libs.ts_mon import handlers
23 from infra_libs.ts_mon import shared 21 from infra_libs.ts_mon import shared
24 from infra_libs.ts_mon.common import http_metrics 22 from infra_libs.ts_mon.common import http_metrics
25 from infra_libs.ts_mon.common import interface 23 from infra_libs.ts_mon.common import interface
26 from infra_libs.ts_mon.common import metric_store 24 from infra_libs.ts_mon.common import metric_store
27 from infra_libs.ts_mon.common import monitors 25 from infra_libs.ts_mon.common import monitors
28 from infra_libs.ts_mon.common import targets 26 from infra_libs.ts_mon.common import targets
29 27
30 28
def _reset_cumulative_metrics():
  """Clear the state when an instance loses its task_num assignment.

  Cumulative metrics are only meaningful relative to a stable stream start;
  once the instance's Datastore entity (and thus its task_num) is gone, the
  previously accumulated values would be misattributed, so reset them.
  """
  logging.warning('Instance %s got purged from Datastore, but is still alive. '
                  'Clearing cumulative metrics.', shared.instance_key_id())
  # Only cumulative metrics carry state across flushes; gauges are left alone.
  for _target, metric, start_time, _fields in interface.state.store.get_all():
    if metric.is_cumulative():
      metric.reset()
38 36
39 37
40 _flush_metrics_lock = threading.Lock() 38 _flush_metrics_lock = threading.Lock()
41 39
42 40
def flush_metrics_if_needed(time_fn=datetime.datetime.utcnow):
  """Flush metrics if at least a minute has passed since the last flush.

  The timestamp check-and-update happens atomically under
  _flush_metrics_lock so only one thread can decide to flush at a time;
  last_flushed is advanced even before we know the flush will succeed, to
  avoid repeated flush attempts from user requests.

  Args:
    time_fn: 0-arg callable returning the current time as a naive UTC
      datetime; injectable for tests.

  Returns:
    True if metrics were actually sent, False otherwise.
  """
  time_now = time_fn()
  minute_ago = time_now - datetime.timedelta(seconds=60)
  with _flush_metrics_lock:
    if interface.state.last_flushed > minute_ago:
      return False
    interface.state.last_flushed = time_now

  # The actual flush happens outside the lock; the timestamp update above is
  # enough to keep other threads from flushing concurrently.
  return _flush_metrics(time_now)
68 50
69 51
def _flush_metrics(time_now):
  """Return True if metrics were actually sent.

  Args:
    time_now: naive UTC datetime used as both the flush timestamp and the
      instance entity's last_updated value.
  """
  entity = shared.get_instance_entity()
  if entity.task_num < 0:
    # This instance hasn't been assigned a task_num yet (or lost it, e.g.
    # its Datastore entity was purged while the instance stayed alive).
    if interface.state.target.task_num >= 0:
      _reset_cumulative_metrics()
    interface.state.target.task_num = -1
    interface.state.last_flushed = entity.last_updated
    updated_sec_ago = (time_now - entity.last_updated).total_seconds()
    if updated_sec_ago > shared.INSTANCE_EXPECTED_TO_HAVE_TASK_NUM_SEC:
      logging.warning('Instance %s is %d seconds old with no task_num.',
                      shared.instance_key_id(), updated_sec_ago)
    return False
  interface.state.target.task_num = entity.task_num

  # Kick off the entity update asynchronously and overlap it with the flush.
  entity.last_updated = time_now
  entity_deferred = entity.put_async()

  interface.flush()

  # Global metrics are recomputed from scratch on every flush cycle.
  for metric in shared.global_metrics.itervalues():
    metric.reset()

  entity_deferred.get_result()
  return True
96 77
97 78
def _shutdown_hook():
  """Runtime shutdown hook: flush metrics and delete this instance's entity.

  Deleting the Instance entity frees the task_num for reassignment; the hook
  best-effort flushes first (subject to the once-a-minute throttle, so the
  flush may be skipped).
  """
  shared.shutdown_counter.increment()
  if flush_metrics_if_needed():
    logging.info('Shutdown hook: deleting %s, metrics were flushed.',
                 shared.instance_key_id())
  else:
    logging.warning('Shutdown hook: deleting %s, metrics were NOT flushed.',
                    shared.instance_key_id())
  with shared.instance_namespace_context():
    ndb.Key('Instance', shared.instance_key_id()).delete()
108 89
109 90
110 def _internal_callback(): 91 def _internal_callback():
(...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after
183 shared.INTERNAL_CALLBACK_NAME, _internal_callback) 164 shared.INTERNAL_CALLBACK_NAME, _internal_callback)
184 165
185 logging.info('Initialized ts_mon with service_name=%s, job_name=%s, ' 166 logging.info('Initialized ts_mon with service_name=%s, job_name=%s, '
186 'hostname=%s', service_name, job_name, hostname) 167 'hostname=%s', service_name, job_name, hostname)
187 168
188 169
def _instrumented_dispatcher(dispatcher, request, response, time_fn=time.time):
  """Dispatch a request through `dispatcher`, recording ts_mon HTTP metrics.

  A background thread attempts a (throttled) metrics flush concurrently with
  the request; it is joined before metrics for this request are recorded.

  Args:
    dispatcher: the wrapped webapp2 dispatch callable.
    request, response: the webapp2 request/response objects.
    time_fn: 0-arg callable returning seconds (time.time-like); injectable
      for tests.

  Returns:
    Whatever `dispatcher` returns. Exceptions are re-raised after their
    status code is recorded (HTTPException's own code, 500 otherwise).
  """
  start_time = time_fn()
  response_status = 0
  interface.state.store.initialize_context()
  flush_thread = threading.Thread(target=flush_metrics_if_needed)
  flush_thread.start()
  try:
    ret = dispatcher(request, response)
  except webapp2.HTTPException as ex:
    response_status = ex.code
    raise
  except Exception:
    response_status = 500
    raise
  else:
    # The dispatcher may return a brand-new Response; prefer it if so.
    if isinstance(ret, webapp2.Response):
      response = ret
    response_status = response.status_int
  finally:
    # Metrics are recorded in `finally` so that raising paths are counted too.
    flush_thread.join()
    elapsed_ms = int((time_fn() - start_time) * 1000)

    fields = {'status': response_status, 'name': '', 'is_robot': False}
    if request.route is not None:
      # Use the route template regex, not the request path, to prevent an
      # explosion in possible field values.
      fields['name'] = request.route.template
    if request.user_agent is not None:
      # We must not log user agents, but we can store whether or not the
      # user agent string indicates that the requester was a Google bot.
      fields['is_robot'] = (
          'GoogleBot' in request.user_agent or
          'GoogleSecurityScanner' in request.user_agent or
          request.user_agent == 'B3M/prober')

    http_metrics.server_durations.add(elapsed_ms, fields=fields)
    http_metrics.server_response_status.increment(fields=fields)
    if request.content_length is not None:
      http_metrics.server_request_bytes.add(request.content_length,
                                            fields=fields)
    if response.content_length is not None:  # pragma: no cover
      http_metrics.server_response_bytes.add(response.content_length,
                                             fields=fields)
  return ret
226 214
227 215
def instrument_wsgi_application(app, time_fn=time.time):
  """Wrap `app`'s router dispatch so every request records ts_mon metrics.

  Idempotent: a router that has already been instrumented is left untouched.

  Args:
    app: a webapp2.WSGIApplication.
    time_fn: 0-arg callable returning seconds; forwarded to the dispatcher.
  """
  # Don't instrument the same router twice.
  if hasattr(app.router, '__instrumented_by_ts_mon'):
    return

  original_dispatch = app.router.dispatch

  def instrumented_dispatch(_router, request, response):
    # The router argument is unused: the original dispatcher is captured in
    # the closure instead.
    return _instrumented_dispatcher(
        original_dispatch, request, response, time_fn=time_fn)

  app.router.set_dispatcher(instrumented_dispatch)
  app.router.__instrumented_by_ts_mon = True
241 229
242 230
def reset_for_unittest():
  """Reset module and ts_mon interface state between unit tests."""
  shared.reset_for_unittest()
  interface.reset_for_unittest()
OLDNEW
« no previous file with comments | « appengine_module/gae_ts_mon/__init__.py ('k') | appengine_module/gae_ts_mon/test/config_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698