Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(400)

Side by Side Diff: appengine/findit/waterfall/flake/recursive_flake_pipeline.py

Issue 2376573004: [Findit] For automatic analyses of flaky tests, run the Swarming tasks off PST peak hours. (Closed)
Patch Set: Fix nit. Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2016 The Chromium Authors. All rights reserved. 1 # Copyright 2016 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 import logging 5 import logging
6 from datetime import timedelta
7 import random
6 8
7 from common import appengine_util 9 from common import appengine_util
8 from common import constants 10 from common import constants
9 from common import time_util 11 from common import time_util
10 from common.pipeline_wrapper import BasePipeline 12 from common.pipeline_wrapper import BasePipeline
11 13
12 from model import analysis_status 14 from model import analysis_status
13 from model.flake.flake_swarming_task import FlakeSwarmingTask 15 from model.flake.flake_swarming_task import FlakeSwarmingTask
14 from model.flake.master_flake_analysis import MasterFlakeAnalysis 16 from model.flake.master_flake_analysis import MasterFlakeAnalysis
15 from waterfall import waterfall_config 17 from waterfall import waterfall_config
16 from waterfall.process_flake_swarming_task_result_pipeline import ( 18 from waterfall.process_flake_swarming_task_result_pipeline import (
17 ProcessFlakeSwarmingTaskResultPipeline) 19 ProcessFlakeSwarmingTaskResultPipeline)
18 from waterfall.trigger_flake_swarming_task_pipeline import ( 20 from waterfall.trigger_flake_swarming_task_pipeline import (
19 TriggerFlakeSwarmingTaskPipeline) 21 TriggerFlakeSwarmingTaskPipeline)
20 22
21 23
def _UpdateAnalysisStatusUponCompletion(master_flake_analysis, status, error):
  """Records the terminal state of a finished master flake analysis.

  Stamps the completion time, stores the final status and, when an error is
  reported, attaches it to the analysis before persisting the entity.

  Args:
    master_flake_analysis (MasterFlakeAnalysis): The analysis entity to update.
    status (int): The terminal analysis status to record.
    error (dict): Error details to attach, or a falsy value when there is none.
  """
  analysis = master_flake_analysis
  analysis.completed_time = time_util.GetUTCNow()
  analysis.status = status
  if error:
    analysis.error = error
  analysis.put()
30 32
31 33
def _GetETAToStartAnalysis(manually_triggered):
  """Computes the UTC datetime at which the analysis should start.

  Automatically-triggered analyses are pushed off PST peak hours (11am to 6pm
  on workdays); manual requests, weekend requests and off-peak requests start
  immediately.

  Args:
    manually_triggered (bool): True if the analysis is from manual request,
      like by a Chromium sheriff.

  Returns:
    The ETA as of a UTC datetime.datetime to start the analysis.
  """
  # Manual requests are urgent: start right away.
  if manually_triggered:
    return time_util.GetUTCNow()

  pacific_now = time_util.GetDatetimeInTimezone(
      'US/Pacific', time_util.GetUTCNowWithTimezone())

  is_weekend = pacific_now.weekday() >= 5  # PST Saturday or Sunday.
  is_off_peak = pacific_now.hour < 11 or pacific_now.hour >= 18
  if is_weekend or is_off_peak:
    return time_util.GetUTCNow()

  # Within peak hours: defer until 6pm PST, plus a random latency of up to
  # 30 minutes to avoid sudden burst traffic to Swarming at exactly 6pm.
  time_until_6pm = timedelta(hours=18 - pacific_now.hour,
                             minutes=-pacific_now.minute,
                             seconds=-pacific_now.second,
                             microseconds=-pacific_now.microsecond)
  jitter = timedelta(seconds=random.randint(0, 30 * 60))
  eta_at_mtv = pacific_now + time_until_6pm + jitter

  # Callers expect the ETA in UTC.
  return time_util.GetDatetimeInTimezone('UTC', eta_at_mtv)
