Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(30)

Side by Side Diff: appengine/findit/crash/crash_pipeline.py

Issue 2414523002: [Findit] Reorganizing findit_for_*.py (Closed)
Patch Set: Addressing the crash_config.fracas issue Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2016 The Chromium Authors. All rights reserved. 1 # Copyright 2016 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 import copy 5 import copy
6 import json 6 import json
7 import logging 7 import logging
8 8
9 from google.appengine.api import app_identity
10 from google.appengine.ext import ndb
11
12 from common import appengine_util 9 from common import appengine_util
13 from common import constants
14 from common import pubsub_util 10 from common import pubsub_util
15 from common import time_util 11 from common import time_util
16 from common.pipeline_wrapper import BasePipeline 12 from common.pipeline_wrapper import BasePipeline
17 from common.pipeline_wrapper import pipeline 13 from common.pipeline_wrapper import pipeline
18 from crash import findit_for_client 14 from crash import findit_for_chromecrash
15 from crash import findit_for_clusterfuzz
16 from crash.type_enums import CrashClient
19 from model import analysis_status 17 from model import analysis_status
20 from model.crash.crash_config import CrashConfig 18 from model.crash.crash_config import CrashConfig
21 19
22 20
# TODO(wrengr): write complete coverage tests for this.
def FinditForClientID(client_id): # pragma: no cover
  """Reconstruct a Findit instance from its |client_id| string.

  Findit objects are not JSON serializable, so they cannot be passed
  directly between the stages in |crash.crash_pipeline|; instead we
  serialize them down to just their |client_id| and rebuild them here.
  Beware that everything else the Findit object derived from CrashConfig
  is lost in the round trip: the rebuilt object reflects the CrashConfig
  at reconstruction time, which may differ from the config backing the
  original object, so hard-to-diagnose coherency bugs are possible.
  Serializing Findit objects more robustly would remove this hazard.

  Raises:
    ValueError: when |client_id| names no supported client.
  """
  assert isinstance(client_id, (str, unicode)), (
      'FinditForClientID: expected string or unicode, but got %s'
      % client_id.__class__.__name__)
  # TODO(wrengr): it'd be nice to replace this with a single lookup in
  # a dict; but that's buggy for some unknown reason.
  if client_id == CrashClient.CLUSTERFUZZ:
    findit_class = findit_for_clusterfuzz.FinditForClusterfuzz
  elif client_id == CrashClient.CRACAS:
    findit_class = findit_for_chromecrash.FinditForCracas
  elif client_id == CrashClient.FRACAS:
    findit_class = findit_for_chromecrash.FinditForFracas
  else:
    raise ValueError('FinditForClientID: '
        'unknown or unsupported client %s' % client_id)

  return findit_class(CrashWrapperPipeline)
52
53
54 # Some notes about the classes below, for people who are not
55 # familiar with AppEngine. The thing that really kicks everything off
56 # is |CrashWrapperPipeline.run|. However, an important thing to bear in
57 # mind is that whatever arguments are passed to that method will also
58 # be passed to the |run| method on whatever objects it yields. Thus,
59 # all the |run| methods across these different classes must have the same
60 # type. In practice, we end up passing all the arguments to the
61 # constructors, because we need to have the fields around for logging
62 # (e.g., in |finalized|); thus, there's nothing that needs to be passed
63 # to |run|. Another thing to bear in mind is that whatever objects
64 # |CrashWrapperPipeline.run| yields must be JSON-serializable. The base
65 # class handles most of that for us, so the force of this constraint is
66 # that all the arguments to the constructors for those classes must be
67 # JSON-serializable. Thus, we cannot actually pass a Findit object to
68 # the constructor, but rather must pass only the |client_id| (or whatever
69 # JSON dict) and then reconstruct the Findit object from that. Moreover,
70 # the |run| method and the |finalized| method will be run in different
71 # processes, so we will actually end up reconstructing the Findit object
72 # twice. Thus, we shouldn't store anything in the pipeline objects outside
73 # of what their constructors store.
74
class CrashBasePipeline(BasePipeline):
  """Common base for the per-crash analysis and publishing pipelines.

  Stores the JSON-serializable constructor arguments and rebuilds the
  (non-serializable) Findit object from |client_id|; see the module
  notes above on why pipeline arguments must be JSON-serializable.
  """

  def __init__(self, client_id, crash_identifiers):
    # N.B. the same arguments must be forwarded to the base class, since
    # AppEngine re-instantiates pipelines from their serialized arguments.
    super(CrashBasePipeline, self).__init__(client_id, crash_identifiers)
    self._crash_identifiers = crash_identifiers
    # Rebuilt in each process; see FinditForClientID for coherency caveats.
    self._findit = FinditForClientID(client_id)

  @property
  def client_id(self): # pragma: no cover
    """The client id string of the underlying Findit object."""
    return self._findit.client_id

  def run(self, *args, **kwargs):
    """Subclasses must override this to perform the actual work."""
    raise NotImplementedError()
