Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 // Package ackbuffer implements a Pub/Sub acknowledgement buffer capability. | 5 // Package ackbuffer implements a Pub/Sub acknowledgement buffer capability. |
| 6 // Pub/Sub ACKs will be collected and batched before being sent to Pub/Sub, | 6 // Pub/Sub ACKs will be collected and batched before being sent to Pub/Sub, |
| 7 // with specific deadline enforcement. | 7 // with specific deadline enforcement. |
| 8 package ackbuffer | 8 package ackbuffer |
| 9 | 9 |
| 10 import ( | 10 import ( |
| 11 "sync" | 11 "sync" |
| 12 "time" | 12 "time" |
| 13 | 13 |
| 14 "github.com/luci/luci-go/common/gcloud/gcps" | 14 "github.com/luci/luci-go/common/gcloud/gcps" |
| 15 log "github.com/luci/luci-go/common/logging" | 15 log "github.com/luci/luci-go/common/logging" |
| 16 "github.com/luci/luci-go/common/meter" | 16 "github.com/luci/luci-go/common/meter" |
| 17 "github.com/luci/luci-go/common/parallel" | 17 "github.com/luci/luci-go/common/parallel" |
| 18 "github.com/luci/luci-go/common/retry" | |
| 19 "golang.org/x/net/context" | 18 "golang.org/x/net/context" |
| 20 ) | 19 ) |
| 21 | 20 |
| 22 const ( | 21 const ( |
| 23 // DefaultMaxBufferTime is the default amount of time that an ACK will r emain | 22 // DefaultMaxBufferTime is the default amount of time that an ACK will r emain |
| 24 // buffered before being sent. | 23 // buffered before being sent. |
| 25 // | 24 // |
| 26 // We base this off the default acknowledgement delay. | 25 // We base this off the default acknowledgement delay. |
| 27 DefaultMaxBufferTime = (gcps.DefaultMaxAckDelay / 6) | 26 DefaultMaxBufferTime = (gcps.DefaultMaxAckDelay / 6) |
| 28 | 27 |
| 29 // DefaultMaxParallelACK is the default maximum number of simultaneous | 28 // DefaultMaxParallelACK is the default maximum number of simultaneous |
| 30 // parallel ACK request goroutines. | 29 // parallel ACK request goroutines. |
| 31 DefaultMaxParallelACK = 16 | 30 DefaultMaxParallelACK = 16 |
| 32 ) | 31 ) |
| 33 | 32 |
| 34 // DiscardCallback is a callback method that will be invoked if ACK IDs must | 33 // DiscardCallback is a callback method that will be invoked if ACK IDs must |
| 35 // be discarded. | 34 // be discarded. |
| 36 type DiscardCallback func(ackIDs []string) | 35 type DiscardCallback func(ackIDs []string) |
| 37 | 36 |
| 38 // PubSubACK sends ACKs to a Pub/Sub interface. | |
| 39 // | |
| 40 // gcps.PubSub naturally implements this interface. | |
| 41 type PubSubACK interface { | |
| 42 // Ack acknowledges one or more Pub/Sub message ACK IDs. | |
| 43 Ack(s gcps.Subscription, ackIDs ...string) error | |
| 44 } | |
| 45 | |
| 46 // Config is a set of configuration parameters for an AckBuffer. | 37 // Config is a set of configuration parameters for an AckBuffer. |
| 47 type Config struct { | 38 type Config struct { |
| 48 » // PubSub is the Pub/Sub instance to ACK with. | 39 » // Ack is the Pub/Sub instance to ACK with. |
| 49 » PubSub PubSubACK | 40 » Ack Acknowledger |
| 50 | |
| 51 » // Subscription is the name of the Pub/Sub subscription to ACK. | |
| 52 » Subscription gcps.Subscription | |
| 53 | 41 |
| 54 // MaxBufferTime is the maximum amount of time to buffer an ACK before s ending it. | 42 // MaxBufferTime is the maximum amount of time to buffer an ACK before s ending it. |
| 55 MaxBufferTime time.Duration | 43 MaxBufferTime time.Duration |
| 56 | 44 |
| 57 // The maximum number of parallel ACK requests that can be simultaneousl y | 45 // The maximum number of parallel ACK requests that can be simultaneousl y |
| 58 // open. If zero, DefaultMaxParallelACK will be used. | 46 // open. If zero, DefaultMaxParallelACK will be used. |
| 59 MaxParallelACK int | 47 MaxParallelACK int |
| 60 | 48 |
| 61 // DiscardCallback is invoked when a series of ACK IDs is discarded afte r | 49 // DiscardCallback is invoked when a series of ACK IDs is discarded afte r |
| 62 // repeated failures to ACK. If this is nil, no callback will be invoked . | 50 // repeated failures to ACK. If this is nil, no callback will be invoked . |
| (...skipping 19 matching lines...) Expand all Loading... | |
| 82 // New instantiates a new AckBuffer. The returned AckBuffer must have its | 70 // New instantiates a new AckBuffer. The returned AckBuffer must have its |
| 83 // CloseAndFlush method invoked before terminating, else data loss may occur. | 71 // CloseAndFlush method invoked before terminating, else data loss may occur. |
| 84 func New(ctx context.Context, c Config) *AckBuffer { | 72 func New(ctx context.Context, c Config) *AckBuffer { |
| 85 if c.MaxBufferTime <= 0 { | 73 if c.MaxBufferTime <= 0 { |
| 86 c.MaxBufferTime = DefaultMaxBufferTime | 74 c.MaxBufferTime = DefaultMaxBufferTime |
| 87 } | 75 } |
| 88 if c.MaxParallelACK <= 0 { | 76 if c.MaxParallelACK <= 0 { |
| 89 c.MaxParallelACK = DefaultMaxParallelACK | 77 c.MaxParallelACK = DefaultMaxParallelACK |
| 90 } | 78 } |
| 91 | 79 |
| 92 ctx = log.SetField(ctx, "subscription", c.Subscription) | |
| 93 | |
| 94 b := &AckBuffer{ | 80 b := &AckBuffer{ |
| 95 cfg: &c, | 81 cfg: &c, |
| 96 ctx: ctx, | 82 ctx: ctx, |
| 97 ackRequestC: make(chan []string), | 83 ackRequestC: make(chan []string), |
| 98 ackFinishedC: make(chan struct{}), | 84 ackFinishedC: make(chan struct{}), |
| 99 } | 85 } |
| 100 b.meter = meter.New(ctx, meter.Config{ | 86 b.meter = meter.New(ctx, meter.Config{ |
| 101 » » Count: gcps.MaxMessageAckPerRequest, | 87 » » Count: b.cfg.Ack.AckBatchSize(), |
| 102 Delay: b.cfg.MaxBufferTime, | 88 Delay: b.cfg.MaxBufferTime, |
| 103 Callback: b.meterCallback, | 89 Callback: b.meterCallback, |
| 104 }) | 90 }) |
| 105 | 91 |
| 106 // Start our ACK loop. | 92 // Start our ACK loop. |
| 107 wg := sync.WaitGroup{} | 93 wg := sync.WaitGroup{} |
| 108 go func() { | 94 go func() { |
| 109 defer close(b.ackFinishedC) | 95 defer close(b.ackFinishedC) |
| 110 | 96 |
| 111 // Allocate and populate a set of ACK tokens. This will be used as a | 97 // Allocate and populate a set of ACK tokens. This will be used as a |
| (...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 154 // This shouldn't block if possible, else the Meter will block. However, if | 140 // This shouldn't block if possible, else the Meter will block. However, if |
| 155 // ACK requests build up, this will block until they are finished. | 141 // ACK requests build up, this will block until they are finished. |
| 156 func (b *AckBuffer) meterCallback(work []interface{}) { | 142 func (b *AckBuffer) meterCallback(work []interface{}) { |
| 157 ackIDs := make([]string, len(work)) | 143 ackIDs := make([]string, len(work)) |
| 158 for idx, w := range work { | 144 for idx, w := range work { |
| 159 ackIDs[idx] = w.(string) | 145 ackIDs[idx] = w.(string) |
| 160 } | 146 } |
| 161 b.ackRequestC <- ackIDs | 147 b.ackRequestC <- ackIDs |
| 162 } | 148 } |
| 163 | 149 |
| 164 // acknowledge acknowledges a set of IDs. It will retry on transient errors. | 150 // acknowledge acknowledges a set of IDs. |
| 165 // | 151 // |
| 166 // This method will discard the ACKs if they fail. | 152 // This method will discard the ACKs if they fail. |
| 167 func (b *AckBuffer) acknowledge(ackIDs []string) { | 153 func (b *AckBuffer) acknowledge(ackIDs []string) { |
| 168 » err := retry.Retry(b.ctx, retry.TransientOnly(retry.Default()), func() e rror { | 154 » if err := b.cfg.Ack.Ack(b.ctx, ackIDs...); err != nil { |
|
dnj (Google)
2016/01/21 04:36:24
User must supply a retry-enabled Pub/Sub instance
| |
| 169 » » return b.cfg.PubSub.Ack(b.cfg.Subscription, ackIDs...) | |
| 170 » }, func(err error, delay time.Duration) { | |
| 171 » » log.Fields{ | |
| 172 » » » log.ErrorKey: err, | |
| 173 » » » "delay": delay, | |
| 174 » » }.Warningf(b.ctx, "Error sending ACK; retrying.") | |
| 175 » }) | |
| 176 » if err != nil { | |
| 177 log.Fields{ | 155 log.Fields{ |
| 178 log.ErrorKey: err, | 156 log.ErrorKey: err, |
| 179 "count": len(ackIDs), | 157 "count": len(ackIDs), |
| 180 }.Errorf(b.ctx, "Failed to ACK.") | 158 }.Errorf(b.ctx, "Failed to ACK.") |
| 181 if b.cfg.DiscardCallback != nil { | 159 if b.cfg.DiscardCallback != nil { |
| 182 b.cfg.DiscardCallback(ackIDs) | 160 b.cfg.DiscardCallback(ackIDs) |
| 183 } | 161 } |
| 184 } | 162 } |
| 185 } | 163 } |
| OLD | NEW |