| Index: dm/appengine/deps/walk_graph.go
|
| diff --git a/dm/appengine/deps/walk_graph.go b/dm/appengine/deps/walk_graph.go
|
| index 0c0ab4f04a901c081bf114937e84da9e7c38f43c..aa07f259ba3f63d19e81014114aa057341c2e4ed 100644
|
| --- a/dm/appengine/deps/walk_graph.go
|
| +++ b/dm/appengine/deps/walk_graph.go
|
| @@ -14,8 +14,8 @@ import (
|
| "golang.org/x/net/context"
|
| "google.golang.org/grpc/codes"
|
|
|
| - "github.com/luci/gae/filter/dsQueryBatch"
|
| ds "github.com/luci/gae/service/datastore"
|
| +
|
| "github.com/luci/luci-go/common/clock"
|
| "github.com/luci/luci-go/common/logging"
|
| "github.com/luci/luci-go/common/sync/parallel"
|
| @@ -29,6 +29,12 @@ const numWorkers = 16
|
|
|
| const maxTimeout = 55 * time.Second // GAE limit is 60s
|
|
|
| +// queryBatcher is a default datastore Batcher instance for queries.
|
| +//
|
| +// we end up doing a lot of long queries in here, so use a batcher to
|
| +// prevent datastore timeouts.
|
| +var queryBatcher ds.Batcher
|
| +
|
| type node struct {
|
| aid *dm.Attempt_ID
|
| depth int64
|
| @@ -57,7 +63,7 @@ func (g *graphWalker) runAttemptListQuery(send func(*dm.Attempt_ID) error) func(
|
| if !g.req.Include.Attempt.Expired {
|
| qry = qry.Eq("IsExpired", false)
|
| }
|
| - err := ds.Run(g, qry, func(k *ds.Key) error {
|
| + err := queryBatcher.Run(g, qry, func(k *ds.Key) error {
|
| aid := &dm.Attempt_ID{}
|
| if err := aid.SetDMEncoded(k.StringID()); err != nil {
|
| logging.WithError(err).Errorf(g, "Attempt_ID.SetDMEncoded returned an error with input: %q", k.StringID())
|
| @@ -113,7 +119,7 @@ func (g *graphWalker) questDataLoader(qid string, dst *dm.Quest) func() error {
|
| }
|
|
|
| func (g *graphWalker) loadEdges(send func(*dm.Attempt_ID) error, typ string, base *ds.Key, fan *dm.AttemptList, doSend bool) error {
|
| - return ds.Run(g, ds.NewQuery(typ).Ancestor(base), func(k *ds.Key) error {
|
| + return queryBatcher.Run(g, ds.NewQuery(typ).Ancestor(base), func(k *ds.Key) error {
|
| if g.Err() != nil {
|
| return ds.Stop
|
| }
|
| @@ -154,7 +160,7 @@ func (g *graphWalker) loadExecutions(includeResult bool, atmpt *model.Attempt, a
|
| q = q.Eq("IsExpired", false)
|
| }
|
|
|
| - return ds.Run(g, q, func(e *model.Execution) error {
|
| + return queryBatcher.Run(g, q, func(e *model.Execution) error {
|
| if g.Err() != nil {
|
| return ds.Stop
|
| }
|
| @@ -420,9 +426,6 @@ func doGraphWalk(c context.Context, req *dm.WalkGraphReq) (rsp *dm.GraphData, er
|
| if timeoutProto == nil || timeout > maxTimeout {
|
| timeout = maxTimeout
|
| }
|
| - // we end up doing a lot of long queries in here, so install a batcher to
|
| - // prevent datastore timeouts.
|
| - c = dsQueryBatch.BatchQueries(c, 100)
|
| c, cncl = clock.WithTimeout(c, timeout)
|
| defer cncl()
|
|
|
|
|