| Index: appengine/findit/waterfall/flake/recursive_flake_pipeline.py
|
| diff --git a/appengine/findit/waterfall/flake/recursive_flake_pipeline.py b/appengine/findit/waterfall/flake/recursive_flake_pipeline.py
|
| index 4b1e0efa45d8f7963ca97d3ab60f5f20b76d4129..419955f07c2ebf4971ce59c1d548f9092a02db57 100644
|
| --- a/appengine/findit/waterfall/flake/recursive_flake_pipeline.py
|
| +++ b/appengine/findit/waterfall/flake/recursive_flake_pipeline.py
|
| @@ -2,8 +2,11 @@
|
| # Use of this source code is governed by a BSD-style license that can be
|
| # found in the LICENSE file.
|
|
|
| +import logging
|
| +
|
| from common import appengine_util
|
| from common import constants
|
| +from common import time_util
|
| from common.pipeline_wrapper import BasePipeline
|
|
|
| from model import analysis_status
|
| @@ -16,12 +19,22 @@ from waterfall.trigger_flake_swarming_task_pipeline import (
|
| TriggerFlakeSwarmingTaskPipeline)
|
|
|
|
|
| +def _UpdateAnalysisStatusUponCompletion(master_flake_analysis, status, error):
|
| + master_flake_analysis.completed_time = time_util.GetUTCNow()
|
| + master_flake_analysis.status = status
|
| +
|
| + if error:
|
| + master_flake_analysis.error = error
|
| +
|
| + master_flake_analysis.put()
|
| +
|
| +
|
| class RecursiveFlakePipeline(BasePipeline):
|
|
|
| # Arguments number differs from overridden method - pylint: disable=W0221
|
| def run(self, master_name, builder_name, run_build_number, step_name,
|
| - test_name, master_build_number, flakiness_algorithm_results_dict,
|
| - queue_name=constants.DEFAULT_QUEUE):
|
| + test_name, version_number, master_build_number,
|
| + flakiness_algorithm_results_dict, queue_name=constants.DEFAULT_QUEUE):
|
| """Pipeline to determine the regression range of a flaky test.
|
|
|
| Args:
|
| @@ -30,6 +43,7 @@ class RecursiveFlakePipeline(BasePipeline):
|
| run_build_number (int): The build number of the current swarming rerun.
|
| step_name (str): The step name.
|
| test_name (str): The test name.
|
| + version_number (int): The version to save analysis results and data to.
|
| master_build_number (int): The build number of the Master_Flake_analysis.
|
| flakiness_algorithm_results_dict (dict): A dictionary used by
|
| NextBuildNumberPipeline
|
| @@ -38,11 +52,18 @@ class RecursiveFlakePipeline(BasePipeline):
|
| Returns:
|
| A dict of lists for reliable/flaky tests.
|
| """
|
| - master = MasterFlakeAnalysis.Get(master_name, builder_name,
|
| - master_build_number, step_name, test_name)
|
| - if master.status != analysis_status.RUNNING: # pragma: no branch
|
| - master.status = analysis_status.RUNNING
|
| - master.put()
|
| + flake_analysis = MasterFlakeAnalysis.GetVersion(
|
| + master_name, builder_name, master_build_number, step_name, test_name,
|
| + version=version_number)
|
| + logging.info(
|
| + 'Running RecursiveFlakePipeline on MasterFlakeAnalysis %s/%s/%s/%s/%s',
|
| + master_name, builder_name, master_build_number, step_name, test_name)
|
| + logging.info(
|
| + 'MasterFlakeAnalysis %s version %s', flake_analysis, version_number)
|
| +
|
| + if flake_analysis.status != analysis_status.RUNNING: # pragma: no branch
|
| + flake_analysis.status = analysis_status.RUNNING
|
| + flake_analysis.put()
|
|
|
| # Call trigger pipeline (flake style).
|
| task_id = yield TriggerFlakeSwarmingTaskPipeline(
|
| @@ -50,39 +71,39 @@ class RecursiveFlakePipeline(BasePipeline):
|
| # Pass the trigger pipeline into a process pipeline.
|
| test_result_future = yield ProcessFlakeSwarmingTaskResultPipeline(
|
| master_name, builder_name, run_build_number,
|
| - step_name, task_id, master_build_number, test_name)
|
| + step_name, task_id, master_build_number, test_name, version_number)
|
| yield NextBuildNumberPipeline(
|
| master_name, builder_name, master_build_number, run_build_number,
|
| - step_name, test_name, test_result_future, queue_name,
|
| + step_name, test_name, version_number, test_result_future, queue_name,
|
| flakiness_algorithm_results_dict)
|
|
|
|
|
| -def get_next_run(master, flakiness_algorithm_results_dict):
|
| +def get_next_run(master_flake_analysis, flakiness_algorithm_results_dict):
|
| # A description of this algorithm can be found at:
|
| # https://docs.google.com/document/d/1wPYFZ5OT998Yn7O8wGDOhgfcQ98mknoX13AesJaS6ig/edit
|
| # Get the last result.
|
| - last_result = master.success_rates[-1]
|
| - cur_run = min(master.build_numbers)
|
| + last_result = master_flake_analysis.data_points[-1].pass_rate
|
| + cur_run = min([d.build_number for d in master_flake_analysis.data_points])
|
| flake_settings = waterfall_config.GetCheckFlakeSettings()
|
| lower_flake_threshold = flake_settings.get('lower_flake_threshold')
|
| upper_flake_threshold = flake_settings.get('upper_flake_threshold')
|
| max_stable_in_a_row = flake_settings.get('max_stable_in_a_row')
|
| max_flake_in_a_row = flake_settings.get('max_flake_in_a_row')
|
|
|
| - if last_result < 0: # Test doesn't exist in the current build number.
|
| + if last_result < 0: # Test doesn't exist in the current build number.
|
| flakiness_algorithm_results_dict['stable_in_a_row'] += 1
|
| flakiness_algorithm_results_dict['stabled_out'] = True
|
| flakiness_algorithm_results_dict['flaked_out'] = True
|
| flakiness_algorithm_results_dict['lower_boundary_result'] = 'STABLE'
|
|
|
| - lower_boundary = master.build_numbers[
|
| - -flakiness_algorithm_results_dict['stable_in_a_row']]
|
| + lower_boundary = master_flake_analysis.data_points[
|
| + -flakiness_algorithm_results_dict['stable_in_a_row']].build_number
|
|
|
| flakiness_algorithm_results_dict['lower_boundary'] = lower_boundary
|
| flakiness_algorithm_results_dict['sequential_run_index'] += 1
|
| return lower_boundary + 1
|
| elif (last_result < lower_flake_threshold or
|
| - last_result > upper_flake_threshold): # Stable result.
|
| + last_result > upper_flake_threshold): # Stable result.
|
| flakiness_algorithm_results_dict['stable_in_a_row'] += 1
|
| if (flakiness_algorithm_results_dict['stable_in_a_row'] >
|
| max_stable_in_a_row): # Identified a stable region.
|
| @@ -127,8 +148,9 @@ def get_next_run(master, flakiness_algorithm_results_dict):
|
| return cur_run - step_size
|
|
|
|
|
| -def sequential_next_run(master, flakiness_algorithm_results_dict):
|
| - last_result = master.success_rates[-1]
|
| +def sequential_next_run(
|
| + master_flake_analysis, flakiness_algorithm_results_dict):
|
| + last_result = master_flake_analysis.data_points[-1].pass_rate
|
| last_result_status = 'FLAKE'
|
| flake_settings = waterfall_config.GetCheckFlakeSettings()
|
| lower_flake_threshold = flake_settings.get('lower_flake_threshold')
|
| @@ -140,10 +162,10 @@ def sequential_next_run(master, flakiness_algorithm_results_dict):
|
| if flakiness_algorithm_results_dict['sequential_run_index'] > 0:
|
| if (last_result_status !=
|
| flakiness_algorithm_results_dict['lower_boundary_result']):
|
| - master.suspected_flake_build_number = (
|
| + master_flake_analysis.suspected_flake_build_number = (
|
| flakiness_algorithm_results_dict['lower_boundary'] +
|
| flakiness_algorithm_results_dict['sequential_run_index'])
|
| - master.put()
|
| + master_flake_analysis.put()
|
| return 0
|
| flakiness_algorithm_results_dict['sequential_run_index'] += 1
|
| return (flakiness_algorithm_results_dict['lower_boundary'] +
|
| @@ -155,27 +177,36 @@ class NextBuildNumberPipeline(BasePipeline):
|
| # Arguments number differs from overridden method - pylint: disable=W0221
|
| # Unused argument - pylint: disable=W0613
|
| def run(self, master_name, builder_name, master_build_number,
|
| - run_build_number, step_name, test_name, test_result_future,
|
| - queue_name, flakiness_algorithm_results_dict):
|
| + run_build_number, step_name, test_name, version_number,
|
| + test_result_future, queue_name, flakiness_algorithm_results_dict):
|
|
|
| # Get MasterFlakeAnalysis success list corresponding to parameters.
|
| - master = MasterFlakeAnalysis.Get(master_name, builder_name,
|
| - master_build_number, step_name, test_name)
|
| + master_flake_analysis = MasterFlakeAnalysis.GetVersion(
|
| + master_name, builder_name, master_build_number, step_name, test_name,
|
| + version=version_number)
|
| # Don't call another pipeline if we fail.
|
| flake_swarming_task = FlakeSwarmingTask.Get(
|
| master_name, builder_name, run_build_number, step_name, test_name)
|
|
|
| if flake_swarming_task.status == analysis_status.ERROR:
|
| - master.status = analysis_status.ERROR
|
| - master.put()
|
| + # TODO(lijeffrey): Implement more detailed error detection and reporting,
|
| + # such as timeouts, dead bots, etc.
|
| + error = {
|
| + 'error': 'Swarming task failed',
|
| + 'message': 'Swarming task failed'
|
| + }
|
| + _UpdateAnalysisStatusUponCompletion(
|
| + master_flake_analysis, analysis_status.ERROR, error)
|
| return
|
|
|
| # Figure out what build_number we should call, if any
|
| if (flakiness_algorithm_results_dict['stabled_out'] and
|
| flakiness_algorithm_results_dict['flaked_out']):
|
| - next_run = sequential_next_run(master, flakiness_algorithm_results_dict)
|
| + next_run = sequential_next_run(
|
| + master_flake_analysis, flakiness_algorithm_results_dict)
|
| else:
|
| - next_run = get_next_run(master, flakiness_algorithm_results_dict)
|
| + next_run = get_next_run(
|
| + master_flake_analysis, flakiness_algorithm_results_dict)
|
|
|
| if next_run < flakiness_algorithm_results_dict['last_build_number']:
|
| next_run = 0
|
| @@ -185,12 +216,12 @@ class NextBuildNumberPipeline(BasePipeline):
|
| if next_run:
|
| pipeline_job = RecursiveFlakePipeline(
|
| master_name, builder_name, next_run, step_name, test_name,
|
| - master_build_number,
|
| + version_number, master_build_number,
|
| flakiness_algorithm_results_dict=flakiness_algorithm_results_dict)
|
| # pylint: disable=W0201
|
| pipeline_job.target = appengine_util.GetTargetNameForModule(
|
| constants.WATERFALL_BACKEND)
|
| pipeline_job.start(queue_name=queue_name)
|
| else:
|
| - master.status = analysis_status.COMPLETED
|
| - master.put()
|
| + _UpdateAnalysisStatusUponCompletion(
|
| + master_flake_analysis, analysis_status.COMPLETED, None)
|
|
|