31 87
32 88
class CrashAnalysisPipeline(CrashBasePipeline):
  """Pipeline that runs the culprit analysis and records its results."""

  def finalized(self): # pragma: no cover
    """AppEngine hook: mark the analysis model as errored if aborted."""
    if self.was_aborted:
      self._PutAbortedError()

  # N.B., this method must be factored out for unittest reasons; since
  # |finalized| takes no arguments (by AppEngine's spec) and |was_aborted|
  # can't be altered directly.
  def _PutAbortedError(self):
    """Update the ndb.Model to indicate that this pipeline was aborted."""
    logging.error('Aborted analysis for %s', repr(self._crash_identifiers))
    analysis = self._findit.GetAnalysis(self._crash_identifiers)
    analysis.status = analysis_status.ERROR
    analysis.put()

  # TODO(wrengr): we misplaced the coverage test; find it!
  # Arguments number differs from overridden method - pylint: disable=W0221
  def run(self):
    """Run the culprit analysis and persist the outcome on the model."""
    # TODO(wrengr): shouldn't this method somehow call _NeedsNewAnalysis
    # to guard against race conditions?
    analysis = self._findit.GetAnalysis(self._crash_identifiers)

    # Update the model's status to say we're in the process of doing analysis.
    # The put() here makes the RUNNING state visible before the (long)
    # analysis starts.
    analysis.pipeline_status_path = self.pipeline_status_path()
    analysis.status = analysis_status.RUNNING
    analysis.started_time = time_util.GetUTCNow()
    analysis.findit_version = appengine_util.GetCurrentVersion()
    analysis.put()

    # Actually do the analysis.
    result, tags = self._findit.FindCulprit(analysis).ToDicts()

    # Update model's status to say we're done, and save the results.
    analysis.completed_time = time_util.GetUTCNow()
    analysis.result = result
    for tag_name, tag_value in tags.iteritems():
      # TODO(http://crbug.com/602702): make it possible to add arbitrary tags.
      # Only tags with a matching model attribute are persisted; others are
      # silently dropped.
      if hasattr(analysis, tag_name):
        setattr(analysis, tag_name, tag_value)
    analysis.status = analysis_status.COMPLETED
    analysis.put()
71 130
72 131
class PublishResultPipeline(CrashBasePipeline):
  """Pipeline that publishes a finished analysis result via pub/sub."""

  def finalized(self):
    """AppEngine hook: log when publishing was aborted."""
    if self.was_aborted: # pragma: no cover.
      # BUGFIX: the arguments were previously swapped relative to the format
      # string (and relative to the success log in |run|): the client id
      # fills the first %s and the crash identifiers fill the second.
      logging.error('Failed to publish %s analysis result for %s',
                    self.client_id, repr(self._crash_identifiers))

  # TODO(wrengr): we misplaced the coverage test; find it!
  # Arguments number differs from overridden method - pylint: disable=W0221
  def run(self):
    """Serialize the analysis result and publish it to the client's topic."""
    analysis = self._findit.GetAnalysis(self._crash_identifiers)
    result = analysis.ToPublishableResult(self._crash_identifiers)
    # sort_keys keeps the published JSON deterministic.
    messages_data = [json.dumps(result, sort_keys=True)]

    # TODO(wrengr): remove Findit's dependency on CrashConfig.
    client_config = self._findit.config
    # TODO(katesonia): Clean string uses in config.
    topic = client_config['analysis_result_pubsub_topic']
    pubsub_util.PublishMessagesToTopic(messages_data, topic)
    logging.info('Published %s analysis result for %s', self.client_id,
                 repr(self._crash_identifiers))
