Index: appengine/monorail/tracker/test/tracker_helpers_test.py |
diff --git a/appengine/monorail/tracker/test/tracker_helpers_test.py b/appengine/monorail/tracker/test/tracker_helpers_test.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..012bd5ed03ab9a9b29c486f4674a33eeb6d572bd |
--- /dev/null |
+++ b/appengine/monorail/tracker/test/tracker_helpers_test.py |
@@ -0,0 +1,1064 @@ |
+# Copyright 2016 The Chromium Authors. All rights reserved. |
+# Use of this source code is govered by a BSD-style |
+# license that can be found in the LICENSE file or at |
+# https://developers.google.com/open-source/licenses/bsd |
+ |
+"""Unittest for the tracker helpers module.""" |
+ |
+import unittest |
+ |
+import settings |
+ |
+from framework import framework_constants |
+from framework import framework_helpers |
+from framework import permissions |
+from framework import template_helpers |
+from framework import urls |
+from proto import project_pb2 |
+from proto import tracker_pb2 |
+from proto import user_pb2 |
+from services import service_manager |
+from testing import fake |
+from testing import testing_helpers |
+from tracker import tracker_bizobj |
+from tracker import tracker_constants |
+from tracker import tracker_helpers |
+ |
+TEST_ID_MAP = { |
+ 'a@example.com': 1, |
+ 'b@example.com': 2, |
+ 'c@example.com': 3, |
+ 'd@example.com': 4, |
+ } |
+ |
+ |
+def _Issue(project_name, local_id, summary, status): |
+ issue = tracker_pb2.Issue() |
+ issue.project_name = project_name |
+ issue.project_id = 789 |
+ issue.local_id = local_id |
+ issue.issue_id = 100000 + local_id |
+ issue.summary = summary |
+ issue.status = status |
+ return issue |
+ |
+ |
+def _MakeConfig(): |
+ config = tracker_pb2.ProjectIssueConfig() |
+ config.well_known_statuses.append(tracker_pb2.StatusDef( |
+ means_open=True, status='New', deprecated=False)) |
+ config.well_known_statuses.append(tracker_pb2.StatusDef( |
+ status='Old', means_open=False, deprecated=False)) |
+ config.well_known_statuses.append(tracker_pb2.StatusDef( |
+ status='StatusThatWeDontUseAnymore', means_open=False, deprecated=True)) |
+ |
+ return config |
+ |
+ |
+class HelpersTest(unittest.TestCase): |
+ |
+ def setUp(self): |
+ self.services = service_manager.Services( |
+ project=fake.ProjectService(), |
+ config=fake.ConfigService(), |
+ issue=fake.IssueService(), |
+ user=fake.UserService(), |
+ usergroup=fake.UserGroupService()) |
+ |
+ for email, user_id in TEST_ID_MAP.iteritems(): |
+ self.services.user.TestAddUser(email, user_id) |
+ |
+ self.services.project.TestAddProject('testproj', project_id=789) |
+ self.issue1 = fake.MakeTestIssue(789, 1, 'one', 'New', 111L) |
+ self.issue1.project_name = 'testproj' |
+ self.services.issue.TestAddIssue(self.issue1) |
+ self.issue2 = fake.MakeTestIssue(789, 2, 'two', 'New', 111L) |
+ self.issue2.project_name = 'testproj' |
+ self.services.issue.TestAddIssue(self.issue2) |
+ self.issue3 = fake.MakeTestIssue(789, 3, 'three', 'New', 111L) |
+ self.issue3.project_name = 'testproj' |
+ self.services.issue.TestAddIssue(self.issue3) |
+ self.cnxn = 'fake connextion' |
+ self.errors = template_helpers.EZTError() |
+ self.default_colspec_param = 'colspec=%s' % ( |
+ tracker_constants.DEFAULT_COL_SPEC.replace(' ', '%20')) |
+ self.services.usergroup.TestAddGroupSettings(999L, 'group@example.com') |
+ |
+ def testParseIssueRequest_Empty(self): |
+ post_data = fake.PostData() |
+ errors = template_helpers.EZTError() |
+ parsed = tracker_helpers.ParseIssueRequest( |
+ 'fake cnxn', post_data, self.services, errors, 'proj') |
+ self.assertEqual('', parsed.summary) |
+ self.assertEqual('', parsed.comment) |
+ self.assertEqual('', parsed.status) |
+ self.assertEqual('', parsed.users.owner_username) |
+ self.assertEqual(0, parsed.users.owner_id) |
+ self.assertEqual([], parsed.users.cc_usernames) |
+ self.assertEqual([], parsed.users.cc_usernames_remove) |
+ self.assertEqual([], parsed.users.cc_ids) |
+ self.assertEqual([], parsed.users.cc_ids_remove) |
+ self.assertEqual('', parsed.template_name) |
+ self.assertEqual([], parsed.labels) |
+ self.assertEqual([], parsed.labels_remove) |
+ self.assertEqual({}, parsed.fields.vals) |
+ self.assertEqual({}, parsed.fields.vals_remove) |
+ self.assertEqual([], parsed.fields.fields_clear) |
+ self.assertEqual('', parsed.blocked_on.entered_str) |
+ self.assertEqual([], parsed.blocked_on.iids) |
+ |
+ def testParseIssueRequest_Normal(self): |
+ post_data = fake.PostData({ |
+ 'summary': ['some summary'], |
+ 'comment': ['some comment'], |
+ 'status': ['SomeStatus'], |
+ 'template_name': ['some template'], |
+ 'label': ['lab1', '-lab2'], |
+ 'custom_123': ['field1123a', 'field1123b'], |
+ }) |
+ errors = template_helpers.EZTError() |
+ parsed = tracker_helpers.ParseIssueRequest( |
+ 'fake cnxn', post_data, self.services, errors, 'proj') |
+ self.assertEqual('some summary', parsed.summary) |
+ self.assertEqual('some comment', parsed.comment) |
+ self.assertEqual('SomeStatus', parsed.status) |
+ self.assertEqual('', parsed.users.owner_username) |
+ self.assertEqual(0, parsed.users.owner_id) |
+ self.assertEqual([], parsed.users.cc_usernames) |
+ self.assertEqual([], parsed.users.cc_usernames_remove) |
+ self.assertEqual([], parsed.users.cc_ids) |
+ self.assertEqual([], parsed.users.cc_ids_remove) |
+ self.assertEqual('some template', parsed.template_name) |
+ self.assertEqual(['lab1'], parsed.labels) |
+ self.assertEqual(['lab2'], parsed.labels_remove) |
+ self.assertEqual({123: ['field1123a', 'field1123b']}, parsed.fields.vals) |
+ self.assertEqual({}, parsed.fields.vals_remove) |
+ self.assertEqual([], parsed.fields.fields_clear) |
+ |
+ def testParseBlockers_BlockedOnNothing(self): |
+ """Was blocked on nothing, still nothing.""" |
+ post_data = {tracker_helpers.BLOCKED_ON: ''} |
+ parsed_blockers = tracker_helpers._ParseBlockers( |
+ self.cnxn, post_data, self.services, self.errors, 'testproj', |
+ tracker_helpers.BLOCKED_ON) |
+ |
+ self.assertEqual('', parsed_blockers.entered_str) |
+ self.assertEqual([], parsed_blockers.iids) |
+ self.assertIsNone(getattr(self.errors, tracker_helpers.BLOCKED_ON)) |
+ self.assertIsNone(getattr(self.errors, tracker_helpers.BLOCKING)) |
+ |
+ def testParseBlockers_BlockedOnAdded(self): |
+ """Was blocked on nothing; now 1, 2, 3.""" |
+ post_data = {tracker_helpers.BLOCKED_ON: '1, 2, 3'} |
+ parsed_blockers = tracker_helpers._ParseBlockers( |
+ self.cnxn, post_data, self.services, self.errors, 'testproj', |
+ tracker_helpers.BLOCKED_ON) |
+ |
+ self.assertEqual('1, 2, 3', parsed_blockers.entered_str) |
+ self.assertEqual([100001, 100002, 100003], parsed_blockers.iids) |
+ self.assertIsNone(getattr(self.errors, tracker_helpers.BLOCKED_ON)) |
+ self.assertIsNone(getattr(self.errors, tracker_helpers.BLOCKING)) |
+ |
+ def testParseBlockers_BlockedOnDuplicateRef(self): |
+ """Was blocked on nothing; now just 2, but repeated in input.""" |
+ post_data = {tracker_helpers.BLOCKED_ON: '2, 2, 2'} |
+ parsed_blockers = tracker_helpers._ParseBlockers( |
+ self.cnxn, post_data, self.services, self.errors, 'testproj', |
+ tracker_helpers.BLOCKED_ON) |
+ |
+ self.assertEqual('2, 2, 2', parsed_blockers.entered_str) |
+ self.assertEqual([100002], parsed_blockers.iids) |
+ self.assertIsNone(getattr(self.errors, tracker_helpers.BLOCKED_ON)) |
+ self.assertIsNone(getattr(self.errors, tracker_helpers.BLOCKING)) |
+ |
+ def testParseBlockers_Missing(self): |
+ """Parsing an input field that was not in the POST.""" |
+ post_data = {} |
+ parsed_blockers = tracker_helpers._ParseBlockers( |
+ self.cnxn, post_data, self.services, self.errors, 'testproj', |
+ tracker_helpers.BLOCKED_ON) |
+ |
+ self.assertEqual('', parsed_blockers.entered_str) |
+ self.assertEqual([], parsed_blockers.iids) |
+ self.assertIsNone(getattr(self.errors, tracker_helpers.BLOCKED_ON)) |
+ self.assertIsNone(getattr(self.errors, tracker_helpers.BLOCKING)) |
+ |
+ def testParseBlockers_SameIssueNoProject(self): |
+ """Adding same issue as blocker should modify the errors object.""" |
+ post_data = {'id': '2', tracker_helpers.BLOCKING: '2, 3'} |
+ |
+ parsed_blockers = tracker_helpers._ParseBlockers( |
+ self.cnxn, post_data, self.services, self.errors, 'testproj', |
+ tracker_helpers.BLOCKING) |
+ self.assertEqual('2, 3', parsed_blockers.entered_str) |
+ self.assertEqual([], parsed_blockers.iids) |
+ self.assertEqual( |
+ getattr(self.errors, tracker_helpers.BLOCKING), |
+ 'Cannot be blocking the same issue') |
+ self.assertIsNone(getattr(self.errors, tracker_helpers.BLOCKED_ON)) |
+ |
+ def testParseBlockers_SameIssueSameProject(self): |
+ """Adding same issue as blocker should modify the errors object.""" |
+ post_data = {'id': '2', tracker_helpers.BLOCKING: 'testproj:2, 3'} |
+ |
+ parsed_blockers = tracker_helpers._ParseBlockers( |
+ self.cnxn, post_data, self.services, self.errors, 'testproj', |
+ tracker_helpers.BLOCKING) |
+ self.assertEqual('testproj:2, 3', parsed_blockers.entered_str) |
+ self.assertEqual([], parsed_blockers.iids) |
+ self.assertEqual( |
+ getattr(self.errors, tracker_helpers.BLOCKING), |
+ 'Cannot be blocking the same issue') |
+ self.assertIsNone(getattr(self.errors, tracker_helpers.BLOCKED_ON)) |
+ |
+ def testParseBlockers_SameIssueDifferentProject(self): |
+ """Adding different blocker issue should not modify the errors object.""" |
+ post_data = {'id': '2', tracker_helpers.BLOCKING: 'testproj:2'} |
+ |
+ parsed_blockers = tracker_helpers._ParseBlockers( |
+ self.cnxn, post_data, self.services, self.errors, 'testprojB', |
+ tracker_helpers.BLOCKING) |
+ self.assertEqual('testproj:2', parsed_blockers.entered_str) |
+ self.assertEqual([100002], parsed_blockers.iids) |
+ self.assertIsNone(getattr(self.errors, tracker_helpers.BLOCKING)) |
+ self.assertIsNone(getattr(self.errors, tracker_helpers.BLOCKED_ON)) |
+ |
+ def testParseBlockers_Invalid(self): |
+ """Input fields with invalid values should modify the errors object.""" |
+ post_data = {tracker_helpers.BLOCKING: '2, foo', |
+ tracker_helpers.BLOCKED_ON: '3, bar'} |
+ |
+ parsed_blockers = tracker_helpers._ParseBlockers( |
+ self.cnxn, post_data, self.services, self.errors, 'testproj', |
+ tracker_helpers.BLOCKING) |
+ self.assertEqual('2, foo', parsed_blockers.entered_str) |
+ self.assertEqual([100002], parsed_blockers.iids) |
+ self.assertEqual( |
+ getattr(self.errors, tracker_helpers.BLOCKING), 'Invalid issue ID foo') |
+ self.assertIsNone(getattr(self.errors, tracker_helpers.BLOCKED_ON)) |
+ |
+ parsed_blockers = tracker_helpers._ParseBlockers( |
+ self.cnxn, post_data, self.services, self.errors, 'testproj', |
+ tracker_helpers.BLOCKED_ON) |
+ self.assertEqual('3, bar', parsed_blockers.entered_str) |
+ self.assertEqual([100003], parsed_blockers.iids) |
+ self.assertEqual( |
+ getattr(self.errors, tracker_helpers.BLOCKED_ON), |
+ 'Invalid issue ID bar') |
+ |
+ def testParseBlockers_Dangling(self): |
+ """A ref to a sanctioned projected should be allowed.""" |
+ post_data = {'id': '2', tracker_helpers.BLOCKING: 'otherproj:2'} |
+ real_codesite_projects = settings.recognized_codesite_projects |
+ settings.recognized_codesite_projects = ['otherproj'] |
+ parsed_blockers = tracker_helpers._ParseBlockers( |
+ self.cnxn, post_data, self.services, self.errors, 'testproj', |
+ tracker_helpers.BLOCKING) |
+ self.assertEqual('otherproj:2', parsed_blockers.entered_str) |
+ self.assertEqual([('otherproj', 2)], parsed_blockers.dangling_refs) |
+ settings.recognized_codesite_projects = real_codesite_projects |
+ |
+ def testMeansOpenInProject(self): |
+ config = _MakeConfig() |
+ |
+ # ensure open means open |
+ self.assertTrue(tracker_helpers.MeansOpenInProject('New', config)) |
+ self.assertTrue(tracker_helpers.MeansOpenInProject('new', config)) |
+ |
+ # ensure an unrecognized status means open |
+ self.assertTrue(tracker_helpers.MeansOpenInProject( |
+ '_undefined_status_', config)) |
+ |
+ # ensure closed means closed |
+ self.assertFalse(tracker_helpers.MeansOpenInProject('Old', config)) |
+ self.assertFalse(tracker_helpers.MeansOpenInProject('old', config)) |
+ self.assertFalse(tracker_helpers.MeansOpenInProject( |
+ 'StatusThatWeDontUseAnymore', config)) |
+ |
+ def testIsNoisy(self): |
+ self.assertTrue(tracker_helpers.IsNoisy(778, 320)) |
+ self.assertFalse(tracker_helpers.IsNoisy(20, 500)) |
+ self.assertFalse(tracker_helpers.IsNoisy(500, 20)) |
+ self.assertFalse(tracker_helpers.IsNoisy(1, 1)) |
+ |
+ def testClassifyPlusMinusItems(self): |
+ add, remove = tracker_helpers._ClassifyPlusMinusItems([]) |
+ self.assertEquals([], add) |
+ self.assertEquals([], remove) |
+ |
+ add, remove = tracker_helpers._ClassifyPlusMinusItems( |
+ ['', ' ', ' \t', '-']) |
+ self.assertItemsEqual([], add) |
+ self.assertItemsEqual([], remove) |
+ |
+ add, remove = tracker_helpers._ClassifyPlusMinusItems( |
+ ['a', 'b', 'c']) |
+ self.assertItemsEqual(['a', 'b', 'c'], add) |
+ self.assertItemsEqual([], remove) |
+ |
+ add, remove = tracker_helpers._ClassifyPlusMinusItems( |
+ ['a-a-a', 'b-b', 'c-']) |
+ self.assertItemsEqual(['a-a-a', 'b-b', 'c-'], add) |
+ self.assertItemsEqual([], remove) |
+ |
+ add, remove = tracker_helpers._ClassifyPlusMinusItems( |
+ ['-a']) |
+ self.assertItemsEqual([], add) |
+ self.assertItemsEqual(['a'], remove) |
+ |
+ add, remove = tracker_helpers._ClassifyPlusMinusItems( |
+ ['-a', 'b', 'c-c']) |
+ self.assertItemsEqual(['b', 'c-c'], add) |
+ self.assertItemsEqual(['a'], remove) |
+ |
+ add, remove = tracker_helpers._ClassifyPlusMinusItems( |
+ ['-a', '-b-b', '-c-']) |
+ self.assertItemsEqual([], add) |
+ self.assertItemsEqual(['a', 'b-b', 'c-'], remove) |
+ |
+ # We dedup, but we don't cancel out items that are both added and removed. |
+ add, remove = tracker_helpers._ClassifyPlusMinusItems( |
+ ['a', 'a', '-a']) |
+ self.assertItemsEqual(['a'], add) |
+ self.assertItemsEqual(['a'], remove) |
+ |
+ def testParseIssueRequestAttachments(self): |
+ file1 = testing_helpers.Blank( |
+ filename='hello.c', |
+ value='hello world') |
+ |
+ file2 = testing_helpers.Blank( |
+ filename='README', |
+ value='Welcome to our project') |
+ |
+ file3 = testing_helpers.Blank( |
+ filename='c:\\dir\\subdir\\FILENAME.EXT', |
+ value='Abort, Retry, or Fail?') |
+ |
+ # Browsers send this if FILE field was not filled in. |
+ file4 = testing_helpers.Blank( |
+ filename='', |
+ value='') |
+ |
+ attachments = tracker_helpers._ParseIssueRequestAttachments({}) |
+ self.assertEquals([], attachments) |
+ |
+ attachments = tracker_helpers._ParseIssueRequestAttachments(fake.PostData({ |
+ 'file1': [file1], |
+ })) |
+ self.assertEquals( |
+ [('hello.c', 'hello world', 'text/plain')], |
+ attachments) |
+ |
+ attachments = tracker_helpers._ParseIssueRequestAttachments(fake.PostData({ |
+ 'file1': [file1], |
+ 'file2': [file2], |
+ })) |
+ self.assertEquals( |
+ [('hello.c', 'hello world', 'text/plain'), |
+ ('README', 'Welcome to our project', 'text/plain')], |
+ attachments) |
+ |
+ attachments = tracker_helpers._ParseIssueRequestAttachments(fake.PostData({ |
+ 'file3': [file3], |
+ })) |
+ self.assertEquals( |
+ [('FILENAME.EXT', 'Abort, Retry, or Fail?', |
+ 'application/octet-stream')], |
+ attachments) |
+ |
+ attachments = tracker_helpers._ParseIssueRequestAttachments(fake.PostData({ |
+ 'file1': [file4], # Does not appear in result |
+ 'file3': [file3], |
+ 'file4': [file4], # Does not appear in result |
+ })) |
+ self.assertEquals( |
+ [('FILENAME.EXT', 'Abort, Retry, or Fail?', |
+ 'application/octet-stream')], |
+ attachments) |
+ |
+ def testParseIssueRequestUsers(self): |
+ post_data = {} |
+ parsed_users = tracker_helpers._ParseIssueRequestUsers( |
+ 'fake connection', post_data, self.services) |
+ self.assertEquals('', parsed_users.owner_username) |
+ self.assertEquals( |
+ framework_constants.NO_USER_SPECIFIED, parsed_users.owner_id) |
+ self.assertEquals([], parsed_users.cc_usernames) |
+ self.assertEquals([], parsed_users.cc_usernames_remove) |
+ self.assertEquals([], parsed_users.cc_ids) |
+ self.assertEquals([], parsed_users.cc_ids_remove) |
+ |
+ post_data = fake.PostData({ |
+ 'owner': [''], |
+ }) |
+ parsed_users = tracker_helpers._ParseIssueRequestUsers( |
+ 'fake connection', post_data, self.services) |
+ self.assertEquals('', parsed_users.owner_username) |
+ self.assertEquals( |
+ framework_constants.NO_USER_SPECIFIED, parsed_users.owner_id) |
+ self.assertEquals([], parsed_users.cc_usernames) |
+ self.assertEquals([], parsed_users.cc_usernames_remove) |
+ self.assertEquals([], parsed_users.cc_ids) |
+ self.assertEquals([], parsed_users.cc_ids_remove) |
+ |
+ post_data = fake.PostData({ |
+ 'owner': [' \t'], |
+ }) |
+ parsed_users = tracker_helpers._ParseIssueRequestUsers( |
+ 'fake connection', post_data, self.services) |
+ self.assertEquals('', parsed_users.owner_username) |
+ self.assertEquals( |
+ framework_constants.NO_USER_SPECIFIED, parsed_users.owner_id) |
+ self.assertEquals([], parsed_users.cc_usernames) |
+ self.assertEquals([], parsed_users.cc_usernames_remove) |
+ self.assertEquals([], parsed_users.cc_ids) |
+ self.assertEquals([], parsed_users.cc_ids_remove) |
+ |
+ post_data = fake.PostData({ |
+ 'owner': ['b@example.com'], |
+ }) |
+ parsed_users = tracker_helpers._ParseIssueRequestUsers( |
+ 'fake connection', post_data, self.services) |
+ self.assertEquals('b@example.com', parsed_users.owner_username) |
+ self.assertEquals(TEST_ID_MAP['b@example.com'], parsed_users.owner_id) |
+ self.assertEquals([], parsed_users.cc_usernames) |
+ self.assertEquals([], parsed_users.cc_usernames_remove) |
+ self.assertEquals([], parsed_users.cc_ids) |
+ self.assertEquals([], parsed_users.cc_ids_remove) |
+ |
+ post_data = fake.PostData({ |
+ 'owner': ['b@example.com'], |
+ }) |
+ parsed_users = tracker_helpers._ParseIssueRequestUsers( |
+ 'fake connection', post_data, self.services) |
+ self.assertEquals('b@example.com', parsed_users.owner_username) |
+ self.assertEquals(TEST_ID_MAP['b@example.com'], parsed_users.owner_id) |
+ self.assertEquals([], parsed_users.cc_usernames) |
+ self.assertEquals([], parsed_users.cc_usernames_remove) |
+ self.assertEquals([], parsed_users.cc_ids) |
+ self.assertEquals([], parsed_users.cc_ids_remove) |
+ |
+ post_data = fake.PostData({ |
+ 'cc': ['b@example.com'], |
+ }) |
+ parsed_users = tracker_helpers._ParseIssueRequestUsers( |
+ 'fake connection', post_data, self.services) |
+ self.assertEquals('', parsed_users.owner_username) |
+ self.assertEquals( |
+ framework_constants.NO_USER_SPECIFIED, parsed_users.owner_id) |
+ self.assertEquals(['b@example.com'], parsed_users.cc_usernames) |
+ self.assertEquals([], parsed_users.cc_usernames_remove) |
+ self.assertEquals([TEST_ID_MAP['b@example.com']], parsed_users.cc_ids) |
+ self.assertEquals([], parsed_users.cc_ids_remove) |
+ |
+ post_data = fake.PostData({ |
+ 'cc': ['-b@example.com, c@example.com,,' |
+ 'a@example.com,'], |
+ }) |
+ parsed_users = tracker_helpers._ParseIssueRequestUsers( |
+ 'fake connection', post_data, self.services) |
+ self.assertEquals('', parsed_users.owner_username) |
+ self.assertEquals( |
+ framework_constants.NO_USER_SPECIFIED, parsed_users.owner_id) |
+ self.assertItemsEqual(['c@example.com', 'a@example.com'], |
+ parsed_users.cc_usernames) |
+ self.assertEquals(['b@example.com'], parsed_users.cc_usernames_remove) |
+ self.assertItemsEqual([TEST_ID_MAP['c@example.com'], |
+ TEST_ID_MAP['a@example.com']], |
+ parsed_users.cc_ids) |
+ self.assertEquals([TEST_ID_MAP['b@example.com']], |
+ parsed_users.cc_ids_remove) |
+ |
+ post_data = fake.PostData({ |
+ 'owner': ['fuhqwhgads@example.com'], |
+ 'cc': ['c@example.com, fuhqwhgads@example.com'], |
+ }) |
+ parsed_users = tracker_helpers._ParseIssueRequestUsers( |
+ 'fake connection', post_data, self.services) |
+ self.assertEquals('fuhqwhgads@example.com', parsed_users.owner_username) |
+ gen_uid = framework_helpers.MurmurHash3_x86_32(parsed_users.owner_username) |
+ self.assertEquals(gen_uid, parsed_users.owner_id) # autocreated user |
+ self.assertItemsEqual( |
+ ['c@example.com', 'fuhqwhgads@example.com'], parsed_users.cc_usernames) |
+ self.assertEquals([], parsed_users.cc_usernames_remove) |
+ self.assertItemsEqual( |
+ [TEST_ID_MAP['c@example.com'], gen_uid], parsed_users.cc_ids) |
+ self.assertEquals([], parsed_users.cc_ids_remove) |
+ |
+ def testIsValidIssueOwner(self): |
+ project = project_pb2.Project() |
+ project.owner_ids.extend([1L, 2L]) |
+ project.committer_ids.extend([3L]) |
+ project.contributor_ids.extend([4L, 999L]) |
+ |
+ valid, _ = tracker_helpers.IsValidIssueOwner( |
+ 'fake cnxn', project, framework_constants.NO_USER_SPECIFIED, |
+ self.services) |
+ self.assertTrue(valid) |
+ |
+ valid, _ = tracker_helpers.IsValidIssueOwner( |
+ 'fake cnxn', project, 1L, |
+ self.services) |
+ self.assertTrue(valid) |
+ valid, _ = tracker_helpers.IsValidIssueOwner( |
+ 'fake cnxn', project, 2L, |
+ self.services) |
+ self.assertTrue(valid) |
+ valid, _ = tracker_helpers.IsValidIssueOwner( |
+ 'fake cnxn', project, 3L, |
+ self.services) |
+ self.assertTrue(valid) |
+ valid, _ = tracker_helpers.IsValidIssueOwner( |
+ 'fake cnxn', project, 4L, |
+ self.services) |
+ self.assertTrue(valid) |
+ |
+ valid, _ = tracker_helpers.IsValidIssueOwner( |
+ 'fake cnxn', project, 7L, |
+ self.services) |
+ self.assertFalse(valid) |
+ |
+ valid, _ = tracker_helpers.IsValidIssueOwner( |
+ 'fake cnxn', project, 999L, |
+ self.services) |
+ self.assertFalse(valid) |
+ |
+ def testGetAllowedOpenAndClosedRelatedIssues(self): |
+ gaoacri = tracker_helpers.GetAllowedOpenAndClosedRelatedIssues |
+ opened = { |
+ 100001: _Issue('proj', 1, 'summary 1', 'New'), |
+ 100002: _Issue('proj', 2, 'summary 2', 'Accepted'), |
+ } |
+ closed = { |
+ 100003: _Issue('proj', 3, 'summary 3', 'Accepted'), |
+ 100004: _Issue('proj', 4, 'summary 4', 'Invalid'), |
+ } |
+ project = project_pb2.Project() |
+ project.project_id = 789 |
+ project.project_name = 'proj' |
+ project.state = project_pb2.ProjectState.LIVE |
+ mr = testing_helpers.MakeMonorailRequest(project=project) |
+ fake_issue_service = testing_helpers.Blank( |
+ GetOpenAndClosedIssues=lambda _cnxn, iids: ( |
+ [opened[iid] for iid in iids if iid in opened], |
+ [closed[iid] for iid in iids if iid in closed])) |
+ fake_config_service = testing_helpers.Blank( |
+ GetProjectConfigs=lambda _cnxn, pids: ( |
+ {pid: tracker_bizobj.MakeDefaultProjectIssueConfig(pid) |
+ for pid in pids})) |
+ fake_project_service = testing_helpers.Blank( |
+ GetProjects=lambda _, project_ids: {project.project_id: project}) |
+ services = service_manager.Services( |
+ issue=fake_issue_service, config=fake_config_service, |
+ project=fake_project_service) |
+ |
+ issue = tracker_pb2.Issue() |
+ issue.project_id = 789 |
+ # No merged into, no blocking, no blocked on. |
+ open_dict, closed_dict = gaoacri(services, mr, issue) |
+ self.assertEqual({}, open_dict) |
+ self.assertEqual({}, closed_dict) |
+ |
+ # An open "merged into" |
+ issue.merged_into = 100001 |
+ open_dict, closed_dict = gaoacri(services, mr, issue) |
+ self.assertEqual({100001: opened[100001]}, open_dict) |
+ self.assertEqual({}, closed_dict) |
+ |
+ # A closed "merged into" |
+ issue.merged_into = 100003 |
+ open_dict, closed_dict = gaoacri(services, mr, issue) |
+ self.assertEqual({}, open_dict) |
+ self.assertEqual({100003: closed[100003]}, closed_dict) |
+ |
+ # Some blocking and blocked on |
+ issue.blocking_iids.append(100001) |
+ issue.blocked_on_iids.append(100004) |
+ open_dict, closed_dict = gaoacri(services, mr, issue) |
+ self.assertEqual({100001: opened[100001]}, open_dict) |
+ self.assertEqual({100003: closed[100003], |
+ 100004: closed[100004]}, closed_dict) |
+ |
+ def testMergeCCsAndAddComment(self): |
+ target_issue = fake.MakeTestIssue( |
+ 789, 10, 'Target issue', 'New', 111L) |
+ source_issue = fake.MakeTestIssue( |
+ 789, 100, 'Source issue', 'New', 222L) |
+ source_issue.cc_ids.append(111L) |
+ # Issue without owner |
+ source_issue_2 = fake.MakeTestIssue( |
+ 789, 101, 'Source issue 2', 'New', 0L) |
+ |
+ project = self.services.project.TestAddProject( |
+ 'testproj', owner_ids=[222L], project_id=789) |
+ self.services.issue.TestAddIssue(target_issue) |
+ self.services.issue.TestAddIssue(source_issue) |
+ self.services.issue.TestAddIssue(source_issue_2) |
+ |
+ # We copy this list so that it isn't updated by the test framework |
+ initial_issue_comments = ( |
+ self.services.issue.GetCommentsForIssue( |
+ 'fake cnxn', target_issue.issue_id)[:]) |
+ mr = testing_helpers.MakeMonorailRequest(user_info={'user_id': 111L}) |
+ |
+ # Merging source into target should create a comment. |
+ self.assertIsNotNone( |
+ tracker_helpers.MergeCCsAndAddComment( |
+ self.services, mr, source_issue, project, target_issue)) |
+ updated_issue_comments = self.services.issue.GetCommentsForIssue( |
+ 'fake cnxn', target_issue.issue_id) |
+ for comment in initial_issue_comments: |
+ self.assertIn(comment, updated_issue_comments) |
+ self.assertEqual( |
+ len(initial_issue_comments) + 1, len(updated_issue_comments)) |
+ |
+ # Merging source into target should add source's owner to target's CCs. |
+ updated_target_issue = self.services.issue.GetIssueByLocalID( |
+ 'fake cnxn', 789, 10) |
+ self.assertIn(111L, updated_target_issue.cc_ids) |
+ self.assertIn(222L, updated_target_issue.cc_ids) |
+ |
+ # Merging source 2 into target should make a comment, but not update CCs. |
+ self.assertIsNotNone( |
+ tracker_helpers.MergeCCsAndAddComment( |
+ self.services, mr, source_issue_2, project, updated_target_issue)) |
+ updated_target_issue = self.services.issue.GetIssueByLocalID( |
+ 'fake cnxn', 789, 10) |
+ self.assertNotIn(0L, updated_target_issue.cc_ids) |
+ |
+ def testMergeCCsAndAddCommentRestrictedSourceIssue(self): |
+ target_issue = fake.MakeTestIssue( |
+ 789, 10, 'Target issue', 'New', 222L) |
+ target_issue_2 = fake.MakeTestIssue( |
+ 789, 11, 'Target issue 2', 'New', 222L) |
+ source_issue = fake.MakeTestIssue( |
+ 789, 100, 'Source issue', 'New', 111L) |
+ source_issue.cc_ids.append(111L) |
+ source_issue.labels.append('Restrict-View-Commit') |
+ target_issue_2.labels.append('Restrict-View-Commit') |
+ |
+ project = self.services.project.TestAddProject( |
+ 'testproj', owner_ids=[222L], project_id=789) |
+ self.services.issue.TestAddIssue(source_issue) |
+ self.services.issue.TestAddIssue(target_issue) |
+ self.services.issue.TestAddIssue(target_issue_2) |
+ |
+ # We copy this list so that it isn't updated by the test framework |
+ initial_issue_comments = self.services.issue.GetCommentsForIssue( |
+ 'fake cnxn', target_issue.issue_id)[:] |
+ mr = testing_helpers.MakeMonorailRequest(user_info={'user_id': 111L}) |
+ self.assertIsNotNone( |
+ tracker_helpers.MergeCCsAndAddComment( |
+ self.services, mr, source_issue, project, target_issue)) |
+ |
+ # When the source is restricted, we update the target comments... |
+ updated_issue_comments = self.services.issue.GetCommentsForIssue( |
+ 'fake cnxn', target_issue.issue_id) |
+ for comment in initial_issue_comments: |
+ self.assertIn(comment, updated_issue_comments) |
+ self.assertEqual( |
+ len(initial_issue_comments) + 1, len(updated_issue_comments)) |
+ # ...but not the target CCs... |
+ updated_target_issue = self.services.issue.GetIssueByLocalID( |
+ 'fake cnxn', 789, 10) |
+ self.assertNotIn(111L, updated_target_issue.cc_ids) |
+ # ...unless both issues have the same restrictions. |
+ self.assertIsNotNone( |
+ tracker_helpers.MergeCCsAndAddComment( |
+ self.services, mr, source_issue, project, target_issue_2)) |
+ updated_target_issue_2 = self.services.issue.GetIssueByLocalID( |
+ 'fake cnxn', 789, 11) |
+ self.assertIn(111L, updated_target_issue_2.cc_ids) |
+ |
+ def testFormatIssueListURLNoCurrentState(self): |
+ config = tracker_pb2.ProjectIssueConfig() |
+ path = '/p/proj/issues/detail?id=123' |
+ mr = testing_helpers.MakeMonorailRequest( |
+ path=path, headers={'Host': 'code.google.com'}) |
+ mr.ComputeColSpec(config) |
+ |
+ absolute_base_url = 'http://code.google.com' |
+ |
+ url_1 = tracker_helpers.FormatIssueListURL(mr, config) |
+ self.assertEquals( |
+ '%s/p/proj/issues/list?%s' % ( |
+ absolute_base_url, self.default_colspec_param), |
+ url_1) |
+ |
+ url_2 = tracker_helpers.FormatIssueListURL( |
+ mr, config, foo=123) |
+ self.assertEquals( |
+ '%s/p/proj/issues/list?%s&foo=123' % ( |
+ absolute_base_url, self.default_colspec_param), |
+ url_2) |
+ |
+ url_3 = tracker_helpers.FormatIssueListURL( |
+ mr, config, foo=123, bar='abc') |
+ self.assertEquals( |
+ '%s/p/proj/issues/list?bar=abc&%s&foo=123' % ( |
+ absolute_base_url, self.default_colspec_param), |
+ url_3) |
+ |
+ url_4 = tracker_helpers.FormatIssueListURL( |
+ mr, config, baz='escaped+encoded&and100% "safe"') |
+ self.assertEquals( |
+ '%s/p/proj/issues/list?' |
+ 'baz=escaped%%2Bencoded%%26and100%%25%%20%%22safe%%22&%s' % ( |
+ absolute_base_url, self.default_colspec_param), |
+ url_4) |
+ |
+ def testFormatIssueListURLKeepCurrentState(self): |
+ config = tracker_pb2.ProjectIssueConfig() |
+ path = '/p/proj/issues/detail?id=123&sort=aa&colspec=a b c&groupby=d' |
+ mr = testing_helpers.MakeMonorailRequest( |
+ path=path, headers={'Host': 'localhost:8080'}) |
+ mr.ComputeColSpec(config) |
+ |
+ absolute_base_url = 'http://localhost:8080' |
+ |
+ url_1 = tracker_helpers.FormatIssueListURL(mr, config) |
+ self.assertEquals( |
+ '%s/p/proj/issues/list?colspec=a%%20b%%20c' |
+ '&groupby=d&sort=aa' % absolute_base_url, |
+ url_1) |
+ |
+ url_2 = tracker_helpers.FormatIssueListURL( |
+ mr, config, foo=123) |
+ self.assertEquals( |
+ '%s/p/proj/issues/list?' |
+ 'colspec=a%%20b%%20c&foo=123&groupby=d&sort=aa' % absolute_base_url, |
+ url_2) |
+ |
+ url_3 = tracker_helpers.FormatIssueListURL( |
+ mr, config, colspec='X Y Z') |
+ self.assertEquals( |
+ '%s/p/proj/issues/list?colspec=a%%20b%%20c' |
+ '&groupby=d&sort=aa' % absolute_base_url, |
+ url_3) |
+ |
+ def testFormatRelativeIssueURL(self): |
+ self.assertEquals( |
+ '/p/proj/issues/attachment', |
+ tracker_helpers.FormatRelativeIssueURL( |
+ 'proj', urls.ISSUE_ATTACHMENT)) |
+ |
+ self.assertEquals( |
+ '/p/proj/issues/detail?id=123', |
+ tracker_helpers.FormatRelativeIssueURL( |
+ 'proj', urls.ISSUE_DETAIL, id=123)) |
+ |
+ |
+class MakeViewsForUsersInIssuesTest(unittest.TestCase): |
+ |
+ def setUp(self): |
+ self.issue1 = _Issue('proj', 1, 'summary 1', 'New') |
+ self.issue1.owner_id = 1001 |
+ self.issue1.reporter_id = 1002 |
+ |
+ self.issue2 = _Issue('proj', 2, 'summary 2', 'New') |
+ self.issue2.owner_id = 2001 |
+ self.issue2.reporter_id = 2002 |
+ self.issue2.cc_ids.extend([1, 1001, 1002, 1003]) |
+ |
+ self.issue3 = _Issue('proj', 3, 'summary 3', 'New') |
+ self.issue3.owner_id = 1001 |
+ self.issue3.reporter_id = 3002 |
+ |
+ self.user = fake.UserService() |
+ for user_id in [1, 1001, 1002, 1003, 2001, 2002, 3002]: |
+ self.user.TestAddUser( |
+ 'test%d' % user_id, user_id, add_user=True) |
+ |
+ def testMakeViewsForUsersInIssues(self): |
+ issue_list = [self.issue1, self.issue2, self.issue3] |
+ users_by_id = tracker_helpers.MakeViewsForUsersInIssues( |
+ 'fake cnxn', issue_list, self.user) |
+ self.assertItemsEqual([1, 1001, 1002, 1003, 2001, 2002, 3002], |
+ users_by_id.keys()) |
+ for user_id in [1001, 1002, 1003, 2001]: |
+ self.assertEqual(users_by_id[user_id].user_id, user_id) |
+ |
+ def testMakeViewsForUsersInIssuesOmittingSome(self): |
+ issue_list = [self.issue1, self.issue2, self.issue3] |
+ users_by_id = tracker_helpers.MakeViewsForUsersInIssues( |
+ 'fake cnxn', issue_list, self.user, omit_ids=[1001, 1003]) |
+ self.assertItemsEqual([1, 1002, 2001, 2002, 3002], users_by_id.keys()) |
+ for user_id in [1002, 2001, 2002, 3002]: |
+ self.assertEqual(users_by_id[user_id].user_id, user_id) |
+ |
+ def testMakeViewsForUsersInIssuesEmpty(self): |
+ issue_list = [] |
+ users_by_id = tracker_helpers.MakeViewsForUsersInIssues( |
+ 'fake cnxn', issue_list, self.user) |
+ self.assertItemsEqual([], users_by_id.keys()) |
+ |
+ |
+class GetAllIssueProjectsTest(unittest.TestCase): |
+ issue_x_1 = tracker_pb2.Issue() |
+ issue_x_1.project_id = 789 |
+ issue_x_1.local_id = 1 |
+ issue_x_1.reporter_id = 1002 |
+ |
+ issue_x_2 = tracker_pb2.Issue() |
+ issue_x_2.project_id = 789 |
+ issue_x_2.local_id = 2 |
+ issue_x_2.reporter_id = 2002 |
+ |
+ issue_y_1 = tracker_pb2.Issue() |
+ issue_y_1.project_id = 678 |
+ issue_y_1.local_id = 1 |
+ issue_y_1.reporter_id = 2002 |
+ |
+ def setUp(self): |
+ self.project_service = fake.ProjectService() |
+ self.project_service.TestAddProject('proj-x', project_id=789) |
+ self.project_service.TestAddProject('proj-y', project_id=678) |
+ self.cnxn = 'fake connection' |
+ |
+ def testGetAllIssueProjects_Empty(self): |
+ self.assertEqual( |
+ {}, tracker_helpers.GetAllIssueProjects( |
+ self.cnxn, [], self.project_service)) |
+ |
+ def testGetAllIssueProjects_Normal(self): |
+ self.assertEqual( |
+ {789: self.project_service.GetProjectByName(self.cnxn, 'proj-x')}, |
+ tracker_helpers.GetAllIssueProjects( |
+ self.cnxn, [self.issue_x_1, self.issue_x_2], self.project_service)) |
+ self.assertEqual( |
+ {789: self.project_service.GetProjectByName(self.cnxn, 'proj-x'), |
+ 678: self.project_service.GetProjectByName(self.cnxn, 'proj-y')}, |
+ tracker_helpers.GetAllIssueProjects( |
+ self.cnxn, [self.issue_x_1, self.issue_x_2, self.issue_y_1], |
+ self.project_service)) |
+ |
+ |
+class FilterOutNonViewableIssuesTest(unittest.TestCase): |
+ owner_id = 111L |
+ committer_id = 222L |
+ nonmember_1_id = 1002L |
+ nonmember_2_id = 2002L |
+ nonmember_3_id = 3002L |
+ |
+ issue1 = tracker_pb2.Issue() |
+ issue1.project_name = 'proj' |
+ issue1.project_id = 789 |
+ issue1.local_id = 1 |
+ issue1.reporter_id = nonmember_1_id |
+ |
+ issue2 = tracker_pb2.Issue() |
+ issue2.project_name = 'proj' |
+ issue2.project_id = 789 |
+ issue2.local_id = 2 |
+ issue2.reporter_id = nonmember_2_id |
+ issue2.labels.extend(['foo', 'bar']) |
+ |
+ issue3 = tracker_pb2.Issue() |
+ issue3.project_name = 'proj' |
+ issue3.project_id = 789 |
+ issue3.local_id = 3 |
+ issue3.reporter_id = nonmember_3_id |
+ issue3.labels.extend(['restrict-view-commit']) |
+ |
+ issue4 = tracker_pb2.Issue() |
+ issue4.project_name = 'proj' |
+ issue4.project_id = 789 |
+ issue4.local_id = 4 |
+ issue4.reporter_id = nonmember_3_id |
+ issue4.labels.extend(['Foo', 'Restrict-View-Commit']) |
+ |
+ def setUp(self): |
+ self.user = user_pb2.User() |
+ self.project = self.MakeProject(project_pb2.ProjectState.LIVE) |
+ self.config = tracker_bizobj.MakeDefaultProjectIssueConfig( |
+ self.project.project_id) |
+ self.project_dict = {self.project.project_id: self.project} |
+ self.config_dict = {self.config.project_id: self.config} |
+ |
+ def MakeProject(self, state): |
+ p = project_pb2.Project( |
+ project_id=789, project_name='proj', state=state, |
+ owner_ids=[self.owner_id], committer_ids=[self.committer_id]) |
+ return p |
+ |
+ def testFilterOutNonViewableIssues_Member(self): |
+ # perms will be permissions.COMMITTER_ACTIVE_PERMISSIONSET |
+ filtered_issues = tracker_helpers.FilterOutNonViewableIssues( |
+ {self.committer_id}, self.user, self.project_dict, |
+ self.config_dict, |
+ [self.issue1, self.issue2, self.issue3, self.issue4]) |
+ self.assertListEqual([1, 2, 3, 4], |
+ [issue.local_id for issue in filtered_issues]) |
+ |
+ def testFilterOutNonViewableIssues_Owner(self): |
+ # perms will be permissions.OWNER_ACTIVE_PERMISSIONSET |
+ filtered_issues = tracker_helpers.FilterOutNonViewableIssues( |
+ {self.owner_id}, self.user, self.project_dict, self.config_dict, |
+ [self.issue1, self.issue2, self.issue3, self.issue4]) |
+ self.assertListEqual([1, 2, 3, 4], |
+ [issue.local_id for issue in filtered_issues]) |
+ |
+ def testFilterOutNonViewableIssues_Empty(self): |
+ # perms will be permissions.COMMITTER_ACTIVE_PERMISSIONSET |
+ filtered_issues = tracker_helpers.FilterOutNonViewableIssues( |
+ {self.committer_id}, self.user, self.project_dict, |
+ self.config_dict, []) |
+ self.assertListEqual([], filtered_issues) |
+ |
+ def testFilterOutNonViewableIssues_NonMember(self): |
+ # perms will be permissions.READ_ONLY_PERMISSIONSET |
+ filtered_issues = tracker_helpers.FilterOutNonViewableIssues( |
+ {self.nonmember_1_id}, self.user, self.project_dict, |
+ self.config_dict, [self.issue1, self.issue2, self.issue3, self.issue4]) |
+ self.assertListEqual([1, 2], |
+ [issue.local_id for issue in filtered_issues]) |
+ |
+ def testFilterOutNonViewableIssues_Reporter(self): |
+ # perms will be permissions.READ_ONLY_PERMISSIONSET |
+ filtered_issues = tracker_helpers.FilterOutNonViewableIssues( |
+ {self.nonmember_3_id}, self.user, self.project_dict, |
+ self.config_dict, [self.issue1, self.issue2, self.issue3, self.issue4]) |
+ self.assertListEqual([1, 2, 3, 4], |
+ [issue.local_id for issue in filtered_issues]) |
+ |
+ |
+class IssueMergeTest(unittest.TestCase): |
+ |
+ def setUp(self): |
+ self.cnxn = 'fake cnxn' |
+ self.services = service_manager.Services( |
+ config=fake.ConfigService(), |
+ issue=fake.IssueService(), |
+ user=fake.UserService(), |
+ project=fake.ProjectService(), |
+ issue_star=fake.IssueStarService(), |
+ spam=fake.SpamService() |
+ ) |
+ self.project = self.services.project.TestAddProject('proj', project_id=987) |
+ self.config = tracker_bizobj.MakeDefaultProjectIssueConfig( |
+ self.project.project_id) |
+ self.project_dict = {self.project.project_id: self.project} |
+ self.config_dict = {self.config.project_id: self.config} |
+ |
+ def testParseMergeFields_NotSpecified(self): |
+ issue = fake.MakeTestIssue(987, 1, 'summary', 'New', 111L) |
+ errors = template_helpers.EZTError() |
+ post_data = {} |
+ |
+ text, merge_into_issue = tracker_helpers.ParseMergeFields( |
+ self.cnxn, None, 'proj', post_data, 'New', self.config, issue, errors) |
+ self.assertEqual('', text) |
+ self.assertEqual(None, merge_into_issue) |
+ |
+ text, merge_into_issue = tracker_helpers.ParseMergeFields( |
+ self.cnxn, None, 'proj', post_data, 'Duplicate', self.config, issue, |
+ errors) |
+ self.assertEqual('', text) |
+ self.assertTrue(errors.merge_into_id) |
+ self.assertEqual(None, merge_into_issue) |
+ |
+ def testParseMergeFields_WrongStatus(self): |
+ issue = fake.MakeTestIssue(987, 1, 'summary', 'New', 111L) |
+ errors = template_helpers.EZTError() |
+ post_data = {'merge_into': '12'} |
+ |
+ text, merge_into_issue = tracker_helpers.ParseMergeFields( |
+ self.cnxn, None, 'proj', post_data, 'New', self.config, issue, errors) |
+ self.assertEqual('', text) |
+ self.assertEqual(None, merge_into_issue) |
+ |
+ def testParseMergeFields_NoSuchIssue(self): |
+ issue = fake.MakeTestIssue(987, 1, 'summary', 'New', 111L) |
+ issue.merged_into = 12 |
+ errors = template_helpers.EZTError() |
+ post_data = {'merge_into': '12'} |
+ |
+ text, merge_into_issue = tracker_helpers.ParseMergeFields( |
+ self.cnxn, self.services, 'proj', post_data, 'Duplicate', |
+ self.config, issue, errors) |
+ self.assertEqual('12', text) |
+ self.assertEqual(None, merge_into_issue) |
+ |
+ def testParseMergeFields_DontSelfMerge(self): |
+ issue = fake.MakeTestIssue(987, 1, 'summary', 'New', 111L) |
+ errors = template_helpers.EZTError() |
+ post_data = {'merge_into': '1'} |
+ |
+ text, merge_into_issue = tracker_helpers.ParseMergeFields( |
+ self.cnxn, self.services, 'proj', post_data, 'Duplicate', self.config, |
+ issue, errors) |
+ self.assertEqual('1', text) |
+ self.assertEqual(None, merge_into_issue) |
+ self.assertEqual('Cannot merge issue into itself', errors.merge_into_id) |
+ |
+ def testParseMergeFields_NewIssueToMerge(self): |
+ merged_local_id = self.services.issue.CreateIssue( |
+ self.cnxn, self.services, |
+ self.project.project_id, 'unused_summary', 'unused_status', 111L, |
+ [], [], [], [], 111L, 'unused_marked_description') |
+ mergee_local_id = self.services.issue.CreateIssue( |
+ self.cnxn, self.services, |
+ self.project.project_id, 'unused_summary', 'unused_status', 111L, |
+ [], [], [], [], 111L, 'unused_marked_description') |
+ merged_issue = self.services.issue.GetIssueByLocalID( |
+ self.cnxn, self.project.project_id, merged_local_id) |
+ mergee_issue = self.services.issue.GetIssueByLocalID( |
+ self.cnxn, self.project.project_id, mergee_local_id) |
+ |
+ errors = template_helpers.EZTError() |
+ post_data = {'merge_into': str(mergee_issue.local_id)} |
+ |
+ text, merge_into_issue = tracker_helpers.ParseMergeFields( |
+ self.cnxn, self.services, 'proj', post_data, 'Duplicate', self.config, |
+ merged_issue, errors) |
+ self.assertEqual(str(mergee_issue.local_id), text) |
+ self.assertEqual(mergee_issue, merge_into_issue) |
+ |
+ def testIsMergeAllowed(self): |
+ mr = testing_helpers.MakeMonorailRequest() |
+ issue = fake.MakeTestIssue(987, 1, 'summary', 'New', 111L) |
+ issue.project_name = self.project.project_name |
+ |
+ for (perm_set, expected_merge_allowed) in ( |
+ (permissions.READ_ONLY_PERMISSIONSET, False), |
+ (permissions.COMMITTER_INACTIVE_PERMISSIONSET, False), |
+ (permissions.COMMITTER_ACTIVE_PERMISSIONSET, True), |
+ (permissions.OWNER_ACTIVE_PERMISSIONSET, True)): |
+ mr.perms = perm_set |
+ merge_allowed = tracker_helpers.IsMergeAllowed(issue, mr, self.services) |
+ self.assertEquals(expected_merge_allowed, merge_allowed) |
+ |
+ def testMergeIssueStars(self): |
+ mr = testing_helpers.MakeMonorailRequest() |
+ mr.project_name = self.project.project_name |
+ mr.project = self.project |
+ |
+ config = self.services.config.GetProjectConfig( |
+ self.cnxn, self.project.project_id) |
+ self.services.issue_star.SetStar( |
+ self.cnxn, self.services, config, 1, 1, True) |
+ self.services.issue_star.SetStar( |
+ self.cnxn, self.services, config, 1, 2, True) |
+ self.services.issue_star.SetStar( |
+ self.cnxn, self.services, config, 1, 3, True) |
+ self.services.issue_star.SetStar( |
+ self.cnxn, self.services, config, 2, 3, True) |
+ self.services.issue_star.SetStar( |
+ self.cnxn, self.services, config, 2, 4, True) |
+ self.services.issue_star.SetStar( |
+ self.cnxn, self.services, config, 2, 5, True) |
+ |
+ new_starrers = tracker_helpers.GetNewIssueStarrers( |
+ self.cnxn, self.services, 1, 2) |
+ self.assertItemsEqual(new_starrers, [1, 2]) |
+ tracker_helpers.AddIssueStarrers( |
+ self.cnxn, self.services, mr, 2, self.project, new_starrers) |
+ issue_2_starrers = self.services.issue_star.LookupItemStarrers( |
+ self.cnxn, 2) |
+ # XXX(jrobbins): these tests incorrectly mix local IDs with IIDs. |
+ self.assertItemsEqual([1, 2, 3, 4, 5], issue_2_starrers) |
+ |
+ |
+if __name__ == '__main__': |
+ unittest.main() |