Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(111)

Side by Side Diff: appengine/findit/waterfall/flake/recursive_flake_try_job_pipeline.py

Issue 2630433002: [Findit] Flake Checker: Pipeline to trigger try jobs to identify flake culprits (Closed)
Patch Set: Addressing comments Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2016 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 import logging
6
7 from google.appengine.ext import ndb
8
9 from gae_libs.http.http_client_appengine import HttpClientAppengine
10 from gae_libs.gitiles.cached_gitiles_repository import CachedGitilesRepository
11 from libs import time_util
12
13 from common import appengine_util
14 from common import constants
15 from common.pipeline_wrapper import BasePipeline
16 from common.pipeline_wrapper import pipeline
17 from common.waterfall import failure_type
18 from model import analysis_status
19 from model import result_status
20 from model.flake.flake_culprit import FlakeCulprit
21 from model.flake.flake_try_job import FlakeTryJob
22 from waterfall.flake.process_flake_try_job_result_pipeline import (
23 ProcessFlakeTryJobResultPipeline)
24 from waterfall.flake.schedule_flake_try_job_pipeline import (
25 ScheduleFlakeTryJobPipeline)
26 from waterfall.monitor_try_job_pipeline import MonitorTryJobPipeline
27
28
29 # TODO(lijeffrey): The lookback algorithms for RecursiveFlakePipeline and
30 # RecursiveFlakeTryJob are to be identical. Refactor both files to use a base
31 # algorithm.
32
33
# Shared Gitiles client used by _CreateCulprit to fetch change logs for
# suspected revisions of chromium/src.
_GIT_REPO = CachedGitilesRepository(
    HttpClientAppengine(),
    'https://chromium.googlesource.com/chromium/src.git')
37
38
def _CreateCulprit(revision, commit_position, repo_name='chromium'):
  """Builds a FlakeCulprit for the given revision.

  Args:
    revision (str): The git revision suspected to have introduced flakiness.
    commit_position (int): The commit position corresponding to |revision|.
    repo_name (str): The name of the repository |revision| belongs to.

  Returns:
    A FlakeCulprit carrying the code review (or commit) url when the change
    log could be fetched, or one with no url otherwise.
  """
  change_log = _GIT_REPO.GetChangeLog(revision)

  if not change_log:
    # Still record the culprit, just without a link to the change.
    logging.error('Unable to retrieve change logs for %s', revision)
    return FlakeCulprit.Create(repo_name, revision, commit_position, None)

  # Prefer the code review link; fall back to the raw commit link.
  url = change_log.code_review_url or change_log.commit_url
  return FlakeCulprit.Create(repo_name, revision, commit_position, url)
52
53
def _UpdateAnalysisTryJobStatusUponCompletion(
    flake_analysis, culprit, status, error):
  """Records the final try-job outcome on the analysis and persists it.

  Args:
    flake_analysis (MasterFlakeAnalysis): The analysis entity to update.
    culprit (FlakeCulprit): The identified culprit, or None.
    status (int): The final try-job status (an analysis_status value).
    error (dict): Error details when the try jobs did not finish normally.
  """
  flake_analysis.end_time = time_util.GetUTCNow()
  flake_analysis.try_job_status = status

  if error:
    flake_analysis.error = error
  else:
    # No error: record the (possibly negative) untriaged finding.
    if culprit:
      flake_analysis.culprit = culprit
    flake_analysis.result_status = (
        result_status.FOUND_UNTRIAGED if culprit
        else result_status.NOT_FOUND_UNTRIAGED)

  flake_analysis.put()
68
69
class RecursiveFlakeTryJobPipeline(BasePipeline):
  """Starts a series of flake try jobs to identify the exact culprit."""

  # Arguments number differs from overridden method - pylint: disable=W0221
  def run(self, urlsafe_flake_analysis_key, commit_position, revision):
    """Runs a try job at a revision to determine its flakiness.

    Args:
      urlsafe_flake_analysis_key (str): The urlsafe-key of the flake analysis
          for which the try jobs are to analyze.
      commit_position (int): The commit position corresponding to |revision| to
          analyze.
      revision (str): The revision to run the try job against corresponding to
          |commit_position|.
    """
    flake_analysis = ndb.Key(urlsafe=urlsafe_flake_analysis_key).get()
    assert flake_analysis

    if (flake_analysis.error or
        flake_analysis.status != analysis_status.COMPLETED):
      # Don't start try-jobs if analysis at the build level did not complete
      # successfully.
      return

    # TODO(lijeffrey): support force/rerun.

    # One FlakeTryJob entity per (master, builder, step, test, revision).
    try_job = FlakeTryJob.Create(
        flake_analysis.master_name, flake_analysis.builder_name,
        flake_analysis.step_name, flake_analysis.test_name, revision)
    try_job.put()

    # Mark the try-job phase as running the first time a try job is
    # scheduled for this analysis.
    if flake_analysis.try_job_status is None:  # pragma: no branch
      flake_analysis.try_job_status = analysis_status.RUNNING
      flake_analysis.put()

    # Schedule, monitor and process the try job strictly in sequence, then
    # decide the next commit position — which may recursively spawn another
    # RecursiveFlakeTryJobPipeline (see NextCommitPositionPipeline).
    with pipeline.InOrder():
      try_job_id = yield ScheduleFlakeTryJobPipeline(
          flake_analysis.master_name, flake_analysis.builder_name,
          flake_analysis.step_name, flake_analysis.test_name, revision)

      try_job_result = yield MonitorTryJobPipeline(
          try_job.key.urlsafe(), failure_type.FLAKY_TEST, try_job_id)

      yield ProcessFlakeTryJobResultPipeline(
          revision, commit_position, try_job_result, try_job.key.urlsafe(),
          urlsafe_flake_analysis_key)

      yield NextCommitPositionPipeline(
          urlsafe_flake_analysis_key, try_job.key.urlsafe())
