Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(34)

Side by Side Diff: appengine/findit/crash/crash_pipeline.py

Issue 2414523002: [Findit] Reorganizing findit_for_*.py (Closed)
Patch Set: trying to fix some tests Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2016 The Chromium Authors. All rights reserved. 1 # Copyright 2016 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 import copy 5 import copy
6 import json 6 import json
7 import logging 7 import logging
8 8
9 from google.appengine.api import app_identity
10 from google.appengine.ext import ndb 9 from google.appengine.ext import ndb
11 10
12 from common import appengine_util 11 from common import appengine_util
13 from common import constants 12 from common import constants
14 from common import pubsub_util 13 from common import pubsub_util
15 from common import time_util 14 from common import time_util
16 from common.pipeline_wrapper import BasePipeline 15 from common.pipeline_wrapper import BasePipeline
17 from common.pipeline_wrapper import pipeline 16 from common.pipeline_wrapper import pipeline
18 from crash import findit_for_client 17 from crash import findit
18 from crash import findit_for_chromecrash
19 from crash.type_enums import CrashClient
20 from crash.detect_regression_range import DetectRegressionRange
19 from model import analysis_status 21 from model import analysis_status
20 from model.crash.crash_config import CrashConfig 22 from model.crash.crash_config import CrashConfig
21 23
# TODO(katesonia): Move this to fracas config.
# Template for the user-facing feedback link attached to published Fracas/
# Cracas results; filled with (default version hostname, urlsafe entity key)
# in GetPublishResultFromAnalysis below.
_FINDIT_FRACAS_FEEDBACK_URL_TEMPLATE = '%s/crash/fracas-result-feedback?key=%s'
22 26
# TODO(wrengr): rename this ("Publish*ed*" at the very least). Better yet,
# make it a method on CrashAnalysis — or fold CrashAnalysis into the Findit
# class — so this last remaining switch on |client_id| disappears.
def GetPublishResultFromAnalysis(model, crash_identifiers):
  """Converts a datastore analysis entity into a publishable result dict.

  Args:
    model (CrashAnalysis): the datastore entity whose result to publish.
    crash_identifiers: identifiers of the crash, echoed back in the returned
        message so the subscriber can match it to its request.

  Returns:
    A dict with keys 'crash_identifiers', 'client_id' and 'result'.
  """
  client_id = model.client_id
  published_result = copy.deepcopy(model.result)

  if client_id in (CrashClient.FRACAS, CrashClient.CRACAS):
    published_result['feedback_url'] = _FINDIT_FRACAS_FEEDBACK_URL_TEMPLATE % (
        appengine_util.GetDefaultVersionHostname(), model.key.urlsafe())
    if published_result['found']:
      for suspected_cl in published_result['suspected_cls']:
        # Round for readability and strip internal reasoning before publishing.
        suspected_cl['confidence'] = round(suspected_cl['confidence'], 2)
        suspected_cl.pop('reason', None)
  elif client_id == CrashClient.CLUSTERFUZZ:  # pragma: no cover.
    # TODO(katesonia): Post process clusterfuzz model result if needed.
    pass

  return {
      'crash_identifiers': crash_identifiers,
      'client_id': client_id,
      'result': published_result,
  }
63
64
# TODO(wrengr): we'd like to move this to findit.py (or somewhere similar)
# so it's encapsulated along with the Findit classes; but to do that we'd
# need to merge findit.py and findit_for_chromecrash.py to avoid cyclic
# dependencies. Is there a clean way to do what we want?
def FinditForClientID(client_id):
  """Returns the client-specific Findit instance for |client_id|.

  Args:
    client_id: one of the CrashClient constants.

  Returns:
    A Findit subclass instance for that client.

  Raises:
    ValueError: if |client_id| is not a supported client.
  """
  if client_id == CrashClient.FRACAS:
    return findit_for_chromecrash.FinditForFracas()
  elif client_id == CrashClient.CRACAS:
    return findit_for_chromecrash.FinditForCracas()
  elif client_id == CrashClient.CLUSTERFUZZ:
    return findit.FinditForClusterfuzz()
  else:
    # Log at error level (this aborts the analysis) and carry the offending
    # client id in the exception instead of raising a bare ValueError()
    # that loses all context once it propagates.
    logging.error('Client %s is not supported by findit right now', client_id)
    raise ValueError('Unsupported client: %s' % client_id)
