Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(431)

Side by Side Diff: appengine/findit/waterfall/flake/recursive_flake_try_job_pipeline.py

Issue 2630433002: [Findit] Flake Checker: Pipeline to trigger try jobs to identify flake culprits (Closed)
Patch Set: . Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2016 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 import logging
6
7 from google.appengine.ext import ndb
8
9 from gae_libs.http.http_client_appengine import HttpClientAppengine
10 from gae_libs.gitiles.cached_gitiles_repository import CachedGitilesRepository
11 from libs import time_util
12
13 from common import appengine_util
14 from common import constants
15 from common.pipeline_wrapper import BasePipeline
16 from common.pipeline_wrapper import pipeline
17 from common.waterfall import failure_type
18 from model import analysis_status
19 from model import result_status
20 from model.flake.flake_culprit import FlakeCulprit
21 from model.flake.flake_try_job import FlakeTryJob
22 from waterfall.flake.process_flake_try_job_result_pipeline import (
23 ProcessFlakeTryJobResultPipeline)
24 from waterfall.monitor_try_job_pipeline import MonitorTryJobPipeline
chanli 2017/01/13 00:47:33 Nit: Move this after ln 25.
lijeffrey 2017/01/13 01:39:24 Done.
25 from waterfall.flake.schedule_flake_try_job_pipeline import (
26 ScheduleFlakeTryJobPipeline)
27
28
 29 # TODO(lijeffrey): The lookback algorithms for RecursiveFlakePipeline and
 30 # RecursiveFlakeTryJob are intended to be identical. Refactor both files to
 31 # use a base algorithm.
32
33
# Shared Gitiles client for chromium/src, used to fetch change logs when
# building FlakeCulprit entities for suspected revisions.
_GIT_REPO = CachedGitilesRepository(
    HttpClientAppengine(),
    'https://chromium.googlesource.com/chromium/src.git')
37
38
def _CreateCulprit(revision, commit_position, repo_name='chromium'):
  """Creates a FlakeCulprit entity for the given revision.

  Args:
    revision (str): The git revision suspected of introducing the flakiness.
    commit_position (int): The commit position corresponding to |revision|.
    repo_name (str): The name of the repository |revision| belongs to.

  Returns:
    An unsaved FlakeCulprit. Its url is the code review url (or commit url)
    from the change log when available, otherwise None.
  """
  change_log = _GIT_REPO.GetChangeLog(revision)

  if not change_log:
    logging.error('Unable to retrieve change logs for %s', revision)
    return FlakeCulprit.Create(repo_name, revision, commit_position, None)

  culprit_url = change_log.code_review_url or change_log.commit_url
  return FlakeCulprit.Create(repo_name, revision, commit_position, culprit_url)
52
53
def _UpdateAnalysisTryJobStatusUponCompletion(
    flake_analysis, culprit, status, error):
  """Records the final try-job outcome on the analysis and persists it.

  Args:
    flake_analysis (MasterFlakeAnalysis): The analysis entity to finalize.
    culprit (FlakeCulprit): The identified culprit, or None if none was found.
    status (int): The final try-job status to record on the analysis.
    error (dict): Error details if the try jobs ended in error, else None.
  """
  flake_analysis.end_time = time_util.GetUTCNow()
  flake_analysis.try_job_status = status

  if error:
    flake_analysis.error = error
  else:
    if culprit:
      flake_analysis.culprit = culprit
      flake_analysis.result_status = result_status.FOUND_UNTRIAGED
    else:
      flake_analysis.result_status = result_status.NOT_FOUND_UNTRIAGED

  flake_analysis.put()
