Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(340)

Side by Side Diff: common/gcloud/pubsub/publisher.go

Issue 2989333002: [logdog] Replace Tumble with push queues. (Closed)
Patch Set: comments Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | common/gcloud/pubsub/topic.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2017 The LUCI Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package pubsub
16
17 import (
18 "cloud.google.com/go/pubsub"
19 vkit "cloud.google.com/go/pubsub/apiv1"
20 pb "google.golang.org/genproto/googleapis/pubsub/v1"
21
22 "golang.org/x/net/context"
23 )
24
25 // Publisher is a generic interface to something that can publish Pub/Sub
26 // messages.
27 //
28 // A Publisher should be Closed when finished with it.
29 type Publisher interface {
30 Publish(c context.Context, msgs ...*pubsub.Message) ([]string, error)
31 Close() error
32 }
33
34 // UnbufferedPublisher directly instantiates a Pub/Sub client and publishes a
35 // message to it.
36 //
37 // The standard Pub/Sub library has several issues, especially when used from
38 // AppEngine:
39 // - It uses an empty Context, discarding AppEngine context.
40 // - It uses a buffer, which expects a lifecycle beyond that of a simple
41 // AppEngine Request.
42 type UnbufferedPublisher struct {
43 // Topic is the name of the Topic to publish to.
44 Topic Topic
45
46 // Client is the Pub/Sub publisher client to use. Client will be closed when
47 // this UnbufferedPublisher is closed.
48 Client *vkit.PublisherClient
49 }
50
51 var _ Publisher = (*UnbufferedPublisher)(nil)
52
53 // Publish publishes a message immediately, blocking until it completes.
54 //
55 // "c" must be an AppEngine context.
56 func (up *UnbufferedPublisher) Publish(c context.Context, msgs ...*pubsub.Messag e) ([]string, error) {
57 if len(msgs) == 0 {
58 return nil, nil
59 }
60
61 messages := make([]*pb.PubsubMessage, len(msgs))
62 for i, msg := range msgs {
63 messages[i] = &pb.PubsubMessage{
64 Data: msg.Data,
65 Attributes: msg.Attributes,
66 }
67 }
68
69 resp, err := up.Client.Publish(c, &pb.PublishRequest{
70 Topic: string(up.Topic),
71 Messages: messages,
72 })
73 if err != nil {
74 return nil, err
75 }
76 return resp.MessageIds, nil
77 }
78
79 // Close closes the UnbufferedPublisher, notably its Client.
80 func (up *UnbufferedPublisher) Close() error { return up.Client.Close() }
OLDNEW
« no previous file with comments | « no previous file | common/gcloud/pubsub/topic.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698