Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(301)

Unified Diff: server/internal/logdog/collector/streamstatecache.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: server/internal/logdog/collector/streamstatecache.go
diff --git a/server/internal/logdog/collector/streamstatecache.go b/server/internal/logdog/collector/streamstatecache.go
new file mode 100644
index 0000000000000000000000000000000000000000..2c329c46680cc9efab94515b463d10e745ba9b81
--- /dev/null
+++ b/server/internal/logdog/collector/streamstatecache.go
@@ -0,0 +1,228 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
dnj (Google) 2016/01/21 04:36:25 Another main file for review. Comments should prov
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package collector
+
+import (
+ "errors"
+ "sync/atomic"
+ "time"
+
+ "github.com/luci/luci-go/common/clock"
+ "github.com/luci/luci-go/common/logdog/types"
+ log "github.com/luci/luci-go/common/logging"
+ "github.com/luci/luci-go/common/lru"
+ "github.com/luci/luci-go/common/promise"
+ cc "github.com/luci/luci-go/server/internal/logdog/coordinatorClient"
+ "golang.org/x/net/context"
+)
+
+const (
+ // The (maximum) size of the LRU cache.
+ defaultStreamStateCacheSize = 1024 * 1024
+)
+
+// streamStateCacheOptions holds configurable options for a streamStateCache
+// instance.
+type streamStateCacheOptions struct {
+ // coordinator is the administrative Coordinator instance to use.
+ coordinator CoordinatorClient
+
+ // cacheSize is the number of stream states to hold in the cache. If zero,
+ // defaultStreamStateCacheSize will be used.
+ cacheSize int
+
+ // expiration is the expiration period of time. If an entry is older than this
+ // period, it will be discarded. If zero, DefaultStreamStateCacheExpire will
+ // be used.
+ expiration time.Duration
+}
+
+// stateProxy is a local representation of a remote stream's state. It is a
+// subset of the remote state with the necessary elements for the Collector to
+// operate and update.
+type stateProxy struct {
+ path types.StreamPath
+ proto string
+ secret []byte
+ terminalIndex types.MessageIndex
+ archived bool
+ purged bool
+}
+
+// streamStateCache loads and caches the LogStream for a specified stream.
+//
+// Stream state is formally owned by the Coordinator instance. However, since
+// state can only progress forwards and registration/termination operations are
+// idempotent, it is safe to cache to alleviate server requests. This maintains
+// stream state using an LRU cache to manage memory.
+//
+// The cache is responsible for two things: Firstly, it coalesces multiple
+// pending requests for the same stream state into a single Coordinator request.
+// Secondly, it maintains a cache of completed responses to short-circuit the
+// Coordinator.
+//
+// Stream state is stored internally as a Promise. This Promise is evaluated by
+// querying the Coordinator. This interface is hidden to callers.
+type streamStateCache struct {
+ *streamStateCacheOptions
+
+ cache *lru.Cache
+}
+
+// streamStateCacheEntry is the value stored in the cache. It contains a Promise
+// representing the value and an optional invalidation singleton to ensure that
+// if the state failed to fetch, it will be invalidated at most once.
+//
+// In addition to remote cachine via Promise, the state can be updated locally
+// by calling the cache's "put" method. In this case, the Promise will be nil,
+// and the state value will be populated.
+type streamStateCacheEntry struct {
+ // terminalIndex is the loaded terminal index set via loadTerminalIndex. It
+ // will be applied to returned stateProxy objects so that once a terminal
+ // index has been set, we become aware of it in the Collector.
+ //
+ // This MUST be the first field in the struct in order to comply with atomic's
+ // 64-bit alignment requirements.
+ terminalIndex int64
+
+ p promise.Promise
+ path types.StreamPath
+ expiresAt time.Time
+}
+
+// get returns the cached state that this entry owns, blocking until resolution
+// if necessary.
+func (e *streamStateCacheEntry) get(ctx context.Context) (*stateProxy, error) {
+ state, err := e.p.Get(ctx)
+ if err != nil {
+ return nil, err
+ }
+
+ // Create a clone of our cached value (not deep, so secret is not cloned, but
+ // the Collector will not modify that). If we have a local terminal index
+ // cached, apply that to the response.
+ //
+ // We need to lock around our terminalIndex.
+ rp := *(state.(*stateProxy))
+ if rp.terminalIndex < 0 {
+ rp.terminalIndex = types.MessageIndex(atomic.LoadInt64(&e.terminalIndex))
+ }
+
+ return &rp, nil
+}
+
+// hasError tests if this entry has completed evaluation with an error state.
+// This is non-blocking, so if the evaluation hasn't completed, it will return
+// false.
+func (e *streamStateCacheEntry) hasError() bool {
+ if _, err := e.p.Peek(); err != nil && err != promise.ErrNoData {
+ return true
+ }
+ return false
+}
+
+// loadTerminalIndex loads a local cache of the stream's terminal index. This
+// will be applied to all future get requests.
+func (e *streamStateCacheEntry) loadTerminalIndex(tidx types.MessageIndex) {
+ atomic.StoreInt64(&e.terminalIndex, int64(tidx))
+}
+
+// newStreamStateCache instantiates and initializes a new stream state cache
+// instance.
+func newStreamStateCache(o streamStateCacheOptions) *streamStateCache {
+ if o.cacheSize <= 0 {
+ o.cacheSize = defaultStreamStateCacheSize
+ }
+ if o.expiration <= 0 {
+ o.expiration = DefaultStreamStateCacheExpire
+ }
+
+ return &streamStateCache{
+ streamStateCacheOptions: &o,
+
+ cache: lru.New(o.cacheSize),
+ }
+}
+
+// getOrRegister is a goroutine-safe blocking call that synchronizes log stream
+// state with the Coordinator.
+//
+// If successful, the supplied state will be pushed directly to the
+// Coordinator service, and the pointer value will be overwritten with the
+// state returned by the Coordinator service.
+//
+// If an error occurs and it is transient, an errors.Transient error will be
+// returned.
+func (s *streamStateCache) getOrRegister(ctx context.Context, state *cc.State) (*stateProxy, error) {
+ now := clock.Now(ctx)
+
+ // Get the streamStateCacheEntry from our cache. If it is expired, doesn't
+ // exist, or we're forcing, ignore any existing entry and replace with a
+ // Promise pending Coordinator sync.
+ entry := s.cache.Mutate(state.Path, func(current interface{}) interface{} {
+ // Don't replace an existing entry, unless it has an error or has expired.
+ if current != nil {
+ curEntry := current.(*streamStateCacheEntry)
+ if !curEntry.hasError() && now.Before(curEntry.expiresAt) {
+ return current
+ }
+ }
+
+ p := promise.New(func() (interface{}, error) {
+ st, err := s.coordinator.RegisterStream(ctx, *state)
+ if err != nil {
+ return nil, err
+ }
+
+ // The response must have an embedded state.
+ if st == nil || st.State == nil {
+ return nil, errors.New("registration response is missing embedded state")
+ }
+
+ return &stateProxy{
+ path: state.Path,
+ proto: st.ProtoVersion,
+ secret: st.Secret,
+ terminalIndex: types.MessageIndex(st.State.TerminalIndex),
+ archived: st.Archived(),
+ purged: st.State.Purged,
+ }, nil
+ })
+
+ return &streamStateCacheEntry{
+ terminalIndex: -1,
+ p: p,
+ path: state.Path,
+ expiresAt: now.Add(s.expiration),
+ }
+ }).(*streamStateCacheEntry)
+
+ // If there was an error, purge the erroneous entry from the cache so that
+ // the next "update" will re-fetch it.
+ st, err := entry.get(ctx)
+ if err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ }.Errorf(ctx, "Error retrieving stream state.")
+ return nil, err
+ }
+ return st, nil
+}
+
+func (s *streamStateCache) setTerminalIndex(ctx context.Context, st *stateProxy) error {
+ // Immediately update our state cache to record the terminal index, if
+ // we have a state cache.
+ s.cache.Mutate(st.path, func(current interface{}) (r interface{}) {
+ // Always return the current entry. We're just atomically examining it to
+ // load it with a terminal index.
+ r = current
+ if r != nil {
+ r.(*streamStateCacheEntry).loadTerminalIndex(st.terminalIndex)
+ }
+ return
+ })
+
+ return s.coordinator.TerminateStream(ctx, st.path, st.secret, st.terminalIndex)
+}

Powered by Google App Engine
This is Rietveld 408576698