Chromium Code Reviews| Index: appengine/findit/waterfall/process_base_swarming_task_result_pipeline.py |
| diff --git a/appengine/findit/waterfall/process_swarming_task_result_pipeline.py b/appengine/findit/waterfall/process_base_swarming_task_result_pipeline.py |
| similarity index 59% |
| copy from appengine/findit/waterfall/process_swarming_task_result_pipeline.py |
| copy to appengine/findit/waterfall/process_base_swarming_task_result_pipeline.py |
| index 06d55d6de4d23ccba2e8dceb475278617b6aa2b4..28683f52d308c52d42a1b3a64b2d1d8ccea74edc 100644 |
| --- a/appengine/findit/waterfall/process_swarming_task_result_pipeline.py |
| +++ b/appengine/findit/waterfall/process_base_swarming_task_result_pipeline.py |
| @@ -11,54 +11,57 @@ from common.http_client_appengine import HttpClientAppengine as HttpClient |
| from common.pipeline_wrapper import BasePipeline |
| from model import analysis_status |
| from model.wf_swarming_task import WfSwarmingTask |
| +from model.flake.flake_swarming_task_result import FlakeSwarmingTaskResult |
| from waterfall import swarming_util |
| from waterfall import waterfall_config |
| -def _CheckTestsRunStatuses(output_json): |
| - """Checks result status for each test run and saves the numbers accordingly. |
| - |
| - Args: |
| - output_json (dict): A dict of all test results in the swarming task. |
| - |
| - Returns: |
| - tests_statuses (dict): A dict of different statuses for each test. |
| - |
| - Currently for each test, we are saving number of total runs, |
| - number of succeeded runs and number of failed runs. |
| - """ |
| - tests_statuses = defaultdict(lambda: defaultdict(int)) |
| - if output_json: |
| - for iteration in output_json.get('per_iteration_data'): |
| - for test_name, tests in iteration.iteritems(): |
| - tests_statuses[test_name]['total_run'] += len(tests) |
| - for test in tests: |
| - tests_statuses[test_name][test['status']] += 1 |
| - |
| - return tests_statuses |
| - |
| - |
| -def _ConvertDateTime(time_string): |
| - """Convert UTC time string to datetime.datetime.""" |
| - # Match the time convertion with swarming.py. |
| - # According to swarming.py, |
| - # when microseconds are 0, the '.123456' suffix is elided. |
| - if not time_string: |
| - return None |
| - for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'): |
| - try: |
| - return datetime.datetime.strptime(time_string, fmt) |
| - except ValueError: |
| - pass |
| - raise ValueError('Failed to parse %s' % time_string) # pragma: no cover |
| - |
| - |
| -class ProcessSwarmingTaskResultPipeline(BasePipeline): |
| +class ProcessBaseSwarmingTaskResultPipeline(BasePipeline): |
| """A pipeline for monitoring swarming task and processing task result. |
| This pipeline waits for result for a swarming task and processes the result to |
| generate a dict for statuses for each test run. |
| """ |
| + def _CheckTestsRunStatuses(self, output_json): |
| + """Checks result status for each test run and saves the numbers accordingly. |
| + |
| + Args: |
| + output_json (dict): A dict of all test results in the swarming task. |
|
stgao
2016/07/09 00:04:33
style nit: indent.
Args:
arg_name (type): expla
caiw
2016/07/14 00:59:44
Done.
|
| + |
| + Returns: |
| + tests_statuses (dict): A dict of different statuses for each test. |
| + |
| + Currently for each test, we are saving number of total runs, |
| + number of succeeded runs and number of failed runs. |
| + """ |
| + tests_statuses = defaultdict(lambda: defaultdict(int)) |
| + if output_json: |
| + for iteration in output_json.get('per_iteration_data'): |
| + for test_name, tests in iteration.iteritems(): |
| + tests_statuses[test_name]['total_run'] += len(tests) |
| + for test in tests: |
| + tests_statuses[test_name][test['status']] += 1 |
| + return tests_statuses |
| + |
| + def _ConvertDateTime(self, time_string): |
| + """Convert UTC time string to datetime.datetime.""" |
| + # Match the time convertion with swarming.py. |
| + # According to swarming.py, |
| + # when microseconds are 0, the '.123456' suffix is elided. |
| + if not time_string: |
| + return None |
| + for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'): |
| + try: |
| + return datetime.datetime.strptime(time_string, fmt) |
| + except ValueError: |
| + pass |
| + raise ValueError('Failed to parse %s' % time_string) # pragma: no cover |
| + |
| + def _GetSwarmingTask( |
| + self,master_name, builder_name, step_name, build_number): |
| + # Get the appropriate kind of Swarming Task (Wf or Flake) |
| + # Should be overwritten by subclass |
| + pass |
|
stgao
2016/07/09 00:04:33
How about raising an NotImplementedError here as t
caiw
2016/07/14 00:59:44
Done.
|
| HTTP_CLIENT = HttpClient() |
| # Arguments number differs from overridden method - pylint: disable=W0221 |
| @@ -74,14 +77,12 @@ class ProcessSwarmingTaskResultPipeline(BasePipeline): |
| Returns: |
| A dict of lists for reliable/flaky tests. |
| """ |
| - |
| assert task_id |
| timeout_hours = waterfall_config.GetSwarmingSettings().get( |
| 'task_timeout_hours') |
| deadline = time.time() + timeout_hours * 60 * 60 |
| server_query_interval_seconds = waterfall_config.GetSwarmingSettings().get( |
| 'server_query_interval_seconds') |
| - |
| task_started = False |
| task_completed = False |
| tests_statuses = {} |
| @@ -96,14 +97,16 @@ class ProcessSwarmingTaskResultPipeline(BasePipeline): |
| data.get('tags', {}), 'ref_name') |
| if task_state not in swarming_util.STATES_RUNNING: |
| task_completed = True |
| - task = WfSwarmingTask.Get( |
| - master_name, builder_name, build_number, step_name) |
| + task = self._GetSwarmingTask( |
| + master_name, builder_name, step_name, build_number) |
| if task_state == swarming_util.STATE_COMPLETED: |
| outputs_ref = data.get('outputs_ref') |
| output_json = swarming_util.GetSwarmingTaskFailureLog( |
| outputs_ref, self.HTTP_CLIENT) |
| - tests_statuses = _CheckTestsRunStatuses(output_json) |
| - |
| + # This following line will not be compatible! |
| + tests_statuses = self._CheckTestsRunStatuses( |
| + output_json, master_name, builder_name, |
| + build_number, step_name) |
| task.status = analysis_status.COMPLETED |
| task.tests_statuses = tests_statuses |
| else: |
| @@ -119,8 +122,8 @@ class ProcessSwarmingTaskResultPipeline(BasePipeline): |
| if task_state == 'RUNNING' and not task_started: |
| # swarming task just starts, update status. |
| task_started = True |
| - task = WfSwarmingTask.Get( |
| - master_name, builder_name, build_number, step_name) |
| + task = self._GetSwarmingTask( |
| + master_name, builder_name, step_name, build_number) |
| task.status = analysis_status.RUNNING |
| task.put() |
| @@ -128,19 +131,18 @@ class ProcessSwarmingTaskResultPipeline(BasePipeline): |
| if time.time() > deadline: |
| # Updates status as ERROR. |
| - task = WfSwarmingTask.Get( |
| - master_name, builder_name, build_number, step_name) |
| + task = self._GetSwarmingTask( |
| + master_name, builder_name, step_name, build_number) |
| task.status = analysis_status.ERROR |
| task.put() |
| logging.error('Swarming task timed out after %d hours.' % timeout_hours) |
| break # Stops the loop and return. |
| - |
| # Update swarming task metadate. |
| - task = WfSwarmingTask.Get( |
| - master_name, builder_name, build_number, step_name) |
| - task.created_time = _ConvertDateTime(data.get('created_ts')) |
| - task.started_time = _ConvertDateTime(data.get('started_ts')) |
| - task.completed_time = _ConvertDateTime(data.get('completed_ts')) |
| + task = self._GetSwarmingTask( |
| + master_name, builder_name, step_name, build_number) |
| + task.created_time = self._ConvertDateTime(data.get('created_ts')) |
| + task.started_time = self._ConvertDateTime(data.get('started_ts')) |
| + task.completed_time = self._ConvertDateTime(data.get('completed_ts')) |
| task.put() |
| - return step_name, (step_name_no_platform, task.classified_tests) |
| + return step_name, step_name_no_platform |