Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(128)

Unified Diff: appengine/findit/waterfall/flake/recursive_flake_pipeline.py

Issue 2369333002: [Findit] Capture versionized metadata for master_flake_analysis (Closed)
Patch Set: Fixing nits Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/findit/waterfall/flake/recursive_flake_pipeline.py
diff --git a/appengine/findit/waterfall/flake/recursive_flake_pipeline.py b/appengine/findit/waterfall/flake/recursive_flake_pipeline.py
index 4b1e0efa45d8f7963ca97d3ab60f5f20b76d4129..419955f07c2ebf4971ce59c1d548f9092a02db57 100644
--- a/appengine/findit/waterfall/flake/recursive_flake_pipeline.py
+++ b/appengine/findit/waterfall/flake/recursive_flake_pipeline.py
@@ -2,8 +2,11 @@
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
+import logging
+
from common import appengine_util
from common import constants
+from common import time_util
from common.pipeline_wrapper import BasePipeline
from model import analysis_status
@@ -16,12 +19,22 @@ from waterfall.trigger_flake_swarming_task_pipeline import (
TriggerFlakeSwarmingTaskPipeline)
+def _UpdateAnalysisStatusUponCompletion(master_flake_analysis, status, error):
+  """Records a terminal status on the analysis and persists it.
+
+  Args:
+    master_flake_analysis (MasterFlakeAnalysis): The analysis entity to update.
+    status (int): The final analysis_status value (e.g. COMPLETED or ERROR).
+    error (dict): Error details to record, or None; only set when truthy so a
+        successful completion does not clobber the field.
+  """
+  master_flake_analysis.completed_time = time_util.GetUTCNow()
+  master_flake_analysis.status = status
+
+  if error:
+    master_flake_analysis.error = error
+
+  master_flake_analysis.put()
+
+
+
class RecursiveFlakePipeline(BasePipeline):
# Arguments number differs from overridden method - pylint: disable=W0221
def run(self, master_name, builder_name, run_build_number, step_name,
- test_name, master_build_number, flakiness_algorithm_results_dict,
- queue_name=constants.DEFAULT_QUEUE):
+ test_name, version_number, master_build_number,
+ flakiness_algorithm_results_dict, queue_name=constants.DEFAULT_QUEUE):
"""Pipeline to determine the regression range of a flaky test.
Args:
@@ -30,6 +43,7 @@ class RecursiveFlakePipeline(BasePipeline):
run_build_number (int): The build number of the current swarming rerun.
step_name (str): The step name.
test_name (str): The test name.
+ version_number (int): The version to save analysis results and data to.
master_build_number (int): The build number of the Master_Flake_analysis.
flakiness_algorithm_results_dict (dict): A dictionary used by
NextBuildNumberPipeline
@@ -38,11 +52,18 @@ class RecursiveFlakePipeline(BasePipeline):
Returns:
A dict of lists for reliable/flaky tests.
"""
- master = MasterFlakeAnalysis.Get(master_name, builder_name,
- master_build_number, step_name, test_name)
- if master.status != analysis_status.RUNNING: # pragma: no branch
- master.status = analysis_status.RUNNING
- master.put()
+ flake_analysis = MasterFlakeAnalysis.GetVersion(
+ master_name, builder_name, master_build_number, step_name, test_name,
+ version=version_number)
+ logging.info(
+ 'Running RecursiveFlakePipeline on MasterFlakeAnalysis %s/%s/%s/%s/%s',
+ master_name, builder_name, master_build_number, step_name, test_name)
+ logging.info(
+ 'MasterFlakeAnalysis %s version %s', flake_analysis, version_number)
+
+ if flake_analysis.status != analysis_status.RUNNING: # pragma: no branch
+ flake_analysis.status = analysis_status.RUNNING
+ flake_analysis.put()
# Call trigger pipeline (flake style).
task_id = yield TriggerFlakeSwarmingTaskPipeline(
@@ -50,39 +71,39 @@ class RecursiveFlakePipeline(BasePipeline):
# Pass the trigger pipeline into a process pipeline.
test_result_future = yield ProcessFlakeSwarmingTaskResultPipeline(
master_name, builder_name, run_build_number,
- step_name, task_id, master_build_number, test_name)
+ step_name, task_id, master_build_number, test_name, version_number)
yield NextBuildNumberPipeline(
master_name, builder_name, master_build_number, run_build_number,
- step_name, test_name, test_result_future, queue_name,
+ step_name, test_name, version_number, test_result_future, queue_name,
flakiness_algorithm_results_dict)
-def get_next_run(master, flakiness_algorithm_results_dict):
+def get_next_run(master_flake_analysis, flakiness_algorithm_results_dict):
# A description of this algorithm can be found at:
# https://docs.google.com/document/d/1wPYFZ5OT998Yn7O8wGDOhgfcQ98mknoX13AesJaS6ig/edit
# Get the last result.
- last_result = master.success_rates[-1]
- cur_run = min(master.build_numbers)
+ last_result = master_flake_analysis.data_points[-1].pass_rate
+ cur_run = min([d.build_number for d in master_flake_analysis.data_points])
flake_settings = waterfall_config.GetCheckFlakeSettings()
lower_flake_threshold = flake_settings.get('lower_flake_threshold')
upper_flake_threshold = flake_settings.get('upper_flake_threshold')
max_stable_in_a_row = flake_settings.get('max_stable_in_a_row')
max_flake_in_a_row = flake_settings.get('max_flake_in_a_row')
- if last_result < 0: # Test doesn't exist in the current build number.
+ if last_result < 0: # Test doesn't exist in the current build number.
flakiness_algorithm_results_dict['stable_in_a_row'] += 1
flakiness_algorithm_results_dict['stabled_out'] = True
flakiness_algorithm_results_dict['flaked_out'] = True
flakiness_algorithm_results_dict['lower_boundary_result'] = 'STABLE'
- lower_boundary = master.build_numbers[
- -flakiness_algorithm_results_dict['stable_in_a_row']]
+ lower_boundary = master_flake_analysis.data_points[
+ -flakiness_algorithm_results_dict['stable_in_a_row']].build_number
flakiness_algorithm_results_dict['lower_boundary'] = lower_boundary
flakiness_algorithm_results_dict['sequential_run_index'] += 1
return lower_boundary + 1
elif (last_result < lower_flake_threshold or
- last_result > upper_flake_threshold): # Stable result.
+ last_result > upper_flake_threshold): # Stable result.
flakiness_algorithm_results_dict['stable_in_a_row'] += 1
if (flakiness_algorithm_results_dict['stable_in_a_row'] >
max_stable_in_a_row): # Identified a stable region.
@@ -127,8 +148,9 @@ def get_next_run(master, flakiness_algorithm_results_dict):
return cur_run - step_size
-def sequential_next_run(master, flakiness_algorithm_results_dict):
- last_result = master.success_rates[-1]
+def sequential_next_run(
+ master_flake_analysis, flakiness_algorithm_results_dict):
+ last_result = master_flake_analysis.data_points[-1].pass_rate
last_result_status = 'FLAKE'
flake_settings = waterfall_config.GetCheckFlakeSettings()
lower_flake_threshold = flake_settings.get('lower_flake_threshold')
@@ -140,10 +162,10 @@ def sequential_next_run(master, flakiness_algorithm_results_dict):
if flakiness_algorithm_results_dict['sequential_run_index'] > 0:
if (last_result_status !=
flakiness_algorithm_results_dict['lower_boundary_result']):
- master.suspected_flake_build_number = (
+ master_flake_analysis.suspected_flake_build_number = (
flakiness_algorithm_results_dict['lower_boundary'] +
flakiness_algorithm_results_dict['sequential_run_index'])
- master.put()
+ master_flake_analysis.put()
return 0
flakiness_algorithm_results_dict['sequential_run_index'] += 1
return (flakiness_algorithm_results_dict['lower_boundary'] +
@@ -155,27 +177,36 @@ class NextBuildNumberPipeline(BasePipeline):
# Arguments number differs from overridden method - pylint: disable=W0221
# Unused argument - pylint: disable=W0613
def run(self, master_name, builder_name, master_build_number,
- run_build_number, step_name, test_name, test_result_future,
- queue_name, flakiness_algorithm_results_dict):
+ run_build_number, step_name, test_name, version_number,
+ test_result_future, queue_name, flakiness_algorithm_results_dict):
# Get MasterFlakeAnalysis success list corresponding to parameters.
- master = MasterFlakeAnalysis.Get(master_name, builder_name,
- master_build_number, step_name, test_name)
+ master_flake_analysis = MasterFlakeAnalysis.GetVersion(
+ master_name, builder_name, master_build_number, step_name, test_name,
+ version=version_number)
# Don't call another pipeline if we fail.
flake_swarming_task = FlakeSwarmingTask.Get(
master_name, builder_name, run_build_number, step_name, test_name)
if flake_swarming_task.status == analysis_status.ERROR:
- master.status = analysis_status.ERROR
- master.put()
+ # TODO(lijeffrey): Implement more detailed error detection and reporting,
+ # such as timeouts, dead bots, etc.
+ error = {
+ 'error': 'Swarming task failed',
+ 'message': 'Swarming task failed'
+ }
+ _UpdateAnalysisStatusUponCompletion(
+ master_flake_analysis, analysis_status.ERROR, error)
return
# Figure out what build_number we should call, if any
if (flakiness_algorithm_results_dict['stabled_out'] and
flakiness_algorithm_results_dict['flaked_out']):
- next_run = sequential_next_run(master, flakiness_algorithm_results_dict)
+ next_run = sequential_next_run(
+ master_flake_analysis, flakiness_algorithm_results_dict)
else:
- next_run = get_next_run(master, flakiness_algorithm_results_dict)
+ next_run = get_next_run(
+ master_flake_analysis, flakiness_algorithm_results_dict)
if next_run < flakiness_algorithm_results_dict['last_build_number']:
next_run = 0
@@ -185,12 +216,12 @@ class NextBuildNumberPipeline(BasePipeline):
if next_run:
pipeline_job = RecursiveFlakePipeline(
master_name, builder_name, next_run, step_name, test_name,
- master_build_number,
+ version_number, master_build_number,
flakiness_algorithm_results_dict=flakiness_algorithm_results_dict)
# pylint: disable=W0201
pipeline_job.target = appengine_util.GetTargetNameForModule(
constants.WATERFALL_BACKEND)
pipeline_job.start(queue_name=queue_name)
else:
- master.status = analysis_status.COMPLETED
- master.put()
+ _UpdateAnalysisStatusUponCompletion(
+ master_flake_analysis, analysis_status.COMPLETED, None)

Powered by Google App Engine
This is Rietveld 408576698