OLD | NEW |
1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package coordinator | 5 package coordinator |
6 | 6 |
7 import ( | 7 import ( |
8 "sync" | 8 "sync" |
9 "time" | 9 "time" |
10 | 10 |
11 "github.com/luci/luci-go/common/clock" | 11 "github.com/luci/luci-go/common/clock" |
12 "github.com/luci/luci-go/common/data/caching/lru" | 12 "github.com/luci/luci-go/common/data/caching/lru" |
13 "github.com/luci/luci-go/common/errors" | |
14 log "github.com/luci/luci-go/common/logging" | 13 log "github.com/luci/luci-go/common/logging" |
| 14 "github.com/luci/luci-go/common/retry/transient" |
15 "github.com/luci/luci-go/common/sync/promise" | 15 "github.com/luci/luci-go/common/sync/promise" |
16 "github.com/luci/luci-go/common/tsmon/field" | 16 "github.com/luci/luci-go/common/tsmon/field" |
17 "github.com/luci/luci-go/common/tsmon/metric" | 17 "github.com/luci/luci-go/common/tsmon/metric" |
18 "github.com/luci/luci-go/logdog/common/types" | 18 "github.com/luci/luci-go/logdog/common/types" |
19 "github.com/luci/luci-go/luci_config/common/cfgtypes" | 19 "github.com/luci/luci-go/luci_config/common/cfgtypes" |
20 | 20 |
21 "golang.org/x/net/context" | 21 "golang.org/x/net/context" |
22 ) | 22 ) |
23 | 23 |
24 const ( | 24 const ( |
(...skipping 168 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
193 | 193 |
194 p, tidx = ce.registerP, ce.terminalIndex | 194 p, tidx = ce.registerP, ce.terminalIndex |
195 }) | 195 }) |
196 | 196 |
197 // Resolve our registration Promise. | 197 // Resolve our registration Promise. |
198 remoteStateIface, err := p.Get(ctx) | 198 remoteStateIface, err := p.Get(ctx) |
199 if err != nil { | 199 if err != nil { |
200 // If the promise failed transiently, clear it so that subsequen
t callers | 200 // If the promise failed transiently, clear it so that subsequen
t callers |
201 // will regenerate a new promise. ONLY clear it if it it is the
same | 201 // will regenerate a new promise. ONLY clear it if it it is the
same |
202 // promise, as different callers may have already cleared/renger
ated it. | 202 // promise, as different callers may have already cleared/renger
ated it. |
203 » » if errors.IsTransient(err) { | 203 » » if transient.Tag.In(err) { |
204 ce.withLock(func() { | 204 ce.withLock(func() { |
205 if ce.registerP == p { | 205 if ce.registerP == p { |
206 ce.registerP = nil | 206 ce.registerP = nil |
207 } | 207 } |
208 }) | 208 }) |
209 } | 209 } |
210 return nil, err | 210 return nil, err |
211 } | 211 } |
212 remoteState := remoteStateIface.(*LogStreamState) | 212 remoteState := remoteStateIface.(*LogStreamState) |
213 | 213 |
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
260 "cachedIndex": tidx, | 260 "cachedIndex": tidx, |
261 }.Warningf(ctx, "Request terminal index doesn't match ca
ched value.") | 261 }.Warningf(ctx, "Request terminal index doesn't match ca
ched value.") |
262 } | 262 } |
263 return nil | 263 return nil |
264 } | 264 } |
265 | 265 |
266 // Resolve our termination Promise. | 266 // Resolve our termination Promise. |
267 if _, err := p.Get(ctx); err != nil { | 267 if _, err := p.Get(ctx); err != nil { |
268 // If this is a transient error, delete this Promise so future t
ermination | 268 // If this is a transient error, delete this Promise so future t
ermination |
269 // attempts will retry for this stream. | 269 // attempts will retry for this stream. |
270 » » if errors.IsTransient(err) { | 270 » » if transient.Tag.In(err) { |
271 ce.withLock(func() { | 271 ce.withLock(func() { |
272 if ce.terminateP == p { | 272 if ce.terminateP == p { |
273 ce.terminateP = nil | 273 ce.terminateP = nil |
274 } | 274 } |
275 }) | 275 }) |
276 } | 276 } |
277 return err | 277 return err |
278 } | 278 } |
279 return nil | 279 return nil |
280 } | 280 } |
(...skipping 15 matching lines...) Expand all Loading... |
296 ce.terminalIndex = tidx | 296 ce.terminalIndex = tidx |
297 } | 297 } |
298 }) | 298 }) |
299 } | 299 } |
300 | 300 |
301 func (ce *cacheEntry) withLock(f func()) { | 301 func (ce *cacheEntry) withLock(f func()) { |
302 ce.Lock() | 302 ce.Lock() |
303 defer ce.Unlock() | 303 defer ce.Unlock() |
304 f() | 304 f() |
305 } | 305 } |
OLD | NEW |