Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1525)

Unified Diff: server/internal/logdog/collector/coordinator/cache.go

Issue 1906023002: LogDog: Add project namespace to Butler/Collector. (Closed) Base URL: https://github.com/luci/luci-go@logdog-project-archivist
Patch Set: Rebase? Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: server/internal/logdog/collector/coordinator/cache.go
diff --git a/server/internal/logdog/collector/coordinator/cache.go b/server/internal/logdog/collector/coordinator/cache.go
index 24b3cef9fca2bdcbc067f0f42005ab33b9047bf2..079a097f9e6ad7ba7f177c3132b41c2f89d4a272 100644
--- a/server/internal/logdog/collector/coordinator/cache.go
+++ b/server/internal/logdog/collector/coordinator/cache.go
@@ -5,10 +5,11 @@
package coordinator
import (
- "sync/atomic"
+ "sync"
"time"
"github.com/luci/luci-go/common/clock"
+ "github.com/luci/luci-go/common/config"
"github.com/luci/luci-go/common/logdog/types"
log "github.com/luci/luci-go/common/logging"
"github.com/luci/luci-go/common/lru"
@@ -67,10 +68,15 @@ func (c *cache) RegisterStream(ctx context.Context, st *LogStreamState, d *logpb
*LogStreamState, error) {
now := clock.Now(ctx)
+ key := cacheEntryKey{
+ project: st.Project,
+ path: st.Path,
+ }
+
// Get the cacheEntry from our cache. If it is expired, doesn't exist, or
// we're forcing, ignore any existing entry and replace with a Promise pending
// Coordinator sync.
- entry := c.lru.Mutate(st.Path, func(current interface{}) interface{} {
+ entry := c.lru.Mutate(key, func(current interface{}) interface{} {
// Don't replace an existing entry, unless it has an error or has expired.
if current != nil {
curEntry := current.(*cacheEntry)
@@ -86,6 +92,7 @@ func (c *cache) RegisterStream(ctx context.Context, st *LogStreamState, d *logpb
}
return &LogStreamState{
+ Project: st.Project,
Path: st.Path,
ProtoVersion: st.ProtoVersion,
Secret: st.Secret,
@@ -96,9 +103,12 @@ func (c *cache) RegisterStream(ctx context.Context, st *LogStreamState, d *logpb
})
return &cacheEntry{
+ cacheEntryKey: cacheEntryKey{
+ project: st.Project,
+ path: st.Path,
+ },
terminalIndex: -1,
p: p,
- path: st.Path,
expiresAt: now.Add(c.expiration),
}
}).(*cacheEntry)
@@ -116,9 +126,14 @@ func (c *cache) RegisterStream(ctx context.Context, st *LogStreamState, d *logpb
}
func (c *cache) TerminateStream(ctx context.Context, st *LogStreamState) error {
+ key := cacheEntryKey{
+ project: st.Project,
+ path: st.Path,
+ }
+
// Immediately update our state cache to record the terminal index, if
// we have a state cache.
- c.lru.Mutate(st.Path, func(current interface{}) (r interface{}) {
+ c.lru.Mutate(key, func(current interface{}) (r interface{}) {
// Always return the current entry. We're just atomically examining it to
// load it with a terminal index.
r = current
@@ -131,6 +146,12 @@ func (c *cache) TerminateStream(ctx context.Context, st *LogStreamState) error {
return c.Coordinator.TerminateStream(ctx, st)
}
+// cacheEntryKey is the LRU key for a cacheEntry.
+type cacheEntryKey struct {
+ project config.ProjectName
+ path types.StreamPath
+}
+
// cacheEntry is the value stored in the cache. It contains a Promise
// representing the value and an optional invalidation singleton to ensure that
// if the state failed to fetch, it will be invalidated at most once.
@@ -139,17 +160,19 @@ func (c *cache) TerminateStream(ctx context.Context, st *LogStreamState) error {
// by calling the cache's "put" method. In this case, the Promise will be nil,
// and the state value will be populated.
type cacheEntry struct {
+ sync.Mutex
+ cacheEntryKey
+
// terminalIndex is the loaded terminal index set via loadTerminalIndex. It
// will be applied to returned LogStreamState objects so that once a terminal
// index has been set, we become aware of it in the Collector.
- //
- // This MUST be the first field in the struct in order to comply with atomic's
- // 64-bit alignment requirements.
- terminalIndex int64
+ terminalIndex types.MessageIndex
// p is a Promise that is blocking pending a Coordiantor stream state
// response. Upon successful resolution, it will contain a *LogStreamState.
- p promise.Promise
+ p promise.Promise
+
+ project config.ProjectName
path types.StreamPath
expiresAt time.Time
}
@@ -170,9 +193,12 @@ func (e *cacheEntry) get(ctx context.Context) (*LogStreamState, error) {
// cached, apply that to the response.
//
// We need to lock around our terminalIndex.
+ e.Lock()
+ defer e.Unlock()
+
rp := *(promisedSt.(*LogStreamState))
if rp.TerminalIndex < 0 {
- rp.TerminalIndex = types.MessageIndex(atomic.LoadInt64(&e.terminalIndex))
+ rp.TerminalIndex = e.terminalIndex
}
return &rp, nil
@@ -191,5 +217,8 @@ func (e *cacheEntry) hasError() bool {
// loadTerminalIndex loads a local cache of the stream's terminal index. This
// will be applied to all future get requests.
func (e *cacheEntry) loadTerminalIndex(tidx types.MessageIndex) {
- atomic.StoreInt64(&e.terminalIndex, int64(tidx))
+ e.Lock()
+ defer e.Unlock()
+
+ e.terminalIndex = tidx
}
« no previous file with comments | « server/internal/logdog/collector/collector_test.go ('k') | server/internal/logdog/collector/coordinator/cache_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698