| Index: common/lazyslot/lazyslot.go
|
| diff --git a/common/lazyslot/lazyslot.go b/common/lazyslot/lazyslot.go
|
| index a7ae392cf670f4c73ea279a20a1907562697ac67..9cf59cee00592eb06abddff788c944c616bb7a43 100644
|
| --- a/common/lazyslot/lazyslot.go
|
| +++ b/common/lazyslot/lazyslot.go
|
| @@ -3,9 +3,11 @@
|
| // found in the LICENSE file.
|
|
|
| // Package lazyslot implements a caching scheme for globally shared objects that
|
| -// take significant time to refresh. The defining property of the implementation
|
| -// is that only one goroutine (can be background one) will block when refreshing
|
| -// such object, while all others will use a slightly stale cached copy.
|
| +// take significant time to refresh.
|
| +//
|
| +// The defining property of the implementation is that only one goroutine will
|
| +// block when refreshing such object, while all others will use a slightly stale
|
| +// cached copy.
|
| package lazyslot
|
|
|
| import (
|
| @@ -25,106 +27,116 @@ type Value struct {
|
| Expiration time.Time
|
| }
|
|
|
| -// Fetcher knows how to load new value.
|
| +// Fetcher knows how to load a new value.
|
| +//
|
| +// If it returns no errors, it MUST return non-nil Value.Value or Slot.Get will
|
| +// panic.
|
| type Fetcher func(c context.Context, prev Value) (Value, error)
|
|
|
| -// Slot holds a cached Value and refreshes it when it expires. Only one
|
| -// goroutine will be busy refreshing, all others will see a slightly stale
|
| -// copy of the value during the refresh.
|
| +// Slot holds a cached Value and refreshes it when it expires.
|
| +//
|
| +// Only one goroutine will be busy refreshing, all others will see a slightly
|
| +// stale copy of the value during the refresh.
|
| type Slot struct {
|
| Fetcher Fetcher // used to actually load the value on demand
|
| Timeout time.Duration // how long to allow to fetch, 5 sec by default.
|
| - Async bool // if true do fetches in background goroutine
|
|
|
| - lock sync.Mutex // protects the guts below
|
| - current *Value // currently known value or nil if not fetched
|
| - currentFetcher context.Context // non nil if some goroutine is fetching now
|
| + lock sync.Mutex // protects the guts below
|
| + current *Value // currently known value or nil if not fetched
|
| + currentFetcherCtx context.Context // non-nil if some goroutine is fetching now
|
| }
|
|
|
| -// Peek returns currently cached value if there's one or zero Value{} if not.
|
| -// It doesn't try to fetch a value.
|
| -func (s *Slot) Peek() Value {
|
| - if s.current == nil {
|
| - return Value{}
|
| +// Get returns stored value if it is still fresh.
|
| +//
|
| +// It may return slightly stale copy if some other goroutine is fetching a new
|
| +// copy now. If there's no cached copy at all, blocks until it is retrieved.
|
| +//
|
| +// Returns an error only when Fetcher returns an error. Panics if fetcher
|
| +// doesn't produce a value, and doesn't return an error.
|
| +func (s *Slot) Get(c context.Context) (result Value, err error) {
|
| + // state is populate in the anonymous function below.
|
| + var state struct {
|
| + C context.Context
|
| + Fetcher Fetcher
|
| + PrevValue Value
|
| }
|
| - return *s.current
|
| -}
|
|
|
| -// Get returns stored value if it is still fresh. It may return slightly stale
|
| -// copy if some other goroutine is fetching a new copy now. If there's no cached
|
| -// copy at all, blocks until it is retrieved (even if slot is configured with
|
| -// Async = true). Returns an error only when Fetcher returns an error.
|
| -func (s *Slot) Get(c context.Context) (Value, error) {
|
| - now := clock.Now(c)
|
| -
|
| - // Set in the local function below, used it fetch is needed.
|
| - var (
|
| - ctx context.Context
|
| - fetchCb Fetcher
|
| - prevVal Value
|
| - async bool
|
| - )
|
| -
|
| - // If done is true, val and err are returned right away.
|
| - done, val, err := func() (bool, Value, error) {
|
| + result, err, done := func() (result Value, err error, done bool) {
|
| + now := clock.Now(c)
|
| +
|
| + // This lock protects the guts of the slot and makes sure only one goroutine
|
| + // is doing an initial fetch.
|
| s.lock.Lock()
|
| defer s.lock.Unlock()
|
|
|
| - // Still fresh? Return right away.
|
| + // A cached value exists and it is still fresh? Return it right away.
|
| if s.current != nil && now.Before(s.current.Expiration) {
|
| - return true, *s.current, nil
|
| + result = *s.current
|
| + done = true
|
| + return
|
| }
|
|
|
| // Fetching the value for the first time ever? Do it under the lock because
|
| // there's nothing to return yet. All goroutines would have to wait for this
|
| // initial fetch to complete. They'll all block on s.lock.Lock() above.
|
| if s.current == nil {
|
| - val, err := s.Fetcher(c, Value{})
|
| - if err != nil {
|
| - return true, Value{}, err
|
| + result, err = doFetch(c, s.Fetcher, Value{})
|
| + if err == nil {
|
| + s.current = &result
|
| }
|
| - s.current = &val
|
| - return true, val, nil
|
| + done = true
|
| + return
|
| }
|
|
|
| - // We have a cached copy and it has expired. Maybe some other goroutine is
|
| - // fetching it already? Returns the stale copy if so.
|
| - if s.currentFetcher != nil {
|
| - return true, *s.current, nil
|
| + // We have a cached copy but it has expired. Maybe some other goroutine is
|
| + // fetching it already? Returns the cached stale copy if so.
|
| + if s.currentFetcherCtx != nil {
|
| + result = *s.current
|
| + done = true
|
| + return
|
| }
|
|
|
| - // No one is fetching the value now, we should do it. Release the lock while
|
| - // fetching to allow other goroutines to grab the stale copy.
|
| + // No one is fetching the value now, we should do it. Prepare a new context
|
| + // that will be used to do the fetch once lock is released.
|
| timeout := 5 * time.Second
|
| if s.Timeout != 0 {
|
| timeout = s.Timeout
|
| }
|
| - s.currentFetcher, _ = context.WithTimeout(c, timeout)
|
| - ctx = s.currentFetcher
|
| - fetchCb = s.Fetcher
|
| - prevVal = *s.current
|
| - async = s.Async
|
| - return false, Value{}, nil
|
| + s.currentFetcherCtx, _ = context.WithTimeout(c, timeout)
|
| +
|
| + // Copy lock-protected guts into local variables before releasing the lock.
|
| + state.C = s.currentFetcherCtx
|
| + state.Fetcher = s.Fetcher
|
| + state.PrevValue = *s.current
|
| + return
|
| }()
|
| if done {
|
| - return val, err
|
| + return
|
| }
|
|
|
| - fetch := func() (val Value, err error) {
|
| + // Finish the fetch and update the cached value.
|
| + return func() (result Value, err error) {
|
| defer func() {
|
| s.lock.Lock()
|
| defer s.lock.Unlock()
|
| - s.currentFetcher = nil
|
| - if err == nil && val.Value != nil {
|
| - s.current = &val
|
| + s.currentFetcherCtx = nil
|
| + // result.Value is not nil iff fetch succeeded and didn't panic.
|
| + if result.Value != nil {
|
| + s.current = &result
|
| }
|
| }()
|
| - return fetchCb(ctx, prevVal)
|
| - }
|
| + return doFetch(state.C, state.Fetcher, state.PrevValue)
|
| + }()
|
| +}
|
|
|
| - if async {
|
| - go fetch()
|
| - return prevVal, nil
|
| +// doFetch calls fetcher callback and validates return value.
|
| +func doFetch(ctx context.Context, cb Fetcher, prev Value) (result Value, err error) {
|
| + result, err = cb(ctx, prev)
|
| + switch {
|
| + case err == nil && result.Value == nil:
|
| + panic("lazyslot.Slot Fetcher returned nil value")
|
| + case err != nil:
|
| + result = Value{}
|
| }
|
| - return fetch()
|
| + return
|
| }
|
|
|