| Index: server/internal/logdog/collector/coordinator/cache.go
|
| diff --git a/server/internal/logdog/collector/coordinator/cache.go b/server/internal/logdog/collector/coordinator/cache.go
|
| index 24b3cef9fca2bdcbc067f0f42005ab33b9047bf2..079a097f9e6ad7ba7f177c3132b41c2f89d4a272 100644
|
| --- a/server/internal/logdog/collector/coordinator/cache.go
|
| +++ b/server/internal/logdog/collector/coordinator/cache.go
|
| @@ -5,10 +5,11 @@
|
| package coordinator
|
|
|
| import (
|
| - "sync/atomic"
|
| + "sync"
|
| "time"
|
|
|
| "github.com/luci/luci-go/common/clock"
|
| + "github.com/luci/luci-go/common/config"
|
| "github.com/luci/luci-go/common/logdog/types"
|
| log "github.com/luci/luci-go/common/logging"
|
| "github.com/luci/luci-go/common/lru"
|
| @@ -67,10 +68,15 @@ func (c *cache) RegisterStream(ctx context.Context, st *LogStreamState, d *logpb
|
| *LogStreamState, error) {
|
| now := clock.Now(ctx)
|
|
|
| + key := cacheEntryKey{
|
| + project: st.Project,
|
| + path: st.Path,
|
| + }
|
| +
|
| // Get the cacheEntry from our cache. If it is expired, doesn't exist, or
|
| // we're forcing, ignore any existing entry and replace with a Promise pending
|
| // Coordinator sync.
|
| - entry := c.lru.Mutate(st.Path, func(current interface{}) interface{} {
|
| + entry := c.lru.Mutate(key, func(current interface{}) interface{} {
|
| // Don't replace an existing entry, unless it has an error or has expired.
|
| if current != nil {
|
| curEntry := current.(*cacheEntry)
|
| @@ -86,6 +92,7 @@ func (c *cache) RegisterStream(ctx context.Context, st *LogStreamState, d *logpb
|
| }
|
|
|
| return &LogStreamState{
|
| + Project: st.Project,
|
| Path: st.Path,
|
| ProtoVersion: st.ProtoVersion,
|
| Secret: st.Secret,
|
| @@ -96,9 +103,12 @@ func (c *cache) RegisterStream(ctx context.Context, st *LogStreamState, d *logpb
|
| })
|
|
|
| return &cacheEntry{
|
| + cacheEntryKey: cacheEntryKey{
|
| + project: st.Project,
|
| + path: st.Path,
|
| + },
|
| terminalIndex: -1,
|
| p: p,
|
| - path: st.Path,
|
| expiresAt: now.Add(c.expiration),
|
| }
|
| }).(*cacheEntry)
|
| @@ -116,9 +126,14 @@ func (c *cache) RegisterStream(ctx context.Context, st *LogStreamState, d *logpb
|
| }
|
|
|
| func (c *cache) TerminateStream(ctx context.Context, st *LogStreamState) error {
|
| + key := cacheEntryKey{
|
| + project: st.Project,
|
| + path: st.Path,
|
| + }
|
| +
|
| // Immediately update our state cache to record the terminal index, if
|
| // we have a state cache.
|
| - c.lru.Mutate(st.Path, func(current interface{}) (r interface{}) {
|
| + c.lru.Mutate(key, func(current interface{}) (r interface{}) {
|
| // Always return the current entry. We're just atomically examining it to
|
| // load it with a terminal index.
|
| r = current
|
| @@ -131,6 +146,12 @@ func (c *cache) TerminateStream(ctx context.Context, st *LogStreamState) error {
|
| return c.Coordinator.TerminateStream(ctx, st)
|
| }
|
|
|
| +// cacheEntryKey is the LRU key for a cacheEntry.
|
| +type cacheEntryKey struct {
|
| + project config.ProjectName
|
| + path types.StreamPath
|
| +}
|
| +
|
| // cacheEntry is the value stored in the cache. It contains a Promise
|
| // representing the value and an optional invalidation singleton to ensure that
|
| // if the state failed to fetch, it will be invalidated at most once.
|
| @@ -139,17 +160,19 @@ func (c *cache) TerminateStream(ctx context.Context, st *LogStreamState) error {
|
| // by calling the cache's "put" method. In this case, the Promise will be nil,
|
| // and the state value will be populated.
|
| type cacheEntry struct {
|
| + sync.Mutex
|
| + cacheEntryKey
|
| +
|
| // terminalIndex is the loaded terminal index set via loadTerminalIndex. It
|
| // will be applied to returned LogStreamState objects so that once a terminal
|
| // index has been set, we become aware of it in the Collector.
|
| - //
|
| - // This MUST be the first field in the struct in order to comply with atomic's
|
| - // 64-bit alignment requirements.
|
| - terminalIndex int64
|
| + terminalIndex types.MessageIndex
|
|
|
| // p is a Promise that is blocking pending a Coordiantor stream state
|
| // response. Upon successful resolution, it will contain a *LogStreamState.
|
| - p promise.Promise
|
| + p promise.Promise
|
| +
|
| + project config.ProjectName
|
| path types.StreamPath
|
| expiresAt time.Time
|
| }
|
| @@ -170,9 +193,12 @@ func (e *cacheEntry) get(ctx context.Context) (*LogStreamState, error) {
|
| // cached, apply that to the response.
|
| //
|
| // We need to lock around our terminalIndex.
|
| + e.Lock()
|
| + defer e.Unlock()
|
| +
|
| rp := *(promisedSt.(*LogStreamState))
|
| if rp.TerminalIndex < 0 {
|
| - rp.TerminalIndex = types.MessageIndex(atomic.LoadInt64(&e.terminalIndex))
|
| + rp.TerminalIndex = e.terminalIndex
|
| }
|
|
|
| return &rp, nil
|
| @@ -191,5 +217,8 @@ func (e *cacheEntry) hasError() bool {
|
| // loadTerminalIndex loads a local cache of the stream's terminal index. This
|
| // will be applied to all future get requests.
|
| func (e *cacheEntry) loadTerminalIndex(tidx types.MessageIndex) {
|
| - atomic.StoreInt64(&e.terminalIndex, int64(tidx))
|
| + e.Lock()
|
| + defer e.Unlock()
|
| +
|
| + e.terminalIndex = tidx
|
| }
|
|
|