Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(772)

Unified Diff: common/gcloud/pubsub/ackbuffer/ackbuffer.go

Issue 1679023005: Add Context cancellation to clock. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Actually upload the patch. Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « common/clock/timer.go ('k') | common/gcloud/pubsub/ackbuffer/ackbuffer_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: common/gcloud/pubsub/ackbuffer/ackbuffer.go
diff --git a/common/gcloud/pubsub/ackbuffer/ackbuffer.go b/common/gcloud/pubsub/ackbuffer/ackbuffer.go
index b0dff0f36dca254e84b33e77d31ba7f23cf6b383..e828407daa3e833012e3731c21050e7d4f0f5f5d 100644
--- a/common/gcloud/pubsub/ackbuffer/ackbuffer.go
+++ b/common/gcloud/pubsub/ackbuffer/ackbuffer.go
@@ -8,12 +8,11 @@
package ackbuffer
import (
- "sync"
"time"
+ "github.com/luci/luci-go/common/clock"
"github.com/luci/luci-go/common/gcloud/pubsub"
log "github.com/luci/luci-go/common/logging"
- "github.com/luci/luci-go/common/meter"
"github.com/luci/luci-go/common/parallel"
"golang.org/x/net/context"
)
@@ -61,10 +60,13 @@ type AckBuffer struct {
cfg *Config
ctx context.Context
- meter meter.Meter
+ meterFinishedC chan struct{}
+ ackC chan string // Used to send ACK requests.
ackRequestC chan []string // Used to submit ACK requests to ACK goroutine.
ackFinishedC chan struct{} // Closed when ACK goroutine is finished.
+
+ ackReceivedC chan string // (Testing) if not nil, send received ACKs.
}
// New instantiates a new AckBuffer. The returned AckBuffer must have its
@@ -77,20 +79,75 @@ func New(ctx context.Context, c Config) *AckBuffer {
c.MaxParallelACK = DefaultMaxParallelACK
}
- b := &AckBuffer{
- cfg: &c,
- ctx: ctx,
- ackRequestC: make(chan []string),
- ackFinishedC: make(chan struct{}),
+ batchSize := c.Ack.AckBatchSize()
+ b := AckBuffer{
+ cfg: &c,
+ ctx: ctx,
+ ackC: make(chan string, batchSize),
+ meterFinishedC: make(chan struct{}),
+ ackRequestC: make(chan []string),
+ ackFinishedC: make(chan struct{}),
}
- b.meter = meter.New(ctx, meter.Config{
- Count: b.cfg.Ack.AckBatchSize(),
- Delay: b.cfg.MaxBufferTime,
- Callback: b.meterCallback,
- })
+
+ // Start a meter goroutine. This will buffer ACKs and send them at either
+ // capacity or timer intervals.
+ go func() {
+ defer close(b.ackRequestC)
+
+ // Create a timer. This will tick each time the buffer is empty and get a
+ // new ACK to track the longest time we've been buffering an ACK. We will
+ // reset the timer each time we clear the buffer.
+ timerRunning := false
+ timer := clock.NewTimer(ctx)
+ defer timer.Stop()
+
+ buf := make([]string, 0, batchSize)
+ send := func() {
+ if len(buf) > 0 {
+ ackIDs := make([]string, len(buf))
+ copy(ackIDs, buf)
+ b.ackRequestC <- ackIDs
+ buf = buf[:0]
+ }
+
+ timer.Stop()
+ timerRunning = false
+ }
+
+ // When terminating, flush any remaining ACKs in the buffer.
+ defer send()
+
+ // Ingest and dispatch ACKs.
+ for {
+ select {
+ case ack, ok := <-b.ackC:
+ if !ok {
+ // Closing, exit loop.
+ return
+ }
+ buf = append(buf, ack)
+ switch {
+ case len(buf) == cap(buf):
+ send()
+ case !timerRunning:
+ // Not at capacity yet, and our timer's not running, so start counting
+ // down.
+ timer.Reset(b.cfg.MaxBufferTime)
+ timerRunning = true
+ }
+
+ // (Testing) Notify when ACKs are received.
+ if b.ackReceivedC != nil {
+ b.ackReceivedC <- ack
+ }
+
+ case <-timer.GetC():
+ send()
+ }
+ }
+ }()
// Start our ACK loop.
- wg := sync.WaitGroup{}
go func() {
defer close(b.ackFinishedC)
@@ -102,51 +159,35 @@ func New(ctx context.Context, c Config) *AckBuffer {
// Take out an ACK token.
sem.Lock()
- wg.Add(1)
go func() {
- defer func() {
- sem.Unlock()
- wg.Done()
- }()
+ defer sem.Unlock()
b.acknowledge(req)
}()
}
// Block until all ACK goroutines finish.
- wg.Wait()
+ sem.TakeAll()
}()
- return b
+ return &b
}
// Ack enqueues a message's ACK ID for acknowledgement.
func (b *AckBuffer) Ack(id string) {
- b.meter.AddWait(id)
+ b.ackC <- id
}
// CloseAndFlush closes the AckBuffer, blocking until all pending ACKs are
// complete.
func (b *AckBuffer) CloseAndFlush() {
- b.meter.Stop()
+ // Close our ackC. This will terminate our meter goroutine, which will
+ // terminate our ACK goroutine.
+ close(b.ackC)
// Wait for ACK goroutine to terminate.
- close(b.ackRequestC)
<-b.ackFinishedC
}
-// meterCallback is the Meter callback that is invoked when a new batch of ACKs
-// is encountered.
-//
-// This shouldn't block if possible, else the Meter will block. However, if
-// ACK requests build up, this will block until they are finished.
-func (b *AckBuffer) meterCallback(work []interface{}) {
- ackIDs := make([]string, len(work))
- for idx, w := range work {
- ackIDs[idx] = w.(string)
- }
- b.ackRequestC <- ackIDs
-}
-
// acknowledge acknowledges a set of IDs.
//
// This method will discard the ACKs if they fail.
« no previous file with comments | « common/clock/timer.go ('k') | common/gcloud/pubsub/ackbuffer/ackbuffer_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698