95 152
96 153
class CrashWrapperPipeline(BasePipeline):
  """Fire off pipelines to (1) do the analysis and (2) publish results.

  The reason we have analysis and publishing as separate pipelines is
  because each of them can fail for independent reasons. E.g., if we
  successfully finish the analysis, but then the publishing fails due to
  network errors, we don't want to have to redo the analysis in order
  to redo the publishing. We could try to cache the fact that analysis
  succeeded in the pipeline object itself, but we'd have to be careful
  because the |run| and |finalized| methods are executed in different
  processes.
  """
  def __init__(self, client_id, crash_identifiers):
    # Store the (JSON-serializable) arguments so |run| can construct the
    # child pipelines with the same values.
    super(CrashWrapperPipeline, self).__init__(client_id, crash_identifiers)
    self._crash_identifiers = crash_identifiers
    self._client_id = client_id

  # TODO(wrengr): we misplaced the coverage test; find it!
  # Arguments number differs from overridden method - pylint: disable=W0221
  def run(self):
    """Yield the analysis pipeline, then the publishing pipeline after it."""
    run_analysis = yield CrashAnalysisPipeline(
        self._client_id, self._crash_identifiers)
    # pipeline.After ensures publishing only starts once analysis finishes.
    with pipeline.After(run_analysis):
      yield PublishResultPipeline(self._client_id, self._crash_identifiers)
103
104
@ndb.transactional
def _NeedsNewAnalysis(
    crash_identifiers, chrome_version, signature, client_id,
    platform, stack_trace, customized_data):
  """Check (transactionally) whether a fresh analysis should be run.

  Returns False when a non-failed analysis for the same regression range
  already exists; otherwise (re)initializes the analysis model and
  returns True so the caller can schedule the pipeline.
  """
  analysis = findit_for_client.GetAnalysisForClient(crash_identifiers,
                                                    client_id)
  regression_range = findit_for_client.GetRegressionRange(client_id,
                                                          customized_data)
  # A prior non-failed analysis of the same regression range is reusable.
  if (analysis and not analysis.failed and
      regression_range == analysis.regression_range):
    logging.info('The analysis of %s has already been done.',
                 repr(crash_identifiers))
    return False

  # Create analysis for findit to run if this is not a rerun.
  if not analysis:
    analysis = findit_for_client.CreateAnalysisForClient(crash_identifiers,
                                                         client_id)

  # Reset state for both fresh analyses and reruns of failed ones.
  findit_for_client.ResetAnalysis(analysis, chrome_version, signature,
                                  client_id, platform, stack_trace,
                                  customized_data, regression_range)
  return True
128
129
130 def ScheduleNewAnalysisForCrash(
131 crash_identifiers, chrome_version, signature, client_id,
132 platform, stack_trace, customized_data,
133 queue_name=constants.DEFAULT_QUEUE):
134 """Schedules an analysis."""
135 # Check policy and tune arguments if needed.
136 pass_policy, updated_analysis_args = findit_for_client.CheckPolicyForClient(
137 crash_identifiers, chrome_version, signature,
138 client_id, platform, stack_trace,
139 customized_data)
140
141 if not pass_policy:
142 return False
143
144 if _NeedsNewAnalysis(*updated_analysis_args):
145 analysis_pipeline = CrashWrapperPipeline(crash_identifiers, client_id)
146 # Attribute defined outside __init__ - pylint: disable=W0201
147 analysis_pipeline.target = appengine_util.GetTargetNameForModule(
148 constants.CRASH_BACKEND[client_id])
149 analysis_pipeline.start(queue_name=queue_name)
150 logging.info('New %s analysis is scheduled for %s', client_id,
151 repr(crash_identifiers))
152 return True
153
154 return False
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698