OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package coordinator | 5 package coordinator |
6 | 6 |
7 import ( | 7 import ( |
8 » "sync/atomic" | 8 » "sync" |
9 "time" | 9 "time" |
10 | 10 |
11 "github.com/luci/luci-go/common/clock" | 11 "github.com/luci/luci-go/common/clock" |
| 12 "github.com/luci/luci-go/common/config" |
12 "github.com/luci/luci-go/common/logdog/types" | 13 "github.com/luci/luci-go/common/logdog/types" |
13 log "github.com/luci/luci-go/common/logging" | 14 log "github.com/luci/luci-go/common/logging" |
14 "github.com/luci/luci-go/common/lru" | 15 "github.com/luci/luci-go/common/lru" |
15 "github.com/luci/luci-go/common/promise" | 16 "github.com/luci/luci-go/common/promise" |
16 "github.com/luci/luci-go/common/proto/logdog/logpb" | 17 "github.com/luci/luci-go/common/proto/logdog/logpb" |
17 "golang.org/x/net/context" | 18 "golang.org/x/net/context" |
18 ) | 19 ) |
19 | 20 |
20 const ( | 21 const ( |
21 // DefaultSize is the default (maximum) size of the LRU cache. | 22 // DefaultSize is the default (maximum) size of the LRU cache. |
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
60 } | 61 } |
61 } | 62 } |
62 | 63 |
63 // RegisterStream invokes the wrapped Coordinator's RegisterStream method and | 64 // RegisterStream invokes the wrapped Coordinator's RegisterStream method and |
64 // caches the result. It uses a Promise to cause all simultaneous identical | 65 // caches the result. It uses a Promise to cause all simultaneous identical |
65 // RegisterStream requests to block on a single RPC. | 66 // RegisterStream requests to block on a single RPC. |
66 func (c *cache) RegisterStream(ctx context.Context, st *LogStreamState, d *logpb
.LogStreamDescriptor) ( | 67 func (c *cache) RegisterStream(ctx context.Context, st *LogStreamState, d *logpb
.LogStreamDescriptor) ( |
67 *LogStreamState, error) { | 68 *LogStreamState, error) { |
68 now := clock.Now(ctx) | 69 now := clock.Now(ctx) |
69 | 70 |
| 71 key := cacheEntryKey{ |
| 72 project: st.Project, |
| 73 path: st.Path, |
| 74 } |
| 75 |
70 // Get the cacheEntry from our cache. If it is expired, doesn't exist, o
r | 76 // Get the cacheEntry from our cache. If it is expired, doesn't exist, o
r |
71 // we're forcing, ignore any existing entry and replace with a Promise p
ending | 77 // we're forcing, ignore any existing entry and replace with a Promise p
ending |
72 // Coordinator sync. | 78 // Coordinator sync. |
73 » entry := c.lru.Mutate(st.Path, func(current interface{}) interface{} { | 79 » entry := c.lru.Mutate(key, func(current interface{}) interface{} { |
74 // Don't replace an existing entry, unless it has an error or ha
s expired. | 80 // Don't replace an existing entry, unless it has an error or ha
s expired. |
75 if current != nil { | 81 if current != nil { |
76 curEntry := current.(*cacheEntry) | 82 curEntry := current.(*cacheEntry) |
77 if !curEntry.hasError() && now.Before(curEntry.expiresAt
) { | 83 if !curEntry.hasError() && now.Before(curEntry.expiresAt
) { |
78 return current | 84 return current |
79 } | 85 } |
80 } | 86 } |
81 | 87 |
82 p := promise.New(func() (interface{}, error) { | 88 p := promise.New(func() (interface{}, error) { |
83 st, err := c.Coordinator.RegisterStream(ctx, st, d) | 89 st, err := c.Coordinator.RegisterStream(ctx, st, d) |
84 if err != nil { | 90 if err != nil { |
85 return nil, err | 91 return nil, err |
86 } | 92 } |
87 | 93 |
88 return &LogStreamState{ | 94 return &LogStreamState{ |
| 95 Project: st.Project, |
89 Path: st.Path, | 96 Path: st.Path, |
90 ProtoVersion: st.ProtoVersion, | 97 ProtoVersion: st.ProtoVersion, |
91 Secret: st.Secret, | 98 Secret: st.Secret, |
92 TerminalIndex: types.MessageIndex(st.TerminalInd
ex), | 99 TerminalIndex: types.MessageIndex(st.TerminalInd
ex), |
93 Archived: st.Archived, | 100 Archived: st.Archived, |
94 Purged: st.Purged, | 101 Purged: st.Purged, |
95 }, nil | 102 }, nil |
96 }) | 103 }) |
97 | 104 |
98 return &cacheEntry{ | 105 return &cacheEntry{ |
| 106 cacheEntryKey: cacheEntryKey{ |
| 107 project: st.Project, |
| 108 path: st.Path, |
| 109 }, |
99 terminalIndex: -1, | 110 terminalIndex: -1, |
100 p: p, | 111 p: p, |
101 path: st.Path, | |
102 expiresAt: now.Add(c.expiration), | 112 expiresAt: now.Add(c.expiration), |
103 } | 113 } |
104 }).(*cacheEntry) | 114 }).(*cacheEntry) |
105 | 115 |
106 // If there was an error, purge the erroneous entry from the cache so th
at | 116 // If there was an error, purge the erroneous entry from the cache so th
at |
107 // the next "update" will re-fetch it. | 117 // the next "update" will re-fetch it. |
108 st, err := entry.get(ctx) | 118 st, err := entry.get(ctx) |
109 if err != nil { | 119 if err != nil { |
110 log.Fields{ | 120 log.Fields{ |
111 log.ErrorKey: err, | 121 log.ErrorKey: err, |
112 }.Errorf(ctx, "Error retrieving stream state.") | 122 }.Errorf(ctx, "Error retrieving stream state.") |
113 return nil, err | 123 return nil, err |
114 } | 124 } |
115 return st, nil | 125 return st, nil |
116 } | 126 } |
117 | 127 |
118 func (c *cache) TerminateStream(ctx context.Context, st *LogStreamState) error { | 128 func (c *cache) TerminateStream(ctx context.Context, st *LogStreamState) error { |
| 129 key := cacheEntryKey{ |
| 130 project: st.Project, |
| 131 path: st.Path, |
| 132 } |
| 133 |
119 // Immediately update our state cache to record the terminal index, if | 134 // Immediately update our state cache to record the terminal index, if |
120 // we have a state cache. | 135 // we have a state cache. |
121 » c.lru.Mutate(st.Path, func(current interface{}) (r interface{}) { | 136 » c.lru.Mutate(key, func(current interface{}) (r interface{}) { |
122 // Always return the current entry. We're just atomically examin
ing it to | 137 // Always return the current entry. We're just atomically examin
ing it to |
123 // load it with a terminal index. | 138 // load it with a terminal index. |
124 r = current | 139 r = current |
125 if r != nil { | 140 if r != nil { |
126 r.(*cacheEntry).loadTerminalIndex(st.TerminalIndex) | 141 r.(*cacheEntry).loadTerminalIndex(st.TerminalIndex) |
127 } | 142 } |
128 return | 143 return |
129 }) | 144 }) |
130 | 145 |
131 return c.Coordinator.TerminateStream(ctx, st) | 146 return c.Coordinator.TerminateStream(ctx, st) |
132 } | 147 } |
133 | 148 |
| 149 // cacheEntryKey is the LRU key for a cacheEntry. |
| 150 type cacheEntryKey struct { |
| 151 project config.ProjectName |
| 152 path types.StreamPath |
| 153 } |
| 154 |
134 // cacheEntry is the value stored in the cache. It contains a Promise | 155 // cacheEntry is the value stored in the cache. It contains a Promise |
135 // representing the value and an optional invalidation singleton to ensure that | 156 // representing the value and an optional invalidation singleton to ensure that |
136 // if the state failed to fetch, it will be invalidated at most once. | 157 // if the state failed to fetch, it will be invalidated at most once. |
137 // | 158 // |
138 // In addition to remote caching via Promise, the state can be updated locally | 159 // In addition to remote caching via Promise, the state can be updated locally |
139 // by calling the cache's "put" method. In this case, the Promise will be nil, | 160 // by calling the cache's "put" method. In this case, the Promise will be nil, |
140 // and the state value will be populated. | 161 // and the state value will be populated. |
141 type cacheEntry struct { | 162 type cacheEntry struct { |
| 163 sync.Mutex |
| 164 cacheEntryKey |
| 165 |
142 // terminalIndex is the loaded terminal index set via loadTerminalIndex.
It | 166 // terminalIndex is the loaded terminal index set via loadTerminalIndex.
It |
143 // will be applied to returned LogStreamState objects so that once a ter
minal | 167 // will be applied to returned LogStreamState objects so that once a ter
minal |
144 // index has been set, we become aware of it in the Collector. | 168 // index has been set, we become aware of it in the Collector. |
145 » // | 169 » terminalIndex types.MessageIndex |
146 » // This MUST be the first field in the struct in order to comply with at
omic's | |
147 » // 64-bit alignment requirements. | |
148 » terminalIndex int64 | |
149 | 170 |
150 // p is a Promise that is blocking pending a Coordiantor stream state | 171 // p is a Promise that is blocking pending a Coordiantor stream state |
151 // response. Upon successful resolution, it will contain a *LogStreamSta
te. | 172 // response. Upon successful resolution, it will contain a *LogStreamSta
te. |
152 » p promise.Promise | 173 » p promise.Promise |
| 174 |
| 175 » project config.ProjectName |
153 path types.StreamPath | 176 path types.StreamPath |
154 expiresAt time.Time | 177 expiresAt time.Time |
155 } | 178 } |
156 | 179 |
157 // get returns the cached state that this entry owns, blocking until resolution | 180 // get returns the cached state that this entry owns, blocking until resolution |
158 // if necessary. | 181 // if necessary. |
159 // | 182 // |
160 // The returned state is a shallow copy of the cached state, and may be | 183 // The returned state is a shallow copy of the cached state, and may be |
161 // modified by the caller. | 184 // modified by the caller. |
162 func (e *cacheEntry) get(ctx context.Context) (*LogStreamState, error) { | 185 func (e *cacheEntry) get(ctx context.Context) (*LogStreamState, error) { |
163 promisedSt, err := e.p.Get(ctx) | 186 promisedSt, err := e.p.Get(ctx) |
164 if err != nil { | 187 if err != nil { |
165 return nil, err | 188 return nil, err |
166 } | 189 } |
167 | 190 |
168 // Create a clone of our cached value (not deep, so secret is not cloned
, but | 191 // Create a clone of our cached value (not deep, so secret is not cloned
, but |
169 // the Collector will not modify that). If we have a local terminal inde
x | 192 // the Collector will not modify that). If we have a local terminal inde
x |
170 // cached, apply that to the response. | 193 // cached, apply that to the response. |
171 // | 194 // |
172 // We need to lock around our terminalIndex. | 195 // We need to lock around our terminalIndex. |
| 196 e.Lock() |
| 197 defer e.Unlock() |
| 198 |
173 rp := *(promisedSt.(*LogStreamState)) | 199 rp := *(promisedSt.(*LogStreamState)) |
174 if rp.TerminalIndex < 0 { | 200 if rp.TerminalIndex < 0 { |
175 » » rp.TerminalIndex = types.MessageIndex(atomic.LoadInt64(&e.termin
alIndex)) | 201 » » rp.TerminalIndex = e.terminalIndex |
176 } | 202 } |
177 | 203 |
178 return &rp, nil | 204 return &rp, nil |
179 } | 205 } |
180 | 206 |
181 // hasError tests if this entry has completed evaluation with an error state. | 207 // hasError tests if this entry has completed evaluation with an error state. |
182 // This is non-blocking, so if the evaluation hasn't completed, it will return | 208 // This is non-blocking, so if the evaluation hasn't completed, it will return |
183 // false. | 209 // false. |
184 func (e *cacheEntry) hasError() bool { | 210 func (e *cacheEntry) hasError() bool { |
185 if _, err := e.p.Peek(); err != nil && err != promise.ErrNoData { | 211 if _, err := e.p.Peek(); err != nil && err != promise.ErrNoData { |
186 return true | 212 return true |
187 } | 213 } |
188 return false | 214 return false |
189 } | 215 } |
190 | 216 |
191 // loadTerminalIndex loads a local cache of the stream's terminal index. This | 217 // loadTerminalIndex loads a local cache of the stream's terminal index. This |
192 // will be applied to all future get requests. | 218 // will be applied to all future get requests. |
193 func (e *cacheEntry) loadTerminalIndex(tidx types.MessageIndex) { | 219 func (e *cacheEntry) loadTerminalIndex(tidx types.MessageIndex) { |
194 » atomic.StoreInt64(&e.terminalIndex, int64(tidx)) | 220 » e.Lock() |
| 221 » defer e.Unlock() |
| 222 |
| 223 » e.terminalIndex = tidx |
195 } | 224 } |
OLD | NEW |