Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(260)

Side by Side Diff: appengine/findit/waterfall/process_swarming_task_result_pipeline.py

Issue 2477343003: [Findit] Refactoring monitor swarming task pipelines (Closed)
Patch Set: Addressing nit Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2016 The Chromium Authors. All rights reserved. 1 # Copyright 2016 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 from collections import defaultdict
6 import datetime
7 import logging
8 import time
9
10 from common.http_client_appengine import HttpClientAppengine as HttpClient
11 from common.pipeline_wrapper import BasePipeline
12 from model import analysis_status
13 from model.wf_swarming_task import WfSwarmingTask 5 from model.wf_swarming_task import WfSwarmingTask
14 from waterfall import swarming_util 6 from waterfall.process_base_swarming_task_result_pipeline import (
15 from waterfall import waterfall_config 7 ProcessBaseSwarmingTaskResultPipeline)
16 8
17 9
18 def _CheckTestsRunStatuses(output_json): 10 class ProcessSwarmingTaskResultPipeline(ProcessBaseSwarmingTaskResultPipeline):
19 """Checks result status for each test run and saves the numbers accordingly.
20
21 Args:
22 output_json (dict): A dict of all test results in the swarming task.
23
24 Returns:
25 tests_statuses (dict): A dict of different statuses for each test.
26
27 Currently for each test, we are saving number of total runs,
28 number of succeeded runs and number of failed runs.
29 """
30 tests_statuses = defaultdict(lambda: defaultdict(int))
31 if output_json:
32 for iteration in output_json.get('per_iteration_data'):
33 for test_name, tests in iteration.iteritems():
34 tests_statuses[test_name]['total_run'] += len(tests)
35 for test in tests:
36 tests_statuses[test_name][test['status']] += 1
37
38 return tests_statuses
39
40
41 def _ConvertDateTime(time_string):
42 """Convert UTC time string to datetime.datetime."""
43 # Match the time convertion with swarming.py.
44 # According to swarming.py,
45 # when microseconds are 0, the '.123456' suffix is elided.
46 if not time_string:
47 return None
48 for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'):
49 try:
50 return datetime.datetime.strptime(time_string, fmt)
51 except ValueError:
52 pass
53 raise ValueError('Failed to parse %s' % time_string) # pragma: no cover
54
55
56 class ProcessSwarmingTaskResultPipeline(BasePipeline):
57 """A pipeline for monitoring swarming task and processing task result. 11 """A pipeline for monitoring swarming task and processing task result.
58 12
59 This pipeline waits for result for a swarming task and processes the result to 13 This pipeline waits for result for a swarming task and processes the result to
60 generate a dict for statuses for each test run. 14 generate a dict for statuses for each test run.
61 """ 15 """
62 16
63 HTTP_CLIENT = HttpClient() 17 # Arguments number differs from overridden method - pylint: disable=W0221
18 def _GetArgs(self, master_name, builder_name, build_number,
19 step_name):
20 return master_name, builder_name, build_number, step_name
21
22 # Arguments number differs from overridden method - pylint: disable=W0221
23 def _GetSwarmingTask(self, master_name, builder_name, build_number,
24 step_name):
25 # Gets the appropriate kind of swarming task (WfSwarmingTask).
26 return WfSwarmingTask.Get(master_name, builder_name, build_number,
27 step_name)
28
64 # Arguments number differs from overridden method - pylint: disable=W0221 29 # Arguments number differs from overridden method - pylint: disable=W0221
65 def run(self, master_name, builder_name, build_number, step_name): 30 def run(self, master_name, builder_name, build_number, step_name):
66 """ 31 """Monitors a swarming task.
32
67 Args: 33 Args:
68 master_name (str): The master name. 34 master_name (str): The master name.
69 builder_name (str): The builder name. 35 builder_name (str): The builder name.
70 build_number (str): The build number. 36 build_number (str): The build number.
71 step_name (str): The failed test step name. 37 step_name (str): The failed test step name.
72 38
73 Returns: 39 Returns:
74 A dict of lists for reliable/flaky tests. 40 A dict of lists for reliable/flaky tests.
75 """ 41 """
76 42 call_args = self._GetArgs(
77 timeout_hours = waterfall_config.GetSwarmingSettings().get(
78 'task_timeout_hours')
79 deadline = time.time() + timeout_hours * 60 * 60
80 server_query_interval_seconds = waterfall_config.GetSwarmingSettings().get(
81 'server_query_interval_seconds')
82
83 task_started = False
84 task_completed = False
85 tests_statuses = {}
86 step_name_no_platform = None
87
88 task = WfSwarmingTask.Get(
89 master_name, builder_name, build_number, step_name) 43 master_name, builder_name, build_number, step_name)
44 task = self._GetSwarmingTask(*call_args)
90 task_id = task.task_id 45 task_id = task.task_id
91 while not task_completed: 46 step_name_no_platform = self._MonitorSwarmingTask(task_id, *call_args)
92 # Keeps monitoring the swarming task, waits for it to complete.
93 data = swarming_util.GetSwarmingTaskResultById(
94 task_id, self.HTTP_CLIENT)
95 task_state = data['state']
96 step_name_no_platform = swarming_util.GetTagValue(
97 data.get('tags', {}), 'ref_name')
98 if task_state not in swarming_util.STATES_RUNNING:
99 task_completed = True
100 task = WfSwarmingTask.Get(
101 master_name, builder_name, build_number, step_name)
102 if task_state == swarming_util.STATE_COMPLETED:
103 outputs_ref = data.get('outputs_ref')
104 output_json = swarming_util.GetSwarmingTaskFailureLog(
105 outputs_ref, self.HTTP_CLIENT)
106 tests_statuses = _CheckTestsRunStatuses(output_json)
107
108 task.status = analysis_status.COMPLETED
109 task.tests_statuses = tests_statuses
110 else:
111 task.status = analysis_status.ERROR
112 logging.error('Swarming task stopped with status: %s' % (
113 task_state))
114 priority_str = swarming_util.GetTagValue(
115 data.get('tags', {}), 'priority')
116 if priority_str:
117 task.parameters['priority'] = int(priority_str)
118 task.put()
119 else: # pragma: no cover
120 if task_state == 'RUNNING' and not task_started:
121 # swarming task just starts, update status.
122 task_started = True
123 task = WfSwarmingTask.Get(
124 master_name, builder_name, build_number, step_name)
125 task.status = analysis_status.RUNNING
126 task.put()
127
128 time.sleep(server_query_interval_seconds)
129
130 if time.time() > deadline:
131 # Updates status as ERROR.
132 task = WfSwarmingTask.Get(
133 master_name, builder_name, build_number, step_name)
134 task.status = analysis_status.ERROR
135 task.put()
136 logging.error('Swarming task timed out after %d hours.' % timeout_hours)
137 break # Stops the loop and return.
138
139 # Update swarming task metadate.
140 task = WfSwarmingTask.Get(
141 master_name, builder_name, build_number, step_name)
142 task.created_time = _ConvertDateTime(data.get('created_ts'))
143 task.started_time = _ConvertDateTime(data.get('started_ts'))
144 task.completed_time = _ConvertDateTime(data.get('completed_ts'))
145 task.put()
146 47
147 return step_name, (step_name_no_platform, task.reliable_tests) 48 return step_name, (step_name_no_platform, task.reliable_tests)
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698