| Index: common/gcloud/pubsub/ackbuffer/ackbuffer.go
|
| diff --git a/common/gcloud/pubsub/ackbuffer/ackbuffer.go b/common/gcloud/pubsub/ackbuffer/ackbuffer.go
|
| index b0dff0f36dca254e84b33e77d31ba7f23cf6b383..d55dc6c1544ce10fc2d68e5e00e2c749cc193cd3 100644
|
| --- a/common/gcloud/pubsub/ackbuffer/ackbuffer.go
|
| +++ b/common/gcloud/pubsub/ackbuffer/ackbuffer.go
|
| @@ -8,12 +8,11 @@
|
| package ackbuffer
|
|
|
| import (
|
| - "sync"
|
| "time"
|
|
|
| + "github.com/luci/luci-go/common/clock"
|
| "github.com/luci/luci-go/common/gcloud/pubsub"
|
| log "github.com/luci/luci-go/common/logging"
|
| - "github.com/luci/luci-go/common/meter"
|
| "github.com/luci/luci-go/common/parallel"
|
| "golang.org/x/net/context"
|
| )
|
| @@ -61,10 +60,13 @@ type AckBuffer struct {
|
| cfg *Config
|
| ctx context.Context
|
|
|
| - meter meter.Meter
|
| + meterFinishedC chan struct{}
|
|
|
| + ackC chan string // Used to send ACK requests.
|
| ackRequestC chan []string // Used to submit ACK requests to ACK goroutine.
|
| ackFinishedC chan struct{} // Closed when ACK goroutine is finished.
|
| +
|
| + ackReceivedC chan string // (Testing) if not nil, send received ACKs.
|
| }
|
|
|
| // New instantiates a new AckBuffer. The returned AckBuffer must have its
|
| @@ -77,20 +79,70 @@ func New(ctx context.Context, c Config) *AckBuffer {
|
| c.MaxParallelACK = DefaultMaxParallelACK
|
| }
|
|
|
| - b := &AckBuffer{
|
| - cfg: &c,
|
| - ctx: ctx,
|
| - ackRequestC: make(chan []string),
|
| - ackFinishedC: make(chan struct{}),
|
| + batchSize := c.Ack.AckBatchSize()
|
| + b := AckBuffer{
|
| + cfg: &c,
|
| + ctx: ctx,
|
| + ackC: make(chan string, batchSize),
|
| + meterFinishedC: make(chan struct{}),
|
| + ackRequestC: make(chan []string),
|
| + ackFinishedC: make(chan struct{}),
|
| }
|
| - b.meter = meter.New(ctx, meter.Config{
|
| - Count: b.cfg.Ack.AckBatchSize(),
|
| - Delay: b.cfg.MaxBufferTime,
|
| - Callback: b.meterCallback,
|
| - })
|
| +
|
| + // Start a meter goroutine. This will buffer ACKs and send them at either
|
| + // capacity or timer intervals.
|
| + go func() {
|
| + defer close(b.ackRequestC)
|
| +
|
| + buf := make([]string, 0, batchSize)
|
| + send := func() {
|
| + if len(buf) > 0 {
|
| + ackIDs := make([]string, len(buf))
|
| + copy(ackIDs, buf)
|
| + b.ackRequestC <- ackIDs
|
| + buf = buf[:0]
|
| + }
|
| + }
|
| +
|
| + // When terminating, flush any remaining ACKs in the buffer.
|
| + defer send()
|
| +
|
| + // Ingest and dispatch ACKs.
|
| + timerRunning := false
|
| + timer := clock.NewTimer(ctx)
|
| + defer timer.Stop()
|
| +
|
| + for {
|
| + select {
|
| + case ack, ok := <-b.ackC:
|
| + if !ok {
|
| + // Closing, exit loop.
|
| + return
|
| + }
|
| + buf = append(buf, ack)
|
| + switch {
|
| + case len(buf) == cap(buf):
|
| + send()
|
| + case !timerRunning:
|
| + // Not at capacity yet, and our timer's not running, so start counting
|
| + // down.
|
| + timer.Reset(b.cfg.MaxBufferTime)
|
| + timerRunning = true
|
| + }
|
| +
|
| + // (Testing) Notify when ACKs are received.
|
| + if b.ackReceivedC != nil {
|
| + b.ackReceivedC <- ack
|
| + }
|
| +
|
| + case <-timer.GetC():
|
| + timerRunning = false
|
| + send()
|
| + }
|
| + }
|
| + }()
|
|
|
| // Start our ACK loop.
|
| - wg := sync.WaitGroup{}
|
| go func() {
|
| defer close(b.ackFinishedC)
|
|
|
| @@ -102,51 +154,35 @@ func New(ctx context.Context, c Config) *AckBuffer {
|
|
|
| // Take out an ACK token.
|
| sem.Lock()
|
| - wg.Add(1)
|
| go func() {
|
| - defer func() {
|
| - sem.Unlock()
|
| - wg.Done()
|
| - }()
|
| + defer sem.Unlock()
|
| b.acknowledge(req)
|
| }()
|
| }
|
|
|
| // Block until all ACK goroutines finish.
|
| - wg.Wait()
|
| + sem.TakeAll()
|
| }()
|
|
|
| - return b
|
| + return &b
|
| }
|
|
|
| // Ack enqueues a message's ACK ID for acknowledgement.
|
| func (b *AckBuffer) Ack(id string) {
|
| - b.meter.AddWait(id)
|
| + b.ackC <- id
|
| }
|
|
|
| // CloseAndFlush closes the AckBuffer, blocking until all pending ACKs are
|
| // complete.
|
| func (b *AckBuffer) CloseAndFlush() {
|
| - b.meter.Stop()
|
| + // Close our ackC. This will terminate our meter goroutine, which will
|
| + // terminate our ACK goroutine.
|
| + close(b.ackC)
|
|
|
| // Wait for ACK goroutine to terminate.
|
| - close(b.ackRequestC)
|
| <-b.ackFinishedC
|
| }
|
|
|
| -// meterCallback is the Meter callback that is invoked when a new batch of ACKs
|
| -// is encountered.
|
| -//
|
| -// This shouldn't block if possible, else the Meter will block. However, if
|
| -// ACK requests build up, this will block until they are finished.
|
| -func (b *AckBuffer) meterCallback(work []interface{}) {
|
| - ackIDs := make([]string, len(work))
|
| - for idx, w := range work {
|
| - ackIDs[idx] = w.(string)
|
| - }
|
| - b.ackRequestC <- ackIDs
|
| -}
|
| -
|
| // acknowledge acknowledges a set of IDs.
|
| //
|
| // This method will discard the ACKs if they fail.
|
|
|