Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(148)

Side by Side Diff: appengine/findit/crash/crash_pipeline.py

Issue 2414523002: [Findit] Reorganizing findit_for_*.py (Closed)
Patch Set: more debugging Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2016 The Chromium Authors. All rights reserved. 1 # Copyright 2016 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 import copy 5 import copy
6 import json 6 import json
7 import logging 7 import logging
8 8
9 from google.appengine.api import app_identity
10 from google.appengine.ext import ndb
11
12 from common import appengine_util 9 from common import appengine_util
13 from common import constants
14 from common import pubsub_util 10 from common import pubsub_util
15 from common import time_util 11 from common import time_util
16 from common.pipeline_wrapper import BasePipeline 12 from common.pipeline_wrapper import BasePipeline
17 from common.pipeline_wrapper import pipeline 13 from common.pipeline_wrapper import pipeline
18 from crash import findit_for_client
19 from model import analysis_status 14 from model import analysis_status
20 from model.crash.crash_config import CrashConfig 15 from model.crash.crash_config import CrashConfig
21 16
22 17
class CrashBasePipeline(BasePipeline):
  """Common base class for the crash-analysis pipelines.

  Caches the Findit client object and the identifiers of the crash being
  analyzed, and exposes the client's id as a read-only property.
  """

  def __init__(self, findit_client, crash_identifiers):
    super(CrashBasePipeline, self).__init__(
        crash_identifiers, findit_client.client_id)

    # TODO(wrengr): why do we take crash_identifiers as an argument and
    # cache them, when they'll be passed in to every call to run?
    self._crash_identifiers = crash_identifiers
    self._findit = findit_client

  @property
  def client_id(self):
    """The client id of the wrapped Findit client."""
    return self._findit.client_id

  # This function is specified by the AppEngine pipeline API. So we can't
  # change the name nor restrict the arguments (even though subclasses
  # will restrict the arguments).
  def run(self, *args, **kwargs):
    raise NotImplementedError()
31 37
32 38
class CrashAnalysisPipeline(CrashBasePipeline):
  """Pipeline that runs a Findit crash analysis and persists its result."""

  def finalized(self):
    """Called by the pipeline API when this pipeline finishes or aborts."""
    if self.was_aborted:
      self._PutAbortedError()

  # N.B., this method must be factored out for unittest reasons; since
  # |was_aborted| can't be altered directly.
  def _PutAbortedError(self):
    """Updates the ndb.Model to indicate that this pipeline was aborted."""
    logging.error('Aborted analysis for %s', repr(self._crash_identifiers))
    analysis = self._findit.GetAnalysis(self._crash_identifiers)
    analysis.status = analysis_status.ERROR
    analysis.put()

  # Arguments number differs from overridden method - pylint: disable=W0221
  def run(self, crash_identifiers):
    """Runs the analysis and saves the result on the analysis entity.

    Args:
      crash_identifiers (dict): Key-value pairs identifying the crash. In
        practice these are identical to the ones passed to __init__; and yet
        they must still be passed for some reason having to do with the
        AppEngine API. Just to make certain that our logging is consistent
        (in the face of mocking etc), we reset the cached copy anyways.
    """
    self._crash_identifiers = crash_identifiers

    analysis = self._findit.GetAnalysis(crash_identifiers)

    # Update analysis status.
    analysis.pipeline_status_path = self.pipeline_status_path()
    analysis.status = analysis_status.RUNNING
    analysis.started_time = time_util.GetUTCNow()
    analysis.findit_version = appengine_util.GetCurrentVersion()
    analysis.put()

    # Run the analysis.
    result, tags = self._findit.FindCulprit(analysis).ToDicts()

    # Update analysis status and save the analysis result.
    analysis.completed_time = time_util.GetUTCNow()
    analysis.result = result
    # items() instead of the Python-2-only iteritems(): behaviorally the
    # same for this small dict, and portable to Python 3.
    for tag_name, tag_value in tags.items():
      # TODO(http://crbug.com/602702): make it possible to add arbitrary tags.
      if hasattr(analysis, tag_name):
        setattr(analysis, tag_name, tag_value)
    analysis.status = analysis_status.COMPLETED
    analysis.put()