68
69
class RecursiveFlakeTryJobPipeline(BasePipeline):
  """Starts a series of flake try jobs to identify the exact culprit."""

  # Arguments number differs from overridden method - pylint: disable=W0221
  def run(self, urlsafe_flake_analysis_key, commit_position, revision):
    """Runs a try job at a revision to determine its flakiness.

    Args:
      urlsafe_flake_analysis_key (str): The urlsafe-key of the flake analysis
          for which the try jobs are to analyze.
      commit_position (int): The commit position corresponding to |revision| to
          analyze.
      revision (str): The revision to run the try job against corresponding to
          |commit_position|.
    """
    flake_analysis = ndb.Key(urlsafe=urlsafe_flake_analysis_key).get()
    assert flake_analysis

    # Don't start try jobs if the flake swarming tasks had an error or have
    # not finished yet.
    if (flake_analysis.error or
        flake_analysis.status != analysis_status.COMPLETED):
      return

    try_job = FlakeTryJob.Get(
        flake_analysis.master_name, flake_analysis.builder_name,
        flake_analysis.step_name, flake_analysis.test_name, revision)

    if not try_job:
      try_job = FlakeTryJob.Create(
          flake_analysis.master_name, flake_analysis.builder_name,
          flake_analysis.step_name, flake_analysis.test_name, revision)
      try_job.put()
    else:  # pragma: no cover
      # A try job already exists for this revision; reset a failed attempt so
      # it can be retried.
      if try_job.failed:
        try_job.status = analysis_status.PENDING
        try_job.put()

    if flake_analysis.try_job_status is None:  # pragma: no branch
      # First try job of this analysis: mark the try-job phase as underway.
      flake_analysis.try_job_status = analysis_status.RUNNING
      flake_analysis.put()

    with pipeline.InOrder():
      try_job_id = yield ScheduleFlakeTryJobPipeline(
          flake_analysis.master_name, flake_analysis.builder_name,
          flake_analysis.step_name, flake_analysis.test_name, revision)

      try_job_result = yield MonitorTryJobPipeline(
          try_job.key.urlsafe(), failure_type.FLAKY_TEST, try_job_id)

      yield ProcessFlakeTryJobResultPipeline(
          revision, commit_position, try_job_result, try_job.key.urlsafe(),
          urlsafe_flake_analysis_key)

      yield NextCommitPositionPipeline(
          urlsafe_flake_analysis_key, try_job.key.urlsafe())
