| OLD | NEW |
| 1 # Copyright 2016 The Chromium Authors. All rights reserved. | 1 # Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
| 4 | 4 |
| 5 import logging | 5 import logging |
| 6 from datetime import timedelta |
| 7 import random |
| 6 | 8 |
| 7 from common import appengine_util | 9 from common import appengine_util |
| 8 from common import constants | 10 from common import constants |
| 9 from common import time_util | 11 from common import time_util |
| 10 from common.pipeline_wrapper import BasePipeline | 12 from common.pipeline_wrapper import BasePipeline |
| 11 | 13 |
| 12 from model import analysis_status | 14 from model import analysis_status |
| 13 from model.flake.flake_swarming_task import FlakeSwarmingTask | 15 from model.flake.flake_swarming_task import FlakeSwarmingTask |
| 14 from model.flake.master_flake_analysis import MasterFlakeAnalysis | 16 from model.flake.master_flake_analysis import MasterFlakeAnalysis |
| 15 from waterfall import waterfall_config | 17 from waterfall import waterfall_config |
| 16 from waterfall.process_flake_swarming_task_result_pipeline import ( | 18 from waterfall.process_flake_swarming_task_result_pipeline import ( |
| 17 ProcessFlakeSwarmingTaskResultPipeline) | 19 ProcessFlakeSwarmingTaskResultPipeline) |
| 18 from waterfall.trigger_flake_swarming_task_pipeline import ( | 20 from waterfall.trigger_flake_swarming_task_pipeline import ( |
| 19 TriggerFlakeSwarmingTaskPipeline) | 21 TriggerFlakeSwarmingTaskPipeline) |
| 20 | 22 |
| 21 | 23 |
| 22 def _UpdateAnalysisStatusUponCompletion(master_flake_analysis, status, error): | 24 def _UpdateAnalysisStatusUponCompletion(master_flake_analysis, status, error): |
| 23 master_flake_analysis.completed_time = time_util.GetUTCNow() | 25 master_flake_analysis.completed_time = time_util.GetUTCNow() |
| 24 master_flake_analysis.status = status | 26 master_flake_analysis.status = status |
| 25 | 27 |
| 26 if error: | 28 if error: |
| 27 master_flake_analysis.error = error | 29 master_flake_analysis.error = error |
| 28 | 30 |
| 29 master_flake_analysis.put() | 31 master_flake_analysis.put() |
| 30 | 32 |
| 31 | 33 |
| 34 def _GetETAToStartAnalysis(manually_triggered): |
| 35 """Returns an ETA as of a UTC datetime.datetime to start the analysis. |
| 36 |
| 37 If not urgent, Swarming tasks should be run off PST peak hours from 11am to |
| 38 6pm on workdays. |
| 39 |
| 40 Args: |
| 41 manually_triggered (bool): True if the analysis is from manual request, like |
| 42 by a Chromium sheriff. |
| 43 |
| 44 Returns: |
| 45 The ETA as of a UTC datetime.datetime to start the analysis. |
| 46 """ |
| 47 if manually_triggered: |
| 48 # If the analysis is manually triggered, run it right away. |
| 49 return time_util.GetUTCNow() |
| 50 |
| 51 now_at_mtv = time_util.GetDatetimeInTimezone( |
| 52 'US/Pacific', time_util.GetUTCNowWithTimezone()) |
| 53 if now_at_mtv.weekday() >= 5: # PST Saturday or Sunday. |
| 54 return time_util.GetUTCNow() |
| 55 |
| 56 if now_at_mtv.hour < 11 or now_at_mtv.hour >= 18: # Before 11am or after 6pm. |
| 57 return time_util.GetUTCNow() |
| 58 |
| 59 # Set ETA time to 6pm, and also with a random latency within 30 minutes to |
| 60 # avoid sudden burst traffic to Swarming. |
| 61 diff = timedelta(hours=18 - now_at_mtv.hour, |
| 62 minutes=-now_at_mtv.minute, |
| 63 seconds=-now_at_mtv.second + random.randint(0, 30 * 60), |
| 64 microseconds=-now_at_mtv.microsecond) |
| 65 eta = now_at_mtv + diff |
| 66 |
| 67 # Convert back to UTC. |
| 68 return time_util.GetDatetimeInTimezone('UTC', eta) |
| 69 |
| 70 |
| 32 class RecursiveFlakePipeline(BasePipeline): | 71 class RecursiveFlakePipeline(BasePipeline): |
| 33 | 72 |
| 73 def __init__(self, *args, **kwargs): |
| 74 super(RecursiveFlakePipeline, self).__init__(*args, **kwargs) |
| 75 self.manually_triggered = kwargs.get('manually_triggered', False) |
| 76 |
| 77 def StartOffPSTPeakHours(self, *args, **kwargs): |
| 78 """Starts the pipeline off PST peak hours if not triggered manually.""" |
| 79 kwargs['eta'] = _GetETAToStartAnalysis(self.manually_triggered) |
| 80 self.start(*args, **kwargs) |
| 81 |
| 34 # Arguments number differs from overridden method - pylint: disable=W0221 | 82 # Arguments number differs from overridden method - pylint: disable=W0221 |
| 35 def run(self, master_name, builder_name, run_build_number, step_name, | 83 def run(self, master_name, builder_name, run_build_number, step_name, |
| 36 test_name, version_number, master_build_number, | 84 test_name, version_number, master_build_number, |
| 37 flakiness_algorithm_results_dict, queue_name=constants.DEFAULT_QUEUE): | 85 flakiness_algorithm_results_dict, manually_triggered=False): |
| 38 """Pipeline to determine the regression range of a flaky test. | 86 """Pipeline to determine the regression range of a flaky test. |
| 39 | 87 |
| 40 Args: | 88 Args: |
| 41 master_name (str): The master name. | 89 master_name (str): The master name. |
| 42 builder_name (str): The builder name. | 90 builder_name (str): The builder name. |
| 43 run_build_number (int): The build number of the current swarming rerun. | 91 run_build_number (int): The build number of the current swarming rerun. |
| 44 step_name (str): The step name. | 92 step_name (str): The step name. |
| 45 test_name (str): The test name. | 93 test_name (str): The test name. |
| 46 version_number (int): The version to save analysis results and data to. | 94 version_number (int): The version to save analysis results and data to. |
| 47 master_build_number (int): The build number of the Master_Flake_analysis. | 95 master_build_number (int): The build number of the Master_Flake_analysis. |
| 48 flakiness_algorithm_results_dict (dict): A dictionary used by | 96 flakiness_algorithm_results_dict (dict): A dictionary used by |
| 49 NextBuildNumberPipeline | 97 NextBuildNumberPipeline |
| 50 queue_name (str): Which queue to run on. | 98 manually_triggered (bool): True if the analysis is from manual request, |
| 99 like by a Chromium sheriff. |
| 51 | 100 |
| 52 Returns: | 101 Returns: |
| 53 A dict of lists for reliable/flaky tests. | 102 A dict of lists for reliable/flaky tests. |
| 54 """ | 103 """ |
| 55 flake_analysis = MasterFlakeAnalysis.GetVersion( | 104 flake_analysis = MasterFlakeAnalysis.GetVersion( |
| 56 master_name, builder_name, master_build_number, step_name, test_name, | 105 master_name, builder_name, master_build_number, step_name, test_name, |
| 57 version=version_number) | 106 version=version_number) |
| 58 logging.info( | 107 logging.info( |
| 59 'Running RecursiveFlakePipeline on MasterFlakeAnalysis %s/%s/%s/%s/%s', | 108 'Running RecursiveFlakePipeline on MasterFlakeAnalysis %s/%s/%s/%s/%s', |
| 60 master_name, builder_name, master_build_number, step_name, test_name) | 109 master_name, builder_name, master_build_number, step_name, test_name) |
| 61 logging.info( | 110 logging.info( |
| 62 'MasterFlakeAnalysis %s version %s', flake_analysis, version_number) | 111 'MasterFlakeAnalysis %s version %s', flake_analysis, version_number) |
| 63 | 112 |
| 64 if flake_analysis.status != analysis_status.RUNNING: # pragma: no branch | 113 if flake_analysis.status != analysis_status.RUNNING: # pragma: no branch |
| 65 flake_analysis.status = analysis_status.RUNNING | 114 flake_analysis.status = analysis_status.RUNNING |
| 66 flake_analysis.put() | 115 flake_analysis.put() |
| 67 | 116 |
| 68 # Call trigger pipeline (flake style). | 117 # Call trigger pipeline (flake style). |
| 69 task_id = yield TriggerFlakeSwarmingTaskPipeline( | 118 task_id = yield TriggerFlakeSwarmingTaskPipeline( |
| 70 master_name, builder_name, run_build_number, step_name, [test_name]) | 119 master_name, builder_name, run_build_number, step_name, [test_name]) |
| 71 # Pass the trigger pipeline into a process pipeline. | 120 # Pass the trigger pipeline into a process pipeline. |
| 72 test_result_future = yield ProcessFlakeSwarmingTaskResultPipeline( | 121 test_result_future = yield ProcessFlakeSwarmingTaskResultPipeline( |
| 73 master_name, builder_name, run_build_number, | 122 master_name, builder_name, run_build_number, |
| 74 step_name, task_id, master_build_number, test_name, version_number) | 123 step_name, task_id, master_build_number, test_name, version_number) |
| 75 yield NextBuildNumberPipeline( | 124 yield NextBuildNumberPipeline( |
| 76 master_name, builder_name, master_build_number, run_build_number, | 125 master_name, builder_name, master_build_number, run_build_number, |
| 77 step_name, test_name, version_number, test_result_future, queue_name, | 126 step_name, test_name, version_number, test_result_future, |
| 78 flakiness_algorithm_results_dict) | 127 flakiness_algorithm_results_dict, manually_triggered=manually_triggered) |
| 79 | 128 |
| 80 | 129 |
| 81 def get_next_run(master_flake_analysis, flakiness_algorithm_results_dict): | 130 def get_next_run(master_flake_analysis, flakiness_algorithm_results_dict): |
| 82 # A description of this algorithm can be found at: | 131 # A description of this algorithm can be found at: |
| 83 # https://docs.google.com/document/d/1wPYFZ5OT998Yn7O8wGDOhgfcQ98mknoX13AesJaS
6ig/edit | 132 # https://docs.google.com/document/d/1wPYFZ5OT998Yn7O8wGDOhgfcQ98mknoX13AesJaS
6ig/edit |
| 84 # Get the last result. | 133 # Get the last result. |
| 85 last_result = master_flake_analysis.data_points[-1].pass_rate | 134 last_result = master_flake_analysis.data_points[-1].pass_rate |
| 86 cur_run = min([d.build_number for d in master_flake_analysis.data_points]) | 135 cur_run = min([d.build_number for d in master_flake_analysis.data_points]) |
| 87 flake_settings = waterfall_config.GetCheckFlakeSettings() | 136 flake_settings = waterfall_config.GetCheckFlakeSettings() |
| 88 lower_flake_threshold = flake_settings.get('lower_flake_threshold') | 137 lower_flake_threshold = flake_settings.get('lower_flake_threshold') |
| (...skipping 82 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 171 return (flakiness_algorithm_results_dict['lower_boundary'] + | 220 return (flakiness_algorithm_results_dict['lower_boundary'] + |
| 172 flakiness_algorithm_results_dict['sequential_run_index']) | 221 flakiness_algorithm_results_dict['sequential_run_index']) |
| 173 | 222 |
| 174 | 223 |
| 175 class NextBuildNumberPipeline(BasePipeline): | 224 class NextBuildNumberPipeline(BasePipeline): |
| 176 | 225 |
| 177 # Arguments number differs from overridden method - pylint: disable=W0221 | 226 # Arguments number differs from overridden method - pylint: disable=W0221 |
| 178 # Unused argument - pylint: disable=W0613 | 227 # Unused argument - pylint: disable=W0613 |
| 179 def run(self, master_name, builder_name, master_build_number, | 228 def run(self, master_name, builder_name, master_build_number, |
| 180 run_build_number, step_name, test_name, version_number, | 229 run_build_number, step_name, test_name, version_number, |
| 181 test_result_future, queue_name, flakiness_algorithm_results_dict): | 230 test_result_future, flakiness_algorithm_results_dict, |
| 231 manually_triggered=False): |
| 182 | 232 |
| 183 # Get MasterFlakeAnalysis success list corresponding to parameters. | 233 # Get MasterFlakeAnalysis success list corresponding to parameters. |
| 184 master_flake_analysis = MasterFlakeAnalysis.GetVersion( | 234 master_flake_analysis = MasterFlakeAnalysis.GetVersion( |
| 185 master_name, builder_name, master_build_number, step_name, test_name, | 235 master_name, builder_name, master_build_number, step_name, test_name, |
| 186 version=version_number) | 236 version=version_number) |
| 187 # Don't call another pipeline if we fail. | 237 # Don't call another pipeline if we fail. |
| 188 flake_swarming_task = FlakeSwarmingTask.Get( | 238 flake_swarming_task = FlakeSwarmingTask.Get( |
| 189 master_name, builder_name, run_build_number, step_name, test_name) | 239 master_name, builder_name, run_build_number, step_name, test_name) |
| 190 | 240 |
| 191 if flake_swarming_task.status == analysis_status.ERROR: | 241 if flake_swarming_task.status == analysis_status.ERROR: |
| (...skipping 18 matching lines...) Expand all Loading... |
| 210 | 260 |
| 211 if next_run < flakiness_algorithm_results_dict['last_build_number']: | 261 if next_run < flakiness_algorithm_results_dict['last_build_number']: |
| 212 next_run = 0 | 262 next_run = 0 |
| 213 elif next_run >= master_build_number: | 263 elif next_run >= master_build_number: |
| 214 next_run = 0 | 264 next_run = 0 |
| 215 | 265 |
| 216 if next_run: | 266 if next_run: |
| 217 pipeline_job = RecursiveFlakePipeline( | 267 pipeline_job = RecursiveFlakePipeline( |
| 218 master_name, builder_name, next_run, step_name, test_name, | 268 master_name, builder_name, next_run, step_name, test_name, |
| 219 version_number, master_build_number, | 269 version_number, master_build_number, |
| 220 flakiness_algorithm_results_dict=flakiness_algorithm_results_dict) | 270 flakiness_algorithm_results_dict=flakiness_algorithm_results_dict, |
| 271 manually_triggered=manually_triggered) |
| 221 # pylint: disable=W0201 | 272 # pylint: disable=W0201 |
| 222 pipeline_job.target = appengine_util.GetTargetNameForModule( | 273 pipeline_job.target = appengine_util.GetTargetNameForModule( |
| 223 constants.WATERFALL_BACKEND) | 274 constants.WATERFALL_BACKEND) |
| 224 pipeline_job.start(queue_name=queue_name) | 275 pipeline_job.StartOffPSTPeakHours( |
| 276 queue_name=self.queue_name or constants.DEFAULT_QUEUE) |
| 225 else: | 277 else: |
| 226 _UpdateAnalysisStatusUponCompletion( | 278 _UpdateAnalysisStatusUponCompletion( |
| 227 master_flake_analysis, analysis_status.COMPLETED, None) | 279 master_flake_analysis, analysis_status.COMPLETED, None) |
| OLD | NEW |