Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(93)

Unified Diff: common/gcloud/gcps/ackbuffer/ackbuffer.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: common/gcloud/gcps/ackbuffer/ackbuffer.go
diff --git a/common/gcloud/gcps/ackbuffer/ackbuffer.go b/common/gcloud/gcps/ackbuffer/ackbuffer.go
index bbb566ff65aeb77b816ec5152f5c492501a81e62..035544240762d0583aff4c3f1b159e64f35b406f 100644
--- a/common/gcloud/gcps/ackbuffer/ackbuffer.go
+++ b/common/gcloud/gcps/ackbuffer/ackbuffer.go
@@ -15,7 +15,6 @@ import (
log "github.com/luci/luci-go/common/logging"
"github.com/luci/luci-go/common/meter"
"github.com/luci/luci-go/common/parallel"
- "github.com/luci/luci-go/common/retry"
"golang.org/x/net/context"
)
@@ -35,21 +34,10 @@ const (
// be discarded.
type DiscardCallback func(ackIDs []string)
-// PubSubACK sends ACKs to a Pub/Sub interface.
-//
-// gcps.PubSub naturally implements this interface.
-type PubSubACK interface {
- // Ack acknowledges one or more Pub/Sub message ACK IDs.
- Ack(s gcps.Subscription, ackIDs ...string) error
-}
-
// Config is a set of configuration parameters for an AckBuffer.
type Config struct {
- // PubSub is the Pub/Sub instance to ACK with.
- PubSub PubSubACK
-
- // Subscription is the name of the Pub/Sub subscription to ACK.
- Subscription gcps.Subscription
+ // Ack is the Pub/Sub instance to ACK with.
+ Ack Acknowledger
// MaxBufferTime is the maximum amount of time to buffer an ACK before sending it.
MaxBufferTime time.Duration
@@ -89,8 +77,6 @@ func New(ctx context.Context, c Config) *AckBuffer {
c.MaxParallelACK = DefaultMaxParallelACK
}
- ctx = log.SetField(ctx, "subscription", c.Subscription)
-
b := &AckBuffer{
cfg: &c,
ctx: ctx,
@@ -98,7 +84,7 @@ func New(ctx context.Context, c Config) *AckBuffer {
ackFinishedC: make(chan struct{}),
}
b.meter = meter.New(ctx, meter.Config{
- Count: gcps.MaxMessageAckPerRequest,
+ Count: b.cfg.Ack.AckBatchSize(),
Delay: b.cfg.MaxBufferTime,
Callback: b.meterCallback,
})
@@ -161,19 +147,11 @@ func (b *AckBuffer) meterCallback(work []interface{}) {
b.ackRequestC <- ackIDs
}
-// acknowledge acknowledges a set of IDs. It will retry on transient errors.
+// acknowledge acknowledges a set of IDs.
//
// This method will discard the ACKs if they fail.
func (b *AckBuffer) acknowledge(ackIDs []string) {
- err := retry.Retry(b.ctx, retry.TransientOnly(retry.Default()), func() error {
- return b.cfg.PubSub.Ack(b.cfg.Subscription, ackIDs...)
- }, func(err error, delay time.Duration) {
- log.Fields{
- log.ErrorKey: err,
- "delay": delay,
- }.Warningf(b.ctx, "Error sending ACK; retrying.")
- })
- if err != nil {
+ if err := b.cfg.Ack.Ack(b.ctx, ackIDs...); err != nil {
dnj (Google) 2016/01/21 04:36:24 User must supply a retry-enabled Pub/Sub instance
log.Fields{
log.ErrorKey: err,
"count": len(ackIDs),

Powered by Google App Engine
This is Rietveld 408576698