126
127
128 def _IsStable(pass_rate, lower_flake_threshold, upper_flake_threshold):
129 return (
130 pass_rate < lower_flake_threshold or pass_rate > upper_flake_threshold)
131
132
133 def _GetNextCommitPosition(data_points, flake_settings,
134 lower_boundary_commit_position):
135 """Finds the next commit_position to analyze, or gets final result.
136
137 Args:
138 data_points (list): Already-completed data points.
139 flake_settings (dict): Parameters for flakiness algorithm.
140 lower_boundary_commit_position (int): The commit position not to pass when
141 looking back.
142
143 Returns:
144 (next_commit_position, suspected_commit_position): The commit position of
145 the next revision to check and suspected commit position that that the
146 flakiness was introduced in. If next_commit_position needs to be
147 checked, suspected_commit_position will be None. If
148 suspected_commit_position is found, next_commit_position will be
149 None. If no findings eventually, both will be None.
150 """
151 lower_flake_threshold = flake_settings.get('lower_flake_threshold')
152 upper_flake_threshold = flake_settings.get('upper_flake_threshold')
153 max_stable_in_a_row = flake_settings.get('max_stable_in_a_row')
154 max_flake_in_a_row = flake_settings.get('max_flake_in_a_row')
155 max_dive_in_a_row = flake_settings.get('max_dive_in_a_row')
156 dive_rate_threshold = flake_settings.get('dive_rate_threshold')
157
158 stables_in_a_row = 0
159 flakes_in_a_row = 0
160 dives_in_a_row = 0
161 stables_happened = False
162 flakes_first = 0
163 flaked_out = False
164 next_commit_position = None
165
166 total_data_points = len(data_points)
167
168 for i in xrange(total_data_points):
169 pass_rate = data_points[i].pass_rate
170 commit_position = data_points[i].commit_position
171
172 if pass_rate < 0: # Test doesn't exist at this revision.
173 if flaked_out or flakes_first:
174 stables_in_a_row += 1
175 lower_boundary = data_points[i - stables_in_a_row + 1].commit_position
176 return lower_boundary + 1, None
177 else:
178 return None, None
179 elif _IsStable(pass_rate, lower_flake_threshold, upper_flake_threshold):
180 stables_in_a_row += 1
181 flakes_in_a_row = 0
182 dives_in_a_row = 0
183 stables_happened = True
184
185 # These cases are not needed for try jobs.
chanli 2017/01/13 00:47:33 What cases are you referring to? "if stables_in_a_
lijeffrey 2017/01/13 01:39:24 For analysis at the revision level, there is no "f
186 if stables_in_a_row <= max_stable_in_a_row: # pragma: no cover.
187 # No stable region yet, keep searching.
188 next_commit_position = commit_position - 1
189 continue
190 # Stable region found.
191 if not flaked_out and not flakes_first: # pragma: no cover.
192 # Already stabled_out but no flake region yet, no findings.
193 return None, None
194
195 # Flake region is also found, ready for sequential search.
196 lower_boundary_index = i - stables_in_a_row + 1
197 lower_boundary = data_points[lower_boundary_index].commit_position
198 previous_commit_position = data_points[
199 lower_boundary_index - 1].commit_position
200
201 if previous_commit_position == lower_boundary + 1:
202 # Sequential search is Done.
203 return None, previous_commit_position
204 # Continue sequential search.
205 return lower_boundary + 1, None
206
207 else: # Flaky result.
208 flakes_in_a_row += 1
209 stables_in_a_row = 0
210
211 if flakes_in_a_row > max_flake_in_a_row: # Identified a flaky region.
212 flaked_out = True
213
214 if not stables_happened: # pragma: no branch
215 # No stables yet.
216 flakes_first += 1
217
218 if commit_position == lower_boundary_commit_position: # pragma: no branch
chanli 2017/01/13 00:47:34 Just double check: this means we don't handle dive
lijeffrey 2017/01/13 01:39:24 That's correct, removed.
219 # The earliest commit_position to look back is already flaky. This is
220 # the culprit.
221 return None, commit_position
222
223 # Check the pass_rate of previous run, if this is the first data_point,
224 # consider the virtual previous run is stable.
225 previous_pass_rate = data_points[i - 1].pass_rate if i > 0 else 0
226 if _IsStable(
227 previous_pass_rate, lower_flake_threshold, upper_flake_threshold):
228 next_commit_position = commit_position - flakes_in_a_row
229 continue
230
231 # Checks for dives. A dive is a sudden drop in pass rate.
232 if pass_rate - previous_pass_rate > dive_rate_threshold:
chanli 2017/01/13 00:47:34 If we don't handle dive, maybe we can remove this
lijeffrey 2017/01/13 01:39:24 Done.
233 # Possibly a dive just happened.
234 # Set dives_in_a_row to one since this is the first sign of diving.
235 # For cases where we have pass rates like 0.1, 0.51, 0.92, we will use
236 # the earliest dive.
237 dives_in_a_row = 1
238 elif previous_pass_rate - pass_rate > dive_rate_threshold:
239 # A rise just happened, sets dives_in_a_row back to 0.
240 dives_in_a_row = 0
241 else:
242 # Two last results are close, increases dives_in_a_row if not 0.
243 dives_in_a_row = dives_in_a_row + 1 if dives_in_a_row else 0
244
245 if dives_in_a_row <= max_dive_in_a_row:
246 step_size = 1 if dives_in_a_row else flakes_in_a_row
247 next_commit_position = commit_position - step_size
248 continue
249
250 # Dived out.
251 # Flake region must have been found, ready for sequential search.
252 lower_boundary_index = i - dives_in_a_row + 1
253 lower_boundary = data_points[lower_boundary_index].commit_position
254 commit_after_lower_boundary = (
255 data_points[lower_boundary_index - 1].commit_position)
256
257 if commit_after_lower_boundary == lower_boundary + 1:
258 # Sequential search is Done.
259 return None, commit_after_lower_boundary
260 # Sequential search.
261 return lower_boundary + 1, None
262
263 if next_commit_position < lower_boundary_commit_position:
264 # Do not run past the bounds of the blame list.
265 return lower_boundary_commit_position, None
266
267 return next_commit_position, None
268
269
270 def _GetTryJobDataPoints(analysis):
271 """Gets which data points should be used to determine the next revision.
272
273 Args:
274 all_data_points (list): A list of data points already analyzed and stored
275 in a MasterFlakeAnalysis entity.
chanli 2017/01/13 00:47:33 Docstring needs to be updated
lijeffrey 2017/01/13 01:39:23 Done.
276
277 Returns:
278 A list of data points used to analyze and determine what try job to trigger
279 next.
280 """
281 all_data_points = analysis.data_points
282
283 # Include the suspected build itself first, which already has a result.
284 data_points = [analysis.GetDataPointOfSuspectedBuild()]
285
286 for i in range(0, len(all_data_points)):
287 if all_data_points[i].try_job_id is not None:
chanli 2017/01/13 00:47:33 Nit: if all_data_points[i].try_job_id should also
lijeffrey 2017/01/13 01:39:24 Done.
288 data_points.append(all_data_points[i])
289
290 return sorted(data_points, key=lambda k: k.commit_position, reverse=True)
291
292
class NextCommitPositionPipeline(BasePipeline):
  """Returns the next index in the blame list to run a try job on."""

  # Arguments number differs from overridden method - pylint: disable=W0221
  def run(self, urlsafe_flake_analysis_key, urlsafe_try_job_key):
    """Determines the next commit position to run a try job on.

    Args:
      urlsafe_flake_analysis_key (str): The url-safe key to the corresponding
          flake analysis that triggered this pipeline.
      urlsafe_try_job_key (str): The url-safe key to the try job that was just
          run.
    """
    flake_analysis = ndb.Key(urlsafe=urlsafe_flake_analysis_key).get()
    try_job = ndb.Key(urlsafe=urlsafe_try_job_key).get()
    assert flake_analysis
    assert try_job

    if try_job.status == analysis_status.ERROR:
      # The previous try job failed; record the error and finalize the
      # analysis instead of scheduling another pipeline.
      failure_details = try_job.error or {
          'error': 'Try job %s failed' % try_job.try_job_id,
          'message': 'The last try job did not complete as expected'
      }
      _UpdateAnalysisTryJobStatusUponCompletion(
          flake_analysis, None, analysis_status.ERROR, failure_details)
      return

    # TODO(lijeffrey) Move parameters to config.
    flake_settings = {
        'lower_flake_threshold': 0.02,
        'upper_flake_threshold': 0.98,
        'max_flake_in_a_row': 1,
        'max_stable_in_a_row': 0,
        'max_dive_in_a_row': 4,
        'dive_rate_threshold': 0.4,
    }

    suspected_build_point = flake_analysis.GetDataPointOfSuspectedBuild()
    lower_boundary_commit_position = (
        suspected_build_point.previous_build_commit_position + 1)

    # Because the suspected build's data point already sets hard lower and
    # upper bounds, only the data points involved in try jobs should be
    # considered when determining the next commit position to test.
    try_job_data_points = _GetTryJobDataPoints(flake_analysis)

    # Figure out what commit position to trigger the next try job on, if any.
    next_commit_position, suspected_commit_position = _GetNextCommitPosition(
        try_job_data_points, flake_settings, lower_boundary_commit_position)

    finished = (
        next_commit_position is None or
        next_commit_position >= suspected_build_point.commit_position)

    if finished:
      if next_commit_position == suspected_build_point.commit_position:
        # The search came all the way back to the suspected build itself.
        suspected_commit_position = next_commit_position

      culprit_revision = suspected_build_point.GetRevisionAtCommitPosition(
          suspected_commit_position)
      culprit = _CreateCulprit(culprit_revision, suspected_commit_position)
      _UpdateAnalysisTryJobStatusUponCompletion(
          flake_analysis, culprit, analysis_status.COMPLETED, None)
      return

    next_revision = suspected_build_point.GetRevisionAtCommitPosition(
        next_commit_position)

    # Recurse: schedule the next try job on the waterfall backend module.
    pipeline_job = RecursiveFlakeTryJobPipeline(
        urlsafe_flake_analysis_key, next_commit_position, next_revision)
    pipeline_job.target = appengine_util.GetTargetNameForModule(
        constants.WATERFALL_BACKEND)
    pipeline_job.start()
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698