Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(228)

Side by Side Diff: server/internal/logdog/collector/coordinator/cache.go

Issue 1971933002: LogDog: collector cache non-transient errors. (Closed) Base URL: https://github.com/luci/luci-go@logdog-butler-register-coordinator-prefix-expiration
Patch Set: Updated patchset dependency Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | server/internal/logdog/collector/coordinator/cache_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package coordinator 5 package coordinator
6 6
7 import ( 7 import (
8 "sync" 8 "sync"
9 "time" 9 "time"
10 10
11 "github.com/luci/luci-go/common/clock" 11 "github.com/luci/luci-go/common/clock"
12 "github.com/luci/luci-go/common/config" 12 "github.com/luci/luci-go/common/config"
13 "github.com/luci/luci-go/common/errors"
13 "github.com/luci/luci-go/common/logdog/types" 14 "github.com/luci/luci-go/common/logdog/types"
14 log "github.com/luci/luci-go/common/logging" 15 log "github.com/luci/luci-go/common/logging"
15 "github.com/luci/luci-go/common/lru" 16 "github.com/luci/luci-go/common/lru"
16 "github.com/luci/luci-go/common/promise" 17 "github.com/luci/luci-go/common/promise"
17 "github.com/luci/luci-go/common/tsmon/field" 18 "github.com/luci/luci-go/common/tsmon/field"
18 "github.com/luci/luci-go/common/tsmon/metric" 19 "github.com/luci/luci-go/common/tsmon/metric"
19 "golang.org/x/net/context" 20 "golang.org/x/net/context"
20 ) 21 )
21 22
22 const ( 23 const (
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after
77 key := cacheEntryKey{ 78 key := cacheEntryKey{
78 project: st.Project, 79 project: st.Project,
79 path: st.Path, 80 path: st.Path,
80 } 81 }
81 82
82 // Get the cacheEntry from our cache. If it is expired, doesn't exist, o r 83 // Get the cacheEntry from our cache. If it is expired, doesn't exist, o r
83 // we're forcing, ignore any existing entry and replace with a Promise p ending 84 // we're forcing, ignore any existing entry and replace with a Promise p ending
84 // Coordinator sync. 85 // Coordinator sync.
85 cacheHit := false 86 cacheHit := false
86 entry := c.lru.Mutate(key, func(current interface{}) interface{} { 87 entry := c.lru.Mutate(key, func(current interface{}) interface{} {
87 » » // Don't replace an existing entry, unless it has an error or ha s expired. 88 » » // Don't replace an existing entry, unless it has a transient er ror or has
89 » » // expired.
88 if current != nil { 90 if current != nil {
89 curEntry := current.(*cacheEntry) 91 curEntry := current.(*cacheEntry)
90 » » » if !curEntry.hasError() && now.Before(curEntry.expiresAt ) { 92 » » » if !curEntry.hasTransientError() && now.Before(curEntry. expiresAt) {
91 cacheHit = true 93 cacheHit = true
92 return current 94 return current
93 } 95 }
94 } 96 }
95 97
96 p := promise.New(func() (interface{}, error) { 98 p := promise.New(func() (interface{}, error) {
97 st, err := c.Coordinator.RegisterStream(ctx, st, desc) 99 st, err := c.Coordinator.RegisterStream(ctx, st, desc)
98 » » » if err != nil { 100 » » » return st, err
99 » » » » return nil, err
100 » » » }
101 » » » return st, nil
102 }) 101 })
103 102
104 return &cacheEntry{ 103 return &cacheEntry{
105 cacheEntryKey: cacheEntryKey{ 104 cacheEntryKey: cacheEntryKey{
106 project: st.Project, 105 project: st.Project,
107 path: st.Path, 106 path: st.Path,
108 }, 107 },
109 terminalIndex: -1, 108 terminalIndex: -1,
110 p: p, 109 p: p,
111 expiresAt: now.Add(c.expiration), 110 expiresAt: now.Add(c.expiration),
112 } 111 }
113 }).(*cacheEntry) 112 }).(*cacheEntry)
114 113
115 » // If there was an error, purge the erroneous entry from the cache so th at 114 » // If there was a transient error, purge the erroneous entry from the ca che so
116 » // the next "update" will re-fetch it. 115 » // that the next "update" will re-fetch it.
116 » //
117 » // If the error was non-transient, we will retain it as the Promise retu rn
118 » // value and subsequent calls will also receive this error.
117 st, err := entry.get(ctx) 119 st, err := entry.get(ctx)
118 » if err != nil { 120 » if errors.IsTransient(err) {
119 log.Fields{ 121 log.Fields{
120 log.ErrorKey: err, 122 log.ErrorKey: err,
121 » » }.Errorf(ctx, "Error retrieving stream state.") 123 » » }.Errorf(ctx, "Transient error retrieving stream state.")
122 return nil, err 124 return nil, err
123 } 125 }
124 126
125 tsCache.Add(ctx, 1, cacheHit) 127 tsCache.Add(ctx, 1, cacheHit)
126 » return st, nil 128 » return st, err
127 } 129 }
128 130
129 func (c *cache) TerminateStream(ctx context.Context, st *LogStreamState) error { 131 func (c *cache) TerminateStream(ctx context.Context, st *LogStreamState) error {
130 key := cacheEntryKey{ 132 key := cacheEntryKey{
131 project: st.Project, 133 project: st.Project,
132 path: st.Path, 134 path: st.Path,
133 } 135 }
134 136
135 // Immediately update our state cache to record the terminal index, if 137 // Immediately update our state cache to record the terminal index, if
136 // we have a state cache. 138 // we have a state cache.
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after
198 defer e.Unlock() 200 defer e.Unlock()
199 201
200 rp := *(promisedSt.(*LogStreamState)) 202 rp := *(promisedSt.(*LogStreamState))
201 if rp.TerminalIndex < 0 { 203 if rp.TerminalIndex < 0 {
202 rp.TerminalIndex = e.terminalIndex 204 rp.TerminalIndex = e.terminalIndex
203 } 205 }
204 206
205 return &rp, nil 207 return &rp, nil
206 } 208 }
207 209
208 // hasError tests if this entry has completed evaluation with an error state. 210 // hasTransientError tests if this entry has completed evaluation with an error state.
209 // This is non-blocking, so if the evaluation hasn't completed, it will return 211 // This is non-blocking, so if the evaluation hasn't completed, it will return
210 // false. 212 // false.
211 func (e *cacheEntry) hasError() bool { 213 func (e *cacheEntry) hasTransientError() bool {
212 » if _, err := e.p.Peek(); err != nil && err != promise.ErrNoData { 214 » switch _, err := e.p.Peek(); err {
213 » » return true 215 » case nil, promise.ErrNoData:
216 » » return false
217 » default:
218 » » return errors.IsTransient(err)
214 } 219 }
215 return false
216 } 220 }
217 221
218 // loadTerminalIndex loads a local cache of the stream's terminal index. This 222 // loadTerminalIndex loads a local cache of the stream's terminal index. This
219 // will be applied to all future get requests. 223 // will be applied to all future get requests.
220 func (e *cacheEntry) loadTerminalIndex(tidx types.MessageIndex) { 224 func (e *cacheEntry) loadTerminalIndex(tidx types.MessageIndex) {
221 e.Lock() 225 e.Lock()
222 defer e.Unlock() 226 defer e.Unlock()
223 227
224 e.terminalIndex = tidx 228 e.terminalIndex = tidx
225 } 229 }
OLDNEW
« no previous file with comments | « no previous file | server/internal/logdog/collector/coordinator/cache_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698