Chromium Code Reviews| Index: server/internal/logdog/collector/streamstatecache.go |
| diff --git a/server/internal/logdog/collector/streamstatecache.go b/server/internal/logdog/collector/streamstatecache.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..2c329c46680cc9efab94515b463d10e745ba9b81 |
| --- /dev/null |
| +++ b/server/internal/logdog/collector/streamstatecache.go |
| @@ -0,0 +1,228 @@ |
| +// Copyright 2016 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package collector |
| + |
| +import ( |
| + "errors" |
| + "sync/atomic" |
| + "time" |
| + |
| + "github.com/luci/luci-go/common/clock" |
| + "github.com/luci/luci-go/common/logdog/types" |
| + log "github.com/luci/luci-go/common/logging" |
| + "github.com/luci/luci-go/common/lru" |
| + "github.com/luci/luci-go/common/promise" |
| + cc "github.com/luci/luci-go/server/internal/logdog/coordinatorClient" |
| + "golang.org/x/net/context" |
| +) |
| + |
| +const ( |
| + // The (maximum) size of the LRU cache. |
| + defaultStreamStateCacheSize = 1024 * 1024 |
| +) |
| + |
| +// streamStateCacheOptions holds configurable options for a streamStateCache |
| +// instance. |
| +type streamStateCacheOptions struct { |
| + // coordinator is the administrative Coordinator instance to use. |
| + coordinator CoordinatorClient |
| + |
| + // cacheSize is the number of stream states to hold in the cache. If zero, |
| + // defaultStreamStateCacheSize will be used. |
| + cacheSize int |
| + |
| + // expiration is the expiration period of time. If an entry is older than this |
| + // period, it will be discarded. If zero, DefaultStreamStateCacheExpire will |
| + // be used. |
| + expiration time.Duration |
| +} |
| + |
| +// stateProxy is a local representation of a remote stream's state. It is a |
| +// subset of the remote state with the necessary elements for the Collector to |
| +// operate and update. |
| +type stateProxy struct { |
| + path types.StreamPath |
| + proto string |
|
martiniss
2016/01/26 02:24:18
wat this.
dnj (Google)
2016/01/26 05:06:05
Done.
|
| + secret []byte |
| + terminalIndex types.MessageIndex |
| + archived bool |
| + purged bool |
| +} |
| + |
| +// streamStateCache loads and caches the LogStream for a specified stream. |
| +// |
| +// Stream state is formally owned by the Coordinator instance. However, since |
| +// state can only progress forwards and registration/termination operations are |
| +// idempotent, it is safe to cache to alleviate server requests. This maintains |
| +// stream state using an LRU cache to manage memory. |
| +// |
| +// The cache is responsible for two things: Firstly, it coalesces multiple |
| +// pending requests for the same stream state into a single Coordinator request. |
| +// Secondly, it maintains a cache of completed responses to short-circuit the |
| +// Coordinator. |
| +// |
| +// Stream state is stored internally as a Promise. This Promise is evaluated by |
| +// querying the Coordinator. This interface is hidden to callers. |
| +type streamStateCache struct { |
| + *streamStateCacheOptions |
| + |
| + cache *lru.Cache |
| +} |
| + |
| +// streamStateCacheEntry is the value stored in the cache. It contains a Promise |
| +// representing the value and an optional invalidation singleton to ensure that |
| +// if the state failed to fetch, it will be invalidated at most once. |
| +// |
| +// In addition to remote cachine via Promise, the state can be updated locally |
| +// by calling the cache's "put" method. In this case, the Promise will be nil, |
| +// and the state value will be populated. |
| +type streamStateCacheEntry struct { |
| + // terminalIndex is the loaded terminal index set via loadTerminalIndex. It |
| + // will be applied to returned stateProxy objects so that once a terminal |
| + // index has been set, we become aware of it in the Collector. |
| + // |
| + // This MUST be the first field in the struct in order to comply with atomic's |
| + // 64-bit alignment requirements. |
| + terminalIndex int64 |
| + |
| + p promise.Promise |
| + path types.StreamPath |
| + expiresAt time.Time |
| +} |
| + |
| +// get returns the cached state that this entry owns, blocking until resolution |
| +// if necessary. |
| +func (e *streamStateCacheEntry) get(ctx context.Context) (*stateProxy, error) { |
| + state, err := e.p.Get(ctx) |
| + if err != nil { |
| + return nil, err |
| + } |
| + |
| + // Create a clone of our cached value (not deep, so secret is not cloned, but |
| + // the Collector will not modify that). If we have a local terminal index |
| + // cached, apply that to the response. |
| + // |
| + // We need to lock around our terminalIndex. |
| + rp := *(state.(*stateProxy)) |
| + if rp.terminalIndex < 0 { |
| + rp.terminalIndex = types.MessageIndex(atomic.LoadInt64(&e.terminalIndex)) |
| + } |
| + |
| + return &rp, nil |
| +} |
| + |
| +// hasError tests if this entry has completed evaluation with an error state. |
| +// This is non-blocking, so if the evaluation hasn't completed, it will return |
| +// false. |
| +func (e *streamStateCacheEntry) hasError() bool { |
| + if _, err := e.p.Peek(); err != nil && err != promise.ErrNoData { |
| + return true |
| + } |
| + return false |
| +} |
| + |
| +// loadTerminalIndex loads a local cache of the stream's terminal index. This |
| +// will be applied to all future get requests. |
| +func (e *streamStateCacheEntry) loadTerminalIndex(tidx types.MessageIndex) { |
| + atomic.StoreInt64(&e.terminalIndex, int64(tidx)) |
| +} |
| + |
| +// newStreamStateCache instantiates and initializes a new stream state cache |
| +// instance. |
| +func newStreamStateCache(o streamStateCacheOptions) *streamStateCache { |
| + if o.cacheSize <= 0 { |
| + o.cacheSize = defaultStreamStateCacheSize |
| + } |
| + if o.expiration <= 0 { |
| + o.expiration = DefaultStreamStateCacheExpire |
| + } |
| + |
| + return &streamStateCache{ |
| + streamStateCacheOptions: &o, |
| + |
| + cache: lru.New(o.cacheSize), |
| + } |
| +} |
| + |
| +// getOrRegister is a goroutine-safe blocking call that synchronizes log stream |
| +// state with the Coordinator. |
| +// |
| +// If successful, the supplied state will be pushed directly to the |
| +// Coordinator service, and the pointer value will be overwritten with the |
| +// state returned by the Coordinator service. |
| +// |
| +// If an error occurs and it is transient, an errors.Transient error will be |
| +// returned. |
| +func (s *streamStateCache) getOrRegister(ctx context.Context, state *cc.State) (*stateProxy, error) { |
| + now := clock.Now(ctx) |
| + |
| + // Get the streamStateCacheEntry from our cache. If it is expired, doesn't |
| + // exist, or we're forcing, ignore any existing entry and replace with a |
| + // Promise pending Coordinator sync. |
| + entry := s.cache.Mutate(state.Path, func(current interface{}) interface{} { |
| + // Don't replace an existing entry, unless it has an error or has expired. |
| + if current != nil { |
| + curEntry := current.(*streamStateCacheEntry) |
| + if !curEntry.hasError() && now.Before(curEntry.expiresAt) { |
| + return current |
| + } |
| + } |
| + |
| + p := promise.New(func() (interface{}, error) { |
| + st, err := s.coordinator.RegisterStream(ctx, *state) |
| + if err != nil { |
| + return nil, err |
| + } |
| + |
| + // The response must have an embedded state. |
| + if st == nil || st.State == nil { |
| + return nil, errors.New("registration response is missing embedded state") |
| + } |
| + |
| + return &stateProxy{ |
| + path: state.Path, |
| + proto: st.ProtoVersion, |
| + secret: st.Secret, |
| + terminalIndex: types.MessageIndex(st.State.TerminalIndex), |
| + archived: st.Archived(), |
| + purged: st.State.Purged, |
| + }, nil |
| + }) |
| + |
| + return &streamStateCacheEntry{ |
| + terminalIndex: -1, |
| + p: p, |
| + path: state.Path, |
| + expiresAt: now.Add(s.expiration), |
| + } |
| + }).(*streamStateCacheEntry) |
| + |
| + // If there was an error, purge the erroneous entry from the cache so that |
| + // the next "update" will re-fetch it. |
| + st, err := entry.get(ctx) |
| + if err != nil { |
| + log.Fields{ |
| + log.ErrorKey: err, |
| + }.Errorf(ctx, "Error retrieving stream state.") |
| + return nil, err |
| + } |
| + return st, nil |
| +} |
| + |
| +func (s *streamStateCache) setTerminalIndex(ctx context.Context, st *stateProxy) error { |
| + // Immediately update our state cache to record the terminal index, if |
| + // we have a state cache. |
| + s.cache.Mutate(st.path, func(current interface{}) (r interface{}) { |
| + // Always return the current entry. We're just atomically examining it to |
| + // load it with a terminal index. |
| + r = current |
| + if r != nil { |
| + r.(*streamStateCacheEntry).loadTerminalIndex(st.terminalIndex) |
| + } |
| + return |
| + }) |
| + |
| + return s.coordinator.TerminateStream(ctx, st.path, st.secret, st.terminalIndex) |
| +} |