Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(770)

Unified Diff: server/internal/logdog/collector/coordinator/cache.go

Issue 1971933002: LogDog: collector cache non-transient errors. (Closed) Base URL: https://github.com/luci/luci-go@logdog-butler-register-coordinator-prefix-expiration
Patch Set: Updated patchset dependency Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | server/internal/logdog/collector/coordinator/cache_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: server/internal/logdog/collector/coordinator/cache.go
diff --git a/server/internal/logdog/collector/coordinator/cache.go b/server/internal/logdog/collector/coordinator/cache.go
index 6242856e40397ee48a026ebc342c5c11e7b04584..65a6b882f25d2ac1297278899d26abe71da60085 100644
--- a/server/internal/logdog/collector/coordinator/cache.go
+++ b/server/internal/logdog/collector/coordinator/cache.go
@@ -10,6 +10,7 @@ import (
"github.com/luci/luci-go/common/clock"
"github.com/luci/luci-go/common/config"
+ "github.com/luci/luci-go/common/errors"
"github.com/luci/luci-go/common/logdog/types"
log "github.com/luci/luci-go/common/logging"
"github.com/luci/luci-go/common/lru"
@@ -84,10 +85,11 @@ func (c *cache) RegisterStream(ctx context.Context, st *LogStreamState, desc []b
// Coordinator sync.
cacheHit := false
entry := c.lru.Mutate(key, func(current interface{}) interface{} {
- // Don't replace an existing entry, unless it has an error or has expired.
+ // Don't replace an existing entry, unless it has a transient error or has
+ // expired.
if current != nil {
curEntry := current.(*cacheEntry)
- if !curEntry.hasError() && now.Before(curEntry.expiresAt) {
+ if !curEntry.hasTransientError() && now.Before(curEntry.expiresAt) {
cacheHit = true
return current
}
@@ -95,10 +97,7 @@ func (c *cache) RegisterStream(ctx context.Context, st *LogStreamState, desc []b
p := promise.New(func() (interface{}, error) {
st, err := c.Coordinator.RegisterStream(ctx, st, desc)
- if err != nil {
- return nil, err
- }
- return st, nil
+ return st, err
})
return &cacheEntry{
@@ -112,18 +111,21 @@ func (c *cache) RegisterStream(ctx context.Context, st *LogStreamState, desc []b
}
}).(*cacheEntry)
- // If there was an error, purge the erroneous entry from the cache so that
- // the next "update" will re-fetch it.
+ // If there was a transient error, purge the erroneous entry from the cache so
+ // that the next "update" will re-fetch it.
+ //
+ // If the error was non-transient, we will retain it as the Promise return
+ // value and subsequent calls will also receive this error.
st, err := entry.get(ctx)
- if err != nil {
+ if errors.IsTransient(err) {
log.Fields{
log.ErrorKey: err,
- }.Errorf(ctx, "Error retrieving stream state.")
+ }.Errorf(ctx, "Transient error retrieving stream state.")
return nil, err
}
tsCache.Add(ctx, 1, cacheHit)
- return st, nil
+ return st, err
}
func (c *cache) TerminateStream(ctx context.Context, st *LogStreamState) error {
@@ -205,14 +207,16 @@ func (e *cacheEntry) get(ctx context.Context) (*LogStreamState, error) {
return &rp, nil
}
-// hasError tests if this entry has completed evaluation with an error state.
+// hasTransientError tests if this entry has completed evaluation with an error state.
// This is non-blocking, so if the evaluation hasn't completed, it will return
// false.
-func (e *cacheEntry) hasError() bool {
- if _, err := e.p.Peek(); err != nil && err != promise.ErrNoData {
- return true
+func (e *cacheEntry) hasTransientError() bool {
+ switch _, err := e.p.Peek(); err {
+ case nil, promise.ErrNoData:
+ return false
+ default:
+ return errors.IsTransient(err)
}
- return false
}
// loadTerminalIndex loads a local cache of the stream's terminal index. This
« no previous file with comments | « no previous file | server/internal/logdog/collector/coordinator/cache_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698