Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(192)

Side by Side Diff: appengine/findit/crash/crash_pipeline.py

Issue 2414523002: [Findit] Reorganizing findit_for_*.py (Closed)
Patch Set: Finally fixed the mock tests! Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « appengine/findit/crash/component_classifier.py ('k') | appengine/findit/crash/crash_report.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # Copyright 2016 The Chromium Authors. All rights reserved. 1 # Copyright 2016 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 import copy 5 import copy
6 import json 6 import json
7 import logging 7 import logging
8 8
9 from google.appengine.api import app_identity
10 from google.appengine.ext import ndb
11
12 from common import appengine_util 9 from common import appengine_util
13 from common import constants 10 from common import constants
14 from common import git_repository 11 from common import git_repository
15 from common import pubsub_util 12 from common import pubsub_util
16 from common import time_util 13 from common import time_util
17 from common.http_client_appengine import HttpClientAppengine 14 from common.http_client_appengine import HttpClientAppengine
18 from common.pipeline_wrapper import BasePipeline 15 from common.pipeline_wrapper import BasePipeline
19 from common.pipeline_wrapper import pipeline 16 from common.pipeline_wrapper import pipeline
20 from crash import findit_for_client 17 from crash import findit_for_chromecrash
18 from crash import findit_for_clusterfuzz
19 from crash.type_enums import CrashClient
21 from model import analysis_status 20 from model import analysis_status
22 from model.crash.crash_config import CrashConfig 21 from model.crash.crash_config import CrashConfig
23 22
24 23
24 # TODO(http://crbug.com/659346): write complete coverage tests for this.
25 def FinditForClientID(client_id): # pragma: no cover
26 """Construct a Findit object from a client id string specifying the class.
27
28 We cannot pass Findit objects to the various methods in
29 |crash.crash_pipeline|, because they are not JSON serializable. For now,
30 we just serialize Findit objects as their |client_id|, and then use this
31 function to reconstruct them. Alas, this means we will lose various
32 other information stored in the Findit object (i.e., stuff that comes
33 from CrashConfig); which could lead to some hard-to-diagnose coherency
34 bugs, since the new Findit object will be based on the CrashConfig at
35 the time it's constructed, which may be different than the CrashConfig
36 at the time the previous Findit object was constructed. In the future
37 we should fix all this to serialize Findit objects in a more robust way.
38 """
39 assert isinstance(client_id, (str, unicode)), (
40 'FinditForClientID: expected string or unicode, but got %s'
41 % client_id.__class__.__name__)
42 # TODO(wrengr): it'd be nice to replace this with a single lookup in
43 # a dict; but that's buggy for some unknown reason.
44 if client_id == CrashClient.FRACAS:
45 cls = findit_for_chromecrash.FinditForFracas
46 elif client_id == CrashClient.CRACAS:
47 cls = findit_for_chromecrash.FinditForCracas
48 elif client_id == CrashClient.CLUSTERFUZZ:
49 cls = findit_for_clusterfuzz.FinditForClusterfuzz
50 else:
51 raise ValueError('FinditForClientID: '
52 'unknown or unsupported client %s' % client_id)
53
54 return cls(
55 git_repository.GitRepository(http_client=HttpClientAppengine()),
56 CrashWrapperPipeline)
57
58
59 # Some notes about the classes below, for people who are not
60 # familiar with AppEngine. The thing that really kicks everything off
61 # is |CrashWrapperPipeline.run|. However, an important thing to bear in
62 # mind is that whatever arguments are passed to that method will also
63 # be passed to the |run| method on whatever objects it yields. Thus,
64 # all the |run| methods across these different classes must have the same
65 # type. In practice, we end up passing all the arguments to the
66 # constructors, because we need to have the fields around for logging
67 # (e.g., in |finalized|); thus, there's nothing that needs to be passed
68 # to |run|. Another thing to bear in mind is that whatever objects
69 # |CrashWrapperPipeline.run| yields must be JSON-serializable. The base
70 # class handles most of that for us, so the force of this constraint is
71 # that all the arguments to the constructors for those classes must be
72 # JSON-serializable. Thus, we cannot actually pass a Findit object to
73 # the constructor, but rather must pass only the |client_id| (or whatever
74 # JSON dict) and then reconstruct the Findit object from that. Moreover,
75 # the |run| method and the |finalized| method will be run in different
76 # processes, so we will actually end up reconstructing the Findit object
77 # twice. Thus, we shouldn't store anything in the pipeline objects outside
78 # of what their constructors store.
79
25 class CrashBasePipeline(BasePipeline): 80 class CrashBasePipeline(BasePipeline):
26 def __init__(self, crash_identifiers, client_id): 81 def __init__(self, client_id, crash_identifiers):
27 super(CrashBasePipeline, self).__init__(crash_identifiers, client_id) 82 super(CrashBasePipeline, self).__init__(client_id, crash_identifiers)
28 self.crash_identifiers = crash_identifiers 83 self._crash_identifiers = crash_identifiers
29 self.client_id = client_id 84 self._findit = FinditForClientID(client_id)
85
86 @property
87 def client_id(self): # pragma: no cover
88 return self._findit.client_id
30 89
31 def run(self, *args, **kwargs): 90 def run(self, *args, **kwargs):
32 raise NotImplementedError() 91 raise NotImplementedError()
33 92
34 93
35 class CrashAnalysisPipeline(CrashBasePipeline): 94 class CrashAnalysisPipeline(CrashBasePipeline):
36 def _SetErrorIfAborted(self, aborted): 95 def finalized(self): # pragma: no cover
37 if not aborted: 96 if self.was_aborted:
38 return 97 self._PutAbortedError()
39 98
40 logging.error('Aborted analysis for %s', repr(self.crash_identifiers)) 99 # N.B., this method must be factored out for unittest reasons; since
41 analysis = findit_for_client.GetAnalysisForClient(self.crash_identifiers, 100 # |finalized| takes no arguments (by AppEngine's spec) and |was_aborted|
42 self.client_id) 101 # can't be altered directly.
102 def _PutAbortedError(self):
103 """Update the ndb.Model to indicate that this pipeline was aborted."""
104 logging.error('Aborted analysis for %s', repr(self._crash_identifiers))
105 analysis = self._findit.GetAnalysis(self._crash_identifiers)
43 analysis.status = analysis_status.ERROR 106 analysis.status = analysis_status.ERROR
44 analysis.put() 107 analysis.put()
45 108
46 def finalized(self): 109 # TODO(http://crbug.com/659346): we misplaced the coverage test; find it!
47 self._SetErrorIfAborted(self.was_aborted) 110 # Arguments number differs from overridden method - pylint: disable=W0221
111 def run(self):
112 # TODO(wrengr): shouldn't this method somehow call _NeedsNewAnalysis
113 # to guard against race conditions?
114 analysis = self._findit.GetAnalysis(self._crash_identifiers)
48 115
49 # Arguments number differs from overridden method - pylint: disable=W0221 116 # Update the model's status to say we're in the process of doing analysis.
50 def run(self, crash_identifiers, client_id):
51 analysis = findit_for_client.GetAnalysisForClient(crash_identifiers,
52 client_id)
53
54 # Update analysis status.
55 analysis.pipeline_status_path = self.pipeline_status_path() 117 analysis.pipeline_status_path = self.pipeline_status_path()
56 analysis.status = analysis_status.RUNNING 118 analysis.status = analysis_status.RUNNING
57 analysis.started_time = time_util.GetUTCNow() 119 analysis.started_time = time_util.GetUTCNow()
58 analysis.findit_version = appengine_util.GetCurrentVersion() 120 analysis.findit_version = appengine_util.GetCurrentVersion()
59 analysis.put() 121 analysis.put()
60 122
61 # Run the analysis. 123 # Actually do the analysis.
62 result, tags = findit_for_client.FindCulprit( 124 result, tags = self._findit.FindCulprit(analysis).ToDicts()
63 analysis, git_repository.GitRepository(
64 http_client=HttpClientAppengine()))
65 125
66 # Update analysis status and save the analysis result. 126 # Update model's status to say we're done, and save the results.
67 analysis.completed_time = time_util.GetUTCNow() 127 analysis.completed_time = time_util.GetUTCNow()
68 analysis.result = result 128 analysis.result = result
69 for tag_name, tag_value in tags.iteritems(): 129 for tag_name, tag_value in tags.iteritems():
70 # TODO(http://crbug.com/602702): make it possible to add arbitrary tags. 130 # TODO(http://crbug.com/602702): make it possible to add arbitrary tags.
71 if hasattr(analysis, tag_name): 131 if hasattr(analysis, tag_name):
72 setattr(analysis, tag_name, tag_value) 132 setattr(analysis, tag_name, tag_value)
73 analysis.status = analysis_status.COMPLETED 133 analysis.status = analysis_status.COMPLETED
74 analysis.put() 134 analysis.put()
75 135
76 136
77 class PublishResultPipeline(CrashBasePipeline): 137 class PublishResultPipeline(CrashBasePipeline):
138 # TODO(http://crbug.com/659346): we misplaced the coverage test; find it!
78 def finalized(self): 139 def finalized(self):
79 if self.was_aborted: # pragma: no cover. 140 if self.was_aborted: # pragma: no cover.
80 logging.error('Failed to publish %s analysis result for %s', 141 logging.error('Failed to publish %s analysis result for %s',
81 repr(self.crash_identifiers), self.client_id) 142 repr(self._crash_identifiers), self.client_id)
82
83 143
84 # Arguments number differs from overridden method - pylint: disable=W0221 144 # Arguments number differs from overridden method - pylint: disable=W0221
85 def run(self, crash_identifiers, client_id): 145 def run(self):
86 analysis = findit_for_client.GetAnalysisForClient(crash_identifiers, 146 analysis = self._findit.GetAnalysis(self._crash_identifiers)
87 client_id) 147 result = analysis.ToPublishableResult(self._crash_identifiers)
88 result = findit_for_client.GetPublishResultFromAnalysis(analysis,
89 crash_identifiers,
90 client_id)
91 messages_data = [json.dumps(result, sort_keys=True)] 148 messages_data = [json.dumps(result, sort_keys=True)]
92 149
93 client_config = CrashConfig.Get().GetClientConfig(client_id) 150 # TODO(http://crbug.com/659354): remove Findit's dependency on CrashConfig.
151 client_config = self._findit.config
94 # TODO(katesonia): Clean string uses in config. 152 # TODO(katesonia): Clean string uses in config.
95 topic = client_config['analysis_result_pubsub_topic'] 153 topic = client_config['analysis_result_pubsub_topic']
96 pubsub_util.PublishMessagesToTopic(messages_data, topic) 154 pubsub_util.PublishMessagesToTopic(messages_data, topic)
97 logging.info('Published %s analysis result for %s', client_id, 155 logging.info('Published %s analysis result for %s', self.client_id,
98 repr(crash_identifiers)) 156 repr(self._crash_identifiers))
99 157
100 158
101 class CrashWrapperPipeline(BasePipeline): 159 class CrashWrapperPipeline(BasePipeline):
160 """Fire off pipelines to (1) do the analysis and (2) publish results.
161
162 The reason we have analysis and publishing as separate pipelines is
163 because each of them can fail for independent reasons. E.g., if we
164 successfully finish the analysis, but then the publishing fails due to
165 network errors, we don't want to have to redo the analysis in order
166 to redo the publishing. We could try to cache the fact that analysis
167 succeeded in the pipeline object itself, but we'd have to be careful
168 because the |run| and |finalized| methods are executed in different
169 processes.
170 """
171 def __init__(self, client_id, crash_identifiers):
172 super(CrashWrapperPipeline, self).__init__(client_id, crash_identifiers)
173 self._crash_identifiers = crash_identifiers
174 self._client_id = client_id
175
176 # TODO(http://crbug.com/659346): write coverage tests.
102 # Arguments number differs from overridden method - pylint: disable=W0221 177 # Arguments number differs from overridden method - pylint: disable=W0221
103 def run(self, crash_identifiers, client_id): 178 def run(self): # pragma: no cover
104 run_analysis = yield CrashAnalysisPipeline(crash_identifiers, client_id) 179 run_analysis = yield CrashAnalysisPipeline(
180 self._client_id, self._crash_identifiers)
105 with pipeline.After(run_analysis): 181 with pipeline.After(run_analysis):
106 yield PublishResultPipeline(crash_identifiers, client_id) 182 yield PublishResultPipeline(self._client_id, self._crash_identifiers)
107
108
109 @ndb.transactional
110 def _NeedsNewAnalysis(
111 crash_identifiers, chrome_version, signature, client_id,
112 platform, stack_trace, customized_data):
113 analysis = findit_for_client.GetAnalysisForClient(crash_identifiers,
114 client_id)
115 regression_range = findit_for_client.GetRegressionRange(client_id,
116 customized_data)
117 if (analysis and not analysis.failed and
118 regression_range == analysis.regression_range):
119 logging.info('The analysis of %s has already been done.',
120 repr(crash_identifiers))
121 return False
122
123 # Create analysis for findit to run if this is not a rerun.
124 if not analysis:
125 analysis = findit_for_client.CreateAnalysisForClient(crash_identifiers,
126 client_id)
127
128 findit_for_client.ResetAnalysis(analysis, chrome_version, signature,
129 client_id, platform, stack_trace,
130 customized_data, regression_range)
131 return True
132
133
134 def ScheduleNewAnalysisForCrash(
135 crash_identifiers, chrome_version, signature, client_id,
136 platform, stack_trace, customized_data,
137 queue_name=constants.DEFAULT_QUEUE):
138 """Schedules an analysis."""
139 # Check policy and tune arguments if needed.
140 pass_policy, updated_analysis_args = findit_for_client.CheckPolicyForClient(
141 crash_identifiers, chrome_version, signature,
142 client_id, platform, stack_trace,
143 customized_data)
144
145 if not pass_policy:
146 return False
147
148 if _NeedsNewAnalysis(*updated_analysis_args):
149 analysis_pipeline = CrashWrapperPipeline(crash_identifiers, client_id)
150 # Attribute defined outside __init__ - pylint: disable=W0201
151 analysis_pipeline.target = appengine_util.GetTargetNameForModule(
152 constants.CRASH_BACKEND[client_id])
153 analysis_pipeline.start(queue_name=queue_name)
154 logging.info('New %s analysis is scheduled for %s', client_id,
155 repr(crash_identifiers))
156 return True
157
158 return False
OLDNEW
« no previous file with comments | « appengine/findit/crash/component_classifier.py ('k') | appengine/findit/crash/crash_report.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698