| Index: appengine/monorail/search/test/ast2ast_test.py
|
| diff --git a/appengine/monorail/search/test/ast2ast_test.py b/appengine/monorail/search/test/ast2ast_test.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..0a9f9d42f5238fca62d37564b27df992941f5304
|
| --- /dev/null
|
| +++ b/appengine/monorail/search/test/ast2ast_test.py
|
| @@ -0,0 +1,554 @@
|
| +# Copyright 2016 The Chromium Authors. All rights reserved.
|
| +# Use of this source code is govered by a BSD-style
|
| +# license that can be found in the LICENSE file or at
|
| +# https://developers.google.com/open-source/licenses/bsd
|
| +
|
| +"""Tests for the ast2ast module."""
|
| +
|
| +import unittest
|
| +
|
| +from proto import ast_pb2
|
| +from proto import tracker_pb2
|
| +from search import ast2ast
|
| +from search import query2ast
|
| +from services import service_manager
|
| +from testing import fake
|
| +from tracker import tracker_bizobj
|
| +
|
| +
|
| +BUILTIN_ISSUE_FIELDS = query2ast.BUILTIN_ISSUE_FIELDS
|
| +ANY_FIELD = query2ast.BUILTIN_ISSUE_FIELDS['any_field']
|
| +OWNER_FIELD = query2ast.BUILTIN_ISSUE_FIELDS['owner']
|
| +OWNER_ID_FIELD = query2ast.BUILTIN_ISSUE_FIELDS['owner_id']
|
| +
|
| +
|
| +class AST2ASTTest(unittest.TestCase):
|
| +
|
| + def setUp(self):
|
| + self.cnxn = 'fake cnxn'
|
| + self.config = tracker_bizobj.MakeDefaultProjectIssueConfig(789)
|
| + self.config.component_defs.append(
|
| + tracker_bizobj.MakeComponentDef(
|
| + 101, 789, 'UI', 'doc', False, [], [], 0, 0))
|
| + self.config.component_defs.append(
|
| + tracker_bizobj.MakeComponentDef(
|
| + 102, 789, 'UI>Search', 'doc', False, [], [], 0, 0))
|
| + self.config.component_defs.append(
|
| + tracker_bizobj.MakeComponentDef(
|
| + 201, 789, 'DB', 'doc', False, [], [], 0, 0))
|
| + self.config.component_defs.append(
|
| + tracker_bizobj.MakeComponentDef(
|
| + 301, 789, 'Search', 'doc', False, [], [], 0, 0))
|
| + self.services = service_manager.Services(
|
| + user=fake.UserService(),
|
| + project=fake.ProjectService(),
|
| + issue=fake.IssueService(),
|
| + config=fake.ConfigService())
|
| + self.services.user.TestAddUser('a@example.com', 111L)
|
| +
|
| + def testPreprocessAST_EmptyAST(self):
|
| + ast = ast_pb2.QueryAST() # No conjunctions in it.
|
| + new_ast = ast2ast.PreprocessAST(
|
| + self.cnxn, ast, [789], self.services, self.config)
|
| + self.assertEqual(ast, new_ast)
|
| +
|
| + def testPreprocessAST_Normal(self):
|
| + open_field = BUILTIN_ISSUE_FIELDS['open']
|
| + label_field = BUILTIN_ISSUE_FIELDS['label']
|
| + label_id_field = BUILTIN_ISSUE_FIELDS['label_id']
|
| + status_id_field = BUILTIN_ISSUE_FIELDS['status_id']
|
| + conds = [
|
| + ast_pb2.MakeCond(ast_pb2.QueryOp.EQ, [open_field], [], [1]),
|
| + ast_pb2.MakeCond(ast_pb2.QueryOp.EQ, [label_field], ['Hot'], [])]
|
| +
|
| + ast = ast_pb2.QueryAST()
|
| + ast.conjunctions.append(ast_pb2.Conjunction(conds=conds))
|
| + new_ast = ast2ast.PreprocessAST(
|
| + self.cnxn, ast, [789], self.services, self.config)
|
| + self.assertEqual(2, len(new_ast.conjunctions[0].conds))
|
| + new_cond_1, new_cond_2 = new_ast.conjunctions[0].conds
|
| + self.assertEqual(ast_pb2.QueryOp.NE, new_cond_1.op)
|
| + self.assertEqual([status_id_field], new_cond_1.field_defs)
|
| + self.assertEqual([7, 8, 9], new_cond_1.int_values)
|
| + self.assertEqual([], new_cond_1.str_values)
|
| + self.assertEqual(ast_pb2.QueryOp.EQ, new_cond_2.op)
|
| + self.assertEqual([label_id_field], new_cond_2.field_defs)
|
| + self.assertEqual([0], new_cond_2.int_values)
|
| + self.assertEqual([], new_cond_2.str_values)
|
| +
|
| + def testPreprocessIsOpenCond(self):
|
| + open_field = BUILTIN_ISSUE_FIELDS['open']
|
| + status_id_field = BUILTIN_ISSUE_FIELDS['status_id']
|
| +
|
| + # is:open -> status_id!=closed_status_ids
|
| + cond = ast_pb2.MakeCond(
|
| + ast_pb2.QueryOp.EQ, [open_field], [], [1])
|
| + new_cond = ast2ast._PreprocessIsOpenCond(
|
| + self.cnxn, cond, [789], self.services, self.config)
|
| + self.assertEqual(ast_pb2.QueryOp.NE, new_cond.op)
|
| + self.assertEqual([status_id_field], new_cond.field_defs)
|
| + self.assertEqual([7, 8, 9], new_cond.int_values)
|
| + self.assertEqual([], new_cond.str_values)
|
| +
|
| + # -is:open -> status_id=closed_status_ids
|
| + cond = ast_pb2.MakeCond(
|
| + ast_pb2.QueryOp.EQ, [open_field], [], [0])
|
| + new_cond = ast2ast._PreprocessIsOpenCond(
|
| + self.cnxn, cond, [789], self.services, self.config)
|
| + self.assertEqual(ast_pb2.QueryOp.EQ, new_cond.op)
|
| + self.assertEqual([status_id_field], new_cond.field_defs)
|
| + self.assertEqual([7, 8, 9], new_cond.int_values)
|
| + self.assertEqual([], new_cond.str_values)
|
| +
|
| + def testPreprocessBlockedOnCond_WithSingleProjectID(self):
|
| + blockedon_field = BUILTIN_ISSUE_FIELDS['blockedon']
|
| + blockedon_id_field = BUILTIN_ISSUE_FIELDS['blockedon_id']
|
| + self.services.project.TestAddProject('Project1', project_id=1)
|
| + issue1 = fake.MakeTestIssue(
|
| + project_id=1, local_id=1, summary='sum', status='new', owner_id=2,
|
| + issue_id=101)
|
| + issue2 = fake.MakeTestIssue(
|
| + project_id=1, local_id=2, summary='sum', status='new', owner_id=2,
|
| + issue_id=102)
|
| + self.services.issue.TestAddIssue(issue1)
|
| + self.services.issue.TestAddIssue(issue2)
|
| +
|
| + for local_ids, expected in (
|
| + (['1'], [101]), # One existing issue.
|
| + (['Project1:1'], [101]), # One existing issue with project prefix.
|
| + (['1', '2'], [101, 102]), # Two existing issues.
|
| + (['3'], [])): # Non-existant issue.
|
| + cond = ast_pb2.MakeCond(
|
| + ast_pb2.QueryOp.TEXT_HAS, [blockedon_field], local_ids, [])
|
| + new_cond = ast2ast._PreprocessBlockedOnCond(
|
| + self.cnxn, cond, [1], self.services, None)
|
| + self.assertEqual(ast_pb2.QueryOp.EQ, new_cond.op)
|
| + self.assertEqual([blockedon_id_field], new_cond.field_defs)
|
| + self.assertEqual(expected, new_cond.int_values)
|
| + self.assertEqual([], new_cond.str_values)
|
| +
|
| + def testPreprocessBlockedOnCond_WithMultipleProjectIDs(self):
|
| + blockedon_field = BUILTIN_ISSUE_FIELDS['blockedon']
|
| + blockedon_id_field = BUILTIN_ISSUE_FIELDS['blockedon_id']
|
| + self.services.project.TestAddProject('Project1', project_id=1)
|
| + self.services.project.TestAddProject('Project2', project_id=2)
|
| + issue1 = fake.MakeTestIssue(
|
| + project_id=1, local_id=1, summary='sum', status='new', owner_id=2,
|
| + issue_id=101)
|
| + issue2 = fake.MakeTestIssue(
|
| + project_id=2, local_id=2, summary='sum', status='new', owner_id=2,
|
| + issue_id=102)
|
| + self.services.issue.TestAddIssue(issue1)
|
| + self.services.issue.TestAddIssue(issue2)
|
| +
|
| + for local_ids, expected in (
|
| + (['Project1:1'], [101]),
|
| + (['Project1:1', 'Project2:2'], [101, 102])):
|
| + cond = ast_pb2.MakeCond(
|
| + ast_pb2.QueryOp.TEXT_HAS, [blockedon_field], local_ids, [])
|
| + new_cond = ast2ast._PreprocessBlockedOnCond(
|
| + self.cnxn, cond, [1, 2], self.services, None)
|
| + self.assertEqual(ast_pb2.QueryOp.EQ, new_cond.op)
|
| + self.assertEqual([blockedon_id_field], new_cond.field_defs)
|
| + self.assertEqual(expected, new_cond.int_values)
|
| + self.assertEqual([], new_cond.str_values)
|
| +
|
| + def testPreprocessBlockedOnCond_WithMultipleProjectIDs_NoPrefix(self):
|
| + blockedon_field = BUILTIN_ISSUE_FIELDS['blockedon']
|
| + self.services.project.TestAddProject('Project1', project_id=1)
|
| + self.services.project.TestAddProject('Project2', project_id=2)
|
| + issue1 = fake.MakeTestIssue(
|
| + project_id=1, local_id=1, summary='sum', status='new', owner_id=2,
|
| + issue_id=101)
|
| + issue2 = fake.MakeTestIssue(
|
| + project_id=2, local_id=2, summary='sum', status='new', owner_id=2,
|
| + issue_id=102)
|
| + self.services.issue.TestAddIssue(issue1)
|
| + self.services.issue.TestAddIssue(issue2)
|
| +
|
| + for local_ids in (['1'], ['1', '2'], ['3']):
|
| + cond = ast_pb2.MakeCond(
|
| + ast_pb2.QueryOp.TEXT_HAS, [blockedon_field], local_ids, [])
|
| + try:
|
| + ast2ast._PreprocessBlockedOnCond(
|
| + self.cnxn, cond, [1, 2], self.services, None)
|
| + self.fail('Expected an Exception.')
|
| + except ValueError, e:
|
| + self.assertEquals(
|
| + 'Searching for issues accross multiple/all projects without '
|
| + 'project prefixes is ambiguous and is currently not supported.',
|
| + e.message)
|
| +
|
| + def testPreprocessIsBlockedCond(self):
|
| + blocked_field = BUILTIN_ISSUE_FIELDS['blockedon_id']
|
| + for int_val, expected_op in ((1, ast_pb2.QueryOp.IS_DEFINED),
|
| + (0, ast_pb2.QueryOp.IS_NOT_DEFINED)):
|
| + cond = ast_pb2.MakeCond(
|
| + ast_pb2.QueryOp.EQ, [blocked_field], [], [int_val])
|
| + new_cond = ast2ast._PreprocessIsBlockedCond(
|
| + self.cnxn, cond, [100], self.services, None)
|
| + self.assertEqual(expected_op, new_cond.op)
|
| + self.assertEqual([blocked_field], new_cond.field_defs)
|
| + self.assertEqual([], new_cond.int_values)
|
| + self.assertEqual([], new_cond.str_values)
|
| +
|
| + def testPreprocessHasBlockedOnCond(self):
|
| + blocked_field = BUILTIN_ISSUE_FIELDS['blockedon_id']
|
| + for op in (ast_pb2.QueryOp.IS_DEFINED, ast_pb2.QueryOp.IS_NOT_DEFINED):
|
| + cond = ast_pb2.MakeCond(op, [blocked_field], [], [])
|
| + new_cond = ast2ast._PreprocessBlockedOnCond(
|
| + self.cnxn, cond, [100], self.services, None)
|
| + self.assertEqual(op, op)
|
| + self.assertEqual([blocked_field], new_cond.field_defs)
|
| + self.assertEqual([], new_cond.int_values)
|
| + self.assertEqual([], new_cond.str_values)
|
| +
|
| + def testPreprocessHasBlockingCond(self):
|
| + blocking_field = BUILTIN_ISSUE_FIELDS['blocking_id']
|
| + for op in (ast_pb2.QueryOp.IS_DEFINED, ast_pb2.QueryOp.IS_NOT_DEFINED):
|
| + cond = ast_pb2.MakeCond(op, [blocking_field], [], [])
|
| + new_cond = ast2ast._PreprocessBlockingCond(
|
| + self.cnxn, cond, [100], self.services, None)
|
| + self.assertEqual(op, op)
|
| + self.assertEqual([blocking_field], new_cond.field_defs)
|
| + self.assertEqual([], new_cond.int_values)
|
| + self.assertEqual([], new_cond.str_values)
|
| +
|
| + def testPreprocessBlockingCond_WithSingleProjectID(self):
|
| + blocking_field = BUILTIN_ISSUE_FIELDS['blocking']
|
| + blocking_id_field = BUILTIN_ISSUE_FIELDS['blocking_id']
|
| + self.services.project.TestAddProject('Project1', project_id=1)
|
| + issue1 = fake.MakeTestIssue(
|
| + project_id=1, local_id=1, summary='sum', status='new', owner_id=2,
|
| + issue_id=101)
|
| + issue2 = fake.MakeTestIssue(
|
| + project_id=1, local_id=2, summary='sum', status='new', owner_id=2,
|
| + issue_id=102)
|
| + self.services.issue.TestAddIssue(issue1)
|
| + self.services.issue.TestAddIssue(issue2)
|
| +
|
| + for local_ids, expected in (
|
| + (['1'], [101]), # One existing issue.
|
| + (['Project1:1'], [101]), # One existing issue with project prefix.
|
| + (['1', '2'], [101, 102]), # Two existing issues.
|
| + (['3'], [])): # Non-existant issue.
|
| + cond = ast_pb2.MakeCond(
|
| + ast_pb2.QueryOp.TEXT_HAS, [blocking_field], local_ids, [])
|
| + new_cond = ast2ast._PreprocessBlockingCond(
|
| + self.cnxn, cond, [1], self.services, None)
|
| + self.assertEqual(ast_pb2.QueryOp.EQ, new_cond.op)
|
| + self.assertEqual([blocking_id_field], new_cond.field_defs)
|
| + self.assertEqual(expected, new_cond.int_values)
|
| + self.assertEqual([], new_cond.str_values)
|
| +
|
| + def testPreprocessBlockingCond_WithMultipleProjectIDs(self):
|
| + blocking_field = BUILTIN_ISSUE_FIELDS['blocking']
|
| + blocking_id_field = BUILTIN_ISSUE_FIELDS['blocking_id']
|
| + self.services.project.TestAddProject('Project1', project_id=1)
|
| + self.services.project.TestAddProject('Project2', project_id=2)
|
| + issue1 = fake.MakeTestIssue(
|
| + project_id=1, local_id=1, summary='sum', status='new', owner_id=2,
|
| + issue_id=101)
|
| + issue2 = fake.MakeTestIssue(
|
| + project_id=2, local_id=2, summary='sum', status='new', owner_id=2,
|
| + issue_id=102)
|
| + self.services.issue.TestAddIssue(issue1)
|
| + self.services.issue.TestAddIssue(issue2)
|
| +
|
| + for local_ids, expected in (
|
| + (['Project1:1'], [101]),
|
| + (['Project1:1', 'Project2:2'], [101, 102])):
|
| + cond = ast_pb2.MakeCond(
|
| + ast_pb2.QueryOp.TEXT_HAS, [blocking_field], local_ids, [])
|
| + new_cond = ast2ast._PreprocessBlockingCond(
|
| + self.cnxn, cond, [1, 2], self.services, None)
|
| + self.assertEqual(ast_pb2.QueryOp.EQ, new_cond.op)
|
| + self.assertEqual([blocking_id_field], new_cond.field_defs)
|
| + self.assertEqual(expected, new_cond.int_values)
|
| + self.assertEqual([], new_cond.str_values)
|
| +
|
| + def testPreprocessBlockingCond_WithMultipleProjectIDs_NoPrefix(self):
|
| + blocking_field = BUILTIN_ISSUE_FIELDS['blocking']
|
| + self.services.project.TestAddProject('Project1', project_id=1)
|
| + self.services.project.TestAddProject('Project2', project_id=2)
|
| + issue1 = fake.MakeTestIssue(
|
| + project_id=1, local_id=1, summary='sum', status='new', owner_id=2,
|
| + issue_id=101)
|
| + issue2 = fake.MakeTestIssue(
|
| + project_id=2, local_id=2, summary='sum', status='new', owner_id=2,
|
| + issue_id=102)
|
| + self.services.issue.TestAddIssue(issue1)
|
| + self.services.issue.TestAddIssue(issue2)
|
| +
|
| + for local_ids in (['1'], ['1', '2'], ['3']):
|
| + cond = ast_pb2.MakeCond(
|
| + ast_pb2.QueryOp.TEXT_HAS, [blocking_field], local_ids, [])
|
| + try:
|
| + ast2ast._PreprocessBlockingCond(
|
| + self.cnxn, cond, [1, 2], self.services, None)
|
| + self.fail('Expected an Exception.')
|
| + except ValueError, e:
|
| + self.assertEquals(
|
| + 'Searching for issues accross multiple/all projects without '
|
| + 'project prefixes is ambiguous and is currently not supported.',
|
| + e.message)
|
| +
|
| + def testPreprocessStatusCond(self):
|
| + status_field = BUILTIN_ISSUE_FIELDS['status']
|
| + status_id_field = BUILTIN_ISSUE_FIELDS['status_id']
|
| +
|
| + cond = ast_pb2.MakeCond(
|
| + ast_pb2.QueryOp.IS_DEFINED, [status_field], [], [])
|
| + new_cond = ast2ast._PreprocessStatusCond(
|
| + self.cnxn, cond, [789], self.services, self.config)
|
| + self.assertEqual(ast_pb2.QueryOp.IS_DEFINED, new_cond.op)
|
| + self.assertEqual([status_id_field], new_cond.field_defs)
|
| + self.assertEqual([], new_cond.int_values)
|
| + self.assertEqual([], new_cond.str_values)
|
| +
|
| + cond = ast_pb2.MakeCond(
|
| + ast_pb2.QueryOp.EQ, [status_field], ['New', 'Assigned'], [])
|
| + new_cond = ast2ast._PreprocessStatusCond(
|
| + self.cnxn, cond, [789], self.services, self.config)
|
| + self.assertEqual(ast_pb2.QueryOp.EQ, new_cond.op)
|
| + self.assertEqual([status_id_field], new_cond.field_defs)
|
| + self.assertEqual([0, 1], new_cond.int_values)
|
| + self.assertEqual([], new_cond.str_values)
|
| +
|
| + cond = ast_pb2.MakeCond(
|
| + ast_pb2.QueryOp.TEXT_HAS, [status_field], [], [])
|
| + new_cond = ast2ast._PreprocessStatusCond(
|
| + self.cnxn, cond, [789], self.services, self.config)
|
| + self.assertEqual([], new_cond.int_values)
|
| +
|
| + def testPrefixRegex(self):
|
| + cond = ast_pb2.MakeCond(
|
| + ast_pb2.QueryOp.IS_DEFINED, [BUILTIN_ISSUE_FIELDS['label']],
|
| + ['Priority', 'Severity'], [])
|
| + regex = ast2ast._MakePrefixRegex(cond)
|
| + self.assertRegexpMatches('Priority-1', regex)
|
| + self.assertRegexpMatches('Severity-3', regex)
|
| + self.assertNotRegexpMatches('My-Priority', regex)
|
| +
|
| + def testKeyValueRegex(self):
|
| + cond = ast_pb2.MakeCond(
|
| + ast_pb2.QueryOp.KEY_HAS, [BUILTIN_ISSUE_FIELDS['label']],
|
| + ['Type-Feature', 'Type-Security'], [])
|
| + regex = ast2ast._MakeKeyValueRegex(cond)
|
| + self.assertRegexpMatches('Type-Feature', regex)
|
| + self.assertRegexpMatches('Type-Bug-Security', regex)
|
| + self.assertNotRegexpMatches('Type-Bug', regex)
|
| + self.assertNotRegexpMatches('Security-Feature', regex)
|
| +
|
| + def testKeyValueRegex_multipleKeys(self):
|
| + cond = ast_pb2.MakeCond(
|
| + ast_pb2.QueryOp.KEY_HAS, [BUILTIN_ISSUE_FIELDS['label']],
|
| + ['Type-Bug', 'Security-Bug'], [])
|
| + with self.assertRaises(ValueError):
|
| + ast2ast._MakeKeyValueRegex(cond)
|
| +
|
| + def testWordBoundryRegex(self):
|
| + cond = ast_pb2.MakeCond(
|
| + ast_pb2.QueryOp.TEXT_HAS, [BUILTIN_ISSUE_FIELDS['label']],
|
| + ['Type-Bug'], [])
|
| + regex = ast2ast._MakeKeyValueRegex(cond)
|
| + self.assertRegexpMatches('Type-Bug-Security', regex)
|
| + self.assertNotRegexpMatches('Type-BugSecurity', regex)
|
| +
|
| + def testPreprocessLabelCond(self):
|
| + label_field = BUILTIN_ISSUE_FIELDS['label']
|
| + label_id_field = BUILTIN_ISSUE_FIELDS['label_id']
|
| +
|
| + cond = ast_pb2.MakeCond(
|
| + ast_pb2.QueryOp.IS_DEFINED, [label_field], ['Priority'], [])
|
| + new_cond = ast2ast._PreprocessLabelCond(
|
| + self.cnxn, cond, [789], self.services, self.config)
|
| + self.assertEqual(ast_pb2.QueryOp.IS_DEFINED, new_cond.op)
|
| + self.assertEqual([label_id_field], new_cond.field_defs)
|
| + self.assertEqual([1, 2, 3], new_cond.int_values)
|
| + self.assertEqual([], new_cond.str_values)
|
| +
|
| + cond = ast_pb2.MakeCond(
|
| + ast_pb2.QueryOp.EQ, [label_field],
|
| + ['Priority-Low', 'Priority-High'], [])
|
| + new_cond = ast2ast._PreprocessLabelCond(
|
| + self.cnxn, cond, [789], self.services, self.config)
|
| + self.assertEqual(ast_pb2.QueryOp.EQ, new_cond.op)
|
| + self.assertEqual([label_id_field], new_cond.field_defs)
|
| + self.assertEqual([0, 1], new_cond.int_values)
|
| + self.assertEqual([], new_cond.str_values)
|
| +
|
| + cond = ast_pb2.MakeCond(
|
| + ast_pb2.QueryOp.KEY_HAS, [label_field],
|
| + ['Priority-Low', 'Priority-High'], [])
|
| + new_cond = ast2ast._PreprocessLabelCond(
|
| + self.cnxn, cond, [789], self.services, self.config)
|
| + self.assertEqual(ast_pb2.QueryOp.EQ, new_cond.op)
|
| + self.assertEqual([label_id_field], new_cond.field_defs)
|
| + self.assertEqual([1, 2, 3], new_cond.int_values)
|
| + self.assertEqual([], new_cond.str_values)
|
| +
|
| + def testPreprocessComponentCond_QuickOR(self):
|
| + component_field = BUILTIN_ISSUE_FIELDS['component']
|
| + component_id_field = BUILTIN_ISSUE_FIELDS['component_id']
|
| +
|
| + cond = ast_pb2.MakeCond(
|
| + ast_pb2.QueryOp.IS_DEFINED, [component_field], ['UI', 'DB'], [])
|
| + new_cond = ast2ast._PreprocessComponentCond(
|
| + self.cnxn, cond, [789], self.services, self.config)
|
| + self.assertEqual(ast_pb2.QueryOp.IS_DEFINED, new_cond.op)
|
| + self.assertEqual([component_id_field], new_cond.field_defs)
|
| + self.assertEqual([101, 102, 201], new_cond.int_values)
|
| + self.assertEqual([], new_cond.str_values)
|
| +
|
| + cond = ast_pb2.MakeCond(
|
| + ast_pb2.QueryOp.TEXT_HAS, [component_field], ['UI', 'DB'], [])
|
| + new_cond = ast2ast._PreprocessComponentCond(
|
| + self.cnxn, cond, [789], self.services, self.config)
|
| + self.assertEqual(ast_pb2.QueryOp.EQ, new_cond.op)
|
| + self.assertEqual([component_id_field], new_cond.field_defs)
|
| + self.assertEqual([101, 102, 201], new_cond.int_values)
|
| + self.assertEqual([], new_cond.str_values)
|
| +
|
| + cond = ast_pb2.MakeCond(
|
| + ast_pb2.QueryOp.TEXT_HAS, [component_field], [], [])
|
| + new_cond = ast2ast._PreprocessComponentCond(
|
| + self.cnxn, cond, [789], self.services, self.config)
|
| + self.assertEqual([], new_cond.int_values)
|
| +
|
| + cond = ast_pb2.MakeCond(
|
| + ast_pb2.QueryOp.TEXT_HAS, [component_field], ['unknown@example.com'],
|
| + [])
|
| + new_cond = ast2ast._PreprocessComponentCond(
|
| + self.cnxn, cond, [789], self.services, self.config)
|
| + self.assertEqual([], new_cond.int_values)
|
| +
|
| + def testPreprocessComponentCond_RootedAndNonRooted(self):
|
| + component_field = BUILTIN_ISSUE_FIELDS['component']
|
| + component_id_field = BUILTIN_ISSUE_FIELDS['component_id']
|
| +
|
| + cond = ast_pb2.MakeCond(
|
| + ast_pb2.QueryOp.TEXT_HAS, [component_field], ['UI'], [])
|
| + new_cond = ast2ast._PreprocessComponentCond(
|
| + self.cnxn, cond, [789], self.services, self.config)
|
| + self.assertEqual(ast_pb2.QueryOp.EQ, new_cond.op)
|
| + self.assertEqual([component_id_field], new_cond.field_defs)
|
| + self.assertEqual([101, 102], new_cond.int_values)
|
| + self.assertEqual([], new_cond.str_values)
|
| +
|
| + cond = ast_pb2.MakeCond(
|
| + ast_pb2.QueryOp.EQ, [component_field], ['UI'], [])
|
| + new_cond = ast2ast._PreprocessComponentCond(
|
| + self.cnxn, cond, [789], self.services, self.config)
|
| + self.assertEqual(ast_pb2.QueryOp.EQ, new_cond.op)
|
| + self.assertEqual([component_id_field], new_cond.field_defs)
|
| + self.assertEqual([101], new_cond.int_values)
|
| + self.assertEqual([], new_cond.str_values)
|
| +
|
| + def testPreprocessExactUsers_IsDefined(self):
|
| + cond = ast_pb2.MakeCond(
|
| + ast_pb2.QueryOp.IS_DEFINED, [OWNER_FIELD], ['a@example.com'], [])
|
| + new_cond = ast2ast._PreprocessExactUsers(
|
| + self.cnxn, cond, self.services.user, [OWNER_ID_FIELD])
|
| + self.assertEqual(ast_pb2.QueryOp.IS_DEFINED, new_cond.op)
|
| + self.assertEqual([OWNER_ID_FIELD], new_cond.field_defs)
|
| + self.assertEqual([], new_cond.int_values)
|
| + self.assertEqual([], new_cond.str_values)
|
| +
|
| + def testPreprocessExactUsers_UserFound(self):
|
| + cond = ast_pb2.MakeCond(
|
| + ast_pb2.QueryOp.TEXT_HAS, [OWNER_FIELD], ['a@example.com'], [])
|
| + new_cond = ast2ast._PreprocessExactUsers(
|
| + self.cnxn, cond, self.services.user, [OWNER_ID_FIELD])
|
| + self.assertEqual(ast_pb2.QueryOp.EQ, new_cond.op)
|
| + self.assertEqual([OWNER_ID_FIELD], new_cond.field_defs)
|
| + self.assertEqual([111L], new_cond.int_values)
|
| + self.assertEqual([], new_cond.str_values)
|
| +
|
| + def testPreprocessExactUsers_UserSpecifiedByID(self):
|
| + cond = ast_pb2.MakeCond(
|
| + ast_pb2.QueryOp.TEXT_HAS, [OWNER_FIELD], ['123'], [])
|
| + new_cond = ast2ast._PreprocessExactUsers(
|
| + self.cnxn, cond, self.services.user, [OWNER_ID_FIELD])
|
| + self.assertEqual(ast_pb2.QueryOp.EQ, new_cond.op)
|
| + self.assertEqual([OWNER_ID_FIELD], new_cond.field_defs)
|
| + self.assertEqual([123L], new_cond.int_values)
|
| + self.assertEqual([], new_cond.str_values)
|
| +
|
| + def testPreprocessExactUsers_NonEquality(self):
|
| + cond = ast_pb2.MakeCond(
|
| + ast_pb2.QueryOp.GE, [OWNER_ID_FIELD], ['111'], [])
|
| + new_cond = ast2ast._PreprocessExactUsers(
|
| + self.cnxn, cond, self.services.user, [OWNER_ID_FIELD])
|
| + self.assertEqual(cond, new_cond)
|
| +
|
| + def testPreprocessExactUsers_UserNotFound(self):
|
| + cond = ast_pb2.MakeCond(
|
| + ast_pb2.QueryOp.TEXT_HAS, [OWNER_FIELD], ['unknown@example.com'], [])
|
| + new_cond = ast2ast._PreprocessExactUsers(
|
| + self.cnxn, cond, self.services.user, [OWNER_ID_FIELD])
|
| + self.assertEqual(cond, new_cond)
|
| +
|
| + def testPreprocessCustomCond_User(self):
|
| + fd = tracker_pb2.FieldDef(
|
| + field_id=1, field_name='TPM',
|
| + field_type=tracker_pb2.FieldTypes.USER_TYPE)
|
| + cond = ast_pb2.MakeCond(
|
| + ast_pb2.QueryOp.TEXT_HAS, [fd], ['a@example.com'], [])
|
| + new_cond = ast2ast._PreprocessCustomCond(self.cnxn, cond, self.services)
|
| + self.assertEqual(ast_pb2.QueryOp.EQ, new_cond.op)
|
| + self.assertEqual(cond.field_defs, new_cond.field_defs)
|
| + self.assertEqual([111L], new_cond.int_values)
|
| + self.assertEqual([], new_cond.str_values)
|
| +
|
| + cond = ast_pb2.MakeCond(
|
| + ast_pb2.QueryOp.TEXT_HAS, [fd], ['111'], [])
|
| + new_cond = ast2ast._PreprocessCustomCond(self.cnxn, cond, self.services)
|
| + self.assertEqual(ast_pb2.QueryOp.EQ, new_cond.op)
|
| + self.assertEqual(cond.field_defs, new_cond.field_defs)
|
| + self.assertEqual([111L], new_cond.int_values)
|
| + self.assertEqual([], new_cond.str_values)
|
| +
|
| + cond = ast_pb2.MakeCond(
|
| + ast_pb2.QueryOp.TEXT_HAS, [fd], ['unknown@example.com'], [])
|
| + new_cond = ast2ast._PreprocessCustomCond(self.cnxn, cond, self.services)
|
| + self.assertEqual(cond, new_cond)
|
| +
|
| + def testPreprocessCustomCond_NonUser(self):
|
| + fd = tracker_pb2.FieldDef(
|
| + field_id=1, field_name='TPM',
|
| + field_type=tracker_pb2.FieldTypes.INT_TYPE)
|
| + cond = ast_pb2.MakeCond(
|
| + ast_pb2.QueryOp.TEXT_HAS, [fd], ['foo'], [123])
|
| + new_cond = ast2ast._PreprocessCustomCond(self.cnxn, cond, self.services)
|
| + self.assertEqual(cond, new_cond)
|
| +
|
| + fd.field_type = tracker_pb2.FieldTypes.STR_TYPE
|
| + new_cond = ast2ast._PreprocessCustomCond(self.cnxn, cond, self.services)
|
| + self.assertEqual(cond, new_cond)
|
| +
|
| + def testPreprocessCond_NoChange(self):
|
| + cond = ast_pb2.MakeCond(ast_pb2.QueryOp.TEXT_HAS, [ANY_FIELD], ['foo'], [])
|
| + self.assertEqual(
|
| + cond, ast2ast._PreprocessCond(self.cnxn, cond, [], None, None))
|
| +
|
| + def testTextOpToIntOp(self):
|
| + self.assertEqual(ast_pb2.QueryOp.EQ,
|
| + ast2ast._TextOpToIntOp(ast_pb2.QueryOp.TEXT_HAS))
|
| + self.assertEqual(ast_pb2.QueryOp.EQ,
|
| + ast2ast._TextOpToIntOp(ast_pb2.QueryOp.KEY_HAS))
|
| + self.assertEqual(ast_pb2.QueryOp.NE,
|
| + ast2ast._TextOpToIntOp(ast_pb2.QueryOp.NOT_TEXT_HAS))
|
| +
|
| + for enum_name, _enum_id in ast_pb2.QueryOp.to_dict().iteritems():
|
| + no_change_op = ast_pb2.QueryOp(enum_name)
|
| + if no_change_op not in (
|
| + ast_pb2.QueryOp.TEXT_HAS,
|
| + ast_pb2.QueryOp.NOT_TEXT_HAS,
|
| + ast_pb2.QueryOp.KEY_HAS):
|
| + self.assertEqual(no_change_op,
|
| + ast2ast._TextOpToIntOp(no_change_op))
|
| +
|
| +
|
| +if __name__ == '__main__':
|
| + unittest.main()
|
|
|