| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package datastorecache | 5 package datastorecache |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "fmt" | 9 "fmt" |
| 10 "net/http" | 10 "net/http" |
| 11 "strings" | 11 "strings" |
| 12 "sync/atomic" | 12 "sync/atomic" |
| 13 "time" | 13 "time" |
| 14 | 14 |
| 15 "github.com/luci/luci-go/appengine/memlock" | 15 "github.com/luci/luci-go/appengine/memlock" |
| 16 "github.com/luci/luci-go/common/clock" | 16 "github.com/luci/luci-go/common/clock" |
| 17 "github.com/luci/luci-go/common/errors" | 17 "github.com/luci/luci-go/common/errors" |
| 18 log "github.com/luci/luci-go/common/logging" | 18 log "github.com/luci/luci-go/common/logging" |
| 19 "github.com/luci/luci-go/common/sync/parallel" | 19 "github.com/luci/luci-go/common/sync/parallel" |
| 20 "github.com/luci/luci-go/server/router" | 20 "github.com/luci/luci-go/server/router" |
| 21 | 21 |
| 22 "github.com/luci/gae/filter/dsQueryBatch" | |
| 23 "github.com/luci/gae/service/datastore" | 22 "github.com/luci/gae/service/datastore" |
| 24 "github.com/luci/gae/service/info" | 23 "github.com/luci/gae/service/info" |
| 25 | 24 |
| 26 "github.com/julienschmidt/httprouter" | 25 "github.com/julienschmidt/httprouter" |
| 27 "golang.org/x/net/context" | 26 "golang.org/x/net/context" |
| 28 ) | 27 ) |
| 29 | 28 |
| 30 const ( | |
| 31 // managerQueryBatchSize is the query batch size for cache entries that
are | |
| 32 // processed by a manager shard. | |
| 33 managerQueryBatchSize = 200 | |
| 34 ) | |
| 35 | |
| 36 func errHTTPHandler(fn func(c context.Context, req *http.Request, params httprou
ter.Params) error) router.Handler { | 29 func errHTTPHandler(fn func(c context.Context, req *http.Request, params httprou
ter.Params) error) router.Handler { |
| 37 return func(ctx *router.Context) { | 30 return func(ctx *router.Context) { |
| 38 err := fn(ctx.Context, ctx.Request, ctx.Params) | 31 err := fn(ctx.Context, ctx.Request, ctx.Params) |
| 39 if err == nil { | 32 if err == nil { |
| 40 // Handler returned no error, everything is good. | 33 // Handler returned no error, everything is good. |
| 41 return | 34 return |
| 42 } | 35 } |
| 43 | 36 |
| 44 // Handler returned an error, dump it to output. | 37 // Handler returned an error, dump it to output. |
| 45 ctx.Writer.WriteHeader(http.StatusInternalServerError) | 38 ctx.Writer.WriteHeader(http.StatusInternalServerError) |
| (...skipping 273 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 319 } else { | 312 } else { |
| 320 totalPruned += int(deleteIdx) | 313 totalPruned += int(deleteIdx) |
| 321 } | 314 } |
| 322 return nil | 315 return nil |
| 323 } | 316 } |
| 324 } | 317 } |
| 325 }) | 318 }) |
| 326 return nil | 319 return nil |
| 327 } | 320 } |
| 328 | 321 |
| 329 » err := datastore.Run(dsQueryBatch.BatchQueries(c, int32(ms.queryBatchSiz
e), handleEntries), q, func(e *entry) error { | 322 » b := datastore.Batcher{ |
| 323 » » Size: ms.queryBatchSize, |
| 324 » » Callback: handleEntries, |
| 325 » } |
| 326 » err := b.Run(c, q, func(e *entry) error { |
| 330 totalEntries++ | 327 totalEntries++ |
| 331 ms.observeEntry() | 328 ms.observeEntry() |
| 332 entries = append(entries, e) | 329 entries = append(entries, e) |
| 333 return nil | 330 return nil |
| 334 }) | 331 }) |
| 335 if err != nil { | 332 if err != nil { |
| 336 return errors.Annotate(err).Reason("failed to run entry query").
Err() | 333 return errors.Annotate(err).Reason("failed to run entry query").
Err() |
| 337 } | 334 } |
| 338 | 335 |
| 339 // Flush any outstanding entries (ignore error, will always be nil). | 336 // Flush any outstanding entries (ignore error, will always be nil). |
| 340 _ = handleEntries(c) | 337 _ = handleEntries(c) |
| 341 if totalErrors > 0 { | 338 if totalErrors > 0 { |
| 342 ms.observeErrors(totalErrors) | 339 ms.observeErrors(totalErrors) |
| 343 } | 340 } |
| 344 | 341 |
| 345 log.Fields{ | 342 log.Fields{ |
| 346 "entries": totalEntries, | 343 "entries": totalEntries, |
| 347 "errors": totalErrors, | 344 "errors": totalErrors, |
| 348 "refreshed": totalRefreshed, | 345 "refreshed": totalRefreshed, |
| 349 "pruned": totalPruned, | 346 "pruned": totalPruned, |
| 350 }.Infof(c, "Successfully updated cache entries.") | 347 }.Infof(c, "Successfully updated cache entries.") |
| 351 return nil | 348 return nil |
| 352 } | 349 } |
| OLD | NEW |