Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | |
|
dnj (Google)
2016/01/21 04:36:25
Another main file for review. Comments should prov
| |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package collector | |
| 6 | |
| 7 import ( | |
| 8 "errors" | |
| 9 "sync/atomic" | |
| 10 "time" | |
| 11 | |
| 12 "github.com/luci/luci-go/common/clock" | |
| 13 "github.com/luci/luci-go/common/logdog/types" | |
| 14 log "github.com/luci/luci-go/common/logging" | |
| 15 "github.com/luci/luci-go/common/lru" | |
| 16 "github.com/luci/luci-go/common/promise" | |
| 17 cc "github.com/luci/luci-go/server/internal/logdog/coordinatorClient" | |
| 18 "golang.org/x/net/context" | |
| 19 ) | |
| 20 | |
| 21 const ( | |
| 22 // The (maximum) size of the LRU cache. | |
| 23 defaultStreamStateCacheSize = 1024 * 1024 | |
| 24 ) | |
| 25 | |
| 26 // streamStateCacheOptions holds configurable options for a streamStateCache | |
| 27 // instance. | |
| 28 type streamStateCacheOptions struct { | |
| 29 // coordinator is the administrative Coordinator instance to use. | |
| 30 coordinator CoordinatorClient | |
| 31 | |
| 32 // cacheSize is the number of stream states to hold in the cache. If zer o, | |
| 33 // defaultStreamStateCacheSize will be used. | |
| 34 cacheSize int | |
| 35 | |
| 36 // expiration is the expiration period of time. If an entry is older tha n this | |
| 37 // period, it will be discarded. If zero, DefaultStreamStateCacheExpire will | |
| 38 // be used. | |
| 39 expiration time.Duration | |
| 40 } | |
| 41 | |
| 42 // stateProxy is a local representation of a remote stream's state. It is a | |
| 43 // subset of the remote state with the necessary elements for the Collector to | |
| 44 // operate and update. | |
| 45 type stateProxy struct { | |
| 46 path types.StreamPath | |
| 47 proto string | |
| 48 secret []byte | |
| 49 terminalIndex types.MessageIndex | |
| 50 archived bool | |
| 51 purged bool | |
| 52 } | |
| 53 | |
| 54 // streamStateCache loads and caches the LogStream for a specified stream. | |
| 55 // | |
| 56 // Stream state is formally owned by the Coordinator instance. However, since | |
| 57 // state can only progress forwards and registration/termination operations are | |
| 58 // idempotent, it is safe to cache to alleviate server requests. This maintains | |
| 59 // stream state using an LRU cache to manage memory. | |
| 60 // | |
| 61 // The cache is responsible for two things: Firstly, it coalesces multiple | |
| 62 // pending requests for the same stream state into a single Coordinator request. | |
| 63 // Secondly, it maintains a cache of completed responses to short-circuit the | |
| 64 // Coordinator. | |
| 65 // | |
| 66 // Stream state is stored internally as a Promise. This Promise is evaluated by | |
| 67 // querying the Coordinator. This interface is hidden to callers. | |
| 68 type streamStateCache struct { | |
| 69 *streamStateCacheOptions | |
| 70 | |
| 71 cache *lru.Cache | |
| 72 } | |
| 73 | |
| 74 // streamStateCacheEntry is the value stored in the cache. It contains a Promise | |
| 75 // representing the value and an optional invalidation singleton to ensure that | |
| 76 // if the state failed to fetch, it will be invalidated at most once. | |
| 77 // | |
| 78 // In addition to remote cachine via Promise, the state can be updated locally | |
| 79 // by calling the cache's "put" method. In this case, the Promise will be nil, | |
| 80 // and the state value will be populated. | |
| 81 type streamStateCacheEntry struct { | |
| 82 // terminalIndex is the loaded terminal index set via loadTerminalIndex. It | |
| 83 // will be applied to returned stateProxy objects so that once a termina l | |
| 84 // index has been set, we become aware of it in the Collector. | |
| 85 // | |
| 86 // This MUST be the first field in the struct in order to comply with at omic's | |
| 87 // 64-bit alignment requirements. | |
| 88 terminalIndex int64 | |
| 89 | |
| 90 p promise.Promise | |
| 91 path types.StreamPath | |
| 92 expiresAt time.Time | |
| 93 } | |
| 94 | |
| 95 // get returns the cached state that this entry owns, blocking until resolution | |
| 96 // if necessary. | |
| 97 func (e *streamStateCacheEntry) get(ctx context.Context) (*stateProxy, error) { | |
| 98 state, err := e.p.Get(ctx) | |
| 99 if err != nil { | |
| 100 return nil, err | |
| 101 } | |
| 102 | |
| 103 // Create a clone of our cached value (not deep, so secret is not cloned , but | |
| 104 // the Collector will not modify that). If we have a local terminal inde x | |
| 105 // cached, apply that to the response. | |
| 106 // | |
| 107 // We need to lock around our terminalIndex. | |
| 108 rp := *(state.(*stateProxy)) | |
| 109 if rp.terminalIndex < 0 { | |
| 110 rp.terminalIndex = types.MessageIndex(atomic.LoadInt64(&e.termin alIndex)) | |
| 111 } | |
| 112 | |
| 113 return &rp, nil | |
| 114 } | |
| 115 | |
| 116 // hasError tests if this entry has completed evaluation with an error state. | |
| 117 // This is non-blocking, so if the evaluation hasn't completed, it will return | |
| 118 // false. | |
| 119 func (e *streamStateCacheEntry) hasError() bool { | |
| 120 if _, err := e.p.Peek(); err != nil && err != promise.ErrNoData { | |
| 121 return true | |
| 122 } | |
| 123 return false | |
| 124 } | |
| 125 | |
| 126 // loadTerminalIndex loads a local cache of the stream's terminal index. This | |
| 127 // will be applied to all future get requests. | |
| 128 func (e *streamStateCacheEntry) loadTerminalIndex(tidx types.MessageIndex) { | |
| 129 atomic.StoreInt64(&e.terminalIndex, int64(tidx)) | |
| 130 } | |
| 131 | |
| 132 // newStreamStateCache instantiates and initializes a new stream state cache | |
| 133 // instance. | |
| 134 func newStreamStateCache(o streamStateCacheOptions) *streamStateCache { | |
| 135 if o.cacheSize <= 0 { | |
| 136 o.cacheSize = defaultStreamStateCacheSize | |
| 137 } | |
| 138 if o.expiration <= 0 { | |
| 139 o.expiration = DefaultStreamStateCacheExpire | |
| 140 } | |
| 141 | |
| 142 return &streamStateCache{ | |
| 143 streamStateCacheOptions: &o, | |
| 144 | |
| 145 cache: lru.New(o.cacheSize), | |
| 146 } | |
| 147 } | |
| 148 | |
| 149 // getOrRegister is a goroutine-safe blocking call that synchronizes log stream | |
| 150 // state with the Coordinator. | |
| 151 // | |
| 152 // If successful, the supplied state will be pushed directly to the | |
| 153 // Coordinator service, and the pointer value will be overwritten with the | |
| 154 // state returned by the Coordinator service. | |
| 155 // | |
| 156 // If an error occurs and it is transient, an errors.Transient error will be | |
| 157 // returned. | |
| 158 func (s *streamStateCache) getOrRegister(ctx context.Context, state *cc.State) ( *stateProxy, error) { | |
| 159 now := clock.Now(ctx) | |
| 160 | |
| 161 // Get the streamStateCacheEntry from our cache. If it is expired, doesn 't | |
| 162 // exist, or we're forcing, ignore any existing entry and replace with a | |
| 163 // Promise pending Coordinator sync. | |
| 164 entry := s.cache.Mutate(state.Path, func(current interface{}) interface{ } { | |
| 165 // Don't replace an existing entry, unless it has an error or ha s expired. | |
| 166 if current != nil { | |
| 167 curEntry := current.(*streamStateCacheEntry) | |
| 168 if !curEntry.hasError() && now.Before(curEntry.expiresAt ) { | |
| 169 return current | |
| 170 } | |
| 171 } | |
| 172 | |
| 173 p := promise.New(func() (interface{}, error) { | |
| 174 st, err := s.coordinator.RegisterStream(ctx, *state) | |
| 175 if err != nil { | |
| 176 return nil, err | |
| 177 } | |
| 178 | |
| 179 // The response must have an embedded state. | |
| 180 if st == nil || st.State == nil { | |
| 181 return nil, errors.New("registration response is missing embedded state") | |
| 182 } | |
| 183 | |
| 184 return &stateProxy{ | |
| 185 path: state.Path, | |
| 186 proto: st.ProtoVersion, | |
| 187 secret: st.Secret, | |
| 188 terminalIndex: types.MessageIndex(st.State.Termi nalIndex), | |
| 189 archived: st.Archived(), | |
| 190 purged: st.State.Purged, | |
| 191 }, nil | |
| 192 }) | |
| 193 | |
| 194 return &streamStateCacheEntry{ | |
| 195 terminalIndex: -1, | |
| 196 p: p, | |
| 197 path: state.Path, | |
| 198 expiresAt: now.Add(s.expiration), | |
| 199 } | |
| 200 }).(*streamStateCacheEntry) | |
| 201 | |
| 202 // If there was an error, purge the erroneous entry from the cache so th at | |
| 203 // the next "update" will re-fetch it. | |
| 204 st, err := entry.get(ctx) | |
| 205 if err != nil { | |
| 206 log.Fields{ | |
| 207 log.ErrorKey: err, | |
| 208 }.Errorf(ctx, "Error retrieving stream state.") | |
| 209 return nil, err | |
| 210 } | |
| 211 return st, nil | |
| 212 } | |
| 213 | |
| 214 func (s *streamStateCache) setTerminalIndex(ctx context.Context, st *stateProxy) error { | |
| 215 // Immediately update our state cache to record the terminal index, if | |
| 216 // we have a state cache. | |
| 217 s.cache.Mutate(st.path, func(current interface{}) (r interface{}) { | |
| 218 // Always return the current entry. We're just atomically examin ing it to | |
| 219 // load it with a terminal index. | |
| 220 r = current | |
| 221 if r != nil { | |
| 222 r.(*streamStateCacheEntry).loadTerminalIndex(st.terminal Index) | |
| 223 } | |
| 224 return | |
| 225 }) | |
| 226 | |
| 227 return s.coordinator.TerminateStream(ctx, st.path, st.secret, st.termina lIndex) | |
| 228 } | |
| OLD | NEW |