119
120
121 def _IsStable(pass_rate, lower_flake_threshold, upper_flake_threshold):
122 return (
123 pass_rate < lower_flake_threshold or pass_rate > upper_flake_threshold)
124
125
def _GetNextCommitPosition(data_points, flake_settings,
                           lower_boundary_commit_position):
  """Finds the next commit_position to analyze, or gets final result.

  Args:
    data_points (list): Already-completed data points.
    flake_settings (dict): Parameters for flakiness algorithm.
    lower_boundary_commit_position (int): The commit position not to pass when
        looking back.

  Returns:
    (next_commit_position, suspected_commit_position): The commit position of
        the next revision to check and the suspected commit position that the
        flakiness was introduced in. If next_commit_position needs to be
        checked, suspected_commit_position will be None. If
        suspected_commit_position is found, next_commit_position will be
        None. If no findings eventually, both will be None.
  """
  lower_flake_threshold = flake_settings.get('lower_flake_threshold')
  upper_flake_threshold = flake_settings.get('upper_flake_threshold')
  max_stable_in_a_row = flake_settings.get('max_stable_in_a_row')
  max_flake_in_a_row = flake_settings.get('max_flake_in_a_row')

  # Running state over the data points (walked in list order, which the
  # caller provides sorted by commit position descending — see
  # _GetTryJobDataPoints).
  stables_in_a_row = 0
  flakes_in_a_row = 0
  stables_happened = False
  flakes_first = 0  # Count of flaky points seen before any stable point.
  flaked_out = False  # True once a flaky region has been identified.
  next_commit_position = None

  total_data_points = len(data_points)

  for i in xrange(total_data_points):
    pass_rate = data_points[i].pass_rate
    commit_position = data_points[i].commit_position

    if pass_rate < 0:  # Test doesn't exist at this revision.
      if flaked_out or flakes_first:
        # Flakiness was seen at later revisions; sequentially search just
        # above the point where the test came into existence.
        stables_in_a_row += 1
        lower_boundary = data_points[i - stables_in_a_row + 1].commit_position
        return lower_boundary + 1, None
      else:
        # No flakiness observed before the test stopped existing: no findings.
        return None, None
    elif _IsStable(pass_rate, lower_flake_threshold, upper_flake_threshold):
      stables_in_a_row += 1
      flakes_in_a_row = 0
      stables_happened = True

      if stables_in_a_row <= max_stable_in_a_row:  # pragma: no cover.
        # No stable region yet, keep searching.
        next_commit_position = commit_position - 1
        continue
      # Stable region found.
      if not flaked_out and not flakes_first:  # pragma: no cover.
        # Already stabled_out but no flake region yet, no findings.
        return None, None

      # Flake region is also found, ready for sequential search.
      lower_boundary_index = i - stables_in_a_row + 1
      lower_boundary = data_points[lower_boundary_index].commit_position
      previous_commit_position = data_points[
          lower_boundary_index - 1].commit_position

      if previous_commit_position == lower_boundary + 1:
        # Sequential search is Done.
        return None, previous_commit_position
      # Continue sequential search.
      return lower_boundary + 1, None

    else:  # Flaky result.
      flakes_in_a_row += 1
      stables_in_a_row = 0

      if flakes_in_a_row > max_flake_in_a_row:  # Identified a flaky region.
        flaked_out = True

      if not stables_happened:  # pragma: no branch
        # No stables yet.
        flakes_first += 1

      if commit_position == lower_boundary_commit_position:  # pragma: no branch
        # The earliest commit_position to look back is already flaky. This is
        # the culprit.
        return None, commit_position

      # Check the pass_rate of previous run, if this is the first data_point,
      # consider the virtual previous run is stable (pass rate 0 is below the
      # lower threshold).
      previous_pass_rate = data_points[i - 1].pass_rate if i > 0 else 0
      # NOTE(review): both branches below compute the same next position
      # (commit_position - flakes_in_a_row). If a larger/exponential step was
      # intended when the previous run was also flaky, this needs fixing —
      # confirm against RecursiveFlakePipeline's lookback algorithm.
      if _IsStable(
          previous_pass_rate, lower_flake_threshold, upper_flake_threshold):
        next_commit_position = commit_position - flakes_in_a_row
        continue

      step_size = flakes_in_a_row
      next_commit_position = commit_position - step_size
      continue

  # NOTE(review): if |data_points| is empty, |next_commit_position| is still
  # None here; the comparison below relies on Python 2's None < int ordering
  # (it would raise TypeError on Python 3) — confirm intended.
  if next_commit_position < lower_boundary_commit_position:
    # Do not run past the bounds of the blame list.
    return lower_boundary_commit_position, None

  return next_commit_position, None
