OLD | NEW |
---|---|
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 // Package ackbuffer implements a Pub/Sub acknowledgement buffer capability. | 5 // Package ackbuffer implements a Pub/Sub acknowledgement buffer capability. |
6 // Pub/Sub ACKs will be collected and batched before being sent to Pub/Sub, | 6 // Pub/Sub ACKs will be collected and batched before being sent to Pub/Sub, |
7 // with specific deadline enforcement. | 7 // with specific deadline enforcement. |
8 package ackbuffer | 8 package ackbuffer |
9 | 9 |
10 import ( | 10 import ( |
11 "sync" | |
12 "time" | 11 "time" |
13 | 12 |
13 "github.com/luci/luci-go/common/clock" | |
14 "github.com/luci/luci-go/common/gcloud/pubsub" | 14 "github.com/luci/luci-go/common/gcloud/pubsub" |
15 log "github.com/luci/luci-go/common/logging" | 15 log "github.com/luci/luci-go/common/logging" |
16 "github.com/luci/luci-go/common/meter" | |
17 "github.com/luci/luci-go/common/parallel" | 16 "github.com/luci/luci-go/common/parallel" |
18 "golang.org/x/net/context" | 17 "golang.org/x/net/context" |
19 ) | 18 ) |
20 | 19 |
21 const ( | 20 const ( |
22 // DefaultMaxBufferTime is the default amount of time that an ACK will r emain | 21 // DefaultMaxBufferTime is the default amount of time that an ACK will r emain |
23 // buffered before being sent. | 22 // buffered before being sent. |
24 // | 23 // |
25 // We base this off the default acknowledgement delay. | 24 // We base this off the default acknowledgement delay. |
26 DefaultMaxBufferTime = (pubsub.DefaultMaxAckDelay / 6) | 25 DefaultMaxBufferTime = (pubsub.DefaultMaxAckDelay / 6) |
(...skipping 27 matching lines...) Expand all Loading... | |
54 // AckBuffer buffers Pub/Sub ACK requests together and sends them in batches. | 53 // AckBuffer buffers Pub/Sub ACK requests together and sends them in batches. |
55 // If a batch of ACKs fails to send (after retries), it will be discarded with | 54 // If a batch of ACKs fails to send (after retries), it will be discarded with |
56 // an optional callback. | 55 // an optional callback. |
57 // | 56 // |
58 // After ACKs are enqueued, the AckBuffer should be flushed via CloseAndFlush | 57 // After ACKs are enqueued, the AckBuffer should be flushed via CloseAndFlush |
59 // to ensure that all ACKs have had a time to be sent. | 58 // to ensure that all ACKs have had a time to be sent. |
60 type AckBuffer struct { | 59 type AckBuffer struct { |
61 cfg *Config | 60 cfg *Config |
62 ctx context.Context | 61 ctx context.Context |
63 | 62 |
64 » meter meter.Meter | 63 » meterFinishedC chan struct{} |
65 | 64 |
65 ackC chan string // Used to send ACK requests. | |
66 ackRequestC chan []string // Used to submit ACK requests to ACK gorouti ne. | 66 ackRequestC chan []string // Used to submit ACK requests to ACK gorouti ne. |
67 ackFinishedC chan struct{} // Closed when ACK goroutine is finished. | 67 ackFinishedC chan struct{} // Closed when ACK goroutine is finished. |
68 | |
69 ackReceivedC chan string // (Testing) if not nil, send received ACKs. | |
68 } | 70 } |
69 | 71 |
70 // New instantiates a new AckBuffer. The returned AckBuffer must have its | 72 // New instantiates a new AckBuffer. The returned AckBuffer must have its |
71 // CloseAndFlush method invoked before terminating, else data loss may occur. | 73 // CloseAndFlush method invoked before terminating, else data loss may occur. |
72 func New(ctx context.Context, c Config) *AckBuffer { | 74 func New(ctx context.Context, c Config) *AckBuffer { |
73 if c.MaxBufferTime <= 0 { | 75 if c.MaxBufferTime <= 0 { |
74 c.MaxBufferTime = DefaultMaxBufferTime | 76 c.MaxBufferTime = DefaultMaxBufferTime |
75 } | 77 } |
76 if c.MaxParallelACK <= 0 { | 78 if c.MaxParallelACK <= 0 { |
77 c.MaxParallelACK = DefaultMaxParallelACK | 79 c.MaxParallelACK = DefaultMaxParallelACK |
78 } | 80 } |
79 | 81 |
80 » b := &AckBuffer{ | 82 » batchSize := c.Ack.AckBatchSize() |
81 » » cfg: &c, | 83 » b := AckBuffer{ |
82 » » ctx: ctx, | 84 » » cfg: &c, |
83 » » ackRequestC: make(chan []string), | 85 » » ctx: ctx, |
84 » » ackFinishedC: make(chan struct{}), | 86 » » ackC: make(chan string, batchSize), |
87 » » meterFinishedC: make(chan struct{}), | |
88 » » ackRequestC: make(chan []string), | |
89 » » ackFinishedC: make(chan struct{}), | |
85 } | 90 } |
86 » b.meter = meter.New(ctx, meter.Config{ | 91 |
87 » » Count: b.cfg.Ack.AckBatchSize(), | 92 » // Start a meter goroutine. This will buffer ACKs and send them at eithe r |
88 » » Delay: b.cfg.MaxBufferTime, | 93 » // capacity or timer intervals. |
89 » » Callback: b.meterCallback, | 94 » go func() { |
90 » }) | 95 » » defer close(b.ackRequestC) |
96 | |
97 » » buf := make([]string, 0, batchSize) | |
98 » » send := func() { | |
99 » » » if len(buf) > 0 { | |
100 » » » » ackIDs := make([]string, len(buf)) | |
101 » » » » copy(ackIDs, buf) | |
102 » » » » b.ackRequestC <- ackIDs | |
103 » » » » buf = buf[:0] | |
iannucci
2016/02/10 22:30:28
also reset timer
dnj (Google)
2016/02/11 01:26:55
Done.
| |
104 » » » } | |
105 » » } | |
106 | |
107 » » // When terminating, flush any remaining ACKs in the buffer. | |
108 » » defer send() | |
109 | |
110 » » // Ingest and dispatch ACKs. | |
111 » » timerRunning := false | |
112 » » timer := clock.NewTimer(ctx) | |
113 » » defer timer.Stop() | |
114 | |
115 » » for { | |
116 » » » select { | |
117 » » » case ack, ok := <-b.ackC: | |
118 » » » » if !ok { | |
119 » » » » » // Closing, exit loop. | |
120 » » » » » return | |
121 » » » » } | |
122 » » » » buf = append(buf, ack) | |
123 » » » » switch { | |
124 » » » » case len(buf) == cap(buf): | |
125 » » » » » send() | |
126 » » » » case !timerRunning: | |
127 » » » » » // Not at capacity yet, and our timer's not running, so start counting | |
128 » » » » » // down. | |
129 » » » » » timer.Reset(b.cfg.MaxBufferTime) | |
130 » » » » » timerRunning = true | |
131 » » » » } | |
132 | |
133 » » » » // (Testing) Notify when ACKs are received. | |
134 » » » » if b.ackReceivedC != nil { | |
135 » » » » » b.ackReceivedC <- ack | |
136 » » » » } | |
137 | |
138 » » » case <-timer.GetC(): | |
139 » » » » timerRunning = false | |
140 » » » » send() | |
141 » » » } | |
142 » » } | |
143 » }() | |
91 | 144 |
92 // Start our ACK loop. | 145 // Start our ACK loop. |
93 wg := sync.WaitGroup{} | |
94 go func() { | 146 go func() { |
95 defer close(b.ackFinishedC) | 147 defer close(b.ackFinishedC) |
96 | 148 |
97 // Allocate and populate a set of ACK tokens. This will be used as a | 149 // Allocate and populate a set of ACK tokens. This will be used as a |
98 // semaphore to control the number of parallel ACK requests. | 150 // semaphore to control the number of parallel ACK requests. |
99 sem := make(parallel.Semaphore, b.cfg.MaxParallelACK) | 151 sem := make(parallel.Semaphore, b.cfg.MaxParallelACK) |
100 for req := range b.ackRequestC { | 152 for req := range b.ackRequestC { |
101 req := req | 153 req := req |
102 | 154 |
103 // Take out an ACK token. | 155 // Take out an ACK token. |
104 sem.Lock() | 156 sem.Lock() |
105 wg.Add(1) | |
106 go func() { | 157 go func() { |
107 » » » » defer func() { | 158 » » » » defer sem.Unlock() |
108 » » » » » sem.Unlock() | |
109 » » » » » wg.Done() | |
110 » » » » }() | |
111 b.acknowledge(req) | 159 b.acknowledge(req) |
112 }() | 160 }() |
113 } | 161 } |
114 | 162 |
115 // Block until all ACK goroutines finish. | 163 // Block until all ACK goroutines finish. |
116 » » wg.Wait() | 164 » » sem.TakeAll() |
117 }() | 165 }() |
118 | 166 |
119 » return b | 167 » return &b |
120 } | 168 } |
121 | 169 |
122 // Ack enqueues a message's ACK ID for acknowledgement. | 170 // Ack enqueues a message's ACK ID for acknowledgement. |
123 func (b *AckBuffer) Ack(id string) { | 171 func (b *AckBuffer) Ack(id string) { |
124 » b.meter.AddWait(id) | 172 » b.ackC <- id |
125 } | 173 } |
126 | 174 |
127 // CloseAndFlush closes the AckBuffer, blocking until all pending ACKs are | 175 // CloseAndFlush closes the AckBuffer, blocking until all pending ACKs are |
128 // complete. | 176 // complete. |
129 func (b *AckBuffer) CloseAndFlush() { | 177 func (b *AckBuffer) CloseAndFlush() { |
130 » b.meter.Stop() | 178 » // Close our ackC. This will terminate our meter goroutine, which will |
179 » // terminate our ACK goroutine. | |
180 » close(b.ackC) | |
131 | 181 |
132 // Wait for ACK goroutine to terminate. | 182 // Wait for ACK goroutine to terminate. |
133 close(b.ackRequestC) | |
134 <-b.ackFinishedC | 183 <-b.ackFinishedC |
135 } | 184 } |
136 | 185 |
137 // meterCallback is the Meter callback that is invoked when a new batch of ACKs | |
138 // is encountered. | |
139 // | |
140 // This shouldn't block if possible, else the Meter will block. However, if | |
141 // ACK requests build up, this will block until they are finished. | |
142 func (b *AckBuffer) meterCallback(work []interface{}) { | |
143 ackIDs := make([]string, len(work)) | |
144 for idx, w := range work { | |
145 ackIDs[idx] = w.(string) | |
146 } | |
147 b.ackRequestC <- ackIDs | |
148 } | |
149 | |
150 // acknowledge acknowledges a set of IDs. | 186 // acknowledge acknowledges a set of IDs. |
151 // | 187 // |
152 // This method will discard the ACKs if they fail. | 188 // This method will discard the ACKs if they fail. |
153 func (b *AckBuffer) acknowledge(ackIDs []string) { | 189 func (b *AckBuffer) acknowledge(ackIDs []string) { |
154 if err := b.cfg.Ack.Ack(b.ctx, ackIDs...); err != nil { | 190 if err := b.cfg.Ack.Ack(b.ctx, ackIDs...); err != nil { |
155 log.Fields{ | 191 log.Fields{ |
156 log.ErrorKey: err, | 192 log.ErrorKey: err, |
157 "count": len(ackIDs), | 193 "count": len(ackIDs), |
158 }.Errorf(b.ctx, "Failed to ACK.") | 194 }.Errorf(b.ctx, "Failed to ACK.") |
159 if b.cfg.DiscardCallback != nil { | 195 if b.cfg.DiscardCallback != nil { |
160 b.cfg.DiscardCallback(ackIDs) | 196 b.cfg.DiscardCallback(ackIDs) |
161 } | 197 } |
162 } | 198 } |
163 } | 199 } |
OLD | NEW |