Index: commit-queue/pending_manager.py |
=================================================================== |
--- commit-queue/pending_manager.py (revision 249146) |
+++ commit-queue/pending_manager.py (working copy) |
@@ -1,601 +0,0 @@ |
-# coding=utf8 |
-# Copyright (c) 2012 The Chromium Authors. All rights reserved. |
-# Use of this source code is governed by a BSD-style license that can be |
-# found in the LICENSE file. |
-"""Commit queue manager class. |
- |
-Security implications: |
- |
-The following hypothesis are made: |
-- Commit queue: |
- - Impersonate the same svn credentials that the patchset owner. |
- - Can't impersonate a non committer. |
- - SVN will check the committer write access. |
-""" |
- |
-import datetime |
-import errno |
-import logging |
-import os |
-import socket |
-import ssl |
-import time |
-import traceback |
-import urllib2 |
- |
-import find_depot_tools # pylint: disable=W0611 |
-import checkout |
-import git_cl |
-import patch |
-import subprocess2 |
- |
-import errors |
-import model |
-from verification import base |
- |
- |
-class PendingCommit(base.Verified): |
- """Represents a pending commit that is being processed.""" |
- # Important since they tell if we need to revalidate and send try jobs |
- # again or not if any of these value changes. |
- issue = int |
- patchset = int |
- description = unicode |
- files = list |
- # Only a cache, these values can be regenerated. |
- owner = unicode |
- reviewers = list |
- base_url = unicode |
- messages = list |
- relpath = unicode |
- sort_key = unicode |
- # Only used after a patch was committed. Keeping here for try job retries. |
- revision = (None, int, unicode) |
- |
- def __init__(self, **kwargs): |
- super(PendingCommit, self).__init__(**kwargs) |
- for message in self.messages: |
- # Save storage, no verifier really need 'text', just 'approval'. |
- if 'text' in message: |
- del message['text'] |
- |
- def pending_name(self): |
- """The name that should be used for try jobs. |
- |
- It makes it possible to regenerate the try_jobs array if ever needed.""" |
- return '%d-%d' % (self.issue, self.patchset) |
- |
- def prepare_for_patch(self, context_obj): |
- self.revision = context_obj.checkout.prepare(self.revision) |
- # Verify revision consistency. |
- if not self.revision: |
- raise base.DiscardPending( |
- self, 'Internal error: failed to checkout. Please try again.') |
- |
- def apply_patch(self, context_obj, prepare): |
- """Applies the pending patch to the checkout and throws if it fails.""" |
- try: |
- if prepare: |
- self.prepare_for_patch(context_obj) |
- patches = context_obj.rietveld.get_patch(self.issue, self.patchset) |
- if not patches: |
- raise base.DiscardPending( |
- self, 'No diff was found for this patchset.') |
- if self.relpath: |
- patches.set_relpath(self.relpath) |
- self.files = [p.filename for p in patches] |
- if not self.files: |
- raise base.DiscardPending( |
- self, 'No file was found in this patchset.') |
- context_obj.checkout.apply_patch(patches) |
- except (checkout.PatchApplicationFailed, patch.UnsupportedPatchFormat) as e: |
- raise base.DiscardPending(self, str(e)) |
- except subprocess2.CalledProcessError as e: |
- out = 'Failed to apply the patch.' |
- if e.stdout: |
- out += '\n%s' % e.stdout |
- raise base.DiscardPending(self, out) |
- except (ssl.SSLError, urllib2.HTTPError, urllib2.URLError) as e: |
- raise base.DiscardPending( |
- self, |
- ('Failed to request the patch to try. Please note that binary files ' |
- 'are still unsupported at the moment, this is being worked on.\n\n' |
- 'Thanks for your patience.\n\n%s') % e) |
- |
- |
-class PendingQueue(model.PersistentMixIn): |
- """Represents the queue of pending commits being processed. |
- |
- Each entry is keyed by the issue number as a string to be json-compatible. |
- There can only be one pending commit per issue and they are fine to be |
- processed out of order. |
- """ |
- pending_commits = dict |
- |
- def add(self, item): |
- self.pending_commits[str(item.issue)] = item |
- |
- def get(self, key): |
- return self.pending_commits[str(key)] |
- |
- def iterate(self): |
- """Returns the items sorted by .sort_key to ease testability.""" |
- return sorted(self.pending_commits.itervalues(), key=lambda x: x.sort_key) |
- |
- def remove(self, key): |
- self.pending_commits.pop(str(key), None) |
- |
- |
-class PendingManager(object): |
- """Fetch new issues from rietveld, pass the issues through all of verifiers |
- and then commit the patches with checkout. |
- """ |
- FAILED_NO_MESSAGE = ( |
- 'Commit queue patch verification failed without an error message.\n' |
- 'Something went wrong, probably a crash, a hickup or simply\n' |
- 'the monkeys went out for dinner.\n' |
- 'Please email commit-bot@chromium.org with the CL url.') |
- INTERNAL_EXCEPTION = ( |
- 'Commit queue had an internal error.\n' |
- 'Something went really wrong, probably a crash, a hickup or\n' |
- 'simply the monkeys went out for dinner.\n' |
- 'Please email commit-bot@chromium.org with the CL url.') |
- DESCRIPTION_UPDATED = ( |
- 'Commit queue rejected this change because the description was changed\n' |
- 'between the time the change entered the commit queue and the time it\n' |
- 'was ready to commit. You can safely check the commit box again.') |
- TRYING_PATCH = 'CQ is trying da patch. Follow status at\n' |
- # Maximum number of commits done in a burst. |
- MAX_COMMIT_BURST = 4 |
- # Delay (secs) between commit bursts. |
- COMMIT_BURST_DELAY = 8*60 |
- |
- def __init__(self, context_obj, pre_patch_verifiers, verifiers, |
- project_name=''): |
- """ |
- Args: |
- pre_patch_verifiers: Verifiers objects that are run before applying the |
- patch. |
- verifiers: Verifiers object run after applying the patch. |
- """ |
- if not(len(pre_patch_verifiers) or len(verifiers)): |
- raise ValueError('at least one verifier should be defined (in project %s)' |
- % project_name) |
- |
- self.context = context_obj |
- self.pre_patch_verifiers = pre_patch_verifiers or [] |
- self.verifiers = verifiers or [] |
- self.all_verifiers = pre_patch_verifiers + verifiers |
- self.queue = PendingQueue() |
- # Keep the timestamps of the last few commits so that we can control the |
- # pace (burstiness) of commits. |
- self.recent_commit_timestamps = [] |
- # Assert names are unique. |
- names = [x.name for x in pre_patch_verifiers + verifiers] |
- assert len(names) == len(set(names)) |
- for verifier in self.pre_patch_verifiers: |
- assert not isinstance(verifier, base.VerifierCheckout) |
- |
- def _get_user(self): |
- """Get the CQ's rietveld user name. |
- |
- We need it to look for messages posted by the CQ, to figure out |
- when the commit box was checked. TODO(sergeyberezin): when |
- Rietveld can tell this info natively, this function will be |
- obsolete. |
- """ |
- # Rietveld object or its email may be missing in some unittests. |
- if self.context.rietveld and self.context.rietveld.email: |
- return self.context.rietveld.email |
- else: |
- return 'commit-bot@chromium.org' |
- |
- def _set_sort_key(self, issue_data): |
- """Compute the sorting key. Must add .sort_key to issue_data. |
- |
- Previously we used issue id, but a better key is the last |
- timestamp when Commit box was checked. |
- """ |
- for m in issue_data['messages']: |
- if (m['sender'] == self._get_user() and |
- 'CQ is trying da patch.' in m['text']): |
- issue_data['sort_key'] = m['date'] |
- if 'sort_key' not in issue_data: |
- # Set a default value: the current time. |
- issue_data['sort_key'] = str(datetime.datetime.utcnow()) |
- |
- def look_for_new_pending_commit(self): |
- """Looks for new reviews on self.context.rietveld with c+ set. |
- |
- Calls _new_pending_commit() on all new review found. |
- """ |
- try: |
- new_issues = self.context.rietveld.get_pending_issues() |
- except urllib2.URLError as e: |
- if 'timed out' in e.reason: |
- # Handle timeouts gracefully. Log them and pretend there are no |
- # pending issues. We'll retry on the next iteration. |
- logging.warn('request to fetch pending issues timed out: %s' % e) |
- return |
- |
- raise |
- |
- # If there is an issue in processed_issues that is not in new_issues, |
- # discard it. |
- for pending in self.queue.iterate(): |
- # Note that pending.issue is a int but self.queue.pending_commits keys |
- # are str due to json support. |
- if pending.issue not in new_issues: |
- logging.info('Flushing issue %d' % pending.issue) |
- self.context.status.send( |
- pending, |
- { 'verification': 'abort', |
- 'payload': { |
- 'output': 'CQ bit was unchecked on CL. Ignoring.' }}) |
- pending.get_state = lambda: base.IGNORED |
- self._discard_pending(pending, None) |
- |
- # Find new issues. |
- for issue_id in new_issues: |
- if str(issue_id) not in self.queue.pending_commits: |
- try: |
- issue_data = self.context.rietveld.get_issue_properties( |
- issue_id, True) |
- self._set_sort_key(issue_data) |
- except urllib2.HTTPError as e: |
- if e.code in (500, 502, 503): |
- # Temporary AppEngine hiccup. Just log it and continue. |
- logging.warning('%s while accessing %s. Ignoring error.' % ( |
- str(e), e.url)) |
- continue |
- raise |
- except urllib2.URLError as e: |
- # Temporary AppEngine hiccup. Just log it and continue. |
- if 'timed out' in e.reason: |
- logging.warning( |
- '%s while accessing rietveld issue %s. Ignoring error.' % ( |
- str(e), str(issue_id))) |
- continue |
- raise |
- except socket.error as e: |
- # Temporary AppEngine hiccup. Just log it and continue. |
- if e.errno == errno.ECONNRESET: |
- logging.warning( |
- '%s while accessing rietveld issue %s. Ignoring error.' % ( |
- str(e), str(issue_id))) |
- continue |
- raise |
- except IOError as e: |
- # Temporary AppEngine hiccup. Just log it and continue. |
- if e.errno == 'socket error': |
- logging.warning( |
- '%s while accessing rietveld issue %s. Ignoring error.' % ( |
- str(e), str(issue_id))) |
- continue |
- raise |
- # This assumption needs to hold. |
- assert issue_id == issue_data['issue'] |
- if issue_data['patchsets'] and issue_data['commit']: |
- logging.info('Found new issue %d' % issue_id) |
- self.queue.add( |
- PendingCommit( |
- issue=issue_id, |
- owner=issue_data['owner_email'], |
- reviewers=issue_data['reviewers'], |
- patchset=issue_data['patchsets'][-1], |
- base_url=issue_data['base_url'], |
- description=issue_data['description'].replace('\r', ''), |
- messages=issue_data['messages'], |
- sort_key=issue_data['sort_key'])) |
- |
- def process_new_pending_commit(self): |
- """Starts verification on newly found pending commits.""" |
- expected = set(i.name for i in self.all_verifiers) |
- for pending in self.queue.iterate(): |
- try: |
- # Take in account the case where a verifier was removed. |
- done = set(pending.verifications.keys()) |
- missing = expected - done |
- if (not missing or pending.get_state() != base.PROCESSING): |
- continue |
- logging.info( |
- 'Processing issue %s @ %s (%s, %d)' % ( |
- pending.issue, pending.sort_key, missing, pending.get_state())) |
- self._verify_pending(pending) |
- except base.DiscardPending as e: |
- self._discard_pending(e.pending, e.status) |
- |
- def update_status(self): |
- """Updates the status for each pending commit verifier.""" |
- why_nots = dict((p.issue, p.why_not()) for p in self.queue.iterate()) |
- |
- for verifier in self.all_verifiers: |
- try: |
- verifier.update_status(self.queue.iterate()) |
- except base.DiscardPending as e: |
- # It's not efficient since it takes a full loop for each pending |
- # commit to discard. |
- self._discard_pending(e.pending, e.status) |
- |
- for pending in self.queue.iterate(): |
- why_not = pending.why_not() |
- if why_nots[pending.issue] != why_not: |
- self.context.status.send( |
- pending, |
- {'verification': 'why not', |
- 'payload': {'message': why_not}}) |
- |
- |
- def scan_results(self): |
- """Scans pending commits that can be committed or discarded.""" |
- for pending in self.queue.iterate(): |
- state = pending.get_state() |
- if state == base.FAILED: |
- self._discard_pending( |
- pending, pending.error_message() or self.FAILED_NO_MESSAGE) |
- elif state == base.SUCCEEDED: |
- if self._throttle(pending): |
- continue |
- try: |
- # Runs checks. It's be nice to run the test before the postpone, |
- # especially if the tree is closed for a long moment but at the same |
- # time it would keep fetching the rietveld status constantly. |
- self._last_minute_checks(pending) |
- self.context.status.send( |
- pending, |
- {'verification': 'why not', |
- 'payload': {'message': ''}}) |
- |
- self._commit_patch(pending) |
- except base.DiscardPending as e: |
- self._discard_pending(e.pending, e.status) |
- except Exception as e: |
- self._discard_pending(pending, self.INTERNAL_EXCEPTION) |
- raise |
- else: |
- # When state is IGNORED, we need to keep this issue so it's not fetched |
- # another time but we can't discard it since we don't want to remove the |
- # commit bit for another project hosted on the same code review |
- # instance. |
- assert state in (base.PROCESSING, base.IGNORED) |
- |
- def _verify_pending(self, pending): |
- """Initiates all the verifiers on a pending change.""" |
- # Do not apply the patch if not necessary. It will be applied at commit |
- # time anyway so if the patch doesn't apply, it'll be catch later. |
- if not self._pending_run_verifiers(pending, self.pre_patch_verifiers): |
- return |
- |
- if self.verifiers: |
- pending.prepare_for_patch(self.context) |
- |
- # This CL is real business, alert the user that we're going to try his |
- # patch. Note that this is done *after* syncing but *before* applying the |
- # patch. |
- self.context.status.send( |
- pending, |
- { 'verification': 'initial', |
- 'payload': {'revision': pending.revision}}) |
- self.context.rietveld.add_comment( |
- pending.issue, |
- self.TRYING_PATCH + '%s/%s/%d/%d\n' % ( |
- self.context.status.url, pending.owner, |
- pending.issue, pending.patchset)) |
- |
- if self.verifiers: |
- pending.apply_patch(self.context, False) |
- previous_cwd = os.getcwd() |
- try: |
- os.chdir(self.context.checkout.project_path) |
- self._pending_run_verifiers(pending, self.verifiers) |
- finally: |
- os.chdir(previous_cwd) |
- |
- # Send the initial 'why not' message. |
- if pending.why_not(): |
- self.context.status.send( |
- pending, |
- {'verification': 'why not', |
- 'payload': {'message': pending.why_not()}}) |
- |
- @classmethod |
- def _pending_run_verifiers(cls, pending, verifiers): |
- """Runs verifiers on a pending change. |
- |
- Returns True if all Verifiers were run. |
- """ |
- for verifier in verifiers: |
- assert verifier.name not in pending.verifications |
- verifier.verify(pending) |
- assert verifier.name in pending.verifications |
- if pending.get_state() == base.IGNORED: |
- assert pending.verifications[verifier.name].get_state() == base.IGNORED |
- # Remove all the other verifiers since we need to keep it in the |
- # 'datastore' to not retry this issue constantly. |
- for key in pending.verifications.keys(): |
- if key != verifier.name: |
- del pending.verifications[key] |
- return False |
- if pending.get_state() == base.FAILED: |
- # Throw if it didn't pass, so the error message is not lost. |
- raise base.DiscardPending( |
- pending, pending.error_message() or cls.FAILED_NO_MESSAGE) |
- return True |
- |
- def _last_minute_checks(self, pending): |
- """Does last minute checks on Rietvld before committing a pending patch.""" |
- pending_data = self.context.rietveld.get_issue_properties( |
- pending.issue, True) |
- if pending_data['commit'] != True: |
- raise base.DiscardPending(pending, None) |
- if pending_data['closed'] != False: |
- raise base.DiscardPending(pending, None) |
- if pending.description != pending_data['description'].replace('\r', ''): |
- raise base.DiscardPending(pending, self.DESCRIPTION_UPDATED) |
- commit_user = set([self.context.rietveld.email]) |
- expected = set(pending.reviewers) - commit_user |
- actual = set(pending_data['reviewers']) - commit_user |
- # Try to be nice, if there was a drive-by review and the new reviewer left |
- # a lgtm, don't abort. |
- def is_approver(r): |
- return any( |
- m.get('approval') for m in pending_data['messages'] |
- if m['sender'] == r) |
- drivers_by = [r for r in (actual - expected) if not is_approver(r)] |
- if drivers_by: |
- # That annoying driver-by. |
- raise base.DiscardPending( |
- pending, |
- 'List of reviewers changed. %s did a drive-by without LGTM\'ing!' % |
- ','.join(drivers_by)) |
- if pending.patchset != pending_data['patchsets'][-1]: |
- raise base.DiscardPending(pending, |
- 'Commit queue failed due to new patchset.') |
- |
- def _discard_pending(self, pending, message): |
- """Discards a pending commit. Attach an optional message to the review.""" |
- logging.debug('_discard_pending(%s, %s)', pending.issue, message) |
- try: |
- try: |
- if pending.get_state() != base.IGNORED: |
- self.context.rietveld.set_flag( |
- pending.issue, pending.patchset, 'commit', False) |
- except urllib2.HTTPError as e: |
- logging.error( |
- 'Failed to set the flag to False for %s with message %s' % ( |
- pending.pending_name(), message)) |
- traceback.print_stack() |
- logging.error(str(e)) |
- errors.send_stack(e) |
- if message: |
- try: |
- self.context.rietveld.add_comment(pending.issue, message) |
- except urllib2.HTTPError as e: |
- logging.error( |
- 'Failed to add comment for %s with message %s' % ( |
- pending.pending_name(), message)) |
- traceback.print_stack() |
- errors.send_stack(e) |
- self.context.status.send( |
- pending, |
- { 'verification': 'abort', |
- 'payload': { |
- 'output': message }}) |
- finally: |
- # Most importantly, remove the PendingCommit from the queue. |
- self.queue.remove(pending.issue) |
- |
- def _commit_patch(self, pending): |
- """Commits the pending patch to the repository. |
- |
- Do the checkout and applies the patch. |
- """ |
- try: |
- try: |
- # Make sure to apply on HEAD. |
- pending.revision = None |
- pending.apply_patch(self.context, True) |
- # Commit it. |
- commit_desc = git_cl.ChangeDescription(pending.description) |
- if (self.context.server_hooks_missing and |
- self.context.rietveld.email != pending.owner): |
- commit_desc.update_reviewers(pending.reviewers) |
- commit_desc.append_footer('Author: ' + pending.owner) |
- commit_desc.append_footer('Review URL: %s/%s' % ( |
- self.context.rietveld.url, |
- pending.issue)) |
- pending.revision = self.context.checkout.commit( |
- commit_desc.description, pending.owner) |
- if not pending.revision: |
- raise base.DiscardPending(pending, 'Failed to commit patch.') |
- |
- # Note that the commit succeeded for commit throttling. |
- self.recent_commit_timestamps.append(time.time()) |
- self.recent_commit_timestamps = ( |
- self.recent_commit_timestamps[-(self.MAX_COMMIT_BURST + 1):]) |
- |
- viewvc_url = self.context.checkout.get_settings('VIEW_VC') |
- issue_desc = git_cl.ChangeDescription(pending.description) |
- msg = 'Committed: %s' % pending.revision |
- if viewvc_url: |
- viewvc_url = '%s%s' % (viewvc_url.rstrip('/'), pending.revision) |
- msg = 'Committed: %s' % viewvc_url |
- issue_desc.append_footer(msg) |
- |
- # Update the CQ dashboard. |
- self.context.status.send( |
- pending, |
- { 'verification': 'commit', |
- 'payload': { |
- 'revision': pending.revision, |
- 'output': msg, |
- 'url': viewvc_url}}) |
- |
- # Closes the issue on Rietveld. |
- # TODO(csharp): Retry if exceptions are encountered. |
- try: |
- self.context.rietveld.close_issue(pending.issue) |
- self.context.rietveld.update_description( |
- pending.issue, issue_desc.description) |
- self.context.rietveld.add_comment( |
- pending.issue, 'Change committed as %s' % pending.revision) |
- except (urllib2.HTTPError, urllib2.URLError) as e: |
- # Ignore AppEngine flakiness. |
- logging.warning('Unable to fully close the issue') |
- # And finally remove the issue. If the close_issue() call above failed, |
- # it is possible the dashboard will be confused but it is harmless. |
- try: |
- self.queue.get(pending.issue) |
- except KeyError: |
- logging.error('Internal inconsistency for %d', pending.issue) |
- self.queue.remove(pending.issue) |
- except ( |
- checkout.PatchApplicationFailed, patch.UnsupportedPatchFormat) as e: |
- raise base.DiscardPending(pending, str(e)) |
- except subprocess2.CalledProcessError as e: |
- stdout = getattr(e, 'stdout', None) |
- out = 'Failed to apply the patch.' |
- if stdout: |
- out += '\n%s' % stdout |
- raise base.DiscardPending(pending, out) |
- except base.DiscardPending as e: |
- self._discard_pending(e.pending, e.status) |
- |
- def _throttle(self, pending): |
- """Returns True if a commit should be delayed.""" |
- if pending.postpone(): |
- self.context.status.send( |
- pending, |
- {'verification': 'why not', |
- 'payload': { |
- 'message': pending.why_not()}}) |
- return True |
- if not self.recent_commit_timestamps: |
- return False |
- cutoff = time.time() - self.COMMIT_BURST_DELAY |
- bursted = len([True for i in self.recent_commit_timestamps if i > cutoff]) |
- |
- if bursted >= self.MAX_COMMIT_BURST: |
- self.context.status.send( |
- pending, |
- {'verification': 'why not', |
- 'payload': { |
- 'message': ('Patch is ready to commit, but the CQ is delaying ' |
- 'it because CQ has already submitted %d patches in ' |
- 'the last %d seconds' % |
- (self.MAX_COMMIT_BURST, self.COMMIT_BURST_DELAY))}}) |
- return True |
- |
- return False |
- |
- def load(self, filename): |
- """Loads the commit queue state from a JSON file.""" |
- self.queue = model.load_from_json_file(filename) |
- |
- def save(self, filename): |
- """Save the commit queue state in a simple JSON file.""" |
- model.save_to_json_file(filename, self.queue) |
- |
- def close(self): |
- """Close all the active pending manager items.""" |
- self.context.status.close() |