Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(76)

Side by Side Diff: common/gcloud/pubsub/ackbuffer/ackbuffer.go

Issue 1838303002: Use native Pub/Sub library primitives. (Closed) Base URL: https://github.com/luci/luci-go@logdog-go1.6
Patch Set: Use "Topic" instead of "NewTopic" ... don't want to create :) Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « common/gcloud/pubsub/ackbuffer/ack.go ('k') | common/gcloud/pubsub/ackbuffer/ackbuffer_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 // Package ackbuffer implements a Pub/Sub acknowledgement buffer capability.
6 // Pub/Sub ACKs will be collected and batched before being sent to Pub/Sub,
7 // with specific deadline enforcement.
8 package ackbuffer
9
10 import (
11 "time"
12
13 "github.com/luci/luci-go/common/clock"
14 "github.com/luci/luci-go/common/gcloud/pubsub"
15 log "github.com/luci/luci-go/common/logging"
16 "github.com/luci/luci-go/common/parallel"
17 "golang.org/x/net/context"
18 )
19
20 const (
21 // DefaultMaxBufferTime is the default amount of time that an ACK will r emain
22 // buffered before being sent.
23 //
24 // We base this off the default acknowledgement delay.
25 DefaultMaxBufferTime = (pubsub.DefaultMaxAckDelay / 6)
26
27 // DefaultMaxParallelACK is the default maximum number of simultaneous
28 // parallel ACK request goroutines.
29 DefaultMaxParallelACK = 16
30 )
31
32 // DiscardCallback is a callback method that will be invoked if ACK IDs must
33 // be discarded.
34 type DiscardCallback func(ackIDs []string)
35
36 // Config is a set of configuration parameters for an AckBuffer.
37 type Config struct {
38 // Ack is the Pub/Sub instance to ACK with.
39 Ack Acknowledger
40
41 // MaxBufferTime is the maximum amount of time to buffer an ACK before s ending it.
42 MaxBufferTime time.Duration
43
44 // The maximum number of parallel ACK requests that can be simultaneousl y
45 // open. If zero, DefaultMaxParallelACK will be used.
46 MaxParallelACK int
47
48 // DiscardCallback is invoked when a series of ACK IDs is discarded afte r
49 // repeated failures to ACK. If this is nil, no callback will be invoked .
50 DiscardCallback DiscardCallback
51 }
52
53 // AckBuffer buffers Pub/Sub ACK requests together and sends them in batches.
54 // If a batch of ACKs fails to send (after retries), it will be discarded with
55 // an optional callback.
56 //
57 // After ACKs are enqueued, the AckBuffer should be flushed via CloseAndFlush
58 // to ensure that all ACKs have had a time to be sent.
59 type AckBuffer struct {
60 cfg *Config
61 ctx context.Context
62
63 meterFinishedC chan struct{}
64
65 ackC chan string // Used to send ACK requests.
66 ackRequestC chan []string // Used to submit ACK requests to ACK gorouti ne.
67 ackFinishedC chan struct{} // Closed when ACK goroutine is finished.
68
69 ackReceivedC chan string // (Testing) if not nil, send received ACKs.
70 }
71
72 // New instantiates a new AckBuffer. The returned AckBuffer must have its
73 // CloseAndFlush method invoked before terminating, else data loss may occur.
74 func New(ctx context.Context, c Config) *AckBuffer {
75 if c.MaxBufferTime <= 0 {
76 c.MaxBufferTime = DefaultMaxBufferTime
77 }
78 if c.MaxParallelACK <= 0 {
79 c.MaxParallelACK = DefaultMaxParallelACK
80 }
81
82 batchSize := c.Ack.AckBatchSize()
83 b := AckBuffer{
84 cfg: &c,
85 ctx: ctx,
86 ackC: make(chan string, batchSize),
87 meterFinishedC: make(chan struct{}),
88 ackRequestC: make(chan []string),
89 ackFinishedC: make(chan struct{}),
90 }
91
92 // Start a meter goroutine. This will buffer ACKs and send them at eithe r
93 // capacity or timer intervals.
94 go func() {
95 defer close(b.ackRequestC)
96
97 // Create a timer. This will tick each time the buffer is empty and get a
98 // new ACK to track the longest time we've been buffering an ACK . We will
99 // reset the timer each time we clear the buffer.
100 timerRunning := false
101 timer := clock.NewTimer(ctx)
102 defer timer.Stop()
103
104 buf := make([]string, 0, batchSize)
105 send := func() {
106 if len(buf) > 0 {
107 ackIDs := make([]string, len(buf))
108 copy(ackIDs, buf)
109 b.ackRequestC <- ackIDs
110 buf = buf[:0]
111 }
112
113 timer.Stop()
114 timerRunning = false
115 }
116
117 // When terminating, flush any remaining ACKs in the buffer.
118 defer send()
119
120 // Ingest and dispatch ACKs.
121 for {
122 select {
123 case ack, ok := <-b.ackC:
124 if !ok {
125 // Closing, exit loop.
126 return
127 }
128 buf = append(buf, ack)
129 switch {
130 case len(buf) == cap(buf):
131 send()
132 case !timerRunning:
133 // Not at capacity yet, and our timer's not running, so start counting
134 // down.
135 timer.Reset(b.cfg.MaxBufferTime)
136 timerRunning = true
137 }
138
139 // (Testing) Notify when ACKs are received.
140 if b.ackReceivedC != nil {
141 b.ackReceivedC <- ack
142 }
143
144 case <-timer.GetC():
145 // (Ignores context cancellation)
146 send()
147 }
148 }
149 }()
150
151 // Start our ACK loop.
152 go func() {
153 defer close(b.ackFinishedC)
154
155 // Allocate and populate a set of ACK tokens. This will be used as a
156 // semaphore to control the number of parallel ACK requests.
157 sem := make(parallel.Semaphore, b.cfg.MaxParallelACK)
158 for req := range b.ackRequestC {
159 req := req
160
161 // Take out an ACK token.
162 sem.Lock()
163 go func() {
164 defer sem.Unlock()
165 b.acknowledge(req)
166 }()
167 }
168
169 // Block until all ACK goroutines finish.
170 sem.TakeAll()
171 }()
172
173 return &b
174 }
175
176 // Ack enqueues a message's ACK ID for acknowledgement.
177 func (b *AckBuffer) Ack(id string) {
178 b.ackC <- id
179 }
180
181 // CloseAndFlush closes the AckBuffer, blocking until all pending ACKs are
182 // complete.
183 func (b *AckBuffer) CloseAndFlush() {
184 // Close our ackC. This will terminate our meter goroutine, which will
185 // terminate our ACK goroutine.
186 close(b.ackC)
187
188 // Wait for ACK goroutine to terminate.
189 <-b.ackFinishedC
190 }
191
192 // acknowledge acknowledges a set of IDs.
193 //
194 // This method will discard the ACKs if they fail.
195 func (b *AckBuffer) acknowledge(ackIDs []string) {
196 log.Fields{
197 "count": len(ackIDs),
198 }.Infof(b.ctx, "Sending ACKs.")
199
200 if err := b.cfg.Ack.Ack(b.ctx, ackIDs...); err != nil {
201 log.Fields{
202 log.ErrorKey: err,
203 "count": len(ackIDs),
204 }.Errorf(b.ctx, "Failed to ACK.")
205 if b.cfg.DiscardCallback != nil {
206 b.cfg.DiscardCallback(ackIDs)
207 }
208 }
209 }
OLDNEW
« no previous file with comments | « common/gcloud/pubsub/ackbuffer/ack.go ('k') | common/gcloud/pubsub/ackbuffer/ackbuffer_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698