Chromium Code Reviews| Index: server/internal/logdog/collector/coordinator/cache.go |
| diff --git a/server/internal/logdog/collector/coordinator/cache.go b/server/internal/logdog/collector/coordinator/cache.go |
| index 6242856e40397ee48a026ebc342c5c11e7b04584..65a6b882f25d2ac1297278899d26abe71da60085 100644 |
| --- a/server/internal/logdog/collector/coordinator/cache.go |
| +++ b/server/internal/logdog/collector/coordinator/cache.go |
| @@ -10,6 +10,7 @@ import ( |
| "github.com/luci/luci-go/common/clock" |
| "github.com/luci/luci-go/common/config" |
| + "github.com/luci/luci-go/common/errors" |
| "github.com/luci/luci-go/common/logdog/types" |
| log "github.com/luci/luci-go/common/logging" |
| "github.com/luci/luci-go/common/lru" |
| @@ -84,10 +85,11 @@ func (c *cache) RegisterStream(ctx context.Context, st *LogStreamState, desc []b |
| // Coordinator sync. |
| cacheHit := false |
| entry := c.lru.Mutate(key, func(current interface{}) interface{} { |
| - // Don't replace an existing entry, unless it has an error or has expired. |
| + // Don't replace an existing entry, unless it has a transient error or has |
| + // expired. |
| if current != nil { |
| curEntry := current.(*cacheEntry) |
| - if !curEntry.hasError() && now.Before(curEntry.expiresAt) { |
| + if !curEntry.hasTransientError() && now.Before(curEntry.expiresAt) { |
| cacheHit = true |
| return current |
| } |
| @@ -95,10 +97,7 @@ func (c *cache) RegisterStream(ctx context.Context, st *LogStreamState, desc []b |
| p := promise.New(func() (interface{}, error) { |
| st, err := c.Coordinator.RegisterStream(ctx, st, desc) |
|
nodir
2016/05/19 01:01:51
just return it
|
| - if err != nil { |
| - return nil, err |
| - } |
| - return st, nil |
| + return st, err |
| }) |
| return &cacheEntry{ |
| @@ -112,18 +111,21 @@ func (c *cache) RegisterStream(ctx context.Context, st *LogStreamState, desc []b |
| } |
| }).(*cacheEntry) |
| - // If there was an error, purge the erroneous entry from the cache so that |
| - // the next "update" will re-fetch it. |
| + // If there was a transient error, purge the erroneous entry from the cache so |
| + // that the next "update" will re-fetch it. |
|
nodir
2016/05/19 01:01:51
I am not clear where this purge happens?
|
| + // |
| + // If the error was non-transient, we will retain it as the Promise return |
| + // value and subsequent calls will also receive this error. |
| st, err := entry.get(ctx) |
| - if err != nil { |
| + if errors.IsTransient(err) { |
| log.Fields{ |
| log.ErrorKey: err, |
| - }.Errorf(ctx, "Error retrieving stream state.") |
| + }.Errorf(ctx, "Transient error retrieving stream state.") |
| return nil, err |
| } |
| tsCache.Add(ctx, 1, cacheHit) |
| - return st, nil |
| + return st, err |
| } |
| func (c *cache) TerminateStream(ctx context.Context, st *LogStreamState) error { |
| @@ -205,14 +207,16 @@ func (e *cacheEntry) get(ctx context.Context) (*LogStreamState, error) { |
| return &rp, nil |
| } |
| -// hasError tests if this entry has completed evaluation with an error state. |
| +// hasTransientError tests if this entry has completed evaluation with an error state. |
| // This is non-blocking, so if the evaluation hasn't completed, it will return |
| // false. |
| -func (e *cacheEntry) hasError() bool { |
| - if _, err := e.p.Peek(); err != nil && err != promise.ErrNoData { |
| - return true |
| +func (e *cacheEntry) hasTransientError() bool { |
| + switch _, err := e.p.Peek(); err { |
| + case nil, promise.ErrNoData: |
| + return false |
| + default: |
| + return errors.IsTransient(err) |
| } |
| - return false |
| } |
| // loadTerminalIndex loads a local cache of the stream's terminal index. This |