OLD | NEW |
1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package tumble | 5 package tumble |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "fmt" | 9 "fmt" |
10 "math" | 10 "math" |
11 "sync/atomic" | 11 "sync/atomic" |
12 "time" | 12 "time" |
13 | 13 |
14 "github.com/luci/luci-go/appengine/memlock" | 14 "github.com/luci/luci-go/appengine/memlock" |
15 "github.com/luci/luci-go/common/clock" | 15 "github.com/luci/luci-go/common/clock" |
16 "github.com/luci/luci-go/common/data/stringset" | 16 "github.com/luci/luci-go/common/data/stringset" |
17 "github.com/luci/luci-go/common/errors" | 17 "github.com/luci/luci-go/common/errors" |
18 "github.com/luci/luci-go/common/logging" | 18 "github.com/luci/luci-go/common/logging" |
| 19 "github.com/luci/luci-go/common/retry/transient" |
19 "github.com/luci/luci-go/common/sync/parallel" | 20 "github.com/luci/luci-go/common/sync/parallel" |
20 | 21 |
21 "github.com/luci/gae/filter/txnBuf" | 22 "github.com/luci/gae/filter/txnBuf" |
22 ds "github.com/luci/gae/service/datastore" | 23 ds "github.com/luci/gae/service/datastore" |
23 "github.com/luci/gae/service/datastore/serialize" | 24 "github.com/luci/gae/service/datastore/serialize" |
24 "github.com/luci/gae/service/info" | 25 "github.com/luci/gae/service/info" |
25 mc "github.com/luci/gae/service/memcache" | 26 mc "github.com/luci/gae/service/memcache" |
26 | 27 |
27 "golang.org/x/net/context" | 28 "golang.org/x/net/context" |
28 ) | 29 ) |
(...skipping 197 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
226 "query": qstr, | 227 "query": qstr, |
227 }.Errorf(c, "Failure to run shard query.") | 228 }.Errorf(c, "Failure to run shard query.") |
228 errCount.inc() | 229 errCount.inc() |
229 } | 230 } |
230 }) | 231 }) |
231 | 232 |
232 logging.Infof(c, "cumulatively processed %d items with %d errors
(s) and %d transient error(s)", | 233 logging.Infof(c, "cumulatively processed %d items with %d errors
(s) and %d transient error(s)", |
233 numProcessed, errCount, transientErrCount) | 234 numProcessed, errCount, transientErrCount) |
234 switch { | 235 switch { |
235 case transientErrCount > 0: | 236 case transientErrCount > 0: |
236 » » » return errors.WrapTransient(errors.New("transient error
during shard processing")) | 237 » » » return errors.New("transient error during shard processi
ng", transient.Tag) |
237 case errCount > 0: | 238 case errCount > 0: |
238 return errors.New("encountered non-transient error durin
g shard processing") | 239 return errors.New("encountered non-transient error durin
g shard processing") |
239 } | 240 } |
240 | 241 |
241 now := clock.Now(c) | 242 now := clock.Now(c) |
242 didWork := numProcessed > 0 | 243 didWork := numProcessed > 0 |
243 if didWork { | 244 if didWork { |
244 // Set our last key value for next round. | 245 // Set our last key value for next round. |
245 err = mc.Set(c, mc.NewItem(c, t.lastKey).SetValue(serial
ize.ToBytes(now.UTC()))) | 246 err = mc.Set(c, mc.NewItem(c, t.lastKey).SetValue(serial
ize.ToBytes(now.UTC()))) |
246 if err != nil { | 247 if err != nil { |
(...skipping 251 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
498 prd.nextDelay = delay | 499 prd.nextDelay = delay |
499 } | 500 } |
500 | 501 |
501 // Enforce a no work lower bound. | 502 // Enforce a no work lower bound. |
502 if delay < minNoWorkDelay { | 503 if delay < minNoWorkDelay { |
503 delay = minNoWorkDelay | 504 delay = minNoWorkDelay |
504 } | 505 } |
505 | 506 |
506 return delay | 507 return delay |
507 } | 508 } |
OLD | NEW |