Index: common/gcloud/pubsub/ackbuffer/ackbuffer.go |
diff --git a/common/gcloud/pubsub/ackbuffer/ackbuffer.go b/common/gcloud/pubsub/ackbuffer/ackbuffer.go |
index b0dff0f36dca254e84b33e77d31ba7f23cf6b383..e828407daa3e833012e3731c21050e7d4f0f5f5d 100644 |
--- a/common/gcloud/pubsub/ackbuffer/ackbuffer.go |
+++ b/common/gcloud/pubsub/ackbuffer/ackbuffer.go |
@@ -8,12 +8,11 @@ |
package ackbuffer |
import ( |
- "sync" |
"time" |
+ "github.com/luci/luci-go/common/clock" |
"github.com/luci/luci-go/common/gcloud/pubsub" |
log "github.com/luci/luci-go/common/logging" |
- "github.com/luci/luci-go/common/meter" |
"github.com/luci/luci-go/common/parallel" |
"golang.org/x/net/context" |
) |
@@ -61,10 +60,13 @@ type AckBuffer struct { |
cfg *Config |
ctx context.Context |
- meter meter.Meter |
+ meterFinishedC chan struct{} |
+ ackC chan string // Used to send ACK requests. |
ackRequestC chan []string // Used to submit ACK requests to ACK goroutine. |
ackFinishedC chan struct{} // Closed when ACK goroutine is finished. |
+ |
+ ackReceivedC chan string // (Testing) if not nil, send received ACKs. |
} |
// New instantiates a new AckBuffer. The returned AckBuffer must have its |
@@ -77,20 +79,75 @@ func New(ctx context.Context, c Config) *AckBuffer { |
c.MaxParallelACK = DefaultMaxParallelACK |
} |
- b := &AckBuffer{ |
- cfg: &c, |
- ctx: ctx, |
- ackRequestC: make(chan []string), |
- ackFinishedC: make(chan struct{}), |
+ batchSize := c.Ack.AckBatchSize() |
+ b := AckBuffer{ |
+ cfg: &c, |
+ ctx: ctx, |
+ ackC: make(chan string, batchSize), |
+ meterFinishedC: make(chan struct{}), |
+ ackRequestC: make(chan []string), |
+ ackFinishedC: make(chan struct{}), |
} |
- b.meter = meter.New(ctx, meter.Config{ |
- Count: b.cfg.Ack.AckBatchSize(), |
- Delay: b.cfg.MaxBufferTime, |
- Callback: b.meterCallback, |
- }) |
+ |
+ // Start a meter goroutine. This will buffer ACKs and send them at either |
+ // capacity or timer intervals. |
+ go func() { |
+ defer close(b.ackRequestC) |
+ |
+ // Create a timer. This will tick each time the buffer is empty and get a |
+ // new ACK to track the longest time we've been buffering an ACK. We will |
+ // reset the timer each time we clear the buffer. |
+ timerRunning := false |
+ timer := clock.NewTimer(ctx) |
+ defer timer.Stop() |
+ |
+ buf := make([]string, 0, batchSize) |
+ send := func() { |
+ if len(buf) > 0 { |
+ ackIDs := make([]string, len(buf)) |
+ copy(ackIDs, buf) |
+ b.ackRequestC <- ackIDs |
+ buf = buf[:0] |
+ } |
+ |
+ timer.Stop() |
+ timerRunning = false |
+ } |
+ |
+ // When terminating, flush any remaining ACKs in the buffer. |
+ defer send() |
+ |
+ // Ingest and dispatch ACKs. |
+ for { |
+ select { |
+ case ack, ok := <-b.ackC: |
+ if !ok { |
+ // Closing, exit loop. |
+ return |
+ } |
+ buf = append(buf, ack) |
+ switch { |
+ case len(buf) == cap(buf): |
+ send() |
+ case !timerRunning: |
+ // Not at capacity yet, and our timer's not running, so start counting |
+ // down. |
+ timer.Reset(b.cfg.MaxBufferTime) |
+ timerRunning = true |
+ } |
+ |
+ // (Testing) Notify when ACKs are received. |
+ if b.ackReceivedC != nil { |
+ b.ackReceivedC <- ack |
+ } |
+ |
+ case <-timer.GetC(): |
+ send() |
+ } |
+ } |
+ }() |
// Start our ACK loop. |
- wg := sync.WaitGroup{} |
go func() { |
defer close(b.ackFinishedC) |
@@ -102,51 +159,35 @@ func New(ctx context.Context, c Config) *AckBuffer { |
// Take out an ACK token. |
sem.Lock() |
- wg.Add(1) |
go func() { |
- defer func() { |
- sem.Unlock() |
- wg.Done() |
- }() |
+ defer sem.Unlock() |
b.acknowledge(req) |
}() |
} |
// Block until all ACK goroutines finish. |
- wg.Wait() |
+ sem.TakeAll() |
}() |
- return b |
+ return &b |
} |
// Ack enqueues a message's ACK ID for acknowledgement. |
func (b *AckBuffer) Ack(id string) { |
- b.meter.AddWait(id) |
+ b.ackC <- id |
} |
// CloseAndFlush closes the AckBuffer, blocking until all pending ACKs are |
// complete. |
func (b *AckBuffer) CloseAndFlush() { |
- b.meter.Stop() |
+ // Close our ackC. This will terminate our meter goroutine, which will |
+ // terminate our ACK goroutine. |
+ close(b.ackC) |
// Wait for ACK goroutine to terminate. |
- close(b.ackRequestC) |
<-b.ackFinishedC |
} |
-// meterCallback is the Meter callback that is invoked when a new batch of ACKs |
-// is encountered. |
-// |
-// This shouldn't block if possible, else the Meter will block. However, if |
-// ACK requests build up, this will block until they are finished. |
-func (b *AckBuffer) meterCallback(work []interface{}) { |
- ackIDs := make([]string, len(work)) |
- for idx, w := range work { |
- ackIDs[idx] = w.(string) |
- } |
- b.ackRequestC <- ackIDs |
-} |
- |
// acknowledge acknowledges a set of IDs. |
// |
// This method will discard the ACKs if they fail. |