Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package coordinator | |
| 6 | |
| 7 import ( | |
| 8 "sync/atomic" | |
| 9 "time" | |
| 10 | |
| 11 "github.com/luci/luci-go/common/clock" | |
| 12 "github.com/luci/luci-go/common/logdog/types" | |
| 13 log "github.com/luci/luci-go/common/logging" | |
| 14 "github.com/luci/luci-go/common/lru" | |
| 15 "github.com/luci/luci-go/common/promise" | |
| 16 "github.com/luci/luci-go/common/proto/logdog/logpb" | |
| 17 "golang.org/x/net/context" | |
| 18 ) | |
| 19 | |
| 20 const ( | |
| 21 // DefaultSize is the default (maximum) size of the LRU cache. | |
| 22 DefaultSize = 1024 * 1024 | |
| 23 | |
| 24 // DefaultExpiration is the default expiration value. | |
| 25 DefaultExpiration = 10 * time.Minute | |
| 26 ) | |
| 27 | |
| 28 // cache is a Coordinator interface implementation for the Collector service | |
| 29 // that caches remote results locally. | |
| 30 type cache struct { | |
| 31 Coordinator | |
| 32 | |
| 33 // Size is the number of stream states to hold in the cache. If zero, | |
| 34 // DefaultCacheSize will be used. | |
| 35 size int | |
| 36 | |
| 37 // expiration is the maximum lifespan of a cache entry. If an entry is o lder | |
| 38 // than this, it will be discarded. If zero, DefaultExpiration will be u sed. | |
| 39 expiration time.Duration | |
| 40 | |
| 41 // cache is the LRU state cache. | |
| 42 lru *lru.Cache | |
| 43 } | |
| 44 | |
| 45 // NewCache creates a new Cache instance that wraps a Coordinator instance. | |
|
iannucci
2016/02/05 23:41:01
wraps a Coordinator with a cache?
dnj (Google)
2016/02/06 04:10:36
Done.
| |
| 46 func NewCache(c Coordinator, size int, expiration time.Duration) Coordinator { | |
| 47 if size <= 0 { | |
| 48 size = DefaultSize | |
| 49 } | |
| 50 if expiration <= 0 { | |
| 51 expiration = DefaultExpiration | |
| 52 } | |
| 53 | |
| 54 return &cache{ | |
| 55 Coordinator: c, | |
| 56 expiration: expiration, | |
| 57 lru: lru.New(size), | |
| 58 } | |
| 59 } | |
| 60 | |
| 61 // getOrRegister is a goroutine-safe blocking call that synchronizes log stream | |
|
iannucci
2016/02/05 23:41:01
comment desync
dnj (Google)
2016/02/06 04:10:36
Done.
| |
| 62 // state with the Coordinator. | |
| 63 // | |
| 64 // If successful, the supplied state will be pushed directly to the | |
| 65 // Coordinator service, and the pointer value will be overwritten with the | |
| 66 // state returned by the Coordinator service. | |
| 67 // | |
| 68 // If an error occurs and it is transient, an errors.Transient error will be | |
| 69 // returned. | |
| 70 func (c *cache) RegisterStream(ctx context.Context, st *LogStreamState, d *logpb .LogStreamDescriptor) ( | |
| 71 *LogStreamState, error) { | |
| 72 now := clock.Now(ctx) | |
| 73 | |
| 74 // Get the cacheEntry from our cache. If it is expired, doesn't exist, o r | |
| 75 // we're forcing, ignore any existing entry and replace with a Promise p ending | |
| 76 // Coordinator sync. | |
| 77 entry := c.lru.Mutate(st.Path, func(current interface{}) interface{} { | |
| 78 // Don't replace an existing entry, unless it has an error or ha s expired. | |
| 79 if current != nil { | |
| 80 curEntry := current.(*cacheEntry) | |
| 81 if !curEntry.hasError() && now.Before(curEntry.expiresAt ) { | |
| 82 return current | |
| 83 } | |
| 84 } | |
| 85 | |
| 86 p := promise.New(func() (interface{}, error) { | |
| 87 st, err := c.Coordinator.RegisterStream(ctx, st, d) | |
| 88 if err != nil { | |
| 89 return nil, err | |
| 90 } | |
| 91 | |
| 92 return &LogStreamState{ | |
| 93 Path: st.Path, | |
| 94 ProtoVersion: st.ProtoVersion, | |
| 95 Secret: st.Secret, | |
| 96 TerminalIndex: types.MessageIndex(st.TerminalInd ex), | |
| 97 Archived: st.Archived, | |
| 98 Purged: st.Purged, | |
| 99 }, nil | |
| 100 }) | |
| 101 | |
| 102 return &cacheEntry{ | |
| 103 terminalIndex: -1, | |
| 104 p: p, | |
| 105 path: st.Path, | |
| 106 expiresAt: now.Add(c.expiration), | |
| 107 } | |
| 108 }).(*cacheEntry) | |
| 109 | |
| 110 // If there was an error, purge the erroneous entry from the cache so th at | |
| 111 // the next "update" will re-fetch it. | |
| 112 st, err := entry.get(ctx) | |
| 113 if err != nil { | |
| 114 log.Fields{ | |
| 115 log.ErrorKey: err, | |
| 116 }.Errorf(ctx, "Error retrieving stream state.") | |
| 117 return nil, err | |
| 118 } | |
| 119 return st, nil | |
| 120 } | |
| 121 | |
| 122 func (c *cache) TerminateStream(ctx context.Context, st *LogStreamState) error { | |
| 123 // Immediately update our state cache to record the terminal index, if | |
| 124 // we have a state cache. | |
| 125 c.lru.Mutate(st.Path, func(current interface{}) (r interface{}) { | |
| 126 // Always return the current entry. We're just atomically examin ing it to | |
| 127 // load it with a terminal index. | |
| 128 r = current | |
| 129 if r != nil { | |
| 130 r.(*cacheEntry).loadTerminalIndex(st.TerminalIndex) | |
| 131 } | |
| 132 return | |
| 133 }) | |
| 134 | |
| 135 return c.Coordinator.TerminateStream(ctx, st) | |
| 136 } | |
| 137 | |
| 138 // cacheEntry is the value stored in the cache. It contains a Promise | |
| 139 // representing the value and an optional invalidation singleton to ensure that | |
| 140 // if the state failed to fetch, it will be invalidated at most once. | |
| 141 // | |
| 142 // In addition to remote caching via Promise, the state can be updated locally | |
| 143 // by calling the cache's "put" method. In this case, the Promise will be nil, | |
| 144 // and the state value will be populated. | |
| 145 type cacheEntry struct { | |
| 146 // terminalIndex is the loaded terminal index set via loadTerminalIndex. It | |
| 147 // will be applied to returned LogStreamState objects so that once a ter minal | |
| 148 // index has been set, we become aware of it in the Collector. | |
| 149 // | |
| 150 // This MUST be the first field in the struct in order to comply with at omic's | |
| 151 // 64-bit alignment requirements. | |
| 152 terminalIndex int64 | |
| 153 | |
| 154 // p is a Promise that is blocking pending a Coordiantor stream state | |
| 155 // response. Upon successful resolution, it will contain a *LogStreamSta te. | |
| 156 p promise.Promise | |
| 157 path types.StreamPath | |
| 158 expiresAt time.Time | |
| 159 } | |
| 160 | |
| 161 // get returns the cached state that this entry owns, blocking until resolution | |
| 162 // if necessary. | |
| 163 // | |
| 164 // The returned state is a shallow copy of the cached state, and may be | |
| 165 // modified by the caller. | |
| 166 func (e *cacheEntry) get(ctx context.Context) (*LogStreamState, error) { | |
| 167 promisedSt, err := e.p.Get(ctx) | |
| 168 if err != nil { | |
| 169 return nil, err | |
| 170 } | |
| 171 | |
| 172 // Create a clone of our cached value (not deep, so secret is not cloned , but | |
| 173 // the Collector will not modify that). If we have a local terminal inde x | |
| 174 // cached, apply that to the response. | |
| 175 // | |
| 176 // We need to lock around our terminalIndex. | |
| 177 rp := *(promisedSt.(*LogStreamState)) | |
| 178 if rp.TerminalIndex < 0 { | |
| 179 rp.TerminalIndex = types.MessageIndex(atomic.LoadInt64(&e.termin alIndex)) | |
| 180 } | |
| 181 | |
| 182 return &rp, nil | |
| 183 } | |
| 184 | |
| 185 // hasError tests if this entry has completed evaluation with an error state. | |
| 186 // This is non-blocking, so if the evaluation hasn't completed, it will return | |
| 187 // false. | |
| 188 func (e *cacheEntry) hasError() bool { | |
| 189 if _, err := e.p.Peek(); err != nil && err != promise.ErrNoData { | |
| 190 return true | |
| 191 } | |
| 192 return false | |
| 193 } | |
| 194 | |
| 195 // loadTerminalIndex loads a local cache of the stream's terminal index. This | |
| 196 // will be applied to all future get requests. | |
| 197 func (e *cacheEntry) loadTerminalIndex(tidx types.MessageIndex) { | |
| 198 atomic.StoreInt64(&e.terminalIndex, int64(tidx)) | |
| 199 } | |
| OLD | NEW |