| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package deps | 5 package deps |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "math" | 9 "math" |
| 10 "sort" | 10 "sort" |
| 11 "sync" | 11 "sync" |
| 12 "time" | 12 "time" |
| 13 | 13 |
| 14 "golang.org/x/net/context" | 14 "golang.org/x/net/context" |
| 15 "google.golang.org/grpc/codes" | 15 "google.golang.org/grpc/codes" |
| 16 | 16 |
| 17 "github.com/luci/gae/filter/dsQueryBatch" | |
| 18 ds "github.com/luci/gae/service/datastore" | 17 ds "github.com/luci/gae/service/datastore" |
| 18 |
| 19 "github.com/luci/luci-go/common/clock" | 19 "github.com/luci/luci-go/common/clock" |
| 20 "github.com/luci/luci-go/common/logging" | 20 "github.com/luci/luci-go/common/logging" |
| 21 "github.com/luci/luci-go/common/sync/parallel" | 21 "github.com/luci/luci-go/common/sync/parallel" |
| 22 dm "github.com/luci/luci-go/dm/api/service/v1" | 22 dm "github.com/luci/luci-go/dm/api/service/v1" |
| 23 "github.com/luci/luci-go/dm/appengine/distributor" | 23 "github.com/luci/luci-go/dm/appengine/distributor" |
| 24 "github.com/luci/luci-go/dm/appengine/model" | 24 "github.com/luci/luci-go/dm/appengine/model" |
| 25 "github.com/luci/luci-go/grpc/grpcutil" | 25 "github.com/luci/luci-go/grpc/grpcutil" |
| 26 ) | 26 ) |
| 27 | 27 |
| 28 const numWorkers = 16 | 28 const numWorkers = 16 |
| 29 | 29 |
| 30 const maxTimeout = 55 * time.Second // GAE limit is 60s | 30 const maxTimeout = 55 * time.Second // GAE limit is 60s |
| 31 | 31 |
| 32 // queryBatcher is a default datastore Batcher instance for queries. |
| 33 // |
| 34 // we end up doing a lot of long queries in here, so use a batcher to |
| 35 // prevent datastore timeouts. |
| 36 var queryBatcher ds.Batcher |
| 37 |
| 32 type node struct { | 38 type node struct { |
| 33 aid *dm.Attempt_ID | 39 aid *dm.Attempt_ID |
| 34 depth int64 | 40 depth int64 |
| 35 canSeeAttemptResult bool | 41 canSeeAttemptResult bool |
| 36 } | 42 } |
| 37 | 43 |
| 38 func (g *graphWalker) runSearchQuery(send func(*dm.Attempt_ID) error, s *dm.Grap
hQuery_Search) func() error { | 44 func (g *graphWalker) runSearchQuery(send func(*dm.Attempt_ID) error, s *dm.Grap
hQuery_Search) func() error { |
| 39 return func() error { | 45 return func() error { |
| 40 logging.Errorf(g, "SearchQuery not implemented") | 46 logging.Errorf(g, "SearchQuery not implemented") |
| 41 return grpcutil.Errf(codes.Unimplemented, "GraphQuery.Search is
not implemented") | 47 return grpcutil.Errf(codes.Unimplemented, "GraphQuery.Search is
not implemented") |
| 42 } | 48 } |
| 43 } | 49 } |
| 44 | 50 |
| 45 func isCtxErr(err error) bool { | 51 func isCtxErr(err error) bool { |
| 46 return err == context.Canceled || err == context.DeadlineExceeded | 52 return err == context.Canceled || err == context.DeadlineExceeded |
| 47 } | 53 } |
| 48 | 54 |
| 49 func (g *graphWalker) runAttemptListQuery(send func(*dm.Attempt_ID) error) func(
) error { | 55 func (g *graphWalker) runAttemptListQuery(send func(*dm.Attempt_ID) error) func(
) error { |
| 50 return func() error { | 56 return func() error { |
| 51 for qst, anum := range g.req.Query.AttemptList.To { | 57 for qst, anum := range g.req.Query.AttemptList.To { |
| 52 if len(anum.Nums) == 0 { | 58 if len(anum.Nums) == 0 { |
| 53 qry := model.QueryAttemptsForQuest(g, qst) | 59 qry := model.QueryAttemptsForQuest(g, qst) |
| 54 if !g.req.Include.Attempt.Abnormal { | 60 if !g.req.Include.Attempt.Abnormal { |
| 55 qry = qry.Eq("IsAbnormal", false) | 61 qry = qry.Eq("IsAbnormal", false) |
| 56 } | 62 } |
| 57 if !g.req.Include.Attempt.Expired { | 63 if !g.req.Include.Attempt.Expired { |
| 58 qry = qry.Eq("IsExpired", false) | 64 qry = qry.Eq("IsExpired", false) |
| 59 } | 65 } |
| 60 » » » » err := ds.Run(g, qry, func(k *ds.Key) error { | 66 » » » » err := queryBatcher.Run(g, qry, func(k *ds.Key)
error { |
| 61 aid := &dm.Attempt_ID{} | 67 aid := &dm.Attempt_ID{} |
| 62 if err := aid.SetDMEncoded(k.StringID())
; err != nil { | 68 if err := aid.SetDMEncoded(k.StringID())
; err != nil { |
| 63 logging.WithError(err).Errorf(g,
"Attempt_ID.SetDMEncoded returned an error with input: %q", k.StringID()) | 69 logging.WithError(err).Errorf(g,
"Attempt_ID.SetDMEncoded returned an error with input: %q", k.StringID()) |
| 64 panic(fmt.Errorf("in AttemptList
Query: %s", err)) | 70 panic(fmt.Errorf("in AttemptList
Query: %s", err)) |
| 65 } | 71 } |
| 66 return send(aid) | 72 return send(aid) |
| 67 }) | 73 }) |
| 68 if err != nil { | 74 if err != nil { |
| 69 if !isCtxErr(err) { | 75 if !isCtxErr(err) { |
| 70 logging.WithError(err).Errorf(g,
"in AttemptListQuery") | 76 logging.WithError(err).Errorf(g,
"in AttemptListQuery") |
| (...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 106 logging.Fields{ek: err, "qid": qid}.Errorf(g, "while loa
ding quest") | 112 logging.Fields{ek: err, "qid": qid}.Errorf(g, "while loa
ding quest") |
| 107 return err | 113 return err |
| 108 } | 114 } |
| 109 dst.Data = qst.DataProto() | 115 dst.Data = qst.DataProto() |
| 110 dst.Partial = false | 116 dst.Partial = false |
| 111 return nil | 117 return nil |
| 112 } | 118 } |
| 113 } | 119 } |
| 114 | 120 |
| 115 func (g *graphWalker) loadEdges(send func(*dm.Attempt_ID) error, typ string, bas
e *ds.Key, fan *dm.AttemptList, doSend bool) error { | 121 func (g *graphWalker) loadEdges(send func(*dm.Attempt_ID) error, typ string, bas
e *ds.Key, fan *dm.AttemptList, doSend bool) error { |
| 116 » return ds.Run(g, ds.NewQuery(typ).Ancestor(base), func(k *ds.Key) error
{ | 122 » return queryBatcher.Run(g, ds.NewQuery(typ).Ancestor(base), func(k *ds.K
ey) error { |
| 117 if g.Err() != nil { | 123 if g.Err() != nil { |
| 118 return ds.Stop | 124 return ds.Stop |
| 119 } | 125 } |
| 120 aid := &dm.Attempt_ID{} | 126 aid := &dm.Attempt_ID{} |
| 121 if err := aid.SetDMEncoded(k.StringID()); err != nil { | 127 if err := aid.SetDMEncoded(k.StringID()); err != nil { |
| 122 logging.WithError(err).Errorf(g, "Attempt_ID.SetDMEncode
d returned an error with input: %q", k.StringID()) | 128 logging.WithError(err).Errorf(g, "Attempt_ID.SetDMEncode
d returned an error with input: %q", k.StringID()) |
| 123 panic(fmt.Errorf("in AttemptListQuery: %s", err)) | 129 panic(fmt.Errorf("in AttemptListQuery: %s", err)) |
| 124 } | 130 } |
| 125 if fan != nil { | 131 if fan != nil { |
| 126 fan.AddAIDs(aid) | 132 fan.AddAIDs(aid) |
| (...skipping 20 matching lines...) Expand all Loading... |
| 147 } else { | 153 } else { |
| 148 q = q.Limit(int32(numEx)) | 154 q = q.Limit(int32(numEx)) |
| 149 } | 155 } |
| 150 if !g.req.Include.Execution.Abnormal { | 156 if !g.req.Include.Execution.Abnormal { |
| 151 q = q.Eq("IsAbnormal", false) | 157 q = q.Eq("IsAbnormal", false) |
| 152 } | 158 } |
| 153 if !g.req.Include.Execution.Expired { | 159 if !g.req.Include.Execution.Expired { |
| 154 q = q.Eq("IsExpired", false) | 160 q = q.Eq("IsExpired", false) |
| 155 } | 161 } |
| 156 | 162 |
| 157 » return ds.Run(g, q, func(e *model.Execution) error { | 163 » return queryBatcher.Run(g, q, func(e *model.Execution) error { |
| 158 if g.Err() != nil { | 164 if g.Err() != nil { |
| 159 return ds.Stop | 165 return ds.Stop |
| 160 } | 166 } |
| 161 p := e.ToProto(g.req.Include.Execution.Ids) | 167 p := e.ToProto(g.req.Include.Execution.Ids) |
| 162 d, err := g.getDist(p.Data.DistributorInfo.ConfigName) | 168 d, err := g.getDist(p.Data.DistributorInfo.ConfigName) |
| 163 if err != nil { | 169 if err != nil { |
| 164 return err | 170 return err |
| 165 } | 171 } |
| 166 p.Data.DistributorInfo.Url = d.InfoURL(distributor.Token(p.Data.
DistributorInfo.Token)) | 172 p.Data.DistributorInfo.Url = d.InfoURL(distributor.Token(p.Data.
DistributorInfo.Token)) |
| 167 if d := p.Data.GetFinished().GetData(); d != nil { | 173 if d := p.Data.GetFinished().GetData(); d != nil { |
| (...skipping 245 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 413 return idx < len(nums) && nums[idx] == aid.Id | 419 return idx < len(nums) && nums[idx] == aid.Id |
| 414 } | 420 } |
| 415 | 421 |
| 416 func doGraphWalk(c context.Context, req *dm.WalkGraphReq) (rsp *dm.GraphData, er
r error) { | 422 func doGraphWalk(c context.Context, req *dm.WalkGraphReq) (rsp *dm.GraphData, er
r error) { |
| 417 cncl := (func())(nil) | 423 cncl := (func())(nil) |
| 418 timeoutProto := req.Limit.MaxTime | 424 timeoutProto := req.Limit.MaxTime |
| 419 timeout := timeoutProto.Duration() // .Duration on nil is OK | 425 timeout := timeoutProto.Duration() // .Duration on nil is OK |
| 420 if timeoutProto == nil || timeout > maxTimeout { | 426 if timeoutProto == nil || timeout > maxTimeout { |
| 421 timeout = maxTimeout | 427 timeout = maxTimeout |
| 422 } | 428 } |
| 423 // we end up doing a lot of long queries in here, so install a batcher t
o | |
| 424 // prevent datastore timeouts. | |
| 425 c = dsQueryBatch.BatchQueries(c, 100) | |
| 426 c, cncl = clock.WithTimeout(c, timeout) | 429 c, cncl = clock.WithTimeout(c, timeout) |
| 427 defer cncl() | 430 defer cncl() |
| 428 | 431 |
| 429 // nodeChan recieves attempt nodes to process. If it recieves the | 432 // nodeChan recieves attempt nodes to process. If it recieves the |
| 430 // `finishedJob` sentinel node, that indicates that an outstanding worke
r is | 433 // `finishedJob` sentinel node, that indicates that an outstanding worke
r is |
| 431 // finished. | 434 // finished. |
| 432 nodeChan := make(chan *node, numWorkers) | 435 nodeChan := make(chan *node, numWorkers) |
| 433 defer close(nodeChan) | 436 defer close(nodeChan) |
| 434 | 437 |
| 435 g := graphWalker{Context: c, req: req} | 438 g := graphWalker{Context: c, req: req} |
| (...skipping 152 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 588 func (d *deps) WalkGraph(c context.Context, req *dm.WalkGraphReq) (rsp *dm.Graph
Data, err error) { | 591 func (d *deps) WalkGraph(c context.Context, req *dm.WalkGraphReq) (rsp *dm.Graph
Data, err error) { |
| 589 if req.Auth != nil { | 592 if req.Auth != nil { |
| 590 logging.Fields{"execution": req.Auth.Id}.Debugf(c, "on behalf of
") | 593 logging.Fields{"execution": req.Auth.Id}.Debugf(c, "on behalf of
") |
| 591 } else { | 594 } else { |
| 592 if err = canRead(c); err != nil { | 595 if err = canRead(c); err != nil { |
| 593 return | 596 return |
| 594 } | 597 } |
| 595 } | 598 } |
| 596 return doGraphWalk(c, req) | 599 return doGraphWalk(c, req) |
| 597 } | 600 } |
| OLD | NEW |