| OLD | NEW |
| 1 # Copyright 2016 The Chromium Authors. All rights reserved. | 1 # Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
| 4 | 4 |
| 5 import copy | 5 import copy |
| 6 import json | 6 import json |
| 7 import logging | 7 import logging |
| 8 | 8 |
| 9 from google.appengine.api import app_identity | |
| 10 from google.appengine.ext import ndb | |
| 11 | |
| 12 from common import appengine_util | 9 from common import appengine_util |
| 13 from common import constants | 10 from common import constants |
| 14 from common import git_repository | 11 from common import git_repository |
| 15 from common import pubsub_util | 12 from common import pubsub_util |
| 16 from common import time_util | 13 from common import time_util |
| 17 from common.http_client_appengine import HttpClientAppengine | 14 from common.http_client_appengine import HttpClientAppengine |
| 18 from common.pipeline_wrapper import BasePipeline | 15 from common.pipeline_wrapper import BasePipeline |
| 19 from common.pipeline_wrapper import pipeline | 16 from common.pipeline_wrapper import pipeline |
| 20 from crash import findit_for_client | 17 from crash import findit_for_chromecrash |
| 18 from crash import findit_for_clusterfuzz |
| 19 from crash.type_enums import CrashClient |
| 21 from model import analysis_status | 20 from model import analysis_status |
| 22 from model.crash.crash_config import CrashConfig | 21 from model.crash.crash_config import CrashConfig |
| 23 | 22 |
| 24 | 23 |
| 24 # TODO(http://crbug.com/659346): write complete coverage tests for this. |
| 25 def FinditForClientID(client_id): # pragma: no cover |
| 26 """Construct a Findit object from a client id string specifying the class. |
| 27 |
| 28 We cannot pass Findit objects to the various methods in |
| 29 |crash.crash_pipeline|, because they are not JSON serializable. For now, |
| 30 we just serialize Findit objects as their |client_id|, and then use this |
| 31 function to reconstruct them. Alas, this means we will lose various |
| 32 other information stored in the Findit object (i.e., stuff that comes |
| 33 from CrashConfig); which could lead to some hard-to-diagnose coherency |
| 34 bugs, since the new Findit object will be based on the CrashConfig at |
| 35 the time it's constructed, which may be different than the CrashConfig |
| 36 at the time the previous Findit object was constructed. In the future |
| 37 we should fix all this to serialize Findit objects in a more robust way. |
| 38 """ |
| 39 assert isinstance(client_id, (str, unicode)), ( |
| 40 'FinditForClientID: expected string or unicode, but got %s' |
| 41 % client_id.__class__.__name__) |
| 42 # TODO(wrengr): it'd be nice to replace this with a single lookup in |
| 43 # a dict; but that's buggy for some unknown reason. |
| 44 if client_id == CrashClient.FRACAS: |
| 45 cls = findit_for_chromecrash.FinditForFracas |
| 46 elif client_id == CrashClient.CRACAS: |
| 47 cls = findit_for_chromecrash.FinditForCracas |
| 48 elif client_id == CrashClient.CLUSTERFUZZ: |
| 49 cls = findit_for_clusterfuzz.FinditForClusterfuzz |
| 50 else: |
| 51 raise ValueError('FinditForClientID: ' |
| 52 'unknown or unsupported client %s' % client_id) |
| 53 |
| 54 return cls( |
| 55 git_repository.GitRepository(http_client=HttpClientAppengine()), |
| 56 CrashWrapperPipeline) |
| 57 |
| 58 |
| 59 # Some notes about the classes below, for people who are not |
| 60 # familiar with AppEngine. The thing that really kicks everything off |
| 61 # is |CrashWrapperPipeline.run|. However, an important thing to bear in |
| 62 # mind is that whatever arguments are passed to that method will also |
| 63 # be passed to the |run| method on whatever objects it yields. Thus, |
| 64 # all the |run| methods across these different classes must have the same |
| 65 # type. In practice, we end up passing all the arguments to the |
| 66 # constructors, because we need to have the fields around for logging |
| 67 # (e.g., in |finalized|); thus, there's nothing that needs to be passed |
| 68 # to |run|. Another thing to bear in mind is that whatever objects |
| 69 # |CrashWrapperPipeline.run| yields must be JSON-serializable. The base |
| 70 # class handles most of that for us, so the force of this constraint is |
| 71 # that all the arguments to the constructors for those classes must be |
| 72 # JSON-serializable. Thus, we cannot actually pass a Findit object to |
| 73 # the constructor, but rather must pass only the |client_id| (or whatever |
| 74 # JSON dict) and then reconstruct the Findit object from that. Moreover, |
| 75 # the |run| method and the |finalized| method will be run in different |
| 76 # processes, so we will actually end up reconstructing the Findit object |
| 77 # twice. Thus, we shouldn't store anything in the pipeline objects outside |
| 78 # of what their constructors store. |
| 79 |
| 25 class CrashBasePipeline(BasePipeline): | 80 class CrashBasePipeline(BasePipeline): |
| 26 def __init__(self, crash_identifiers, client_id): | 81 def __init__(self, client_id, crash_identifiers): |
| 27 super(CrashBasePipeline, self).__init__(crash_identifiers, client_id) | 82 super(CrashBasePipeline, self).__init__(client_id, crash_identifiers) |
| 28 self.crash_identifiers = crash_identifiers | 83 self._crash_identifiers = crash_identifiers |
| 29 self.client_id = client_id | 84 self._findit = FinditForClientID(client_id) |
| 85 |
| 86 @property |
| 87 def client_id(self): # pragma: no cover |
| 88 return self._findit.client_id |
| 30 | 89 |
| 31 def run(self, *args, **kwargs): | 90 def run(self, *args, **kwargs): |
| 32 raise NotImplementedError() | 91 raise NotImplementedError() |
| 33 | 92 |
| 34 | 93 |
| 35 class CrashAnalysisPipeline(CrashBasePipeline): | 94 class CrashAnalysisPipeline(CrashBasePipeline): |
| 36 def _SetErrorIfAborted(self, aborted): | 95 def finalized(self): # pragma: no cover |
| 37 if not aborted: | 96 if self.was_aborted: |
| 38 return | 97 self._PutAbortedError() |
| 39 | 98 |
| 40 logging.error('Aborted analysis for %s', repr(self.crash_identifiers)) | 99 # N.B., this method must be factored out for unittest reasons; since |
| 41 analysis = findit_for_client.GetAnalysisForClient(self.crash_identifiers, | 100 # |finalized| takes no arguments (by AppEngine's spec) and |was_aborted| |
| 42 self.client_id) | 101 # can't be altered directly. |
| 102 def _PutAbortedError(self): |
| 103 """Update the ndb.Model to indicate that this pipeline was aborted.""" |
| 104 logging.error('Aborted analysis for %s', repr(self._crash_identifiers)) |
| 105 analysis = self._findit.GetAnalysis(self._crash_identifiers) |
| 43 analysis.status = analysis_status.ERROR | 106 analysis.status = analysis_status.ERROR |
| 44 analysis.put() | 107 analysis.put() |
| 45 | 108 |
| 46 def finalized(self): | 109 # TODO(http://crbug.com/659346): we misplaced the coverage test; find it! |
| 47 self._SetErrorIfAborted(self.was_aborted) | 110 # Arguments number differs from overridden method - pylint: disable=W0221 |
| 111 def run(self): |
| 112 # TODO(wrengr): shouldn't this method somehow call _NeedsNewAnalysis |
| 113 # to guard against race conditions? |
| 114 analysis = self._findit.GetAnalysis(self._crash_identifiers) |
| 48 | 115 |
| 49 # Arguments number differs from overridden method - pylint: disable=W0221 | 116 # Update the model's status to say we're in the process of doing analysis. |
| 50 def run(self, crash_identifiers, client_id): | |
| 51 analysis = findit_for_client.GetAnalysisForClient(crash_identifiers, | |
| 52 client_id) | |
| 53 | |
| 54 # Update analysis status. | |
| 55 analysis.pipeline_status_path = self.pipeline_status_path() | 117 analysis.pipeline_status_path = self.pipeline_status_path() |
| 56 analysis.status = analysis_status.RUNNING | 118 analysis.status = analysis_status.RUNNING |
| 57 analysis.started_time = time_util.GetUTCNow() | 119 analysis.started_time = time_util.GetUTCNow() |
| 58 analysis.findit_version = appengine_util.GetCurrentVersion() | 120 analysis.findit_version = appengine_util.GetCurrentVersion() |
| 59 analysis.put() | 121 analysis.put() |
| 60 | 122 |
| 61 # Run the analysis. | 123 # Actually do the analysis. |
| 62 result, tags = findit_for_client.FindCulprit( | 124 result, tags = self._findit.FindCulprit(analysis).ToDicts() |
| 63 analysis, git_repository.GitRepository( | |
| 64 http_client=HttpClientAppengine())) | |
| 65 | 125 |
| 66 # Update analysis status and save the analysis result. | 126 # Update model's status to say we're done, and save the results. |
| 67 analysis.completed_time = time_util.GetUTCNow() | 127 analysis.completed_time = time_util.GetUTCNow() |
| 68 analysis.result = result | 128 analysis.result = result |
| 69 for tag_name, tag_value in tags.iteritems(): | 129 for tag_name, tag_value in tags.iteritems(): |
| 70 # TODO(http://crbug.com/602702): make it possible to add arbitrary tags. | 130 # TODO(http://crbug.com/602702): make it possible to add arbitrary tags. |
| 71 if hasattr(analysis, tag_name): | 131 if hasattr(analysis, tag_name): |
| 72 setattr(analysis, tag_name, tag_value) | 132 setattr(analysis, tag_name, tag_value) |
| 73 analysis.status = analysis_status.COMPLETED | 133 analysis.status = analysis_status.COMPLETED |
| 74 analysis.put() | 134 analysis.put() |
| 75 | 135 |
| 76 | 136 |
| 77 class PublishResultPipeline(CrashBasePipeline): | 137 class PublishResultPipeline(CrashBasePipeline): |
| 138 # TODO(http://crbug.com/659346): we misplaced the coverage test; find it! |
| 78 def finalized(self): | 139 def finalized(self): |
| 79 if self.was_aborted: # pragma: no cover. | 140 if self.was_aborted: # pragma: no cover. |
| 80 logging.error('Failed to publish %s analysis result for %s', | 141 logging.error('Failed to publish %s analysis result for %s', |
| 81 repr(self.crash_identifiers), self.client_id) | 142 repr(self._crash_identifiers), self.client_id) |
| 82 | |
| 83 | 143 |
| 84 # Arguments number differs from overridden method - pylint: disable=W0221 | 144 # Arguments number differs from overridden method - pylint: disable=W0221 |
| 85 def run(self, crash_identifiers, client_id): | 145 def run(self): |
| 86 analysis = findit_for_client.GetAnalysisForClient(crash_identifiers, | 146 analysis = self._findit.GetAnalysis(self._crash_identifiers) |
| 87 client_id) | 147 result = analysis.ToPublishableResult(self._crash_identifiers) |
| 88 result = findit_for_client.GetPublishResultFromAnalysis(analysis, | |
| 89 crash_identifiers, | |
| 90 client_id) | |
| 91 messages_data = [json.dumps(result, sort_keys=True)] | 148 messages_data = [json.dumps(result, sort_keys=True)] |
| 92 | 149 |
| 93 client_config = CrashConfig.Get().GetClientConfig(client_id) | 150 # TODO(http://crbug.com/659354): remove Findit's dependency on CrashConfig. |
| 151 client_config = self._findit.config |
| 94 # TODO(katesonia): Clean string uses in config. | 152 # TODO(katesonia): Clean string uses in config. |
| 95 topic = client_config['analysis_result_pubsub_topic'] | 153 topic = client_config['analysis_result_pubsub_topic'] |
| 96 pubsub_util.PublishMessagesToTopic(messages_data, topic) | 154 pubsub_util.PublishMessagesToTopic(messages_data, topic) |
| 97 logging.info('Published %s analysis result for %s', client_id, | 155 logging.info('Published %s analysis result for %s', self.client_id, |
| 98 repr(crash_identifiers)) | 156 repr(self._crash_identifiers)) |
| 99 | 157 |
| 100 | 158 |
| 101 class CrashWrapperPipeline(BasePipeline): | 159 class CrashWrapperPipeline(BasePipeline): |
| 160 """Fire off pipelines to (1) do the analysis and (2) publish results. |
| 161 |
| 162 The reason we have analysis and publishing as separate pipelines is |
| 163 because each of them can fail for independent reasons. E.g., if we |
| 164 successfully finish the analysis, but then the publishing fails due to |
| 165 network errors, we don't want to have to redo the analysis in order |
| 166 to redo the publishing. We could try to cache the fact that analysis |
| 167 succeeded in the pipeline object itself, but we'd have to be careful |
| 168 because the |run| and |finalized| methods are executed in different |
| 169 processes. |
| 170 """ |
| 171 def __init__(self, client_id, crash_identifiers): |
| 172 super(CrashWrapperPipeline, self).__init__(client_id, crash_identifiers) |
| 173 self._crash_identifiers = crash_identifiers |
| 174 self._client_id = client_id |
| 175 |
| 176 # TODO(http://crbug.com/659346): write coverage tests. |
| 102 # Arguments number differs from overridden method - pylint: disable=W0221 | 177 # Arguments number differs from overridden method - pylint: disable=W0221 |
| 103 def run(self, crash_identifiers, client_id): | 178 def run(self): # pragma: no cover |
| 104 run_analysis = yield CrashAnalysisPipeline(crash_identifiers, client_id) | 179 run_analysis = yield CrashAnalysisPipeline( |
| 180 self._client_id, self._crash_identifiers) |
| 105 with pipeline.After(run_analysis): | 181 with pipeline.After(run_analysis): |
| 106 yield PublishResultPipeline(crash_identifiers, client_id) | 182 yield PublishResultPipeline(self._client_id, self._crash_identifiers) |
| 107 | |
| 108 | |
| 109 @ndb.transactional | |
| 110 def _NeedsNewAnalysis( | |
| 111 crash_identifiers, chrome_version, signature, client_id, | |
| 112 platform, stack_trace, customized_data): | |
| 113 analysis = findit_for_client.GetAnalysisForClient(crash_identifiers, | |
| 114 client_id) | |
| 115 regression_range = findit_for_client.GetRegressionRange(client_id, | |
| 116 customized_data) | |
| 117 if (analysis and not analysis.failed and | |
| 118 regression_range == analysis.regression_range): | |
| 119 logging.info('The analysis of %s has already been done.', | |
| 120 repr(crash_identifiers)) | |
| 121 return False | |
| 122 | |
| 123 # Create analysis for findit to run if this is not a rerun. | |
| 124 if not analysis: | |
| 125 analysis = findit_for_client.CreateAnalysisForClient(crash_identifiers, | |
| 126 client_id) | |
| 127 | |
| 128 findit_for_client.ResetAnalysis(analysis, chrome_version, signature, | |
| 129 client_id, platform, stack_trace, | |
| 130 customized_data, regression_range) | |
| 131 return True | |
| 132 | |
| 133 | |
| 134 def ScheduleNewAnalysisForCrash( | |
| 135 crash_identifiers, chrome_version, signature, client_id, | |
| 136 platform, stack_trace, customized_data, | |
| 137 queue_name=constants.DEFAULT_QUEUE): | |
| 138 """Schedules an analysis.""" | |
| 139 # Check policy and tune arguments if needed. | |
| 140 pass_policy, updated_analysis_args = findit_for_client.CheckPolicyForClient( | |
| 141 crash_identifiers, chrome_version, signature, | |
| 142 client_id, platform, stack_trace, | |
| 143 customized_data) | |
| 144 | |
| 145 if not pass_policy: | |
| 146 return False | |
| 147 | |
| 148 if _NeedsNewAnalysis(*updated_analysis_args): | |
| 149 analysis_pipeline = CrashWrapperPipeline(crash_identifiers, client_id) | |
| 150 # Attribute defined outside __init__ - pylint: disable=W0201 | |
| 151 analysis_pipeline.target = appengine_util.GetTargetNameForModule( | |
| 152 constants.CRASH_BACKEND[client_id]) | |
| 153 analysis_pipeline.start(queue_name=queue_name) | |
| 154 logging.info('New %s analysis is scheduled for %s', client_id, | |
| 155 repr(crash_identifiers)) | |
| 156 return True | |
| 157 | |
| 158 return False | |
| OLD | NEW |