Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(324)

Side by Side Diff: server/internal/logdog/collector/streamstatecache.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Rebased, updated from comments. Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package collector
6
7 import (
8 "errors"
9 "sync/atomic"
10 "time"
11
12 "github.com/luci/luci-go/common/clock"
13 "github.com/luci/luci-go/common/logdog/types"
14 log "github.com/luci/luci-go/common/logging"
15 "github.com/luci/luci-go/common/lru"
16 "github.com/luci/luci-go/common/promise"
17 cc "github.com/luci/luci-go/server/internal/logdog/coordinatorClient"
18 "golang.org/x/net/context"
19 )
20
21 const (
22 // The (maximum) size of the LRU cache.
23 defaultStreamStateCacheSize = 1024 * 1024
iannucci 2016/01/28 01:15:35 how was this value determined?
dnj 2016/01/29 20:46:52 Big number. I'll make this configurable.
24 )
25
26 // streamStateCacheOptions holds configurable options for a streamStateCache
27 // instance.
28 type streamStateCacheOptions struct {
29 // coordinator is the administrative Coordinator instance to use.
30 coordinator CoordinatorClient
31
32 // cacheSize is the number of stream states to hold in the cache. If zer o,
33 // defaultStreamStateCacheSize will be used.
34 cacheSize int
35
36 // expiration is the expiration period of time. If an entry is older tha n this
37 // period, it will be discarded. If zero, DefaultStreamStateCacheExpire will
38 // be used.
39 expiration time.Duration
40 }
41
42 // stateProxy is a local representation of a remote stream's state. It is a
43 // subset of the remote state with the necessary elements for the Collector to
44 // operate and update.
45 type stateProxy struct {
46 path types.StreamPath // Stream path.
47 proto string // Stream protocol version string.
48 secret []byte // Secret.
49 terminalIndex types.MessageIndex // Terminal index, <0 for unterminated.
50 archived bool // Is the stream archived?
51 purged bool // Is the stream purged?
52 }
53
54 // streamStateCache loads and caches the LogStream for a specified stream.
55 //
56 // Stream state is formally owned by the Coordinator instance. However, since
57 // state can only progress forwards and registration/termination operations are
58 // idempotent, it is safe to cache to alleviate server requests. This maintains
59 // stream state using an LRU cache to manage memory.
60 //
61 // The cache is responsible for two things: Firstly, it coalesces multiple
62 // pending requests for the same stream state into a single Coordinator request.
63 // Secondly, it maintains a cache of completed responses to short-circuit the
64 // Coordinator.
65 //
66 // Stream state is stored internally as a Promise. This Promise is evaluated by
67 // querying the Coordinator. This interface is hidden to callers.
68 type streamStateCache struct {
69 *streamStateCacheOptions
70
71 cache *lru.Cache
72 }
73
74 // streamStateCacheEntry is the value stored in the cache. It contains a Promise
75 // representing the value and an optional invalidation singleton to ensure that
76 // if the state failed to fetch, it will be invalidated at most once.
77 //
78 // In addition to remote cachine via Promise, the state can be updated locally
iannucci 2016/01/28 01:15:35 caching
dnj 2016/01/29 20:46:52 Done.
79 // by calling the cache's "put" method. In this case, the Promise will be nil,
80 // and the state value will be populated.
81 type streamStateCacheEntry struct {
82 // terminalIndex is the loaded terminal index set via loadTerminalIndex. It
83 // will be applied to returned stateProxy objects so that once a termina l
84 // index has been set, we become aware of it in the Collector.
85 //
86 // This MUST be the first field in the struct in order to comply with at omic's
87 // 64-bit alignment requirements.
88 terminalIndex int64
iannucci 2016/01/28 01:15:35 probably should also note that this field must not
dnj 2016/01/29 20:46:52 I feel like that's the default for internal member
89
90 p promise.Promise
iannucci 2016/01/28 01:15:35 inner type?
dnj 2016/01/29 20:46:52 Will document.
91 path types.StreamPath
92 expiresAt time.Time
93 }
94
95 // get returns the cached state that this entry owns, blocking until resolution
96 // if necessary.
97 func (e *streamStateCacheEntry) get(ctx context.Context) (*stateProxy, error) {
98 state, err := e.p.Get(ctx)
99 if err != nil {
100 return nil, err
101 }
102
103 // Create a clone of our cached value (not deep, so secret is not cloned , but
104 // the Collector will not modify that). If we have a local terminal inde x
105 // cached, apply that to the response.
106 //
107 // We need to lock around our terminalIndex.
iannucci 2016/01/28 01:15:35 erm... where's the lock? It looks like you read it
dnj 2016/01/29 20:46:52 Oh whoops, old comment before atomic goodness.
108 rp := *(state.(*stateProxy))
109 if rp.terminalIndex < 0 {
110 rp.terminalIndex = types.MessageIndex(atomic.LoadInt64(&e.termin alIndex))
111 }
112
113 return &rp, nil
114 }
115
116 // hasError tests if this entry has completed evaluation with an error state.
117 // This is non-blocking, so if the evaluation hasn't completed, it will return
118 // false.
119 func (e *streamStateCacheEntry) hasError() bool {
120 if _, err := e.p.Peek(); err != nil && err != promise.ErrNoData {
121 return true
122 }
123 return false
124 }
125
126 // loadTerminalIndex loads a local cache of the stream's terminal index. This
127 // will be applied to all future get requests.
128 func (e *streamStateCacheEntry) loadTerminalIndex(tidx types.MessageIndex) {
129 atomic.StoreInt64(&e.terminalIndex, int64(tidx))
130 }
131
132 // newStreamStateCache instantiates and initializes a new stream state cache
133 // instance.
134 func newStreamStateCache(o streamStateCacheOptions) *streamStateCache {
135 if o.cacheSize <= 0 {
136 o.cacheSize = defaultStreamStateCacheSize
137 }
138 if o.expiration <= 0 {
139 o.expiration = DefaultStreamStateCacheExpire
140 }
141
142 return &streamStateCache{
143 streamStateCacheOptions: &o,
144
145 cache: lru.New(o.cacheSize),
146 }
147 }
148
149 // getOrRegister is a goroutine-safe blocking call that synchronizes log stream
150 // state with the Coordinator.
151 //
152 // If successful, the supplied state will be pushed directly to the
153 // Coordinator service, and the pointer value will be overwritten with the
154 // state returned by the Coordinator service.
155 //
156 // If an error occurs and it is transient, an errors.Transient error will be
157 // returned.
158 func (s *streamStateCache) getOrRegister(ctx context.Context, state *cc.State) ( *stateProxy, error) {
159 now := clock.Now(ctx)
160
161 // Get the streamStateCacheEntry from our cache. If it is expired, doesn 't
162 // exist, or we're forcing, ignore any existing entry and replace with a
163 // Promise pending Coordinator sync.
164 entry := s.cache.Mutate(state.Path, func(current interface{}) interface{ } {
165 // Don't replace an existing entry, unless it has an error or ha s expired.
166 if current != nil {
167 curEntry := current.(*streamStateCacheEntry)
168 if !curEntry.hasError() && now.Before(curEntry.expiresAt ) {
169 return current
170 }
171 }
172
173 p := promise.New(func() (interface{}, error) {
174 st, err := s.coordinator.RegisterStream(ctx, *state)
175 if err != nil {
176 return nil, err
177 }
178
179 // The response must have an embedded state.
180 if st == nil || st.State == nil {
181 return nil, errors.New("registration response is missing embedded state")
182 }
183
184 return &stateProxy{
185 path: state.Path,
186 proto: st.ProtoVersion,
187 secret: st.Secret,
188 terminalIndex: types.MessageIndex(st.State.Termi nalIndex),
189 archived: st.Archived(),
190 purged: st.State.Purged,
191 }, nil
192 })
193
194 return &streamStateCacheEntry{
195 terminalIndex: -1,
196 p: p,
197 path: state.Path,
198 expiresAt: now.Add(s.expiration),
199 }
200 }).(*streamStateCacheEntry)
201
202 // If there was an error, purge the erroneous entry from the cache so th at
203 // the next "update" will re-fetch it.
204 st, err := entry.get(ctx)
205 if err != nil {
206 log.Fields{
207 log.ErrorKey: err,
208 }.Errorf(ctx, "Error retrieving stream state.")
209 return nil, err
210 }
211 return st, nil
212 }
213
214 func (s *streamStateCache) setTerminalIndex(ctx context.Context, st *stateProxy) error {
215 // Immediately update our state cache to record the terminal index, if
216 // we have a state cache.
217 s.cache.Mutate(st.path, func(current interface{}) (r interface{}) {
218 // Always return the current entry. We're just atomically examin ing it to
219 // load it with a terminal index.
220 r = current
221 if r != nil {
222 r.(*streamStateCacheEntry).loadTerminalIndex(st.terminal Index)
223 }
224 return
225 })
226
227 return s.coordinator.TerminateStream(ctx, st.path, st.secret, st.termina lIndex)
228 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698