| OLD | NEW |
| 1 # Copyright 2016 The Chromium Authors. All rights reserved. | 1 # Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
| 4 | 4 |
| 5 from collections import defaultdict | 5 from collections import defaultdict |
| 6 import datetime | 6 import datetime |
| 7 import logging | 7 import logging |
| 8 import time | 8 import time |
| 9 | 9 |
| 10 from common.http_client_appengine import HttpClientAppengine as HttpClient | 10 from common.http_client_appengine import HttpClientAppengine as HttpClient |
| 11 from common.pipeline_wrapper import BasePipeline | 11 from common.pipeline_wrapper import BasePipeline |
| 12 from model import analysis_status | 12 from model import analysis_status |
| 13 from model.wf_swarming_task import WfSwarmingTask | 13 from model.wf_swarming_task import WfSwarmingTask |
| 14 from waterfall import swarming_util | 14 from waterfall import swarming_util |
| 15 from waterfall import waterfall_config | 15 from waterfall import waterfall_config |
| 16 | 16 |
| 17 | 17 |
| 18 def _CheckTestsRunStatuses(output_json): | 18 class ProcessBaseSwarmingTaskResultPipeline(BasePipeline): |
| 19 """Checks result status for each test run and saves the numbers accordingly. | |
| 20 | |
| 21 Args: | |
| 22 output_json (dict): A dict of all test results in the swarming task. | |
| 23 | |
| 24 Returns: | |
| 25 tests_statuses (dict): A dict of different statuses for each test. | |
| 26 | |
| 27 Currently for each test, we are saving number of total runs, | |
| 28 number of succeeded runs and number of failed runs. | |
| 29 """ | |
| 30 tests_statuses = defaultdict(lambda: defaultdict(int)) | |
| 31 if output_json: | |
| 32 for iteration in output_json.get('per_iteration_data'): | |
| 33 for test_name, tests in iteration.iteritems(): | |
| 34 tests_statuses[test_name]['total_run'] += len(tests) | |
| 35 for test in tests: | |
| 36 tests_statuses[test_name][test['status']] += 1 | |
| 37 | |
| 38 return tests_statuses | |
| 39 | |
| 40 | |
| 41 def _ConvertDateTime(time_string): | |
| 42 """Convert UTC time string to datetime.datetime.""" | |
| 43 # Match the time convertion with swarming.py. | |
| 44 # According to swarming.py, | |
| 45 # when microseconds are 0, the '.123456' suffix is elided. | |
| 46 if not time_string: | |
| 47 return None | |
| 48 for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'): | |
| 49 try: | |
| 50 return datetime.datetime.strptime(time_string, fmt) | |
| 51 except ValueError: | |
| 52 pass | |
| 53 raise ValueError('Failed to parse %s' % time_string) # pragma: no cover | |
| 54 | |
| 55 | |
| 56 class ProcessSwarmingTaskResultPipeline(BasePipeline): | |
| 57 """A pipeline for monitoring swarming task and processing task result. | 19 """A pipeline for monitoring swarming task and processing task result. |
| 58 | 20 |
| 59 This pipeline waits for result for a swarming task and processes the result to | 21 This pipeline waits for result for a swarming task and processes the result to |
| 60 generate a dict for statuses for each test run. | 22 generate a dict for statuses for each test run. |
| 61 """ | 23 """ |
| 62 | 24 |
| 63 HTTP_CLIENT = HttpClient() | 25 HTTP_CLIENT = HttpClient() |
| 26 |
| 27 def _CheckTestsRunStatuses(self, output_json): |
| 28 # Checks result status for each test run and saves the numbers accordingly. |
| 29 # Should be overridden by subclass. |
| 30 raise NotImplementedError( |
| 31 '_CheckTestsRunStatuses should be implemented in the child class') |
| 32 |
| 33 def _ConvertDateTime(self, time_string): |
| 34 """Convert UTC time string to datetime.datetime.""" |
| 35 # Match the time conversion with swarming.py which elides the suffix |
| 36 # when microseconds are 0. |
| 37 if not time_string: |
| 38 return None |
| 39 for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'): |
| 40 try: |
| 41 return datetime.datetime.strptime(time_string, fmt) |
| 42 except ValueError: |
| 43 pass |
| 44 raise ValueError('Failed to parse %s' % time_string) # pragma: no cover |
| 45 |
| 46 def _GetSwarmingTask(self): |
| 47 # Get the appropriate kind of Swarming Task (Wf or Flake). |
| 48 # Should be overridden by subclass. |
| 49 raise NotImplementedError( |
| 50 '_GetSwarmingTask should be implemented in the child class') |
| 51 |
| 52 def _GetArgs(self): |
| 53 # Return the list of arguments passed to _CheckTestsRunStatuses after output_json. |
| 54 # Should be overridden by subclass. |
| 55 raise NotImplementedError( |
| 56 '_GetArgs should be implemented in the child class') |
| 57 |
| 64 # Arguments number differs from overridden method - pylint: disable=W0221 | 58 # Arguments number differs from overridden method - pylint: disable=W0221 |
| 65 def run(self, master_name, builder_name, build_number, step_name): | 59 def run(self, master_name, builder_name, build_number, |
| 60 step_name, task_id, *args): #pragma: no cover. |
| 66 """ | 61 """ |
| 67 Args: | 62 Args: |
| 68 master_name (str): The master name. | 63 master_name (str): The master name. |
| 69 builder_name (str): The builder name. | 64 builder_name (str): The builder name. |
| 70 build_number (str): The build number. | 65 build_number (str): The build number. |
| 71 step_name (str): The failed test step name. | 66 step_name (str): The failed test step name. |
| 67 task_id (str): Id for the swarming task which is triggered by Findit. |
| 72 | 68 |
| 73 Returns: | 69 Returns: |
| 74 A dict of lists for reliable/flaky tests. | 70 A dict of lists for reliable/flaky tests. |
| 75 """ | 71 """ |
| 76 | 72 call_args = self._GetArgs(master_name, builder_name, build_number, |
| 73 step_name, *args) |
| 74 assert task_id |
| 77 timeout_hours = waterfall_config.GetSwarmingSettings().get( | 75 timeout_hours = waterfall_config.GetSwarmingSettings().get( |
| 78 'task_timeout_hours') | 76 'task_timeout_hours') |
| 79 deadline = time.time() + timeout_hours * 60 * 60 | 77 deadline = time.time() + timeout_hours * 60 * 60 |
| 80 server_query_interval_seconds = waterfall_config.GetSwarmingSettings().get( | 78 server_query_interval_seconds = waterfall_config.GetSwarmingSettings().get( |
| 81 'server_query_interval_seconds') | 79 'server_query_interval_seconds') |
| 82 | |
| 83 task_started = False | 80 task_started = False |
| 84 task_completed = False | 81 task_completed = False |
| 85 tests_statuses = {} | 82 tests_statuses = {} |
| 86 step_name_no_platform = None | 83 step_name_no_platform = None |
| 87 | 84 |
| 88 task = WfSwarmingTask.Get( | |
| 89 master_name, builder_name, build_number, step_name) | |
| 90 task_id = task.task_id | |
| 91 while not task_completed: | 85 while not task_completed: |
| 92 # Keeps monitoring the swarming task, waits for it to complete. | 86 # Keeps monitoring the swarming task, waits for it to complete. |
| 93 data = swarming_util.GetSwarmingTaskResultById( | 87 data = swarming_util.GetSwarmingTaskResultById( |
| 94 task_id, self.HTTP_CLIENT) | 88 task_id, self.HTTP_CLIENT) |
| 95 task_state = data['state'] | 89 task_state = data['state'] |
| 96 step_name_no_platform = swarming_util.GetTagValue( | 90 step_name_no_platform = swarming_util.GetTagValue( |
| 97 data.get('tags', {}), 'ref_name') | 91 data.get('tags', {}), 'ref_name') |
| 98 if task_state not in swarming_util.STATES_RUNNING: | 92 if task_state not in swarming_util.STATES_RUNNING: |
| 99 task_completed = True | 93 task_completed = True |
| 100 task = WfSwarmingTask.Get( | 94 task = self._GetSwarmingTask(*call_args) |
| 101 master_name, builder_name, build_number, step_name) | |
| 102 if task_state == swarming_util.STATE_COMPLETED: | 95 if task_state == swarming_util.STATE_COMPLETED: |
| 103 outputs_ref = data.get('outputs_ref') | 96 outputs_ref = data.get('outputs_ref') |
| 104 output_json = swarming_util.GetSwarmingTaskFailureLog( | 97 output_json = swarming_util.GetSwarmingTaskFailureLog( |
| 105 outputs_ref, self.HTTP_CLIENT) | 98 outputs_ref, self.HTTP_CLIENT) |
| 106 tests_statuses = _CheckTestsRunStatuses(output_json) | 99 tests_statuses = self._CheckTestsRunStatuses( |
| 107 | 100 output_json, *call_args) |
| 108 task.status = analysis_status.COMPLETED | 101 task.status = analysis_status.COMPLETED |
| 109 task.tests_statuses = tests_statuses | 102 task.tests_statuses = tests_statuses |
| 110 else: | 103 else: |
| 111 task.status = analysis_status.ERROR | 104 task.status = analysis_status.ERROR |
| 112 logging.error('Swarming task stopped with status: %s' % ( | 105 logging.error('Swarming task stopped with status: %s' % ( |
| 113 task_state)) | 106 task_state)) |
| 114 priority_str = swarming_util.GetTagValue( | 107 priority_str = swarming_util.GetTagValue( |
| 115 data.get('tags', {}), 'priority') | 108 data.get('tags', {}), 'priority') |
| 116 if priority_str: | 109 if priority_str: |
| 117 task.parameters['priority'] = int(priority_str) | 110 task.parameters['priority'] = int(priority_str) |
| 118 task.put() | 111 task.put() |
| 119 else: # pragma: no cover | 112 else: # pragma: no cover |
| 120 if task_state == 'RUNNING' and not task_started: | 113 if task_state == 'RUNNING' and not task_started: |
| 121 # swarming task just starts, update status. | 114 # swarming task just starts, update status. |
| 122 task_started = True | 115 task_started = True |
| 123 task = WfSwarmingTask.Get( | 116 task = self._GetSwarmingTask(*call_args) |
| 124 master_name, builder_name, build_number, step_name) | |
| 125 task.status = analysis_status.RUNNING | 117 task.status = analysis_status.RUNNING |
| 126 task.put() | 118 task.put() |
| 127 | |
| 128 time.sleep(server_query_interval_seconds) | 119 time.sleep(server_query_interval_seconds) |
| 129 | |
| 130 if time.time() > deadline: | 120 if time.time() > deadline: |
| 131 # Updates status as ERROR. | 121 # Updates status as ERROR. |
| 132 task = WfSwarmingTask.Get( | 122 task = self._GetSwarmingTask(*call_args) |
| 133 master_name, builder_name, build_number, step_name) | |
| 134 task.status = analysis_status.ERROR | 123 task.status = analysis_status.ERROR |
| 135 task.put() | 124 task.put() |
| 136 logging.error('Swarming task timed out after %d hours.' % timeout_hours) | 125 logging.error('Swarming task timed out after %d hours.' % timeout_hours) |
| 137 break # Stops the loop and return. | 126 break # Stops the loop and return. |
| 138 | |
| 139 # Update swarming task metadata. | 127 # Update swarming task metadata. |
| 140 task = WfSwarmingTask.Get( | 128 task = self._GetSwarmingTask(*call_args) |
| 141 master_name, builder_name, build_number, step_name) | 129 task.created_time = self._ConvertDateTime(data.get('created_ts')) |
| 142 task.created_time = _ConvertDateTime(data.get('created_ts')) | 130 task.started_time = self._ConvertDateTime(data.get('started_ts')) |
| 143 task.started_time = _ConvertDateTime(data.get('started_ts')) | 131 task.completed_time = self._ConvertDateTime(data.get('completed_ts')) |
| 144 task.completed_time = _ConvertDateTime(data.get('completed_ts')) | |
| 145 task.put() | 132 task.put() |
| 146 | 133 |
| 147 return step_name, (step_name_no_platform, task.classified_tests) | 134 return step_name, step_name_no_platform |
| OLD | NEW |