Chromium Code Reviews

Side by Side Diff: appengine/monorail/search/frontendsearchpipeline.py

Issue 1868553004: Open Source Monorail (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Rebase Created 4 years, 8 months ago
1 # Copyright 2016 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style
3 # license that can be found in the LICENSE file or at
4 # https://developers.google.com/open-source/licenses/bsd
5
6 """The FrontendSearchPipeline class manages issue search and sorting.
7
8 The frontend pipeline checks memcache for cached results in each shard. It
9 then calls backend jobs to search any shards that had a cache miss. On cache hit,
10 the cached results must be filtered by permissions, so the at-risk cache and
11 backends are consulted. Next, the sharded results are combined into an overall
12 list of IIDs. Then, that list is paginated and the issues on the current
13 pagination page can be shown. Alternatively, this class can determine just the
14 position the currently shown issue would occupy in the overall sorted list.
15 """
16
17 import json
18
19 import collections
20 import logging
21 import math
22 import random
23 import time
24
25 from google.appengine.api import apiproxy_stub_map
26 from google.appengine.api import memcache
27 from google.appengine.api import modules
28 from google.appengine.api import urlfetch
29
30 import settings
31 from features import savedqueries_helpers
32 from framework import framework_constants
33 from framework import framework_helpers
34 from framework import paginate
35 from framework import sorting
36 from framework import urls
37 from search import query2ast
38 from search import searchpipeline
39 from services import fulltext_helpers
40 from tracker import tracker_bizobj
41 from tracker import tracker_constants
42 from tracker import tracker_helpers
43
44
45 # Fail-fast responses usually finish in less than 50ms. If we see a failure
46 # in under that amount of time, we don't bother logging it.
47 FAIL_FAST_LIMIT_SEC = 0.050
48
49 # The choices help balance the cost of choosing samples vs. the cost of
50 # selecting issues that are in a range bounded by neighboring samples.
51 # Preferred chunk size parameters were determined by experimentation.
52 MIN_SAMPLE_CHUNK_SIZE = int(
53 math.sqrt(tracker_constants.DEFAULT_RESULTS_PER_PAGE))
54 MAX_SAMPLE_CHUNK_SIZE = int(math.sqrt(settings.search_limit_per_shard))
55 PREFERRED_NUM_CHUNKS = 50
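# Note: _AccumulateSampleIssues() below computes its chunk size as
# max(MIN_SAMPLE_CHUNK_SIZE,
#     min(MAX_SAMPLE_CHUNK_SIZE, int(len(issue_ids) / PREFERRED_NUM_CHUNKS))),
# i.e., it aims for roughly PREFERRED_NUM_CHUNKS samples per shard, clamped
# by the chunk-size bounds above.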
56
57
58 class FrontendSearchPipeline(object):
59 """Manage the process of issue search, including backends and caching.
60
61 Even though the code is divided into several methods, the public
62 methods should be called in sequence, so the execution of the code
63 is pretty much in the order of the source code lines here.
64 """
65
66 def __init__(self, mr, services, prof, default_results_per_page):
67 self.mr = mr
68 self.services = services
69 self.profiler = prof
70 self.default_results_per_page = default_results_per_page
71 self.grid_mode = (mr.mode == 'grid')
72 self.grid_limited = False
73 self.pagination = None
74 self.num_skipped_at_start = 0
75 self.total_count = 0
76
77 self.query_project_names = set()
78 if mr.query_project_names:
79 self.query_project_names.update(mr.query_project_names)
80
81 projects = services.project.GetProjectsByName(
82 mr.cnxn, self.query_project_names).values()
83 self.query_project_ids = [p.project_id for p in projects]
84 if mr.project_name:
85 self.query_project_ids.append(mr.project_id)
86 self.query_project_names.add(mr.project_name)
87
88 config_dict = self.services.config.GetProjectConfigs(
89 mr.cnxn, self.query_project_ids)
90 self.harmonized_config = tracker_bizobj.HarmonizeConfigs(
91 config_dict.values())
92
93 # The following fields are filled in as the pipeline progresses.
94 # The value None means that we still need to compute that value.
95 self.users_by_id = {}
96 self.nonviewable_iids = {} # {shard_id: set(iid)}
97 self.unfiltered_iids = {} # {shard_id: [iid, ...]} needing perm checks.
98 self.filtered_iids = {} # {shard_id: [iid, ...]} already perm checked.
99 self.search_limit_reached = {} # {shard_id: bool, ...}.
100 self.counts = {}
101 self.allowed_iids = [] # Matching iids that user is permitted to view.
102 self.allowed_results = None # results that the user is permitted to view.
103 self.visible_results = None # allowed_results on current pagination page.
104 self.error_responses = set()
105
106 # Projects that contain the result issues. This starts off as a dict of
107 # all the query projects, but it can grow based on the found issues in the
108 # case where the user is searching across the entire site.
109 self.issue_projects = {p.project_id: p for p in projects}
110
111 error_msg = query2ast.CheckSyntax(
112 self.mr.query, self.harmonized_config, warnings=self.mr.warnings)
113 if error_msg:
114 self.mr.errors.query = error_msg
115
116 def SearchForIIDs(self):
117 """Use backends to search each shard and store their results."""
118 with self.profiler.Phase('Checking cache and calling Backends'):
119 rpc_tuples = _StartBackendSearch(
120 self.mr, self.query_project_names, self.query_project_ids,
121 self.harmonized_config, self.unfiltered_iids,
122 self.search_limit_reached, self.nonviewable_iids,
123 self.error_responses, self.services)
124
125 with self.profiler.Phase('Waiting for Backends'):
126 try:
127 _FinishBackendSearch(rpc_tuples)
128 except Exception as e:
129 logging.exception(e)
130 raise
131
132 if self.error_responses:
133 logging.error('%r error responses. Incomplete search results.',
134 self.error_responses)
135
136 with self.profiler.Phase('Filtering cached results'):
137 for shard_id in self.unfiltered_iids:
138 if shard_id not in self.nonviewable_iids:
139 logging.error(
140 'Not displaying shard %r because of no nonviewable_iids', shard_id)
141 self.error_responses.add(shard_id)
142 filtered_shard_iids = []
143 else:
144 unfiltered_shard_iids = self.unfiltered_iids[shard_id]
145 nonviewable_shard_iids = self.nonviewable_iids[shard_id]
146 # TODO(jrobbins): avoid creating large temporary lists.
147 filtered_shard_iids = [iid for iid in unfiltered_shard_iids
148 if iid not in nonviewable_shard_iids]
149 if self.grid_mode:
150 self.filtered_iids[shard_id] = filtered_shard_iids
151 else:
152 self.filtered_iids[shard_id] = filtered_shard_iids[
153 :self.mr.start + self.mr.num]
154 self.counts[shard_id] = len(filtered_shard_iids)
155
156 with self.profiler.Phase('Counting all filtered results'):
157 self.total_count = sum(self.counts.itervalues())
158
159 def MergeAndSortIssues(self):
160 """Merge and sort results from all shards into one combined list."""
161 with self.profiler.Phase('selecting issues to merge and sort'):
162 if not self.grid_mode:
163 self._NarrowFilteredIIDs()
164 self.allowed_iids = []
165 for filtered_shard_iids in self.filtered_iids.itervalues():
166 self.allowed_iids.extend(filtered_shard_iids)
167
168 # The grid view is not paginated, so limit the results shown to avoid
169 # generating an HTML page that would be too large.
170 limit = settings.max_issues_in_grid
171 if self.grid_mode and len(self.allowed_iids) > limit:
172 self.grid_limited = True
173 self.allowed_iids = self.allowed_iids[:limit]
174
175 with self.profiler.Phase('getting allowed results'):
176 self.allowed_results = self.services.issue.GetIssues(
177 self.mr.cnxn, self.allowed_iids)
178
179 # Note: At this point, we have results that are only sorted within
180 # each backend's shard. We still need to sort the merged result.
181 self._LookupNeededUsers(self.allowed_results)
182 with self.profiler.Phase('merging and sorting issues'):
183 self.allowed_results = _SortIssues(
184 self.mr, self.allowed_results, self.harmonized_config,
185 self.users_by_id)
186
187 def _NarrowFilteredIIDs(self):
188 """Combine filtered shards into a range of IIDs for issues to sort.
189
190 The naive way is to concatenate shard_iids[:start + num] for all
191 shards then select [start:start + num]. We do better by sampling
192 issues and then determining which of those samples are known to
193 come before start or after start+num. We then trim off all those IIDs
194 and sort a smaller range of IIDs that might actually be displayed.
195 See the design doc at go/monorail-sorting.
196
197 This method modifies self.filtered_iids and self.num_skipped_at_start.
198 """
199 # Sample issues and skip those that are known to come before start.
200 # See the "Sorting in Monorail" design doc.
201
202 # If the result set is small, don't bother optimizing it.
203 orig_length = _TotalLength(self.filtered_iids)
204 if orig_length < self.mr.num * 4:
205 return
206
207 # 1. Get sample issues in each shard and sort them all together.
208 last = self.mr.start + self.mr.num
209 on_hand_samples = {}
210 needed_iids = []
211 for shard_id in self.filtered_iids:
212 self._AccumulateSampleIssues(
213 self.filtered_iids[shard_id], on_hand_samples, needed_iids)
214 retrieved_samples = self.services.issue.GetIssuesDict(
215 self.mr.cnxn, needed_iids)
216 sample_issues = on_hand_samples.values() + retrieved_samples.values()
217 self._LookupNeededUsers(sample_issues)
218 sample_issues = _SortIssues(
219 self.mr, sample_issues, self.harmonized_config, self.users_by_id)
220 sample_iids = [issue.issue_id for issue in sample_issues]
221
222 # 2. Trim off some IIDs that are sure to be positioned after last.
223 num_trimmed_end = _TrimEndShardedIIDs(self.filtered_iids, sample_iids, last)
224 logging.info('Trimmed %r issues from the end of shards', num_trimmed_end)
225
226 # 3. Trim off some IIDs that are sure to be positioned before start.
227 keep = _TotalLength(self.filtered_iids) - self.mr.start
228 # Reverse the sharded lists.
229 _ReverseShards(self.filtered_iids)
230 sample_iids.reverse()
231 self.num_skipped_at_start = _TrimEndShardedIIDs(
232 self.filtered_iids, sample_iids, keep)
233 logging.info('Trimmed %r issues from the start of shards',
234 self.num_skipped_at_start)
235 # Reverse sharded lists again to get back into forward order.
236 _ReverseShards(self.filtered_iids)
237
238 def DetermineIssuePosition(self, issue):
239 """Calculate info needed to show the issue flipper.
240
241 Args:
242 issue: The issue currently being viewed.
243
244 Returns:
245 A 3-tuple (prev_iid, index, next_iid) where prev_iid is the
246 IID of the previous issue in the total ordering (or None),
247 index is the index that the current issue has in the total
248 ordering, and next_iid is the next issue (or None). If the current
249 issue is not in the list of results at all, returns None, None, None.
250 """
251 # 1. If the current issue is not in the results at all, then exit.
252 if not any(issue.issue_id in filtered_shard_iids
253 for filtered_shard_iids in self.filtered_iids.itervalues()):
254 return None, None, None
255
256 # 2. Choose and retrieve sample issues in each shard.
257 samples_by_shard = {} # {shard_id: {iid: issue}}
258 needed_iids = []
259 for shard_id in self.filtered_iids:
260 samples_by_shard[shard_id] = {}
261 self._AccumulateSampleIssues(
262 self.filtered_iids[shard_id], samples_by_shard[shard_id], needed_iids)
263 retrieved_samples = self.services.issue.GetIssuesDict(
264 self.mr.cnxn, needed_iids)
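# This relies on the fact that an issue's logical shard can be recovered
# from its ID as iid % settings.num_logical_shards; the same mapping is
# used below in _TrimEndShardedIIDs() and _CalcSamplePositions().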
265 for retrieved_iid, retrieved_issue in retrieved_samples.iteritems():
266 shard_id = retrieved_iid % settings.num_logical_shards
267 samples_by_shard[shard_id][retrieved_iid] = retrieved_issue
268
269 # 3. Build up partial results for each shard.
270 preceding_counts = {} # dict {shard_id: num_issues_preceding_current}
271 prev_candidates, next_candidates = [], []
272 for shard_id in self.filtered_iids:
273 prev_candidate, index_in_shard, next_candidate = (
274 self._DetermineIssuePositionInShard(
275 shard_id, issue, samples_by_shard[shard_id]))
276 preceding_counts[shard_id] = index_in_shard
277 if prev_candidate:
278 prev_candidates.append(prev_candidate)
279 if next_candidate:
280 next_candidates.append(next_candidate)
281
282 # 4. Combine the results.
283 index = sum(preceding_counts.itervalues())
284 prev_candidates = _SortIssues(
285 self.mr, prev_candidates, self.harmonized_config, self.users_by_id)
286 prev_iid = prev_candidates[-1].issue_id if prev_candidates else None
287 next_candidates = _SortIssues(
288 self.mr, next_candidates, self.harmonized_config, self.users_by_id)
289 next_iid = next_candidates[0].issue_id if next_candidates else None
290
291 return prev_iid, index, next_iid
292
293 def _DetermineIssuePositionInShard(self, shard_id, issue, sample_dict):
294 """Determine where the given issue would fit into results from a shard."""
295 # See the design doc for details. Basically, it first surveys the results
296 # to bound a range where the given issue would belong, then it fetches the
297 # issues in that range and sorts them.
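# Illustrative example (hypothetical numbers): if the sorted on-hand samples
# bracket the issue between samples found at shard positions 200 and 250,
# only filtered_shard_iids[201:250] needs to be fetched and sorted to pin
# down the exact index within this shard.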
298
299 filtered_shard_iids = self.filtered_iids[shard_id]
300
301 # 1. Select a sample of issues, leveraging ones we have in RAM already.
302 issues_on_hand = sample_dict.values()
303 if issue.issue_id not in sample_dict:
304 issues_on_hand.append(issue)
305
306 self._LookupNeededUsers(issues_on_hand)
307 sorted_on_hand = _SortIssues(
308 self.mr, issues_on_hand, self.harmonized_config, self.users_by_id)
309 sorted_on_hand_iids = [soh.issue_id for soh in sorted_on_hand]
310 index_in_on_hand = sorted_on_hand_iids.index(issue.issue_id)
311
312 # 2. Bound the gap around where issue belongs.
313 if index_in_on_hand == 0:
314 fetch_start = 0
315 else:
316 prev_on_hand_iid = sorted_on_hand_iids[index_in_on_hand - 1]
317 fetch_start = filtered_shard_iids.index(prev_on_hand_iid) + 1
318
319 if index_in_on_hand == len(sorted_on_hand) - 1:
320 fetch_end = len(filtered_shard_iids)
321 else:
322 next_on_hand_iid = sorted_on_hand_iids[index_in_on_hand + 1]
323 fetch_end = filtered_shard_iids.index(next_on_hand_iid)
324
325 # 3. Retrieve all the issues in that gap to get an exact answer.
326 fetched_issues = self.services.issue.GetIssues(
327 self.mr.cnxn, filtered_shard_iids[fetch_start:fetch_end])
328 if issue.issue_id not in filtered_shard_iids[fetch_start:fetch_end]:
329 fetched_issues.append(issue)
330 self._LookupNeededUsers(fetched_issues)
331 sorted_fetched = _SortIssues(
332 self.mr, fetched_issues, self.harmonized_config, self.users_by_id)
333 sorted_fetched_iids = [sf.issue_id for sf in sorted_fetched]
334 index_in_fetched = sorted_fetched_iids.index(issue.issue_id)
335
336 # 4. Find the issues that come immediately before and after the place where
337 # the given issue would belong in this shard.
338 if index_in_fetched > 0:
339 prev_candidate = sorted_fetched[index_in_fetched - 1]
340 elif index_in_on_hand > 0:
341 prev_candidate = sorted_on_hand[index_in_on_hand - 1]
342 else:
343 prev_candidate = None
344
345 if index_in_fetched < len(sorted_fetched) - 1:
346 next_candidate = sorted_fetched[index_in_fetched + 1]
347 elif index_in_on_hand < len(sorted_on_hand) - 1:
348 next_candidate = sorted_on_hand[index_in_on_hand + 1]
349 else:
350 next_candidate = None
351
352 return prev_candidate, fetch_start + index_in_fetched, next_candidate
353
354 def _AccumulateSampleIssues(self, issue_ids, sample_dict, needed_iids):
355 """Select a scattering of issues from the list, leveraging RAM cache."""
356 chunk_size = max(MIN_SAMPLE_CHUNK_SIZE, min(MAX_SAMPLE_CHUNK_SIZE,
357 int(len(issue_ids) / PREFERRED_NUM_CHUNKS)))
358 for i in range(chunk_size, len(issue_ids), chunk_size):
359 issue = self.services.issue.GetAnyOnHandIssue(
360 issue_ids, start=i, end=min(i + chunk_size, len(issue_ids)))
361 if issue:
362 sample_dict[issue.issue_id] = issue
363 else:
364 needed_iids.append(issue_ids[i])
365
366 def _LookupNeededUsers(self, issues):
367 """Look up user info needed to sort issues, if any."""
368 with self.profiler.Phase('lookup of owner, reporter, and cc'):
369 additional_user_views_by_id = (
370 tracker_helpers.MakeViewsForUsersInIssues(
371 self.mr.cnxn, issues, self.services.user,
372 omit_ids=self.users_by_id.keys()))
373 self.users_by_id.update(additional_user_views_by_id)
374
375 def Paginate(self):
376 """Fetch matching issues and paginate the search results.
377
378 These two actions are intertwined because we try to only
379 retrieve the Issues on the current pagination page.
380 """
381 if self.grid_mode:
382 # We don't paginate the grid view, but the pagination object shows counts.
383 self.pagination = paginate.ArtifactPagination(
384 self.mr, self.allowed_results, self.default_results_per_page,
385 total_count=self.total_count, list_page_url=urls.ISSUE_LIST)
386 # We limited the results, but still show the original total count.
387 self.visible_results = self.allowed_results
388
389 else:
390 # We already got the issues, just display a slice of the visible ones.
391 limit_reached = False
392 for shard_limit_reached in self.search_limit_reached.values():
393 limit_reached |= shard_limit_reached
394 self.pagination = paginate.ArtifactPagination(
395 self.mr, self.allowed_results, self.default_results_per_page,
396 total_count=self.total_count, list_page_url=urls.ISSUE_LIST,
397 limit_reached=limit_reached, skipped=self.num_skipped_at_start)
398 self.visible_results = self.pagination.visible_results
399
400 # If we were not forced to look up visible users already, do it now.
401 if self.grid_mode:
402 self._LookupNeededUsers(self.allowed_results)
403 else:
404 self._LookupNeededUsers(self.visible_results)
405
406 def __repr__(self):
407 """Return a string that shows the internal state of this pipeline."""
408 if self.allowed_iids:
409 shown_allowed_iids = self.allowed_iids[:200]
410 else:
411 shown_allowed_iids = self.allowed_iids
412
413 if self.allowed_results:
414 shown_allowed_results = self.allowed_results[:200]
415 else:
416 shown_allowed_results = self.allowed_results
417
418 parts = [
419 'allowed_iids: %r' % shown_allowed_iids,
420 'allowed_results: %r' % shown_allowed_results,
421 'len(visible_results): %r' % (
422 self.visible_results and len(self.visible_results))]
423 return '%s(%s)' % (self.__class__.__name__, '\n'.join(parts))
424
425
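# UserRPC callbacks are invoked with no arguments, so this helper binds a
# handler function and its arguments into a zero-argument closure.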
426 def _MakeBackendCallback(func, *args):
427 return lambda: func(*args)
428
429
430 def _StartBackendSearch(
431 mr, query_project_names, query_project_ids, harmonized_config,
432 unfiltered_iids_dict, search_limit_reached_dict,
433 nonviewable_iids, error_responses, services):
434 """Request that our backends search and return a list of matching issue IDs.
435
436 Args:
437 mr: commonly used info parsed from the request, including query and
438 sort spec.
439 query_project_names: set of project names to search.
440 query_project_ids: list of project IDs to search.
441 harmonized_config: combined ProjectIssueConfig for all projects being
442 searched.
443 unfiltered_iids_dict: dict {shard_id: [iid, ...]} of unfiltered search
444 results to accumulate into. They need to be later filtered by
445 permissions and merged into filtered_iids_dict.
446 search_limit_reached_dict: dict {shard_id: bool, ...} to determine if
447 the search limit of any shard was reached.
448 nonviewable_iids: dict {shard_id: set(iid)} of restricted issues in the
449 projects being searched that the signed in user cannot view.
error_responses: set into which the IDs of shards whose backend calls
failed are accumulated.
450 services: connections to backends.
451
452 Returns:
453 A list of rpc_tuples that can be passed to _FinishBackendSearch to wait
454 on any remaining backend calls.
455
456 SIDE-EFFECTS:
457 Any data found in memcache is immediately put into unfiltered_iids_dict.
458 As the backends finish their work, _HandleBackendSearchResponse will update
459 unfiltered_iids_dict for those shards.
460 """
461 rpc_tuples = []
462 needed_shard_ids = set(range(settings.num_logical_shards))
463
464 # 1. Get whatever we can from memcache. Cache hits are only kept if they are
465 # not already expired. Each kept cache hit will have unfiltered IIDs, so we
466 # need to get the at-risk IIDs to efficiently filter them based on perms.
467 project_shard_timestamps = _GetProjectTimestamps(
468 query_project_ids, needed_shard_ids)
469
470 if mr.use_cached_searches:
471 cached_unfiltered_iids_dict, cached_search_limit_reached_dict = (
472 _GetCachedSearchResults(
473 mr, query_project_ids, needed_shard_ids, harmonized_config,
474 project_shard_timestamps, services))
475 unfiltered_iids_dict.update(cached_unfiltered_iids_dict)
476 search_limit_reached_dict.update(cached_search_limit_reached_dict)
477 for cache_hit_shard_id in unfiltered_iids_dict:
478 needed_shard_ids.remove(cache_hit_shard_id)
479
480 _GetNonviewableIIDs(
481 query_project_ids, mr.auth.user_id, set(range(settings.num_logical_shards)),
482 rpc_tuples, nonviewable_iids, project_shard_timestamps,
483 services.cache_manager.processed_invalidations_up_to,
484 mr.use_cached_searches)
485
486 # 2. Hit backends for any shards that are still needed. When these results
487 # come back, they are also put into unfiltered_iids_dict.
488 for shard_id in needed_shard_ids:
489 rpc = _StartBackendSearchCall(
490 mr, query_project_names, shard_id,
491 services.cache_manager.processed_invalidations_up_to)
492 rpc_tuple = (time.time(), shard_id, rpc)
493 rpc.callback = _MakeBackendCallback(
494 _HandleBackendSearchResponse, mr, query_project_names, rpc_tuple,
495 rpc_tuples, settings.backend_retries, unfiltered_iids_dict,
496 search_limit_reached_dict,
497 services.cache_manager.processed_invalidations_up_to,
498 error_responses)
499 rpc_tuples.append(rpc_tuple)
500
501 return rpc_tuples
502
503
504 def _FinishBackendSearch(rpc_tuples):
505 """Wait for all backend calls to complete, including any retries."""
506 while rpc_tuples:
507 active_rpcs = [rpc for (_time, _shard_id, rpc) in rpc_tuples]
508 # Wait for any active RPC to complete. Its callback function will
509 # automatically be called.
510 finished_rpc = apiproxy_stub_map.UserRPC.wait_any(active_rpcs)
511 # Figure out which rpc_tuple finished and remove it from our list.
512 for rpc_tuple in rpc_tuples:
513 _time, _shard_id, rpc = rpc_tuple
514 if rpc == finished_rpc:
515 rpc_tuples.remove(rpc_tuple)
516 break
517 else:
518 raise ValueError('We somehow finished an RPC that is not in rpc_tuples')
519
520
521 def _GetProjectTimestamps(query_project_ids, needed_shard_ids):
522 """Get a dict of modified_ts values for all specified project-shards."""
523 project_shard_timestamps = {}
524 if query_project_ids:
525 keys = []
526 for pid in query_project_ids:
527 for sid in needed_shard_ids:
528 keys.append('%d;%d' % (pid, sid))
529 else:
530 keys = [('all;%d' % sid) for sid in needed_shard_ids]
531
532 timestamps_for_project = memcache.get_multi(keys=keys)
533 for key, timestamp in timestamps_for_project.iteritems():
534 pid_str, sid_str = key.split(';')
535 if pid_str == 'all':
536 project_shard_timestamps['all', int(sid_str)] = timestamp
537 else:
538 project_shard_timestamps[int(pid_str), int(sid_str)] = timestamp
539
540 return project_shard_timestamps
541
542
543 def _GetNonviewableIIDs(
544 query_project_ids, logged_in_user_id, needed_shard_ids, rpc_tuples,
545 nonviewable_iids, project_shard_timestamps, invalidation_timestep,
546 use_cached_searches):
547 """Build a set of at-risk IIDs, and accumulate RPCs to get uncached ones."""
548 if query_project_ids:
549 keys = []
550 for pid in query_project_ids:
551 for sid in needed_shard_ids:
552 keys.append('%d;%d;%d' % (pid, logged_in_user_id, sid))
553 else:
554 keys = ['all;%d;%d' % (logged_in_user_id, sid)
555 for sid in needed_shard_ids]
556
557 if use_cached_searches:
558 cached_dict = memcache.get_multi(keys, key_prefix='nonviewable:')
559 else:
560 cached_dict = {}
561
562 for sid in needed_shard_ids:
563 if query_project_ids:
564 for pid in query_project_ids:
565 _AccumulateNonviewableIIDs(
566 pid, logged_in_user_id, sid, cached_dict, nonviewable_iids,
567 project_shard_timestamps, rpc_tuples, invalidation_timestep)
568 else:
569 _AccumulateNonviewableIIDs(
570 None, logged_in_user_id, sid, cached_dict, nonviewable_iids,
571 project_shard_timestamps, rpc_tuples, invalidation_timestep)
572
573
574 def _AccumulateNonviewableIIDs(
575 pid, logged_in_user_id, sid, cached_dict, nonviewable_iids,
576 project_shard_timestamps, rpc_tuples, invalidation_timestep):
577 """Use one of the retrieved cache entries or call a backend if needed."""
578 if pid is None:
579 key = 'all;%d;%d' % (logged_in_user_id, sid)
580 else:
581 key = '%d;%d;%d' % (pid, logged_in_user_id, sid)
582
583 if key in cached_dict:
584 issue_ids, cached_ts = cached_dict.get(key)
585 modified_ts = project_shard_timestamps.get((pid, sid))
586 if modified_ts is None or modified_ts > cached_ts:
587 logging.info('nonviewable too stale on (project %r, shard %r)',
588 pid, sid)
589 else:
590 logging.info('adding %d nonviewable issue_ids', len(issue_ids))
591 nonviewable_iids[sid] = set(issue_ids)
592
593 if sid not in nonviewable_iids:
594 logging.info('nonviewable for %r not found', key)
595 logging.info('starting backend call for nonviewable iids %r', key)
596 rpc = _StartBackendNonviewableCall(
597 pid, logged_in_user_id, sid, invalidation_timestep)
598 rpc_tuple = (time.time(), sid, rpc)
599 rpc.callback = _MakeBackendCallback(
600 _HandleBackendNonviewableResponse, pid, logged_in_user_id, sid,
601 rpc_tuple, rpc_tuples, settings.backend_retries, nonviewable_iids,
602 invalidation_timestep)
603 rpc_tuples.append(rpc_tuple)
604
605
606 def _GetCachedSearchResults(
607 mr, query_project_ids, needed_shard_ids, harmonized_config,
608 project_shard_timestamps, services):
609 """Return a dict of cached search results that are not already stale.
610
611 If it were not for cross-project search, we would simply cache when we do a
612 search and then invalidate when an issue is modified. But, with
613 cross-project search we don't know all the memcache entries that would
614 need to be invalidated. So, instead, we write the search result cache
615 entries and then an initial modified_ts value for each project if it was
616 not already there. And, when we update an issue we write a new
617 modified_ts entry, which implicitly invalidates all search result
618 cache entries that were written earlier because they are now stale. When
619 reading from the cache, we ignore any query project with modified_ts
620 after its search result cache timestamp, because it is stale.
621
622 Args:
623 mr: common information parsed from the request.
624 query_project_ids: list of project ID numbers for all projects being
625 searched.
626 needed_shard_ids: set of shard IDs that need to be checked.
627 harmonized_config: ProjectIssueConfig with combined information for all
628 projects involved in this search.
629 project_shard_timestamps: a dict {(project_id, shard_id): timestamp, ...}
630 that tells when each shard was last invalidated.
631 services: connections to backends.
632
633 Returns:
634 Tuple consisting of:
635 A dictionary {shard_id: [issue_id, ...], ...} of unfiltered search result
636 issue IDs. Only shard_ids found in memcache will be in that dictionary.
637 The result issue IDs must be permission checked before they can be
638 considered to be part of the user's result set.
639 A dictionary {shard_id: bool, ...}. The boolean is set to True if
640 the search results limit of the shard is hit.
641 """
642 projects_str = ','.join(str(pid) for pid in sorted(query_project_ids))
643 projects_str = projects_str or 'all'
644 canned_query = savedqueries_helpers.SavedQueryIDToCond(
645 mr.cnxn, services.features, mr.can)
646 logging.info('canned query is %r', canned_query)
647 canned_query = searchpipeline.ReplaceKeywordsWithUserID(
648 mr.me_user_id, canned_query)
649 user_query = searchpipeline.ReplaceKeywordsWithUserID(
650 mr.me_user_id, mr.query)
651
652 sd = sorting.ComputeSortDirectives(mr, harmonized_config)
653 memcache_prefix = ';'.join([projects_str, canned_query, user_query,
654 ' '.join(sd), ''])
655 cached_dict = memcache.get_multi(
656 [str(sid) for sid in needed_shard_ids], key_prefix=memcache_prefix)
657 search_limit_memcache_prefix = ';'.join(
658 [projects_str, canned_query, user_query,
659 ' '.join(sd), 'search_limit_reached', ''])
660 cached_search_limit_reached_dict = memcache.get_multi(
661 [str(sid) for sid in needed_shard_ids],
662 key_prefix=search_limit_memcache_prefix)
663
664 unfiltered_dict = {}
665 search_limit_reached_dict = {}
666 for shard_id in needed_shard_ids:
667 if str(shard_id) not in cached_dict:
668 logging.info('memcache miss on shard %r', shard_id)
669 continue
670
671 cached_iids, cached_ts = cached_dict[str(shard_id)]
672 if cached_search_limit_reached_dict.get(str(shard_id)):
673 search_limit_reached, _ = cached_search_limit_reached_dict[str(shard_id)]
674 else:
675 search_limit_reached = False
676
677 stale = False
678 if query_project_ids:
679 for project_id in query_project_ids:
680 modified_ts = project_shard_timestamps.get((project_id, shard_id))
681 if modified_ts is None or modified_ts > cached_ts:
682 stale = True
683 logging.info('memcache too stale on shard %r because of %r',
684 shard_id, project_id)
685 break
686 else:
687 modified_ts = project_shard_timestamps.get(('all', shard_id))
688 if modified_ts is None or modified_ts > cached_ts:
689 stale = True
690 logging.info('memcache too stale on shard %r because of all',
691 shard_id)
692
693 if not stale:
694 logging.info('memcache hit on %r', shard_id)
695 unfiltered_dict[shard_id] = cached_iids
696 search_limit_reached_dict[shard_id] = search_limit_reached
697
698 return unfiltered_dict, search_limit_reached_dict
699
700
701 def _MakeBackendRequestHeaders(failfast):
702 headers = {
703 # This is needed to allow frontends to talk to backends without going
704 # through a login screen on googleplex.com.
705 # http://wiki/Main/PrometheusInternal#Internal_Applications_and_APIs
706 'X-URLFetch-Service-Id': 'GOOGLEPLEX',
707 }
708 if failfast:
709 headers['X-AppEngine-FailFast'] = 'Yes'
710 return headers
711
712
713 def _StartBackendSearchCall(
714 mr, query_project_names, shard_id, invalidation_timestep,
715 deadline=None, failfast=True):
716 """Ask a backend to query one shard of the database."""
717 backend_host = modules.get_hostname(module='besearch')
718 url = 'http://%s%s' % (backend_host, framework_helpers.FormatURL(
719 mr, urls.BACKEND_SEARCH,
720 skip_filtering=True, # TODO(jrobbins): remove after next release.
721 projects=','.join(query_project_names),
722 start=0, num=mr.start + mr.num,
723 logged_in_user_id=mr.auth.user_id or 0,
724 me_user_id=mr.me_user_id, shard_id=shard_id,
725 invalidation_timestep=invalidation_timestep))
726 logging.info('\n\nCalling backend: %s', url)
727 rpc = urlfetch.create_rpc(
728 deadline=deadline or settings.backend_deadline)
729 headers = _MakeBackendRequestHeaders(failfast)
730 # follow_redirects=False is needed to avoid a login screen on googleplex.
731 urlfetch.make_fetch_call(rpc, url, follow_redirects=False, headers=headers)
732 return rpc
733
734
735 def _StartBackendNonviewableCall(
736 project_id, logged_in_user_id, shard_id, invalidation_timestep,
737 deadline=None, failfast=True):
738 """Ask a backend to query one shard of the database."""
739 backend_host = modules.get_hostname(module='besearch')
740 url = 'http://%s%s' % (backend_host, framework_helpers.FormatURL(
741 None, urls.BACKEND_NONVIEWABLE,
742 project_id=project_id or '',
743 logged_in_user_id=logged_in_user_id or '',
744 shard_id=shard_id,
745 invalidation_timestep=invalidation_timestep))
746 logging.info('Calling backend nonviewable: %s', url)
747 rpc = urlfetch.create_rpc(deadline=deadline or settings.backend_deadline)
748 headers = _MakeBackendRequestHeaders(failfast)
749 # follow_redirects=False is needed to avoid a login screen on googleplex.
750 urlfetch.make_fetch_call(rpc, url, follow_redirects=False, headers=headers)
751 return rpc
752
753
754 def _HandleBackendSearchResponse(
755 mr, query_project_names, rpc_tuple, rpc_tuples, remaining_retries,
756 unfiltered_iids, search_limit_reached, invalidation_timestep,
757 error_responses):
758 """Process one backend response and retry if there was an error."""
759 start_time, shard_id, rpc = rpc_tuple
760 duration_sec = time.time() - start_time
761
762 try:
763 response = rpc.get_result()
764 logging.info('call to backend took %d sec', duration_sec)
765 # Note that response.content has "})]'\n" prepended to it.
766 json_content = response.content[5:]
767 logging.info('got json text: %r length %r',
768 json_content[:framework_constants.LOGGING_MAX_LENGTH],
769 len(json_content))
770 json_data = json.loads(json_content)
771 unfiltered_iids[shard_id] = json_data['unfiltered_iids']
772 search_limit_reached[shard_id] = json_data['search_limit_reached']
773
774 except Exception as e:
775 if duration_sec > FAIL_FAST_LIMIT_SEC: # Don't log fail-fast exceptions.
776 logging.exception(e)
777 if not remaining_retries:
778 logging.error('backend search retries exceeded')
779 error_responses.add(shard_id)
780 return # Used all retries, so give up.
781
782 if duration_sec >= settings.backend_deadline:
783 logging.error('backend search on %r took too long', shard_id)
784 error_responses.add(shard_id)
785 return # That backend shard is overloaded, so give up.
786
787 logging.error('backend call for shard %r failed, retrying', shard_id)
788 retry_rpc = _StartBackendSearchCall(
789 mr, query_project_names, shard_id, invalidation_timestep,
790 failfast=remaining_retries > 2)
791 retry_rpc_tuple = (time.time(), shard_id, retry_rpc)
792 retry_rpc.callback = _MakeBackendCallback(
793 _HandleBackendSearchResponse, mr, query_project_names,
794 retry_rpc_tuple, rpc_tuples, remaining_retries - 1, unfiltered_iids,
795 search_limit_reached, invalidation_timestep, error_responses)
796 rpc_tuples.append(retry_rpc_tuple)
797
798
799 def _HandleBackendNonviewableResponse(
800 project_id, logged_in_user_id, shard_id, rpc_tuple, rpc_tuples,
801 remaining_retries, nonviewable_iids, invalidation_timestep):
802 """Process one backend response and retry if there was an error."""
803 start_time, shard_id, rpc = rpc_tuple
804 duration_sec = time.time() - start_time
805
806 try:
807 response = rpc.get_result()
808 logging.info('call to backend nonviewable took %d sec', duration_sec)
809 # Note that response.content has "})]'\n" prepended to it.
810 json_content = response.content[5:]
811 logging.info('got json text: %r length %r',
812 json_content[:framework_constants.LOGGING_MAX_LENGTH],
813 len(json_content))
814 json_data = json.loads(json_content)
815 nonviewable_iids[shard_id] = set(json_data['nonviewable'])
816
817 except Exception as e:
818 if duration_sec > FAIL_FAST_LIMIT_SEC: # Don't log fail-fast exceptions.
819 logging.exception(e)
820
821 if not remaining_retries:
822 logging.warn('Used all retries, so give up on shard %r', shard_id)
823 return
824
825 if duration_sec >= settings.backend_deadline:
826 logging.error('nonviewable call on %r took too long', shard_id)
827 return # That backend shard is overloaded, so give up.
828
829 logging.error(
830 'backend nonviewable call for shard %r;%r;%r failed, retrying',
831 project_id, logged_in_user_id, shard_id)
832 retry_rpc = _StartBackendNonviewableCall(
833 project_id, logged_in_user_id, shard_id, invalidation_timestep,
834 failfast=remaining_retries > 2)
835 retry_rpc_tuple = (time.time(), shard_id, retry_rpc)
836 retry_rpc.callback = _MakeBackendCallback(
837 _HandleBackendNonviewableResponse, project_id, logged_in_user_id,
838 shard_id, retry_rpc_tuple, rpc_tuples, remaining_retries - 1,
839 nonviewable_iids, invalidation_timestep)
840 rpc_tuples.append(retry_rpc_tuple)
841
842
843 def _TotalLength(sharded_iids):
844 """Return the total length of all issue_iids lists."""
845 return sum(len(issue_iids) for issue_iids in sharded_iids.itervalues())
846
847
848 def _ReverseShards(sharded_iids):
849 """Reverse each issue_iids list in place."""
850 for shard_id in sharded_iids:
851 sharded_iids[shard_id].reverse()
852
853
854 def _TrimEndShardedIIDs(sharded_iids, sample_iids, num_needed):
855 """Trim the IIDs to keep at least num_needed items.
856
857 Args:
858 sharded_iids: dict {shard_id: issue_id_list} for search results. This is
859 modified in place to remove some trailing issue IDs.
860 sample_iids: list of IIDs from a sorted list of sample issues.
861 num_needed: int minimum total number of items to keep. Some IIDs that are
862 known to belong in positions > num_needed will be trimmed off.
863
864 Returns:
865 The total number of IIDs removed from the IID lists.
866 """
867 # 1. Get (sample_iid, position_in_shard) for each sample.
868 sample_positions = _CalcSamplePositions(sharded_iids, sample_iids)
869
870 # 2. Walk through the samples, computing a combined lower bound at each
871 # step until we know that we have passed at least num_needed IIDs.
872 lower_bound_per_shard = {}
873 excess_samples = []
874 for i in range(len(sample_positions)):
875 sample_iid, pos = sample_positions[i]
876 shard_id = sample_iid % settings.num_logical_shards
877 lower_bound_per_shard[shard_id] = pos
878 overall_lower_bound = sum(lower_bound_per_shard.itervalues())
879 if overall_lower_bound >= num_needed:
880 excess_samples = sample_positions[i + 1:]
881 break
882 else:
883 return 0 # We went through all samples and never reached num_needed.
884
885 # 3. Truncate each shard at the first excess sample in that shard.
886 already_trimmed = set()
887 num_trimmed = 0
888 for sample_iid, pos in excess_samples:
889 shard_id = sample_iid % settings.num_logical_shards
890 if shard_id not in already_trimmed:
891 num_trimmed += len(sharded_iids[shard_id]) - pos
892 sharded_iids[shard_id] = sharded_iids[shard_id][:pos]
893 already_trimmed.add(shard_id)
894
895 return num_trimmed
896
897
898 # TODO(jrobbins): Convert this to a python generator.
899 def _CalcSamplePositions(sharded_iids, sample_iids):
900 """Return [(sample_iid, position_in_shard), ...] for each sample."""
901 # We keep track of how far index() has scanned in each shard to avoid
902 # starting over at position 0 when looking for the next sample in
903 # the same shard.
904 scan_positions = collections.defaultdict(lambda: 0)
905 sample_positions = []
906 for sample_iid in sample_iids:
907 shard_id = sample_iid % settings.num_logical_shards
908 try:
909 pos = sharded_iids.get(shard_id, []).index(
910 sample_iid, scan_positions[shard_id])
911 scan_positions[shard_id] = pos
912 sample_positions.append((sample_iid, pos))
913 except ValueError:
914 pass
915
916 return sample_positions
917
918
919 def _SortIssues(mr, issues, config, users_by_id):
920 """Sort the found issues based on the request and config values.
921
922 Args:
923 mr: common information parsed from the HTTP request.
924 issues: A list of issues to be sorted.
925 config: A ProjectIssueConfig that could impact sort order.
926 users_by_id: dictionary {user_id: user_view,...} for all users who
927 participate in any issue in the entire list.
928
929 Returns:
930 A sorted list of issues, based on parameters from mr and config.
931 """
932 issues = sorting.SortArtifacts(
933 mr, issues, config, tracker_helpers.SORTABLE_FIELDS,
934 username_cols=tracker_constants.USERNAME_COLS, users_by_id=users_by_id)
935 return issues