| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package coordinator | 5 package coordinator |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "sync" | 8 "sync" |
| 9 "time" | 9 "time" |
| 10 | 10 |
| 11 "github.com/luci/luci-go/common/clock" | 11 "github.com/luci/luci-go/common/clock" |
| 12 "github.com/luci/luci-go/common/data/caching/lru" | 12 "github.com/luci/luci-go/common/data/caching/lru" |
| 13 "github.com/luci/luci-go/common/errors" | |
| 14 log "github.com/luci/luci-go/common/logging" | 13 log "github.com/luci/luci-go/common/logging" |
| 14 "github.com/luci/luci-go/common/retry" |
| 15 "github.com/luci/luci-go/common/sync/promise" | 15 "github.com/luci/luci-go/common/sync/promise" |
| 16 "github.com/luci/luci-go/common/tsmon/field" | 16 "github.com/luci/luci-go/common/tsmon/field" |
| 17 "github.com/luci/luci-go/common/tsmon/metric" | 17 "github.com/luci/luci-go/common/tsmon/metric" |
| 18 "github.com/luci/luci-go/logdog/common/types" | 18 "github.com/luci/luci-go/logdog/common/types" |
| 19 "github.com/luci/luci-go/luci_config/common/cfgtypes" | 19 "github.com/luci/luci-go/luci_config/common/cfgtypes" |
| 20 | 20 |
| 21 "golang.org/x/net/context" | 21 "golang.org/x/net/context" |
| 22 ) | 22 ) |
| 23 | 23 |
| 24 const ( | 24 const ( |
| (...skipping 168 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 193 | 193 |
| 194 p, tidx = ce.registerP, ce.terminalIndex | 194 p, tidx = ce.registerP, ce.terminalIndex |
| 195 }) | 195 }) |
| 196 | 196 |
| 197 // Resolve our registration Promise. | 197 // Resolve our registration Promise. |
| 198 remoteStateIface, err := p.Get(ctx) | 198 remoteStateIface, err := p.Get(ctx) |
| 199 if err != nil { | 199 if err != nil { |
| 200 // If the promise failed transiently, clear it so that subsequen
t callers | 200 // If the promise failed transiently, clear it so that subsequen
t callers |
| 201 // will regenerate a new promise. ONLY clear it if it it is the
same | 201 // will regenerate a new promise. ONLY clear it if it it is the
same |
| 202 // promise, as different callers may have already cleared/renger
ated it. | 202 // promise, as different callers may have already cleared/renger
ated it. |
| 203 » » if errors.IsTransient(err) { | 203 » » if retry.Tag.In(err) { |
| 204 ce.withLock(func() { | 204 ce.withLock(func() { |
| 205 if ce.registerP == p { | 205 if ce.registerP == p { |
| 206 ce.registerP = nil | 206 ce.registerP = nil |
| 207 } | 207 } |
| 208 }) | 208 }) |
| 209 } | 209 } |
| 210 return nil, err | 210 return nil, err |
| 211 } | 211 } |
| 212 remoteState := remoteStateIface.(*LogStreamState) | 212 remoteState := remoteStateIface.(*LogStreamState) |
| 213 | 213 |
| (...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 260 "cachedIndex": tidx, | 260 "cachedIndex": tidx, |
| 261 }.Warningf(ctx, "Request terminal index doesn't match ca
ched value.") | 261 }.Warningf(ctx, "Request terminal index doesn't match ca
ched value.") |
| 262 } | 262 } |
| 263 return nil | 263 return nil |
| 264 } | 264 } |
| 265 | 265 |
| 266 // Resolve our termination Promise. | 266 // Resolve our termination Promise. |
| 267 if _, err := p.Get(ctx); err != nil { | 267 if _, err := p.Get(ctx); err != nil { |
| 268 // If this is a transient error, delete this Promise so future t
ermination | 268 // If this is a transient error, delete this Promise so future t
ermination |
| 269 // attempts will retry for this stream. | 269 // attempts will retry for this stream. |
| 270 » » if errors.IsTransient(err) { | 270 » » if retry.Tag.In(err) { |
| 271 ce.withLock(func() { | 271 ce.withLock(func() { |
| 272 if ce.terminateP == p { | 272 if ce.terminateP == p { |
| 273 ce.terminateP = nil | 273 ce.terminateP = nil |
| 274 } | 274 } |
| 275 }) | 275 }) |
| 276 } | 276 } |
| 277 return err | 277 return err |
| 278 } | 278 } |
| 279 return nil | 279 return nil |
| 280 } | 280 } |
| (...skipping 15 matching lines...) Expand all Loading... |
| 296 ce.terminalIndex = tidx | 296 ce.terminalIndex = tidx |
| 297 } | 297 } |
| 298 }) | 298 }) |
| 299 } | 299 } |
| 300 | 300 |
| 301 func (ce *cacheEntry) withLock(f func()) { | 301 func (ce *cacheEntry) withLock(f func()) { |
| 302 ce.Lock() | 302 ce.Lock() |
| 303 defer ce.Unlock() | 303 defer ce.Unlock() |
| 304 f() | 304 f() |
| 305 } | 305 } |
| OLD | NEW |