Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(169)

Side by Side Diff: appengine/findit/crash/crash_pipeline.py

Issue 2414523002: [Findit] Reorganizing findit_for_*.py (Closed)
Patch Set: linting for coverage (Created 4 years, 2 months ago)
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLD | NEW
1 # Copyright 2016 The Chromium Authors. All rights reserved. 1 # Copyright 2016 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 import copy 5 import copy
6 import json 6 import json
7 import logging 7 import logging
8 8
9 from google.appengine.api import app_identity
10 from google.appengine.ext import ndb
11
12 from common import appengine_util 9 from common import appengine_util
13 from common import constants
14 from common import pubsub_util 10 from common import pubsub_util
15 from common import time_util 11 from common import time_util
16 from common.pipeline_wrapper import BasePipeline 12 from common.pipeline_wrapper import BasePipeline
17 from common.pipeline_wrapper import pipeline 13 from common.pipeline_wrapper import pipeline
18 from crash import findit_for_client 14 from crash import findit_for_chromecrash
15 from crash import findit_for_clusterfuzz
16 from crash.type_enums import CrashClient
19 from model import analysis_status 17 from model import analysis_status
20 from model.crash.crash_config import CrashConfig 18 from model.crash.crash_config import CrashConfig
21 19
22 20
21 def FinditForClientID(client_id):
22 """Construct a Findit object from a client id string specifying the class.
23
24 We cannot pass Findit objects to the various methods in
25 |crash.crash_pipeline|, because they are not JSON serializable. For now,
26 we just serialize Findit objects as their |client_id|, and then use this
27 function to reconstruct them. Alas, this means we will lose various
28 other information stored in the Findit object (i.e., stuff that comes
29 from CrashConfig). This could lead to some hard-to-diagnose coherency
30 bugs, since the new Findit object will be based on the CrashConfig at
31 the time it's constructed, which may differ from the CrashConfig
32 at the time the previous Findit object was constructed. In the future
33 we should fix all this to serialize Findit objects in a more robust way.
34 """
35 assert isinstance(client_id, (str, unicode)), (
36 'FinditForClientID: expected string or unicode, but got %s'
37 % client_id.__class__.__name__)
38 # TODO(wrengr): it'd be nice to replace this with a single lookup in
39 # a dict; but that's buggy for some unknown reason.
40 if client_id == CrashClient.FRACAS:
41 cls = findit_for_chromecrash.FinditForFracas
42 elif client_id == CrashClient.CRACAS: # pragma: no cover
43 cls = findit_for_chromecrash.FinditForCracas
44 elif client_id == CrashClient.CLUSTERFUZZ: # pragma: no cover
45 cls = findit_for_clusterfuzz.FinditForClusterfuzz
46 else: # pragma: no cover
47 raise ValueError('FinditForClientID: '
48 'unknown or unsupported client %s' % client_id)
49
50 return cls(CrashWrapperPipeline)
51
52
53 # Some notes about the classes below, for people who are not
54 # familiar with AppEngine. The thing that really kicks everything off
55 # is |CrashWrapperPipeline.run|. However, an important thing to bear in
56 # mind is that whatever arguments are passed to that method will also
57 # be passed to the |run| method on whatever objects it yields. Thus,
58 # all the |run| methods across these different classes must have the same
59 # signature. In practice, we end up passing all the arguments to the
60 # constructors, because we need to have the fields around for logging
61 # (e.g., in |finalized|); thus, there's nothing that needs to be passed
62 # to |run|. Another thing to bear in mind is that whatever objects
63 # |CrashWrapperPipeline.run| yields must be JSON-serializable. The base
64 # class handles most of that for us, so the force of this constraint is
65 # that all the arguments to the constructors for those classes must be
66 # JSON-serializable. Thus, we cannot actually pass a Findit object to
67 # the constructor, but rather must pass only the |client_id| (or whatever
68 # JSON dict) and then reconstruct the Findit object from that. Moreover,
69 # the |run| method and the |finalized| method will be run in different
70 # processes, so we will actually end up reconstructing the Findit object
71 # twice. Thus, we shouldn't store anything in the pipeline objects outside
72 # of what their constructors store.
73
23 class CrashBasePipeline(BasePipeline): 74 class CrashBasePipeline(BasePipeline):
24 def __init__(self, crash_identifiers, client_id): 75 def __init__(self, client_id, crash_identifiers):
25 super(CrashBasePipeline, self).__init__(crash_identifiers, client_id) 76 super(CrashBasePipeline, self).__init__(client_id, crash_identifiers)
26 self.crash_identifiers = crash_identifiers 77 self._crash_identifiers = crash_identifiers
27 self.client_id = client_id 78 self._findit = FinditForClientID(client_id)
79
80 @property
81 def client_id(self): # pragma: no cover
82 return self._findit.client_id
28 83
29 def run(self, *args, **kwargs): 84 def run(self, *args, **kwargs):
30 raise NotImplementedError() 85 raise NotImplementedError()
31 86
32 87
33 class CrashAnalysisPipeline(CrashBasePipeline): 88 class CrashAnalysisPipeline(CrashBasePipeline):
34 def _SetErrorIfAborted(self, aborted): 89 def finalized(self): # pragma: no cover
35 if not aborted: 90 if self.was_aborted:
36 return 91 self._PutAbortedError()
37 92
38 logging.error('Aborted analysis for %s', repr(self.crash_identifiers)) 93 # N.B., this method must be factored out for unittest reasons, since
39 analysis = findit_for_client.GetAnalysisForClient(self.crash_identifiers, 94 # |finalized| takes no arguments (by AppEngine's spec) and |was_aborted|
40 self.client_id) 95 # can't be altered directly.
96 def _PutAbortedError(self):
97 """Update the ndb.Model to indicate that this pipeline was aborted."""
98 logging.error('Aborted analysis for %s', repr(self._crash_identifiers))
99 analysis = self._findit.GetAnalysis(self._crash_identifiers)
41 analysis.status = analysis_status.ERROR 100 analysis.status = analysis_status.ERROR
42 analysis.put() 101 analysis.put()
43 102
44 def finalized(self): 103 # TODO(wrengr): we misplaced the coverage test; find it!
45 self._SetErrorIfAborted(self.was_aborted) 104 # Arguments number differs from overridden method - pylint: disable=W0221
105 def run(self):
106 # TODO(wrengr): shouldn't this method somehow call _NeedsNewAnalysis
107 # to guard against race conditions?
108 analysis = self._findit.GetAnalysis(self._crash_identifiers)
46 109
47 # Arguments number differs from overridden method - pylint: disable=W0221 110 # Update the model's status to say we're in the process of doing analysis.
48 def run(self, crash_identifiers, client_id):
49 analysis = findit_for_client.GetAnalysisForClient(crash_identifiers,
50 client_id)
51
52 # Update analysis status.
53 analysis.pipeline_status_path = self.pipeline_status_path() 111 analysis.pipeline_status_path = self.pipeline_status_path()
54 analysis.status = analysis_status.RUNNING 112 analysis.status = analysis_status.RUNNING
55 analysis.started_time = time_util.GetUTCNow() 113 analysis.started_time = time_util.GetUTCNow()
56 analysis.findit_version = appengine_util.GetCurrentVersion() 114 analysis.findit_version = appengine_util.GetCurrentVersion()
57 analysis.put() 115 analysis.put()
58 116
59 # Run the analysis. 117 # Actually do the analysis.
60 result, tags = findit_for_client.FindCulprit(analysis) 118 result, tags = self._findit.FindCulprit(analysis).ToDicts()
61 119
62 # Update analysis status and save the analysis result. 120 # Update model's status to say we're done, and save the results.
63 analysis.completed_time = time_util.GetUTCNow() 121 analysis.completed_time = time_util.GetUTCNow()
64 analysis.result = result 122 analysis.result = result
65 for tag_name, tag_value in tags.iteritems(): 123 for tag_name, tag_value in tags.iteritems():
66 # TODO(http://crbug.com/602702): make it possible to add arbitrary tags. 124 # TODO(http://crbug.com/602702): make it possible to add arbitrary tags.
67 if hasattr(analysis, tag_name): 125 if hasattr(analysis, tag_name):
68 setattr(analysis, tag_name, tag_value) 126 setattr(analysis, tag_name, tag_value)
69 analysis.status = analysis_status.COMPLETED 127 analysis.status = analysis_status.COMPLETED
70 analysis.put() 128 analysis.put()
71 129
72 130
73 class PublishResultPipeline(CrashBasePipeline): 131 class PublishResultPipeline(CrashBasePipeline):
74 def finalized(self): 132 def finalized(self):
75 if self.was_aborted: # pragma: no cover. 133 if self.was_aborted: # pragma: no cover.
76 logging.error('Failed to publish %s analysis result for %s', 134 logging.error('Failed to publish %s analysis result for %s',
77 repr(self.crash_identifiers), self.client_id) 135 repr(self._crash_identifiers), self.client_id)
78 136
79 137 # TODO(wrengr): we misplaced the coverage test; find it!
80 # Arguments number differs from overridden method - pylint: disable=W0221 138 # Arguments number differs from overridden method - pylint: disable=W0221
81 def run(self, crash_identifiers, client_id): 139 def run(self):
82 analysis = findit_for_client.GetAnalysisForClient(crash_identifiers, 140 analysis = self._findit.GetAnalysis(self._crash_identifiers)
83 client_id) 141 result = analysis.ToPublishableResult(self._crash_identifiers)
84 result = findit_for_client.GetPublishResultFromAnalysis(analysis,
85 crash_identifiers,
86 client_id)
87 messages_data = [json.dumps(result, sort_keys=True)] 142 messages_data = [json.dumps(result, sort_keys=True)]
88 143
89 client_config = CrashConfig.Get().GetClientConfig(client_id) 144 # TODO(wrengr): remove Findit's dependency on CrashConfig.
145 client_config = self._findit.config
90 # TODO(katesonia): Clean string uses in config. 146 # TODO(katesonia): Clean string uses in config.
91 topic = client_config['analysis_result_pubsub_topic'] 147 topic = client_config['analysis_result_pubsub_topic']
92 pubsub_util.PublishMessagesToTopic(messages_data, topic) 148 pubsub_util.PublishMessagesToTopic(messages_data, topic)
93 logging.info('Published %s analysis result for %s', client_id, 149 logging.info('Published %s analysis result for %s', self.client_id,
94 repr(crash_identifiers)) 150 repr(self._crash_identifiers))
95 151
96 152
97 class CrashWrapperPipeline(BasePipeline): 153 class CrashWrapperPipeline(BasePipeline):
154 """Fire off pipelines to (1) do the analysis and (2) publish results.
155
156 The reason we have analysis and publishing as separate pipelines is
157 that each of them can fail for independent reasons. E.g., if we
158 successfully finish the analysis, but then the publishing fails due to
159 network errors, we don't want to have to redo the analysis in order
160 to redo the publishing. We could try to cache the fact that analysis
161 succeeded in the pipeline object itself, but we'd have to be careful
162 because the |run| and |finalized| methods are executed in different
163 processes.
164 """
165 def __init__(self, client_id, crash_identifiers):
166 super(CrashWrapperPipeline, self).__init__(client_id, crash_identifiers)
167 self._crash_identifiers = crash_identifiers
168 self._client_id = client_id
169
170 # TODO(wrengr): we misplaced the coverage test; find it!
98 # Arguments number differs from overridden method - pylint: disable=W0221 171 # Arguments number differs from overridden method - pylint: disable=W0221
99 def run(self, crash_identifiers, client_id): 172 def run(self):
100 run_analysis = yield CrashAnalysisPipeline(crash_identifiers, client_id) 173 run_analysis = yield CrashAnalysisPipeline(
174 self._client_id, self._crash_identifiers)
101 with pipeline.After(run_analysis): 175 with pipeline.After(run_analysis):
102 yield PublishResultPipeline(crash_identifiers, client_id) 176 yield PublishResultPipeline(self._client_id, self._crash_identifiers)
103
104
105 @ndb.transactional
106 def _NeedsNewAnalysis(
107 crash_identifiers, chrome_version, signature, client_id,
108 platform, stack_trace, customized_data):
109 analysis = findit_for_client.GetAnalysisForClient(crash_identifiers,
110 client_id)
111 regression_range = findit_for_client.GetRegressionRange(client_id,
112 customized_data)
113 if (analysis and not analysis.failed and
114 regression_range == analysis.regression_range):
115 logging.info('The analysis of %s has already been done.',
116 repr(crash_identifiers))
117 return False
118
119 # Create analysis for findit to run if this is not a rerun.
120 if not analysis:
121 analysis = findit_for_client.CreateAnalysisForClient(crash_identifiers,
122 client_id)
123
124 findit_for_client.ResetAnalysis(analysis, chrome_version, signature,
125 client_id, platform, stack_trace,
126 customized_data, regression_range)
127 return True
128
129
130 def ScheduleNewAnalysisForCrash(
131 crash_identifiers, chrome_version, signature, client_id,
132 platform, stack_trace, customized_data,
133 queue_name=constants.DEFAULT_QUEUE):
134 """Schedules an analysis."""
135 # Check policy and tune arguments if needed.
136 pass_policy, updated_analysis_args = findit_for_client.CheckPolicyForClient(
137 crash_identifiers, chrome_version, signature,
138 client_id, platform, stack_trace,
139 customized_data)
140
141 if not pass_policy:
142 return False
143
144 if _NeedsNewAnalysis(*updated_analysis_args):
145 analysis_pipeline = CrashWrapperPipeline(crash_identifiers, client_id)
146 # Attribute defined outside __init__ - pylint: disable=W0201
147 analysis_pipeline.target = appengine_util.GetTargetNameForModule(
148 constants.CRASH_BACKEND[client_id])
149 analysis_pipeline.start(queue_name=queue_name)
150 logging.info('New %s analysis is scheduled for %s', client_id,
151 repr(crash_identifiers))
152 return True
153
154 return False
OLD | NEW

Powered by Google App Engine
This is Rietveld 408576698