| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package datastorecache | 5 package datastorecache |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | |
| 9 "fmt" | 8 "fmt" |
| 10 "net/http" | 9 "net/http" |
| 11 "strings" | 10 "strings" |
| 12 "sync/atomic" | 11 "sync/atomic" |
| 13 "time" | 12 "time" |
| 14 | 13 |
| 15 "github.com/luci/luci-go/appengine/memlock" | 14 "github.com/luci/luci-go/appengine/memlock" |
| 16 "github.com/luci/luci-go/common/clock" | 15 "github.com/luci/luci-go/common/clock" |
| 17 "github.com/luci/luci-go/common/errors" | 16 "github.com/luci/luci-go/common/errors" |
| 18 log "github.com/luci/luci-go/common/logging" | 17 log "github.com/luci/luci-go/common/logging" |
| (...skipping 15 matching lines...) Expand all Loading... |
| 34 return | 33 return |
| 35 } | 34 } |
| 36 | 35 |
| 37 // Handler returned an error, dump it to output. | 36 // Handler returned an error, dump it to output. |
| 38 ctx.Writer.WriteHeader(http.StatusInternalServerError) | 37 ctx.Writer.WriteHeader(http.StatusInternalServerError) |
| 39 | 38 |
| 40 // Log all of our stack lines individually, so we don't overflow
the | 39 // Log all of our stack lines individually, so we don't overflow
the |
| 41 // maximum log message size with a full stack. | 40 // maximum log message size with a full stack. |
| 42 stk := errors.RenderStack(err) | 41 stk := errors.RenderStack(err) |
| 43 log.WithError(err).Errorf(ctx.Context, "Handler returned error."
) | 42 log.WithError(err).Errorf(ctx.Context, "Handler returned error."
) |
| 44 » » for _, line := range stk.ToLines() { | 43 » » for _, line := range stk { |
| 45 log.Errorf(ctx.Context, ">> %s", line) | 44 log.Errorf(ctx.Context, ">> %s", line) |
| 46 } | 45 } |
| 47 | 46 |
| 48 dumpErr := func() error { | 47 dumpErr := func() error { |
| 49 » » » var buf bytes.Buffer | 48 » » » for _, line := range stk { |
| 50 » » » if _, err := stk.DumpTo(&buf); err != nil { | 49 » » » » if _, err := ctx.Writer.Write([]byte(line + "\n"
)); err != nil { |
| 51 » » » » return err | 50 » » » » » return err |
| 52 » » » } | 51 » » » » } |
| 53 » » » if _, err := buf.WriteTo(ctx.Writer); err != nil { | |
| 54 » » » » return err | |
| 55 } | 52 } |
| 56 return nil | 53 return nil |
| 57 }() | 54 }() |
| 58 if dumpErr != nil { | 55 if dumpErr != nil { |
| 59 log.WithError(dumpErr).Errorf(ctx.Context, "Failed to du
mp error stack.") | 56 log.WithError(dumpErr).Errorf(ctx.Context, "Failed to du
mp error stack.") |
| 60 } | 57 } |
| 61 } | 58 } |
| 62 } | 59 } |
| 63 | 60 |
| 64 // manager is initialized to perform the management cron task. | 61 // manager is initialized to perform the management cron task. |
| (...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 141 if rv == nil { | 138 if rv == nil { |
| 142 ms.st.LastSuccessfulRun = ms.now | 139 ms.st.LastSuccessfulRun = ms.now |
| 143 } | 140 } |
| 144 | 141 |
| 145 if rv := datastore.Put(c, &ms.st); rv != nil { | 142 if rv := datastore.Put(c, &ms.st); rv != nil { |
| 146 log.WithError(rv).Errorf(c, "Failed to Put() sta
ts on completion.") | 143 log.WithError(rv).Errorf(c, "Failed to Put() sta
ts on completion.") |
| 147 } | 144 } |
| 148 }() | 145 }() |
| 149 | 146 |
| 150 if err := ms.runLocked(c); err != nil { | 147 if err := ms.runLocked(c); err != nil { |
| 151 » » » return errors.Annotate(err).Err() | 148 » » » return errors.Annotate(err, "running maintenance loop").
Err() |
| 152 } | 149 } |
| 153 | 150 |
| 154 // If we observed errors during processing, note this. | 151 // If we observed errors during processing, note this. |
| 155 if ms.errors > 0 { | 152 if ms.errors > 0 { |
| 156 » » » return errors.Reason("%(count)d error(s) encountered dur
ing processing").D("count", ms.errors).Err() | 153 » » » return errors.Reason("%d error(s) encountered during pro
cessing", ms.errors).Err() |
| 157 } | 154 } |
| 158 return nil | 155 return nil |
| 159 }) | 156 }) |
| 160 } | 157 } |
| 161 | 158 |
| 162 // runLocked runs the main main maintenance loop. | 159 // runLocked runs the main main maintenance loop. |
| 163 // | 160 // |
| 164 // As the run is executed, stats can be collected in ms.st. These will be output | 161 // As the run is executed, stats can be collected in ms.st. These will be output |
| 165 // to datastore on completion. | 162 // to datastore on completion. |
| 166 func (ms *managerShard) runLocked(c context.Context) error { | 163 func (ms *managerShard) runLocked(c context.Context) error { |
| (...skipping 156 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 323 Size: ms.queryBatchSize, | 320 Size: ms.queryBatchSize, |
| 324 Callback: handleEntries, | 321 Callback: handleEntries, |
| 325 } | 322 } |
| 326 err := b.Run(c, q, func(e *entry) error { | 323 err := b.Run(c, q, func(e *entry) error { |
| 327 totalEntries++ | 324 totalEntries++ |
| 328 ms.observeEntry() | 325 ms.observeEntry() |
| 329 entries = append(entries, e) | 326 entries = append(entries, e) |
| 330 return nil | 327 return nil |
| 331 }) | 328 }) |
| 332 if err != nil { | 329 if err != nil { |
| 333 » » return errors.Annotate(err).Reason("failed to run entry query").
Err() | 330 » » return errors.Annotate(err, "failed to run entry query").Err() |
| 334 } | 331 } |
| 335 | 332 |
| 336 // Flush any outstanding entries (ignore error, will always be nil). | 333 // Flush any outstanding entries (ignore error, will always be nil). |
| 337 _ = handleEntries(c) | 334 _ = handleEntries(c) |
| 338 if totalErrors > 0 { | 335 if totalErrors > 0 { |
| 339 ms.observeErrors(totalErrors) | 336 ms.observeErrors(totalErrors) |
| 340 } | 337 } |
| 341 | 338 |
| 342 log.Fields{ | 339 log.Fields{ |
| 343 "entries": totalEntries, | 340 "entries": totalEntries, |
| 344 "errors": totalErrors, | 341 "errors": totalErrors, |
| 345 "refreshed": totalRefreshed, | 342 "refreshed": totalRefreshed, |
| 346 "pruned": totalPruned, | 343 "pruned": totalPruned, |
| 347 }.Infof(c, "Successfully updated cache entries.") | 344 }.Infof(c, "Successfully updated cache entries.") |
| 348 return nil | 345 return nil |
| 349 } | 346 } |
| OLD | NEW |