Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(107)

Side by Side Diff: appengine/findit/waterfall/monitor_try_job_pipeline.py

Issue 2260313003: [Findit] Capture and store latest buildbucket response with each query (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Addressing comments Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | appengine/findit/waterfall/test/monitor_try_job_pipeline_test.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # Copyright 2015 The Chromium Authors. All rights reserved. 1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 from datetime import datetime 5 from datetime import datetime
6 import json 6 import json
7 import time 7 import time
8 8
9 from google.appengine.ext import ndb 9 from google.appengine.ext import ndb
10 10
(...skipping 127 matching lines...) Expand 10 before | Expand all | Expand 10 after
138 error_dict, error_code = _GetError( 138 error_dict, error_code = _GetError(
139 buildbucket_response, buildbucket_error, timed_out) 139 buildbucket_response, buildbucket_error, timed_out)
140 140
141 if error_dict: 141 if error_dict:
142 try_job_data.error = error_dict 142 try_job_data.error = error_dict
143 try_job_data.error_code = error_code 143 try_job_data.error_code = error_code
144 144
145 try_job_data.put() 145 try_job_data.put()
146 146
147 147
148 def _DictsAreEqual(dict_1, dict_2, exclude_keys=None):
149 if dict_1 == dict_2:
150 return True
151
152 if dict_1 is None or dict_2 is None:
153 return False
154
155 if exclude_keys is None:
156 exclude_keys = []
157
158 for key, value in dict_1.iteritems():
159 if key not in exclude_keys and (key not in dict_2 or dict_2[key] != value):
160 return False
161
162 for key, value in dict_2.iteritems():
163 if key not in exclude_keys and (key not in dict_1 or dict_1[key] != value):
164 return False
165
166 return True
167
168
169 def _UpdateLastBuildbucketResponse(try_job_data, build):
170 if not build or not build.response:
171 return
172
173 if not _DictsAreEqual(try_job_data.last_buildbucket_response,
174 build.response, exclude_keys=['utcnow_ts']):
175 try_job_data.last_buildbucket_response = build.response
176 try_job_data.put()
177
178
148 class MonitorTryJobPipeline(BasePipeline): 179 class MonitorTryJobPipeline(BasePipeline):
149 """A pipeline for monitoring a try job and recording results when it's done. 180 """A pipeline for monitoring a try job and recording results when it's done.
150 181
151 The result will be stored to compile_results or test_results according to 182 The result will be stored to compile_results or test_results according to
152 which type of build failure we are running try job for. 183 which type of build failure we are running try job for.
153 """ 184 """
154 185
155 UNKNOWN = 'UNKNOWN' 186 UNKNOWN = 'UNKNOWN'
156 187
157 @ndb.transactional 188 @ndb.transactional
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
199 allowed_response_error_times = max_error_times 230 allowed_response_error_times = max_error_times
200 231
201 # TODO(chanli): Make sure total wait time equals to timeout_hours 232 # TODO(chanli): Make sure total wait time equals to timeout_hours
202 # regardless of retries. 233 # regardless of retries.
203 deadline = time.time() + timeout_hours * 60 * 60 234 deadline = time.time() + timeout_hours * 60 * 60
204 try_job_data = WfTryJobData.Get(try_job_id) 235 try_job_data = WfTryJobData.Get(try_job_id)
205 already_set_started = False 236 already_set_started = False
206 start_time = None 237 start_time = None
207 while True: 238 while True:
208 error, build = buildbucket_client.GetTryJobs([try_job_id])[0] 239 error, build = buildbucket_client.GetTryJobs([try_job_id])[0]
240
209 if error: 241 if error:
210 if allowed_response_error_times > 0: 242 if allowed_response_error_times > 0:
211 allowed_response_error_times -= 1 243 allowed_response_error_times -= 1
212 pipeline_wait_seconds += default_pipeline_wait_seconds 244 pipeline_wait_seconds += default_pipeline_wait_seconds
213 else: # pragma: no cover 245 else: # pragma: no cover
214 # Buildbucket has responded error more than 5 times, retry pipeline. 246 # Buildbucket has responded error more than 5 times, retry pipeline.
215 _UpdateTryJobMetadata(try_job_data, start_time, build, error, False) 247 _UpdateTryJobMetadata(try_job_data, start_time, build, error, False)
216 raise pipeline.Retry( 248 raise pipeline.Retry(
217 'Error "%s" occurred. Reason: "%s"' % (error.message, 249 'Error "%s" occurred. Reason: "%s"' % (error.message,
218 error.reason)) 250 error.reason))
(...skipping 27 matching lines...) Expand all
246 278
247 already_set_started = True 279 already_set_started = True
248 280
249 if time.time() > deadline: # pragma: no cover 281 if time.time() > deadline: # pragma: no cover
250 _UpdateTryJobMetadata(try_job_data, start_time, build, error, True) 282 _UpdateTryJobMetadata(try_job_data, start_time, build, error, True)
251 # Explicitly abort the whole pipeline. 283 # Explicitly abort the whole pipeline.
252 raise pipeline.Abort( 284 raise pipeline.Abort(
253 'Try job %s timed out after %d hours.' % ( 285 'Try job %s timed out after %d hours.' % (
254 try_job_id, timeout_hours)) 286 try_job_id, timeout_hours))
255 287
288 # Ensure last_buildbucket_response is always the most recent
289 # whenever available during intermediate queries.
290 _UpdateLastBuildbucketResponse(try_job_data, build)
291
256 time.sleep(pipeline_wait_seconds) # pragma: no cover 292 time.sleep(pipeline_wait_seconds) # pragma: no cover
OLDNEW
« no previous file with comments | « no previous file | appengine/findit/waterfall/test/monitor_try_job_pipeline_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698