71 83
72 84
class PublishResultPipeline(CrashBasePipeline):
  """Pipeline that publishes a finished analysis result via Cloud Pub/Sub."""

  def finalized(self):
    """Logs an error if this pipeline was aborted before publishing."""
    if self.was_aborted:  # pragma: no cover.
      # Argument order matches the success message in run(): the client id
      # fills the first placeholder, the crash identifiers the second.
      # (They were previously passed in the opposite order.)
      logging.error('Failed to publish %s analysis result for %s',
                    self.client_id, repr(self._crash_identifiers))

  # Arguments number differs from overridden method - pylint: disable=W0221
  def run(self, crash_identifiers):
    """Publishes the analysis result to the client's Pub/Sub topic.

    Args:
      crash_identifiers (dict): Key-value pairs identifying the crash. In
        practice identical to the ones passed to __init__, but the pipeline
        API passes them again; we reset the cached copy so logging stays
        consistent (in the face of mocking etc).
    """
    self._crash_identifiers = crash_identifiers

    analysis = self._findit.GetAnalysis(crash_identifiers)
    result = analysis.GetPublishableResult(crash_identifiers)
    messages_data = [json.dumps(result, sort_keys=True)]

    client_config = self._findit.config
    # TODO(katesonia): Clean string uses in config.
    topic = client_config['analysis_result_pubsub_topic']
    pubsub_util.PublishMessagesToTopic(messages_data, topic)
    logging.info('Published %s analysis result for %s', self.client_id,
                 repr(crash_identifiers))
95 110
96 111
# TODO(wrengr): why isn't this a subclass of CrashBasePipeline? That'd
# make it more consistent about caching the findit_client in __init__.
class CrashWrapperPipeline(BasePipeline):
  """Top-level pipeline: runs the analysis, then publishes its result."""

  # Arguments number differs from overridden method - pylint: disable=W0221
  def run(self, findit_client, crash_identifiers):
    # Whatever coroutine consumes what this pipeline |yield|s is the
    # one that double-passes the |crash_identifiers| and will throw an
    # error if |run| doesn't accept arguments.
    run_analysis = yield CrashAnalysisPipeline(findit_client, crash_identifiers)
    with pipeline.After(run_analysis):
      yield PublishResultPipeline(findit_client, crash_identifiers)
103
104
@ndb.transactional
def _NeedsNewAnalysis(
    crash_identifiers, chrome_version, signature, client_id,
    platform, stack_trace, customized_data):
  """Decides whether a fresh analysis run is needed, preparing the entity.

  Runs transactionally. Skips the run when a non-failed analysis with the
  same regression range already exists for these crash identifiers;
  otherwise creates (if missing) and resets the analysis entity.

  Returns:
    True if a new analysis should be scheduled; False otherwise.
  """
  analysis = findit_for_client.GetAnalysisForClient(crash_identifiers,
                                                    client_id)
  regression_range = findit_for_client.GetRegressionRange(client_id,
                                                          customized_data)

  # An existing, successful analysis over the same regression range means
  # there is nothing new to compute.
  if (analysis and not analysis.failed and
      regression_range == analysis.regression_range):
    logging.info('The analysis of %s has already been done.',
                 repr(crash_identifiers))
    return False

  # Create analysis for findit to run if this is not a rerun.
  if not analysis:
    analysis = findit_for_client.CreateAnalysisForClient(crash_identifiers,
                                                         client_id)

  # Whether fresh or a rerun, (re)initialize the entity with the latest
  # inputs before scheduling.
  findit_for_client.ResetAnalysis(analysis, chrome_version, signature,
                                  client_id, platform, stack_trace,
                                  customized_data, regression_range)
  return True
128
129
def ScheduleNewAnalysisForCrash(
    crash_identifiers, chrome_version, signature, client_id,
    platform, stack_trace, customized_data,
    queue_name=constants.DEFAULT_QUEUE):
  """Schedules an analysis if policy allows and no up-to-date one exists.

  Returns:
    True if a new analysis pipeline was started; False otherwise.
  """
  # Check policy and tune arguments if needed.
  pass_policy, updated_analysis_args = findit_for_client.CheckPolicyForClient(
      crash_identifiers, chrome_version, signature,
      client_id, platform, stack_trace,
      customized_data)
  if not pass_policy:
    return False

  if not _NeedsNewAnalysis(*updated_analysis_args):
    return False

  analysis_pipeline = CrashWrapperPipeline(crash_identifiers, client_id)
  # Attribute defined outside __init__ - pylint: disable=W0201
  analysis_pipeline.target = appengine_util.GetTargetNameForModule(
      constants.CRASH_BACKEND[client_id])
  analysis_pipeline.start(queue_name=queue_name)
  logging.info('New %s analysis is scheduled for %s', client_id,
               repr(crash_identifiers))
  return True
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698