Index: commit-queue/tests/pending_manager_test.py |
=================================================================== |
--- commit-queue/tests/pending_manager_test.py (revision 249146) |
+++ commit-queue/tests/pending_manager_test.py (working copy) |
@@ -1,732 +0,0 @@ |
-#!/usr/bin/env python |
-# Copyright (c) 2012 The Chromium Authors. All rights reserved. |
-# Use of this source code is governed by a BSD-style license that can be |
-# found in the LICENSE file. |
- |
-"""Unit tests for pending_manager.py.""" |
- |
-import logging |
-import os |
-import re |
-import sys |
-import time |
-import traceback |
-import unittest |
-import urllib2 |
- |
-ROOT_DIR = os.path.dirname(os.path.abspath(__file__)) |
-sys.path.insert(0, os.path.join(ROOT_DIR, '..')) |
- |
-import find_depot_tools # pylint: disable=W0611 |
-import breakpad |
- |
-import context |
-import pending_manager |
-from verification import base |
-from verification import fake |
-from verification import project_base |
-from verification import reviewer_lgtm |
- |
-# In tests/ |
-import mocks |
- |
- |
-def read(filename): |
- f = open(filename, 'rb') |
- content = f.read() |
- f.close() |
- return content |
- |
- |
-def write(filename, content): |
- f = open(filename, 'wb') |
- f.write(content) |
- f.close() |
- |
- |
-def trim(x): |
- return x.replace(' ', '').replace('\n', '') |
- |
- |
-def _try_comment(issue=31337): |
- return ( |
- "add_comment(%d, u'%shttp://localhost/author@example.com/%d/1\\n')" % |
- (issue, pending_manager.PendingManager.TRYING_PATCH.replace('\n', '\\n'), |
- issue)) |
- |
- |
-class TestPendingManager(mocks.TestCase): |
- def setUp(self): |
- super(TestPendingManager, self).setUp() |
- self.root_dir = ROOT_DIR |
- |
- def testLoadSave(self): |
- pc = pending_manager.PendingManager( |
- context.Context(None, None, mocks.AsyncPushMock(self)), |
- [fake.FakeVerifier(base.SUCCEEDED)], |
- []) |
- filename = os.path.join(self.root_dir, 'foo.json') |
- empty = """{ |
- "__persistent_type__": "PendingQueue", |
- "pending_commits": {} |
-} |
-""" |
- write(filename, empty) |
- try: |
- pc.load(filename) |
- self.assertEqual(pc.queue.pending_commits, {}) |
- pc.save(filename) |
- self.assertEqual(trim(empty), trim(read(filename))) |
- finally: |
- os.remove(filename) |
- if os.path.exists(filename + '.old'): |
- os.remove(filename + '.old') |
- |
- def _get_pc(self, verifiers_no_patch, verifiers): |
- return pending_manager.PendingManager( |
- self.context, verifiers_no_patch, verifiers) |
- |
- def _check_standard_verification(self, pc, success, defered): |
- """Verifies the checkout and rietveld calls.""" |
- pc.scan_results() |
- self.assertEqual(len(pc.queue.iterate()), 0) |
- issue = 31337 |
- if pc.verifiers: |
- if success: |
- self.context.checkout.check_calls( |
- [ 'prepare(None)', |
- 'apply_patch(%r)' % (self.context.rietveld.patchsets[0],), |
- 'prepare(None)', # Will sync to HEAD/124. |
- 'apply_patch(%r)' % (self.context.rietveld.patchsets[1],), |
- ( |
- "commit(u'foo\\n\\n" |
- "Review URL: http://nowhere/%d', " |
- "u'author@example.com')") % issue]) |
- self.context.rietveld.check_calls( |
- [ _try_comment(), |
- 'close_issue(%d)' % issue, |
- "update_description(%d, u'foo')" % issue, |
- "add_comment(%d, 'Change committed as 125')" % issue]) |
- else: |
- self.context.checkout.check_calls( |
- [ 'prepare(None)', |
- 'apply_patch(%r)' % (self.context.rietveld.patchsets[0],)]) |
- self.context.rietveld.check_calls( |
- [ _try_comment(), |
- "set_flag(%d, 1, 'commit', False)" % issue, |
- "add_comment(%d, %r)" % (issue, pc.FAILED_NO_MESSAGE)]) |
- else: |
- if success: |
- self.context.checkout.check_calls( |
- self._prepare_apply_commit(0, issue)) |
- self.context.rietveld.check_calls( |
- [ _try_comment(), |
- 'close_issue(%d)' % issue, |
- "update_description(%d, u'foo')" % issue, |
- "add_comment(%d, 'Change committed as 125')" % issue]) |
- else: |
- # checkout is never touched in that case. |
- self.context.checkout.check_calls([]) |
- if defered: |
- self.context.rietveld.check_calls( |
- [ _try_comment(), |
- "set_flag(%d, 1, 'commit', False)" % issue, |
- "add_comment(%d, %r)" % (issue, pc.FAILED_NO_MESSAGE)]) |
- else: |
- self.context.rietveld.check_calls( |
- [ "set_flag(%d, 1, 'commit', False)" % issue, |
- "add_comment(%d, %r)" % (issue, pc.FAILED_NO_MESSAGE)]) |
- |
- def _prepare_apply_commit(self, index, issue, server_hooks_missing=False): |
- """Returns a frequent sequence of action happening on the Checkout object. |
- |
- The list returned by this function should be used as an argument to |
- self.context.checkout.check_calls(). |
- """ |
- seq = [ |
- # Reverts any previous modification or checkout the tree if it was not |
- # present. |
- 'prepare(None)', |
- # Applies the requested PatchSet. |
- 'apply_patch(%r)' % self.context.rietveld.patchsets[index], |
- ] |
- # Commits the patch. |
- author_and_reviewer = '' |
- if server_hooks_missing: |
- author_and_reviewer = ( |
- '\\n\\nR=rev@example.com\\n\\nAuthor: author@example.com') |
- commit_message = ( |
- "commit(u'foo%s\\n\\n" |
- "Review URL: http://nowhere/%d', " |
- "u'author@example.com')") % (author_and_reviewer, issue) |
- seq.append(commit_message) |
- |
- return seq |
- |
- def testNoVerification(self): |
- try: |
- self._get_pc([], []) |
- except ValueError: |
- pass |
- else: |
- self.fail(msg='A PendingManager must require at least one verifier.') |
- |
- try: |
- # Cannot have the same verifier two times. |
- self._get_pc( |
- [fake.FakeVerifier(base.SUCCEEDED)], |
- [fake.FakeVerifier(base.SUCCEEDED)]) |
- except AssertionError: |
- pass |
- else: |
- self.fail(msg='A PendingManager should not accept the same verifier' |
- ' two times.') |
- |
- def _check_1(self, pc, result): |
- issue = 31337 |
- # 'initial' won't be sent if the pre-patch verification fails, this is to |
- # not add noise for ignored CLs. |
- send_initial_packet = (result == base.SUCCEEDED or pc.verifiers) |
- self.assertEqual(len(pc.queue.iterate()), 0) |
- pc.look_for_new_pending_commit() |
- self.assertEqual(len(pc.queue.iterate()), 1) |
- commit = pc.queue.get(issue) |
- self.assertEqual(len(commit.verifications), 0) |
- pc.process_new_pending_commit() |
- if result == base.FAILED: |
- self.assertEqual([], pc.queue.iterate()) |
- else: |
- commit = pc.queue.get(issue) |
- self.assertEqual(commit.verifications['fake'].get_state(), result) |
- self.assertEqual(len(commit.verifications), 1) |
- pc.update_status() |
- if result == base.FAILED: |
- self.assertEqual([], pc.queue.iterate()) |
- else: |
- commit = pc.queue.get(issue) |
- self.assertEqual(commit.verifications['fake'].get_state(), result) |
- self.assertEqual('', commit.relpath) |
- self.assertEqual(len(commit.verifications), 1) |
- self._check_standard_verification(pc, result == base.SUCCEEDED, False) |
- |
- if result == base.SUCCEEDED: |
- self.context.status.check_names(['initial', 'why not', 'commit']) |
- elif send_initial_packet: |
- self.context.status.check_names(['initial', 'abort']) |
- else: |
- # Only happens when there is no verifier that requires a patch. |
- self.context.status.check_names(['abort']) |
- |
- def testNoPatchVerification(self): |
- pc = self._get_pc([fake.FakeVerifier(base.SUCCEEDED)], []) |
- self._check_1(pc, base.SUCCEEDED) |
- |
- def testPatchVerification(self): |
- pc = self._get_pc([], [fake.FakeVerifier(base.SUCCEEDED)]) |
- self._check_1(pc, base.SUCCEEDED) |
- |
- def testNoPatchVerificationFail(self): |
- pc = self._get_pc([fake.FakeVerifier(base.FAILED)], []) |
- self._check_1(pc, base.FAILED) |
- |
- def testPatchVerificationFail(self): |
- pc = self._get_pc([], [fake.FakeVerifier(base.FAILED)]) |
- self._check_1(pc, base.FAILED) |
- |
- def testPatchDiscardThrows(self): |
- # Handle HTTPError correctly. |
- result = [] |
- issue = 31337 |
- pc = self._get_pc([], [fake.FakeVerifier(base.FAILED)]) |
- |
- def set_flag_throw(_issue, _patchset, _flag, _value): |
- raise urllib2.HTTPError(None, None, None, None, None) |
- |
- def send_stack(*_args, **_kwargs): |
- result.append(True) |
- |
- self.mock(breakpad, 'SendStack', send_stack) |
- self.mock(traceback, 'print_stack', lambda: None) |
- self.mock(logging, 'error', lambda _: None) |
- pc.context.rietveld.set_flag = set_flag_throw |
- |
- self.assertEqual(len(pc.queue.iterate()), 0) |
- pc.look_for_new_pending_commit() |
- self.assertEqual(len(pc.queue.iterate()), 1) |
- commit = pc.queue.get(issue) |
- self.assertEqual(len(commit.verifications), 0) |
- pc.process_new_pending_commit() |
- self.assertEqual([], pc.queue.iterate()) |
- pc.update_status() |
- self.assertEqual([], pc.queue.iterate()) |
- self.context.checkout.check_calls( |
- [ 'prepare(None)', |
- 'apply_patch(%r)' % (self.context.rietveld.patchsets[0],), |
- ]) |
- self.context.rietveld.check_calls( |
- [ _try_comment(), |
- "add_comment(%d, %r)" % (issue, pc.FAILED_NO_MESSAGE), |
- ]) |
- self.context.status.check_names(['initial', 'abort']) |
- |
- def _check_defer_1(self, pc, result): |
- issue = 31337 |
- self.assertEqual(len(pc.queue.iterate()), 0) |
- pc.look_for_new_pending_commit() |
- self.assertEqual(len(pc.queue.iterate()), 1) |
- commit = pc.queue.get(issue) |
- self.assertEqual(len(commit.verifications), 0) |
- pc.process_new_pending_commit() |
- commit = pc.queue.get(issue) |
- self.assertEqual('', commit.relpath) |
- self.assertEqual(commit.verifications['fake'].get_state(), base.PROCESSING) |
- self.assertEqual(len(commit.verifications), 1) |
- pc.update_status() |
- commit = pc.queue.get(issue) |
- self.assertEqual('', commit.relpath) |
- self.assertEqual(commit.verifications['fake'].get_state(), result) |
- self.assertEqual(len(commit.verifications), 1) |
- self._check_standard_verification(pc, result == base.SUCCEEDED, True) |
- if result == base.SUCCEEDED: |
- self.context.status.check_names(['initial', 'why not', 'why not', |
- 'why not', 'commit']) |
- else: |
- self.context.status.check_names(['initial', 'why not', 'why not', |
- 'abort']) |
- |
- def testDeferNoPatchVerification(self): |
- pc = self._get_pc([fake.DeferredFakeVerifier(base.SUCCEEDED, 0)], []) |
- self._check_defer_1(pc, base.SUCCEEDED) |
- |
- def testDeferPatchVerification(self): |
- pc = self._get_pc([], [fake.DeferredFakeVerifier(base.SUCCEEDED, 0)]) |
- self._check_defer_1(pc, base.SUCCEEDED) |
- |
- def testDeferNoPatchVerificationFail(self): |
- pc = self._get_pc([fake.DeferredFakeVerifier(base.FAILED, 0)], []) |
- self._check_defer_1(pc, base.FAILED) |
- |
- def testDeferPatchVerificationFail(self): |
- pc = self._get_pc([], [fake.DeferredFakeVerifier(base.FAILED, 0)]) |
- self._check_defer_1(pc, base.FAILED) |
- |
- def _check_4(self, f1, f2, f3, f4): |
- issue = 31337 |
- fake1 = fake.FakeVerifier(f1) |
- fake1.name = 'fake1' |
- fake2 = fake.FakeVerifier(f2) |
- fake2.name = 'fake2' |
- fake3 = fake.FakeVerifier(f3) |
- fake3.name = 'fake3' |
- fake4 = fake.FakeVerifier(f4) |
- fake4.name = 'fake4' |
- nb = 1 |
- if f1 is base.SUCCEEDED: |
- nb = 2 |
- if f2 is base.SUCCEEDED: |
- nb = 3 |
- if f3 is base.SUCCEEDED: |
- nb = 4 |
- pc = self._get_pc([fake1, fake2], [fake3, fake4]) |
- self.assertEqual(len(pc.queue.iterate()), 0) |
- pc.look_for_new_pending_commit() |
- self.assertEqual(len(pc.queue.iterate()), 1) |
- commit = pc.queue.get(issue) |
- self.assertEqual(len(commit.verifications), 0) |
- pc.process_new_pending_commit() |
- if not all(f == base.SUCCEEDED for f in (f1, f2, f3, f4)): |
- self.assertEqual([], pc.queue.iterate()) |
- else: |
- commit = pc.queue.get(issue) |
- self.assertEqual(commit.verifications['fake1'].get_state(), f1) |
- self.assertEqual(commit.verifications['fake2'].get_state(), f2) |
- self.assertEqual(commit.verifications['fake3'].get_state(), f3) |
- self.assertEqual(commit.verifications['fake4'].get_state(), f4) |
- self.assertEqual(len(commit.verifications), nb) |
- pc.update_status() |
- if not all(f == base.SUCCEEDED for f in (f1, f2, f3, f4)): |
- self.assertEqual([], pc.queue.iterate()) |
- else: |
- commit = pc.queue.get(issue) |
- self.assertEqual(commit.verifications['fake1'].get_state(), f1) |
- self.assertEqual(commit.verifications['fake2'].get_state(), f2) |
- self.assertEqual(commit.verifications['fake3'].get_state(), f3) |
- self.assertEqual(commit.verifications['fake4'].get_state(), f4) |
- self.assertEqual(len(commit.verifications), nb) |
- self._check_standard_verification( |
- pc, all(x == base.SUCCEEDED for x in (f1, f2, f3, f4)), False) |
- if all(x == base.SUCCEEDED for x in (f1, f2, f3, f4)): |
- self.context.status.check_names(['initial', 'why not', 'commit']) |
- else: |
- self.context.status.check_names(['initial', 'abort']) |
- |
- def test4thVerificationFail(self): |
- self._check_4(base.SUCCEEDED, base.SUCCEEDED, base.SUCCEEDED, base.FAILED) |
- |
- def test4Verification(self): |
- self._check_4( |
- base.SUCCEEDED, base.SUCCEEDED, base.SUCCEEDED, base.SUCCEEDED) |
- |
- def test4Verification3rdFail(self): |
- self._check_4(base.SUCCEEDED, base.SUCCEEDED, base.FAILED, base.SUCCEEDED) |
- |
- def _check_defer_4(self, f1, f2, f3, f4): |
- issue = 31337 |
- fake1 = fake.DeferredFakeVerifier(f1, 0) |
- fake1.name = 'fake1' |
- fake2 = fake.DeferredFakeVerifier(f2, 0) |
- fake2.name = 'fake2' |
- fake3 = fake.DeferredFakeVerifier(f3, 0) |
- fake3.name = 'fake3' |
- fake4 = fake.DeferredFakeVerifier(f4, 0) |
- fake4.name = 'fake4' |
- pc = self._get_pc([fake1, fake2], [fake3, fake4]) |
- self.assertEqual(len(pc.queue.iterate()), 0) |
- pc.look_for_new_pending_commit() |
- self.assertEqual(len(pc.queue.iterate()), 1) |
- commit = pc.queue.get(issue) |
- self.assertEqual(len(commit.verifications), 0) |
- pc.process_new_pending_commit() |
- commit = pc.queue.get(issue) |
- self.assertEqual( |
- commit.verifications['fake1'].get_state(), base.PROCESSING) |
- self.assertEqual( |
- commit.verifications['fake2'].get_state(), base.PROCESSING) |
- self.assertEqual( |
- commit.verifications['fake3'].get_state(), base.PROCESSING) |
- self.assertEqual( |
- commit.verifications['fake4'].get_state(), base.PROCESSING) |
- self.assertEqual(len(commit.verifications), 4) |
- pc.update_status() |
- self.assertEqual(commit.verifications['fake1'].get_state(), f1) |
- self.assertEqual(commit.verifications['fake2'].get_state(), f2) |
- self.assertEqual(commit.verifications['fake3'].get_state(), f3) |
- self.assertEqual(commit.verifications['fake4'].get_state(), f4) |
- self.assertEqual('', commit.relpath) |
- self._check_standard_verification( |
- pc, all(x == base.SUCCEEDED for x in (f1, f2, f3, f4)), False) |
- if all(x == base.SUCCEEDED for x in (f1, f2, f3, f4)): |
- self.context.status.check_names(['initial', 'why not', 'why not', |
- 'why not', 'commit']) |
- else: |
- self.context.status.check_names(['initial', 'why not', 'why not', |
- 'abort']) |
- |
- def testDefer4thVerificationFail(self): |
- self._check_defer_4( |
- base.SUCCEEDED, base.SUCCEEDED, base.SUCCEEDED, base.FAILED) |
- |
- def testDefer4Verification(self): |
- self._check_defer_4( |
- base.SUCCEEDED, base.SUCCEEDED, base.SUCCEEDED, base.SUCCEEDED) |
- |
- def testDefer4Verification3rdFail(self): |
- self._check_defer_4( |
- base.SUCCEEDED, base.SUCCEEDED, base.FAILED, base.SUCCEEDED) |
- |
- def testRelPath(self): |
- issue = 31337 |
- verifiers = [ |
- project_base.ProjectBaseUrlVerifier( |
- [r'^%s(.*)$' % re.escape(r'http://example.com/')]), |
- ] |
- pc = self._get_pc([], verifiers) |
- pc.context.rietveld.issues[issue]['base_url'] = 'http://example.com/sub/dir' |
- pc.look_for_new_pending_commit() |
- self.assertEqual(1, len(pc.queue.iterate())) |
- pc.process_new_pending_commit() |
- self.assertEqual('sub/dir', pc.queue.get(issue).relpath) |
- self.context.checkout.check_calls( |
- [ 'prepare(None)', |
- 'apply_patch(%r)' % (self.context.rietveld.patchsets[0],)]) |
- pc.update_status() |
- self.context.checkout.check_calls([]) |
- pc.scan_results() |
- self.context.checkout.check_calls( |
- # Will sync to HEAD, 124. |
- self._prepare_apply_commit(1, issue)) |
- self.context.rietveld.check_calls( |
- [ _try_comment(), |
- 'close_issue(%d)' % issue, |
- "update_description(%d, u'foo')" % issue, |
- "add_comment(%d, 'Change committed as 125')" % issue]) |
- self.context.status.check_names(['initial', 'why not', 'commit']) |
- |
- def testCommitBurst(self): |
- issue = 31337 |
- pc = self._get_pc([fake.FakeVerifier(base.SUCCEEDED)], []) |
- self.assertEqual(4, pc.MAX_COMMIT_BURST) |
- timestamp = [1] |
- self.mock(time, 'time', lambda: timestamp[-1]) |
- for i in range(pc.MAX_COMMIT_BURST + 2): |
- self.context.rietveld.issues[i] = ( |
- self.context.rietveld.issues[issue].copy()) |
- self.context.rietveld.issues[i]['issue'] = i |
- pc.look_for_new_pending_commit() |
- self.assertEqual(len(pc.queue.iterate()), pc.MAX_COMMIT_BURST + 3) |
- pc.process_new_pending_commit() |
- pc.update_status() |
- pc.scan_results() |
- self.context.checkout.check_calls( |
- self._prepare_apply_commit(0, 0) + |
- self._prepare_apply_commit(1, 1) + |
- self._prepare_apply_commit(2, 2) + |
- self._prepare_apply_commit(3, 3)) |
- self.context.rietveld.check_calls( |
- [ _try_comment(0), |
- _try_comment(1), |
- _try_comment(2), |
- _try_comment(3), |
- _try_comment(4), |
- _try_comment(5), |
- _try_comment(), |
- 'close_issue(0)', |
- "update_description(0, u'foo')", |
- "add_comment(0, 'Change committed as 125')", |
- 'close_issue(1)', |
- "update_description(1, u'foo')", |
- "add_comment(1, 'Change committed as 125')", |
- 'close_issue(2)', |
- "update_description(2, u'foo')", |
- "add_comment(2, 'Change committed as 125')", |
- 'close_issue(3)', |
- "update_description(3, u'foo')", |
- "add_comment(3, 'Change committed as 125')", |
- ]) |
- self.assertEqual(3, len(pc.queue.iterate())) |
- total = pc.MAX_COMMIT_BURST + 3 |
- self.context.status.check_names(['initial'] * total + |
- (['why not', 'commit'] * |
- pc.MAX_COMMIT_BURST) + |
- ['why not'] * 3) |
- |
- # Dry run. |
- pc.scan_results() |
- self.context.checkout.check_calls([]) |
- self.context.rietveld.check_calls([]) |
- self.context.status.check_names(['why not'] * 3) |
- |
- # Remove one item from the burst. |
- pc.recent_commit_timestamps.pop() |
- pc.scan_results() |
- next_item = pc.MAX_COMMIT_BURST |
- self.context.checkout.check_calls( |
- self._prepare_apply_commit(next_item, next_item)) |
- self.context.rietveld.check_calls( |
- [ 'close_issue(%d)' % next_item, |
- "update_description(%d, u'foo')" % next_item, |
- "add_comment(%d, 'Change committed as 125')" % next_item, |
- ]) |
- self.context.status.check_names(['why not', 'commit'] + ['why not'] * 2) |
- # After a delay, must flush the queue. |
- timestamp.append(timestamp[-1] + pc.COMMIT_BURST_DELAY + 1) |
- pc.scan_results() |
- self.context.checkout.check_calls( |
- self._prepare_apply_commit(next_item + 1, next_item + 1) + |
- self._prepare_apply_commit(next_item + 2, issue)) |
- self.context.rietveld.check_calls( |
- [ 'close_issue(%d)' % (next_item + 1), |
- "update_description(%d, u'foo')" % (next_item + 1), |
- "add_comment(%d, 'Change committed as 125')" % (next_item + 1), |
- 'close_issue(%d)' % issue, |
- "update_description(%d, u'foo')" % issue, |
- "add_comment(%d, 'Change committed as 125')" % issue]) |
- self.context.status.check_names(['why not', 'commit'] * 2) |
- |
- def testIgnored(self): |
- issue = 31337 |
- verifiers = [ |
- project_base.ProjectBaseUrlVerifier( |
- [r'^%s(.*)$' % re.escape(r'http://example.com/')]), |
- ] |
- pc = self._get_pc(verifiers, []) |
- pc.context.rietveld.issues[issue]['base_url'] = 'http://unrelated.com/sub' |
- pc.look_for_new_pending_commit() |
- pc.process_new_pending_commit() |
- pc.update_status() |
- pc.scan_results() |
- self.assertEqual(1, len(pc.queue.iterate())) |
- self.assertEqual('', pc.queue.get(issue).relpath) |
- self.assertEqual(base.IGNORED, pc.queue.get(issue).get_state()) |
- |
- def testServerHooksMissing(self): |
- issue = 31337 |
- verifiers = [ |
- project_base.ProjectBaseUrlVerifier( |
- [r'^%s(.*)$' % re.escape(r'http://example.com/')]), |
- ] |
- self.context.server_hooks_missing = True |
- pc = self._get_pc(verifiers, []) |
- pc.context.rietveld.issues[issue]['base_url'] = 'http://example.com/' |
- pc.look_for_new_pending_commit() |
- pc.process_new_pending_commit() |
- pc.update_status() |
- pc.scan_results() |
- self.context.rietveld.check_calls( |
- [ _try_comment(), |
- 'close_issue(%d)' % issue, |
- "update_description(%d, u'foo')" % issue, |
- "add_comment(%d, 'Change committed as 125')" % issue]) |
- self.context.status.check_names(['initial', 'why not', 'commit']) |
- self.context.checkout.check_calls( |
- self._prepare_apply_commit(0, issue, server_hooks_missing=True)) |
- |
- def testDisapeared(self): |
- issue = 31337 |
- verifiers = [ |
- project_base.ProjectBaseUrlVerifier( |
- [r'^%s(.*)$' % re.escape(r'http://example.com/')]), |
- ] |
- pc = self._get_pc(verifiers, []) |
- pc.context.rietveld.issues[issue]['base_url'] = 'http://unrelated.com/sub' |
- pc.look_for_new_pending_commit() |
- pc.process_new_pending_commit() |
- pc.update_status() |
- pc.scan_results() |
- self.assertEqual(1, len(pc.queue.iterate())) |
- del pc.context.rietveld.issues[issue] |
- pc.look_for_new_pending_commit() |
- pc.process_new_pending_commit() |
- pc.update_status() |
- pc.scan_results() |
- self.assertEqual(0, len(pc.queue.iterate())) |
- self.context.status.check_names(['abort']) |
- |
- def _get_pc_reviewer(self): |
- verifiers = [ |
- reviewer_lgtm.ReviewerLgtmVerifier( |
- ['.*'], [re.escape('commit-bot@example.com')]) |
- ] |
- pc = self._get_pc(verifiers, []) |
- return pc |
- |
- def _approve(self, sender=None): |
- issue = 31337 |
- if not sender: |
- sender = self.context.rietveld.issues[issue]['reviewers'][0] |
- self.context.rietveld.issues[issue]['messages'].append( |
- { |
- 'approval': True, |
- 'sender': sender, |
- }) |
- |
- def testVerifyDefaultMock(self): |
- # Verify mock expectation for the default settings. |
- issue = 31337 |
- pc = self._get_pc_reviewer() |
- self.assertEqual(0, len(pc.queue.iterate())) |
- pc.look_for_new_pending_commit() |
- self.assertEqual(1, len(pc.queue.iterate())) |
- # Pop the LGTM. |
- pc.queue.iterate()[0].messages.pop(1) |
- pc.process_new_pending_commit() |
- self.assertEqual(0, len(pc.queue.iterate())) |
- pc.update_status() |
- self.assertEqual(0, len(pc.queue.iterate())) |
- self.context.rietveld.check_calls( |
- [ "set_flag(%d, 1, 'commit', False)" % issue, |
- "add_comment(%d, %r)" % (issue, reviewer_lgtm.LgtmStatus.NO_LGTM)]) |
- self.context.status.check_names(['abort']) |
- |
- def testVerifyDefaultMockPlusLGTM(self): |
- # Verify mock expectation with a single approval message. |
- issue = 31337 |
- pc = self._get_pc_reviewer() |
- self._approve() |
- self.assertEqual(0, len(pc.queue.iterate())) |
- pc.look_for_new_pending_commit() |
- self.assertEqual(1, len(pc.queue.iterate())) |
- pc.process_new_pending_commit() |
- self.assertEqual(1, len(pc.queue.iterate())) |
- pc.update_status() |
- self.assertEqual(1, len(pc.queue.iterate())) |
- pc.scan_results() |
- self.assertEqual(0, len(pc.queue.iterate())) |
- self.context.rietveld.check_calls( |
- [ _try_comment(), |
- 'close_issue(%d)' % issue, |
- "update_description(%d, u'foo')" % issue, |
- "add_comment(%d, 'Change committed as 125')" % issue]) |
- self.context.status.check_names(['initial', 'why not', 'commit']) |
- self.context.checkout.check_calls( |
- self._prepare_apply_commit(0, issue)) |
- |
- def testDriveBy(self): |
- issue = 31337 |
- pc = self._get_pc_reviewer() |
- self._approve() |
- pc.look_for_new_pending_commit() |
- pc.process_new_pending_commit() |
- pc.update_status() |
- # A new reviewer prevents the commit. |
- i = self.context.rietveld.issues[issue] |
- i['reviewers'] = i['reviewers'] + ['annoying@dude.org'] |
- pc.scan_results() |
- self.context.rietveld.check_calls( |
- [ _try_comment(), |
- "set_flag(%d, 1, 'commit', False)" % issue, |
- ('add_comment(%d, "List of reviewers changed. annoying@dude.org ' |
- 'did a drive-by without LGTM\'ing!")') % issue]) |
- self.context.status.check_names(['initial', 'abort']) |
- |
- def testDriveByLGTM(self): |
- issue = 31337 |
- pc = self._get_pc_reviewer() |
- self._approve() |
- pc.look_for_new_pending_commit() |
- pc.process_new_pending_commit() |
- pc.update_status() |
- # He's nice, he left a LGTM. |
- i = self.context.rietveld.issues[issue] |
- i['reviewers'] = i['reviewers'] + ['nice@dude.org'] |
- self._approve('nice@dude.org') |
- pc.scan_results() |
- self.assertEqual(0, len(pc.queue.iterate())) |
- self.context.rietveld.check_calls( |
- [ _try_comment(), |
- 'close_issue(%d)' % issue, |
- "update_description(%d, u'foo')" % issue, |
- "add_comment(%d, 'Change committed as 125')" % issue]) |
- self.context.status.check_names(['initial', 'why not', 'commit']) |
- self.context.checkout.check_calls( |
- self._prepare_apply_commit(0, issue)) |
- |
- def testWhyNotUpdates(self): |
- issue = 31337 |
- fake1_change = 1 |
- fake1 = fake.DeferredFakeVerifier(base.SUCCEEDED, fake1_change) |
- fake1.name = 'fake1' |
- |
- fake2_change = 3 |
- fake2 = fake.DeferredFakeVerifier(base.SUCCEEDED, fake2_change) |
- fake2.name = 'fake2' |
- pc = self._get_pc([fake1, fake2], []) |
- pc.look_for_new_pending_commit() |
- pc.process_new_pending_commit() |
- |
- self.context.status.check_names(['initial', 'why not']) |
- |
- # Make sure the 'why not' is only pushed when it changes. |
- for i in range(fake2_change * 2): |
- expected = ['why not'] if fake1_change == i or fake2_change == i else [] |
- pc.update_status() |
- self.context.status.check_names(expected) |
- |
- pc.scan_results() |
- |
- self.context.rietveld.check_calls( |
- [_try_comment(), |
- 'close_issue(%d)' % issue, |
- "update_description(%d, u'foo')" % issue, |
- "add_comment(%d, 'Change committed as 125')" % issue]) |
- self.context.status.check_names(['why not', 'commit']) |
- self.context.checkout.check_calls( |
- self._prepare_apply_commit(0, issue)) |
- |
- |
-if __name__ == '__main__': |
- logging.basicConfig( |
- level=[logging.ERROR, logging.WARNING, logging.INFO, logging.DEBUG][ |
- min(sys.argv.count('-v'), 3)], |
- format='%(levelname)5s %(module)15s(%(lineno)3d): %(message)s') |
- unittest.main() |