Chromium Code Reviews| Index: common/gcloud/gcps/ackbuffer/ack.go |
| diff --git a/common/gcloud/gcps/ackbuffer/ack.go b/common/gcloud/gcps/ackbuffer/ack.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..384704ed2da26e5c1c83507bab468fb3ca0d3439 |
| --- /dev/null |
| +++ b/common/gcloud/gcps/ackbuffer/ack.go |
| @@ -0,0 +1,50 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package ackbuffer |
| + |
| +import ( |
| + "github.com/luci/luci-go/common/gcloud/gcps" |
| + "golang.org/x/net/context" |
| +) |
| + |
| +// Acknowledger sends ACKs to a Pub/Sub interface. |
| +// |
| +// gcps.PubSub naturally implements this interface. |
| +type Acknowledger interface { |
|
dnj (Google)
2016/01/21 04:36:24
These changes allow better interoperability betwee
|
| + // Ack acknowledges one or more Pub/Sub message ACK IDs. |
| + Ack(ctx context.Context, ackIDs ...string) error |
| + |
| + // AckBatchSize returns the maximum number of ACKs that can be sent at a time. |
| + AckBatchSize() int |
| +} |
| + |
| +type gcpsACK struct { |
| + ps gcps.PubSub |
| + sub gcps.Subscription |
| + batch int |
| +} |
| + |
| +// NewACK creates a Acknowledger instance from a gcps.PubSub implementation. |
| +// |
| +// If batch is <= 0, the maximum ACK batch size will be used. |
| +func NewACK(ps gcps.PubSub, s gcps.Subscription, batch int) Acknowledger { |
| + if batch <= 0 { |
| + batch = gcps.MaxMessageAckPerRequest |
| + } |
| + |
| + return &gcpsACK{ |
| + ps: ps, |
| + sub: s, |
| + batch: batch, |
| + } |
| +} |
| + |
| +func (a *gcpsACK) Ack(c context.Context, ackIDs ...string) error { |
| + return a.ps.Ack(c, a.sub, ackIDs...) |
| +} |
| + |
| +func (a *gcpsACK) AckBatchSize() int { |
| + return a.batch |
| +} |