Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(253)

Side by Side Diff: appengine/findit/crash/fracas_crash_pipeline.py

Issue 1852383002: [Findit] Integrate with Fracas through Pub/Sub. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Address comments. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2016 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 import base64
6 import datetime
7 import json
8 import logging
9
10 from google.appengine.ext import ndb
11
12 from common import appengine_util
13 from common import constants
14 from common import pubsub_util
15 from crash import fracas
16 from model import analysis_status
17 from model.crash.crash_config import CrashConfig
18 from model.crash.fracas_crash_analysis import FracasCrashAnalysis
19 from pipeline_wrapper import BasePipeline
20 from pipeline_wrapper import pipeline
21
22
23 class FracasCrashBasePipeline(BasePipeline):
24 def __init__(self, channel, platform, signature):
25 super(FracasCrashBasePipeline, self).__init__(
26 channel, platform, signature)
27 self.channel = channel
28 self.platform = platform
29 self.signature = signature
30
31 def run(self, *args, **kwargs):
32 raise NotImplementedError()
33
34
35 class AnalyzeCrashPipeline(FracasCrashBasePipeline):
36 def _SetErrorWhenAborted(self, aborted):
Martin Barbella 2016/04/08 18:10:24 I don't think this necessarily needs to be in its
stgao 2016/04/11 20:32:19 This function is necessary for unittest purpose wi
37 if aborted:
38 logging.error('Aborted analysis for %s, %s, %s',
39 self.channel, self.platform, self.signature)
40 analysis = FracasCrashAnalysis.Get(
41 self.channel, self.platform, self.signature)
42 analysis.status = analysis_status.ERROR
43 analysis.put()
44
45 def finalized(self):
46 self._SetErrorWhenAborted(self.was_aborted)
47
48 # Arguments number differs from overridden method - pylint: disable=W0221
49 def run(self, channel, platform, signature):
50 analysis = FracasCrashAnalysis.Get(channel, platform, signature)
51
52 # Update analysis status.
53 analysis.pipeline_status_path = self.pipeline_status_path()
54 analysis.status = analysis_status.RUNNING
55 analysis.started_time = datetime.datetime.utcnow()
56 analysis.findit_version = appengine_util.GetCurrentVersion()
57 analysis.put()
58
59 # Run the analysis.
60 result, tags = fracas.FindCulpritForChromeCrash(
61 channel, platform, signature, analysis.stack_trace,
62 analysis.crashed_version, analysis.cpms)
63
64 # Update analysis status and save the analysis result.
65 analysis.completed_time = datetime.datetime.utcnow()
66 analysis.result = result
67 for tag_name, tag_value in tags.iteritems():
68 # Later, we might consider adding arbitrary tags.
Martin Barbella 2016/04/08 18:10:24 This seems ideal. At the very least, this should b
stgao 2016/04/11 20:32:19 Maybe let's remove for now. The intention was to a
Martin Barbella 2016/04/11 21:46:31 There are certainly ways to do this without adding
stgao 2016/04/12 18:19:17 Yes, there should be. But I'd like to defer the im
69 if hasattr(analysis, tag_name):
70 setattr(analysis, tag_name, tag_value)
71 analysis.status = analysis_status.COMPLETED
72 analysis.put()
73
74
75 class PublishResultPipeline(FracasCrashBasePipeline):
76 def finalized(self):
77 if self.was_aborted: # pragma: no cover.
78 logging.error('Failed to publish analysis result for %s, %s, %s',
79 self.channel, self.platform, self.signature)
80
81 # Arguments number differs from overridden method - pylint: disable=W0221
82 def run(self, channel, platform, signature):
83 analysis = FracasCrashAnalysis.Get(channel, platform, signature)
Sharu 2016/04/12 00:23:13 Should we set the self.channel, self.platform, sel
stgao 2016/04/12 18:27:35 self.channel, self.platform, self.signature are se
84 result = {
85 'channel': channel,
86 'platform': platform,
87 'signature': signature,
88 'result': analysis.result,
89 }
90 messages_data = [json.dumps(result, sort_keys=True)]
91
92 crash_config = CrashConfig.Get()
93 topic = crash_config.fracas['analysis_result_pubsub_topic']
94 pubsub_util.PublishMessagesToTopic(messages_data, topic)
95 logging.info('Published analysis result for %s, %s, %s',
96 channel, platform, signature)
97
98
99 class FracasCrashWrapperPipeline(BasePipeline):
100 # Arguments number differs from overridden method - pylint: disable=W0221
101 def run(self, channel, platform, signature):
102 run_analysis = yield AnalyzeCrashPipeline(channel, platform, signature)
103 with pipeline.After(run_analysis):
104 yield PublishResultPipeline(channel, platform, signature)
105
106
107 @ndb.transactional
108 def _NeedANewAnalysis(
Martin Barbella 2016/04/08 18:10:24 Nit: rename to _NeedsNewAnalysis.
stgao 2016/04/11 20:32:19 Done.
109 channel, platform, signature, stack_trace, chrome_version, cpms):
Martin Barbella 2016/04/08 18:10:24 Same comment about cpms from before.
stgao 2016/04/11 20:32:19 Done.
110 analysis = FracasCrashAnalysis.Get(channel, platform, signature)
111 if analysis and not analysis.failed:
112 # A new analysis is not needed if last one didn't complete or succeeded.
113 # TODO(http://crbug.com/600535): re-analyze if stack trace or regression
114 # range changed.
115 return False
116
117 if not analysis:
118 # A new analysis is needed if there is no analysis yet.
119 analysis = FracasCrashAnalysis.Create(channel, platform, signature)
120
121 analysis.Reset()
122 analysis.crashed_version = chrome_version
123 analysis.stack_trace = stack_trace
124 analysis.cpms = cpms
125 analysis.status = analysis_status.PENDING
126 analysis.requested_time = datetime.datetime.utcnow()
127 analysis.put()
128 return True
129
130
131 def ScheduleNewAnalysisForCrash(
132 channel, platform, signature, stack_trace, chrome_version, cpms,
Martin Barbella 2016/04/08 18:10:24 Ditto.
stgao 2016/04/11 20:32:19 Done.
133 queue_name=constants.DEFAULT_QUEUE):
134 """Schedules an analysis."""
135 crash_config = CrashConfig.Get()
136 if platform not in crash_config.fracas.get(
137 'supported_platform_list_by_channel', {}).get(channel, []):
138 # Bail out if either the channel or platform is not supported yet.
139 return False
140
141 if _NeedANewAnalysis(
142 channel, platform, signature, stack_trace, chrome_version, cpms):
143 analysis_pipeline = FracasCrashWrapperPipeline(channel, platform, signature)
144 analysis_pipeline.target = appengine_util.GetTargetNameForModule(
145 constants.CRASH_BACKEND_FRACAS)
146 analysis_pipeline.start(queue_name=queue_name)
147 logging.info('New analysis is scheduled for %s, %s, %s',
148 channel, platform, signature)
149 return True
150
151 return False
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698