231
232
233 def _GetTryJobDataPoints(analysis):
234 """Gets which data points should be used to determine the next revision.
235
236 Args:
237 analysis (MasterFlakeAnalysis): The analysis entity to determine what data
238 points to run on.
239
240 Returns:
241 A list of data points used to analyze and determine what try job to trigger
242 next.
243 """
244 all_data_points = analysis.data_points
245
246 # Include the suspected build itself first, which already has a result.
247 data_points = [analysis.GetDataPointOfSuspectedBuild()]
248
249 for i in range(0, len(all_data_points)):
250 if all_data_points[i].try_job_url:
251 data_points.append(all_data_points[i])
252
253 return sorted(data_points, key=lambda k: k.commit_position, reverse=True)
254
255
class NextCommitPositionPipeline(BasePipeline):
  """Returns the next index in the blame list to run a try job on."""

  # Arguments number differs from overridden method - pylint: disable=W0221
  def run(self, urlsafe_flake_analysis_key, urlsafe_try_job_key):
    """Determines the next commit position to run a try job on.

    Args:
      urlsafe_flake_analysis_key (str): The url-safe key to the corresponding
          flake analysis that triggered this pipeline.
      urlsafe_try_job_key (str): The url-safe key to the try job that was just
          run.
    """
    flake_analysis = ndb.Key(urlsafe=urlsafe_flake_analysis_key).get()
    try_job = ndb.Key(urlsafe=urlsafe_try_job_key).get()
    assert flake_analysis
    assert try_job

    # Don't call another pipeline if the previous try job failed.
    if try_job.status == analysis_status.ERROR:
      error = try_job.error or {
          'error': 'Try job %s failed' % try_job.try_job_id,
          'message': 'The last try job did not complete as expected'
      }
      _UpdateAnalysisTryJobStatusUponCompletion(
          flake_analysis, None, analysis_status.ERROR, error)
      return

    # TODO(lijeffrey) Move parameters to config.
    flake_settings = {
        'lower_flake_threshold': 0.02,
        'upper_flake_threshold': 0.98,
        'max_flake_in_a_row': 1,
        'max_stable_in_a_row': 0,
    }

    suspected_build_data_point = flake_analysis.GetDataPointOfSuspectedBuild()
    # The earliest commit position to examine: one past the last commit of
    # the build preceding the suspected build.
    lower_boundary_commit_position = (
        suspected_build_data_point.previous_build_commit_position + 1)

    # Because |suspected_build_data_point| already sets hard lower and upper
    # bounds, only the data points involved in try jobs should be considered
    # when determining the next commit position to test.
    try_job_data_points = _GetTryJobDataPoints(flake_analysis)

    # Figure out what commit position to trigger the next try job on, if any.
    next_commit_position, suspected_commit_position = _GetNextCommitPosition(
        try_job_data_points, flake_settings, lower_boundary_commit_position)

    if (next_commit_position is None or
        next_commit_position == suspected_build_data_point.commit_position):
      # Finished.
      if next_commit_position == suspected_build_data_point.commit_position:
        # Landing back on the suspected build itself means it is the culprit.
        suspected_commit_position = next_commit_position

      # NOTE(review): when _GetNextCommitPosition returns (None, None) — the
      # "no findings" case — this still creates a culprit with a None
      # revision/commit position and reports completion with it; confirm
      # that is intended.
      culprit_revision = suspected_build_data_point.GetRevisionAtCommitPosition(
          suspected_commit_position)
      culprit = _CreateCulprit(culprit_revision, suspected_commit_position)
      _UpdateAnalysisTryJobStatusUponCompletion(
          flake_analysis, culprit, analysis_status.COMPLETED, None)
      return

    next_revision = suspected_build_data_point.GetRevisionAtCommitPosition(
        next_commit_position)

    # Recurse: schedule the next try job on the waterfall backend module.
    pipeline_job = RecursiveFlakeTryJobPipeline(
        urlsafe_flake_analysis_key, next_commit_position, next_revision)
    pipeline_job.target = appengine_util.GetTargetNameForModule(
        constants.WATERFALL_BACKEND)
    pipeline_job.start()
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698