69
70
class RecursiveFlakePipeline(BasePipeline):
  """Recursively reruns a flaky test to narrow down its regression range."""

  def __init__(self, *args, **kwargs):
    super(RecursiveFlakePipeline, self).__init__(*args, **kwargs)
    # Remember whether a human requested this analysis so that
    # StartOffPSTPeakHours can decide between running now and deferring.
    self.manually_triggered = kwargs.get('manually_triggered', False)

  def StartOffPSTPeakHours(self, *args, **kwargs):
    """Starts the pipeline off PST peak hours if not triggered manually."""
    kwargs['eta'] = _GetETAToStartAnalysis(self.manually_triggered)
    self.start(*args, **kwargs)

  # Arguments number differs from overridden method - pylint: disable=W0221
  def run(self, master_name, builder_name, run_build_number, step_name,
          test_name, version_number, master_build_number,
          flakiness_algorithm_results_dict, manually_triggered=False):
    """Pipeline to determine the regression range of a flaky test.

    Args:
      master_name (str): The master name.
      builder_name (str): The builder name.
      run_build_number (int): The build number of the current swarming rerun.
      step_name (str): The step name.
      test_name (str): The test name.
      version_number (int): The version to save analysis results and data to.
      master_build_number (int): The build number of the Master_Flake_analysis.
      flakiness_algorithm_results_dict (dict): A dictionary used by
        NextBuildNumberPipeline.
      manually_triggered (bool): True if the analysis is from manual request,
        like by a Chromium sheriff.

    Returns:
      A dict of lists for reliable/flaky tests.
    """
    analysis = MasterFlakeAnalysis.GetVersion(
        master_name, builder_name, master_build_number, step_name, test_name,
        version=version_number)
    logging.info(
        'Running RecursiveFlakePipeline on MasterFlakeAnalysis %s/%s/%s/%s/%s',
        master_name, builder_name, master_build_number, step_name, test_name)
    logging.info(
        'MasterFlakeAnalysis %s version %s', analysis, version_number)

    if analysis.status != analysis_status.RUNNING:  # pragma: no branch
      analysis.status = analysis_status.RUNNING
      analysis.put()

    # Trigger a swarming rerun of the test at this build number.
    swarming_task_id = yield TriggerFlakeSwarmingTaskPipeline(
        master_name, builder_name, run_build_number, step_name, [test_name])

    # Process the swarming results once the triggered task completes.
    result_future = yield ProcessFlakeSwarmingTaskResultPipeline(
        master_name, builder_name, run_build_number,
        step_name, swarming_task_id, master_build_number, test_name,
        version_number)

    # Decide which build number (if any) should be analyzed next.
    yield NextBuildNumberPipeline(
        master_name, builder_name, master_build_number, run_build_number,
        step_name, test_name, version_number, result_future,
        flakiness_algorithm_results_dict, manually_triggered=manually_triggered)
