Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(275)

Side by Side Diff: logdog/client/butler/output/pubsub/pubsubOutput.go

Issue 2951393002: [errors] de-specialize Transient in favor of Tags. (Closed)
Patch Set: more refactor Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package pubsub 5 package pubsub
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "fmt" 9 "fmt"
10 "sync" 10 "sync"
11 "time" 11 "time"
12 12
13 "github.com/luci/luci-go/common/clock" 13 "github.com/luci/luci-go/common/clock"
14 "github.com/luci/luci-go/common/data/recordio" 14 "github.com/luci/luci-go/common/data/recordio"
15 "github.com/luci/luci-go/common/errors" 15 "github.com/luci/luci-go/common/errors"
16 gcps "github.com/luci/luci-go/common/gcloud/pubsub" 16 gcps "github.com/luci/luci-go/common/gcloud/pubsub"
17 log "github.com/luci/luci-go/common/logging" 17 log "github.com/luci/luci-go/common/logging"
18 "github.com/luci/luci-go/common/retry" 18 "github.com/luci/luci-go/common/retry"
19 "github.com/luci/luci-go/common/retry/transient"
19 "github.com/luci/luci-go/grpc/grpcutil" 20 "github.com/luci/luci-go/grpc/grpcutil"
20 "github.com/luci/luci-go/logdog/api/logpb" 21 "github.com/luci/luci-go/logdog/api/logpb"
21 "github.com/luci/luci-go/logdog/client/butler/output" 22 "github.com/luci/luci-go/logdog/client/butler/output"
22 "github.com/luci/luci-go/logdog/client/butlerproto" 23 "github.com/luci/luci-go/logdog/client/butlerproto"
23 "github.com/luci/luci-go/logdog/common/types" 24 "github.com/luci/luci-go/logdog/common/types"
24 25
25 "cloud.google.com/go/pubsub" 26 "cloud.google.com/go/pubsub"
26 "golang.org/x/net/context" 27 "golang.org/x/net/context"
27 ) 28 )
28 29
(...skipping 158 matching lines...) Expand 10 before | Expand all | Expand 10 after
187 return &pubsub.Message{ 188 return &pubsub.Message{
188 Data: buf.Bytes(), 189 Data: buf.Bytes(),
189 }, nil 190 }, nil
190 } 191 }
191 192
192 // publishMessage handles an individual publish request. It will indefinitely 193 // publishMessage handles an individual publish request. It will indefinitely
193 // retry transient errors until the publish succeeds. 194 // retry transient errors until the publish succeeds.
194 func (o *pubSubOutput) publishMessage(message *pubsub.Message) error { 195 func (o *pubSubOutput) publishMessage(message *pubsub.Message) error {
195 var messageID string 196 var messageID string
196 transientErrors := 0 197 transientErrors := 0
197 » err := retry.Retry(o, retry.TransientOnly(indefiniteRetry), func() (err error) { 198 » err := retry.Retry(o, transient.Only(indefiniteRetry), func() (err error ) {
198 ctx := o.Context 199 ctx := o.Context
199 if o.RPCTimeout > 0 { 200 if o.RPCTimeout > 0 {
200 var cancelFunc context.CancelFunc 201 var cancelFunc context.CancelFunc
201 ctx, cancelFunc = clock.WithTimeout(o, o.RPCTimeout) 202 ctx, cancelFunc = clock.WithTimeout(o, o.RPCTimeout)
202 defer cancelFunc() 203 defer cancelFunc()
203 } 204 }
204 205
205 messageID, err = o.Topic.Publish(ctx, message) 206 messageID, err = o.Topic.Publish(ctx, message)
206 if err == context.DeadlineExceeded { 207 if err == context.DeadlineExceeded {
207 // If we hit our publish deadline, retry. 208 // If we hit our publish deadline, retry.
208 » » » err = errors.WrapTransient(err) 209 » » » err = transient.Tag.Apply(err)
209 } else { 210 } else {
210 err = grpcutil.WrapIfTransient(err) 211 err = grpcutil.WrapIfTransient(err)
211 } 212 }
212 return 213 return
213 }, func(err error, d time.Duration) { 214 }, func(err error, d time.Duration) {
214 log.Fields{ 215 log.Fields{
215 log.ErrorKey: err, 216 log.ErrorKey: err,
216 "delay": d, 217 "delay": d,
217 }.Warningf(o, "TRANSIENT error publishing messages; retrying..." ) 218 }.Warningf(o, "TRANSIENT error publishing messages; retrying..." )
218 transientErrors++ 219 transientErrors++
(...skipping 29 matching lines...) Expand all
248 // a maximum backoff. 249 // a maximum backoff.
249 func indefiniteRetry() retry.Iterator { 250 func indefiniteRetry() retry.Iterator {
250 return &retry.ExponentialBackoff{ 251 return &retry.ExponentialBackoff{
251 Limited: retry.Limited{ 252 Limited: retry.Limited{
252 Retries: -1, 253 Retries: -1,
253 Delay: 500 * time.Millisecond, 254 Delay: 500 * time.Millisecond,
254 }, 255 },
255 MaxDelay: 30 * time.Second, 256 MaxDelay: 30 * time.Second,
256 } 257 }
257 } 258 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698