Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(72)

Side by Side Diff: common/gcloud/gcps/ackbuffer/ackbuffer.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 // Package ackbuffer implements a Pub/Sub acknowledgement buffer capability. 5 // Package ackbuffer implements a Pub/Sub acknowledgement buffer capability.
6 // Pub/Sub ACKs will be collected and batched before being sent to Pub/Sub, 6 // Pub/Sub ACKs will be collected and batched before being sent to Pub/Sub,
7 // with specific deadline enforcement. 7 // with specific deadline enforcement.
8 package ackbuffer 8 package ackbuffer
9 9
10 import ( 10 import (
11 "sync" 11 "sync"
12 "time" 12 "time"
13 13
14 "github.com/luci/luci-go/common/gcloud/gcps" 14 "github.com/luci/luci-go/common/gcloud/gcps"
15 log "github.com/luci/luci-go/common/logging" 15 log "github.com/luci/luci-go/common/logging"
16 "github.com/luci/luci-go/common/meter" 16 "github.com/luci/luci-go/common/meter"
17 "github.com/luci/luci-go/common/parallel" 17 "github.com/luci/luci-go/common/parallel"
18 "github.com/luci/luci-go/common/retry"
19 "golang.org/x/net/context" 18 "golang.org/x/net/context"
20 ) 19 )
21 20
22 const ( 21 const (
23 // DefaultMaxBufferTime is the default amount of time that an ACK will r emain 22 // DefaultMaxBufferTime is the default amount of time that an ACK will r emain
24 // buffered before being sent. 23 // buffered before being sent.
25 // 24 //
26 // We base this off the default acknowledgement delay. 25 // We base this off the default acknowledgement delay.
27 DefaultMaxBufferTime = (gcps.DefaultMaxAckDelay / 6) 26 DefaultMaxBufferTime = (gcps.DefaultMaxAckDelay / 6)
28 27
29 // DefaultMaxParallelACK is the default maximum number of simultaneous 28 // DefaultMaxParallelACK is the default maximum number of simultaneous
30 // parallel ACK request goroutines. 29 // parallel ACK request goroutines.
31 DefaultMaxParallelACK = 16 30 DefaultMaxParallelACK = 16
32 ) 31 )
33 32
34 // DiscardCallback is a callback method that will be invoked if ACK IDs must 33 // DiscardCallback is a callback method that will be invoked if ACK IDs must
35 // be discarded. 34 // be discarded.
36 type DiscardCallback func(ackIDs []string) 35 type DiscardCallback func(ackIDs []string)
37 36
38 // PubSubACK sends ACKs to a Pub/Sub interface.
39 //
40 // gcps.PubSub naturally implements this interface.
41 type PubSubACK interface {
42 // Ack acknowledges one or more Pub/Sub message ACK IDs.
43 Ack(s gcps.Subscription, ackIDs ...string) error
44 }
45
46 // Config is a set of configuration parameters for an AckBuffer. 37 // Config is a set of configuration parameters for an AckBuffer.
47 type Config struct { 38 type Config struct {
48 » // PubSub is the Pub/Sub instance to ACK with. 39 » // Ack is the Pub/Sub instance to ACK with.
49 » PubSub PubSubACK 40 » Ack Acknowledger
50
51 » // Subscription is the name of the Pub/Sub subscription to ACK.
52 » Subscription gcps.Subscription
53 41
54 // MaxBufferTime is the maximum amount of time to buffer an ACK before s ending it. 42 // MaxBufferTime is the maximum amount of time to buffer an ACK before s ending it.
55 MaxBufferTime time.Duration 43 MaxBufferTime time.Duration
56 44
57 // The maximum number of parallel ACK requests that can be simultaneousl y 45 // The maximum number of parallel ACK requests that can be simultaneousl y
58 // open. If zero, DefaultMaxParallelACK will be used. 46 // open. If zero, DefaultMaxParallelACK will be used.
59 MaxParallelACK int 47 MaxParallelACK int
60 48
61 // DiscardCallback is invoked when a series of ACK IDs is discarded afte r 49 // DiscardCallback is invoked when a series of ACK IDs is discarded afte r
62 // repeated failures to ACK. If this is nil, no callback will be invoked . 50 // repeated failures to ACK. If this is nil, no callback will be invoked .
(...skipping 19 matching lines...) Expand all
82 // New instantiates a new AckBuffer. The returned AckBuffer must have its 70 // New instantiates a new AckBuffer. The returned AckBuffer must have its
83 // CloseAndFlush method invoked before terminating, else data loss may occur. 71 // CloseAndFlush method invoked before terminating, else data loss may occur.
84 func New(ctx context.Context, c Config) *AckBuffer { 72 func New(ctx context.Context, c Config) *AckBuffer {
85 if c.MaxBufferTime <= 0 { 73 if c.MaxBufferTime <= 0 {
86 c.MaxBufferTime = DefaultMaxBufferTime 74 c.MaxBufferTime = DefaultMaxBufferTime
87 } 75 }
88 if c.MaxParallelACK <= 0 { 76 if c.MaxParallelACK <= 0 {
89 c.MaxParallelACK = DefaultMaxParallelACK 77 c.MaxParallelACK = DefaultMaxParallelACK
90 } 78 }
91 79
92 ctx = log.SetField(ctx, "subscription", c.Subscription)
93
94 b := &AckBuffer{ 80 b := &AckBuffer{
95 cfg: &c, 81 cfg: &c,
96 ctx: ctx, 82 ctx: ctx,
97 ackRequestC: make(chan []string), 83 ackRequestC: make(chan []string),
98 ackFinishedC: make(chan struct{}), 84 ackFinishedC: make(chan struct{}),
99 } 85 }
100 b.meter = meter.New(ctx, meter.Config{ 86 b.meter = meter.New(ctx, meter.Config{
101 » » Count: gcps.MaxMessageAckPerRequest, 87 » » Count: b.cfg.Ack.AckBatchSize(),
102 Delay: b.cfg.MaxBufferTime, 88 Delay: b.cfg.MaxBufferTime,
103 Callback: b.meterCallback, 89 Callback: b.meterCallback,
104 }) 90 })
105 91
106 // Start our ACK loop. 92 // Start our ACK loop.
107 wg := sync.WaitGroup{} 93 wg := sync.WaitGroup{}
108 go func() { 94 go func() {
109 defer close(b.ackFinishedC) 95 defer close(b.ackFinishedC)
110 96
111 // Allocate and populate a set of ACK tokens. This will be used as a 97 // Allocate and populate a set of ACK tokens. This will be used as a
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after
154 // This shouldn't block if possible, else the Meter will block. However, if 140 // This shouldn't block if possible, else the Meter will block. However, if
155 // ACK requests build up, this will block until they are finished. 141 // ACK requests build up, this will block until they are finished.
156 func (b *AckBuffer) meterCallback(work []interface{}) { 142 func (b *AckBuffer) meterCallback(work []interface{}) {
157 ackIDs := make([]string, len(work)) 143 ackIDs := make([]string, len(work))
158 for idx, w := range work { 144 for idx, w := range work {
159 ackIDs[idx] = w.(string) 145 ackIDs[idx] = w.(string)
160 } 146 }
161 b.ackRequestC <- ackIDs 147 b.ackRequestC <- ackIDs
162 } 148 }
163 149
164 // acknowledge acknowledges a set of IDs. It will retry on transient errors. 150 // acknowledge acknowledges a set of IDs.
165 // 151 //
166 // This method will discard the ACKs if they fail. 152 // This method will discard the ACKs if they fail.
167 func (b *AckBuffer) acknowledge(ackIDs []string) { 153 func (b *AckBuffer) acknowledge(ackIDs []string) {
168 » err := retry.Retry(b.ctx, retry.TransientOnly(retry.Default()), func() e rror { 154 » if err := b.cfg.Ack.Ack(b.ctx, ackIDs...); err != nil {
dnj (Google) 2016/01/21 04:36:24 User must supply a retry-enabled Pub/Sub instance
169 » » return b.cfg.PubSub.Ack(b.cfg.Subscription, ackIDs...)
170 » }, func(err error, delay time.Duration) {
171 » » log.Fields{
172 » » » log.ErrorKey: err,
173 » » » "delay": delay,
174 » » }.Warningf(b.ctx, "Error sending ACK; retrying.")
175 » })
176 » if err != nil {
177 log.Fields{ 155 log.Fields{
178 log.ErrorKey: err, 156 log.ErrorKey: err,
179 "count": len(ackIDs), 157 "count": len(ackIDs),
180 }.Errorf(b.ctx, "Failed to ACK.") 158 }.Errorf(b.ctx, "Failed to ACK.")
181 if b.cfg.DiscardCallback != nil { 159 if b.cfg.DiscardCallback != nil {
182 b.cfg.DiscardCallback(ackIDs) 160 b.cfg.DiscardCallback(ackIDs)
183 } 161 }
184 } 162 }
185 } 163 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698