Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1315)

Side by Side Diff: common/tsmon/monitor/pubsub.go

Issue 1838303002: Use native Pub/Sub library primitives. (Closed) Base URL: https://github.com/luci/luci-go@logdog-go1.6
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package monitor 5 package monitor
6 6
7 import ( 7 import (
8 "net/http" 8 "net/http"
9 9
10 "github.com/golang/protobuf/proto" 10 "github.com/golang/protobuf/proto"
11 » "github.com/luci/luci-go/common/gcloud/pubsub" 11 » gcps "github.com/luci/luci-go/common/gcloud/pubsub"
12 "github.com/luci/luci-go/common/tsmon/types" 12 "github.com/luci/luci-go/common/tsmon/types"
13 "golang.org/x/net/context" 13 "golang.org/x/net/context"
14 "google.golang.org/cloud"
15 "google.golang.org/cloud/pubsub"
14 ) 16 )
15 17
16 // ClientFactory is a function that creates an HTTP client.
17 type ClientFactory func(ctx context.Context) (*http.Client, error)
18
19 type pubSubMonitor struct { 18 type pubSubMonitor struct {
20 » clientFactory ClientFactory 19 » topic *pubsub.TopicHandle
21 » topic pubsub.Topic
22 } 20 }
23 21
24 // NewPubsubMonitor returns a Monitor that sends metrics to the Cloud Pub/Sub 22 // NewPubsubMonitor returns a Monitor that sends metrics to the Cloud Pub/Sub
25 // API. 23 // API.
26 // 24 //
27 // The provided client should do implement sufficient authentication to send 25 // The provided client should do implement sufficient authentication to send
28 // Cloud Pub/Sub requests. 26 // Cloud Pub/Sub requests.
29 func NewPubsubMonitor(c ClientFactory, project string, topic string) (Monitor, e rror) { 27 func NewPubsubMonitor(ctx context.Context, client *http.Client, topic gcps.Topic ) (Monitor, error) {
28 » project, name := topic.Split()
29
30 » psClient, err := pubsub.NewClient(ctx, project, cloud.WithBaseHTTP(clien t))
31 » if err != nil {
32 » » return nil, err
33 » }
34
35 » psTopic, err := psClient.NewTopic(ctx, name)
36 » if err != nil {
37 » » return nil, err
38 » }
39
30 return &pubSubMonitor{ 40 return &pubSubMonitor{
31 » » clientFactory: c, 41 » » topic: psTopic,
32 » » topic: pubsub.NewTopic(project, topic),
33 }, nil 42 }, nil
34 } 43 }
35 44
36 func (m *pubSubMonitor) ChunkSize() int { 45 func (m *pubSubMonitor) ChunkSize() int {
37 » return 1000 46 » return gcps.MaxPublishBatchSize
38 } 47 }
39 48
40 func (m *pubSubMonitor) Send(ctx context.Context, cells []types.Cell) error { 49 func (m *pubSubMonitor) Send(ctx context.Context, cells []types.Cell) error {
41 collection := SerializeCells(cells) 50 collection := SerializeCells(cells)
42 51
43 data, err := proto.Marshal(collection) 52 data, err := proto.Marshal(collection)
44 if err != nil { 53 if err != nil {
45 return err 54 return err
46 } 55 }
47 56
48 » client, err := m.clientFactory(ctx) 57 » _, err = m.topic.Publish(ctx, &pubsub.Message{Data: data})
49 » if err != nil {
50 » » return err
51 » }
52
53 » ps := pubsub.NewConnection(client)
54 » _, err = ps.Publish(ctx, m.topic, &pubsub.Message{Data: data})
55 return err 58 return err
56 } 59 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698