| Index: common/tsmon/monitor/pubsub.go
|
| diff --git a/common/tsmon/monitor/pubsub.go b/common/tsmon/monitor/pubsub.go
|
| index 7fcd8cae832ffeae9fefe801c5efc7fcfd76249c..7299a48ca8a8d2105ca8c43276721ec3d34d1752 100644
|
| --- a/common/tsmon/monitor/pubsub.go
|
| +++ b/common/tsmon/monitor/pubsub.go
|
| @@ -8,17 +8,15 @@ import (
|
| "net/http"
|
|
|
| "github.com/golang/protobuf/proto"
|
| - "github.com/luci/luci-go/common/gcloud/pubsub"
|
| + gcps "github.com/luci/luci-go/common/gcloud/pubsub"
|
| "github.com/luci/luci-go/common/tsmon/types"
|
| "golang.org/x/net/context"
|
| + "google.golang.org/cloud"
|
| + "google.golang.org/cloud/pubsub"
|
| )
|
|
|
| -// ClientFactory is a function that creates an HTTP client.
|
| -type ClientFactory func(ctx context.Context) (*http.Client, error)
|
| -
|
| type pubSubMonitor struct {
|
| - clientFactory ClientFactory
|
| - topic pubsub.Topic
|
| + topic *pubsub.TopicHandle
|
| }
|
|
|
| // NewPubsubMonitor returns a Monitor that sends metrics to the Cloud Pub/Sub
|
| @@ -26,15 +24,21 @@ type pubSubMonitor struct {
|
| //
|
| // The provided client should do implement sufficient authentication to send
|
| // Cloud Pub/Sub requests.
|
| -func NewPubsubMonitor(c ClientFactory, project string, topic string) (Monitor, error) {
|
| +func NewPubsubMonitor(ctx context.Context, client *http.Client, topic gcps.Topic) (Monitor, error) {
|
| + project, name := topic.Split()
|
| +
|
| + psClient, err := pubsub.NewClient(ctx, project, cloud.WithBaseHTTP(client))
|
| + if err != nil {
|
| + return nil, err
|
| + }
|
| +
|
| return &pubSubMonitor{
|
| - clientFactory: c,
|
| - topic: pubsub.NewTopic(project, topic),
|
| + topic: psClient.Topic(name),
|
| }, nil
|
| }
|
|
|
| func (m *pubSubMonitor) ChunkSize() int {
|
| - return 1000
|
| + return gcps.MaxPublishBatchSize
|
| }
|
|
|
| func (m *pubSubMonitor) Send(ctx context.Context, cells []types.Cell) error {
|
| @@ -45,12 +49,6 @@ func (m *pubSubMonitor) Send(ctx context.Context, cells []types.Cell) error {
|
| return err
|
| }
|
|
|
| - client, err := m.clientFactory(ctx)
|
| - if err != nil {
|
| - return err
|
| - }
|
| -
|
| - ps := pubsub.NewConnection(client)
|
| - _, err = ps.Publish(ctx, m.topic, &pubsub.Message{Data: data})
|
| + _, err = m.topic.Publish(ctx, &pubsub.Message{Data: data})
|
| return err
|
| }
|
|
|