Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(578)

Side by Side Diff: common/gcloud/pubsub/subscriber/source.go

Issue 1838803002: LogDog: BigTable batching schema. (Closed) Base URL: https://github.com/luci/luci-go@recordio-split
Patch Set: Minor comments and quality of code tweaks. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package subscriber 5 package subscriber
6 6
7 import ( 7 import (
8 "github.com/luci/luci-go/common/gcloud/pubsub" 8 "github.com/luci/luci-go/common/gcloud/pubsub"
9 "golang.org/x/net/context" 9 "golang.org/x/net/context"
10 ) 10 )
11 11
12 // Source is used to pull Pub/Sub messages in batches. 12 // Source is used to pull Pub/Sub messages in batches.
13 type Source interface { 13 type Source interface {
14 » // Pull retrieves a new batch of Pub/Sub messages to process. 14 » // Pull retrieves up to the specified number of of Pub/Sub messages to
15 » Pull(context.Context) ([]*pubsub.Message, error) 15 » // process.
16 » Pull(context.Context, int) ([]*pubsub.Message, error)
16 } 17 }
17 18
18 // PubSubSource is a Source implementation built on top of a pubsub.PubSub. 19 // PubSubSource is a Source implementation built on top of a pubsub.PubSub.
19 type pubSubSource struct { 20 type pubSubSource struct {
20 » ps pubsub.Connection 21 » ps pubsub.Connection
21 » sub pubsub.Subscription 22 » sub pubsub.Subscription
22 » batch int
23 } 23 }
24 24
25 // NewSource generates a new Source by wrapping a pubsub.Connection 25 // NewSource generates a new Source by wrapping a pubsub.Connection
26 // implementation. This Source is bound to a single subscription. 26 // implementation. This Source is bound to a single subscription.
27 // 27 func NewSource(ps pubsub.Connection, s pubsub.Subscription) Source {
28 // If the supplied batch size is <= 0, the maximum allowed Pub/Sub batch size
29 // will be used.
30 func NewSource(ps pubsub.Connection, s pubsub.Subscription, batch int) Source {
31 » if batch <= 0 {
32 » » batch = pubsub.MaxSubscriptionPullSize
33 » }
34 return &pubSubSource{ 28 return &pubSubSource{
35 » » ps: ps, 29 » » ps: ps,
36 » » sub: s, 30 » » sub: s,
37 » » batch: batch,
38 } 31 }
39 } 32 }
40 33
41 func (s *pubSubSource) Pull(c context.Context) (msgs []*pubsub.Message, err erro r) { 34 func (s *pubSubSource) Pull(c context.Context, batchSize int) (msgs []*pubsub.Me ssage, err error) {
42 » return s.ps.Pull(c, s.sub, s.batch) 35 » return s.ps.Pull(c, s.sub, batchSize)
43 } 36 }
OLDNEW
« no previous file with comments | « appengine/logdog/coordinator/endpoints/logs/get_test.go ('k') | common/gcloud/pubsub/subscriber/subscriber.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698