Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2379)

Unified Diff: appengine/findit/waterfall/process_swarming_task_result_pipeline.py

Issue 2477343003: [Findit] Refactoring monitor swarming task pipelines (Closed)
Patch Set: Addressing nit Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/findit/waterfall/process_swarming_task_result_pipeline.py
diff --git a/appengine/findit/waterfall/process_swarming_task_result_pipeline.py b/appengine/findit/waterfall/process_swarming_task_result_pipeline.py
index c4a105ba76185e611a9c37eb36d1c2df18ccadd4..4281f37c17460e5e515e3dd28c43290ce4aaffa6 100644
--- a/appengine/findit/waterfall/process_swarming_task_result_pipeline.py
+++ b/appengine/findit/waterfall/process_swarming_task_result_pipeline.py
@@ -2,68 +2,34 @@
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
-from collections import defaultdict
-import datetime
-import logging
-import time
-
-from common.http_client_appengine import HttpClientAppengine as HttpClient
-from common.pipeline_wrapper import BasePipeline
-from model import analysis_status
from model.wf_swarming_task import WfSwarmingTask
-from waterfall import swarming_util
-from waterfall import waterfall_config
-
-
-def _CheckTestsRunStatuses(output_json):
- """Checks result status for each test run and saves the numbers accordingly.
-
- Args:
- output_json (dict): A dict of all test results in the swarming task.
-
- Returns:
- tests_statuses (dict): A dict of different statuses for each test.
-
- Currently for each test, we are saving number of total runs,
- number of succeeded runs and number of failed runs.
- """
- tests_statuses = defaultdict(lambda: defaultdict(int))
- if output_json:
- for iteration in output_json.get('per_iteration_data'):
- for test_name, tests in iteration.iteritems():
- tests_statuses[test_name]['total_run'] += len(tests)
- for test in tests:
- tests_statuses[test_name][test['status']] += 1
-
- return tests_statuses
-
+from waterfall.process_base_swarming_task_result_pipeline import (
+ ProcessBaseSwarmingTaskResultPipeline)
-def _ConvertDateTime(time_string):
- """Convert UTC time string to datetime.datetime."""
- # Match the time convertion with swarming.py.
- # According to swarming.py,
- # when microseconds are 0, the '.123456' suffix is elided.
- if not time_string:
- return None
- for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'):
- try:
- return datetime.datetime.strptime(time_string, fmt)
- except ValueError:
- pass
- raise ValueError('Failed to parse %s' % time_string) # pragma: no cover
-
-class ProcessSwarmingTaskResultPipeline(BasePipeline):
+class ProcessSwarmingTaskResultPipeline(ProcessBaseSwarmingTaskResultPipeline):
"""A pipeline for monitoring swarming task and processing task result.
This pipeline waits for result for a swarming task and processes the result to
generate a dict for statuses for each test run.
"""
- HTTP_CLIENT = HttpClient()
+ # Arguments number differs from overridden method - pylint: disable=W0221
+ def _GetArgs(self, master_name, builder_name, build_number,
+ step_name):
+ return master_name, builder_name, build_number, step_name
+
+ # Arguments number differs from overridden method - pylint: disable=W0221
+ def _GetSwarmingTask(self, master_name, builder_name, build_number,
+ step_name):
+ # Gets the appropriate kind of swarming task (WfSwarmingTask).
+ return WfSwarmingTask.Get(master_name, builder_name, build_number,
+ step_name)
+
# Arguments number differs from overridden method - pylint: disable=W0221
def run(self, master_name, builder_name, build_number, step_name):
- """
+ """Monitors a swarming task.
+
Args:
master_name (str): The master name.
builder_name (str): The builder name.
@@ -73,75 +39,10 @@ class ProcessSwarmingTaskResultPipeline(BasePipeline):
Returns:
A dict of lists for reliable/flaky tests.
"""
-
- timeout_hours = waterfall_config.GetSwarmingSettings().get(
- 'task_timeout_hours')
- deadline = time.time() + timeout_hours * 60 * 60
- server_query_interval_seconds = waterfall_config.GetSwarmingSettings().get(
- 'server_query_interval_seconds')
-
- task_started = False
- task_completed = False
- tests_statuses = {}
- step_name_no_platform = None
-
- task = WfSwarmingTask.Get(
+ call_args = self._GetArgs(
master_name, builder_name, build_number, step_name)
+ task = self._GetSwarmingTask(*call_args)
task_id = task.task_id
- while not task_completed:
- # Keeps monitoring the swarming task, waits for it to complete.
- data = swarming_util.GetSwarmingTaskResultById(
- task_id, self.HTTP_CLIENT)
- task_state = data['state']
- step_name_no_platform = swarming_util.GetTagValue(
- data.get('tags', {}), 'ref_name')
- if task_state not in swarming_util.STATES_RUNNING:
- task_completed = True
- task = WfSwarmingTask.Get(
- master_name, builder_name, build_number, step_name)
- if task_state == swarming_util.STATE_COMPLETED:
- outputs_ref = data.get('outputs_ref')
- output_json = swarming_util.GetSwarmingTaskFailureLog(
- outputs_ref, self.HTTP_CLIENT)
- tests_statuses = _CheckTestsRunStatuses(output_json)
-
- task.status = analysis_status.COMPLETED
- task.tests_statuses = tests_statuses
- else:
- task.status = analysis_status.ERROR
- logging.error('Swarming task stopped with status: %s' % (
- task_state))
- priority_str = swarming_util.GetTagValue(
- data.get('tags', {}), 'priority')
- if priority_str:
- task.parameters['priority'] = int(priority_str)
- task.put()
- else: # pragma: no cover
- if task_state == 'RUNNING' and not task_started:
- # swarming task just starts, update status.
- task_started = True
- task = WfSwarmingTask.Get(
- master_name, builder_name, build_number, step_name)
- task.status = analysis_status.RUNNING
- task.put()
-
- time.sleep(server_query_interval_seconds)
-
- if time.time() > deadline:
- # Updates status as ERROR.
- task = WfSwarmingTask.Get(
- master_name, builder_name, build_number, step_name)
- task.status = analysis_status.ERROR
- task.put()
- logging.error('Swarming task timed out after %d hours.' % timeout_hours)
- break # Stops the loop and return.
-
- # Update swarming task metadate.
- task = WfSwarmingTask.Get(
- master_name, builder_name, build_number, step_name)
- task.created_time = _ConvertDateTime(data.get('created_ts'))
- task.started_time = _ConvertDateTime(data.get('started_ts'))
- task.completed_time = _ConvertDateTime(data.get('completed_ts'))
- task.put()
+ step_name_no_platform = self._MonitorSwarmingTask(task_id, *call_args)
return step_name, (step_name_no_platform, task.reliable_tests)

Powered by Google App Engine
This is Rietveld 408576698