79
80
# TODO(wrengr): make this a member of Findit (or, perhaps, merge the
# two classes), so we don't need to pass the |client_id| around manually.
class CrashBasePipeline(BasePipeline):
  """Base pipeline for one crash analysis of one client.

  Stores its constructor arguments as attributes (the pipeline framework
  does not do this itself) and resolves the client-specific Findit instance
  once, so subclasses use |self.findit| instead of switching on client_id.
  """

  def __init__(self, crash_identifiers, client_id):
    # The positional args must also be forwarded to BasePipeline so the
    # framework can serialize and replay this pipeline.
    super(CrashBasePipeline, self).__init__(crash_identifiers, client_id)
    self.crash_identifiers = crash_identifiers
    self.client_id = client_id
    self.findit = FinditForClientID(client_id)

  # NOTE(review): named |run| rather than __call__ because the pipeline
  # framework invokes this entry point by name; subclasses override it with
  # their own argument list (hence the W0221 disables below).
  def run(self, *args, **kwargs):
    raise NotImplementedError()
31 95
32 96
class CrashAnalysisPipeline(CrashBasePipeline):
  """Runs Findit on one crash and records the analysis status transitions."""

  def _PutAbortedError(self):
    """Marks the analysis entity as ERROR after the pipeline was aborted."""
    logging.error('Aborted analysis for %s', repr(self.crash_identifiers))
    analysis = self.findit.GetAnalysis(self.crash_identifiers)
    analysis.status = analysis_status.ERROR
    analysis.put()

  def finalized(self):
    # Factored out into _PutAbortedError so the abort path can be unit-tested
    # without driving the pipeline framework (requested in review).
    if self.was_aborted:
      self._PutAbortedError()

  # Arguments number differs from overridden method - pylint: disable=W0221
  def run(self, crash_identifiers):
    """Fetches the analysis entity, runs the analysis, and stores the result.

    Args:
      crash_identifiers: identifiers of the crash to analyze.
    """
    analysis = self.findit.GetAnalysis(crash_identifiers)

    # Update analysis status.
    analysis.pipeline_status_path = self.pipeline_status_path()
    analysis.status = analysis_status.RUNNING
    analysis.started_time = time_util.GetUTCNow()
    analysis.findit_version = appengine_util.GetCurrentVersion()
    analysis.put()

    # Run the analysis.
    result, tags = self.findit.FindCulprit(analysis)

    # Update analysis status and save the analysis result.
    analysis.completed_time = time_util.GetUTCNow()
    analysis.result = result
    for tag_name, tag_value in tags.iteritems():
      # TODO(http://crbug.com/602702): make it possible to add arbitrary tags.
      # Only tags matching existing entity attributes are persisted; others
      # are silently dropped.
      if hasattr(analysis, tag_name):
        setattr(analysis, tag_name, tag_value)
    analysis.status = analysis_status.COMPLETED
    analysis.put()
71 128
72 129
class PublishResultPipeline(CrashBasePipeline):
  """Publishes a completed analysis result to the client's pub/sub topic."""

  def finalized(self):
    if self.was_aborted:  # pragma: no cover.
      # BUGFIX: the format arguments were swapped (identifiers first,
      # client second), producing e.g. "Failed to publish {'sig': ...}
      # analysis result for fracas". Match the success log below:
      # client id first, crash identifiers second.
      logging.error('Failed to publish %s analysis result for %s',
                    self.client_id, repr(self.crash_identifiers))

  # Arguments number differs from overridden method - pylint: disable=W0221
  def run(self, crash_identifiers):
    """Converts the stored analysis into a message and publishes it.

    Args:
      crash_identifiers: identifiers of the analyzed crash.
    """
    analysis = self.findit.GetAnalysis(crash_identifiers)
    result = GetPublishResultFromAnalysis(analysis, crash_identifiers)
    messages_data = [json.dumps(result, sort_keys=True)]

    client_config = CrashConfig.Get().GetClientConfig(self.client_id)
    # TODO(katesonia): Clean string uses in config.
    topic = client_config['analysis_result_pubsub_topic']
    pubsub_util.PublishMessagesToTopic(messages_data, topic)
    logging.info('Published %s analysis result for %s', self.client_id,
                 repr(crash_identifiers))
