| Index: server/internal/logdog/collector/coordinator/cache.go
|
| diff --git a/server/internal/logdog/collector/coordinator/cache.go b/server/internal/logdog/collector/coordinator/cache.go
|
| index 6242856e40397ee48a026ebc342c5c11e7b04584..65a6b882f25d2ac1297278899d26abe71da60085 100644
|
| --- a/server/internal/logdog/collector/coordinator/cache.go
|
| +++ b/server/internal/logdog/collector/coordinator/cache.go
|
| @@ -10,6 +10,7 @@ import (
|
|
|
| "github.com/luci/luci-go/common/clock"
|
| "github.com/luci/luci-go/common/config"
|
| + "github.com/luci/luci-go/common/errors"
|
| "github.com/luci/luci-go/common/logdog/types"
|
| log "github.com/luci/luci-go/common/logging"
|
| "github.com/luci/luci-go/common/lru"
|
| @@ -84,10 +85,11 @@ func (c *cache) RegisterStream(ctx context.Context, st *LogStreamState, desc []b
|
| // Coordinator sync.
|
| cacheHit := false
|
| entry := c.lru.Mutate(key, func(current interface{}) interface{} {
|
| - // Don't replace an existing entry, unless it has an error or has expired.
|
| + // Don't replace an existing entry, unless it has a transient error or has
|
| + // expired.
|
| if current != nil {
|
| curEntry := current.(*cacheEntry)
|
| - if !curEntry.hasError() && now.Before(curEntry.expiresAt) {
|
| + if !curEntry.hasTransientError() && now.Before(curEntry.expiresAt) {
|
| cacheHit = true
|
| return current
|
| }
|
| @@ -95,10 +97,7 @@ func (c *cache) RegisterStream(ctx context.Context, st *LogStreamState, desc []b
|
|
|
| p := promise.New(func() (interface{}, error) {
|
| st, err := c.Coordinator.RegisterStream(ctx, st, desc)
|
| - if err != nil {
|
| - return nil, err
|
| - }
|
| - return st, nil
|
| + return st, err
|
| })
|
|
|
| return &cacheEntry{
|
| @@ -112,18 +111,21 @@ func (c *cache) RegisterStream(ctx context.Context, st *LogStreamState, desc []b
|
| }
|
| }).(*cacheEntry)
|
|
|
| - // If there was an error, purge the erroneous entry from the cache so that
|
| - // the next "update" will re-fetch it.
|
| + // If there was a transient error, purge the erroneous entry from the cache so
|
| + // that the next "update" will re-fetch it.
|
| + //
|
| + // If the error was non-transient, we will retain it as the Promise return
|
| + // value and subsequent calls will also receive this error.
|
| st, err := entry.get(ctx)
|
| - if err != nil {
|
| + if errors.IsTransient(err) {
|
| log.Fields{
|
| log.ErrorKey: err,
|
| - }.Errorf(ctx, "Error retrieving stream state.")
|
| + }.Errorf(ctx, "Transient error retrieving stream state.")
|
| return nil, err
|
| }
|
|
|
| tsCache.Add(ctx, 1, cacheHit)
|
| - return st, nil
|
| + return st, err
|
| }
|
|
|
| func (c *cache) TerminateStream(ctx context.Context, st *LogStreamState) error {
|
| @@ -205,14 +207,16 @@ func (e *cacheEntry) get(ctx context.Context) (*LogStreamState, error) {
|
| return &rp, nil
|
| }
|
|
|
| -// hasError tests if this entry has completed evaluation with an error state.
|
| +// hasTransientError tests if this entry has completed evaluation with an error state.
|
| // This is non-blocking, so if the evaluation hasn't completed, it will return
|
| // false.
|
| -func (e *cacheEntry) hasError() bool {
|
| - if _, err := e.p.Peek(); err != nil && err != promise.ErrNoData {
|
| - return true
|
| +func (e *cacheEntry) hasTransientError() bool {
|
| + switch _, err := e.p.Peek(); err {
|
| + case nil, promise.ErrNoData:
|
| + return false
|
| + default:
|
| + return errors.IsTransient(err)
|
| }
|
| - return false
|
| }
|
|
|
| // loadTerminalIndex loads a local cache of the stream's terminal index. This
|
|
|