Chromium Code Reviews| Index: common/gcloud/pubsub/ackbuffer/ackbuffer.go |
| diff --git a/common/gcloud/pubsub/ackbuffer/ackbuffer.go b/common/gcloud/pubsub/ackbuffer/ackbuffer.go |
| index b0dff0f36dca254e84b33e77d31ba7f23cf6b383..d55dc6c1544ce10fc2d68e5e00e2c749cc193cd3 100644 |
| --- a/common/gcloud/pubsub/ackbuffer/ackbuffer.go |
| +++ b/common/gcloud/pubsub/ackbuffer/ackbuffer.go |
| @@ -8,12 +8,11 @@ |
| package ackbuffer |
| import ( |
| - "sync" |
| "time" |
| + "github.com/luci/luci-go/common/clock" |
| "github.com/luci/luci-go/common/gcloud/pubsub" |
| log "github.com/luci/luci-go/common/logging" |
| - "github.com/luci/luci-go/common/meter" |
| "github.com/luci/luci-go/common/parallel" |
| "golang.org/x/net/context" |
| ) |
| @@ -61,10 +60,13 @@ type AckBuffer struct { |
| cfg *Config |
| ctx context.Context |
| - meter meter.Meter |
| + meterFinishedC chan struct{} |
| + ackC chan string // Used to send ACK requests. |
| ackRequestC chan []string // Used to submit ACK requests to ACK goroutine. |
| ackFinishedC chan struct{} // Closed when ACK goroutine is finished. |
| + |
| + ackReceivedC chan string // (Testing) if not nil, send received ACKs. |
| } |
| // New instantiates a new AckBuffer. The returned AckBuffer must have its |
| @@ -77,20 +79,70 @@ func New(ctx context.Context, c Config) *AckBuffer { |
| c.MaxParallelACK = DefaultMaxParallelACK |
| } |
| - b := &AckBuffer{ |
| - cfg: &c, |
| - ctx: ctx, |
| - ackRequestC: make(chan []string), |
| - ackFinishedC: make(chan struct{}), |
| + batchSize := c.Ack.AckBatchSize() |
| + b := AckBuffer{ |
| + cfg: &c, |
| + ctx: ctx, |
| + ackC: make(chan string, batchSize), |
| + meterFinishedC: make(chan struct{}), |
| + ackRequestC: make(chan []string), |
| + ackFinishedC: make(chan struct{}), |
| } |
| - b.meter = meter.New(ctx, meter.Config{ |
| - Count: b.cfg.Ack.AckBatchSize(), |
| - Delay: b.cfg.MaxBufferTime, |
| - Callback: b.meterCallback, |
| - }) |
| + |
| + // Start a meter goroutine. This will buffer ACKs and send them at either |
| + // capacity or timer intervals. |
| + go func() { |
|
dnj (Google)
2016/02/10 03:19:04
Sorry to heap this in, but it's somewhat bound to
|
| + defer close(b.ackRequestC) |
| + |
| + buf := make([]string, 0, batchSize) |
| + send := func() { |
| + if len(buf) > 0 { |
| + ackIDs := make([]string, len(buf)) |
| + copy(ackIDs, buf) |
| + b.ackRequestC <- ackIDs |
| + buf = buf[:0] |
| + } |
| + } |
| + |
| + // When terminating, flush any remaining ACKs in the buffer. |
| + defer send() |
| + |
| + // Ingest and dispatch ACKs. |
| + timerRunning := false |
| + timer := clock.NewTimer(ctx) |
| + defer timer.Stop() |
| + |
| + for { |
| + select { |
| + case ack, ok := <-b.ackC: |
| + if !ok { |
| + // Closing, exit loop. |
| + return |
| + } |
| + buf = append(buf, ack) |
| + switch { |
| + case len(buf) == cap(buf): |
| + send() |
| + case !timerRunning: |
| + // Not at capacity yet, and our timer's not running, so start counting |
| + // down. |
| + timer.Reset(b.cfg.MaxBufferTime) |
| + timerRunning = true |
| + } |
| + |
| + // (Testing) Notify when ACKs are received. |
| + if b.ackReceivedC != nil { |
| + b.ackReceivedC <- ack |
| + } |
| + |
| + case <-timer.GetC(): |
| + timerRunning = false |
| + send() |
| + } |
| + } |
| + }() |
| // Start our ACK loop. |
| - wg := sync.WaitGroup{} |
| go func() { |
| defer close(b.ackFinishedC) |
| @@ -102,51 +154,35 @@ func New(ctx context.Context, c Config) *AckBuffer { |
| // Take out an ACK token. |
| sem.Lock() |
| - wg.Add(1) |
| go func() { |
| - defer func() { |
| - sem.Unlock() |
| - wg.Done() |
| - }() |
| + defer sem.Unlock() |
| b.acknowledge(req) |
| }() |
| } |
| // Block until all ACK goroutines finish. |
| - wg.Wait() |
| + sem.TakeAll() |
| }() |
| - return b |
| + return &b |
| } |
| // Ack enqueues a message's ACK ID for acknowledgement. |
| func (b *AckBuffer) Ack(id string) { |
| - b.meter.AddWait(id) |
| + b.ackC <- id |
| } |
| // CloseAndFlush closes the AckBuffer, blocking until all pending ACKs are |
| // complete. |
| func (b *AckBuffer) CloseAndFlush() { |
| - b.meter.Stop() |
| + // Close our ackC. This will terminate our meter goroutine, which will |
| + // terminate our ACK goroutine. |
| + close(b.ackC) |
| // Wait for ACK goroutine to terminate. |
| - close(b.ackRequestC) |
| <-b.ackFinishedC |
| } |
| -// meterCallback is the Meter callback that is invoked when a new batch of ACKs |
| -// is encountered. |
| -// |
| -// This shouldn't block if possible, else the Meter will block. However, if |
| -// ACK requests build up, this will block until they are finished. |
| -func (b *AckBuffer) meterCallback(work []interface{}) { |
| - ackIDs := make([]string, len(work)) |
| - for idx, w := range work { |
| - ackIDs[idx] = w.(string) |
| - } |
| - b.ackRequestC <- ackIDs |
| -} |
| - |
| // acknowledge acknowledges a set of IDs. |
| // |
| // This method will discard the ACKs if they fail. |