Index: server/internal/logdog/collector/coordinator/cache.go |
diff --git a/server/internal/logdog/collector/coordinator/cache.go b/server/internal/logdog/collector/coordinator/cache.go |
index 24b3cef9fca2bdcbc067f0f42005ab33b9047bf2..079a097f9e6ad7ba7f177c3132b41c2f89d4a272 100644 |
--- a/server/internal/logdog/collector/coordinator/cache.go |
+++ b/server/internal/logdog/collector/coordinator/cache.go |
@@ -5,10 +5,11 @@ |
package coordinator |
import ( |
- "sync/atomic" |
+ "sync" |
"time" |
"github.com/luci/luci-go/common/clock" |
+ "github.com/luci/luci-go/common/config" |
"github.com/luci/luci-go/common/logdog/types" |
log "github.com/luci/luci-go/common/logging" |
"github.com/luci/luci-go/common/lru" |
@@ -67,10 +68,15 @@ func (c *cache) RegisterStream(ctx context.Context, st *LogStreamState, d *logpb |
*LogStreamState, error) { |
now := clock.Now(ctx) |
+ key := cacheEntryKey{ |
+ project: st.Project, |
+ path: st.Path, |
+ } |
+ |
// Get the cacheEntry from our cache. If it is expired, doesn't exist, or |
// we're forcing, ignore any existing entry and replace with a Promise pending |
// Coordinator sync. |
- entry := c.lru.Mutate(st.Path, func(current interface{}) interface{} { |
+ entry := c.lru.Mutate(key, func(current interface{}) interface{} { |
// Don't replace an existing entry, unless it has an error or has expired. |
if current != nil { |
curEntry := current.(*cacheEntry) |
@@ -86,6 +92,7 @@ func (c *cache) RegisterStream(ctx context.Context, st *LogStreamState, d *logpb |
} |
return &LogStreamState{ |
+ Project: st.Project, |
Path: st.Path, |
ProtoVersion: st.ProtoVersion, |
Secret: st.Secret, |
@@ -96,9 +103,12 @@ func (c *cache) RegisterStream(ctx context.Context, st *LogStreamState, d *logpb |
}) |
return &cacheEntry{ |
+ cacheEntryKey: cacheEntryKey{ |
+ project: st.Project, |
+ path: st.Path, |
+ }, |
terminalIndex: -1, |
p: p, |
- path: st.Path, |
expiresAt: now.Add(c.expiration), |
} |
}).(*cacheEntry) |
@@ -116,9 +126,14 @@ func (c *cache) RegisterStream(ctx context.Context, st *LogStreamState, d *logpb |
} |
func (c *cache) TerminateStream(ctx context.Context, st *LogStreamState) error { |
+ key := cacheEntryKey{ |
+ project: st.Project, |
+ path: st.Path, |
+ } |
+ |
// Immediately update our state cache to record the terminal index, if |
// we have a state cache. |
- c.lru.Mutate(st.Path, func(current interface{}) (r interface{}) { |
+ c.lru.Mutate(key, func(current interface{}) (r interface{}) { |
// Always return the current entry. We're just atomically examining it to |
// load it with a terminal index. |
r = current |
@@ -131,6 +146,12 @@ func (c *cache) TerminateStream(ctx context.Context, st *LogStreamState) error { |
return c.Coordinator.TerminateStream(ctx, st) |
} |
+// cacheEntryKey is the LRU key for a cacheEntry. |
+type cacheEntryKey struct { |
+ project config.ProjectName |
+ path types.StreamPath |
+} |
+ |
// cacheEntry is the value stored in the cache. It contains a Promise |
// representing the value and an optional invalidation singleton to ensure that |
// if the state failed to fetch, it will be invalidated at most once. |
@@ -139,17 +160,19 @@ func (c *cache) TerminateStream(ctx context.Context, st *LogStreamState) error { |
// by calling the cache's "put" method. In this case, the Promise will be nil, |
// and the state value will be populated. |
type cacheEntry struct { |
+ sync.Mutex |
+ cacheEntryKey |
+ |
// terminalIndex is the loaded terminal index set via loadTerminalIndex. It |
// will be applied to returned LogStreamState objects so that once a terminal |
// index has been set, we become aware of it in the Collector. |
- // |
- // This MUST be the first field in the struct in order to comply with atomic's |
- // 64-bit alignment requirements. |
- terminalIndex int64 |
+ terminalIndex types.MessageIndex |
// p is a Promise that is blocking pending a Coordiantor stream state |
// response. Upon successful resolution, it will contain a *LogStreamState. |
- p promise.Promise |
+ p promise.Promise |
+ |
+ project config.ProjectName |
path types.StreamPath |
expiresAt time.Time |
} |
@@ -170,9 +193,12 @@ func (e *cacheEntry) get(ctx context.Context) (*LogStreamState, error) { |
// cached, apply that to the response. |
// |
// We need to lock around our terminalIndex. |
+ e.Lock() |
+ defer e.Unlock() |
+ |
rp := *(promisedSt.(*LogStreamState)) |
if rp.TerminalIndex < 0 { |
- rp.TerminalIndex = types.MessageIndex(atomic.LoadInt64(&e.terminalIndex)) |
+ rp.TerminalIndex = e.terminalIndex |
} |
return &rp, nil |
@@ -191,5 +217,8 @@ func (e *cacheEntry) hasError() bool { |
// loadTerminalIndex loads a local cache of the stream's terminal index. This |
// will be applied to all future get requests. |
func (e *cacheEntry) loadTerminalIndex(tidx types.MessageIndex) { |
- atomic.StoreInt64(&e.terminalIndex, int64(tidx)) |
+ e.Lock() |
+ defer e.Unlock() |
+ |
+ e.terminalIndex = tidx |
} |