Chromium Code Reviews| Index: appengine/findit/waterfall/process_base_swarming_task_result_pipeline.py |
| diff --git a/appengine/findit/waterfall/process_base_swarming_task_result_pipeline.py b/appengine/findit/waterfall/process_base_swarming_task_result_pipeline.py |
| index e493145670b4608e36b2180bb4624f6f9cf71dc5..938112fc66adc80699093b825de6f2435ef09dd7 100644 |
| --- a/appengine/findit/waterfall/process_base_swarming_task_result_pipeline.py |
| +++ b/appengine/findit/waterfall/process_base_swarming_task_result_pipeline.py |
| @@ -2,6 +2,7 @@ |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| +from collections import defaultdict |
| import datetime |
| import logging |
| import time |
| @@ -22,24 +23,27 @@ class ProcessBaseSwarmingTaskResultPipeline(BasePipeline): |
| HTTP_CLIENT = HttpClient() |
| - def _CheckTestsRunStatuses(self, output_json): |
| - # Checks result status for each test run and saves the numbers accordingly. |
| - # Should be overridden by subclass. |
| - raise NotImplementedError( |
| - '_CheckTestsRunStatuses should be implemented in the child class') |
| + def _CheckTestsRunStatuses(self, output_json, *_): |
| + """Checks result status for each test run and saves the numbers accordingly. |
| - def _ConvertDateTime(self, time_string): |
| - """Convert UTC time string to datetime.datetime.""" |
| - # Match the time conversion with swarming.py which elides the suffix |
| - # when microseconds are 0. |
| - if not time_string: |
| - return None |
| - for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'): |
| - try: |
| - return datetime.datetime.strptime(time_string, fmt) |
| - except ValueError: |
| - pass |
| - raise ValueError('Failed to parse %s' % time_string) # pragma: no cover |
| + Args: |
| + output_json (dict): A dict of all test results in the swarming task. |
| + |
| + Returns: |
| + tests_statuses (dict): A dict of different statuses for each test. |
| + |
| + Currently for each test, we are saving number of total runs, |
| + number of succeeded runs and number of failed runs. |
| + """ |
| + tests_statuses = defaultdict(lambda: defaultdict(int)) |
| + if output_json: |
| + for iteration in output_json.get('per_iteration_data'): |
| + for test_name, tests in iteration.iteritems(): |
| + tests_statuses[test_name]['total_run'] += len(tests) |
| + for test in tests: |
| + tests_statuses[test_name][test['status']] += 1 |
| + |
| + return tests_statuses |
| def _GetSwarmingTask(self): |
| # Get the appropriate kind of Swarming Task (Wf or Flake). |
| @@ -53,22 +57,20 @@ class ProcessBaseSwarmingTaskResultPipeline(BasePipeline): |
| raise NotImplementedError( |
| '_GetArgs should be implemented in the child class') |
| - # Arguments number differs from overridden method - pylint: disable=W0221 |
| - def run(self, master_name, builder_name, build_number, |
| - step_name, task_id, *args): # pragma: no cover. |
| - """ |
| - Args: |
| - master_name (str): The master name. |
| - builder_name (str): The builder name. |
| - build_number (str): The build number. |
| - step_name (str): The failed test step name. |
| - task_id (str): Id for the swarming task which is triggered by Findit. |
| + def _ConvertDateTime(self, time_string): |
| + """Convert UTC time string to datetime.datetime.""" |
| + if not time_string: |
| + return None |
| + for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'): |
| + # When microseconds are 0, the '.123456' suffix is elided. |
| + try: |
| + return datetime.datetime.strptime(time_string, fmt) |
| + except ValueError: |
| + pass |
| + raise ValueError('Failed to parse %s' % time_string) # pragma: no cover |
| - Returns: |
| - A dict of lists for reliable/flaky tests. |
| - """ |
| - call_args = self._GetArgs(master_name, builder_name, build_number, |
| - step_name, *args) |
| + def _MonitorSwarmingTask(self, task_id, *call_args): |
| + """Monitors the swarming task and waits for it to complete.""" |
| assert task_id |
| timeout_hours = waterfall_config.GetSwarmingSettings().get( |
| 'task_timeout_hours') |
| @@ -79,61 +81,117 @@ class ProcessBaseSwarmingTaskResultPipeline(BasePipeline): |
| task_completed = False |
| tests_statuses = {} |
| step_name_no_platform = None |
| + task = self._GetSwarmingTask(*call_args) |
| while not task_completed: |
| - # Keeps monitoring the swarming task, waits for it to complete. |
| - data = swarming_util.GetSwarmingTaskResultById( |
| + data, error = swarming_util.GetSwarmingTaskResultById( |
| task_id, self.HTTP_CLIENT) |
| + |
| + if error: |
| + # An error occurred when trying to contact the swarming server. |
| + logging.error(error.get('message')) |
|
chanli
2016/11/11 00:05:13
In swarming_util._SendRequestToServer the error me
lijeffrey
2016/11/11 20:55:41
Done.
|
| + task.status = analysis_status.ERROR |
| + task.error = error |
| + task.put() |
| + break |
| + |
| task_state = data['state'] |
| - exit_code = (data['exit_code'] if |
| + exit_code = (data.get('exit_code') if |
| task_state == swarming_util.STATE_COMPLETED else None) |
| - step_name_no_platform = swarming_util.GetTagValue( |
| - data.get('tags', {}), 'ref_name') |
| + step_name_no_platform = ( |
| + step_name_no_platform or swarming_util.GetTagValue( |
| + data.get('tags', {}), 'ref_name')) |
| + |
| if task_state not in swarming_util.STATES_RUNNING: |
| task_completed = True |
| - task = self._GetSwarmingTask(*call_args) |
| + |
| if (task_state == swarming_util.STATE_COMPLETED and |
| int(exit_code) != swarming_util.TASK_FAILED): |
| outputs_ref = data.get('outputs_ref') |
| - output_json = swarming_util.GetSwarmingTaskFailureLog( |
| + output_json, error = swarming_util.GetSwarmingTaskFailureLog( |
| outputs_ref, self.HTTP_CLIENT) |
| - tests_statuses = self._CheckTestsRunStatuses( |
| - output_json, *call_args) |
| - task.status = analysis_status.COMPLETED |
| + |
| + if error: |
| + logging.error(error.get('message')) |
|
chanli
2016/11/11 00:05:13
Same here
lijeffrey
2016/11/11 20:55:41
Done.
|
| + task.status = analysis_status.ERROR |
| + task.error = error |
| + else: |
| + task.status = analysis_status.COMPLETED |
| + |
| + tests_statuses = self._CheckTestsRunStatuses(output_json, *call_args) |
| task.tests_statuses = tests_statuses |
| + task.put() |
| else: |
| + code = int(exit_code) if exit_code is not None else ( |
| + swarming_util.UNKNOWN) |
|
chanli
2016/11/11 00:05:13
I think UNKNOWN is misleading here. If task_state
|
| task.status = analysis_status.ERROR |
| + task.error = { |
| + 'code': code, |
| + 'message': swarming_util.EXIT_CODE_DESCRIPTIONS[code] |
| + } |
| + task.put() |
| + |
| logging_str = 'Swarming task stopped with status: %s' % task_state |
| - if exit_code: |
| + if exit_code: # pragma: no cover |
| logging_str += ' and exit_code: %s - %s' % ( |
| - exit_code, swarming_util.EXIT_CODE_DESCRIPTIONS[int(exit_code)]) |
| + exit_code, swarming_util.EXIT_CODE_DESCRIPTIONS[code]) |
| logging.error(logging_str) |
| - priority_str = swarming_util.GetTagValue( |
| - data.get('tags', {}), 'priority') |
| + tags = data.get('tags', {}) |
| + priority_str = swarming_util.GetTagValue(tags, 'priority') |
| if priority_str: |
| task.parameters['priority'] = int(priority_str) |
| + |
| task.put() |
| else: # pragma: no cover |
| if task_state == 'RUNNING' and not task_started: |
| # swarming task just starts, update status. |
| task_started = True |
| - task = self._GetSwarmingTask(*call_args) |
| task.status = analysis_status.RUNNING |
| task.put() |
| time.sleep(server_query_interval_seconds) |
| + |
| + # Timeout. |
| if time.time() > deadline: |
| # Updates status as ERROR. |
| - task = self._GetSwarmingTask(*call_args) |
| task.status = analysis_status.ERROR |
| + task.error = { |
| + 'code': swarming_util.TIMED_OUT, |
| + 'message': 'Process swarming task result timed out' |
| + } |
| task.put() |
| logging.error('Swarming task timed out after %d hours.' % timeout_hours) |
| break # Stops the loop and return. |
| + |
| # Update swarming task metadata. |
| - task = self._GetSwarmingTask(*call_args) |
| - task.created_time = self._ConvertDateTime(data.get('created_ts')) |
| - task.started_time = self._ConvertDateTime(data.get('started_ts')) |
| - task.completed_time = self._ConvertDateTime(data.get('completed_ts')) |
| + task.created_time = (task.created_time or |
| + self._ConvertDateTime(data.get('created_ts'))) |
| + task.started_time = (task.started_time or |
| + self._ConvertDateTime(data.get('started_ts'))) |
| + task.completed_time = (task.completed_time or |
| + self._ConvertDateTime(data.get('completed_ts'))) |
| task.put() |
| + return step_name_no_platform |
| + |
| + # Arguments number differs from overridden method - pylint: disable=W0221 |
| + def run(self, master_name, builder_name, build_number, step_name, task_id, |
| + *args): |
| + """Monitors a swarming task. |
| + |
| + Args: |
| + master_name (str): The master name. |
| + builder_name (str): The builder name. |
| + build_number (str): The build number. |
| + step_name (str): The failed test step name. |
| + task_id (str): The task id to query the swarming server on the progress
| + of a swarming task. |
| + |
| + Returns: |
| + A tuple of (step_name, step_name_no_platform).
| + """ |
| + call_args = self._GetArgs(master_name, builder_name, build_number, |
| + step_name, *args) |
| + step_name_no_platform = self._MonitorSwarmingTask(task_id, *call_args) |
| return step_name, step_name_no_platform |
| + |