Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(176)

Side by Side Diff: appengine/findit/crash/fracas_crash_pipeline.py

Issue 1852383002: [Findit] Integrate with Fracas through Pub/Sub. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Just rebase. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « appengine/findit/crash/fracas.py ('k') | appengine/findit/crash/test/crash_testcase.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 # Copyright 2016 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 import base64
6 import datetime
7 import json
8 import logging
9
10 from google.appengine.ext import ndb
11
12 from common import appengine_util
13 from common import constants
14 from common import pubsub_util
15 from common.pipeline_wrapper import BasePipeline
16 from common.pipeline_wrapper import pipeline
17 from crash import fracas
18 from model import analysis_status
19 from model.crash.crash_config import CrashConfig
20 from model.crash.fracas_crash_analysis import FracasCrashAnalysis
21
22
23 class FracasBasePipeline(BasePipeline):
24 def __init__(self, channel, platform, signature):
25 super(FracasBasePipeline, self).__init__(
26 channel, platform, signature)
27 self.channel = channel
28 self.platform = platform
29 self.signature = signature
30
31 def run(self, *args, **kwargs):
32 raise NotImplementedError()
33
34
35 class FracasAnalysisPipeline(FracasBasePipeline):
36 def _SetErrorIfAborted(self, aborted):
37 if not aborted:
38 return
39
40 logging.error('Aborted analysis for %s, %s, %s',
41 self.channel, self.platform, self.signature)
42 analysis = FracasCrashAnalysis.Get(
43 self.channel, self.platform, self.signature)
44 analysis.status = analysis_status.ERROR
45 analysis.put()
46
47 def finalized(self):
48 self._SetErrorIfAborted(self.was_aborted)
49
50 # Arguments number differs from overridden method - pylint: disable=W0221
51 def run(self, channel, platform, signature):
52 analysis = FracasCrashAnalysis.Get(channel, platform, signature)
53
54 # Update analysis status.
55 analysis.pipeline_status_path = self.pipeline_status_path()
56 analysis.status = analysis_status.RUNNING
57 analysis.started_time = datetime.datetime.utcnow()
58 analysis.findit_version = appengine_util.GetCurrentVersion()
59 analysis.put()
60
61 # Run the analysis.
62 result, tags = fracas.FindCulpritForChromeCrash(
63 channel, platform, signature, analysis.stack_trace,
64 analysis.crashed_version, analysis.versions_to_cpm)
65
66 # Update analysis status and save the analysis result.
67 analysis.completed_time = datetime.datetime.utcnow()
68 analysis.result = result
69 for tag_name, tag_value in tags.iteritems():
70 # TODO(http://crbug.com/602702): make it possible to add arbitrary tags.
71 if hasattr(analysis, tag_name):
72 setattr(analysis, tag_name, tag_value)
73 analysis.status = analysis_status.COMPLETED
74 analysis.put()
75
76
77 class PublishResultPipeline(FracasBasePipeline):
78 def finalized(self):
79 if self.was_aborted: # pragma: no cover.
80 logging.error('Failed to publish analysis result for %s, %s, %s',
81 self.channel, self.platform, self.signature)
82
83 # Arguments number differs from overridden method - pylint: disable=W0221
84 def run(self, channel, platform, signature):
85 analysis = FracasCrashAnalysis.Get(channel, platform, signature)
86 result = {
87 'channel': channel,
88 'platform': platform,
89 'signature': signature,
90 'result': analysis.result,
91 }
92 messages_data = [json.dumps(result, sort_keys=True)]
93
94 crash_config = CrashConfig.Get()
95 topic = crash_config.fracas['analysis_result_pubsub_topic']
96 pubsub_util.PublishMessagesToTopic(messages_data, topic)
97 logging.info('Published analysis result for %s, %s, %s',
98 channel, platform, signature)
99
100
101 class FracasCrashWrapperPipeline(BasePipeline):
102 # Arguments number differs from overridden method - pylint: disable=W0221
103 def run(self, channel, platform, signature):
104 run_analysis = yield FracasAnalysisPipeline(channel, platform, signature)
105 with pipeline.After(run_analysis):
106 yield PublishResultPipeline(channel, platform, signature)
107
108
109 @ndb.transactional
110 def _NeedsNewAnalysis(
111 channel, platform, signature, stack_trace, chrome_version, versions_to_cpm):
112 analysis = FracasCrashAnalysis.Get(channel, platform, signature)
113 if analysis and not analysis.failed:
114 # A new analysis is not needed if last one didn't complete or succeeded.
115 # TODO(http://crbug.com/600535): re-analyze if stack trace or regression
116 # range changed.
117 return False
118
119 if not analysis:
120 # A new analysis is needed if there is no analysis yet.
121 analysis = FracasCrashAnalysis.Create(channel, platform, signature)
122
123 analysis.Reset()
124 analysis.crashed_version = chrome_version
125 analysis.stack_trace = stack_trace
126 analysis.versions_to_cpm = versions_to_cpm
127 analysis.status = analysis_status.PENDING
128 analysis.requested_time = datetime.datetime.utcnow()
129 analysis.put()
130 return True
131
132
133 def ScheduleNewAnalysisForCrash(
134 channel, platform, signature, stack_trace, chrome_version, versions_to_cpm,
135 queue_name=constants.DEFAULT_QUEUE):
136 """Schedules an analysis."""
137 crash_config = CrashConfig.Get()
138 if platform not in crash_config.fracas.get(
139 'supported_platform_list_by_channel', {}).get(channel, []):
140 # Bail out if either the channel or platform is not supported yet.
141 return False
142
143 if _NeedsNewAnalysis(channel, platform, signature, stack_trace,
144 chrome_version, versions_to_cpm):
145 analysis_pipeline = FracasCrashWrapperPipeline(channel, platform, signature)
146 analysis_pipeline.target = appengine_util.GetTargetNameForModule(
147 constants.CRASH_BACKEND_FRACAS)
148 analysis_pipeline.start(queue_name=queue_name)
149 logging.info('New analysis is scheduled for %s, %s, %s',
150 channel, platform, signature)
151 return True
152
153 return False
OLDNEW
« no previous file with comments | « appengine/findit/crash/fracas.py ('k') | appengine/findit/crash/test/crash_testcase.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698