Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(287)

Side by Side Diff: appengine/monorail/search/backendsearchpipeline.py

Issue 1868553004: Open Source Monorail (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Rebase Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2016 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style
3 # license that can be found in the LICENSE file or at
4 # https://developers.google.com/open-source/licenses/bsd
5
6 """Backend issue search and sorting.
7
8 Each of several "besearch" backend jobs manages one shard of the overall set
9 of issues in the system. The backend search pipeline retrieves the issues
10 that match the user query, puts them into memcache, and returns them to
11 the frontend search pipeline.
12 """
13
14 import logging
15 import re
16 import time
17
18 from google.appengine.api import memcache
19
20 import settings
21 from features import savedqueries_helpers
22 from framework import framework_constants
23 from framework import framework_helpers
24 from framework import sorting
25 from framework import sql
26 from proto import ast_pb2
27 from proto import tracker_pb2
28 from search import ast2ast
29 from search import ast2select
30 from search import ast2sort
31 from search import query2ast
32 from search import searchpipeline
33 from services import tracker_fulltext
34 from services import fulltext_helpers
35 from tracker import tracker_bizobj
36
37
38 # Used in constructing the at-risk query.
39 AT_RISK_LABEL_RE = re.compile(r'^(restrict-view-.+)$', re.IGNORECASE)
40
41 # Limit on the number of list items to show in debug log statements
42 MAX_LOG = 200
43
44
class BackendSearchPipeline(object):
  """Manage the process of issue search, including Promises and caching.

  Even though the code is divided into several methods, the public
  methods should be called in sequence, so the execution of the code
  is pretty much in the order of the source code lines here.
  """

  def __init__(
      self, mr, services, prof, default_results_per_page,
      query_project_names, logged_in_user_id, me_user_id):
    # mr: commonly used info parsed from the request.
    # services: connections to the backend storage services.
    # prof: profiler used to time the pipeline phases.
    self.mr = mr
    self.profiler = prof
    self.services = services
    self.default_results_per_page = default_results_per_page

    # Resolve the queried project names into Project PBs and their int IDs.
    self.query_project_list = services.project.GetProjectsByName(
        mr.cnxn, query_project_names).values()
    self.query_project_ids = [
        p.project_id for p in self.query_project_list]

    # This backend job looks up the requesting user's effective IDs
    # (the user ID plus group memberships) itself, from the IDs passed in.
    self.me_user_id = me_user_id
    self.mr.auth.user_id = logged_in_user_id
    if self.mr.auth.user_id:
      self.mr.auth.effective_ids = services.usergroup.LookupMemberships(
          mr.cnxn, self.mr.auth.user_id)
      self.mr.auth.effective_ids.add(self.mr.auth.user_id)

    # The following fields are filled in as the pipeline progresses.
    # The value None means that we still need to compute that value.
    self.result_iids = None  # Sorted issue IDs that match the query
    self.search_limit_reached = False  # True if search results limit is hit.

    # Projects that contain the result issues.
    self.issue_projects = {p.project_id: p for p in self.query_project_list}

    self._MakePromises()

  def _MakePromises(self):
    """Build the query terms and kick off the async search Promise."""
    # Combine the per-project configs into one harmonized config so query
    # parsing and sorting work uniformly across all queried projects.
    config_dict = self.services.config.GetProjectConfigs(
        self.mr.cnxn, self.query_project_ids)
    self.harmonized_config = tracker_bizobj.HarmonizeConfigs(
        config_dict.values())

    # Convert the selected canned query ID (the "can" parameter) into a
    # query condition string.
    self.canned_query = savedqueries_helpers.SavedQueryIDToCond(
        self.mr.cnxn, self.services.features, self.mr.can)

    # Replace user-reference keywords in both query parts with the
    # requesting user's ID (see searchpipeline.ReplaceKeywordsWithUserID).
    self.canned_query = searchpipeline.ReplaceKeywordsWithUserID(
        self.me_user_id, self.canned_query)
    self.user_query = searchpipeline.ReplaceKeywordsWithUserID(
        self.me_user_id, self.mr.query)
    logging.debug('Searching query: %s %s', self.canned_query, self.user_query)

    # Restrict this backend job to its own logical shard of the issues.
    slice_term = ('Issue.shard = %s', [self.mr.shard_id])

    sd = sorting.ComputeSortDirectives(self.mr, self.harmonized_config)

    # The actual search work happens asynchronously via this Promise;
    # SearchForIIDs() below waits on it.
    self.result_iids_promise = framework_helpers.Promise(
        _GetQueryResultIIDs, self.mr.cnxn,
        self.services, self.canned_query, self.user_query,
        self.query_project_ids, self.harmonized_config, sd,
        slice_term, self.mr.shard_id, self.mr.invalidation_timestep)

  def SearchForIIDs(self):
    """Wait for the search Promises and store their results."""
    with self.profiler.Phase('WaitOnPromises'):
      self.result_iids, self.search_limit_reached = (
          self.result_iids_promise.WaitAndGetValue())
115
def SearchProjectCan(
    cnxn, services, project_ids, query_ast, shard_id, harmonized_config,
    left_joins=None, where=None, sort_directives=None, query_desc=''):
  """Return a list of issue global IDs in the projects that satisfy the query.

  Args:
    cnxn: Regular database connection to the master DB.
    services: interface to issue storage backends.
    project_ids: list of int IDs of the project to search
    query_ast: A QueryAST PB with conjunctions and conditions.
    shard_id: limit search to the specified shard ID int.
    harmonized_config: harmonized config for all projects being searched.
    left_joins: SQL LEFT JOIN clauses that are needed in addition to
        anything generated from the query_ast.
    where: SQL WHERE clauses that are needed in addition to
        anything generated from the query_ast.
    sort_directives: list of strings specifying the columns to sort on.
    query_desc: descriptive string for debugging.

  Returns:
    (issue_ids, capped) where issue_ids is a list of issue IDs that
    satisfy the query, and capped is True if the number of results were
    capped due to an implementation limit.
  """
  logging.info('searching projects %r for AST %r', project_ids, query_ast)
  start_time = time.time()
  # Copy the caller-supplied clause lists so that the extend/append calls
  # below never mutate the caller's lists in place.
  left_joins = list(left_joins or [])
  where = list(where or [])
  if project_ids:
    cond_str = 'Issue.project_id IN (%s)' % sql.PlaceHolders(project_ids)
    where.append((cond_str, project_ids))

  # Simplify the AST (e.g., resolving names to IDs) before generating SQL.
  query_ast = ast2ast.PreprocessAST(
      cnxn, query_ast, project_ids, services, harmonized_config)
  logging.info('simplified AST is %r', query_ast)
  try:
    query_left_joins, query_where = ast2select.BuildSQLQuery(query_ast)
    left_joins.extend(query_left_joins)
    where.extend(query_where)
  except ast2select.NoPossibleResults as e:
    # TODO(jrobbins): inform the user that their query was impossible.
    # Format the exception itself rather than the py2-only .message attr.
    logging.info('Impossible query %s.\n %r\n\n', e, query_ast)
    return [], False
  logging.info('translated to left_joins %r', left_joins)
  logging.info('translated to where %r', where)

  # If the query has free-text terms, use the full-text search service to
  # pre-filter to matching issue IDs, then restrict the SQL query to them.
  fts_capped = False
  if query_ast.conjunctions:
    # TODO(jrobbins): Handle "OR" in queries. For now, we just process the
    # first conjunction.
    assert len(query_ast.conjunctions) == 1
    conj = query_ast.conjunctions[0]
    full_text_iids, fts_capped = tracker_fulltext.SearchIssueFullText(
        project_ids, conj, shard_id)
    if full_text_iids is not None:
      if not full_text_iids:
        return [], False  # No match on free-text terms, so don't bother DB.
      cond_str = 'Issue.id IN (%s)' % sql.PlaceHolders(full_text_iids)
      where.append((cond_str, full_text_iids))

  # Gather label and status definitions so that sort directives that refer
  # to them can be translated into SQL ORDER BY clauses.
  label_def_rows = []
  status_def_rows = []
  if sort_directives:
    if project_ids:
      for pid in project_ids:
        label_def_rows.extend(services.config.GetLabelDefRows(cnxn, pid))
        status_def_rows.extend(services.config.GetStatusDefRows(cnxn, pid))
    else:
      label_def_rows = services.config.GetLabelDefRowsAnyProject(cnxn)
      status_def_rows = services.config.GetStatusDefRowsAnyProject(cnxn)

  harmonized_labels = tracker_bizobj.HarmonizeLabelOrStatusRows(
      label_def_rows)
  harmonized_statuses = tracker_bizobj.HarmonizeLabelOrStatusRows(
      status_def_rows)
  harmonized_fields = harmonized_config.field_defs
  sort_left_joins, order_by = ast2sort.BuildSortClauses(
      sort_directives, harmonized_labels, harmonized_statuses,
      harmonized_fields)
  logging.info('translated to sort left_joins %r', sort_left_joins)
  logging.info('translated to order_by %r', order_by)

  issue_ids, db_capped = services.issue.RunIssueQuery(
      cnxn, left_joins + sort_left_joins, where, order_by, shard_id=shard_id)
  # logging.warning: logging.warn is a deprecated alias.
  logging.warning('executed "%s" query %r for %d issues in %dms',
                  query_desc, query_ast, len(issue_ids),
                  int((time.time() - start_time) * 1000))
  capped = fts_capped or db_capped
  return issue_ids, capped
def _FilterSpam(query_ast):
  """Ensure the query excludes spam unless it explicitly mentions spam.

  If no condition in the query refers to the 'spam' field, append a
  condition requiring spam == False (0) to the first conjunction.

  Args:
    query_ast: A QueryAST PB with conjunctions and conditions.

  Returns:
    The same QueryAST PB, possibly with a spam condition appended.
  """
  # TODO(jrobbins): Handle "OR" in queries. For now, we just modify the
  # first conjunction.
  first_conj = query_ast.conjunctions[0]
  mentions_spam = any(
      fd.field_name == 'spam'
      for cond in first_conj.conds
      for fd in cond.field_defs)

  if not mentions_spam:
    spam_field = tracker_pb2.FieldDef(
        field_name='spam',
        field_type=tracker_pb2.FieldTypes.BOOL_TYPE)
    first_conj.conds.append(
        ast_pb2.MakeCond(ast_pb2.QueryOp.EQ, [spam_field], [], [0]))

  return query_ast
def _GetQueryResultIIDs(
    cnxn, services, canned_query, user_query,
    query_project_ids, harmonized_config, sd, slice_term,
    shard_id, invalidation_timestep):
  """Do a search and return a list of matching issue IDs.

  Args:
    cnxn: connection to the database.
    services: interface to issue storage backends.
    canned_query: string part of the query from the drop-down menu.
    user_query: string part of the query that the user typed in.
    query_project_ids: list of project IDs to search.
    harmonized_config: combined configs for all the queried projects.
    sd: list of sort directives.
    slice_term: additional query term to narrow results to a logical shard
        within a physical shard.
    shard_id: int number of the database shard to search.
    invalidation_timestep: int timestep to use keep memcached items fresh.

  Returns:
    Tuple consisting of:
      A list of issue IDs that match the user's query. An empty list, [],
      is returned if no issues match the query.
      Boolean that is set to True if the search results limit of this shard is
      hit.
  """
  query_ast = _FilterSpam(query2ast.ParseUserQuery(
      user_query, canned_query, query2ast.BUILTIN_ISSUE_FIELDS,
      harmonized_config))

  logging.info('query_project_ids is %r', query_project_ids)

  # Full-text results can go stale sooner than structured-query results,
  # so cache them with a shorter expiration.
  is_fulltext_query = bool(
      query_ast.conjunctions and
      fulltext_helpers.BuildFTSQuery(
          query_ast.conjunctions[0], tracker_fulltext.ISSUE_FULLTEXT_FIELDS))
  expiration = framework_constants.MEMCACHE_EXPIRATION
  if is_fulltext_query:
    expiration = framework_constants.FULLTEXT_MEMCACHE_EXPIRATION

  result_iids, search_limit_reached = SearchProjectCan(
      cnxn, services, query_project_ids, query_ast, shard_id,
      harmonized_config, sort_directives=sd, where=[slice_term],
      query_desc='getting query issue IDs')
  logging.info('Found %d result_iids', len(result_iids))

  # Cache the result IIDs and the limit flag under keys that the frontend
  # search pipeline can reconstruct from the same query parameters.
  projects_str = ','.join(str(pid) for pid in sorted(query_project_ids))
  projects_str = projects_str or 'all'
  memcache_key = ';'.join([
      projects_str, canned_query, user_query, ' '.join(sd), str(shard_id)])
  memcache.set(memcache_key, (result_iids, invalidation_timestep),
               time=expiration)
  logging.info('set memcache key %r', memcache_key)

  search_limit_memcache_key = ';'.join([
      projects_str, canned_query, user_query, ' '.join(sd),
      'search_limit_reached', str(shard_id)])
  memcache.set(search_limit_memcache_key,
               (search_limit_reached, invalidation_timestep),
               time=expiration)
  logging.info('set search limit memcache key %r',
               search_limit_memcache_key)

  # Record an invalidation timestamp per queried project (or 'all') if one
  # is not already cached.  Bug fix: the get_multi key must be 'all;%d' to
  # match the 'all;%d' key checked and set below; the former 'all:%d'
  # (colon) never matched, so the 'all' timestamp was re-set on every
  # search, which could mask pending invalidations.
  timestamps_for_projects = memcache.get_multi(
      keys=(['%d;%d' % (pid, shard_id) for pid in query_project_ids] +
            ['all;%d' % shard_id]))

  if query_project_ids:
    for pid in query_project_ids:
      key = '%d;%d' % (pid, shard_id)
      if key not in timestamps_for_projects:
        memcache.set(
            key, invalidation_timestep,
            time=framework_constants.MEMCACHE_EXPIRATION)
  else:
    key = 'all;%d' % shard_id
    if key not in timestamps_for_projects:
      memcache.set(
          key, invalidation_timestep,
          time=framework_constants.MEMCACHE_EXPIRATION)

  return result_iids, search_limit_reached
OLDNEW
« no previous file with comments | « appengine/monorail/search/backendsearch.py ('k') | appengine/monorail/search/frontendsearchpipeline.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698