Chromium Code Reviews| Index: appengine/findit/waterfall/flake/recursive_flake_pipeline.py |
| diff --git a/appengine/findit/waterfall/flake/recursive_flake_pipeline.py b/appengine/findit/waterfall/flake/recursive_flake_pipeline.py |
| index 4b1e0efa45d8f7963ca97d3ab60f5f20b76d4129..5928b9b9c70a6e49013881b379b909c0ae7ad3ad 100644 |
| --- a/appengine/findit/waterfall/flake/recursive_flake_pipeline.py |
| +++ b/appengine/findit/waterfall/flake/recursive_flake_pipeline.py |
| @@ -2,8 +2,12 @@ |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| +from datetime import timedelta |
| +import random |
| + |
| from common import appengine_util |
| from common import constants |
| +from common import time_util |
| from common.pipeline_wrapper import BasePipeline |
| from model import analysis_status |
| @@ -16,12 +20,58 @@ from waterfall.trigger_flake_swarming_task_pipeline import ( |
| TriggerFlakeSwarmingTaskPipeline) |
| +def _GetETAToStartAnalysis(manually_triggered): |
| + """Returns an ETA as of a UTC datetime.datetime to start the analysis. |
| + |
| + If not urgent, Swarming tasks should be run off PST peak hours from 11am to |
| + 6pm on workdays. |
| + |
| + Args: |
| + manually_triggered (bool): True if the analysis is from manual request, like |
| + by a Chromium sheriff. |
| + |
| + Returns: |
| + The ETA as of a UTC datetime.datetime to start the analysis. |
| + """ |
| + if manually_triggered: |
| + # If the analysis is manually triggered, run it right away. |
| + return time_util.GetUTCNow() |
| + |
| + now_at_mtv = time_util.GetDatetimeInTimezone( |
| + 'US/Pacific', time_util.GetUTCNowWithTimezone()) |
| + if now_at_mtv.weekday() >= 5: # PST Saturday or Sunday. |
| + return time_util.GetUTCNow() |
| + |
| + if now_at_mtv.hour < 11 or now_at_mtv.hour >= 18: # Before 11am or after 6pm. |
| + return time_util.GetUTCNow() |
| + |
| + # Set ETA time to 6pm, with a random latency within 30 minutes to avoid sudden |
| + # burst traffic to Swarming. |
| + diff = timedelta(hours=18 - now_at_mtv.hour, |
|
lijeffrey
2016/09/27 23:48:05
nit: since diff is actually a random latency, why
stgao
2016/10/01 05:55:25
It is not exact a random latency. It is a diff + a
|
| + minutes=-now_at_mtv.minute, |
| + seconds=-now_at_mtv.second + random.randint(0, 30 * 60), |
| + microseconds=-now_at_mtv.microsecond) |
| + eta = now_at_mtv + diff |
| + |
| + # Convert back to UTC. |
| + return time_util.GetDatetimeInTimezone('UTC', eta) |
| + |
| + |
| class RecursiveFlakePipeline(BasePipeline): |
| + def __init__(self, *args, **kwargs): |
| + super(RecursiveFlakePipeline, self).__init__(*args, **kwargs) |
| + self.manually_triggered = kwargs.get('manually_triggered', False) |
| + |
| + def StartOffPSTPeakHours(self, *args, **kwargs): |
| + """Starts the pipeline off PST peak hours if not triggered manually.""" |
| + kwargs['eta'] = _GetETAToStartAnalysis(self.manually_triggered) |
| + self.start(*args, **kwargs) |
| + |
| # Arguments number differs from overridden method - pylint: disable=W0221 |
| def run(self, master_name, builder_name, run_build_number, step_name, |
| test_name, master_build_number, flakiness_algorithm_results_dict, |
| - queue_name=constants.DEFAULT_QUEUE): |
| + manually_triggered=False): |
| """Pipeline to determine the regression range of a flaky test. |
| Args: |
| @@ -33,7 +83,8 @@ class RecursiveFlakePipeline(BasePipeline): |
| master_build_number (int): The build number of the Master_Flake_analysis. |
| flakiness_algorithm_results_dict (dict): A dictionary used by |
| NextBuildNumberPipeline |
| - queue_name (str): Which queue to run on. |
| + manually_triggered (bool): True if the analysis is from manual request, |
| + like by a Chromium sheriff. |
| Returns: |
| A dict of lists for reliable/flaky tests. |
| @@ -53,8 +104,8 @@ class RecursiveFlakePipeline(BasePipeline): |
| step_name, task_id, master_build_number, test_name) |
| yield NextBuildNumberPipeline( |
| master_name, builder_name, master_build_number, run_build_number, |
| - step_name, test_name, test_result_future, queue_name, |
| - flakiness_algorithm_results_dict) |
| + step_name, test_name, test_result_future, |
| + flakiness_algorithm_results_dict, manually_triggered=manually_triggered) |
| def get_next_run(master, flakiness_algorithm_results_dict): |
| @@ -156,7 +207,7 @@ class NextBuildNumberPipeline(BasePipeline): |
| # Unused argument - pylint: disable=W0613 |
| def run(self, master_name, builder_name, master_build_number, |
| run_build_number, step_name, test_name, test_result_future, |
| - queue_name, flakiness_algorithm_results_dict): |
| + flakiness_algorithm_results_dict, manually_triggered=False): |
| # Get MasterFlakeAnalysis success list corresponding to parameters. |
| master = MasterFlakeAnalysis.Get(master_name, builder_name, |
| @@ -186,11 +237,13 @@ class NextBuildNumberPipeline(BasePipeline): |
| pipeline_job = RecursiveFlakePipeline( |
| master_name, builder_name, next_run, step_name, test_name, |
| master_build_number, |
| - flakiness_algorithm_results_dict=flakiness_algorithm_results_dict) |
| + flakiness_algorithm_results_dict=flakiness_algorithm_results_dict, |
| + manually_triggered=manually_triggered) |
| # pylint: disable=W0201 |
| pipeline_job.target = appengine_util.GetTargetNameForModule( |
| constants.WATERFALL_BACKEND) |
| - pipeline_job.start(queue_name=queue_name) |
| + pipeline_job.StartOffPSTPeakHours( |
| + queue_name=self.queue_name or constants.DEFAULT_QUEUE) |
| else: |
| master.status = analysis_status.COMPLETED |
| master.put() |