OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 // Package ackbuffer implements a Pub/Sub acknowledgement buffer capability. | 5 // Package ackbuffer implements a Pub/Sub acknowledgement buffer capability. |
6 // Pub/Sub ACKs will be collected and batched before being sent to Pub/Sub, | 6 // Pub/Sub ACKs will be collected and batched before being sent to Pub/Sub, |
7 // with specific deadline enforcement. | 7 // with specific deadline enforcement. |
8 package ackbuffer | 8 package ackbuffer |
9 | 9 |
10 import ( | 10 import ( |
11 "sync" | |
12 "time" | 11 "time" |
13 | 12 |
| 13 "github.com/luci/luci-go/common/clock" |
14 "github.com/luci/luci-go/common/gcloud/pubsub" | 14 "github.com/luci/luci-go/common/gcloud/pubsub" |
15 log "github.com/luci/luci-go/common/logging" | 15 log "github.com/luci/luci-go/common/logging" |
16 "github.com/luci/luci-go/common/meter" | |
17 "github.com/luci/luci-go/common/parallel" | 16 "github.com/luci/luci-go/common/parallel" |
18 "golang.org/x/net/context" | 17 "golang.org/x/net/context" |
19 ) | 18 ) |
20 | 19 |
21 const ( | 20 const ( |
22 // DefaultMaxBufferTime is the default amount of time that an ACK will r
emain | 21 // DefaultMaxBufferTime is the default amount of time that an ACK will r
emain |
23 // buffered before being sent. | 22 // buffered before being sent. |
24 // | 23 // |
25 // We base this off the default acknowledgement delay. | 24 // We base this off the default acknowledgement delay. |
26 DefaultMaxBufferTime = (pubsub.DefaultMaxAckDelay / 6) | 25 DefaultMaxBufferTime = (pubsub.DefaultMaxAckDelay / 6) |
(...skipping 27 matching lines...) Expand all Loading... |
54 // AckBuffer buffers Pub/Sub ACK requests together and sends them in batches. | 53 // AckBuffer buffers Pub/Sub ACK requests together and sends them in batches. |
55 // If a batch of ACKs fails to send (after retries), it will be discarded with | 54 // If a batch of ACKs fails to send (after retries), it will be discarded with |
56 // an optional callback. | 55 // an optional callback. |
57 // | 56 // |
58 // After ACKs are enqueued, the AckBuffer should be flushed via CloseAndFlush | 57 // After ACKs are enqueued, the AckBuffer should be flushed via CloseAndFlush |
59 // to ensure that all ACKs have had a time to be sent. | 58 // to ensure that all ACKs have had a time to be sent. |
60 type AckBuffer struct { | 59 type AckBuffer struct { |
61 cfg *Config | 60 cfg *Config |
62 ctx context.Context | 61 ctx context.Context |
63 | 62 |
64 » meter meter.Meter | 63 » meterFinishedC chan struct{} |
65 | 64 |
| 65 ackC chan string // Used to send ACK requests. |
66 ackRequestC chan []string // Used to submit ACK requests to ACK gorouti
ne. | 66 ackRequestC chan []string // Used to submit ACK requests to ACK gorouti
ne. |
67 ackFinishedC chan struct{} // Closed when ACK goroutine is finished. | 67 ackFinishedC chan struct{} // Closed when ACK goroutine is finished. |
| 68 |
| 69 ackReceivedC chan string // (Testing) if not nil, send received ACKs. |
68 } | 70 } |
69 | 71 |
70 // New instantiates a new AckBuffer. The returned AckBuffer must have its | 72 // New instantiates a new AckBuffer. The returned AckBuffer must have its |
71 // CloseAndFlush method invoked before terminating, else data loss may occur. | 73 // CloseAndFlush method invoked before terminating, else data loss may occur. |
72 func New(ctx context.Context, c Config) *AckBuffer { | 74 func New(ctx context.Context, c Config) *AckBuffer { |
73 if c.MaxBufferTime <= 0 { | 75 if c.MaxBufferTime <= 0 { |
74 c.MaxBufferTime = DefaultMaxBufferTime | 76 c.MaxBufferTime = DefaultMaxBufferTime |
75 } | 77 } |
76 if c.MaxParallelACK <= 0 { | 78 if c.MaxParallelACK <= 0 { |
77 c.MaxParallelACK = DefaultMaxParallelACK | 79 c.MaxParallelACK = DefaultMaxParallelACK |
78 } | 80 } |
79 | 81 |
80 » b := &AckBuffer{ | 82 » batchSize := c.Ack.AckBatchSize() |
81 » » cfg: &c, | 83 » b := AckBuffer{ |
82 » » ctx: ctx, | 84 » » cfg: &c, |
83 » » ackRequestC: make(chan []string), | 85 » » ctx: ctx, |
84 » » ackFinishedC: make(chan struct{}), | 86 » » ackC: make(chan string, batchSize), |
| 87 » » meterFinishedC: make(chan struct{}), |
| 88 » » ackRequestC: make(chan []string), |
| 89 » » ackFinishedC: make(chan struct{}), |
85 } | 90 } |
86 » b.meter = meter.New(ctx, meter.Config{ | 91 |
87 » » Count: b.cfg.Ack.AckBatchSize(), | 92 » // Start a meter goroutine. This will buffer ACKs and send them at eithe
r |
88 » » Delay: b.cfg.MaxBufferTime, | 93 » // capacity or timer intervals. |
89 » » Callback: b.meterCallback, | 94 » go func() { |
90 » }) | 95 » » defer close(b.ackRequestC) |
| 96 |
| 97 » » // Create a timer. This will tick each time the buffer is empty
and get a |
| 98 » » // new ACK to track the longest time we've been buffering an ACK
. We will |
| 99 » » // reset the timer each time we clear the buffer. |
| 100 » » timerRunning := false |
| 101 » » timer := clock.NewTimer(ctx) |
| 102 » » defer timer.Stop() |
| 103 |
| 104 » » buf := make([]string, 0, batchSize) |
| 105 » » send := func() { |
| 106 » » » if len(buf) > 0 { |
| 107 » » » » ackIDs := make([]string, len(buf)) |
| 108 » » » » copy(ackIDs, buf) |
| 109 » » » » b.ackRequestC <- ackIDs |
| 110 » » » » buf = buf[:0] |
| 111 » » » } |
| 112 |
| 113 » » » timer.Stop() |
| 114 » » » timerRunning = false |
| 115 » » } |
| 116 |
| 117 » » // When terminating, flush any remaining ACKs in the buffer. |
| 118 » » defer send() |
| 119 |
| 120 » » // Ingest and dispatch ACKs. |
| 121 » » for { |
| 122 » » » select { |
| 123 » » » case ack, ok := <-b.ackC: |
| 124 » » » » if !ok { |
| 125 » » » » » // Closing, exit loop. |
| 126 » » » » » return |
| 127 » » » » } |
| 128 » » » » buf = append(buf, ack) |
| 129 » » » » switch { |
| 130 » » » » case len(buf) == cap(buf): |
| 131 » » » » » send() |
| 132 » » » » case !timerRunning: |
| 133 » » » » » // Not at capacity yet, and our timer's
not running, so start counting |
| 134 » » » » » // down. |
| 135 » » » » » timer.Reset(b.cfg.MaxBufferTime) |
| 136 » » » » » timerRunning = true |
| 137 » » » » } |
| 138 |
| 139 » » » » // (Testing) Notify when ACKs are received. |
| 140 » » » » if b.ackReceivedC != nil { |
| 141 » » » » » b.ackReceivedC <- ack |
| 142 » » » » } |
| 143 |
| 144 » » » case <-timer.GetC(): |
| 145 » » » » send() |
| 146 » » » } |
| 147 » » } |
| 148 » }() |
91 | 149 |
92 // Start our ACK loop. | 150 // Start our ACK loop. |
93 wg := sync.WaitGroup{} | |
94 go func() { | 151 go func() { |
95 defer close(b.ackFinishedC) | 152 defer close(b.ackFinishedC) |
96 | 153 |
97 // Allocate and populate a set of ACK tokens. This will be used
as a | 154 // Allocate and populate a set of ACK tokens. This will be used
as a |
98 // semaphore to control the number of parallel ACK requests. | 155 // semaphore to control the number of parallel ACK requests. |
99 sem := make(parallel.Semaphore, b.cfg.MaxParallelACK) | 156 sem := make(parallel.Semaphore, b.cfg.MaxParallelACK) |
100 for req := range b.ackRequestC { | 157 for req := range b.ackRequestC { |
101 req := req | 158 req := req |
102 | 159 |
103 // Take out an ACK token. | 160 // Take out an ACK token. |
104 sem.Lock() | 161 sem.Lock() |
105 wg.Add(1) | |
106 go func() { | 162 go func() { |
107 » » » » defer func() { | 163 » » » » defer sem.Unlock() |
108 » » » » » sem.Unlock() | |
109 » » » » » wg.Done() | |
110 » » » » }() | |
111 b.acknowledge(req) | 164 b.acknowledge(req) |
112 }() | 165 }() |
113 } | 166 } |
114 | 167 |
115 // Block until all ACK goroutines finish. | 168 // Block until all ACK goroutines finish. |
116 » » wg.Wait() | 169 » » sem.TakeAll() |
117 }() | 170 }() |
118 | 171 |
119 » return b | 172 » return &b |
120 } | 173 } |
121 | 174 |
122 // Ack enqueues a message's ACK ID for acknowledgement. | 175 // Ack enqueues a message's ACK ID for acknowledgement. |
123 func (b *AckBuffer) Ack(id string) { | 176 func (b *AckBuffer) Ack(id string) { |
124 » b.meter.AddWait(id) | 177 » b.ackC <- id |
125 } | 178 } |
126 | 179 |
127 // CloseAndFlush closes the AckBuffer, blocking until all pending ACKs are | 180 // CloseAndFlush closes the AckBuffer, blocking until all pending ACKs are |
128 // complete. | 181 // complete. |
129 func (b *AckBuffer) CloseAndFlush() { | 182 func (b *AckBuffer) CloseAndFlush() { |
130 » b.meter.Stop() | 183 » // Close our ackC. This will terminate our meter goroutine, which will |
| 184 » // terminate our ACK goroutine. |
| 185 » close(b.ackC) |
131 | 186 |
132 // Wait for ACK goroutine to terminate. | 187 // Wait for ACK goroutine to terminate. |
133 close(b.ackRequestC) | |
134 <-b.ackFinishedC | 188 <-b.ackFinishedC |
135 } | 189 } |
136 | 190 |
137 // meterCallback is the Meter callback that is invoked when a new batch of ACKs | |
138 // is encountered. | |
139 // | |
140 // This shouldn't block if possible, else the Meter will block. However, if | |
141 // ACK requests build up, this will block until they are finished. | |
142 func (b *AckBuffer) meterCallback(work []interface{}) { | |
143 ackIDs := make([]string, len(work)) | |
144 for idx, w := range work { | |
145 ackIDs[idx] = w.(string) | |
146 } | |
147 b.ackRequestC <- ackIDs | |
148 } | |
149 | |
150 // acknowledge acknowledges a set of IDs. | 191 // acknowledge acknowledges a set of IDs. |
151 // | 192 // |
152 // This method will discard the ACKs if they fail. | 193 // This method will discard the ACKs if they fail. |
153 func (b *AckBuffer) acknowledge(ackIDs []string) { | 194 func (b *AckBuffer) acknowledge(ackIDs []string) { |
154 if err := b.cfg.Ack.Ack(b.ctx, ackIDs...); err != nil { | 195 if err := b.cfg.Ack.Ack(b.ctx, ackIDs...); err != nil { |
155 log.Fields{ | 196 log.Fields{ |
156 log.ErrorKey: err, | 197 log.ErrorKey: err, |
157 "count": len(ackIDs), | 198 "count": len(ackIDs), |
158 }.Errorf(b.ctx, "Failed to ACK.") | 199 }.Errorf(b.ctx, "Failed to ACK.") |
159 if b.cfg.DiscardCallback != nil { | 200 if b.cfg.DiscardCallback != nil { |
160 b.cfg.DiscardCallback(ackIDs) | 201 b.cfg.DiscardCallback(ackIDs) |
161 } | 202 } |
162 } | 203 } |
163 } | 204 } |
OLD | NEW |