Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(79)

Side by Side Diff: dm/appengine/deps/walk_graph.go

Issue 2617903005: Remove dsQueryBatch in favor of ds.Batcher. (Closed)
Patch Set: Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « appengine/tsmon/handler_test.go ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package deps 5 package deps
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "math" 9 "math"
10 "sort" 10 "sort"
11 "sync" 11 "sync"
12 "time" 12 "time"
13 13
14 "golang.org/x/net/context" 14 "golang.org/x/net/context"
15 "google.golang.org/grpc/codes" 15 "google.golang.org/grpc/codes"
16 16
17 "github.com/luci/gae/filter/dsQueryBatch"
18 ds "github.com/luci/gae/service/datastore" 17 ds "github.com/luci/gae/service/datastore"
18
19 "github.com/luci/luci-go/common/clock" 19 "github.com/luci/luci-go/common/clock"
20 "github.com/luci/luci-go/common/logging" 20 "github.com/luci/luci-go/common/logging"
21 "github.com/luci/luci-go/common/sync/parallel" 21 "github.com/luci/luci-go/common/sync/parallel"
22 dm "github.com/luci/luci-go/dm/api/service/v1" 22 dm "github.com/luci/luci-go/dm/api/service/v1"
23 "github.com/luci/luci-go/dm/appengine/distributor" 23 "github.com/luci/luci-go/dm/appengine/distributor"
24 "github.com/luci/luci-go/dm/appengine/model" 24 "github.com/luci/luci-go/dm/appengine/model"
25 "github.com/luci/luci-go/grpc/grpcutil" 25 "github.com/luci/luci-go/grpc/grpcutil"
26 ) 26 )
27 27
28 const numWorkers = 16 28 const numWorkers = 16
29 29
30 const maxTimeout = 55 * time.Second // GAE limit is 60s 30 const maxTimeout = 55 * time.Second // GAE limit is 60s
31 31
32 // queryBatcher is a default datastore Batcher instance for queries.
33 //
34 // we end up doing a lot of long queries in here, so use a batcher to
35 // prevent datastore timeouts.
36 var queryBatcher ds.Batcher
37
32 type node struct { 38 type node struct {
33 aid *dm.Attempt_ID 39 aid *dm.Attempt_ID
34 depth int64 40 depth int64
35 canSeeAttemptResult bool 41 canSeeAttemptResult bool
36 } 42 }
37 43
38 func (g *graphWalker) runSearchQuery(send func(*dm.Attempt_ID) error, s *dm.Grap hQuery_Search) func() error { 44 func (g *graphWalker) runSearchQuery(send func(*dm.Attempt_ID) error, s *dm.Grap hQuery_Search) func() error {
39 return func() error { 45 return func() error {
40 logging.Errorf(g, "SearchQuery not implemented") 46 logging.Errorf(g, "SearchQuery not implemented")
41 return grpcutil.Errf(codes.Unimplemented, "GraphQuery.Search is not implemented") 47 return grpcutil.Errf(codes.Unimplemented, "GraphQuery.Search is not implemented")
42 } 48 }
43 } 49 }
44 50
45 func isCtxErr(err error) bool { 51 func isCtxErr(err error) bool {
46 return err == context.Canceled || err == context.DeadlineExceeded 52 return err == context.Canceled || err == context.DeadlineExceeded
47 } 53 }
48 54
49 func (g *graphWalker) runAttemptListQuery(send func(*dm.Attempt_ID) error) func( ) error { 55 func (g *graphWalker) runAttemptListQuery(send func(*dm.Attempt_ID) error) func( ) error {
50 return func() error { 56 return func() error {
51 for qst, anum := range g.req.Query.AttemptList.To { 57 for qst, anum := range g.req.Query.AttemptList.To {
52 if len(anum.Nums) == 0 { 58 if len(anum.Nums) == 0 {
53 qry := model.QueryAttemptsForQuest(g, qst) 59 qry := model.QueryAttemptsForQuest(g, qst)
54 if !g.req.Include.Attempt.Abnormal { 60 if !g.req.Include.Attempt.Abnormal {
55 qry = qry.Eq("IsAbnormal", false) 61 qry = qry.Eq("IsAbnormal", false)
56 } 62 }
57 if !g.req.Include.Attempt.Expired { 63 if !g.req.Include.Attempt.Expired {
58 qry = qry.Eq("IsExpired", false) 64 qry = qry.Eq("IsExpired", false)
59 } 65 }
60 » » » » err := ds.Run(g, qry, func(k *ds.Key) error { 66 » » » » err := queryBatcher.Run(g, qry, func(k *ds.Key) error {
61 aid := &dm.Attempt_ID{} 67 aid := &dm.Attempt_ID{}
62 if err := aid.SetDMEncoded(k.StringID()) ; err != nil { 68 if err := aid.SetDMEncoded(k.StringID()) ; err != nil {
63 logging.WithError(err).Errorf(g, "Attempt_ID.SetDMEncoded returned an error with input: %q", k.StringID()) 69 logging.WithError(err).Errorf(g, "Attempt_ID.SetDMEncoded returned an error with input: %q", k.StringID())
64 panic(fmt.Errorf("in AttemptList Query: %s", err)) 70 panic(fmt.Errorf("in AttemptList Query: %s", err))
65 } 71 }
66 return send(aid) 72 return send(aid)
67 }) 73 })
68 if err != nil { 74 if err != nil {
69 if !isCtxErr(err) { 75 if !isCtxErr(err) {
70 logging.WithError(err).Errorf(g, "in AttemptListQuery") 76 logging.WithError(err).Errorf(g, "in AttemptListQuery")
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
106 logging.Fields{ek: err, "qid": qid}.Errorf(g, "while loa ding quest") 112 logging.Fields{ek: err, "qid": qid}.Errorf(g, "while loa ding quest")
107 return err 113 return err
108 } 114 }
109 dst.Data = qst.DataProto() 115 dst.Data = qst.DataProto()
110 dst.Partial = false 116 dst.Partial = false
111 return nil 117 return nil
112 } 118 }
113 } 119 }
114 120
115 func (g *graphWalker) loadEdges(send func(*dm.Attempt_ID) error, typ string, bas e *ds.Key, fan *dm.AttemptList, doSend bool) error { 121 func (g *graphWalker) loadEdges(send func(*dm.Attempt_ID) error, typ string, bas e *ds.Key, fan *dm.AttemptList, doSend bool) error {
116 » return ds.Run(g, ds.NewQuery(typ).Ancestor(base), func(k *ds.Key) error { 122 » return queryBatcher.Run(g, ds.NewQuery(typ).Ancestor(base), func(k *ds.K ey) error {
117 if g.Err() != nil { 123 if g.Err() != nil {
118 return ds.Stop 124 return ds.Stop
119 } 125 }
120 aid := &dm.Attempt_ID{} 126 aid := &dm.Attempt_ID{}
121 if err := aid.SetDMEncoded(k.StringID()); err != nil { 127 if err := aid.SetDMEncoded(k.StringID()); err != nil {
122 logging.WithError(err).Errorf(g, "Attempt_ID.SetDMEncode d returned an error with input: %q", k.StringID()) 128 logging.WithError(err).Errorf(g, "Attempt_ID.SetDMEncode d returned an error with input: %q", k.StringID())
123 panic(fmt.Errorf("in AttemptListQuery: %s", err)) 129 panic(fmt.Errorf("in AttemptListQuery: %s", err))
124 } 130 }
125 if fan != nil { 131 if fan != nil {
126 fan.AddAIDs(aid) 132 fan.AddAIDs(aid)
(...skipping 20 matching lines...) Expand all
147 } else { 153 } else {
148 q = q.Limit(int32(numEx)) 154 q = q.Limit(int32(numEx))
149 } 155 }
150 if !g.req.Include.Execution.Abnormal { 156 if !g.req.Include.Execution.Abnormal {
151 q = q.Eq("IsAbnormal", false) 157 q = q.Eq("IsAbnormal", false)
152 } 158 }
153 if !g.req.Include.Execution.Expired { 159 if !g.req.Include.Execution.Expired {
154 q = q.Eq("IsExpired", false) 160 q = q.Eq("IsExpired", false)
155 } 161 }
156 162
157 » return ds.Run(g, q, func(e *model.Execution) error { 163 » return queryBatcher.Run(g, q, func(e *model.Execution) error {
158 if g.Err() != nil { 164 if g.Err() != nil {
159 return ds.Stop 165 return ds.Stop
160 } 166 }
161 p := e.ToProto(g.req.Include.Execution.Ids) 167 p := e.ToProto(g.req.Include.Execution.Ids)
162 d, err := g.getDist(p.Data.DistributorInfo.ConfigName) 168 d, err := g.getDist(p.Data.DistributorInfo.ConfigName)
163 if err != nil { 169 if err != nil {
164 return err 170 return err
165 } 171 }
166 p.Data.DistributorInfo.Url = d.InfoURL(distributor.Token(p.Data. DistributorInfo.Token)) 172 p.Data.DistributorInfo.Url = d.InfoURL(distributor.Token(p.Data. DistributorInfo.Token))
167 if d := p.Data.GetFinished().GetData(); d != nil { 173 if d := p.Data.GetFinished().GetData(); d != nil {
(...skipping 245 matching lines...) Expand 10 before | Expand all | Expand 10 after
413 return idx < len(nums) && nums[idx] == aid.Id 419 return idx < len(nums) && nums[idx] == aid.Id
414 } 420 }
415 421
416 func doGraphWalk(c context.Context, req *dm.WalkGraphReq) (rsp *dm.GraphData, er r error) { 422 func doGraphWalk(c context.Context, req *dm.WalkGraphReq) (rsp *dm.GraphData, er r error) {
417 cncl := (func())(nil) 423 cncl := (func())(nil)
418 timeoutProto := req.Limit.MaxTime 424 timeoutProto := req.Limit.MaxTime
419 timeout := timeoutProto.Duration() // .Duration on nil is OK 425 timeout := timeoutProto.Duration() // .Duration on nil is OK
420 if timeoutProto == nil || timeout > maxTimeout { 426 if timeoutProto == nil || timeout > maxTimeout {
421 timeout = maxTimeout 427 timeout = maxTimeout
422 } 428 }
423 // we end up doing a lot of long queries in here, so install a batcher t o
424 // prevent datastore timeouts.
425 c = dsQueryBatch.BatchQueries(c, 100)
426 c, cncl = clock.WithTimeout(c, timeout) 429 c, cncl = clock.WithTimeout(c, timeout)
427 defer cncl() 430 defer cncl()
428 431
429 // nodeChan recieves attempt nodes to process. If it recieves the 432 // nodeChan recieves attempt nodes to process. If it recieves the
430 // `finishedJob` sentinel node, that indicates that an outstanding worke r is 433 // `finishedJob` sentinel node, that indicates that an outstanding worke r is
431 // finished. 434 // finished.
432 nodeChan := make(chan *node, numWorkers) 435 nodeChan := make(chan *node, numWorkers)
433 defer close(nodeChan) 436 defer close(nodeChan)
434 437
435 g := graphWalker{Context: c, req: req} 438 g := graphWalker{Context: c, req: req}
(...skipping 152 matching lines...) Expand 10 before | Expand all | Expand 10 after
588 func (d *deps) WalkGraph(c context.Context, req *dm.WalkGraphReq) (rsp *dm.Graph Data, err error) { 591 func (d *deps) WalkGraph(c context.Context, req *dm.WalkGraphReq) (rsp *dm.Graph Data, err error) {
589 if req.Auth != nil { 592 if req.Auth != nil {
590 logging.Fields{"execution": req.Auth.Id}.Debugf(c, "on behalf of ") 593 logging.Fields{"execution": req.Auth.Id}.Debugf(c, "on behalf of ")
591 } else { 594 } else {
592 if err = canRead(c); err != nil { 595 if err = canRead(c); err != nil {
593 return 596 return
594 } 597 }
595 } 598 }
596 return doGraphWalk(c, req) 599 return doGraphWalk(c, req)
597 } 600 }
OLDNEW
« no previous file with comments | « appengine/tsmon/handler_test.go ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698