Chromium Code Reviews| Index: appengine/findit/crash/fracas_crash_pipeline.py |
| diff --git a/appengine/findit/crash/fracas_crash_pipeline.py b/appengine/findit/crash/fracas_crash_pipeline.py |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..00d83d3573838bca2778e1e2ab0f3bd511f25acb |
| --- /dev/null |
| +++ b/appengine/findit/crash/fracas_crash_pipeline.py |
| @@ -0,0 +1,150 @@ |
| +# Copyright 2016 The Chromium Authors. All rights reserved. |
| +# Use of this source code is governed by a BSD-style license that can be |
| +# found in the LICENSE file. |
| + |
| +import base64 |
| +import datetime |
| +import json |
| +import logging |
| + |
| +from google.appengine.ext import ndb |
| + |
| +from common import appengine_util |
| +from common import constants |
| +from common import pubsub_util |
| +from crash import fracas |
| +from model import analysis_status |
| +from model.crash.crash_config import CrashConfig |
| +from model.crash.fracas_crash_analysis import FracasCrashAnalysis |
| +from pipeline_wrapper import BasePipeline |
| +from pipeline_wrapper import pipeline |
| + |
| + |
| +class FracasCrashBasePipeline(BasePipeline): |
| + def __init__(self, channel, platform, signature): |
| + super(FracasCrashBasePipeline, self).__init__( |
| + channel, platform, signature) |
| + self.channel = channel |
| + self.platform = platform |
| + self.signature = signature |
| + |
| + def run(self, *args, **kwargs): |
| + raise NotImplementedError() |
| + |
| + |
| +class AnalyzeCrashPipeline(FracasCrashBasePipeline): |
| + def _SetErrorWhenAborted(self, aborted): |
| + if aborted: |
| + logging.error('Aborted analysis for %s, %s, %s', |
| + self.channel, self.platform, self.signature) |
| + analysis = FracasCrashAnalysis.Get( |
| + self.channel, self.platform, self.signature) |
| + analysis.status = analysis_status.ERROR |
| + analysis.put() |
| + |
| + def finalized(self): |
| + self._SetErrorWhenAborted(self.was_aborted) |
| + |
| + # Arguments number differs from overridden method - pylint: disable=W0221 |
| + def run(self, channel, platform, signature): |
| + analysis = FracasCrashAnalysis.Get(channel, platform, signature) |
| + |
| + # Update analysis status. |
| + analysis.pipeline_status_path = self.pipeline_status_path() |
| + analysis.status = analysis_status.RUNNING |
| + analysis.started_time = datetime.datetime.utcnow() |
| + analysis.findit_version = appengine_util.GetCurrentVersion() |
| + analysis.put() |
| + |
| + # Run the analysis. |
| + result, tags = fracas.FindCulpritForChromeCrash( |
| + channel, platform, signature, analysis.stack_trace, |
| + analysis.crashed_version, analysis.cpm) |
| + |
| + # Update analysis status and save the analysis result. |
| + analysis.completed_time = datetime.datetime.utcnow() |
| + analysis.result = result |
| + for tag_name, tag_value in tags.iteritems(): |
| + # Later, we might consider adding arbitrary tags. |
| + if hasattr(analysis, tag_name): |
| + setattr(analysis, tag_name, tag_value) |
| + analysis.status = analysis_status.COMPLETED |
| + analysis.put() |
| + |
| + |
| +class PublishResultPipeline(FracasCrashBasePipeline): |
| + def finalized(self): |
| + if self.was_aborted: # pragma: no cover. |
| + logging.error('Failed to publish analysis result for %s, %s, %s', |
| + self.channel, self.platform, self.signature) |
| + |
| + # Arguments number differs from overridden method - pylint: disable=W0221 |
| + def run(self, channel, platform, signature): |
| + analysis = FracasCrashAnalysis.Get(channel, platform, signature) |
| + result = { |
| + 'channel': channel, |
| + 'platform': platform, |
| + 'signature': signature, |
| + 'result': analysis.result, |
| + } |
| + messages_data = [json.dumps(result, sort_keys=True)] |
| + |
| + crash_config = CrashConfig.Get() |
| + topic = crash_config.fracas['analysis_result_pubsub_topic'] |
| + pubsub_util.PublishMessagesToTopic(messages_data, topic) |
| + logging.info('Published analysis result for %s, %s, %s', |
| + channel, platform, signature) |
| + |
| + |
| +class FracasCrashWrapperPipeline(BasePipeline): |
| + # Arguments number differs from overridden method - pylint: disable=W0221 |
| + def run(self, channel, platform, signature): |
| + run_analysis = yield AnalyzeCrashPipeline(channel, platform, signature) |
| + with pipeline.After(run_analysis): |
| + yield PublishResultPipeline(channel, platform, signature) |
| + |
| + |
| +@ndb.transactional |
| +def _NeedANewAnalysis( |
| + channel, platform, signature, stack_trace, chrome_version, cpm): |
| + analysis = FracasCrashAnalysis.Get(channel, platform, signature) |
| + if analysis and not analysis.failed: |
| + # A new analysis is not needed if last one didn't complete and fail. |
|
mimee
2016/04/07 23:43:09
fail-> failed.
stgao
2016/04/08 01:14:08
Corrected.
|
| + # TODO(http://crbug.com/600535): re-analyze if stack trace or regression |
| + # range changed. |
| + return False |
| + elif not analysis: |
|
mimee
2016/04/07 23:43:09
Already returned previously, elif->if
stgao
2016/04/08 01:14:08
Done.
|
| + # A new analysis is needed if there is no analysis yet. |
| + analysis = FracasCrashAnalysis.Create(channel, platform, signature) |
| + |
| + analysis.Reset() |
| + analysis.crashed_version = chrome_version |
| + analysis.stack_trace = stack_trace |
| + analysis.cpm = cpm |
| + analysis.status = analysis_status.PENDING |
| + analysis.requested_time = datetime.datetime.utcnow() |
| + analysis.put() |
| + return True |
| + |
| + |
| +def ScheduleNewAnalysisForCrash( |
| + channel, platform, signature, stack_trace, chrome_version, cpm, |
| + queue_name=constants.DEFAULT_QUEUE): |
| + """Schedules an analysis.""" |
| + crash_config = CrashConfig.Get() |
| + if platform not in crash_config.fracas.get( |
| + 'supported_channels', {}).get(channel, []): |
| + # Bail out if either the channel or platform is not supported yet. |
| + return False |
| + |
| + if _NeedANewAnalysis( |
| + channel, platform, signature, stack_trace, chrome_version, cpm): |
| + analysis_pipeline = FracasCrashWrapperPipeline(channel, platform, signature) |
| + analysis_pipeline.target = appengine_util.GetTargetNameForModule( |
| + constants.CRASH_BACKEND_FRACAS) |
| + analysis_pipeline.start(queue_name=queue_name) |
| + logging.info('New analysis is scheduled for %s, %s, %s', |
| + channel, platform, signature) |
| + return True |
| + |
| + return False |