Index: appengine/monorail/search/backendsearchpipeline.py
diff --git a/appengine/monorail/search/backendsearchpipeline.py b/appengine/monorail/search/backendsearchpipeline.py
new file mode 100644
index 0000000000000000000000000000000000000000..c3bb3bf20bf1a6fa5dfb321d9b5c5d3f658fb70e
--- /dev/null
+++ b/appengine/monorail/search/backendsearchpipeline.py
@@ -0,0 +1,309 @@
+# Copyright 2016 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style
+# license that can be found in the LICENSE file or at
+# https://developers.google.com/open-source/licenses/bsd
+
+"""Backend issue search and sorting.
+
+Each of several "besearch" backend jobs manages one shard of the overall set
+of issues in the system. The backend search pipeline retrieves the issues
+that match the user query, puts them into memcache, and returns them to
+the frontend search pipeline.
+"""
+
+import logging
+import re
+import time
+
+from google.appengine.api import memcache
+
+import settings
+from features import savedqueries_helpers
+from framework import framework_constants
+from framework import framework_helpers
+from framework import sorting
+from framework import sql
+from proto import ast_pb2
+from proto import tracker_pb2
+from search import ast2ast
+from search import ast2select
+from search import ast2sort
+from search import query2ast
+from search import searchpipeline
+from services import tracker_fulltext
+from services import fulltext_helpers
+from tracker import tracker_bizobj
+
+
+# Used in constructing the at-risk query.
+AT_RISK_LABEL_RE = re.compile(r'^(restrict-view-.+)$', re.IGNORECASE)
+
+# Limit on the number of list items to show in debug log statements.
+MAX_LOG = 200
+
+
+class BackendSearchPipeline(object):
+  """Manage the process of issue search, including Promises and caching.
+
+  Even though the code is divided into several methods, the public
+  methods should be called in sequence, so execution largely follows
+  the order of the source lines here.
+  """
+
+  def __init__(
+      self, mr, services, prof, default_results_per_page,
+      query_project_names, logged_in_user_id, me_user_id):
+
+    self.mr = mr
+    self.profiler = prof
+    self.services = services
+    self.default_results_per_page = default_results_per_page
+
+    self.query_project_list = services.project.GetProjectsByName(
+        mr.cnxn, query_project_names).values()
+    self.query_project_ids = [
+        p.project_id for p in self.query_project_list]
+
+    self.me_user_id = me_user_id
+    self.mr.auth.user_id = logged_in_user_id
+    if self.mr.auth.user_id:
+      self.mr.auth.effective_ids = services.usergroup.LookupMemberships(
+          mr.cnxn, self.mr.auth.user_id)
+      self.mr.auth.effective_ids.add(self.mr.auth.user_id)
+
+    # The following fields are filled in as the pipeline progresses.
+    # The value None means that we still need to compute that value.
+    self.result_iids = None  # Sorted issue IDs that match the query
+    self.search_limit_reached = False  # True if search results limit is hit.
+
+    # Projects that contain the result issues.
+    self.issue_projects = {p.project_id: p for p in self.query_project_list}
+
+    self._MakePromises()
+
+  def _MakePromises(self):
+    config_dict = self.services.config.GetProjectConfigs(
+        self.mr.cnxn, self.query_project_ids)
+    self.harmonized_config = tracker_bizobj.HarmonizeConfigs(
+        config_dict.values())
+
+    self.canned_query = savedqueries_helpers.SavedQueryIDToCond(
+        self.mr.cnxn, self.services.features, self.mr.can)
+    self.canned_query = searchpipeline.ReplaceKeywordsWithUserID(
+        self.me_user_id, self.canned_query)
+    self.user_query = searchpipeline.ReplaceKeywordsWithUserID(
+        self.me_user_id, self.mr.query)
+    logging.debug('Searching query: %s %s', self.canned_query, self.user_query)
+
+    slice_term = ('Issue.shard = %s', [self.mr.shard_id])
+
+    sd = sorting.ComputeSortDirectives(self.mr, self.harmonized_config)
+
+    self.result_iids_promise = framework_helpers.Promise(
+        _GetQueryResultIIDs, self.mr.cnxn,
+        self.services, self.canned_query, self.user_query,
+        self.query_project_ids, self.harmonized_config, sd,
+        slice_term, self.mr.shard_id, self.mr.invalidation_timestep)
+
+  def SearchForIIDs(self):
+    """Wait for the search Promises and store their results."""
+    with self.profiler.Phase('WaitOnPromises'):
+      self.result_iids, self.search_limit_reached = (
+          self.result_iids_promise.WaitAndGetValue())
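+
+# Sketch of the intended call sequence (IDs and names here are made up;
+# the servlet glue that builds mr, services, and prof is omitted):
+#   pipeline = BackendSearchPipeline(
+#       mr, services, prof, 100, ['chromium'], 111, 111)
+#   pipeline.SearchForIIDs()
+#   # pipeline.result_iids now holds this shard's sorted matching issue IDs.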
+
+
+def SearchProjectCan(
+    cnxn, services, project_ids, query_ast, shard_id, harmonized_config,
+    left_joins=None, where=None, sort_directives=None, query_desc=''):
+  """Return a list of issue global IDs in the projects that satisfy the query.
+
+  Args:
+    cnxn: Regular database connection to the master DB.
+    services: interface to issue storage backends.
+    project_ids: list of int IDs of the projects to search.
+    query_ast: A QueryAST PB with conjunctions and conditions.
+    shard_id: limit search to the specified shard ID int.
+    harmonized_config: harmonized config for all projects being searched.
+    left_joins: SQL LEFT JOIN clauses that are needed in addition to
+        anything generated from the query_ast.
+    where: SQL WHERE clauses that are needed in addition to
+        anything generated from the query_ast.
+    sort_directives: list of strings specifying the columns to sort on.
+    query_desc: descriptive string for debugging.
+
+  Returns:
+    (issue_ids, capped) where issue_ids is a list of issue IDs that satisfy
+    the query, and capped is True if the number of results was capped due
+    to an implementation limit.
+  """
+  logging.info('searching projects %r for AST %r', project_ids, query_ast)
+  start_time = time.time()
+  left_joins = left_joins or []
+  where = where or []
+  if project_ids:
+    cond_str = 'Issue.project_id IN (%s)' % sql.PlaceHolders(project_ids)
+    where.append((cond_str, project_ids))
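+    # Note: sql.PlaceHolders() is understood here to expand to one
+    # comma-separated '%s' per value (e.g. three project IDs yield
+    # '%s,%s,%s'), so each pair follows the (sql_fragment, args) convention.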
+
+  query_ast = ast2ast.PreprocessAST(
+      cnxn, query_ast, project_ids, services, harmonized_config)
+  logging.info('simplified AST is %r', query_ast)
+  try:
+    query_left_joins, query_where = ast2select.BuildSQLQuery(query_ast)
+    left_joins.extend(query_left_joins)
+    where.extend(query_where)
+  except ast2select.NoPossibleResults as e:
+    # TODO(jrobbins): inform the user that their query was impossible.
+    logging.info('Impossible query %s.\n %r\n\n', e.message, query_ast)
+    return [], False
+  logging.info('translated to left_joins %r', left_joins)
+  logging.info('translated to where %r', where)
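+  # Illustrative shape of the statement eventually assembled below by
+  # services.issue.RunIssueQuery() (details are assumptions, not verbatim):
+  #   SELECT Issue.id FROM Issue
+  #   LEFT JOIN ... ON ...
+  #   WHERE Issue.shard = %s AND Issue.project_id IN (...) AND ...
+  #   ORDER BY ...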
+
+  fts_capped = False
+  if query_ast.conjunctions:
+    # TODO(jrobbins): Handle "OR" in queries. For now, we just process the
+    # first conjunction.
+    assert len(query_ast.conjunctions) == 1
+    conj = query_ast.conjunctions[0]
+    full_text_iids, fts_capped = tracker_fulltext.SearchIssueFullText(
+        project_ids, conj, shard_id)
+    if full_text_iids is not None:
+      if not full_text_iids:
+        return [], False  # No match on free-text terms, so don't bother DB.
+      cond_str = 'Issue.id IN (%s)' % sql.PlaceHolders(full_text_iids)
+      where.append((cond_str, full_text_iids))
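+    # Note (assumption about SearchIssueFullText): None above means the
+    # conjunction had no free-text terms at all, as opposed to an empty
+    # list, which means the free-text terms matched nothing.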
+
+  label_def_rows = []
+  status_def_rows = []
+  if sort_directives:
+    if project_ids:
+      for pid in project_ids:
+        label_def_rows.extend(services.config.GetLabelDefRows(cnxn, pid))
+        status_def_rows.extend(services.config.GetStatusDefRows(cnxn, pid))
+    else:
+      label_def_rows = services.config.GetLabelDefRowsAnyProject(cnxn)
+      status_def_rows = services.config.GetStatusDefRowsAnyProject(cnxn)
+
+  harmonized_labels = tracker_bizobj.HarmonizeLabelOrStatusRows(
+      label_def_rows)
+  harmonized_statuses = tracker_bizobj.HarmonizeLabelOrStatusRows(
+      status_def_rows)
+  harmonized_fields = harmonized_config.field_defs
+  sort_left_joins, order_by = ast2sort.BuildSortClauses(
+      sort_directives, harmonized_labels, harmonized_statuses,
+      harmonized_fields)
+  logging.info('translated to sort left_joins %r', sort_left_joins)
+  logging.info('translated to order_by %r', order_by)
+
+  issue_ids, db_capped = services.issue.RunIssueQuery(
+      cnxn, left_joins + sort_left_joins, where, order_by, shard_id=shard_id)
+  logging.warn('executed "%s" query %r for %d issues in %dms',
+               query_desc, query_ast, len(issue_ids),
+               int((time.time() - start_time) * 1000))
+  capped = fts_capped or db_capped
+  return issue_ids, capped
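+
+# Example call (a sketch with made-up IDs, assuming query_ast came from
+# query2ast.ParseUserQuery and cnxn/services come from the servlet):
+#   iids, capped = SearchProjectCan(
+#       cnxn, services, [789], query_ast, 2, harmonized_config,
+#       sort_directives=['-opened'], query_desc='example')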
+
+def _FilterSpam(query_ast):
+  uses_spam = False
+  # TODO(jrobbins): Handle "OR" in queries. For now, we just modify the
+  # first conjunction.
+  conjunction = query_ast.conjunctions[0]
+  for condition in conjunction.conds:
+    for field in condition.field_defs:
+      if field.field_name == 'spam':
+        uses_spam = True
+
+  if not uses_spam:
+    query_ast.conjunctions[0].conds.append(
+        ast_pb2.MakeCond(
+            ast_pb2.QueryOp.EQ,
+            [tracker_pb2.FieldDef(
+                field_name='spam',
+                field_type=tracker_pb2.FieldTypes.BOOL_TYPE)
+            ],
+            [], [0]))
+
+  return query_ast
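+
+# Descriptive note: the appended condition amounts to spam == False (an EQ
+# cond whose int_values list is [0]), so spam issues are filtered out of a
+# search unless the user's query already mentioned the spam field.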
+
+def _GetQueryResultIIDs(
+    cnxn, services, canned_query, user_query,
+    query_project_ids, harmonized_config, sd, slice_term,
+    shard_id, invalidation_timestep):
+  """Do a search and return a list of matching issue IDs.
+
+  Args:
+    cnxn: connection to the database.
+    services: interface to issue storage backends.
+    canned_query: string part of the query from the drop-down menu.
+    user_query: string part of the query that the user typed in.
+    query_project_ids: list of project IDs to search.
+    harmonized_config: combined configs for all the queried projects.
+    sd: list of sort directives.
+    slice_term: additional query term to narrow results to a logical shard
+        within a physical shard.
+    shard_id: int number of the database shard to search.
+    invalidation_timestep: int timestep used to keep memcached items fresh.
+
+  Returns:
+    Tuple consisting of:
+      A list of issue_ids that match the user's query. An empty list, [],
+      is returned if no issues match the query.
+      Boolean that is set to True if the search results limit of this shard
+      is hit.
+  """
+  query_ast = _FilterSpam(query2ast.ParseUserQuery(
+      user_query, canned_query, query2ast.BUILTIN_ISSUE_FIELDS,
+      harmonized_config))
+
+  logging.info('query_project_ids is %r', query_project_ids)
+
+  is_fulltext_query = bool(
+      query_ast.conjunctions and
+      fulltext_helpers.BuildFTSQuery(
+          query_ast.conjunctions[0], tracker_fulltext.ISSUE_FULLTEXT_FIELDS))
+  expiration = framework_constants.MEMCACHE_EXPIRATION
+  if is_fulltext_query:
+    expiration = framework_constants.FULLTEXT_MEMCACHE_EXPIRATION
+
+  result_iids, search_limit_reached = SearchProjectCan(
+      cnxn, services, query_project_ids, query_ast, shard_id,
+      harmonized_config, sort_directives=sd, where=[slice_term],
+      query_desc='getting query issue IDs')
+  logging.info('Found %d result_iids', len(result_iids))
+
+  projects_str = ','.join(str(pid) for pid in sorted(query_project_ids))
+  projects_str = projects_str or 'all'
+  memcache_key = ';'.join([
+      projects_str, canned_query, user_query, ' '.join(sd), str(shard_id)])
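+  # Example resulting key (values are illustrative only):
+  #   '1002,1003;is:open;owner:me;opened;2'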
+  memcache.set(memcache_key, (result_iids, invalidation_timestep),
+               time=expiration)
+  logging.info('set memcache key %r', memcache_key)
+
+  search_limit_memcache_key = ';'.join([
+      projects_str, canned_query, user_query, ' '.join(sd),
+      'search_limit_reached', str(shard_id)])
+  memcache.set(search_limit_memcache_key,
+               (search_limit_reached, invalidation_timestep),
+               time=expiration)
+  logging.info('set search limit memcache key %r',
+               search_limit_memcache_key)
+
+  timestamps_for_projects = memcache.get_multi(
+      keys=(['%d;%d' % (pid, shard_id) for pid in query_project_ids] +
+            ['all;%d' % shard_id]))
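+  # These timestamps let the frontend detect stale cached results: each
+  # project/shard pair (and the 'all' catch-all) is stamped with an
+  # invalidation timestep, which can be compared against the timestep
+  # stored alongside the cached result lists above.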
+
+  if query_project_ids:
+    for pid in query_project_ids:
+      key = '%d;%d' % (pid, shard_id)
+      if key not in timestamps_for_projects:
+        memcache.set(
+            key, invalidation_timestep,
+            time=framework_constants.MEMCACHE_EXPIRATION)
+  else:
+    key = 'all;%d' % shard_id
+    if key not in timestamps_for_projects:
+      memcache.set(
+          key, invalidation_timestep,
+          time=framework_constants.MEMCACHE_EXPIRATION)
+
+  return result_iids, search_limit_reached