| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package coordinator | 5 package coordinator |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 » "sync/atomic" | 8 » "sync" |
| 9 "time" | 9 "time" |
| 10 | 10 |
| 11 "github.com/luci/luci-go/common/clock" | 11 "github.com/luci/luci-go/common/clock" |
| 12 "github.com/luci/luci-go/common/config" |
| 12 "github.com/luci/luci-go/common/logdog/types" | 13 "github.com/luci/luci-go/common/logdog/types" |
| 13 log "github.com/luci/luci-go/common/logging" | 14 log "github.com/luci/luci-go/common/logging" |
| 14 "github.com/luci/luci-go/common/lru" | 15 "github.com/luci/luci-go/common/lru" |
| 15 "github.com/luci/luci-go/common/promise" | 16 "github.com/luci/luci-go/common/promise" |
| 16 "github.com/luci/luci-go/common/proto/logdog/logpb" | 17 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| 17 "golang.org/x/net/context" | 18 "golang.org/x/net/context" |
| 18 ) | 19 ) |
| 19 | 20 |
| 20 const ( | 21 const ( |
| 21 // DefaultSize is the default (maximum) size of the LRU cache. | 22 // DefaultSize is the default (maximum) size of the LRU cache. |
| (...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 60 } | 61 } |
| 61 } | 62 } |
| 62 | 63 |
| 63 // RegisterStream invokes the wrapped Coordinator's RegisterStream method and | 64 // RegisterStream invokes the wrapped Coordinator's RegisterStream method and |
| 64 // caches the result. It uses a Promise to cause all simultaneous identical | 65 // caches the result. It uses a Promise to cause all simultaneous identical |
| 65 // RegisterStream requests to block on a single RPC. | 66 // RegisterStream requests to block on a single RPC. |
| 66 func (c *cache) RegisterStream(ctx context.Context, st *LogStreamState, d *logpb
.LogStreamDescriptor) ( | 67 func (c *cache) RegisterStream(ctx context.Context, st *LogStreamState, d *logpb
.LogStreamDescriptor) ( |
| 67 *LogStreamState, error) { | 68 *LogStreamState, error) { |
| 68 now := clock.Now(ctx) | 69 now := clock.Now(ctx) |
| 69 | 70 |
| 71 key := cacheEntryKey{ |
| 72 project: st.Project, |
| 73 path: st.Path, |
| 74 } |
| 75 |
| 70 // Get the cacheEntry from our cache. If it is expired, doesn't exist, o
r | 76 // Get the cacheEntry from our cache. If it is expired, doesn't exist, o
r |
| 71 // we're forcing, ignore any existing entry and replace with a Promise p
ending | 77 // we're forcing, ignore any existing entry and replace with a Promise p
ending |
| 72 // Coordinator sync. | 78 // Coordinator sync. |
| 73 » entry := c.lru.Mutate(st.Path, func(current interface{}) interface{} { | 79 » entry := c.lru.Mutate(key, func(current interface{}) interface{} { |
| 74 // Don't replace an existing entry, unless it has an error or ha
s expired. | 80 // Don't replace an existing entry, unless it has an error or ha
s expired. |
| 75 if current != nil { | 81 if current != nil { |
| 76 curEntry := current.(*cacheEntry) | 82 curEntry := current.(*cacheEntry) |
| 77 if !curEntry.hasError() && now.Before(curEntry.expiresAt
) { | 83 if !curEntry.hasError() && now.Before(curEntry.expiresAt
) { |
| 78 return current | 84 return current |
| 79 } | 85 } |
| 80 } | 86 } |
| 81 | 87 |
| 82 p := promise.New(func() (interface{}, error) { | 88 p := promise.New(func() (interface{}, error) { |
| 83 st, err := c.Coordinator.RegisterStream(ctx, st, d) | 89 st, err := c.Coordinator.RegisterStream(ctx, st, d) |
| 84 if err != nil { | 90 if err != nil { |
| 85 return nil, err | 91 return nil, err |
| 86 } | 92 } |
| 87 | 93 |
| 88 return &LogStreamState{ | 94 return &LogStreamState{ |
| 95 Project: st.Project, |
| 89 Path: st.Path, | 96 Path: st.Path, |
| 90 ProtoVersion: st.ProtoVersion, | 97 ProtoVersion: st.ProtoVersion, |
| 91 Secret: st.Secret, | 98 Secret: st.Secret, |
| 92 TerminalIndex: types.MessageIndex(st.TerminalInd
ex), | 99 TerminalIndex: types.MessageIndex(st.TerminalInd
ex), |
| 93 Archived: st.Archived, | 100 Archived: st.Archived, |
| 94 Purged: st.Purged, | 101 Purged: st.Purged, |
| 95 }, nil | 102 }, nil |
| 96 }) | 103 }) |
| 97 | 104 |
| 98 return &cacheEntry{ | 105 return &cacheEntry{ |
| 106 cacheEntryKey: cacheEntryKey{ |
| 107 project: st.Project, |
| 108 path: st.Path, |
| 109 }, |
| 99 terminalIndex: -1, | 110 terminalIndex: -1, |
| 100 p: p, | 111 p: p, |
| 101 path: st.Path, | |
| 102 expiresAt: now.Add(c.expiration), | 112 expiresAt: now.Add(c.expiration), |
| 103 } | 113 } |
| 104 }).(*cacheEntry) | 114 }).(*cacheEntry) |
| 105 | 115 |
| 106 // If there was an error, purge the erroneous entry from the cache so th
at | 116 // If there was an error, purge the erroneous entry from the cache so th
at |
| 107 // the next "update" will re-fetch it. | 117 // the next "update" will re-fetch it. |
| 108 st, err := entry.get(ctx) | 118 st, err := entry.get(ctx) |
| 109 if err != nil { | 119 if err != nil { |
| 110 log.Fields{ | 120 log.Fields{ |
| 111 log.ErrorKey: err, | 121 log.ErrorKey: err, |
| 112 }.Errorf(ctx, "Error retrieving stream state.") | 122 }.Errorf(ctx, "Error retrieving stream state.") |
| 113 return nil, err | 123 return nil, err |
| 114 } | 124 } |
| 115 return st, nil | 125 return st, nil |
| 116 } | 126 } |
| 117 | 127 |
| 118 func (c *cache) TerminateStream(ctx context.Context, st *LogStreamState) error { | 128 func (c *cache) TerminateStream(ctx context.Context, st *LogStreamState) error { |
| 129 key := cacheEntryKey{ |
| 130 project: st.Project, |
| 131 path: st.Path, |
| 132 } |
| 133 |
| 119 // Immediately update our state cache to record the terminal index, if | 134 // Immediately update our state cache to record the terminal index, if |
| 120 // we have a state cache. | 135 // we have a state cache. |
| 121 » c.lru.Mutate(st.Path, func(current interface{}) (r interface{}) { | 136 » c.lru.Mutate(key, func(current interface{}) (r interface{}) { |
| 122 // Always return the current entry. We're just atomically examin
ing it to | 137 // Always return the current entry. We're just atomically examin
ing it to |
| 123 // load it with a terminal index. | 138 // load it with a terminal index. |
| 124 r = current | 139 r = current |
| 125 if r != nil { | 140 if r != nil { |
| 126 r.(*cacheEntry).loadTerminalIndex(st.TerminalIndex) | 141 r.(*cacheEntry).loadTerminalIndex(st.TerminalIndex) |
| 127 } | 142 } |
| 128 return | 143 return |
| 129 }) | 144 }) |
| 130 | 145 |
| 131 return c.Coordinator.TerminateStream(ctx, st) | 146 return c.Coordinator.TerminateStream(ctx, st) |
| 132 } | 147 } |
| 133 | 148 |
| 149 // cacheEntryKey is the LRU key for a cacheEntry. |
| 150 type cacheEntryKey struct { |
| 151 project config.ProjectName |
| 152 path types.StreamPath |
| 153 } |
| 154 |
| 134 // cacheEntry is the value stored in the cache. It contains a Promise | 155 // cacheEntry is the value stored in the cache. It contains a Promise |
| 135 // representing the value and an optional invalidation singleton to ensure that | 156 // representing the value and an optional invalidation singleton to ensure that |
| 136 // if the state failed to fetch, it will be invalidated at most once. | 157 // if the state failed to fetch, it will be invalidated at most once. |
| 137 // | 158 // |
| 138 // In addition to remote caching via Promise, the state can be updated locally | 159 // In addition to remote caching via Promise, the state can be updated locally |
| 139 // by calling the cache's "put" method. In this case, the Promise will be nil, | 160 // by calling the cache's "put" method. In this case, the Promise will be nil, |
| 140 // and the state value will be populated. | 161 // and the state value will be populated. |
| 141 type cacheEntry struct { | 162 type cacheEntry struct { |
| 163 sync.Mutex |
| 164 cacheEntryKey |
| 165 |
| 142 // terminalIndex is the loaded terminal index set via loadTerminalIndex.
It | 166 // terminalIndex is the loaded terminal index set via loadTerminalIndex.
It |
| 143 // will be applied to returned LogStreamState objects so that once a ter
minal | 167 // will be applied to returned LogStreamState objects so that once a ter
minal |
| 144 // index has been set, we become aware of it in the Collector. | 168 // index has been set, we become aware of it in the Collector. |
| 145 » // | 169 » terminalIndex types.MessageIndex |
| 146 » // This MUST be the first field in the struct in order to comply with at
omic's | |
| 147 » // 64-bit alignment requirements. | |
| 148 » terminalIndex int64 | |
| 149 | 170 |
| 150 // p is a Promise that is blocking pending a Coordiantor stream state | 171 // p is a Promise that is blocking pending a Coordiantor stream state |
| 151 // response. Upon successful resolution, it will contain a *LogStreamSta
te. | 172 // response. Upon successful resolution, it will contain a *LogStreamSta
te. |
| 152 » p promise.Promise | 173 » p promise.Promise |
| 174 |
| 175 » project config.ProjectName |
| 153 path types.StreamPath | 176 path types.StreamPath |
| 154 expiresAt time.Time | 177 expiresAt time.Time |
| 155 } | 178 } |
| 156 | 179 |
| 157 // get returns the cached state that this entry owns, blocking until resolution | 180 // get returns the cached state that this entry owns, blocking until resolution |
| 158 // if necessary. | 181 // if necessary. |
| 159 // | 182 // |
| 160 // The returned state is a shallow copy of the cached state, and may be | 183 // The returned state is a shallow copy of the cached state, and may be |
| 161 // modified by the caller. | 184 // modified by the caller. |
| 162 func (e *cacheEntry) get(ctx context.Context) (*LogStreamState, error) { | 185 func (e *cacheEntry) get(ctx context.Context) (*LogStreamState, error) { |
| 163 promisedSt, err := e.p.Get(ctx) | 186 promisedSt, err := e.p.Get(ctx) |
| 164 if err != nil { | 187 if err != nil { |
| 165 return nil, err | 188 return nil, err |
| 166 } | 189 } |
| 167 | 190 |
| 168 // Create a clone of our cached value (not deep, so secret is not cloned
, but | 191 // Create a clone of our cached value (not deep, so secret is not cloned
, but |
| 169 // the Collector will not modify that). If we have a local terminal inde
x | 192 // the Collector will not modify that). If we have a local terminal inde
x |
| 170 // cached, apply that to the response. | 193 // cached, apply that to the response. |
| 171 // | 194 // |
| 172 // We need to lock around our terminalIndex. | 195 // We need to lock around our terminalIndex. |
| 196 e.Lock() |
| 197 defer e.Unlock() |
| 198 |
| 173 rp := *(promisedSt.(*LogStreamState)) | 199 rp := *(promisedSt.(*LogStreamState)) |
| 174 if rp.TerminalIndex < 0 { | 200 if rp.TerminalIndex < 0 { |
| 175 » » rp.TerminalIndex = types.MessageIndex(atomic.LoadInt64(&e.termin
alIndex)) | 201 » » rp.TerminalIndex = e.terminalIndex |
| 176 } | 202 } |
| 177 | 203 |
| 178 return &rp, nil | 204 return &rp, nil |
| 179 } | 205 } |
| 180 | 206 |
| 181 // hasError tests if this entry has completed evaluation with an error state. | 207 // hasError tests if this entry has completed evaluation with an error state. |
| 182 // This is non-blocking, so if the evaluation hasn't completed, it will return | 208 // This is non-blocking, so if the evaluation hasn't completed, it will return |
| 183 // false. | 209 // false. |
| 184 func (e *cacheEntry) hasError() bool { | 210 func (e *cacheEntry) hasError() bool { |
| 185 if _, err := e.p.Peek(); err != nil && err != promise.ErrNoData { | 211 if _, err := e.p.Peek(); err != nil && err != promise.ErrNoData { |
| 186 return true | 212 return true |
| 187 } | 213 } |
| 188 return false | 214 return false |
| 189 } | 215 } |
| 190 | 216 |
| 191 // loadTerminalIndex loads a local cache of the stream's terminal index. This | 217 // loadTerminalIndex loads a local cache of the stream's terminal index. This |
| 192 // will be applied to all future get requests. | 218 // will be applied to all future get requests. |
| 193 func (e *cacheEntry) loadTerminalIndex(tidx types.MessageIndex) { | 219 func (e *cacheEntry) loadTerminalIndex(tidx types.MessageIndex) { |
| 194 » atomic.StoreInt64(&e.terminalIndex, int64(tidx)) | 220 » e.Lock() |
| 221 » defer e.Unlock() |
| 222 |
| 223 » e.terminalIndex = tidx |
| 195 } | 224 } |
| OLD | NEW |