| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package tumble | 5 package tumble |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "fmt" | 9 "fmt" |
| 10 "math" | 10 "math" |
| 11 "sync/atomic" | 11 "sync/atomic" |
| 12 "time" | 12 "time" |
| 13 | 13 |
| 14 "github.com/luci/luci-go/appengine/memlock" | 14 "github.com/luci/luci-go/appengine/memlock" |
| 15 "github.com/luci/luci-go/common/clock" | 15 "github.com/luci/luci-go/common/clock" |
| 16 "github.com/luci/luci-go/common/data/stringset" | 16 "github.com/luci/luci-go/common/data/stringset" |
| 17 "github.com/luci/luci-go/common/errors" | 17 "github.com/luci/luci-go/common/errors" |
| 18 "github.com/luci/luci-go/common/logging" | 18 "github.com/luci/luci-go/common/logging" |
| 19 "github.com/luci/luci-go/common/retry/transient" |
| 19 "github.com/luci/luci-go/common/sync/parallel" | 20 "github.com/luci/luci-go/common/sync/parallel" |
| 20 | 21 |
| 21 "github.com/luci/gae/filter/txnBuf" | 22 "github.com/luci/gae/filter/txnBuf" |
| 22 ds "github.com/luci/gae/service/datastore" | 23 ds "github.com/luci/gae/service/datastore" |
| 23 "github.com/luci/gae/service/datastore/serialize" | 24 "github.com/luci/gae/service/datastore/serialize" |
| 24 "github.com/luci/gae/service/info" | 25 "github.com/luci/gae/service/info" |
| 25 mc "github.com/luci/gae/service/memcache" | 26 mc "github.com/luci/gae/service/memcache" |
| 26 | 27 |
| 27 "golang.org/x/net/context" | 28 "golang.org/x/net/context" |
| 28 ) | 29 ) |
| (...skipping 197 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 226 "query": qstr, | 227 "query": qstr, |
| 227 }.Errorf(c, "Failure to run shard query.") | 228 }.Errorf(c, "Failure to run shard query.") |
| 228 errCount.inc() | 229 errCount.inc() |
| 229 } | 230 } |
| 230 }) | 231 }) |
| 231 | 232 |
| 232 logging.Infof(c, "cumulatively processed %d items with %d errors
(s) and %d transient error(s)", | 233 logging.Infof(c, "cumulatively processed %d items with %d errors
(s) and %d transient error(s)", |
| 233 numProcessed, errCount, transientErrCount) | 234 numProcessed, errCount, transientErrCount) |
| 234 switch { | 235 switch { |
| 235 case transientErrCount > 0: | 236 case transientErrCount > 0: |
| 236 » » » return errors.WrapTransient(errors.New("transient error
during shard processing")) | 237 » » » return errors.New("transient error during shard processi
ng", transient.Tag) |
| 237 case errCount > 0: | 238 case errCount > 0: |
| 238 return errors.New("encountered non-transient error durin
g shard processing") | 239 return errors.New("encountered non-transient error durin
g shard processing") |
| 239 } | 240 } |
| 240 | 241 |
| 241 now := clock.Now(c) | 242 now := clock.Now(c) |
| 242 didWork := numProcessed > 0 | 243 didWork := numProcessed > 0 |
| 243 if didWork { | 244 if didWork { |
| 244 // Set our last key value for next round. | 245 // Set our last key value for next round. |
| 245 err = mc.Set(c, mc.NewItem(c, t.lastKey).SetValue(serial
ize.ToBytes(now.UTC()))) | 246 err = mc.Set(c, mc.NewItem(c, t.lastKey).SetValue(serial
ize.ToBytes(now.UTC()))) |
| 246 if err != nil { | 247 if err != nil { |
| (...skipping 251 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 498 prd.nextDelay = delay | 499 prd.nextDelay = delay |
| 499 } | 500 } |
| 500 | 501 |
| 501 // Enforce a no work lower bound. | 502 // Enforce a no work lower bound. |
| 502 if delay < minNoWorkDelay { | 503 if delay < minNoWorkDelay { |
| 503 delay = minNoWorkDelay | 504 delay = minNoWorkDelay |
| 504 } | 505 } |
| 505 | 506 |
| 506 return delay | 507 return delay |
| 507 } | 508 } |
| OLD | NEW |