| Index: common/gcloud/pubsub/subscriber/source.go
|
| diff --git a/common/gcloud/pubsub/subscriber/source.go b/common/gcloud/pubsub/subscriber/source.go
|
| index 6bb4020c7d69275310a007f31bfde632171a5347..e2212fd6a1ab6d232c62613c84bead1e3a09f824 100644
|
| --- a/common/gcloud/pubsub/subscriber/source.go
|
| +++ b/common/gcloud/pubsub/subscriber/source.go
|
| @@ -11,33 +11,26 @@ import (
|
|
|
| // Source is used to pull Pub/Sub messages in batches.
|
| type Source interface {
|
| - // Pull retrieves a new batch of Pub/Sub messages to process.
|
| - Pull(context.Context) ([]*pubsub.Message, error)
|
| + // Pull retrieves up to the specified number of of Pub/Sub messages to
|
| + // process.
|
| + Pull(context.Context, int) ([]*pubsub.Message, error)
|
| }
|
|
|
| // PubSubSource is a Source implementation built on top of a pubsub.PubSub.
|
| type pubSubSource struct {
|
| - ps pubsub.Connection
|
| - sub pubsub.Subscription
|
| - batch int
|
| + ps pubsub.Connection
|
| + sub pubsub.Subscription
|
| }
|
|
|
| // NewSource generates a new Source by wrapping a pubsub.Connection
|
| // implementation. This Source is bound to a single subscription.
|
| -//
|
| -// If the supplied batch size is <= 0, the maximum allowed Pub/Sub batch size
|
| -// will be used.
|
| -func NewSource(ps pubsub.Connection, s pubsub.Subscription, batch int) Source {
|
| - if batch <= 0 {
|
| - batch = pubsub.MaxSubscriptionPullSize
|
| - }
|
| +func NewSource(ps pubsub.Connection, s pubsub.Subscription) Source {
|
| return &pubSubSource{
|
| - ps: ps,
|
| - sub: s,
|
| - batch: batch,
|
| + ps: ps,
|
| + sub: s,
|
| }
|
| }
|
|
|
| -func (s *pubSubSource) Pull(c context.Context) (msgs []*pubsub.Message, err error) {
|
| - return s.ps.Pull(c, s.sub, s.batch)
|
| +func (s *pubSubSource) Pull(c context.Context, batchSize int) (msgs []*pubsub.Message, err error) {
|
| + return s.ps.Pull(c, s.sub, batchSize)
|
| }
|
|
|