| Index: appengine/findit/waterfall/flake/recursive_flake_pipeline.py
|
| diff --git a/appengine/findit/waterfall/flake/recursive_flake_pipeline.py b/appengine/findit/waterfall/flake/recursive_flake_pipeline.py
|
| index 419955f07c2ebf4971ce59c1d548f9092a02db57..cd92068e2c85e946333277fdf3589e3068aca682 100644
|
| --- a/appengine/findit/waterfall/flake/recursive_flake_pipeline.py
|
| +++ b/appengine/findit/waterfall/flake/recursive_flake_pipeline.py
|
| @@ -3,6 +3,8 @@
|
| # found in the LICENSE file.
|
|
|
| import logging
|
| +from datetime import timedelta
|
| +import random
|
|
|
| from common import appengine_util
|
| from common import constants
|
| @@ -29,12 +31,58 @@ def _UpdateAnalysisStatusUponCompletion(master_flake_analysis, status, error):
|
| master_flake_analysis.put()
|
|
|
|
|
| +def _GetETAToStartAnalysis(manually_triggered):
|
| + """Returns an ETA as of a UTC datetime.datetime to start the analysis.
|
| +
|
| + If not urgent, Swarming tasks should be run off PST peak hours from 11am to
|
| + 6pm on workdays.
|
| +
|
| + Args:
|
| + manually_triggered (bool): True if the analysis is from manual request, like
|
| + by a Chromium sheriff.
|
| +
|
| + Returns:
|
| + The ETA as of a UTC datetime.datetime to start the analysis.
|
| + """
|
| + if manually_triggered:
|
| + # If the analysis is manually triggered, run it right away.
|
| + return time_util.GetUTCNow()
|
| +
|
| + now_at_mtv = time_util.GetDatetimeInTimezone(
|
| + 'US/Pacific', time_util.GetUTCNowWithTimezone())
|
| + if now_at_mtv.weekday() >= 5: # PST Saturday or Sunday.
|
| + return time_util.GetUTCNow()
|
| +
|
| + if now_at_mtv.hour < 11 or now_at_mtv.hour >= 18: # Before 11am or after 6pm.
|
| + return time_util.GetUTCNow()
|
| +
|
| + # Set ETA time to 6pm, and also with a random latency within 30 minutes to
|
| + # avoid sudden burst traffic to Swarming.
|
| + diff = timedelta(hours=18 - now_at_mtv.hour,
|
| + minutes=-now_at_mtv.minute,
|
| + seconds=-now_at_mtv.second + random.randint(0, 30 * 60),
|
| + microseconds=-now_at_mtv.microsecond)
|
| + eta = now_at_mtv + diff
|
| +
|
| + # Convert back to UTC.
|
| + return time_util.GetDatetimeInTimezone('UTC', eta)
|
| +
|
| +
|
| class RecursiveFlakePipeline(BasePipeline):
|
|
|
| + def __init__(self, *args, **kwargs):
|
| + super(RecursiveFlakePipeline, self).__init__(*args, **kwargs)
|
| + self.manually_triggered = kwargs.get('manually_triggered', False)
|
| +
|
| + def StartOffPSTPeakHours(self, *args, **kwargs):
|
| + """Starts the pipeline off PST peak hours if not triggered manually."""
|
| + kwargs['eta'] = _GetETAToStartAnalysis(self.manually_triggered)
|
| + self.start(*args, **kwargs)
|
| +
|
| # Arguments number differs from overridden method - pylint: disable=W0221
|
| def run(self, master_name, builder_name, run_build_number, step_name,
|
| test_name, version_number, master_build_number,
|
| - flakiness_algorithm_results_dict, queue_name=constants.DEFAULT_QUEUE):
|
| + flakiness_algorithm_results_dict, manually_triggered=False):
|
| """Pipeline to determine the regression range of a flaky test.
|
|
|
| Args:
|
| @@ -47,7 +95,8 @@ class RecursiveFlakePipeline(BasePipeline):
|
| master_build_number (int): The build number of the Master_Flake_analysis.
|
| flakiness_algorithm_results_dict (dict): A dictionary used by
|
| NextBuildNumberPipeline
|
| - queue_name (str): Which queue to run on.
|
| + manually_triggered (bool): True if the analysis is from manual request,
|
| + like by a Chromium sheriff.
|
|
|
| Returns:
|
| A dict of lists for reliable/flaky tests.
|
| @@ -74,8 +123,8 @@ class RecursiveFlakePipeline(BasePipeline):
|
| step_name, task_id, master_build_number, test_name, version_number)
|
| yield NextBuildNumberPipeline(
|
| master_name, builder_name, master_build_number, run_build_number,
|
| - step_name, test_name, version_number, test_result_future, queue_name,
|
| - flakiness_algorithm_results_dict)
|
| + step_name, test_name, version_number, test_result_future,
|
| + flakiness_algorithm_results_dict, manually_triggered=manually_triggered)
|
|
|
|
|
| def get_next_run(master_flake_analysis, flakiness_algorithm_results_dict):
|
| @@ -178,7 +227,8 @@ class NextBuildNumberPipeline(BasePipeline):
|
| # Unused argument - pylint: disable=W0613
|
| def run(self, master_name, builder_name, master_build_number,
|
| run_build_number, step_name, test_name, version_number,
|
| - test_result_future, queue_name, flakiness_algorithm_results_dict):
|
| + test_result_future, flakiness_algorithm_results_dict,
|
| + manually_triggered=False):
|
|
|
| # Get MasterFlakeAnalysis success list corresponding to parameters.
|
| master_flake_analysis = MasterFlakeAnalysis.GetVersion(
|
| @@ -217,11 +267,13 @@ class NextBuildNumberPipeline(BasePipeline):
|
| pipeline_job = RecursiveFlakePipeline(
|
| master_name, builder_name, next_run, step_name, test_name,
|
| version_number, master_build_number,
|
| - flakiness_algorithm_results_dict=flakiness_algorithm_results_dict)
|
| + flakiness_algorithm_results_dict=flakiness_algorithm_results_dict,
|
| + manually_triggered=manually_triggered)
|
| # pylint: disable=W0201
|
| pipeline_job.target = appengine_util.GetTargetNameForModule(
|
| constants.WATERFALL_BACKEND)
|
| - pipeline_job.start(queue_name=queue_name)
|
| + pipeline_job.StartOffPSTPeakHours(
|
| + queue_name=self.queue_name or constants.DEFAULT_QUEUE)
|
| else:
|
| _UpdateAnalysisStatusUponCompletion(
|
| master_flake_analysis, analysis_status.COMPLETED, None)
|
|
|