95 148
96 149
class CrashWrapperPipeline(BasePipeline):
  """Runs the analysis pipeline, then publishes its result.

  BUGFIX: |run| reads self.client_id, but this class extends BasePipeline
  directly and nothing stored the constructor arguments as attributes
  (CrashBasePipeline has to do that explicitly for its subclasses), so
  run() would raise AttributeError. Store them here; the constructor is
  already called with (crash_identifiers, client_id) by
  ScheduleNewAnalysisForCrash, so the signature is unchanged.
  """

  def __init__(self, crash_identifiers, client_id):
    super(CrashWrapperPipeline, self).__init__(crash_identifiers, client_id)
    self.crash_identifiers = crash_identifiers
    self.client_id = client_id

  # Arguments number differs from overridden method - pylint: disable=W0221
  def run(self, crash_identifiers):
    run_analysis = yield CrashAnalysisPipeline(crash_identifiers,
                                               self.client_id)
    # Publish only after the analysis pipeline has completed.
    with pipeline.After(run_analysis):
      yield PublishResultPipeline(crash_identifiers, self.client_id)
103 156
104 157
# TODO(wrengr): this should surely be a method on Findit, rather than
# a standalone function.
@ndb.transactional
def _NeedsNewAnalysis(findit_client, crash_identifiers, report,
                      historical_metadata=None):
  """Returns True and (re)initializes the analysis if a new run is needed.

  Runs in an ndb transaction so the check-then-create on the analysis
  entity is atomic across concurrent requests for the same crash.

  Args:
    findit_client (Findit): the client-specific Findit instance.
    report: the crash report data; handed to UpdateAnalysis.
    historical_metadata: input for DetectRegressionRange.
        NOTE(review): the only caller in this file never passes this, so
        DetectRegressionRange always receives None here — confirm callers
        elsewhere supply it.

  Returns:
    False if a non-failed analysis with the same regression range already
    exists; True otherwise, after creating/updating the analysis entity.
  """
  analysis = findit_client.GetAnalysis(crash_identifiers)
  regression_range = DetectRegressionRange(historical_metadata)
  # Skip rerunning when the previous analysis did not fail and the
  # regression range is unchanged.
  if (analysis and not analysis.failed and
      regression_range == analysis.regression_range):
    logging.info('The analysis of %s has already been done.',
                 repr(crash_identifiers))
    return False

  # Create analysis for findit to run if this is not a rerun.
  if not analysis:
    analysis = findit_client.CreateAnalysis(crash_identifiers)

  # NOTE(review): |regression_range| is not passed to UpdateAnalysis (the
  # pre-refactor ResetAnalysis stored it on the entity) — verify that
  # UpdateAnalysis persists it, otherwise the comparison above never matches.
  findit_client.UpdateAnalysis(analysis, report)
  return True
128 176
129 177
# TODO(wrengr): this should surely be a method on Findit, rather than a
# standalone function. To see whether we can actually do so, note that this
# is only called by crash.crash_pipeline and handlers.crash.crash_handler.
def ScheduleNewAnalysisForCrash(findit_client, crash_identifiers, report,
                                channel, queue_name=constants.DEFAULT_QUEUE):
  """Schedules an analysis."""
  if not isinstance(findit_client, findit.Findit):
    raise TypeError('In the first argument to ScheduleNewAnalysisForCrash, '
                    'expected Findit object, but got %s object instead.' %
                    findit_client.__class__.__name__)

  # Check policy and tune arguments if needed. A falsy result means the
  # policy rejected this crash, so there is nothing to schedule.
  updated_report = findit_client.CheckPolicy(crash_identifiers, report, channel)
  if not updated_report:
    return False

  # Bail out when an equivalent analysis has already been run.
  if not _NeedsNewAnalysis(findit_client, crash_identifiers, updated_report):
    return False

  client_id = findit_client.client_id
  wrapper_pipeline = CrashWrapperPipeline(crash_identifiers, client_id)
  # Attribute defined outside __init__ - pylint: disable=W0201
  wrapper_pipeline.target = appengine_util.GetTargetNameForModule(
      constants.CRASH_BACKEND[client_id])
  wrapper_pipeline.start(queue_name=queue_name)
  logging.info('New %s analysis is scheduled for %s', client_id,
               repr(crash_identifiers))
  return True
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698