Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(114)

Unified Diff: commit-queue/tests/pending_manager_test.py

Issue 135363007: Delete public commit queue to avoid confusion after move to internal repo (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/
Patch Set: Created 6 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « commit-queue/tests/natsort_test.py ('k') | commit-queue/tests/presubmit_check_test.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: commit-queue/tests/pending_manager_test.py
===================================================================
--- commit-queue/tests/pending_manager_test.py (revision 249146)
+++ commit-queue/tests/pending_manager_test.py (working copy)
@@ -1,732 +0,0 @@
-#!/usr/bin/env python
-# Copyright (c) 2012 The Chromium Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""Unit tests for pending_manager.py."""
-
-import logging
-import os
-import re
-import sys
-import time
-import traceback
-import unittest
-import urllib2
-
-ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
-sys.path.insert(0, os.path.join(ROOT_DIR, '..'))
-
-import find_depot_tools # pylint: disable=W0611
-import breakpad
-
-import context
-import pending_manager
-from verification import base
-from verification import fake
-from verification import project_base
-from verification import reviewer_lgtm
-
-# In tests/
-import mocks
-
-
-def read(filename):
- f = open(filename, 'rb')
- content = f.read()
- f.close()
- return content
-
-
-def write(filename, content):
- f = open(filename, 'wb')
- f.write(content)
- f.close()
-
-
-def trim(x):
- return x.replace(' ', '').replace('\n', '')
-
-
-def _try_comment(issue=31337):
- return (
- "add_comment(%d, u'%shttp://localhost/author@example.com/%d/1\\n')" %
- (issue, pending_manager.PendingManager.TRYING_PATCH.replace('\n', '\\n'),
- issue))
-
-
-class TestPendingManager(mocks.TestCase):
- def setUp(self):
- super(TestPendingManager, self).setUp()
- self.root_dir = ROOT_DIR
-
- def testLoadSave(self):
- pc = pending_manager.PendingManager(
- context.Context(None, None, mocks.AsyncPushMock(self)),
- [fake.FakeVerifier(base.SUCCEEDED)],
- [])
- filename = os.path.join(self.root_dir, 'foo.json')
- empty = """{
- "__persistent_type__": "PendingQueue",
- "pending_commits": {}
-}
-"""
- write(filename, empty)
- try:
- pc.load(filename)
- self.assertEqual(pc.queue.pending_commits, {})
- pc.save(filename)
- self.assertEqual(trim(empty), trim(read(filename)))
- finally:
- os.remove(filename)
- if os.path.exists(filename + '.old'):
- os.remove(filename + '.old')
-
- def _get_pc(self, verifiers_no_patch, verifiers):
- return pending_manager.PendingManager(
- self.context, verifiers_no_patch, verifiers)
-
- def _check_standard_verification(self, pc, success, defered):
- """Verifies the checkout and rietveld calls."""
- pc.scan_results()
- self.assertEqual(len(pc.queue.iterate()), 0)
- issue = 31337
- if pc.verifiers:
- if success:
- self.context.checkout.check_calls(
- [ 'prepare(None)',
- 'apply_patch(%r)' % (self.context.rietveld.patchsets[0],),
- 'prepare(None)', # Will sync to HEAD/124.
- 'apply_patch(%r)' % (self.context.rietveld.patchsets[1],),
- (
- "commit(u'foo\\n\\n"
- "Review URL: http://nowhere/%d', "
- "u'author@example.com')") % issue])
- self.context.rietveld.check_calls(
- [ _try_comment(),
- 'close_issue(%d)' % issue,
- "update_description(%d, u'foo')" % issue,
- "add_comment(%d, 'Change committed as 125')" % issue])
- else:
- self.context.checkout.check_calls(
- [ 'prepare(None)',
- 'apply_patch(%r)' % (self.context.rietveld.patchsets[0],)])
- self.context.rietveld.check_calls(
- [ _try_comment(),
- "set_flag(%d, 1, 'commit', False)" % issue,
- "add_comment(%d, %r)" % (issue, pc.FAILED_NO_MESSAGE)])
- else:
- if success:
- self.context.checkout.check_calls(
- self._prepare_apply_commit(0, issue))
- self.context.rietveld.check_calls(
- [ _try_comment(),
- 'close_issue(%d)' % issue,
- "update_description(%d, u'foo')" % issue,
- "add_comment(%d, 'Change committed as 125')" % issue])
- else:
- # checkout is never touched in that case.
- self.context.checkout.check_calls([])
- if defered:
- self.context.rietveld.check_calls(
- [ _try_comment(),
- "set_flag(%d, 1, 'commit', False)" % issue,
- "add_comment(%d, %r)" % (issue, pc.FAILED_NO_MESSAGE)])
- else:
- self.context.rietveld.check_calls(
- [ "set_flag(%d, 1, 'commit', False)" % issue,
- "add_comment(%d, %r)" % (issue, pc.FAILED_NO_MESSAGE)])
-
- def _prepare_apply_commit(self, index, issue, server_hooks_missing=False):
- """Returns a frequent sequence of action happening on the Checkout object.
-
- The list returned by this function should be used as an argument to
- self.context.checkout.check_calls().
- """
- seq = [
- # Reverts any previous modification or checkout the tree if it was not
- # present.
- 'prepare(None)',
- # Applies the requested PatchSet.
- 'apply_patch(%r)' % self.context.rietveld.patchsets[index],
- ]
- # Commits the patch.
- author_and_reviewer = ''
- if server_hooks_missing:
- author_and_reviewer = (
- '\\n\\nR=rev@example.com\\n\\nAuthor: author@example.com')
- commit_message = (
- "commit(u'foo%s\\n\\n"
- "Review URL: http://nowhere/%d', "
- "u'author@example.com')") % (author_and_reviewer, issue)
- seq.append(commit_message)
-
- return seq
-
- def testNoVerification(self):
- try:
- self._get_pc([], [])
- except ValueError:
- pass
- else:
- self.fail(msg='A PendingManager must require at least one verifier.')
-
- try:
- # Cannot have the same verifier two times.
- self._get_pc(
- [fake.FakeVerifier(base.SUCCEEDED)],
- [fake.FakeVerifier(base.SUCCEEDED)])
- except AssertionError:
- pass
- else:
- self.fail(msg='A PendingManager should not accept the same verifier'
- ' two times.')
-
- def _check_1(self, pc, result):
- issue = 31337
- # 'initial' won't be sent if the pre-patch verification fails, this is to
- # not add noise for ignored CLs.
- send_initial_packet = (result == base.SUCCEEDED or pc.verifiers)
- self.assertEqual(len(pc.queue.iterate()), 0)
- pc.look_for_new_pending_commit()
- self.assertEqual(len(pc.queue.iterate()), 1)
- commit = pc.queue.get(issue)
- self.assertEqual(len(commit.verifications), 0)
- pc.process_new_pending_commit()
- if result == base.FAILED:
- self.assertEqual([], pc.queue.iterate())
- else:
- commit = pc.queue.get(issue)
- self.assertEqual(commit.verifications['fake'].get_state(), result)
- self.assertEqual(len(commit.verifications), 1)
- pc.update_status()
- if result == base.FAILED:
- self.assertEqual([], pc.queue.iterate())
- else:
- commit = pc.queue.get(issue)
- self.assertEqual(commit.verifications['fake'].get_state(), result)
- self.assertEqual('', commit.relpath)
- self.assertEqual(len(commit.verifications), 1)
- self._check_standard_verification(pc, result == base.SUCCEEDED, False)
-
- if result == base.SUCCEEDED:
- self.context.status.check_names(['initial', 'why not', 'commit'])
- elif send_initial_packet:
- self.context.status.check_names(['initial', 'abort'])
- else:
- # Only happens when there is no verifier that requires a patch.
- self.context.status.check_names(['abort'])
-
- def testNoPatchVerification(self):
- pc = self._get_pc([fake.FakeVerifier(base.SUCCEEDED)], [])
- self._check_1(pc, base.SUCCEEDED)
-
- def testPatchVerification(self):
- pc = self._get_pc([], [fake.FakeVerifier(base.SUCCEEDED)])
- self._check_1(pc, base.SUCCEEDED)
-
- def testNoPatchVerificationFail(self):
- pc = self._get_pc([fake.FakeVerifier(base.FAILED)], [])
- self._check_1(pc, base.FAILED)
-
- def testPatchVerificationFail(self):
- pc = self._get_pc([], [fake.FakeVerifier(base.FAILED)])
- self._check_1(pc, base.FAILED)
-
- def testPatchDiscardThrows(self):
- # Handle HTTPError correctly.
- result = []
- issue = 31337
- pc = self._get_pc([], [fake.FakeVerifier(base.FAILED)])
-
- def set_flag_throw(_issue, _patchset, _flag, _value):
- raise urllib2.HTTPError(None, None, None, None, None)
-
- def send_stack(*_args, **_kwargs):
- result.append(True)
-
- self.mock(breakpad, 'SendStack', send_stack)
- self.mock(traceback, 'print_stack', lambda: None)
- self.mock(logging, 'error', lambda _: None)
- pc.context.rietveld.set_flag = set_flag_throw
-
- self.assertEqual(len(pc.queue.iterate()), 0)
- pc.look_for_new_pending_commit()
- self.assertEqual(len(pc.queue.iterate()), 1)
- commit = pc.queue.get(issue)
- self.assertEqual(len(commit.verifications), 0)
- pc.process_new_pending_commit()
- self.assertEqual([], pc.queue.iterate())
- pc.update_status()
- self.assertEqual([], pc.queue.iterate())
- self.context.checkout.check_calls(
- [ 'prepare(None)',
- 'apply_patch(%r)' % (self.context.rietveld.patchsets[0],),
- ])
- self.context.rietveld.check_calls(
- [ _try_comment(),
- "add_comment(%d, %r)" % (issue, pc.FAILED_NO_MESSAGE),
- ])
- self.context.status.check_names(['initial', 'abort'])
-
- def _check_defer_1(self, pc, result):
- issue = 31337
- self.assertEqual(len(pc.queue.iterate()), 0)
- pc.look_for_new_pending_commit()
- self.assertEqual(len(pc.queue.iterate()), 1)
- commit = pc.queue.get(issue)
- self.assertEqual(len(commit.verifications), 0)
- pc.process_new_pending_commit()
- commit = pc.queue.get(issue)
- self.assertEqual('', commit.relpath)
- self.assertEqual(commit.verifications['fake'].get_state(), base.PROCESSING)
- self.assertEqual(len(commit.verifications), 1)
- pc.update_status()
- commit = pc.queue.get(issue)
- self.assertEqual('', commit.relpath)
- self.assertEqual(commit.verifications['fake'].get_state(), result)
- self.assertEqual(len(commit.verifications), 1)
- self._check_standard_verification(pc, result == base.SUCCEEDED, True)
- if result == base.SUCCEEDED:
- self.context.status.check_names(['initial', 'why not', 'why not',
- 'why not', 'commit'])
- else:
- self.context.status.check_names(['initial', 'why not', 'why not',
- 'abort'])
-
- def testDeferNoPatchVerification(self):
- pc = self._get_pc([fake.DeferredFakeVerifier(base.SUCCEEDED, 0)], [])
- self._check_defer_1(pc, base.SUCCEEDED)
-
- def testDeferPatchVerification(self):
- pc = self._get_pc([], [fake.DeferredFakeVerifier(base.SUCCEEDED, 0)])
- self._check_defer_1(pc, base.SUCCEEDED)
-
- def testDeferNoPatchVerificationFail(self):
- pc = self._get_pc([fake.DeferredFakeVerifier(base.FAILED, 0)], [])
- self._check_defer_1(pc, base.FAILED)
-
- def testDeferPatchVerificationFail(self):
- pc = self._get_pc([], [fake.DeferredFakeVerifier(base.FAILED, 0)])
- self._check_defer_1(pc, base.FAILED)
-
- def _check_4(self, f1, f2, f3, f4):
- issue = 31337
- fake1 = fake.FakeVerifier(f1)
- fake1.name = 'fake1'
- fake2 = fake.FakeVerifier(f2)
- fake2.name = 'fake2'
- fake3 = fake.FakeVerifier(f3)
- fake3.name = 'fake3'
- fake4 = fake.FakeVerifier(f4)
- fake4.name = 'fake4'
- nb = 1
- if f1 is base.SUCCEEDED:
- nb = 2
- if f2 is base.SUCCEEDED:
- nb = 3
- if f3 is base.SUCCEEDED:
- nb = 4
- pc = self._get_pc([fake1, fake2], [fake3, fake4])
- self.assertEqual(len(pc.queue.iterate()), 0)
- pc.look_for_new_pending_commit()
- self.assertEqual(len(pc.queue.iterate()), 1)
- commit = pc.queue.get(issue)
- self.assertEqual(len(commit.verifications), 0)
- pc.process_new_pending_commit()
- if not all(f == base.SUCCEEDED for f in (f1, f2, f3, f4)):
- self.assertEqual([], pc.queue.iterate())
- else:
- commit = pc.queue.get(issue)
- self.assertEqual(commit.verifications['fake1'].get_state(), f1)
- self.assertEqual(commit.verifications['fake2'].get_state(), f2)
- self.assertEqual(commit.verifications['fake3'].get_state(), f3)
- self.assertEqual(commit.verifications['fake4'].get_state(), f4)
- self.assertEqual(len(commit.verifications), nb)
- pc.update_status()
- if not all(f == base.SUCCEEDED for f in (f1, f2, f3, f4)):
- self.assertEqual([], pc.queue.iterate())
- else:
- commit = pc.queue.get(issue)
- self.assertEqual(commit.verifications['fake1'].get_state(), f1)
- self.assertEqual(commit.verifications['fake2'].get_state(), f2)
- self.assertEqual(commit.verifications['fake3'].get_state(), f3)
- self.assertEqual(commit.verifications['fake4'].get_state(), f4)
- self.assertEqual(len(commit.verifications), nb)
- self._check_standard_verification(
- pc, all(x == base.SUCCEEDED for x in (f1, f2, f3, f4)), False)
- if all(x == base.SUCCEEDED for x in (f1, f2, f3, f4)):
- self.context.status.check_names(['initial', 'why not', 'commit'])
- else:
- self.context.status.check_names(['initial', 'abort'])
-
- def test4thVerificationFail(self):
- self._check_4(base.SUCCEEDED, base.SUCCEEDED, base.SUCCEEDED, base.FAILED)
-
- def test4Verification(self):
- self._check_4(
- base.SUCCEEDED, base.SUCCEEDED, base.SUCCEEDED, base.SUCCEEDED)
-
- def test4Verification3rdFail(self):
- self._check_4(base.SUCCEEDED, base.SUCCEEDED, base.FAILED, base.SUCCEEDED)
-
- def _check_defer_4(self, f1, f2, f3, f4):
- issue = 31337
- fake1 = fake.DeferredFakeVerifier(f1, 0)
- fake1.name = 'fake1'
- fake2 = fake.DeferredFakeVerifier(f2, 0)
- fake2.name = 'fake2'
- fake3 = fake.DeferredFakeVerifier(f3, 0)
- fake3.name = 'fake3'
- fake4 = fake.DeferredFakeVerifier(f4, 0)
- fake4.name = 'fake4'
- pc = self._get_pc([fake1, fake2], [fake3, fake4])
- self.assertEqual(len(pc.queue.iterate()), 0)
- pc.look_for_new_pending_commit()
- self.assertEqual(len(pc.queue.iterate()), 1)
- commit = pc.queue.get(issue)
- self.assertEqual(len(commit.verifications), 0)
- pc.process_new_pending_commit()
- commit = pc.queue.get(issue)
- self.assertEqual(
- commit.verifications['fake1'].get_state(), base.PROCESSING)
- self.assertEqual(
- commit.verifications['fake2'].get_state(), base.PROCESSING)
- self.assertEqual(
- commit.verifications['fake3'].get_state(), base.PROCESSING)
- self.assertEqual(
- commit.verifications['fake4'].get_state(), base.PROCESSING)
- self.assertEqual(len(commit.verifications), 4)
- pc.update_status()
- self.assertEqual(commit.verifications['fake1'].get_state(), f1)
- self.assertEqual(commit.verifications['fake2'].get_state(), f2)
- self.assertEqual(commit.verifications['fake3'].get_state(), f3)
- self.assertEqual(commit.verifications['fake4'].get_state(), f4)
- self.assertEqual('', commit.relpath)
- self._check_standard_verification(
- pc, all(x == base.SUCCEEDED for x in (f1, f2, f3, f4)), False)
- if all(x == base.SUCCEEDED for x in (f1, f2, f3, f4)):
- self.context.status.check_names(['initial', 'why not', 'why not',
- 'why not', 'commit'])
- else:
- self.context.status.check_names(['initial', 'why not', 'why not',
- 'abort'])
-
- def testDefer4thVerificationFail(self):
- self._check_defer_4(
- base.SUCCEEDED, base.SUCCEEDED, base.SUCCEEDED, base.FAILED)
-
- def testDefer4Verification(self):
- self._check_defer_4(
- base.SUCCEEDED, base.SUCCEEDED, base.SUCCEEDED, base.SUCCEEDED)
-
- def testDefer4Verification3rdFail(self):
- self._check_defer_4(
- base.SUCCEEDED, base.SUCCEEDED, base.FAILED, base.SUCCEEDED)
-
- def testRelPath(self):
- issue = 31337
- verifiers = [
- project_base.ProjectBaseUrlVerifier(
- [r'^%s(.*)$' % re.escape(r'http://example.com/')]),
- ]
- pc = self._get_pc([], verifiers)
- pc.context.rietveld.issues[issue]['base_url'] = 'http://example.com/sub/dir'
- pc.look_for_new_pending_commit()
- self.assertEqual(1, len(pc.queue.iterate()))
- pc.process_new_pending_commit()
- self.assertEqual('sub/dir', pc.queue.get(issue).relpath)
- self.context.checkout.check_calls(
- [ 'prepare(None)',
- 'apply_patch(%r)' % (self.context.rietveld.patchsets[0],)])
- pc.update_status()
- self.context.checkout.check_calls([])
- pc.scan_results()
- self.context.checkout.check_calls(
- # Will sync to HEAD, 124.
- self._prepare_apply_commit(1, issue))
- self.context.rietveld.check_calls(
- [ _try_comment(),
- 'close_issue(%d)' % issue,
- "update_description(%d, u'foo')" % issue,
- "add_comment(%d, 'Change committed as 125')" % issue])
- self.context.status.check_names(['initial', 'why not', 'commit'])
-
- def testCommitBurst(self):
- issue = 31337
- pc = self._get_pc([fake.FakeVerifier(base.SUCCEEDED)], [])
- self.assertEqual(4, pc.MAX_COMMIT_BURST)
- timestamp = [1]
- self.mock(time, 'time', lambda: timestamp[-1])
- for i in range(pc.MAX_COMMIT_BURST + 2):
- self.context.rietveld.issues[i] = (
- self.context.rietveld.issues[issue].copy())
- self.context.rietveld.issues[i]['issue'] = i
- pc.look_for_new_pending_commit()
- self.assertEqual(len(pc.queue.iterate()), pc.MAX_COMMIT_BURST + 3)
- pc.process_new_pending_commit()
- pc.update_status()
- pc.scan_results()
- self.context.checkout.check_calls(
- self._prepare_apply_commit(0, 0) +
- self._prepare_apply_commit(1, 1) +
- self._prepare_apply_commit(2, 2) +
- self._prepare_apply_commit(3, 3))
- self.context.rietveld.check_calls(
- [ _try_comment(0),
- _try_comment(1),
- _try_comment(2),
- _try_comment(3),
- _try_comment(4),
- _try_comment(5),
- _try_comment(),
- 'close_issue(0)',
- "update_description(0, u'foo')",
- "add_comment(0, 'Change committed as 125')",
- 'close_issue(1)',
- "update_description(1, u'foo')",
- "add_comment(1, 'Change committed as 125')",
- 'close_issue(2)',
- "update_description(2, u'foo')",
- "add_comment(2, 'Change committed as 125')",
- 'close_issue(3)',
- "update_description(3, u'foo')",
- "add_comment(3, 'Change committed as 125')",
- ])
- self.assertEqual(3, len(pc.queue.iterate()))
- total = pc.MAX_COMMIT_BURST + 3
- self.context.status.check_names(['initial'] * total +
- (['why not', 'commit'] *
- pc.MAX_COMMIT_BURST) +
- ['why not'] * 3)
-
- # Dry run.
- pc.scan_results()
- self.context.checkout.check_calls([])
- self.context.rietveld.check_calls([])
- self.context.status.check_names(['why not'] * 3)
-
- # Remove one item from the burst.
- pc.recent_commit_timestamps.pop()
- pc.scan_results()
- next_item = pc.MAX_COMMIT_BURST
- self.context.checkout.check_calls(
- self._prepare_apply_commit(next_item, next_item))
- self.context.rietveld.check_calls(
- [ 'close_issue(%d)' % next_item,
- "update_description(%d, u'foo')" % next_item,
- "add_comment(%d, 'Change committed as 125')" % next_item,
- ])
- self.context.status.check_names(['why not', 'commit'] + ['why not'] * 2)
- # After a delay, must flush the queue.
- timestamp.append(timestamp[-1] + pc.COMMIT_BURST_DELAY + 1)
- pc.scan_results()
- self.context.checkout.check_calls(
- self._prepare_apply_commit(next_item + 1, next_item + 1) +
- self._prepare_apply_commit(next_item + 2, issue))
- self.context.rietveld.check_calls(
- [ 'close_issue(%d)' % (next_item + 1),
- "update_description(%d, u'foo')" % (next_item + 1),
- "add_comment(%d, 'Change committed as 125')" % (next_item + 1),
- 'close_issue(%d)' % issue,
- "update_description(%d, u'foo')" % issue,
- "add_comment(%d, 'Change committed as 125')" % issue])
- self.context.status.check_names(['why not', 'commit'] * 2)
-
- def testIgnored(self):
- issue = 31337
- verifiers = [
- project_base.ProjectBaseUrlVerifier(
- [r'^%s(.*)$' % re.escape(r'http://example.com/')]),
- ]
- pc = self._get_pc(verifiers, [])
- pc.context.rietveld.issues[issue]['base_url'] = 'http://unrelated.com/sub'
- pc.look_for_new_pending_commit()
- pc.process_new_pending_commit()
- pc.update_status()
- pc.scan_results()
- self.assertEqual(1, len(pc.queue.iterate()))
- self.assertEqual('', pc.queue.get(issue).relpath)
- self.assertEqual(base.IGNORED, pc.queue.get(issue).get_state())
-
- def testServerHooksMissing(self):
- issue = 31337
- verifiers = [
- project_base.ProjectBaseUrlVerifier(
- [r'^%s(.*)$' % re.escape(r'http://example.com/')]),
- ]
- self.context.server_hooks_missing = True
- pc = self._get_pc(verifiers, [])
- pc.context.rietveld.issues[issue]['base_url'] = 'http://example.com/'
- pc.look_for_new_pending_commit()
- pc.process_new_pending_commit()
- pc.update_status()
- pc.scan_results()
- self.context.rietveld.check_calls(
- [ _try_comment(),
- 'close_issue(%d)' % issue,
- "update_description(%d, u'foo')" % issue,
- "add_comment(%d, 'Change committed as 125')" % issue])
- self.context.status.check_names(['initial', 'why not', 'commit'])
- self.context.checkout.check_calls(
- self._prepare_apply_commit(0, issue, server_hooks_missing=True))
-
- def testDisapeared(self):
- issue = 31337
- verifiers = [
- project_base.ProjectBaseUrlVerifier(
- [r'^%s(.*)$' % re.escape(r'http://example.com/')]),
- ]
- pc = self._get_pc(verifiers, [])
- pc.context.rietveld.issues[issue]['base_url'] = 'http://unrelated.com/sub'
- pc.look_for_new_pending_commit()
- pc.process_new_pending_commit()
- pc.update_status()
- pc.scan_results()
- self.assertEqual(1, len(pc.queue.iterate()))
- del pc.context.rietveld.issues[issue]
- pc.look_for_new_pending_commit()
- pc.process_new_pending_commit()
- pc.update_status()
- pc.scan_results()
- self.assertEqual(0, len(pc.queue.iterate()))
- self.context.status.check_names(['abort'])
-
- def _get_pc_reviewer(self):
- verifiers = [
- reviewer_lgtm.ReviewerLgtmVerifier(
- ['.*'], [re.escape('commit-bot@example.com')])
- ]
- pc = self._get_pc(verifiers, [])
- return pc
-
- def _approve(self, sender=None):
- issue = 31337
- if not sender:
- sender = self.context.rietveld.issues[issue]['reviewers'][0]
- self.context.rietveld.issues[issue]['messages'].append(
- {
- 'approval': True,
- 'sender': sender,
- })
-
- def testVerifyDefaultMock(self):
- # Verify mock expectation for the default settings.
- issue = 31337
- pc = self._get_pc_reviewer()
- self.assertEqual(0, len(pc.queue.iterate()))
- pc.look_for_new_pending_commit()
- self.assertEqual(1, len(pc.queue.iterate()))
- # Pop the LGTM.
- pc.queue.iterate()[0].messages.pop(1)
- pc.process_new_pending_commit()
- self.assertEqual(0, len(pc.queue.iterate()))
- pc.update_status()
- self.assertEqual(0, len(pc.queue.iterate()))
- self.context.rietveld.check_calls(
- [ "set_flag(%d, 1, 'commit', False)" % issue,
- "add_comment(%d, %r)" % (issue, reviewer_lgtm.LgtmStatus.NO_LGTM)])
- self.context.status.check_names(['abort'])
-
- def testVerifyDefaultMockPlusLGTM(self):
- # Verify mock expectation with a single approval message.
- issue = 31337
- pc = self._get_pc_reviewer()
- self._approve()
- self.assertEqual(0, len(pc.queue.iterate()))
- pc.look_for_new_pending_commit()
- self.assertEqual(1, len(pc.queue.iterate()))
- pc.process_new_pending_commit()
- self.assertEqual(1, len(pc.queue.iterate()))
- pc.update_status()
- self.assertEqual(1, len(pc.queue.iterate()))
- pc.scan_results()
- self.assertEqual(0, len(pc.queue.iterate()))
- self.context.rietveld.check_calls(
- [ _try_comment(),
- 'close_issue(%d)' % issue,
- "update_description(%d, u'foo')" % issue,
- "add_comment(%d, 'Change committed as 125')" % issue])
- self.context.status.check_names(['initial', 'why not', 'commit'])
- self.context.checkout.check_calls(
- self._prepare_apply_commit(0, issue))
-
- def testDriveBy(self):
- issue = 31337
- pc = self._get_pc_reviewer()
- self._approve()
- pc.look_for_new_pending_commit()
- pc.process_new_pending_commit()
- pc.update_status()
- # A new reviewer prevents the commit.
- i = self.context.rietveld.issues[issue]
- i['reviewers'] = i['reviewers'] + ['annoying@dude.org']
- pc.scan_results()
- self.context.rietveld.check_calls(
- [ _try_comment(),
- "set_flag(%d, 1, 'commit', False)" % issue,
- ('add_comment(%d, "List of reviewers changed. annoying@dude.org '
- 'did a drive-by without LGTM\'ing!")') % issue])
- self.context.status.check_names(['initial', 'abort'])
-
- def testDriveByLGTM(self):
- issue = 31337
- pc = self._get_pc_reviewer()
- self._approve()
- pc.look_for_new_pending_commit()
- pc.process_new_pending_commit()
- pc.update_status()
- # He's nice, he left a LGTM.
- i = self.context.rietveld.issues[issue]
- i['reviewers'] = i['reviewers'] + ['nice@dude.org']
- self._approve('nice@dude.org')
- pc.scan_results()
- self.assertEqual(0, len(pc.queue.iterate()))
- self.context.rietveld.check_calls(
- [ _try_comment(),
- 'close_issue(%d)' % issue,
- "update_description(%d, u'foo')" % issue,
- "add_comment(%d, 'Change committed as 125')" % issue])
- self.context.status.check_names(['initial', 'why not', 'commit'])
- self.context.checkout.check_calls(
- self._prepare_apply_commit(0, issue))
-
- def testWhyNotUpdates(self):
- issue = 31337
- fake1_change = 1
- fake1 = fake.DeferredFakeVerifier(base.SUCCEEDED, fake1_change)
- fake1.name = 'fake1'
-
- fake2_change = 3
- fake2 = fake.DeferredFakeVerifier(base.SUCCEEDED, fake2_change)
- fake2.name = 'fake2'
- pc = self._get_pc([fake1, fake2], [])
- pc.look_for_new_pending_commit()
- pc.process_new_pending_commit()
-
- self.context.status.check_names(['initial', 'why not'])
-
- # Make sure the 'why not' is only pushed when it changes.
- for i in range(fake2_change * 2):
- expected = ['why not'] if fake1_change == i or fake2_change == i else []
- pc.update_status()
- self.context.status.check_names(expected)
-
- pc.scan_results()
-
- self.context.rietveld.check_calls(
- [_try_comment(),
- 'close_issue(%d)' % issue,
- "update_description(%d, u'foo')" % issue,
- "add_comment(%d, 'Change committed as 125')" % issue])
- self.context.status.check_names(['why not', 'commit'])
- self.context.checkout.check_calls(
- self._prepare_apply_commit(0, issue))
-
-
-if __name__ == '__main__':
- logging.basicConfig(
- level=[logging.ERROR, logging.WARNING, logging.INFO, logging.DEBUG][
- min(sys.argv.count('-v'), 3)],
- format='%(levelname)5s %(module)15s(%(lineno)3d): %(message)s')
- unittest.main()
« no previous file with comments | « commit-queue/tests/natsort_test.py ('k') | commit-queue/tests/presubmit_check_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698