Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(112)

Side by Side Diff: appengine/tsmon/handler.go

Issue 2617903005: Remove dsQueryBatch in favor of ds.Batcher. (Closed)
Patch Set: Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « appengine/datastorecache/manager.go ('k') | appengine/tsmon/handler_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The LUCI Authors. All rights reserved. 1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package tsmon 5 package tsmon
6 6
7 import ( 7 import (
8 "net/http" 8 "net/http"
9 9
10 "golang.org/x/net/context" 10 "golang.org/x/net/context"
11 11
12 "github.com/luci/gae/filter/dsQueryBatch"
13 ds "github.com/luci/gae/service/datastore" 12 ds "github.com/luci/gae/service/datastore"
14 "github.com/luci/gae/service/info" 13 "github.com/luci/gae/service/info"
15 14
16 "github.com/luci/luci-go/common/clock" 15 "github.com/luci/luci-go/common/clock"
17 "github.com/luci/luci-go/common/logging" 16 "github.com/luci/luci-go/common/logging"
18 "github.com/luci/luci-go/common/tsmon" 17 "github.com/luci/luci-go/common/tsmon"
19 "github.com/luci/luci-go/server/router" 18 "github.com/luci/luci-go/server/router"
20 ) 19 )
21 20
22 const housekeepingInstanceBatchSize = 200
23
24 // InstallHandlers installs HTTP handlers for tsmon routes. 21 // InstallHandlers installs HTTP handlers for tsmon routes.
25 func InstallHandlers(r *router.Router, base router.MiddlewareChain) { 22 func InstallHandlers(r *router.Router, base router.MiddlewareChain) {
26 r.GET("/internal/cron/ts_mon/housekeeping", base, housekeepingHandler) 23 r.GET("/internal/cron/ts_mon/housekeeping", base, housekeepingHandler)
27 } 24 }
28 25
29 // housekeepingHandler is an HTTP handler that should be run every minute by 26 // housekeepingHandler is an HTTP handler that should be run every minute by
30 // cron on App Engine. It assigns task numbers to datastore entries, and runs 27 // cron on App Engine. It assigns task numbers to datastore entries, and runs
31 // any global metric callbacks. 28 // any global metric callbacks.
32 func housekeepingHandler(c *router.Context) { 29 func housekeepingHandler(c *router.Context) {
33 if !info.IsDevAppServer(c.Context) && c.Request.Header.Get("X-Appengine- Cron") != "true" { 30 if !info.IsDevAppServer(c.Context) && c.Request.Header.Get("X-Appengine- Cron") != "true" {
(...skipping 14 matching lines...) Expand all
48 // and expiring old entities. 45 // and expiring old entities.
49 func assignTaskNumbers(c context.Context) error { 46 func assignTaskNumbers(c context.Context) error {
50 c = info.MustNamespace(c, instanceNamespace) 47 c = info.MustNamespace(c, instanceNamespace)
51 48
52 now := clock.Now(c) 49 now := clock.Now(c)
53 expiredTime := now.Add(-instanceExpirationTimeout) 50 expiredTime := now.Add(-instanceExpirationTimeout)
54 51
55 usedTaskNums := map[int]struct{}{} 52 usedTaskNums := map[int]struct{}{}
56 totalExpired := 0 53 totalExpired := 0
57 54
58 » expiredKeys := make([]*ds.Key, 0, housekeepingInstanceBatchSize) 55 » expiredKeys := make([]*ds.Key, 0, ds.Raw(c).Constraints().QueryBatchSize )
59 var unassigned []*instance 56 var unassigned []*instance
60 57
61 // expireInstanceBatch processes the set of instances in "expiredKeys", 58 // expireInstanceBatch processes the set of instances in "expiredKeys",
62 // deletes them, and clears the list for the next iteration. 59 // deletes them, and clears the list for the next iteration.
63 // 60 //
64 // We do this in batches to handle large numbers without inflating memor y 61 // We do this in batches to handle large numbers without inflating memor y
65 // requirements. If there are any timeouts or problems, this will also e nable 62 // requirements. If there are any timeouts or problems, this will also e nable
66 // us to iteratively chip away at the problem. 63 // us to iteratively chip away at the problem.
67 expireInstanceBatch := func(c context.Context) error { 64 expireInstanceBatch := func(c context.Context) error {
68 if len(expiredKeys) == 0 { 65 if len(expiredKeys) == 0 {
69 return nil 66 return nil
70 } 67 }
71 68
72 logging.Debugf(c, "Expiring %d instance(s)", len(expiredKeys)) 69 logging.Debugf(c, "Expiring %d instance(s)", len(expiredKeys))
73 if err := ds.Delete(c, expiredKeys); err != nil { 70 if err := ds.Delete(c, expiredKeys); err != nil {
74 logging.WithError(err).Errorf(c, "Failed to expire insta nces.") 71 logging.WithError(err).Errorf(c, "Failed to expire insta nces.")
75 return err 72 return err
76 } 73 }
77 74
78 // Clear the instances list for next round. 75 // Clear the instances list for next round.
79 totalExpired += len(expiredKeys) 76 totalExpired += len(expiredKeys)
80 expiredKeys = expiredKeys[:0] 77 expiredKeys = expiredKeys[:0]
81 return nil 78 return nil
82 } 79 }
83 80
84 // Query all instances from datastore. 81 // Query all instances from datastore.
85 q := ds.NewQuery("Instance") 82 q := ds.NewQuery("Instance")
86 » if err := ds.Run(dsQueryBatch.BatchQueries(c, housekeepingInstanceBatchS ize, expireInstanceBatch), q, func(i *instance) { 83
84 » b := ds.Batcher{
85 » » Callback: expireInstanceBatch,
86 » }
87 » if err := b.Run(c, q, func(i *instance) {
87 if i.TaskNum >= 0 { 88 if i.TaskNum >= 0 {
88 usedTaskNums[i.TaskNum] = struct{}{} 89 usedTaskNums[i.TaskNum] = struct{}{}
89 } 90 }
90 if i.LastUpdated.Before(expiredTime) { 91 if i.LastUpdated.Before(expiredTime) {
91 expiredKeys = append(expiredKeys, ds.NewKey(c, "Instance ", i.ID, 0, nil)) 92 expiredKeys = append(expiredKeys, ds.NewKey(c, "Instance ", i.ID, 0, nil))
92 logging.Debugf(c, "Expiring %s task_num %d, inactive sin ce %s", 93 logging.Debugf(c, "Expiring %s task_num %d, inactive sin ce %s",
93 i.ID, i.TaskNum, i.LastUpdated.String()) 94 i.ID, i.TaskNum, i.LastUpdated.String())
94 } else if i.TaskNum < 0 { 95 } else if i.TaskNum < 0 {
95 unassigned = append(unassigned, i) 96 unassigned = append(unassigned, i)
96 } 97 }
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
129 for { 130 for {
130 n := next 131 n := next
131 next++ 132 next++
132 _, has := used[n] 133 _, has := used[n]
133 if !has { 134 if !has {
134 return n 135 return n
135 } 136 }
136 } 137 }
137 } 138 }
138 } 139 }
OLDNEW
« no previous file with comments | « appengine/datastorecache/manager.go ('k') | appengine/tsmon/handler_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698