Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(253)

Side by Side Diff: common/gcloud/pubsub/ackbuffer/ackbuffer.go

Issue 1679023005: Add Context cancellation to clock. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Actually upload the patch. Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « common/clock/timer.go ('k') | common/gcloud/pubsub/ackbuffer/ackbuffer_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 // Package ackbuffer implements a Pub/Sub acknowledgement buffer capability. 5 // Package ackbuffer implements a Pub/Sub acknowledgement buffer capability.
6 // Pub/Sub ACKs will be collected and batched before being sent to Pub/Sub, 6 // Pub/Sub ACKs will be collected and batched before being sent to Pub/Sub,
7 // with specific deadline enforcement. 7 // with specific deadline enforcement.
8 package ackbuffer 8 package ackbuffer
9 9
10 import ( 10 import (
11 "sync"
12 "time" 11 "time"
13 12
13 "github.com/luci/luci-go/common/clock"
14 "github.com/luci/luci-go/common/gcloud/pubsub" 14 "github.com/luci/luci-go/common/gcloud/pubsub"
15 log "github.com/luci/luci-go/common/logging" 15 log "github.com/luci/luci-go/common/logging"
16 "github.com/luci/luci-go/common/meter"
17 "github.com/luci/luci-go/common/parallel" 16 "github.com/luci/luci-go/common/parallel"
18 "golang.org/x/net/context" 17 "golang.org/x/net/context"
19 ) 18 )
20 19
21 const ( 20 const (
22 // DefaultMaxBufferTime is the default amount of time that an ACK will r emain 21 // DefaultMaxBufferTime is the default amount of time that an ACK will r emain
23 // buffered before being sent. 22 // buffered before being sent.
24 // 23 //
25 // We base this off the default acknowledgement delay. 24 // We base this off the default acknowledgement delay.
26 DefaultMaxBufferTime = (pubsub.DefaultMaxAckDelay / 6) 25 DefaultMaxBufferTime = (pubsub.DefaultMaxAckDelay / 6)
(...skipping 27 matching lines...) Expand all
54 // AckBuffer buffers Pub/Sub ACK requests together and sends them in batches. 53 // AckBuffer buffers Pub/Sub ACK requests together and sends them in batches.
55 // If a batch of ACKs fails to send (after retries), it will be discarded with 54 // If a batch of ACKs fails to send (after retries), it will be discarded with
56 // an optional callback. 55 // an optional callback.
57 // 56 //
58 // After ACKs are enqueued, the AckBuffer should be flushed via CloseAndFlush 57 // After ACKs are enqueued, the AckBuffer should be flushed via CloseAndFlush
59 // to ensure that all ACKs have had a time to be sent. 58 // to ensure that all ACKs have had a time to be sent.
60 type AckBuffer struct { 59 type AckBuffer struct {
61 cfg *Config 60 cfg *Config
62 ctx context.Context 61 ctx context.Context
63 62
64 » meter meter.Meter 63 » meterFinishedC chan struct{}
65 64
65 ackC chan string // Used to send ACK requests.
66 ackRequestC chan []string // Used to submit ACK requests to ACK gorouti ne. 66 ackRequestC chan []string // Used to submit ACK requests to ACK gorouti ne.
67 ackFinishedC chan struct{} // Closed when ACK goroutine is finished. 67 ackFinishedC chan struct{} // Closed when ACK goroutine is finished.
68
69 ackReceivedC chan string // (Testing) if not nil, send received ACKs.
68 } 70 }
69 71
70 // New instantiates a new AckBuffer. The returned AckBuffer must have its 72 // New instantiates a new AckBuffer. The returned AckBuffer must have its
71 // CloseAndFlush method invoked before terminating, else data loss may occur. 73 // CloseAndFlush method invoked before terminating, else data loss may occur.
72 func New(ctx context.Context, c Config) *AckBuffer { 74 func New(ctx context.Context, c Config) *AckBuffer {
73 if c.MaxBufferTime <= 0 { 75 if c.MaxBufferTime <= 0 {
74 c.MaxBufferTime = DefaultMaxBufferTime 76 c.MaxBufferTime = DefaultMaxBufferTime
75 } 77 }
76 if c.MaxParallelACK <= 0 { 78 if c.MaxParallelACK <= 0 {
77 c.MaxParallelACK = DefaultMaxParallelACK 79 c.MaxParallelACK = DefaultMaxParallelACK
78 } 80 }
79 81
80 » b := &AckBuffer{ 82 » batchSize := c.Ack.AckBatchSize()
81 » » cfg: &c, 83 » b := AckBuffer{
82 » » ctx: ctx, 84 » » cfg: &c,
83 » » ackRequestC: make(chan []string), 85 » » ctx: ctx,
84 » » ackFinishedC: make(chan struct{}), 86 » » ackC: make(chan string, batchSize),
87 » » meterFinishedC: make(chan struct{}),
88 » » ackRequestC: make(chan []string),
89 » » ackFinishedC: make(chan struct{}),
85 } 90 }
86 » b.meter = meter.New(ctx, meter.Config{ 91
87 » » Count: b.cfg.Ack.AckBatchSize(), 92 » // Start a meter goroutine. This will buffer ACKs and send them at eithe r
88 » » Delay: b.cfg.MaxBufferTime, 93 » // capacity or timer intervals.
89 » » Callback: b.meterCallback, 94 » go func() {
90 » }) 95 » » defer close(b.ackRequestC)
96
97 » » // Create a timer. This will tick each time the buffer is empty and get a
98 » » // new ACK to track the longest time we've been buffering an ACK . We will
99 » » // reset the timer each time we clear the buffer.
100 » » timerRunning := false
101 » » timer := clock.NewTimer(ctx)
102 » » defer timer.Stop()
103
104 » » buf := make([]string, 0, batchSize)
105 » » send := func() {
106 » » » if len(buf) > 0 {
107 » » » » ackIDs := make([]string, len(buf))
108 » » » » copy(ackIDs, buf)
109 » » » » b.ackRequestC <- ackIDs
110 » » » » buf = buf[:0]
111 » » » }
112
113 » » » timer.Stop()
114 » » » timerRunning = false
115 » » }
116
117 » » // When terminating, flush any remaining ACKs in the buffer.
118 » » defer send()
119
120 » » // Ingest and dispatch ACKs.
121 » » for {
122 » » » select {
123 » » » case ack, ok := <-b.ackC:
124 » » » » if !ok {
125 » » » » » // Closing, exit loop.
126 » » » » » return
127 » » » » }
128 » » » » buf = append(buf, ack)
129 » » » » switch {
130 » » » » case len(buf) == cap(buf):
131 » » » » » send()
132 » » » » case !timerRunning:
133 » » » » » // Not at capacity yet, and our timer's not running, so start counting
134 » » » » » // down.
135 » » » » » timer.Reset(b.cfg.MaxBufferTime)
136 » » » » » timerRunning = true
137 » » » » }
138
139 » » » » // (Testing) Notify when ACKs are received.
140 » » » » if b.ackReceivedC != nil {
141 » » » » » b.ackReceivedC <- ack
142 » » » » }
143
144 » » » case <-timer.GetC():
145 » » » » send()
146 » » » }
147 » » }
148 » }()
91 149
92 // Start our ACK loop. 150 // Start our ACK loop.
93 wg := sync.WaitGroup{}
94 go func() { 151 go func() {
95 defer close(b.ackFinishedC) 152 defer close(b.ackFinishedC)
96 153
97 // Allocate and populate a set of ACK tokens. This will be used as a 154 // Allocate and populate a set of ACK tokens. This will be used as a
98 // semaphore to control the number of parallel ACK requests. 155 // semaphore to control the number of parallel ACK requests.
99 sem := make(parallel.Semaphore, b.cfg.MaxParallelACK) 156 sem := make(parallel.Semaphore, b.cfg.MaxParallelACK)
100 for req := range b.ackRequestC { 157 for req := range b.ackRequestC {
101 req := req 158 req := req
102 159
103 // Take out an ACK token. 160 // Take out an ACK token.
104 sem.Lock() 161 sem.Lock()
105 wg.Add(1)
106 go func() { 162 go func() {
107 » » » » defer func() { 163 » » » » defer sem.Unlock()
108 » » » » » sem.Unlock()
109 » » » » » wg.Done()
110 » » » » }()
111 b.acknowledge(req) 164 b.acknowledge(req)
112 }() 165 }()
113 } 166 }
114 167
115 // Block until all ACK goroutines finish. 168 // Block until all ACK goroutines finish.
116 » » wg.Wait() 169 » » sem.TakeAll()
117 }() 170 }()
118 171
119 » return b 172 » return &b
120 } 173 }
121 174
122 // Ack enqueues a message's ACK ID for acknowledgement. 175 // Ack enqueues a message's ACK ID for acknowledgement.
123 func (b *AckBuffer) Ack(id string) { 176 func (b *AckBuffer) Ack(id string) {
124 » b.meter.AddWait(id) 177 » b.ackC <- id
125 } 178 }
126 179
127 // CloseAndFlush closes the AckBuffer, blocking until all pending ACKs are 180 // CloseAndFlush closes the AckBuffer, blocking until all pending ACKs are
128 // complete. 181 // complete.
129 func (b *AckBuffer) CloseAndFlush() { 182 func (b *AckBuffer) CloseAndFlush() {
130 » b.meter.Stop() 183 » // Close our ackC. This will terminate our meter goroutine, which will
184 » // terminate our ACK goroutine.
185 » close(b.ackC)
131 186
132 // Wait for ACK goroutine to terminate. 187 // Wait for ACK goroutine to terminate.
133 close(b.ackRequestC)
134 <-b.ackFinishedC 188 <-b.ackFinishedC
135 } 189 }
136 190
137 // meterCallback is the Meter callback that is invoked when a new batch of ACKs
138 // is encountered.
139 //
140 // This shouldn't block if possible, else the Meter will block. However, if
141 // ACK requests build up, this will block until they are finished.
142 func (b *AckBuffer) meterCallback(work []interface{}) {
143 ackIDs := make([]string, len(work))
144 for idx, w := range work {
145 ackIDs[idx] = w.(string)
146 }
147 b.ackRequestC <- ackIDs
148 }
149
150 // acknowledge acknowledges a set of IDs. 191 // acknowledge acknowledges a set of IDs.
151 // 192 //
152 // This method will discard the ACKs if they fail. 193 // This method will discard the ACKs if they fail.
153 func (b *AckBuffer) acknowledge(ackIDs []string) { 194 func (b *AckBuffer) acknowledge(ackIDs []string) {
154 if err := b.cfg.Ack.Ack(b.ctx, ackIDs...); err != nil { 195 if err := b.cfg.Ack.Ack(b.ctx, ackIDs...); err != nil {
155 log.Fields{ 196 log.Fields{
156 log.ErrorKey: err, 197 log.ErrorKey: err,
157 "count": len(ackIDs), 198 "count": len(ackIDs),
158 }.Errorf(b.ctx, "Failed to ACK.") 199 }.Errorf(b.ctx, "Failed to ACK.")
159 if b.cfg.DiscardCallback != nil { 200 if b.cfg.DiscardCallback != nil {
160 b.cfg.DiscardCallback(ackIDs) 201 b.cfg.DiscardCallback(ackIDs)
161 } 202 }
162 } 203 }
163 } 204 }
OLDNEW
« no previous file with comments | « common/clock/timer.go ('k') | common/gcloud/pubsub/ackbuffer/ackbuffer_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698