Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(74)

Side by Side Diff: common/tsmon/monitor/pubsub.go

Issue 1838303002: Use native Pub/Sub library primitives. (Closed) Base URL: https://github.com/luci/luci-go@logdog-go1.6
Patch Set: Use "Topic" instead of "NewTopic" ... don't want to create :) Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « common/tsmon/iface.go ('k') | server/cmd/logdog_collector/main.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package monitor 5 package monitor
6 6
7 import ( 7 import (
8 "net/http" 8 "net/http"
9 9
10 "github.com/golang/protobuf/proto" 10 "github.com/golang/protobuf/proto"
11 » "github.com/luci/luci-go/common/gcloud/pubsub" 11 » gcps "github.com/luci/luci-go/common/gcloud/pubsub"
12 "github.com/luci/luci-go/common/tsmon/types" 12 "github.com/luci/luci-go/common/tsmon/types"
13 "golang.org/x/net/context" 13 "golang.org/x/net/context"
14 "google.golang.org/cloud"
15 "google.golang.org/cloud/pubsub"
14 ) 16 )
15 17
16 // ClientFactory is a function that creates an HTTP client.
17 type ClientFactory func(ctx context.Context) (*http.Client, error)
18
19 type pubSubMonitor struct { 18 type pubSubMonitor struct {
20 » clientFactory ClientFactory 19 » topic *pubsub.TopicHandle
21 » topic pubsub.Topic
22 } 20 }
23 21
24 // NewPubsubMonitor returns a Monitor that sends metrics to the Cloud Pub/Sub 22 // NewPubsubMonitor returns a Monitor that sends metrics to the Cloud Pub/Sub
25 // API. 23 // API.
26 // 24 //
27 // The provided client should do implement sufficient authentication to send 25 // The provided client should do implement sufficient authentication to send
28 // Cloud Pub/Sub requests. 26 // Cloud Pub/Sub requests.
29 func NewPubsubMonitor(c ClientFactory, project string, topic string) (Monitor, e rror) { 27 func NewPubsubMonitor(ctx context.Context, client *http.Client, topic gcps.Topic ) (Monitor, error) {
28 » project, name := topic.Split()
29
30 » psClient, err := pubsub.NewClient(ctx, project, cloud.WithBaseHTTP(clien t))
31 » if err != nil {
32 » » return nil, err
33 » }
34
30 return &pubSubMonitor{ 35 return &pubSubMonitor{
31 » » clientFactory: c, 36 » » topic: psClient.Topic(name),
32 » » topic: pubsub.NewTopic(project, topic),
33 }, nil 37 }, nil
34 } 38 }
35 39
36 func (m *pubSubMonitor) ChunkSize() int { 40 func (m *pubSubMonitor) ChunkSize() int {
37 » return 1000 41 » return gcps.MaxPublishBatchSize
38 } 42 }
39 43
40 func (m *pubSubMonitor) Send(ctx context.Context, cells []types.Cell) error { 44 func (m *pubSubMonitor) Send(ctx context.Context, cells []types.Cell) error {
41 collection := SerializeCells(cells) 45 collection := SerializeCells(cells)
42 46
43 data, err := proto.Marshal(collection) 47 data, err := proto.Marshal(collection)
44 if err != nil { 48 if err != nil {
45 return err 49 return err
46 } 50 }
47 51
48 » client, err := m.clientFactory(ctx) 52 » _, err = m.topic.Publish(ctx, &pubsub.Message{Data: data})
49 » if err != nil {
50 » » return err
51 » }
52
53 » ps := pubsub.NewConnection(client)
54 » _, err = ps.Publish(ctx, m.topic, &pubsub.Message{Data: data})
55 return err 53 return err
56 } 54 }
OLDNEW
« no previous file with comments | « common/tsmon/iface.go ('k') | server/cmd/logdog_collector/main.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698