| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package coordinator | 5 package coordinator |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "sync" | 8 "sync" |
| 9 "time" | 9 "time" |
| 10 | 10 |
| 11 "github.com/luci/luci-go/common/clock" | 11 "github.com/luci/luci-go/common/clock" |
| 12 "github.com/luci/luci-go/common/config" | 12 "github.com/luci/luci-go/common/config" |
| 13 "github.com/luci/luci-go/common/errors" |
| 13 "github.com/luci/luci-go/common/logdog/types" | 14 "github.com/luci/luci-go/common/logdog/types" |
| 14 log "github.com/luci/luci-go/common/logging" | 15 log "github.com/luci/luci-go/common/logging" |
| 15 "github.com/luci/luci-go/common/lru" | 16 "github.com/luci/luci-go/common/lru" |
| 16 "github.com/luci/luci-go/common/promise" | 17 "github.com/luci/luci-go/common/promise" |
| 17 "github.com/luci/luci-go/common/tsmon/field" | 18 "github.com/luci/luci-go/common/tsmon/field" |
| 18 "github.com/luci/luci-go/common/tsmon/metric" | 19 "github.com/luci/luci-go/common/tsmon/metric" |
| 19 "golang.org/x/net/context" | 20 "golang.org/x/net/context" |
| 20 ) | 21 ) |
| 21 | 22 |
| 22 const ( | 23 const ( |
| (...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 77 key := cacheEntryKey{ | 78 key := cacheEntryKey{ |
| 78 project: st.Project, | 79 project: st.Project, |
| 79 path: st.Path, | 80 path: st.Path, |
| 80 } | 81 } |
| 81 | 82 |
| 82 // Get the cacheEntry from our cache. If it is expired, doesn't exist, o
r | 83 // Get the cacheEntry from our cache. If it is expired, doesn't exist, o
r |
| 83 // we're forcing, ignore any existing entry and replace with a Promise p
ending | 84 // we're forcing, ignore any existing entry and replace with a Promise p
ending |
| 84 // Coordinator sync. | 85 // Coordinator sync. |
| 85 cacheHit := false | 86 cacheHit := false |
| 86 entry := c.lru.Mutate(key, func(current interface{}) interface{} { | 87 entry := c.lru.Mutate(key, func(current interface{}) interface{} { |
| 87 » » // Don't replace an existing entry, unless it has an error or ha
s expired. | 88 » » // Don't replace an existing entry, unless it has a transient er
ror or has |
| 89 » » // expired. |
| 88 if current != nil { | 90 if current != nil { |
| 89 curEntry := current.(*cacheEntry) | 91 curEntry := current.(*cacheEntry) |
| 90 » » » if !curEntry.hasError() && now.Before(curEntry.expiresAt
) { | 92 » » » if !curEntry.hasTransientError() && now.Before(curEntry.
expiresAt) { |
| 91 cacheHit = true | 93 cacheHit = true |
| 92 return current | 94 return current |
| 93 } | 95 } |
| 94 } | 96 } |
| 95 | 97 |
| 96 p := promise.New(func() (interface{}, error) { | 98 p := promise.New(func() (interface{}, error) { |
| 97 st, err := c.Coordinator.RegisterStream(ctx, st, desc) | 99 st, err := c.Coordinator.RegisterStream(ctx, st, desc) |
| 98 » » » if err != nil { | 100 » » » return st, err |
| 99 » » » » return nil, err | |
| 100 » » » } | |
| 101 » » » return st, nil | |
| 102 }) | 101 }) |
| 103 | 102 |
| 104 return &cacheEntry{ | 103 return &cacheEntry{ |
| 105 cacheEntryKey: cacheEntryKey{ | 104 cacheEntryKey: cacheEntryKey{ |
| 106 project: st.Project, | 105 project: st.Project, |
| 107 path: st.Path, | 106 path: st.Path, |
| 108 }, | 107 }, |
| 109 terminalIndex: -1, | 108 terminalIndex: -1, |
| 110 p: p, | 109 p: p, |
| 111 expiresAt: now.Add(c.expiration), | 110 expiresAt: now.Add(c.expiration), |
| 112 } | 111 } |
| 113 }).(*cacheEntry) | 112 }).(*cacheEntry) |
| 114 | 113 |
| 115 » // If there was an error, purge the erroneous entry from the cache so th
at | 114 » // If there was a transient error, purge the erroneous entry from the ca
che so |
| 116 » // the next "update" will re-fetch it. | 115 » // that the next "update" will re-fetch it. |
| 116 » // |
| 117 » // If the error was non-transient, we will retain it as the Promise retu
rn |
| 118 » // value and subsequent calls will also receive this error. |
| 117 st, err := entry.get(ctx) | 119 st, err := entry.get(ctx) |
| 118 » if err != nil { | 120 » if errors.IsTransient(err) { |
| 119 log.Fields{ | 121 log.Fields{ |
| 120 log.ErrorKey: err, | 122 log.ErrorKey: err, |
| 121 » » }.Errorf(ctx, "Error retrieving stream state.") | 123 » » }.Errorf(ctx, "Transient error retrieving stream state.") |
| 122 return nil, err | 124 return nil, err |
| 123 } | 125 } |
| 124 | 126 |
| 125 tsCache.Add(ctx, 1, cacheHit) | 127 tsCache.Add(ctx, 1, cacheHit) |
| 126 » return st, nil | 128 » return st, err |
| 127 } | 129 } |
| 128 | 130 |
| 129 func (c *cache) TerminateStream(ctx context.Context, st *LogStreamState) error { | 131 func (c *cache) TerminateStream(ctx context.Context, st *LogStreamState) error { |
| 130 key := cacheEntryKey{ | 132 key := cacheEntryKey{ |
| 131 project: st.Project, | 133 project: st.Project, |
| 132 path: st.Path, | 134 path: st.Path, |
| 133 } | 135 } |
| 134 | 136 |
| 135 // Immediately update our state cache to record the terminal index, if | 137 // Immediately update our state cache to record the terminal index, if |
| 136 // we have a state cache. | 138 // we have a state cache. |
| (...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 198 defer e.Unlock() | 200 defer e.Unlock() |
| 199 | 201 |
| 200 rp := *(promisedSt.(*LogStreamState)) | 202 rp := *(promisedSt.(*LogStreamState)) |
| 201 if rp.TerminalIndex < 0 { | 203 if rp.TerminalIndex < 0 { |
| 202 rp.TerminalIndex = e.terminalIndex | 204 rp.TerminalIndex = e.terminalIndex |
| 203 } | 205 } |
| 204 | 206 |
| 205 return &rp, nil | 207 return &rp, nil |
| 206 } | 208 } |
| 207 | 209 |
| 208 // hasError tests if this entry has completed evaluation with an error state. | 210 // hasTransientError tests if this entry has completed evaluation with an error
state. |
| 209 // This is non-blocking, so if the evaluation hasn't completed, it will return | 211 // This is non-blocking, so if the evaluation hasn't completed, it will return |
| 210 // false. | 212 // false. |
| 211 func (e *cacheEntry) hasError() bool { | 213 func (e *cacheEntry) hasTransientError() bool { |
| 212 » if _, err := e.p.Peek(); err != nil && err != promise.ErrNoData { | 214 » switch _, err := e.p.Peek(); err { |
| 213 » » return true | 215 » case nil, promise.ErrNoData: |
| 216 » » return false |
| 217 » default: |
| 218 » » return errors.IsTransient(err) |
| 214 } | 219 } |
| 215 return false | |
| 216 } | 220 } |
| 217 | 221 |
| 218 // loadTerminalIndex loads a local cache of the stream's terminal index. This | 222 // loadTerminalIndex loads a local cache of the stream's terminal index. This |
| 219 // will be applied to all future get requests. | 223 // will be applied to all future get requests. |
| 220 func (e *cacheEntry) loadTerminalIndex(tidx types.MessageIndex) { | 224 func (e *cacheEntry) loadTerminalIndex(tidx types.MessageIndex) { |
| 221 e.Lock() | 225 e.Lock() |
| 222 defer e.Unlock() | 226 defer e.Unlock() |
| 223 | 227 |
| 224 e.terminalIndex = tidx | 228 e.terminalIndex = tidx |
| 225 } | 229 } |
| OLD | NEW |