Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 // Package ackbuffer implements a Pub/Sub acknowledgement buffer capability. | 5 // Package ackbuffer implements a Pub/Sub acknowledgement buffer capability. |
| 6 // Pub/Sub ACKs will be collected and batched before being sent to Pub/Sub, | 6 // Pub/Sub ACKs will be collected and batched before being sent to Pub/Sub, |
| 7 // with specific deadline enforcement. | 7 // with specific deadline enforcement. |
| 8 package ackbuffer | 8 package ackbuffer |
| 9 | 9 |
| 10 import ( | 10 import ( |
| 11 "sync" | |
| 12 "time" | 11 "time" |
| 13 | 12 |
| 13 "github.com/luci/luci-go/common/clock" | |
| 14 "github.com/luci/luci-go/common/gcloud/pubsub" | 14 "github.com/luci/luci-go/common/gcloud/pubsub" |
| 15 log "github.com/luci/luci-go/common/logging" | 15 log "github.com/luci/luci-go/common/logging" |
| 16 "github.com/luci/luci-go/common/meter" | |
| 17 "github.com/luci/luci-go/common/parallel" | 16 "github.com/luci/luci-go/common/parallel" |
| 18 "golang.org/x/net/context" | 17 "golang.org/x/net/context" |
| 19 ) | 18 ) |
| 20 | 19 |
| 21 const ( | 20 const ( |
| 22 // DefaultMaxBufferTime is the default amount of time that an ACK will r emain | 21 // DefaultMaxBufferTime is the default amount of time that an ACK will r emain |
| 23 // buffered before being sent. | 22 // buffered before being sent. |
| 24 // | 23 // |
| 25 // We base this off the default acknowledgement delay. | 24 // We base this off the default acknowledgement delay. |
| 26 DefaultMaxBufferTime = (pubsub.DefaultMaxAckDelay / 6) | 25 DefaultMaxBufferTime = (pubsub.DefaultMaxAckDelay / 6) |
| (...skipping 27 matching lines...) Expand all Loading... | |
| 54 // AckBuffer buffers Pub/Sub ACK requests together and sends them in batches. | 53 // AckBuffer buffers Pub/Sub ACK requests together and sends them in batches. |
| 55 // If a batch of ACKs fails to send (after retries), it will be discarded with | 54 // If a batch of ACKs fails to send (after retries), it will be discarded with |
| 56 // an optional callback. | 55 // an optional callback. |
| 57 // | 56 // |
| 58 // After ACKs are enqueued, the AckBuffer should be flushed via CloseAndFlush | 57 // After ACKs are enqueued, the AckBuffer should be flushed via CloseAndFlush |
| 59 // to ensure that all ACKs have had a time to be sent. | 58 // to ensure that all ACKs have had a time to be sent. |
| 60 type AckBuffer struct { | 59 type AckBuffer struct { |
| 61 cfg *Config | 60 cfg *Config |
| 62 ctx context.Context | 61 ctx context.Context |
| 63 | 62 |
| 64 » meter meter.Meter | 63 » meterFinishedC chan struct{} |
| 65 | 64 |
| 65 ackC chan string // Used to send ACK requests. | |
| 66 ackRequestC chan []string // Used to submit ACK requests to ACK gorouti ne. | 66 ackRequestC chan []string // Used to submit ACK requests to ACK gorouti ne. |
| 67 ackFinishedC chan struct{} // Closed when ACK goroutine is finished. | 67 ackFinishedC chan struct{} // Closed when ACK goroutine is finished. |
| 68 | |
| 69 ackReceivedC chan string // (Testing) if not nil, send received ACKs. | |
| 68 } | 70 } |
| 69 | 71 |
| 70 // New instantiates a new AckBuffer. The returned AckBuffer must have its | 72 // New instantiates a new AckBuffer. The returned AckBuffer must have its |
| 71 // CloseAndFlush method invoked before terminating, else data loss may occur. | 73 // CloseAndFlush method invoked before terminating, else data loss may occur. |
| 72 func New(ctx context.Context, c Config) *AckBuffer { | 74 func New(ctx context.Context, c Config) *AckBuffer { |
| 73 if c.MaxBufferTime <= 0 { | 75 if c.MaxBufferTime <= 0 { |
| 74 c.MaxBufferTime = DefaultMaxBufferTime | 76 c.MaxBufferTime = DefaultMaxBufferTime |
| 75 } | 77 } |
| 76 if c.MaxParallelACK <= 0 { | 78 if c.MaxParallelACK <= 0 { |
| 77 c.MaxParallelACK = DefaultMaxParallelACK | 79 c.MaxParallelACK = DefaultMaxParallelACK |
| 78 } | 80 } |
| 79 | 81 |
| 80 » b := &AckBuffer{ | 82 » batchSize := c.Ack.AckBatchSize() |
| 81 » » cfg: &c, | 83 » b := AckBuffer{ |
| 82 » » ctx: ctx, | 84 » » cfg: &c, |
| 83 » » ackRequestC: make(chan []string), | 85 » » ctx: ctx, |
| 84 » » ackFinishedC: make(chan struct{}), | 86 » » ackC: make(chan string, batchSize), |
| 87 » » meterFinishedC: make(chan struct{}), | |
| 88 » » ackRequestC: make(chan []string), | |
| 89 » » ackFinishedC: make(chan struct{}), | |
| 85 } | 90 } |
| 86 » b.meter = meter.New(ctx, meter.Config{ | 91 |
| 87 » » Count: b.cfg.Ack.AckBatchSize(), | 92 » // Start a meter goroutine. This will buffer ACKs and send them at eithe r |
| 88 » » Delay: b.cfg.MaxBufferTime, | 93 » // capacity or timer intervals. |
| 89 » » Callback: b.meterCallback, | 94 » go func() { |
|
dnj (Google)
2016/02/10 03:19:04
Sorry to heap this in, but it's somewhat bound to
| |
| 90 » }) | 95 » » defer close(b.ackRequestC) |
| 96 | |
| 97 » » buf := make([]string, 0, batchSize) | |
| 98 » » send := func() { | |
| 99 » » » if len(buf) > 0 { | |
| 100 » » » » ackIDs := make([]string, len(buf)) | |
| 101 » » » » copy(ackIDs, buf) | |
| 102 » » » » b.ackRequestC <- ackIDs | |
| 103 » » » » buf = buf[:0] | |
| 104 » » » } | |
| 105 » » } | |
| 106 | |
| 107 » » // When terminating, flush any remaining ACKs in the buffer. | |
| 108 » » defer send() | |
| 109 | |
| 110 » » // Ingest and dispatch ACKs. | |
| 111 » » timerRunning := false | |
| 112 » » timer := clock.NewTimer(ctx) | |
| 113 » » defer timer.Stop() | |
| 114 | |
| 115 » » for { | |
| 116 » » » select { | |
| 117 » » » case ack, ok := <-b.ackC: | |
| 118 » » » » if !ok { | |
| 119 » » » » » // Closing, exit loop. | |
| 120 » » » » » return | |
| 121 » » » » } | |
| 122 » » » » buf = append(buf, ack) | |
| 123 » » » » switch { | |
| 124 » » » » case len(buf) == cap(buf): | |
| 125 » » » » » send() | |
| 126 » » » » case !timerRunning: | |
| 127 » » » » » // Not at capacity yet, and our timer's not running, so start counting | |
| 128 » » » » » // down. | |
| 129 » » » » » timer.Reset(b.cfg.MaxBufferTime) | |
| 130 » » » » » timerRunning = true | |
| 131 » » » » } | |
| 132 | |
| 133 » » » » // (Testing) Notify when ACKs are received. | |
| 134 » » » » if b.ackReceivedC != nil { | |
| 135 » » » » » b.ackReceivedC <- ack | |
| 136 » » » » } | |
| 137 | |
| 138 » » » case <-timer.GetC(): | |
| 139 » » » » timerRunning = false | |
| 140 » » » » send() | |
| 141 » » » } | |
| 142 » » } | |
| 143 » }() | |
| 91 | 144 |
| 92 // Start our ACK loop. | 145 // Start our ACK loop. |
| 93 wg := sync.WaitGroup{} | |
| 94 go func() { | 146 go func() { |
| 95 defer close(b.ackFinishedC) | 147 defer close(b.ackFinishedC) |
| 96 | 148 |
| 97 // Allocate and populate a set of ACK tokens. This will be used as a | 149 // Allocate and populate a set of ACK tokens. This will be used as a |
| 98 // semaphore to control the number of parallel ACK requests. | 150 // semaphore to control the number of parallel ACK requests. |
| 99 sem := make(parallel.Semaphore, b.cfg.MaxParallelACK) | 151 sem := make(parallel.Semaphore, b.cfg.MaxParallelACK) |
| 100 for req := range b.ackRequestC { | 152 for req := range b.ackRequestC { |
| 101 req := req | 153 req := req |
| 102 | 154 |
| 103 // Take out an ACK token. | 155 // Take out an ACK token. |
| 104 sem.Lock() | 156 sem.Lock() |
| 105 wg.Add(1) | |
| 106 go func() { | 157 go func() { |
| 107 » » » » defer func() { | 158 » » » » defer sem.Unlock() |
| 108 » » » » » sem.Unlock() | |
| 109 » » » » » wg.Done() | |
| 110 » » » » }() | |
| 111 b.acknowledge(req) | 159 b.acknowledge(req) |
| 112 }() | 160 }() |
| 113 } | 161 } |
| 114 | 162 |
| 115 // Block until all ACK goroutines finish. | 163 // Block until all ACK goroutines finish. |
| 116 » » wg.Wait() | 164 » » sem.TakeAll() |
| 117 }() | 165 }() |
| 118 | 166 |
| 119 » return b | 167 » return &b |
| 120 } | 168 } |
| 121 | 169 |
| 122 // Ack enqueues a message's ACK ID for acknowledgement. | 170 // Ack enqueues a message's ACK ID for acknowledgement. |
| 123 func (b *AckBuffer) Ack(id string) { | 171 func (b *AckBuffer) Ack(id string) { |
| 124 » b.meter.AddWait(id) | 172 » b.ackC <- id |
| 125 } | 173 } |
| 126 | 174 |
| 127 // CloseAndFlush closes the AckBuffer, blocking until all pending ACKs are | 175 // CloseAndFlush closes the AckBuffer, blocking until all pending ACKs are |
| 128 // complete. | 176 // complete. |
| 129 func (b *AckBuffer) CloseAndFlush() { | 177 func (b *AckBuffer) CloseAndFlush() { |
| 130 » b.meter.Stop() | 178 » // Close our ackC. This will terminate our meter goroutine, which will |
| 179 » // terminate our ACK goroutine. | |
| 180 » close(b.ackC) | |
| 131 | 181 |
| 132 // Wait for ACK goroutine to terminate. | 182 // Wait for ACK goroutine to terminate. |
| 133 close(b.ackRequestC) | |
| 134 <-b.ackFinishedC | 183 <-b.ackFinishedC |
| 135 } | 184 } |
| 136 | 185 |
| 137 // meterCallback is the Meter callback that is invoked when a new batch of ACKs | |
| 138 // is encountered. | |
| 139 // | |
| 140 // This shouldn't block if possible, else the Meter will block. However, if | |
| 141 // ACK requests build up, this will block until they are finished. | |
| 142 func (b *AckBuffer) meterCallback(work []interface{}) { | |
| 143 ackIDs := make([]string, len(work)) | |
| 144 for idx, w := range work { | |
| 145 ackIDs[idx] = w.(string) | |
| 146 } | |
| 147 b.ackRequestC <- ackIDs | |
| 148 } | |
| 149 | |
| 150 // acknowledge acknowledges a set of IDs. | 186 // acknowledge acknowledges a set of IDs. |
| 151 // | 187 // |
| 152 // This method will discard the ACKs if they fail. | 188 // This method will discard the ACKs if they fail. |
| 153 func (b *AckBuffer) acknowledge(ackIDs []string) { | 189 func (b *AckBuffer) acknowledge(ackIDs []string) { |
| 154 if err := b.cfg.Ack.Ack(b.ctx, ackIDs...); err != nil { | 190 if err := b.cfg.Ack.Ack(b.ctx, ackIDs...); err != nil { |
| 155 log.Fields{ | 191 log.Fields{ |
| 156 log.ErrorKey: err, | 192 log.ErrorKey: err, |
| 157 "count": len(ackIDs), | 193 "count": len(ackIDs), |
| 158 }.Errorf(b.ctx, "Failed to ACK.") | 194 }.Errorf(b.ctx, "Failed to ACK.") |
| 159 if b.cfg.DiscardCallback != nil { | 195 if b.cfg.DiscardCallback != nil { |
| 160 b.cfg.DiscardCallback(ackIDs) | 196 b.cfg.DiscardCallback(ackIDs) |
| 161 } | 197 } |
| 162 } | 198 } |
| 163 } | 199 } |
| OLD | NEW |