Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(243)

Side by Side Diff: server/internal/logdog/collector/coordinator/cache.go

Issue 1906023002: LogDog: Add project namespace to Butler/Collector. (Closed) Base URL: https://github.com/luci/luci-go@logdog-project-archivist
Patch Set: Rebase? Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package coordinator 5 package coordinator
6 6
7 import ( 7 import (
8 » "sync/atomic" 8 » "sync"
9 "time" 9 "time"
10 10
11 "github.com/luci/luci-go/common/clock" 11 "github.com/luci/luci-go/common/clock"
12 "github.com/luci/luci-go/common/config"
12 "github.com/luci/luci-go/common/logdog/types" 13 "github.com/luci/luci-go/common/logdog/types"
13 log "github.com/luci/luci-go/common/logging" 14 log "github.com/luci/luci-go/common/logging"
14 "github.com/luci/luci-go/common/lru" 15 "github.com/luci/luci-go/common/lru"
15 "github.com/luci/luci-go/common/promise" 16 "github.com/luci/luci-go/common/promise"
16 "github.com/luci/luci-go/common/proto/logdog/logpb" 17 "github.com/luci/luci-go/common/proto/logdog/logpb"
17 "golang.org/x/net/context" 18 "golang.org/x/net/context"
18 ) 19 )
19 20
20 const ( 21 const (
21 // DefaultSize is the default (maximum) size of the LRU cache. 22 // DefaultSize is the default (maximum) size of the LRU cache.
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
60 } 61 }
61 } 62 }
62 63
63 // RegisterStream invokes the wrapped Coordinator's RegisterStream method and 64 // RegisterStream invokes the wrapped Coordinator's RegisterStream method and
64 // caches the result. It uses a Promise to cause all simultaneous identical 65 // caches the result. It uses a Promise to cause all simultaneous identical
65 // RegisterStream requests to block on a single RPC. 66 // RegisterStream requests to block on a single RPC.
66 func (c *cache) RegisterStream(ctx context.Context, st *LogStreamState, d *logpb .LogStreamDescriptor) ( 67 func (c *cache) RegisterStream(ctx context.Context, st *LogStreamState, d *logpb .LogStreamDescriptor) (
67 *LogStreamState, error) { 68 *LogStreamState, error) {
68 now := clock.Now(ctx) 69 now := clock.Now(ctx)
69 70
71 key := cacheEntryKey{
72 project: st.Project,
73 path: st.Path,
74 }
75
70 // Get the cacheEntry from our cache. If it is expired, doesn't exist, o r 76 // Get the cacheEntry from our cache. If it is expired, doesn't exist, o r
71 // we're forcing, ignore any existing entry and replace with a Promise p ending 77 // we're forcing, ignore any existing entry and replace with a Promise p ending
72 // Coordinator sync. 78 // Coordinator sync.
73 » entry := c.lru.Mutate(st.Path, func(current interface{}) interface{} { 79 » entry := c.lru.Mutate(key, func(current interface{}) interface{} {
74 // Don't replace an existing entry, unless it has an error or ha s expired. 80 // Don't replace an existing entry, unless it has an error or ha s expired.
75 if current != nil { 81 if current != nil {
76 curEntry := current.(*cacheEntry) 82 curEntry := current.(*cacheEntry)
77 if !curEntry.hasError() && now.Before(curEntry.expiresAt ) { 83 if !curEntry.hasError() && now.Before(curEntry.expiresAt ) {
78 return current 84 return current
79 } 85 }
80 } 86 }
81 87
82 p := promise.New(func() (interface{}, error) { 88 p := promise.New(func() (interface{}, error) {
83 st, err := c.Coordinator.RegisterStream(ctx, st, d) 89 st, err := c.Coordinator.RegisterStream(ctx, st, d)
84 if err != nil { 90 if err != nil {
85 return nil, err 91 return nil, err
86 } 92 }
87 93
88 return &LogStreamState{ 94 return &LogStreamState{
95 Project: st.Project,
89 Path: st.Path, 96 Path: st.Path,
90 ProtoVersion: st.ProtoVersion, 97 ProtoVersion: st.ProtoVersion,
91 Secret: st.Secret, 98 Secret: st.Secret,
92 TerminalIndex: types.MessageIndex(st.TerminalInd ex), 99 TerminalIndex: types.MessageIndex(st.TerminalInd ex),
93 Archived: st.Archived, 100 Archived: st.Archived,
94 Purged: st.Purged, 101 Purged: st.Purged,
95 }, nil 102 }, nil
96 }) 103 })
97 104
98 return &cacheEntry{ 105 return &cacheEntry{
106 cacheEntryKey: cacheEntryKey{
107 project: st.Project,
108 path: st.Path,
109 },
99 terminalIndex: -1, 110 terminalIndex: -1,
100 p: p, 111 p: p,
101 path: st.Path,
102 expiresAt: now.Add(c.expiration), 112 expiresAt: now.Add(c.expiration),
103 } 113 }
104 }).(*cacheEntry) 114 }).(*cacheEntry)
105 115
106 // If there was an error, purge the erroneous entry from the cache so th at 116 // If there was an error, purge the erroneous entry from the cache so th at
107 // the next "update" will re-fetch it. 117 // the next "update" will re-fetch it.
108 st, err := entry.get(ctx) 118 st, err := entry.get(ctx)
109 if err != nil { 119 if err != nil {
110 log.Fields{ 120 log.Fields{
111 log.ErrorKey: err, 121 log.ErrorKey: err,
112 }.Errorf(ctx, "Error retrieving stream state.") 122 }.Errorf(ctx, "Error retrieving stream state.")
113 return nil, err 123 return nil, err
114 } 124 }
115 return st, nil 125 return st, nil
116 } 126 }
117 127
118 func (c *cache) TerminateStream(ctx context.Context, st *LogStreamState) error { 128 func (c *cache) TerminateStream(ctx context.Context, st *LogStreamState) error {
129 key := cacheEntryKey{
130 project: st.Project,
131 path: st.Path,
132 }
133
119 // Immediately update our state cache to record the terminal index, if 134 // Immediately update our state cache to record the terminal index, if
120 // we have a state cache. 135 // we have a state cache.
121 » c.lru.Mutate(st.Path, func(current interface{}) (r interface{}) { 136 » c.lru.Mutate(key, func(current interface{}) (r interface{}) {
122 // Always return the current entry. We're just atomically examin ing it to 137 // Always return the current entry. We're just atomically examin ing it to
123 // load it with a terminal index. 138 // load it with a terminal index.
124 r = current 139 r = current
125 if r != nil { 140 if r != nil {
126 r.(*cacheEntry).loadTerminalIndex(st.TerminalIndex) 141 r.(*cacheEntry).loadTerminalIndex(st.TerminalIndex)
127 } 142 }
128 return 143 return
129 }) 144 })
130 145
131 return c.Coordinator.TerminateStream(ctx, st) 146 return c.Coordinator.TerminateStream(ctx, st)
132 } 147 }
133 148
149 // cacheEntryKey is the LRU key for a cacheEntry.
150 type cacheEntryKey struct {
151 project config.ProjectName
152 path types.StreamPath
153 }
154
134 // cacheEntry is the value stored in the cache. It contains a Promise 155 // cacheEntry is the value stored in the cache. It contains a Promise
135 // representing the value and an optional invalidation singleton to ensure that 156 // representing the value and an optional invalidation singleton to ensure that
136 // if the state failed to fetch, it will be invalidated at most once. 157 // if the state failed to fetch, it will be invalidated at most once.
137 // 158 //
138 // In addition to remote caching via Promise, the state can be updated locally 159 // In addition to remote caching via Promise, the state can be updated locally
139 // by calling the cache's "put" method. In this case, the Promise will be nil, 160 // by calling the cache's "put" method. In this case, the Promise will be nil,
140 // and the state value will be populated. 161 // and the state value will be populated.
141 type cacheEntry struct { 162 type cacheEntry struct {
163 sync.Mutex
164 cacheEntryKey
165
142 // terminalIndex is the loaded terminal index set via loadTerminalIndex. It 166 // terminalIndex is the loaded terminal index set via loadTerminalIndex. It
143 // will be applied to returned LogStreamState objects so that once a ter minal 167 // will be applied to returned LogStreamState objects so that once a ter minal
144 // index has been set, we become aware of it in the Collector. 168 // index has been set, we become aware of it in the Collector.
145 » // 169 » terminalIndex types.MessageIndex
146 » // This MUST be the first field in the struct in order to comply with at omic's
147 » // 64-bit alignment requirements.
148 » terminalIndex int64
149 170
150 // p is a Promise that is blocking pending a Coordiantor stream state 171 // p is a Promise that is blocking pending a Coordiantor stream state
151 // response. Upon successful resolution, it will contain a *LogStreamSta te. 172 // response. Upon successful resolution, it will contain a *LogStreamSta te.
152 » p promise.Promise 173 » p promise.Promise
174
175 » project config.ProjectName
153 path types.StreamPath 176 path types.StreamPath
154 expiresAt time.Time 177 expiresAt time.Time
155 } 178 }
156 179
157 // get returns the cached state that this entry owns, blocking until resolution 180 // get returns the cached state that this entry owns, blocking until resolution
158 // if necessary. 181 // if necessary.
159 // 182 //
160 // The returned state is a shallow copy of the cached state, and may be 183 // The returned state is a shallow copy of the cached state, and may be
161 // modified by the caller. 184 // modified by the caller.
162 func (e *cacheEntry) get(ctx context.Context) (*LogStreamState, error) { 185 func (e *cacheEntry) get(ctx context.Context) (*LogStreamState, error) {
163 promisedSt, err := e.p.Get(ctx) 186 promisedSt, err := e.p.Get(ctx)
164 if err != nil { 187 if err != nil {
165 return nil, err 188 return nil, err
166 } 189 }
167 190
168 // Create a clone of our cached value (not deep, so secret is not cloned , but 191 // Create a clone of our cached value (not deep, so secret is not cloned , but
169 // the Collector will not modify that). If we have a local terminal inde x 192 // the Collector will not modify that). If we have a local terminal inde x
170 // cached, apply that to the response. 193 // cached, apply that to the response.
171 // 194 //
172 // We need to lock around our terminalIndex. 195 // We need to lock around our terminalIndex.
196 e.Lock()
197 defer e.Unlock()
198
173 rp := *(promisedSt.(*LogStreamState)) 199 rp := *(promisedSt.(*LogStreamState))
174 if rp.TerminalIndex < 0 { 200 if rp.TerminalIndex < 0 {
175 » » rp.TerminalIndex = types.MessageIndex(atomic.LoadInt64(&e.termin alIndex)) 201 » » rp.TerminalIndex = e.terminalIndex
176 } 202 }
177 203
178 return &rp, nil 204 return &rp, nil
179 } 205 }
180 206
181 // hasError tests if this entry has completed evaluation with an error state. 207 // hasError tests if this entry has completed evaluation with an error state.
182 // This is non-blocking, so if the evaluation hasn't completed, it will return 208 // This is non-blocking, so if the evaluation hasn't completed, it will return
183 // false. 209 // false.
184 func (e *cacheEntry) hasError() bool { 210 func (e *cacheEntry) hasError() bool {
185 if _, err := e.p.Peek(); err != nil && err != promise.ErrNoData { 211 if _, err := e.p.Peek(); err != nil && err != promise.ErrNoData {
186 return true 212 return true
187 } 213 }
188 return false 214 return false
189 } 215 }
190 216
191 // loadTerminalIndex loads a local cache of the stream's terminal index. This 217 // loadTerminalIndex loads a local cache of the stream's terminal index. This
192 // will be applied to all future get requests. 218 // will be applied to all future get requests.
193 func (e *cacheEntry) loadTerminalIndex(tidx types.MessageIndex) { 219 func (e *cacheEntry) loadTerminalIndex(tidx types.MessageIndex) {
194 » atomic.StoreInt64(&e.terminalIndex, int64(tidx)) 220 » e.Lock()
221 » defer e.Unlock()
222
223 » e.terminalIndex = tidx
195 } 224 }
OLDNEW
« no previous file with comments | « server/internal/logdog/collector/collector_test.go ('k') | server/internal/logdog/collector/coordinator/cache_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698