Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(43)

Side by Side Diff: appengine/findit/waterfall/process_base_swarming_task_result_pipeline.py

Issue 2130543004: Waterfall components of regression range finder. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: added another init whoooo Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2016 The Chromium Authors. All rights reserved. 1 # Copyright 2016 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 from collections import defaultdict 5 from collections import defaultdict
6 import datetime 6 import datetime
7 import logging 7 import logging
8 import time 8 import time
9 9
10 from common.http_client_appengine import HttpClientAppengine as HttpClient 10 from common.http_client_appengine import HttpClientAppengine as HttpClient
11 from common.pipeline_wrapper import BasePipeline 11 from common.pipeline_wrapper import BasePipeline
12 from model import analysis_status 12 from model import analysis_status
13 from model.wf_swarming_task import WfSwarmingTask 13 from model.wf_swarming_task import WfSwarmingTask
14 from waterfall import swarming_util 14 from waterfall import swarming_util
15 from waterfall import waterfall_config 15 from waterfall import waterfall_config
16 16
17 17
18 def _CheckTestsRunStatuses(output_json): 18 class ProcessBaseSwarmingTaskResultPipeline(BasePipeline):
19 """Checks result status for each test run and saves the numbers accordingly.
20
21 Args:
22 output_json (dict): A dict of all test results in the swarming task.
23
24 Returns:
25 tests_statuses (dict): A dict of different statuses for each test.
26
27 Currently for each test, we are saving number of total runs,
28 number of succeeded runs and number of failed runs.
29 """
30 tests_statuses = defaultdict(lambda: defaultdict(int))
31 if output_json:
32 for iteration in output_json.get('per_iteration_data'):
33 for test_name, tests in iteration.iteritems():
34 tests_statuses[test_name]['total_run'] += len(tests)
35 for test in tests:
36 tests_statuses[test_name][test['status']] += 1
37
38 return tests_statuses
39
40
41 def _ConvertDateTime(time_string):
42 """Convert UTC time string to datetime.datetime."""
43 # Match the time convertion with swarming.py.
44 # According to swarming.py,
45 # when microseconds are 0, the '.123456' suffix is elided.
46 if not time_string:
47 return None
48 for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'):
49 try:
50 return datetime.datetime.strptime(time_string, fmt)
51 except ValueError:
52 pass
53 raise ValueError('Failed to parse %s' % time_string) # pragma: no cover
54
55
56 class ProcessSwarmingTaskResultPipeline(BasePipeline):
57 """A pipeline for monitoring swarming task and processing task result. 19 """A pipeline for monitoring swarming task and processing task result.
58 20
59 This pipeline waits for result for a swarming task and processes the result to 21 This pipeline waits for result for a swarming task and processes the result to
60 generate a dict for statuses for each test run. 22 generate a dict for statuses for each test run.
61 """ 23 """
62 24
63 HTTP_CLIENT = HttpClient() 25 HTTP_CLIENT = HttpClient()
26
27 def _CheckTestsRunStatuses(self, output_json):
28 # Checks result status for each test run and saves the numbers accordingly.
29 # Should be overridden by subclass.
30 raise NotImplementedError(
31 '_CheckTestsRunStatuses should be implemented in the child class')
32
33 def _ConvertDateTime(self, time_string):
34 """Convert UTC time string to datetime.datetime."""
35 # Match the time conversion with swarming.py which elides the suffix
36 # when microseconds are 0.
37 if not time_string:
38 return None
39 for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'):
40 try:
41 return datetime.datetime.strptime(time_string, fmt)
42 except ValueError:
43 pass
44 raise ValueError('Failed to parse %s' % time_string) # pragma: no cover
45
46 def _GetSwarmingTask(self):
47 # Get the appropriate kind of Swarming Task (Wf or Flake).
48 # Should be overwritten by subclass.
49 raise NotImplementedError(
50 '_GetSwarmingTask should be implemented in the child class')
51
52 def _GetArgs(self):
53 # Return list of arguments to call _CheckTestsRunStatuses with - output_json
54 # Should be overwritten by subclass.
55 raise NotImplementedError(
56 '_GetArgs should be implemented in the child class')
57
64 # Arguments number differs from overridden method - pylint: disable=W0221 58 # Arguments number differs from overridden method - pylint: disable=W0221
65 def run(self, master_name, builder_name, build_number, step_name): 59 def run(self, master_name, builder_name, build_number,
60 step_name, task_id, *args): #pragma: no cover.
66 """ 61 """
67 Args: 62 Args:
68 master_name (str): The master name. 63 master_name (str): The master name.
69 builder_name (str): The builder name. 64 builder_name (str): The builder name.
70 build_number (str): The build number. 65 build_number (str): The build number.
71 step_name (str): The failed test step name. 66 step_name (str): The failed test step name.
67 task_id (str): Id for the swarming task which is triggered by Findit.
72 68
73 Returns: 69 Returns:
74 A dict of lists for reliable/flaky tests. 70 A dict of lists for reliable/flaky tests.
75 """ 71 """
76 72 call_args = self._GetArgs(master_name, builder_name, build_number,
73 step_name, *args)
74 assert task_id
77 timeout_hours = waterfall_config.GetSwarmingSettings().get( 75 timeout_hours = waterfall_config.GetSwarmingSettings().get(
78 'task_timeout_hours') 76 'task_timeout_hours')
79 deadline = time.time() + timeout_hours * 60 * 60 77 deadline = time.time() + timeout_hours * 60 * 60
80 server_query_interval_seconds = waterfall_config.GetSwarmingSettings().get( 78 server_query_interval_seconds = waterfall_config.GetSwarmingSettings().get(
81 'server_query_interval_seconds') 79 'server_query_interval_seconds')
82
83 task_started = False 80 task_started = False
84 task_completed = False 81 task_completed = False
85 tests_statuses = {} 82 tests_statuses = {}
86 step_name_no_platform = None 83 step_name_no_platform = None
87 84
88 task = WfSwarmingTask.Get(
89 master_name, builder_name, build_number, step_name)
90 task_id = task.task_id
91 while not task_completed: 85 while not task_completed:
92 # Keeps monitoring the swarming task, waits for it to complete. 86 # Keeps monitoring the swarming task, waits for it to complete.
93 data = swarming_util.GetSwarmingTaskResultById( 87 data = swarming_util.GetSwarmingTaskResultById(
94 task_id, self.HTTP_CLIENT) 88 task_id, self.HTTP_CLIENT)
95 task_state = data['state'] 89 task_state = data['state']
96 step_name_no_platform = swarming_util.GetTagValue( 90 step_name_no_platform = swarming_util.GetTagValue(
97 data.get('tags', {}), 'ref_name') 91 data.get('tags', {}), 'ref_name')
98 if task_state not in swarming_util.STATES_RUNNING: 92 if task_state not in swarming_util.STATES_RUNNING:
99 task_completed = True 93 task_completed = True
100 task = WfSwarmingTask.Get( 94 task = self._GetSwarmingTask(*call_args)
101 master_name, builder_name, build_number, step_name)
102 if task_state == swarming_util.STATE_COMPLETED: 95 if task_state == swarming_util.STATE_COMPLETED:
103 outputs_ref = data.get('outputs_ref') 96 outputs_ref = data.get('outputs_ref')
104 output_json = swarming_util.GetSwarmingTaskFailureLog( 97 output_json = swarming_util.GetSwarmingTaskFailureLog(
105 outputs_ref, self.HTTP_CLIENT) 98 outputs_ref, self.HTTP_CLIENT)
106 tests_statuses = _CheckTestsRunStatuses(output_json) 99 tests_statuses = self._CheckTestsRunStatuses(
107 100 output_json, *call_args)
108 task.status = analysis_status.COMPLETED 101 task.status = analysis_status.COMPLETED
109 task.tests_statuses = tests_statuses 102 task.tests_statuses = tests_statuses
110 else: 103 else:
111 task.status = analysis_status.ERROR 104 task.status = analysis_status.ERROR
112 logging.error('Swarming task stopped with status: %s' % ( 105 logging.error('Swarming task stopped with status: %s' % (
113 task_state)) 106 task_state))
114 priority_str = swarming_util.GetTagValue( 107 priority_str = swarming_util.GetTagValue(
115 data.get('tags', {}), 'priority') 108 data.get('tags', {}), 'priority')
116 if priority_str: 109 if priority_str:
117 task.parameters['priority'] = int(priority_str) 110 task.parameters['priority'] = int(priority_str)
118 task.put() 111 task.put()
119 else: # pragma: no cover 112 else: # pragma: no cover
120 if task_state == 'RUNNING' and not task_started: 113 if task_state == 'RUNNING' and not task_started:
121 # swarming task just starts, update status. 114 # swarming task just starts, update status.
122 task_started = True 115 task_started = True
123 task = WfSwarmingTask.Get( 116 task = self._GetSwarmingTask(*call_args)
124 master_name, builder_name, build_number, step_name)
125 task.status = analysis_status.RUNNING 117 task.status = analysis_status.RUNNING
126 task.put() 118 task.put()
127
128 time.sleep(server_query_interval_seconds) 119 time.sleep(server_query_interval_seconds)
129
130 if time.time() > deadline: 120 if time.time() > deadline:
131 # Updates status as ERROR. 121 # Updates status as ERROR.
132 task = WfSwarmingTask.Get( 122 task = self._GetSwarmingTask(*call_args)
133 master_name, builder_name, build_number, step_name)
134 task.status = analysis_status.ERROR 123 task.status = analysis_status.ERROR
135 task.put() 124 task.put()
136 logging.error('Swarming task timed out after %d hours.' % timeout_hours) 125 logging.error('Swarming task timed out after %d hours.' % timeout_hours)
137 break # Stops the loop and return. 126 break # Stops the loop and return.
138
139 # Update swarming task metadate. 127 # Update swarming task metadate.
140 task = WfSwarmingTask.Get( 128 task = self._GetSwarmingTask(*call_args)
141 master_name, builder_name, build_number, step_name) 129 task.created_time = self._ConvertDateTime(data.get('created_ts'))
142 task.created_time = _ConvertDateTime(data.get('created_ts')) 130 task.started_time = self._ConvertDateTime(data.get('started_ts'))
143 task.started_time = _ConvertDateTime(data.get('started_ts')) 131 task.completed_time = self._ConvertDateTime(data.get('completed_ts'))
144 task.completed_time = _ConvertDateTime(data.get('completed_ts'))
145 task.put() 132 task.put()
146 133
147 return step_name, (step_name_no_platform, task.classified_tests) 134 return step_name, step_name_no_platform
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698