Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(393)

Side by Side Diff: appengine/findit/crash/crash_pipeline.py

Issue 2414523002: [Findit] Reorganizing findit_for_*.py (Closed)
Patch Set: rebasing against recently landed CLs. Created 4 years, 1 month ago.
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLD | NEW
1 # Copyright 2016 The Chromium Authors. All rights reserved. 1 # Copyright 2016 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 import copy 5 import copy
6 import json 6 import json
7 import logging 7 import logging
8 8
9 from google.appengine.api import app_identity
10 from google.appengine.ext import ndb
11
12 from common import appengine_util 9 from common import appengine_util
13 from common import constants 10 from common import constants
14 from common import git_repository 11 from common import git_repository
15 from common import pubsub_util 12 from common import pubsub_util
16 from common import time_util 13 from common import time_util
17 from common.http_client_appengine import HttpClientAppengine 14 from common.http_client_appengine import HttpClientAppengine
18 from common.pipeline_wrapper import BasePipeline 15 from common.pipeline_wrapper import BasePipeline
19 from common.pipeline_wrapper import pipeline 16 from common.pipeline_wrapper import pipeline
20 from crash import findit_for_client 17 from crash import findit_for_chromecrash
18 from crash import findit_for_clusterfuzz
19 from crash.type_enums import CrashClient
21 from model import analysis_status 20 from model import analysis_status
22 from model.crash.crash_config import CrashConfig 21 from model.crash.crash_config import CrashConfig
23 22
24 23
24 # TODO(http://crbug.com/659346): write complete coverage tests for this.
25 def FinditForClientID(client_id): # pragma: no cover
26 """Construct a Findit object from a client id string specifying the class.
27
28 We cannot pass Findit objects to the various methods in
29 |crash.crash_pipeline|, because they are not JSON serializable. For now,
30 we just serialize Findit objects as their |client_id|, and then use this
31 function to reconstruct them. Alas, this means we will lose various
32 other information stored in the Findit object (i.e., stuff that comes
33 from CrashConfig); which could lead to some hard-to-diagnose coherency
34 bugs, since the new Findit object will be based on the CrashConfig at
35 the time it's constructed, which may be different than the CrashConfig
36 at the time the previous Findit object was constructed. In the future
37 we should fix all this to serialize Findit objects in a more robust way.
38 """
39 assert isinstance(client_id, (str, unicode)), (
40 'FinditForClientID: expected string or unicode, but got %s'
41 % client_id.__class__.__name__)
42 # TODO(wrengr): it'd be nice to replace this with a single lookup in
43 # a dict; but that's buggy for some unknown reason.
44 if client_id == CrashClient.FRACAS:
45 cls = findit_for_chromecrash.FinditForFracas
46 elif client_id == CrashClient.CRACAS:
47 cls = findit_for_chromecrash.FinditForCracas
48 elif client_id == CrashClient.CLUSTERFUZZ:
49 cls = findit_for_clusterfuzz.FinditForClusterfuzz
50 else:
51 raise ValueError('FinditForClientID: '
52 'unknown or unsupported client %s' % client_id)
53
54 return cls(
55 git_repository.GitRepository(http_client=HttpClientAppengine()),
56 CrashWrapperPipeline)
57
58
59 # Some notes about the classes below, for people who are not
60 # familiar with AppEngine. The thing that really kicks everything off
61 # is |CrashWrapperPipeline.run|. However, an important thing to bear in
62 # mind is that whatever arguments are passed to that method will also
63 # be passed to the |run| method on whatever objects it yields. Thus,
64 # all the |run| methods across these different classes must have the same
65 # type. In practice, we end up passing all the arguments to the
66 # constructors, because we need to have the fields around for logging
67 # (e.g., in |finalized|); thus, there's nothing that needs to be passed
68 # to |run|. Another thing to bear in mind is that whatever objects
69 # |CrashWrapperPipeline.run| yields must be JSON-serializable. The base
70 # class handles most of that for us, so the force of this constraint is
71 # that all the arguments to the constructors for those classes must be
72 # JSON-serializable. Thus, we cannot actually pass a Findit object to
73 # the constructor, but rather must pass only the |client_id| (or whatever
74 # JSON dict) and then reconstruct the Findit object from that. Moreover,
75 # the |run| method and the |finalized| method will be run in different
76 # processes, so we will actually end up reconstructing the Findit object
77 # twice. Thus, we shouldn't store anything in the pipeline objects outside
78 # of what their constructors store.
79
25 class CrashBasePipeline(BasePipeline): 80 class CrashBasePipeline(BasePipeline):
26 def __init__(self, crash_identifiers, client_id): 81 def __init__(self, client_id, crash_identifiers):
27 super(CrashBasePipeline, self).__init__(crash_identifiers, client_id) 82 super(CrashBasePipeline, self).__init__(client_id, crash_identifiers)
28 self.crash_identifiers = crash_identifiers 83 self._crash_identifiers = crash_identifiers
29 self.client_id = client_id 84 self._findit = FinditForClientID(client_id)
85
86 @property
87 def client_id(self): # pragma: no cover
88 return self._findit.client_id
30 89
31 def run(self, *args, **kwargs): 90 def run(self, *args, **kwargs):
32 raise NotImplementedError() 91 raise NotImplementedError()
33 92
34 93
35 class CrashAnalysisPipeline(CrashBasePipeline): 94 class CrashAnalysisPipeline(CrashBasePipeline):
36 def _SetErrorIfAborted(self, aborted): 95 def finalized(self): # pragma: no cover
37 if not aborted: 96 if self.was_aborted:
38 return 97 self._PutAbortedError()
39 98
40 logging.error('Aborted analysis for %s', repr(self.crash_identifiers)) 99 # N.B., this method must be factored out for unittest reasons; since
41 analysis = findit_for_client.GetAnalysisForClient(self.crash_identifiers, 100 # |finalized| takes no arguments (by AppEngine's spec) and |was_aborted|
42 self.client_id) 101 # can't be altered directly.
102 def _PutAbortedError(self):
103 """Update the ndb.Model to indicate that this pipeline was aborted."""
104 logging.error('Aborted analysis for %s', repr(self._crash_identifiers))
105 analysis = self._findit.GetAnalysis(self._crash_identifiers)
43 analysis.status = analysis_status.ERROR 106 analysis.status = analysis_status.ERROR
44 analysis.put() 107 analysis.put()
45 108
46 def finalized(self): 109 # TODO(http://crbug.com/659346): we misplaced the coverage test; find it!
47 self._SetErrorIfAborted(self.was_aborted) 110 # Arguments number differs from overridden method - pylint: disable=W0221
111 def run(self):
112 # TODO(wrengr): shouldn't this method somehow call _NeedsNewAnalysis
113 # to guard against race conditions?
114 analysis = self._findit.GetAnalysis(self._crash_identifiers)
48 115
49 # Arguments number differs from overridden method - pylint: disable=W0221 116 # Update the model's status to say we're in the process of doing analysis.
50 def run(self, crash_identifiers, client_id):
51 analysis = findit_for_client.GetAnalysisForClient(crash_identifiers,
52 client_id)
53
54 # Update analysis status.
55 analysis.pipeline_status_path = self.pipeline_status_path() 117 analysis.pipeline_status_path = self.pipeline_status_path()
56 analysis.status = analysis_status.RUNNING 118 analysis.status = analysis_status.RUNNING
57 analysis.started_time = time_util.GetUTCNow() 119 analysis.started_time = time_util.GetUTCNow()
58 analysis.findit_version = appengine_util.GetCurrentVersion() 120 analysis.findit_version = appengine_util.GetCurrentVersion()
59 analysis.put() 121 analysis.put()
60 122
61 # Run the analysis. 123 # Actually do the analysis.
62 result, tags = findit_for_client.FindCulprit( 124 result, tags = self._findit.FindCulprit(
63 analysis, git_repository.GitRepository( 125 analysis,
64 http_client=HttpClientAppengine())) 126 git_repository.GitRepository(http_client=HttpClientAppengine())
127 ).ToDicts()
65 128
66 # Update analysis status and save the analysis result. 129 # Update model's status to say we're done, and save the results.
67 analysis.completed_time = time_util.GetUTCNow() 130 analysis.completed_time = time_util.GetUTCNow()
68 analysis.result = result 131 analysis.result = result
69 for tag_name, tag_value in tags.iteritems(): 132 for tag_name, tag_value in tags.iteritems():
70 # TODO(http://crbug.com/602702): make it possible to add arbitrary tags. 133 # TODO(http://crbug.com/602702): make it possible to add arbitrary tags.
71 if hasattr(analysis, tag_name): 134 if hasattr(analysis, tag_name):
72 setattr(analysis, tag_name, tag_value) 135 setattr(analysis, tag_name, tag_value)
73 analysis.status = analysis_status.COMPLETED 136 analysis.status = analysis_status.COMPLETED
74 analysis.put() 137 analysis.put()
75 138
76 139
77 class PublishResultPipeline(CrashBasePipeline): 140 class PublishResultPipeline(CrashBasePipeline):
78 def finalized(self): 141 def finalized(self):
79 if self.was_aborted: # pragma: no cover. 142 if self.was_aborted: # pragma: no cover.
80 logging.error('Failed to publish %s analysis result for %s', 143 logging.error('Failed to publish %s analysis result for %s',
81 repr(self.crash_identifiers), self.client_id) 144 repr(self._crash_identifiers), self.client_id)
82 145
83 146 # TODO(http://crbug.com/659346): we misplaced the coverage test; find it!
84 # Arguments number differs from overridden method - pylint: disable=W0221 147 # Arguments number differs from overridden method - pylint: disable=W0221
85 def run(self, crash_identifiers, client_id): 148 def run(self):
86 analysis = findit_for_client.GetAnalysisForClient(crash_identifiers, 149 analysis = self._findit.GetAnalysis(self._crash_identifiers)
87 client_id) 150 result = analysis.ToPublishableResult(self._crash_identifiers)
88 result = findit_for_client.GetPublishResultFromAnalysis(analysis,
89 crash_identifiers,
90 client_id)
91 messages_data = [json.dumps(result, sort_keys=True)] 151 messages_data = [json.dumps(result, sort_keys=True)]
92 152
93 client_config = CrashConfig.Get().GetClientConfig(client_id) 153 # TODO(http://crbug.com/659354): remove Findit's dependency on CrashConfig.
154 client_config = self._findit.config
94 # TODO(katesonia): Clean string uses in config. 155 # TODO(katesonia): Clean string uses in config.
95 topic = client_config['analysis_result_pubsub_topic'] 156 topic = client_config['analysis_result_pubsub_topic']
96 pubsub_util.PublishMessagesToTopic(messages_data, topic) 157 pubsub_util.PublishMessagesToTopic(messages_data, topic)
97 logging.info('Published %s analysis result for %s', client_id, 158 logging.info('Published %s analysis result for %s', self.client_id,
98 repr(crash_identifiers)) 159 repr(self._crash_identifiers))
99 160
100 161
101 class CrashWrapperPipeline(BasePipeline): 162 class CrashWrapperPipeline(BasePipeline):
163 """Fire off pipelines to (1) do the analysis and (2) publish results.
164
165 The reason we have analysis and publishing as separate pipelines is
166 because each of them can fail for independent reasons. E.g., if we
167 successfully finish the analysis, but then the publishing fails due to
168 network errors, we don't want to have to redo the analysis in order
169 to redo the publishing. We could try to cache the fact that analysis
170 succeeded in the pipeline object itself, but we'd have to be careful
171 because the |run| and |finalized| methods are executed in different
172 processes.
173 """
174 def __init__(self, client_id, crash_identifiers):
175 super(CrashWrapperPipeline, self).__init__(client_id, crash_identifiers)
176 self._crash_identifiers = crash_identifiers
177 self._client_id = client_id
178
179 # TODO(http://crbug.com/659346): we misplaced the coverage test; find it!
102 # Arguments number differs from overridden method - pylint: disable=W0221 180 # Arguments number differs from overridden method - pylint: disable=W0221
103 def run(self, crash_identifiers, client_id): 181 def run(self):
104 run_analysis = yield CrashAnalysisPipeline(crash_identifiers, client_id) 182 run_analysis = yield CrashAnalysisPipeline(
183 self._client_id, self._crash_identifiers)
105 with pipeline.After(run_analysis): 184 with pipeline.After(run_analysis):
106 yield PublishResultPipeline(crash_identifiers, client_id) 185 yield PublishResultPipeline(self._client_id, self._crash_identifiers)
107
108
109 @ndb.transactional
110 def _NeedsNewAnalysis(
111 crash_identifiers, chrome_version, signature, client_id,
112 platform, stack_trace, customized_data):
113 analysis = findit_for_client.GetAnalysisForClient(crash_identifiers,
114 client_id)
115 regression_range = findit_for_client.GetRegressionRange(client_id,
116 customized_data)
117 if (analysis and not analysis.failed and
118 regression_range == analysis.regression_range):
119 logging.info('The analysis of %s has already been done.',
120 repr(crash_identifiers))
121 return False
122
123 # Create analysis for findit to run if this is not a rerun.
124 if not analysis:
125 analysis = findit_for_client.CreateAnalysisForClient(crash_identifiers,
126 client_id)
127
128 findit_for_client.ResetAnalysis(analysis, chrome_version, signature,
129 client_id, platform, stack_trace,
130 customized_data, regression_range)
131 return True
132
133
134 def ScheduleNewAnalysisForCrash(
135 crash_identifiers, chrome_version, signature, client_id,
136 platform, stack_trace, customized_data,
137 queue_name=constants.DEFAULT_QUEUE):
138 """Schedules an analysis."""
139 # Check policy and tune arguments if needed.
140 pass_policy, updated_analysis_args = findit_for_client.CheckPolicyForClient(
141 crash_identifiers, chrome_version, signature,
142 client_id, platform, stack_trace,
143 customized_data)
144
145 if not pass_policy:
146 return False
147
148 if _NeedsNewAnalysis(*updated_analysis_args):
149 analysis_pipeline = CrashWrapperPipeline(crash_identifiers, client_id)
150 # Attribute defined outside __init__ - pylint: disable=W0201
151 analysis_pipeline.target = appengine_util.GetTargetNameForModule(
152 constants.CRASH_BACKEND[client_id])
153 analysis_pipeline.start(queue_name=queue_name)
154 logging.info('New %s analysis is scheduled for %s', client_id,
155 repr(crash_identifiers))
156 return True
157
158 return False
OLD | NEW

Powered by Google App Engine
This is Rietveld 408576698