| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2017 The LUCI Authors. |
| 2 // |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 // you may not use this file except in compliance with the License. |
| 5 // You may obtain a copy of the License at |
| 6 // |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 // |
| 9 // Unless required by applicable law or agreed to in writing, software |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 // See the License for the specific language governing permissions and |
| 13 // limitations under the License. |
| 14 |
| 15 package pubsub |
| 16 |
| 17 import ( |
| 18 "cloud.google.com/go/pubsub" |
| 19 vkit "cloud.google.com/go/pubsub/apiv1" |
| 20 pb "google.golang.org/genproto/googleapis/pubsub/v1" |
| 21 |
| 22 "golang.org/x/net/context" |
| 23 ) |
| 24 |
| 25 // Publisher is a generic interface to something that can publish Pub/Sub |
| 26 // messages. |
| 27 // |
| 28 // A Publisher should be Closed when finished with it. |
| 29 type Publisher interface { |
| 30 Publish(c context.Context, msgs ...*pubsub.Message) ([]string, error) |
| 31 Close() error |
| 32 } |
| 33 |
| 34 // UnbufferedPublisher directly instantiates a Pub/Sub client and publishes a |
| 35 // message to it. |
| 36 // |
| 37 // The standard Pub/Sub library has several issues, especially when used from |
| 38 // AppEngine: |
| 39 // - It uses an empty Context, discarding AppEngine context. |
| 40 // - It uses a buffer, which expects a lifecycle beyond that of a simple |
| 41 // AppEngine Request. |
| 42 type UnbufferedPublisher struct { |
| 43 // Topic is the name of the Topic to publish to. |
| 44 Topic Topic |
| 45 |
| 46 // Client is the Pub/Sub publisher client to use. Client will be closed
when |
| 47 // this UnbufferedPublisher is closed. |
| 48 Client *vkit.PublisherClient |
| 49 } |
| 50 |
| 51 var _ Publisher = (*UnbufferedPublisher)(nil) |
| 52 |
| 53 // Publish publishes a message immediately, blocking until it completes. |
| 54 // |
| 55 // "c" must be an AppEngine context. |
| 56 func (up *UnbufferedPublisher) Publish(c context.Context, msgs ...*pubsub.Messag
e) ([]string, error) { |
| 57 if len(msgs) == 0 { |
| 58 return nil, nil |
| 59 } |
| 60 |
| 61 messages := make([]*pb.PubsubMessage, len(msgs)) |
| 62 for i, msg := range msgs { |
| 63 messages[i] = &pb.PubsubMessage{ |
| 64 Data: msg.Data, |
| 65 Attributes: msg.Attributes, |
| 66 } |
| 67 } |
| 68 |
| 69 resp, err := up.Client.Publish(c, &pb.PublishRequest{ |
| 70 Topic: string(up.Topic), |
| 71 Messages: messages, |
| 72 }) |
| 73 if err != nil { |
| 74 return nil, err |
| 75 } |
| 76 return resp.MessageIds, nil |
| 77 } |
| 78 |
| 79 // Close closes the UnbufferedPublisher, notably its Client. |
| 80 func (up *UnbufferedPublisher) Close() error { return up.Client.Close() } |
| OLD | NEW |