| OLD | NEW |
| (Empty) |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 // Package ackbuffer implements a Pub/Sub acknowledgement buffer capability. | |
| 6 // Pub/Sub ACKs will be collected and batched before being sent to Pub/Sub, | |
| 7 // with specific deadline enforcement. | |
| 8 package ackbuffer | |
| 9 | |
| 10 import ( | |
| 11 "time" | |
| 12 | |
| 13 "github.com/luci/luci-go/common/clock" | |
| 14 "github.com/luci/luci-go/common/gcloud/pubsub" | |
| 15 log "github.com/luci/luci-go/common/logging" | |
| 16 "github.com/luci/luci-go/common/parallel" | |
| 17 "golang.org/x/net/context" | |
| 18 ) | |
| 19 | |
| 20 const ( | |
| 21 // DefaultMaxBufferTime is the default amount of time that an ACK will r
emain | |
| 22 // buffered before being sent. | |
| 23 // | |
| 24 // We base this off the default acknowledgement delay. | |
| 25 DefaultMaxBufferTime = (pubsub.DefaultMaxAckDelay / 6) | |
| 26 | |
| 27 // DefaultMaxParallelACK is the default maximum number of simultaneous | |
| 28 // parallel ACK request goroutines. | |
| 29 DefaultMaxParallelACK = 16 | |
| 30 ) | |
| 31 | |
| 32 // DiscardCallback is a callback method that will be invoked if ACK IDs must | |
| 33 // be discarded. | |
| 34 type DiscardCallback func(ackIDs []string) | |
| 35 | |
| 36 // Config is a set of configuration parameters for an AckBuffer. | |
| 37 type Config struct { | |
| 38 // Ack is the Pub/Sub instance to ACK with. | |
| 39 Ack Acknowledger | |
| 40 | |
| 41 // MaxBufferTime is the maximum amount of time to buffer an ACK before s
ending it. | |
| 42 MaxBufferTime time.Duration | |
| 43 | |
| 44 // The maximum number of parallel ACK requests that can be simultaneousl
y | |
| 45 // open. If zero, DefaultMaxParallelACK will be used. | |
| 46 MaxParallelACK int | |
| 47 | |
| 48 // DiscardCallback is invoked when a series of ACK IDs is discarded afte
r | |
| 49 // repeated failures to ACK. If this is nil, no callback will be invoked
. | |
| 50 DiscardCallback DiscardCallback | |
| 51 } | |
| 52 | |
| 53 // AckBuffer buffers Pub/Sub ACK requests together and sends them in batches. | |
| 54 // If a batch of ACKs fails to send (after retries), it will be discarded with | |
| 55 // an optional callback. | |
| 56 // | |
| 57 // After ACKs are enqueued, the AckBuffer should be flushed via CloseAndFlush | |
| 58 // to ensure that all ACKs have had a time to be sent. | |
| 59 type AckBuffer struct { | |
| 60 cfg *Config | |
| 61 ctx context.Context | |
| 62 | |
| 63 meterFinishedC chan struct{} | |
| 64 | |
| 65 ackC chan string // Used to send ACK requests. | |
| 66 ackRequestC chan []string // Used to submit ACK requests to ACK gorouti
ne. | |
| 67 ackFinishedC chan struct{} // Closed when ACK goroutine is finished. | |
| 68 | |
| 69 ackReceivedC chan string // (Testing) if not nil, send received ACKs. | |
| 70 } | |
| 71 | |
| 72 // New instantiates a new AckBuffer. The returned AckBuffer must have its | |
| 73 // CloseAndFlush method invoked before terminating, else data loss may occur. | |
| 74 func New(ctx context.Context, c Config) *AckBuffer { | |
| 75 if c.MaxBufferTime <= 0 { | |
| 76 c.MaxBufferTime = DefaultMaxBufferTime | |
| 77 } | |
| 78 if c.MaxParallelACK <= 0 { | |
| 79 c.MaxParallelACK = DefaultMaxParallelACK | |
| 80 } | |
| 81 | |
| 82 batchSize := c.Ack.AckBatchSize() | |
| 83 b := AckBuffer{ | |
| 84 cfg: &c, | |
| 85 ctx: ctx, | |
| 86 ackC: make(chan string, batchSize), | |
| 87 meterFinishedC: make(chan struct{}), | |
| 88 ackRequestC: make(chan []string), | |
| 89 ackFinishedC: make(chan struct{}), | |
| 90 } | |
| 91 | |
| 92 // Start a meter goroutine. This will buffer ACKs and send them at eithe
r | |
| 93 // capacity or timer intervals. | |
| 94 go func() { | |
| 95 defer close(b.ackRequestC) | |
| 96 | |
| 97 // Create a timer. This will tick each time the buffer is empty
and get a | |
| 98 // new ACK to track the longest time we've been buffering an ACK
. We will | |
| 99 // reset the timer each time we clear the buffer. | |
| 100 timerRunning := false | |
| 101 timer := clock.NewTimer(ctx) | |
| 102 defer timer.Stop() | |
| 103 | |
| 104 buf := make([]string, 0, batchSize) | |
| 105 send := func() { | |
| 106 if len(buf) > 0 { | |
| 107 ackIDs := make([]string, len(buf)) | |
| 108 copy(ackIDs, buf) | |
| 109 b.ackRequestC <- ackIDs | |
| 110 buf = buf[:0] | |
| 111 } | |
| 112 | |
| 113 timer.Stop() | |
| 114 timerRunning = false | |
| 115 } | |
| 116 | |
| 117 // When terminating, flush any remaining ACKs in the buffer. | |
| 118 defer send() | |
| 119 | |
| 120 // Ingest and dispatch ACKs. | |
| 121 for { | |
| 122 select { | |
| 123 case ack, ok := <-b.ackC: | |
| 124 if !ok { | |
| 125 // Closing, exit loop. | |
| 126 return | |
| 127 } | |
| 128 buf = append(buf, ack) | |
| 129 switch { | |
| 130 case len(buf) == cap(buf): | |
| 131 send() | |
| 132 case !timerRunning: | |
| 133 // Not at capacity yet, and our timer's
not running, so start counting | |
| 134 // down. | |
| 135 timer.Reset(b.cfg.MaxBufferTime) | |
| 136 timerRunning = true | |
| 137 } | |
| 138 | |
| 139 // (Testing) Notify when ACKs are received. | |
| 140 if b.ackReceivedC != nil { | |
| 141 b.ackReceivedC <- ack | |
| 142 } | |
| 143 | |
| 144 case <-timer.GetC(): | |
| 145 // (Ignores context cancellation) | |
| 146 send() | |
| 147 } | |
| 148 } | |
| 149 }() | |
| 150 | |
| 151 // Start our ACK loop. | |
| 152 go func() { | |
| 153 defer close(b.ackFinishedC) | |
| 154 | |
| 155 // Allocate and populate a set of ACK tokens. This will be used
as a | |
| 156 // semaphore to control the number of parallel ACK requests. | |
| 157 sem := make(parallel.Semaphore, b.cfg.MaxParallelACK) | |
| 158 for req := range b.ackRequestC { | |
| 159 req := req | |
| 160 | |
| 161 // Take out an ACK token. | |
| 162 sem.Lock() | |
| 163 go func() { | |
| 164 defer sem.Unlock() | |
| 165 b.acknowledge(req) | |
| 166 }() | |
| 167 } | |
| 168 | |
| 169 // Block until all ACK goroutines finish. | |
| 170 sem.TakeAll() | |
| 171 }() | |
| 172 | |
| 173 return &b | |
| 174 } | |
| 175 | |
| 176 // Ack enqueues a message's ACK ID for acknowledgement. | |
| 177 func (b *AckBuffer) Ack(id string) { | |
| 178 b.ackC <- id | |
| 179 } | |
| 180 | |
| 181 // CloseAndFlush closes the AckBuffer, blocking until all pending ACKs are | |
| 182 // complete. | |
| 183 func (b *AckBuffer) CloseAndFlush() { | |
| 184 // Close our ackC. This will terminate our meter goroutine, which will | |
| 185 // terminate our ACK goroutine. | |
| 186 close(b.ackC) | |
| 187 | |
| 188 // Wait for ACK goroutine to terminate. | |
| 189 <-b.ackFinishedC | |
| 190 } | |
| 191 | |
| 192 // acknowledge acknowledges a set of IDs. | |
| 193 // | |
| 194 // This method will discard the ACKs if they fail. | |
| 195 func (b *AckBuffer) acknowledge(ackIDs []string) { | |
| 196 log.Fields{ | |
| 197 "count": len(ackIDs), | |
| 198 }.Infof(b.ctx, "Sending ACKs.") | |
| 199 | |
| 200 if err := b.cfg.Ack.Ack(b.ctx, ackIDs...); err != nil { | |
| 201 log.Fields{ | |
| 202 log.ErrorKey: err, | |
| 203 "count": len(ackIDs), | |
| 204 }.Errorf(b.ctx, "Failed to ACK.") | |
| 205 if b.cfg.DiscardCallback != nil { | |
| 206 b.cfg.DiscardCallback(ackIDs) | |
| 207 } | |
| 208 } | |
| 209 } | |
| OLD | NEW |