Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 # Copyright 2016 The Chromium Authors. All rights reserved. | 1 # Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
| 4 | 4 |
| 5 import copy | 5 import copy |
| 6 import json | 6 import json |
| 7 import logging | 7 import logging |
| 8 | 8 |
| 9 from google.appengine.api import app_identity | |
| 10 from google.appengine.ext import ndb | |
| 11 | |
| 12 from common import appengine_util | 9 from common import appengine_util |
| 13 from common import constants | |
| 14 from common import pubsub_util | 10 from common import pubsub_util |
| 15 from common import time_util | 11 from common import time_util |
| 16 from common.pipeline_wrapper import BasePipeline | 12 from common.pipeline_wrapper import BasePipeline |
| 17 from common.pipeline_wrapper import pipeline | 13 from common.pipeline_wrapper import pipeline |
| 18 from crash import findit_for_client | |
| 19 from model import analysis_status | 14 from model import analysis_status |
| 20 from model.crash.crash_config import CrashConfig | 15 from model.crash.crash_config import CrashConfig |
| 21 | 16 |
| 22 | 17 |
| 23 class CrashBasePipeline(BasePipeline): | 18 class CrashBasePipeline(BasePipeline): |
| 24 def __init__(self, crash_identifiers, client_id): | 19 def __init__(self, findit_client, crash_identifiers): |
| 25 super(CrashBasePipeline, self).__init__(crash_identifiers, client_id) | 20 super(CrashBasePipeline, self).__init__( |
| 26 self.crash_identifiers = crash_identifiers | 21 crash_identifiers, findit_client.client_id) |
| 27 self.client_id = client_id | |
| 28 | 22 |
| 23 # TODO(wrengr): why do we take crash_identifiers as an argument and | |
| 24 # cache them, when they'll be passed in to every call to run? | |
| 25 self._crash_identifiers = crash_identifiers | |
| 26 self._findit = findit_client | |
| 27 | |
| 28 @property | |
| 29 def client_id(self): | |
| 30 return self._findit.client_id | |
| 31 | |
| 32 # This function is specified by the AppEngine pipeline API. So we can't | |
| 33 # change the name nor restrict the arguments (even though subclasses | |
| 34 # will restrict the arguments). | |
| 29 def run(self, *args, **kwargs): | 35 def run(self, *args, **kwargs): |
| 30 raise NotImplementedError() | 36 raise NotImplementedError() |
| 31 | 37 |
| 32 | 38 |
| 33 class CrashAnalysisPipeline(CrashBasePipeline): | 39 class CrashAnalysisPipeline(CrashBasePipeline): |
| 34 def _SetErrorIfAborted(self, aborted): | 40 def finalized(self): |
| 35 if not aborted: | 41 if self.was_aborted: |
| 36 return | 42 self._PutAbortedError() |
| 37 | 43 |
| 38 logging.error('Aborted analysis for %s', repr(self.crash_identifiers)) | 44 # N.B., this method must be factored out for unittest reasons; since |
| 39 analysis = findit_for_client.GetAnalysisForClient(self.crash_identifiers, | 45 # |was_aborted| can't be altered directly. |
| 40 self.client_id) | 46 def _PutAbortedError(self): |
| 47 """Update the ndb.Model to indicate that this pipeline was aborted.""" | |
| 48 logging.error('Aborted analysis for %s', repr(self._crash_identifiers)) | |
| 49 analysis = self._findit.GetAnalysis(self._crash_identifiers) | |
| 41 analysis.status = analysis_status.ERROR | 50 analysis.status = analysis_status.ERROR |
| 42 analysis.put() | 51 analysis.put() |
| 43 | 52 |
| 44 def finalized(self): | 53 # Arguments number differs from overridden method - pylint: disable=W0221 |
| 45 self._SetErrorIfAborted(self.was_aborted) | 54 def run(self, crash_identifiers): |
| 55 # In practice, the |crash_identifiers| this method gets will be | |
| 56 # identical to the ones passed to __init__; and yet they must still be | |
| 57 # passed for some reason having to do with AppEngine API. Just to make | |
| 58 # certain that our logging is consistent (in the face of mocking etc), | |
| 59 # we reset the cached copy anyways. | |
| 60 self._crash_identifiers = crash_identifiers | |
| 46 | 61 |
| 47 # Arguments number differs from overridden method - pylint: disable=W0221 | 62 analysis = self._findit.GetAnalysis(crash_identifiers) |
| 48 def run(self, crash_identifiers, client_id): | |
| 49 analysis = findit_for_client.GetAnalysisForClient(crash_identifiers, | |
| 50 client_id) | |
| 51 | 63 |
| 52 # Update analysis status. | 64 # Update analysis status. |
| 53 analysis.pipeline_status_path = self.pipeline_status_path() | 65 analysis.pipeline_status_path = self.pipeline_status_path() |
| 54 analysis.status = analysis_status.RUNNING | 66 analysis.status = analysis_status.RUNNING |
| 55 analysis.started_time = time_util.GetUTCNow() | 67 analysis.started_time = time_util.GetUTCNow() |
| 56 analysis.findit_version = appengine_util.GetCurrentVersion() | 68 analysis.findit_version = appengine_util.GetCurrentVersion() |
| 57 analysis.put() | 69 analysis.put() |
| 58 | 70 |
| 59 # Run the analysis. | 71 # Run the analysis. |
| 60 result, tags = findit_for_client.FindCulprit(analysis) | 72 result, tags = self._findit.FindCulprit(analysis).ToDicts() |
| 61 | 73 |
| 62 # Update analysis status and save the analysis result. | 74 # Update analysis status and save the analysis result. |
| 63 analysis.completed_time = time_util.GetUTCNow() | 75 analysis.completed_time = time_util.GetUTCNow() |
| 64 analysis.result = result | 76 analysis.result = result |
| 65 for tag_name, tag_value in tags.iteritems(): | 77 for tag_name, tag_value in tags.iteritems(): |
| 66 # TODO(http://crbug.com/602702): make it possible to add arbitrary tags. | 78 # TODO(http://crbug.com/602702): make it possible to add arbitrary tags. |
| 67 if hasattr(analysis, tag_name): | 79 if hasattr(analysis, tag_name): |
| 68 setattr(analysis, tag_name, tag_value) | 80 setattr(analysis, tag_name, tag_value) |
| 69 analysis.status = analysis_status.COMPLETED | 81 analysis.status = analysis_status.COMPLETED |
| 70 analysis.put() | 82 analysis.put() |
| 71 | 83 |
| 72 | 84 |
| 73 class PublishResultPipeline(CrashBasePipeline): | 85 class PublishResultPipeline(CrashBasePipeline): |
| 74 def finalized(self): | 86 def finalized(self): |
| 75 if self.was_aborted: # pragma: no cover. | 87 if self.was_aborted: # pragma: no cover. |
| 76 logging.error('Failed to publish %s analysis result for %s', | 88 logging.error('Failed to publish %s analysis result for %s', |
| 77 repr(self.crash_identifiers), self.client_id) | 89 repr(self._crash_identifiers), self.client_id) |
| 78 | |
| 79 | 90 |
| 80 # Arguments number differs from overridden method - pylint: disable=W0221 | 91 # Arguments number differs from overridden method - pylint: disable=W0221 |
| 81 def run(self, crash_identifiers, client_id): | 92 def run(self, crash_identifiers): |
| 82 analysis = findit_for_client.GetAnalysisForClient(crash_identifiers, | 93 # In practice, the |crash_identifiers| this method gets will be |
| 83 client_id) | 94 # identical to the ones passed to __init__; and yet they must still be |
| 84 result = findit_for_client.GetPublishResultFromAnalysis(analysis, | 95 # passed for some reason having to do with AppEngine API. Just to make |
| 85 crash_identifiers, | 96 # certain that our logging is consistent (in the face of mocking etc), |
| 86 client_id) | 97 # we reset the cached copy anyways. |
| 98 self._crash_identifiers = crash_identifiers | |
| 99 | |
| 100 analysis = self._findit.GetAnalysis(crash_identifiers) | |
| 101 result = analysis.GetPublishableResult(crash_identifiers) | |
| 87 messages_data = [json.dumps(result, sort_keys=True)] | 102 messages_data = [json.dumps(result, sort_keys=True)] |
| 88 | 103 |
| 89 client_config = CrashConfig.Get().GetClientConfig(client_id) | 104 client_config = self._findit.config |
| 90 # TODO(katesonia): Clean string uses in config. | 105 # TODO(katesonia): Clean string uses in config. |
| 91 topic = client_config['analysis_result_pubsub_topic'] | 106 topic = client_config['analysis_result_pubsub_topic'] |
| 92 pubsub_util.PublishMessagesToTopic(messages_data, topic) | 107 pubsub_util.PublishMessagesToTopic(messages_data, topic) |
| 93 logging.info('Published %s analysis result for %s', client_id, | 108 logging.info('Published %s analysis result for %s', self.client_id, |
| 94 repr(crash_identifiers)) | 109 repr(crash_identifiers)) |
| 95 | 110 |
| 96 | 111 |
| 112 # TODO(wrengr): why isn't this a subclass of CrashBasePipeline? That'd | |
| 113 # make it more consistent about caching the findit_client in __init__ | |
| 97 class CrashWrapperPipeline(BasePipeline): | 114 class CrashWrapperPipeline(BasePipeline): |
| 98 # Arguments number differs from overridden method - pylint: disable=W0221 | 115 # Arguments number differs from overridden method - pylint: disable=W0221 |
| 99 def run(self, crash_identifiers, client_id): | 116 def run(self, findit_client, crash_identifiers): |
| 100 run_analysis = yield CrashAnalysisPipeline(crash_identifiers, client_id) | 117 # Whatever coroutine consumes what this pipleline |yield|s is the |
| 118 # one that double-passes the |crash_identifiers| and will throw an | |
| 119 # error if |run| doesn't accept arguments. | |
| 120 run_analysis = yield CrashAnalysisPipeline(findit_client, crash_identifiers) | |
|
stgao
2016/10/19 18:20:53
And same here for the sub-pipelines, `findit_clien
wrengr
2016/10/24 18:36:14
Done.
| |
| 101 with pipeline.After(run_analysis): | 121 with pipeline.After(run_analysis): |
| 102 yield PublishResultPipeline(crash_identifiers, client_id) | 122 yield PublishResultPipeline(findit_client, crash_identifiers) |
| 103 | |
| 104 | |
| 105 @ndb.transactional | |
| 106 def _NeedsNewAnalysis( | |
| 107 crash_identifiers, chrome_version, signature, client_id, | |
| 108 platform, stack_trace, customized_data): | |
| 109 analysis = findit_for_client.GetAnalysisForClient(crash_identifiers, | |
| 110 client_id) | |
| 111 regression_range = findit_for_client.GetRegressionRange(client_id, | |
| 112 customized_data) | |
| 113 if (analysis and not analysis.failed and | |
| 114 regression_range == analysis.regression_range): | |
| 115 logging.info('The analysis of %s has already been done.', | |
| 116 repr(crash_identifiers)) | |
| 117 return False | |
| 118 | |
| 119 # Create analysis for findit to run if this is not a rerun. | |
| 120 if not analysis: | |
| 121 analysis = findit_for_client.CreateAnalysisForClient(crash_identifiers, | |
| 122 client_id) | |
| 123 | |
| 124 findit_for_client.ResetAnalysis(analysis, chrome_version, signature, | |
| 125 client_id, platform, stack_trace, | |
| 126 customized_data, regression_range) | |
| 127 return True | |
| 128 | |
| 129 | |
| 130 def ScheduleNewAnalysisForCrash( | |
| 131 crash_identifiers, chrome_version, signature, client_id, | |
| 132 platform, stack_trace, customized_data, | |
| 133 queue_name=constants.DEFAULT_QUEUE): | |
| 134 """Schedules an analysis.""" | |
| 135 # Check policy and tune arguments if needed. | |
| 136 pass_policy, updated_analysis_args = findit_for_client.CheckPolicyForClient( | |
| 137 crash_identifiers, chrome_version, signature, | |
| 138 client_id, platform, stack_trace, | |
| 139 customized_data) | |
| 140 | |
| 141 if not pass_policy: | |
| 142 return False | |
| 143 | |
| 144 if _NeedsNewAnalysis(*updated_analysis_args): | |
| 145 analysis_pipeline = CrashWrapperPipeline(crash_identifiers, client_id) | |
| 146 # Attribute defined outside __init__ - pylint: disable=W0201 | |
| 147 analysis_pipeline.target = appengine_util.GetTargetNameForModule( | |
| 148 constants.CRASH_BACKEND[client_id]) | |
| 149 analysis_pipeline.start(queue_name=queue_name) | |
| 150 logging.info('New %s analysis is scheduled for %s', client_id, | |
| 151 repr(crash_identifiers)) | |
| 152 return True | |
| 153 | |
| 154 return False | |
| OLD | NEW |