| OLD | NEW |
| (Empty) |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package pubsub | |
| 6 | |
| 7 import ( | |
| 8 "net/http" | |
| 9 | |
| 10 "github.com/luci/luci-go/common/errors" | |
| 11 "golang.org/x/net/context" | |
| 12 "google.golang.org/cloud" | |
| 13 "google.golang.org/cloud/pubsub" | |
| 14 ) | |
| 15 | |
| 16 // connectionImpl is an implementation of Connection that communicates directly
to | |
| 17 // the Google Cloud Pub/Sub system. | |
| 18 // | |
| 19 // Currently, all errors are regarded as transient. | |
| 20 type connectionImpl struct { | |
| 21 client *http.Client | |
| 22 } | |
| 23 | |
| 24 // NewConnection instantiates a new Connection instance configured to use the | |
| 25 // Google Cloud Pub/Sub system. | |
| 26 // | |
| 27 // The supplied Client must be properly authenticated to interface with the | |
| 28 // named Pub/Sub system. | |
| 29 func NewConnection(c *http.Client) Connection { | |
| 30 return &connectionImpl{ | |
| 31 client: c, | |
| 32 } | |
| 33 } | |
| 34 | |
| 35 func (p *connectionImpl) TopicExists(c context.Context, t Topic) (bool, error) { | |
| 36 proj, name, err := t.SplitErr() | |
| 37 if err != nil { | |
| 38 return false, err | |
| 39 } | |
| 40 | |
| 41 var exists bool | |
| 42 err = contextAwareCall(c, func(c context.Context) (err error) { | |
| 43 exists, err = pubsub.TopicExists(p.with(c, proj), name) | |
| 44 return | |
| 45 }) | |
| 46 if err != nil { | |
| 47 return false, err | |
| 48 } | |
| 49 return exists, nil | |
| 50 } | |
| 51 | |
| 52 func (p *connectionImpl) SubExists(c context.Context, s Subscription) (bool, err
or) { | |
| 53 proj, name, err := s.SplitErr() | |
| 54 if err != nil { | |
| 55 return false, err | |
| 56 } | |
| 57 | |
| 58 var exists bool | |
| 59 err = contextAwareCall(c, func(c context.Context) (err error) { | |
| 60 exists, err = pubsub.SubExists(p.with(c, proj), name) | |
| 61 return | |
| 62 }) | |
| 63 if err != nil { | |
| 64 return false, err | |
| 65 } | |
| 66 return exists, nil | |
| 67 } | |
| 68 | |
| 69 func (p *connectionImpl) Publish(c context.Context, t Topic, msgs ...*Message) (
[]string, error) { | |
| 70 proj, name, err := t.SplitErr() | |
| 71 if err != nil { | |
| 72 return nil, err | |
| 73 } | |
| 74 | |
| 75 // Check if our Context has finished. Currently, the pubsub library does
not | |
| 76 // interrupt calls on Context cancellation. | |
| 77 if err := c.Err(); err != nil { | |
| 78 return nil, err | |
| 79 } | |
| 80 | |
| 81 var ids []string | |
| 82 err = contextAwareCall(c, func(c context.Context) (err error) { | |
| 83 ids, err = pubsub.Publish(p.with(c, proj), name, localMessageToP
ubSub(msgs)...) | |
| 84 return | |
| 85 }) | |
| 86 if err != nil { | |
| 87 return nil, err | |
| 88 } | |
| 89 return ids, nil | |
| 90 } | |
| 91 | |
| 92 func (p *connectionImpl) Pull(c context.Context, s Subscription, n int) ([]*Mess
age, error) { | |
| 93 proj, name, err := s.SplitErr() | |
| 94 if err != nil { | |
| 95 return nil, err | |
| 96 } | |
| 97 | |
| 98 var msgs []*pubsub.Message | |
| 99 err = contextAwareCall(c, func(c context.Context) (err error) { | |
| 100 msgs, err = pubsub.Pull(p.with(c, proj), name, n) | |
| 101 return | |
| 102 }) | |
| 103 if err != nil { | |
| 104 return nil, err | |
| 105 } | |
| 106 return pubSubMessageToLocal(msgs), nil | |
| 107 } | |
| 108 | |
| 109 func (p *connectionImpl) Ack(c context.Context, s Subscription, ackIDs ...string
) error { | |
| 110 proj, name, err := s.SplitErr() | |
| 111 if err != nil { | |
| 112 return err | |
| 113 } | |
| 114 | |
| 115 return contextAwareCall(c, func(c context.Context) error { | |
| 116 return pubsub.Ack(p.with(c, proj), name, ackIDs...) | |
| 117 }) | |
| 118 } | |
| 119 | |
| 120 func (p *connectionImpl) with(c context.Context, project string) context.Context
{ | |
| 121 return cloud.WithContext(c, project, p.client) | |
| 122 } | |
| 123 | |
| 124 // contextAwareCall invokes the supplied function, f, and returns with either | |
| 125 // f's error code or the Context's finished error code, whichever happens | |
| 126 // first. | |
| 127 // | |
| 128 // Note that if f has side effects, they may still happen after this function | |
| 129 // has returned due to Context completion, since nothing can abort f's execution | |
| 130 // once executed. It is important to ensure that if this method returns an | |
| 131 // error value, it is checked immediately, and that any data that f touches is | |
| 132 // only consumed if this method returns nil. | |
| 133 func contextAwareCall(c context.Context, f func(context.Context) error) error { | |
| 134 errC := make(chan error, 1) | |
| 135 | |
| 136 go func() { | |
| 137 defer close(errC) | |
| 138 errC <- f(c) | |
| 139 }() | |
| 140 | |
| 141 select { | |
| 142 case <-c.Done(): | |
| 143 // Return immediately. Our "f" will finish and have its error st
atus | |
| 144 // ignored. | |
| 145 return c.Err() | |
| 146 | |
| 147 case err := <-errC: | |
| 148 // We currently treat all Pub/Sub errors as transient. | |
| 149 return errors.WrapTransient(err) | |
| 150 } | |
| 151 } | |
| OLD | NEW |