Chromium Code Reviews| Index: common/gcloud/gcps/ackbuffer/ackbuffer.go |
| diff --git a/common/gcloud/gcps/ackbuffer/ackbuffer.go b/common/gcloud/gcps/ackbuffer/ackbuffer.go |
| index bbb566ff65aeb77b816ec5152f5c492501a81e62..035544240762d0583aff4c3f1b159e64f35b406f 100644 |
| --- a/common/gcloud/gcps/ackbuffer/ackbuffer.go |
| +++ b/common/gcloud/gcps/ackbuffer/ackbuffer.go |
| @@ -15,7 +15,6 @@ import ( |
| log "github.com/luci/luci-go/common/logging" |
| "github.com/luci/luci-go/common/meter" |
| "github.com/luci/luci-go/common/parallel" |
| - "github.com/luci/luci-go/common/retry" |
| "golang.org/x/net/context" |
| ) |
| @@ -35,21 +34,10 @@ const ( |
| // be discarded. |
| type DiscardCallback func(ackIDs []string) |
| -// PubSubACK sends ACKs to a Pub/Sub interface. |
| -// |
| -// gcps.PubSub naturally implements this interface. |
| -type PubSubACK interface { |
| - // Ack acknowledges one or more Pub/Sub message ACK IDs. |
| - Ack(s gcps.Subscription, ackIDs ...string) error |
| -} |
| - |
| // Config is a set of configuration parameters for an AckBuffer. |
| type Config struct { |
| - // PubSub is the Pub/Sub instance to ACK with. |
| - PubSub PubSubACK |
| - |
| - // Subscription is the name of the Pub/Sub subscription to ACK. |
| - Subscription gcps.Subscription |
| + // Ack is the Pub/Sub instance to ACK with. |
| + Ack Acknowledger |
| // MaxBufferTime is the maximum amount of time to buffer an ACK before sending it. |
| MaxBufferTime time.Duration |
| @@ -89,8 +77,6 @@ func New(ctx context.Context, c Config) *AckBuffer { |
| c.MaxParallelACK = DefaultMaxParallelACK |
| } |
| - ctx = log.SetField(ctx, "subscription", c.Subscription) |
| - |
| b := &AckBuffer{ |
| cfg: &c, |
| ctx: ctx, |
| @@ -98,7 +84,7 @@ func New(ctx context.Context, c Config) *AckBuffer { |
| ackFinishedC: make(chan struct{}), |
| } |
| b.meter = meter.New(ctx, meter.Config{ |
| - Count: gcps.MaxMessageAckPerRequest, |
| + Count: b.cfg.Ack.AckBatchSize(), |
| Delay: b.cfg.MaxBufferTime, |
| Callback: b.meterCallback, |
| }) |
| @@ -161,19 +147,11 @@ func (b *AckBuffer) meterCallback(work []interface{}) { |
| b.ackRequestC <- ackIDs |
| } |
| -// acknowledge acknowledges a set of IDs. It will retry on transient errors. |
| +// acknowledge acknowledges a set of IDs. |
| // |
| // This method will discard the ACKs if they fail. |
| func (b *AckBuffer) acknowledge(ackIDs []string) { |
| - err := retry.Retry(b.ctx, retry.TransientOnly(retry.Default()), func() error { |
| - return b.cfg.PubSub.Ack(b.cfg.Subscription, ackIDs...) |
| - }, func(err error, delay time.Duration) { |
| - log.Fields{ |
| - log.ErrorKey: err, |
| - "delay": delay, |
| - }.Warningf(b.ctx, "Error sending ACK; retrying.") |
| - }) |
| - if err != nil { |
| + if err := b.cfg.Ack.Ack(b.ctx, ackIDs...); err != nil { |
|
dnj (Google)
2016/01/21 04:36:24
User must supply a retry-enabled Pub/Sub instance
|
| log.Fields{ |
| log.ErrorKey: err, |
| "count": len(ackIDs), |