Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2157)

Unified Diff: appengine/findit/waterfall/process_base_swarming_task_result_pipeline.py

Issue 2130543004: Waterfall components of regression range finder. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/findit/waterfall/process_base_swarming_task_result_pipeline.py
diff --git a/appengine/findit/waterfall/process_swarming_task_result_pipeline.py b/appengine/findit/waterfall/process_base_swarming_task_result_pipeline.py
similarity index 59%
copy from appengine/findit/waterfall/process_swarming_task_result_pipeline.py
copy to appengine/findit/waterfall/process_base_swarming_task_result_pipeline.py
index 06d55d6de4d23ccba2e8dceb475278617b6aa2b4..28683f52d308c52d42a1b3a64b2d1d8ccea74edc 100644
--- a/appengine/findit/waterfall/process_swarming_task_result_pipeline.py
+++ b/appengine/findit/waterfall/process_base_swarming_task_result_pipeline.py
@@ -11,54 +11,57 @@ from common.http_client_appengine import HttpClientAppengine as HttpClient
from common.pipeline_wrapper import BasePipeline
from model import analysis_status
from model.wf_swarming_task import WfSwarmingTask
+from model.flake.flake_swarming_task_result import FlakeSwarmingTaskResult
from waterfall import swarming_util
from waterfall import waterfall_config
-def _CheckTestsRunStatuses(output_json):
- """Checks result status for each test run and saves the numbers accordingly.
-
- Args:
- output_json (dict): A dict of all test results in the swarming task.
-
- Returns:
- tests_statuses (dict): A dict of different statuses for each test.
-
- Currently for each test, we are saving number of total runs,
- number of succeeded runs and number of failed runs.
- """
- tests_statuses = defaultdict(lambda: defaultdict(int))
- if output_json:
- for iteration in output_json.get('per_iteration_data'):
- for test_name, tests in iteration.iteritems():
- tests_statuses[test_name]['total_run'] += len(tests)
- for test in tests:
- tests_statuses[test_name][test['status']] += 1
-
- return tests_statuses
-
-
-def _ConvertDateTime(time_string):
- """Convert UTC time string to datetime.datetime."""
- # Match the time conversion with swarming.py.
- # According to swarming.py,
- # when microseconds are 0, the '.123456' suffix is elided.
- if not time_string:
- return None
- for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'):
- try:
- return datetime.datetime.strptime(time_string, fmt)
- except ValueError:
- pass
- raise ValueError('Failed to parse %s' % time_string) # pragma: no cover
-
-
-class ProcessSwarmingTaskResultPipeline(BasePipeline):
+class ProcessBaseSwarmingTaskResultPipeline(BasePipeline):
"""A pipeline for monitoring swarming task and processing task result.
This pipeline waits for result for a swarming task and processes the result to
generate a dict for statuses for each test run.
"""
+ def _CheckTestsRunStatuses(self, output_json):
+ """Checks result status for each test run and saves the numbers accordingly.
+
+ Args:
+ output_json (dict): A dict of all test results in the swarming task.
stgao 2016/07/09 00:04:33 style nit: indent. Args: arg_name (type): explanation
caiw 2016/07/14 00:59:44 Done.
+
+ Returns:
+ tests_statuses (dict): A dict of different statuses for each test.
+
+ Currently for each test, we are saving number of total runs,
+ number of succeeded runs and number of failed runs.
+ """
+ tests_statuses = defaultdict(lambda: defaultdict(int))
+ if output_json:
+ for iteration in output_json.get('per_iteration_data'):
+ for test_name, tests in iteration.iteritems():
+ tests_statuses[test_name]['total_run'] += len(tests)
+ for test in tests:
+ tests_statuses[test_name][test['status']] += 1
+ return tests_statuses
+
+ def _ConvertDateTime(self, time_string):
+ """Convert UTC time string to datetime.datetime."""
+ # Match the time conversion with swarming.py.
+ # According to swarming.py,
+ # when microseconds are 0, the '.123456' suffix is elided.
+ if not time_string:
+ return None
+ for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'):
+ try:
+ return datetime.datetime.strptime(time_string, fmt)
+ except ValueError:
+ pass
+ raise ValueError('Failed to parse %s' % time_string) # pragma: no cover
+
+ def _GetSwarmingTask(
+ self,master_name, builder_name, step_name, build_number):
+ # Get the appropriate kind of Swarming Task (Wf or Flake)
+ # Should be overwritten by subclass
+ pass
stgao 2016/07/09 00:04:33 How about raising a NotImplementedError here as the default?
caiw 2016/07/14 00:59:44 Done.
HTTP_CLIENT = HttpClient()
# Arguments number differs from overridden method - pylint: disable=W0221
@@ -74,14 +77,12 @@ class ProcessSwarmingTaskResultPipeline(BasePipeline):
Returns:
A dict of lists for reliable/flaky tests.
"""
-
assert task_id
timeout_hours = waterfall_config.GetSwarmingSettings().get(
'task_timeout_hours')
deadline = time.time() + timeout_hours * 60 * 60
server_query_interval_seconds = waterfall_config.GetSwarmingSettings().get(
'server_query_interval_seconds')
-
task_started = False
task_completed = False
tests_statuses = {}
@@ -96,14 +97,16 @@ class ProcessSwarmingTaskResultPipeline(BasePipeline):
data.get('tags', {}), 'ref_name')
if task_state not in swarming_util.STATES_RUNNING:
task_completed = True
- task = WfSwarmingTask.Get(
- master_name, builder_name, build_number, step_name)
+ task = self._GetSwarmingTask(
+ master_name, builder_name, step_name, build_number)
if task_state == swarming_util.STATE_COMPLETED:
outputs_ref = data.get('outputs_ref')
output_json = swarming_util.GetSwarmingTaskFailureLog(
outputs_ref, self.HTTP_CLIENT)
- tests_statuses = _CheckTestsRunStatuses(output_json)
-
+ # This following line will not be compatible!
+ tests_statuses = self._CheckTestsRunStatuses(
+ output_json, master_name, builder_name,
+ build_number, step_name)
task.status = analysis_status.COMPLETED
task.tests_statuses = tests_statuses
else:
@@ -119,8 +122,8 @@ class ProcessSwarmingTaskResultPipeline(BasePipeline):
if task_state == 'RUNNING' and not task_started:
# swarming task just starts, update status.
task_started = True
- task = WfSwarmingTask.Get(
- master_name, builder_name, build_number, step_name)
+ task = self._GetSwarmingTask(
+ master_name, builder_name, step_name, build_number)
task.status = analysis_status.RUNNING
task.put()
@@ -128,19 +131,18 @@ class ProcessSwarmingTaskResultPipeline(BasePipeline):
if time.time() > deadline:
# Updates status as ERROR.
- task = WfSwarmingTask.Get(
- master_name, builder_name, build_number, step_name)
+ task = self._GetSwarmingTask(
+ master_name, builder_name, step_name, build_number)
task.status = analysis_status.ERROR
task.put()
logging.error('Swarming task timed out after %d hours.' % timeout_hours)
break # Stops the loop and return.
-
- # Update swarming task metadata.
- task = WfSwarmingTask.Get(
- master_name, builder_name, build_number, step_name)
- task.created_time = _ConvertDateTime(data.get('created_ts'))
- task.started_time = _ConvertDateTime(data.get('started_ts'))
- task.completed_time = _ConvertDateTime(data.get('completed_ts'))
+ task = self._GetSwarmingTask(
+ master_name, builder_name, step_name, build_number)
+ task.created_time = self._ConvertDateTime(data.get('created_ts'))
+ task.started_time = self._ConvertDateTime(data.get('started_ts'))
+ task.completed_time = self._ConvertDateTime(data.get('completed_ts'))
task.put()
- return step_name, (step_name_no_platform, task.classified_tests)
+ return step_name, step_name_no_platform

Powered by Google App Engine
This is Rietveld 408576698