Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(120)

Side by Side Diff: appengine/findit/waterfall/monitor_try_job_pipeline.py

Issue 2605803002: [Findit] Refactoring WfTryJobData into BaseTryJobData, WfTryJobData, and FlakeTryJobData (Closed)
Patch Set: Fixing code coverage Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2015 The Chromium Authors. All rights reserved. 1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 import json 5 import json
6 import time 6 import time
7 7
8 from google.appengine.ext import ndb 8 from google.appengine.ext import ndb
9 9
10 from common.pipeline_wrapper import BasePipeline 10 from common.pipeline_wrapper import BasePipeline
11 from common.pipeline_wrapper import pipeline 11 from common.pipeline_wrapper import pipeline
12 from common.waterfall import buildbucket_client 12 from common.waterfall import buildbucket_client
13 from common.waterfall import failure_type 13 from common.waterfall import failure_type
14 from common.waterfall import try_job_error 14 from common.waterfall import try_job_error
15 from common.waterfall.buildbucket_client import BuildbucketBuild 15 from common.waterfall.buildbucket_client import BuildbucketBuild
16 from libs import time_util 16 from libs import time_util
17 from model import analysis_status 17 from model import analysis_status
18 from model.flake.flake_try_job_data import FlakeTryJobData
18 from model.wf_try_job_data import WfTryJobData 19 from model.wf_try_job_data import WfTryJobData
19 from waterfall import waterfall_config 20 from waterfall import waterfall_config
20 21
21 22
22 def _GetError(buildbucket_response, buildbucket_error, timed_out): 23 def _GetError(buildbucket_response, buildbucket_error, timed_out):
23 """Determines whether or not a try job error occurred. 24 """Determines whether or not a try job error occurred.
24 25
25 Args: 26 Args:
26 buildbucket_response: A dict of the json response from buildbucket. 27 buildbucket_response: A dict of the json response from buildbucket.
27 buildbucket_error: A BuildBucketError object returned from the call to 28 buildbucket_error: A BuildBucketError object returned from the call to
(...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after
100 { 101 {
101 'message': 'No result report was found.', 102 'message': 'No result report was found.',
102 'reason': MonitorTryJobPipeline.UNKNOWN 103 'reason': MonitorTryJobPipeline.UNKNOWN
103 }, 104 },
104 try_job_error.UNKNOWN 105 try_job_error.UNKNOWN
105 ) 106 )
106 107
107 return None, None 108 return None, None
108 109
109 110
110 def _UpdateTryJobMetadata(try_job_data, start_time, buildbucket_build, 111 def _UpdateTryJobMetadata(try_job_data, try_job_type, start_time,
111 buildbucket_error, timed_out): 112 buildbucket_build, buildbucket_error, timed_out):
112 buildbucket_response = {} 113 buildbucket_response = {}
113 114
114 if buildbucket_build: 115 if buildbucket_build:
115 try_job_data.request_time = ( 116 try_job_data.request_time = (
116 try_job_data.request_time or 117 try_job_data.request_time or
117 time_util.MicrosecondsToDatetime(buildbucket_build.request_time)) 118 time_util.MicrosecondsToDatetime(buildbucket_build.request_time))
118 # If start_time is unavailable, fallback to request_time. 119 # If start_time is unavailable, fallback to request_time.
119 try_job_data.start_time = start_time or try_job_data.request_time 120 try_job_data.start_time = start_time or try_job_data.request_time
120 try_job_data.end_time = time_util.MicrosecondsToDatetime( 121 try_job_data.end_time = time_util.MicrosecondsToDatetime(
121 buildbucket_build.end_time) 122 buildbucket_build.end_time)
122 try_job_data.number_of_commits_analyzed = len( 123
123 buildbucket_build.report.get('result', {})) 124 if try_job_type != failure_type.FLAKY_TEST: # pragma: no branch
124 try_job_data.regression_range_size = buildbucket_build.report.get( 125 try_job_data.number_of_commits_analyzed = len(
125 'metadata', {}).get('regression_range_size') 126 buildbucket_build.report.get('result', {}))
127 try_job_data.regression_range_size = buildbucket_build.report.get(
128 'metadata', {}).get('regression_range_size')
129
126 try_job_data.try_job_url = buildbucket_build.url 130 try_job_data.try_job_url = buildbucket_build.url
127 buildbucket_response = buildbucket_build.response 131 buildbucket_response = buildbucket_build.response
128 try_job_data.last_buildbucket_response = buildbucket_response 132 try_job_data.last_buildbucket_response = buildbucket_response
129 133
130 error_dict, error_code = _GetError( 134 error_dict, error_code = _GetError(
131 buildbucket_response, buildbucket_error, timed_out) 135 buildbucket_response, buildbucket_error, timed_out)
132 136
133 if error_dict: 137 if error_dict:
134 try_job_data.error = error_dict 138 try_job_data.error = error_dict
135 try_job_data.error_code = error_code 139 try_job_data.error_code = error_code
(...skipping 93 matching lines...) Expand 10 before | Expand all | Expand 10 after
229 233
230 timeout_hours = waterfall_config.GetTryJobSettings().get( 234 timeout_hours = waterfall_config.GetTryJobSettings().get(
231 'job_timeout_hours') 235 'job_timeout_hours')
232 default_pipeline_wait_seconds = waterfall_config.GetTryJobSettings().get( 236 default_pipeline_wait_seconds = waterfall_config.GetTryJobSettings().get(
233 'server_query_interval_seconds') 237 'server_query_interval_seconds')
234 max_error_times = waterfall_config.GetTryJobSettings().get( 238 max_error_times = waterfall_config.GetTryJobSettings().get(
235 'allowed_response_error_times') 239 'allowed_response_error_times')
236 pipeline_wait_seconds = default_pipeline_wait_seconds 240 pipeline_wait_seconds = default_pipeline_wait_seconds
237 allowed_response_error_times = max_error_times 241 allowed_response_error_times = max_error_times
238 242
243 if try_job_type == failure_type.FLAKY_TEST:
244 try_job_data = FlakeTryJobData.Get(try_job_id)
245 else:
246 try_job_data = WfTryJobData.Get(try_job_id)
247
239 # TODO(chanli): Make sure total wait time equals to timeout_hours 248 # TODO(chanli): Make sure total wait time equals to timeout_hours
240 # regardless of retries. 249 # regardless of retries.
241 deadline = time.time() + timeout_hours * 60 * 60 250 deadline = time.time() + timeout_hours * 60 * 60
242 try_job_data = WfTryJobData.Get(try_job_id)
243 already_set_started = False 251 already_set_started = False
244 start_time = None 252 start_time = None
245 while True: 253 while True:
246 error, build = buildbucket_client.GetTryJobs([try_job_id])[0] 254 error, build = buildbucket_client.GetTryJobs([try_job_id])[0]
247 255
248 if error: 256 if error:
249 if allowed_response_error_times > 0: 257 if allowed_response_error_times > 0:
250 allowed_response_error_times -= 1 258 allowed_response_error_times -= 1
251 pipeline_wait_seconds += default_pipeline_wait_seconds 259 pipeline_wait_seconds += default_pipeline_wait_seconds
252 else: # pragma: no cover 260 else: # pragma: no cover
253 # Buildbucket has responded error more than 5 times, retry pipeline. 261 # Buildbucket has responded error more than 5 times, retry pipeline.
254 _UpdateTryJobMetadata(try_job_data, start_time, build, error, False) 262 _UpdateTryJobMetadata(
263 try_job_data, try_job_type, start_time, build, error, False)
255 raise pipeline.Retry( 264 raise pipeline.Retry(
256 'Error "%s" occurred. Reason: "%s"' % (error.message, 265 'Error "%s" occurred. Reason: "%s"' % (error.message,
257 error.reason)) 266 error.reason))
258 elif build.status == BuildbucketBuild.COMPLETED: 267 elif build.status == BuildbucketBuild.COMPLETED:
259 _UpdateTryJobMetadata(try_job_data, start_time, build, error, False) 268 _UpdateTryJobMetadata(
269 try_job_data, try_job_type, start_time, build, error, False)
260 result_to_update = self._UpdateTryJobResult( 270 result_to_update = self._UpdateTryJobResult(
261 urlsafe_try_job_key, try_job_type, try_job_id, build.url, 271 urlsafe_try_job_key, try_job_type, try_job_id, build.url,
262 BuildbucketBuild.COMPLETED, build.report) 272 BuildbucketBuild.COMPLETED, build.report)
263 return result_to_update[-1] 273 return result_to_update[-1]
264 else: 274 else:
265 if allowed_response_error_times < max_error_times: 275 if allowed_response_error_times < max_error_times:
266 # Recovers from errors. 276 # Recovers from errors.
267 allowed_response_error_times = max_error_times 277 allowed_response_error_times = max_error_times
268 pipeline_wait_seconds = default_pipeline_wait_seconds 278 pipeline_wait_seconds = default_pipeline_wait_seconds
269 if build.status == BuildbucketBuild.STARTED and not already_set_started: 279 if build.status == BuildbucketBuild.STARTED and not already_set_started:
270 # It is possible this branch is skipped if a fast build goes from 280 # It is possible this branch is skipped if a fast build goes from
271 # 'SCHEDULED' to 'COMPLETED' between queries, so start_time may be 281 # 'SCHEDULED' to 'COMPLETED' between queries, so start_time may be
272 # unavailable. 282 # unavailable.
273 start_time = time_util.MicrosecondsToDatetime(build.updated_time) 283 start_time = time_util.MicrosecondsToDatetime(build.updated_time)
274 self._UpdateTryJobResult( 284 self._UpdateTryJobResult(
275 urlsafe_try_job_key, try_job_type, try_job_id, build.url, 285 urlsafe_try_job_key, try_job_type, try_job_id, build.url,
276 BuildbucketBuild.STARTED) 286 BuildbucketBuild.STARTED)
277 287
278 # Update as much try job metadata as soon as possible to avoid data 288 # Update as much try job metadata as soon as possible to avoid data
279 # loss in case of errors. 289 # loss in case of errors.
280 try_job_data.start_time = start_time 290 try_job_data.start_time = start_time
281 try_job_data.request_time = ( 291 try_job_data.request_time = (
282 time_util.MicrosecondsToDatetime(build.request_time)) 292 time_util.MicrosecondsToDatetime(build.request_time))
283 try_job_data.try_job_url = build.url 293 try_job_data.try_job_url = build.url
284 try_job_data.put() 294 try_job_data.put()
285 295
286 already_set_started = True 296 already_set_started = True
287 297
288 if time.time() > deadline: # pragma: no cover 298 if time.time() > deadline: # pragma: no cover
289 _UpdateTryJobMetadata(try_job_data, start_time, build, error, True) 299 _UpdateTryJobMetadata(
300 try_job_data, try_job_type, start_time, build, error, True)
290 # Explicitly abort the whole pipeline. 301 # Explicitly abort the whole pipeline.
291 raise pipeline.Abort( 302 raise pipeline.Abort(
292 'Try job %s timed out after %d hours.' % ( 303 'Try job %s timed out after %d hours.' % (
293 try_job_id, timeout_hours)) 304 try_job_id, timeout_hours))
294 305
295 # Ensure last_buildbucket_response is always the most recent 306 # Ensure last_buildbucket_response is always the most recent
296 # whenever available during intermediate queries. 307 # whenever available during intermediate queries.
297 _UpdateLastBuildbucketResponse(try_job_data, build) 308 _UpdateLastBuildbucketResponse(try_job_data, build)
298 309
299 time.sleep(pipeline_wait_seconds) # pragma: no cover 310 time.sleep(pipeline_wait_seconds) # pragma: no cover
OLDNEW
« no previous file with comments | « appengine/findit/model/wf_try_job_data.py ('k') | appengine/findit/waterfall/schedule_compile_try_job_pipeline.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698