| Index: appengine/findit/waterfall/process_base_swarming_task_result_pipeline.py
|
| diff --git a/appengine/findit/waterfall/process_base_swarming_task_result_pipeline.py b/appengine/findit/waterfall/process_base_swarming_task_result_pipeline.py
|
| index e493145670b4608e36b2180bb4624f6f9cf71dc5..ed7bf862345cf9a8f350dfac8009dff83ef335a7 100644
|
| --- a/appengine/findit/waterfall/process_base_swarming_task_result_pipeline.py
|
| +++ b/appengine/findit/waterfall/process_base_swarming_task_result_pipeline.py
|
| @@ -2,6 +2,7 @@
|
| # Use of this source code is governed by a BSD-style license that can be
|
| # found in the LICENSE file.
|
|
|
| +from collections import defaultdict
|
| import datetime
|
| import logging
|
| import time
|
| @@ -22,24 +23,27 @@ class ProcessBaseSwarmingTaskResultPipeline(BasePipeline):
|
|
|
| HTTP_CLIENT = HttpClient()
|
|
|
| - def _CheckTestsRunStatuses(self, output_json):
|
| - # Checks result status for each test run and saves the numbers accordingly.
|
| - # Should be overridden by subclass.
|
| - raise NotImplementedError(
|
| - '_CheckTestsRunStatuses should be implemented in the child class')
|
| + def _CheckTestsRunStatuses(self, output_json, *_):
|
| + """Checks result status for each test run and saves the numbers accordingly.
|
|
|
| - def _ConvertDateTime(self, time_string):
|
| - """Convert UTC time string to datetime.datetime."""
|
| - # Match the time conversion with swarming.py which elides the suffix
|
| - # when microseconds are 0.
|
| - if not time_string:
|
| - return None
|
| - for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'):
|
| - try:
|
| - return datetime.datetime.strptime(time_string, fmt)
|
| - except ValueError:
|
| - pass
|
| - raise ValueError('Failed to parse %s' % time_string) # pragma: no cover
|
| + Args:
|
| + output_json (dict): A dict of all test results in the swarming task.
|
| +
|
| + Returns:
|
| + tests_statuses (dict): A dict of different statuses for each test.
|
| +
|
| + Currently for each test, we are saving number of total runs,
|
| + number of succeeded runs and number of failed runs.
|
| + """
|
| + tests_statuses = defaultdict(lambda: defaultdict(int))
|
| + if output_json:
|
| + for iteration in output_json.get('per_iteration_data'):
|
| + for test_name, tests in iteration.iteritems():
|
| + tests_statuses[test_name]['total_run'] += len(tests)
|
| + for test in tests:
|
| + tests_statuses[test_name][test['status']] += 1
|
| +
|
| + return tests_statuses
|
|
|
| def _GetSwarmingTask(self):
|
| # Get the appropriate kind of Swarming Task (Wf or Flake).
|
| @@ -53,87 +57,108 @@ class ProcessBaseSwarmingTaskResultPipeline(BasePipeline):
|
| raise NotImplementedError(
|
| '_GetArgs should be implemented in the child class')
|
|
|
| - # Arguments number differs from overridden method - pylint: disable=W0221
|
| - def run(self, master_name, builder_name, build_number,
|
| - step_name, task_id, *args): # pragma: no cover.
|
| - """
|
| - Args:
|
| - master_name (str): The master name.
|
| - builder_name (str): The builder name.
|
| - build_number (str): The build number.
|
| - step_name (str): The failed test step name.
|
| - task_id (str): Id for the swarming task which is triggered by Findit.
|
| + def _ConvertDateTime(self, time_string):
|
| + """Convert UTC time string to datetime.datetime."""
|
| + if not time_string:
|
| + return None
|
| + for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'):
|
| + # When microseconds are 0, the '.123456' suffix is elided.
|
| + try:
|
| + return datetime.datetime.strptime(time_string, fmt)
|
| + except ValueError:
|
| + pass
|
| + raise ValueError('Failed to parse %s' % time_string) # pragma: no cover
|
|
|
| - Returns:
|
| - A dict of lists for reliable/flaky tests.
|
| - """
|
| - call_args = self._GetArgs(master_name, builder_name, build_number,
|
| - step_name, *args)
|
| - assert task_id
|
| + def _MonitorSwarmingTask(self, task_id, *call_args):
|
| + """Monitors the swarming task and waits for it to complete."""
|
| timeout_hours = waterfall_config.GetSwarmingSettings().get(
|
| 'task_timeout_hours')
|
| deadline = time.time() + timeout_hours * 60 * 60
|
| server_query_interval_seconds = waterfall_config.GetSwarmingSettings().get(
|
| 'server_query_interval_seconds')
|
| +
|
| task_started = False
|
| task_completed = False
|
| tests_statuses = {}
|
| step_name_no_platform = None
|
| + task = self._GetSwarmingTask(*call_args)
|
|
|
| while not task_completed:
|
| - # Keeps monitoring the swarming task, waits for it to complete.
|
| - data = swarming_util.GetSwarmingTaskResultById(
|
| - task_id, self.HTTP_CLIENT)
|
| + data = swarming_util.GetSwarmingTaskResultById(task_id, self.HTTP_CLIENT)
|
| task_state = data['state']
|
| - exit_code = (data['exit_code'] if
|
| + exit_code = (data.get('exit_code') if
|
| task_state == swarming_util.STATE_COMPLETED else None)
|
| - step_name_no_platform = swarming_util.GetTagValue(
|
| - data.get('tags', {}), 'ref_name')
|
| + step_name_no_platform = (
|
| + step_name_no_platform or swarming_util.GetTagValue(
|
| + data.get('tags', {}), 'ref_name'))
|
| +
|
| if task_state not in swarming_util.STATES_RUNNING:
|
| task_completed = True
|
| - task = self._GetSwarmingTask(*call_args)
|
| +
|
| if (task_state == swarming_util.STATE_COMPLETED and
|
| int(exit_code) != swarming_util.TASK_FAILED):
|
| outputs_ref = data.get('outputs_ref')
|
| output_json = swarming_util.GetSwarmingTaskFailureLog(
|
| outputs_ref, self.HTTP_CLIENT)
|
| - tests_statuses = self._CheckTestsRunStatuses(
|
| - output_json, *call_args)
|
| + tests_statuses = self._CheckTestsRunStatuses(output_json, *call_args)
|
| task.status = analysis_status.COMPLETED
|
| task.tests_statuses = tests_statuses
|
| else:
|
| task.status = analysis_status.ERROR
|
| logging_str = 'Swarming task stopped with status: %s' % task_state
|
| - if exit_code:
|
| + if exit_code: # pragma: no cover
|
| logging_str += ' and exit_code: %s - %s' % (
|
| exit_code, swarming_util.EXIT_CODE_DESCRIPTIONS[int(exit_code)])
|
| logging.error(logging_str)
|
|
|
| - priority_str = swarming_util.GetTagValue(
|
| - data.get('tags', {}), 'priority')
|
| + tags = data.get('tags', {})
|
| + priority_str = swarming_util.GetTagValue(tags, 'priority')
|
| if priority_str:
|
| task.parameters['priority'] = int(priority_str)
|
| +
|
| task.put()
|
| else: # pragma: no cover
|
| if task_state == 'RUNNING' and not task_started:
|
| # swarming task just starts, update status.
|
| task_started = True
|
| - task = self._GetSwarmingTask(*call_args)
|
| task.status = analysis_status.RUNNING
|
| task.put()
|
| time.sleep(server_query_interval_seconds)
|
| +
|
| + # Timeout.
|
| if time.time() > deadline:
|
| # Updates status as ERROR.
|
| - task = self._GetSwarmingTask(*call_args)
|
| task.status = analysis_status.ERROR
|
| task.put()
|
| logging.error('Swarming task timed out after %d hours.' % timeout_hours)
|
| break # Stops the loop and return.
|
| - # Update swarming task metadata.
|
| - task = self._GetSwarmingTask(*call_args)
|
| +
|
| + # Update swarming task metadata timestamps.
|
| task.created_time = self._ConvertDateTime(data.get('created_ts'))
|
| task.started_time = self._ConvertDateTime(data.get('started_ts'))
|
| task.completed_time = self._ConvertDateTime(data.get('completed_ts'))
|
| task.put()
|
|
|
| + return step_name_no_platform
|
| +
|
| + # Arguments number differs from overridden method - pylint: disable=W0221
|
| + def run(self, master_name, builder_name, build_number, step_name, task_id,
|
| + *args):
|
| + """Monitors a swarming task.
|
| +
|
| + Args:
|
| + master_name (str): The master name.
|
| + builder_name (str): The builder name.
|
| + build_number (str): The build number.
|
| + step_name (str): The failed test step name.
|
| +      task_id (str): The task id to query the swarming server on the progress
|
| + of a swarming task.
|
| +
|
| + Returns:
|
| +      A tuple (step_name, step_name_no_platform).
|
| + """
|
| + call_args = self._GetArgs(master_name, builder_name, build_number,
|
| + step_name, *args)
|
| + step_name_no_platform = self._MonitorSwarmingTask(task_id, *call_args)
|
| return step_name, step_name_no_platform
|
| +
|
|
|