79 128
80 129
81 def get_next_run(master_flake_analysis, flakiness_algorithm_results_dict): 130 def get_next_run(master_flake_analysis, flakiness_algorithm_results_dict):
82 # A description of this algorithm can be found at: 131 # A description of this algorithm can be found at:
83 # https://docs.google.com/document/d/1wPYFZ5OT998Yn7O8wGDOhgfcQ98mknoX13AesJaS 6ig/edit 132 # https://docs.google.com/document/d/1wPYFZ5OT998Yn7O8wGDOhgfcQ98mknoX13AesJaS 6ig/edit
84 # Get the last result. 133 # Get the last result.
85 last_result = master_flake_analysis.data_points[-1].pass_rate 134 last_result = master_flake_analysis.data_points[-1].pass_rate
86 cur_run = min([d.build_number for d in master_flake_analysis.data_points]) 135 cur_run = min([d.build_number for d in master_flake_analysis.data_points])
87 flake_settings = waterfall_config.GetCheckFlakeSettings() 136 flake_settings = waterfall_config.GetCheckFlakeSettings()
88 lower_flake_threshold = flake_settings.get('lower_flake_threshold') 137 lower_flake_threshold = flake_settings.get('lower_flake_threshold')
(...skipping 82 matching lines...) Expand 10 before | Expand all | Expand 10 after
171 return (flakiness_algorithm_results_dict['lower_boundary'] + 220 return (flakiness_algorithm_results_dict['lower_boundary'] +
172 flakiness_algorithm_results_dict['sequential_run_index']) 221 flakiness_algorithm_results_dict['sequential_run_index'])
173 222
174 223
175 class NextBuildNumberPipeline(BasePipeline): 224 class NextBuildNumberPipeline(BasePipeline):
176 225
177 # Arguments number differs from overridden method - pylint: disable=W0221 226 # Arguments number differs from overridden method - pylint: disable=W0221
178 # Unused argument - pylint: disable=W0613 227 # Unused argument - pylint: disable=W0613
179 def run(self, master_name, builder_name, master_build_number, 228 def run(self, master_name, builder_name, master_build_number,
180 run_build_number, step_name, test_name, version_number, 229 run_build_number, step_name, test_name, version_number,
181 test_result_future, queue_name, flakiness_algorithm_results_dict): 230 test_result_future, flakiness_algorithm_results_dict,
231 manually_triggered=False):
182 232
183 # Get MasterFlakeAnalysis success list corresponding to parameters. 233 # Get MasterFlakeAnalysis success list corresponding to parameters.
184 master_flake_analysis = MasterFlakeAnalysis.GetVersion( 234 master_flake_analysis = MasterFlakeAnalysis.GetVersion(
185 master_name, builder_name, master_build_number, step_name, test_name, 235 master_name, builder_name, master_build_number, step_name, test_name,
186 version=version_number) 236 version=version_number)
187 # Don't call another pipeline if we fail. 237 # Don't call another pipeline if we fail.
188 flake_swarming_task = FlakeSwarmingTask.Get( 238 flake_swarming_task = FlakeSwarmingTask.Get(
189 master_name, builder_name, run_build_number, step_name, test_name) 239 master_name, builder_name, run_build_number, step_name, test_name)
190 240
191 if flake_swarming_task.status == analysis_status.ERROR: 241 if flake_swarming_task.status == analysis_status.ERROR:
(...skipping 18 matching lines...) Expand all
210 260
211 if next_run < flakiness_algorithm_results_dict['last_build_number']: 261 if next_run < flakiness_algorithm_results_dict['last_build_number']:
212 next_run = 0 262 next_run = 0
213 elif next_run >= master_build_number: 263 elif next_run >= master_build_number:
214 next_run = 0 264 next_run = 0
215 265
216 if next_run: 266 if next_run:
217 pipeline_job = RecursiveFlakePipeline( 267 pipeline_job = RecursiveFlakePipeline(
218 master_name, builder_name, next_run, step_name, test_name, 268 master_name, builder_name, next_run, step_name, test_name,
219 version_number, master_build_number, 269 version_number, master_build_number,
220 flakiness_algorithm_results_dict=flakiness_algorithm_results_dict) 270 flakiness_algorithm_results_dict=flakiness_algorithm_results_dict,
271 manually_triggered=manually_triggered)
221 # pylint: disable=W0201 272 # pylint: disable=W0201
222 pipeline_job.target = appengine_util.GetTargetNameForModule( 273 pipeline_job.target = appengine_util.GetTargetNameForModule(
223 constants.WATERFALL_BACKEND) 274 constants.WATERFALL_BACKEND)
224 pipeline_job.start(queue_name=queue_name) 275 pipeline_job.StartOffPSTPeakHours(
276 queue_name=self.queue_name or constants.DEFAULT_QUEUE)
225 else: 277 else:
226 _UpdateAnalysisStatusUponCompletion( 278 _UpdateAnalysisStatusUponCompletion(
227 master_flake_analysis, analysis_status.COMPLETED, None) 279 master_flake_analysis, analysis_status.COMPLETED, None)
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698