Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(362)

Side by Side Diff: common/gcloud/pubsub/connection_impl.go

Issue 1838303002: Use native Pub/Sub library primitives. (Closed) Base URL: https://github.com/luci/luci-go@logdog-go1.6
Patch Set: Use "Topic" instead of "NewTopic" ... don't want to create :) Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « common/gcloud/pubsub/connection.go ('k') | common/gcloud/pubsub/pubsub.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package pubsub
6
7 import (
8 "net/http"
9
10 "github.com/luci/luci-go/common/errors"
11 "golang.org/x/net/context"
12 "google.golang.org/cloud"
13 "google.golang.org/cloud/pubsub"
14 )
15
16 // connectionImpl is an implementation of Connection that communicates directly to
17 // the Google Cloud Pub/Sub system.
18 //
19 // Currently, all errors are regarded as transient.
20 type connectionImpl struct {
21 client *http.Client
22 }
23
24 // NewConnection instantiates a new Connection instance configured to use the
25 // Google Cloud Pub/Sub system.
26 //
27 // The supplied Client must be properly authenticated to interface with the
28 // named Pub/Sub system.
29 func NewConnection(c *http.Client) Connection {
30 return &connectionImpl{
31 client: c,
32 }
33 }
34
35 func (p *connectionImpl) TopicExists(c context.Context, t Topic) (bool, error) {
36 proj, name, err := t.SplitErr()
37 if err != nil {
38 return false, err
39 }
40
41 var exists bool
42 err = contextAwareCall(c, func(c context.Context) (err error) {
43 exists, err = pubsub.TopicExists(p.with(c, proj), name)
44 return
45 })
46 if err != nil {
47 return false, err
48 }
49 return exists, nil
50 }
51
52 func (p *connectionImpl) SubExists(c context.Context, s Subscription) (bool, err or) {
53 proj, name, err := s.SplitErr()
54 if err != nil {
55 return false, err
56 }
57
58 var exists bool
59 err = contextAwareCall(c, func(c context.Context) (err error) {
60 exists, err = pubsub.SubExists(p.with(c, proj), name)
61 return
62 })
63 if err != nil {
64 return false, err
65 }
66 return exists, nil
67 }
68
69 func (p *connectionImpl) Publish(c context.Context, t Topic, msgs ...*Message) ( []string, error) {
70 proj, name, err := t.SplitErr()
71 if err != nil {
72 return nil, err
73 }
74
75 // Check if our Context has finished. Currently, the pubsub library does not
76 // interrupt calls on Context cancellation.
77 if err := c.Err(); err != nil {
78 return nil, err
79 }
80
81 var ids []string
82 err = contextAwareCall(c, func(c context.Context) (err error) {
83 ids, err = pubsub.Publish(p.with(c, proj), name, localMessageToP ubSub(msgs)...)
84 return
85 })
86 if err != nil {
87 return nil, err
88 }
89 return ids, nil
90 }
91
92 func (p *connectionImpl) Pull(c context.Context, s Subscription, n int) ([]*Mess age, error) {
93 proj, name, err := s.SplitErr()
94 if err != nil {
95 return nil, err
96 }
97
98 var msgs []*pubsub.Message
99 err = contextAwareCall(c, func(c context.Context) (err error) {
100 msgs, err = pubsub.Pull(p.with(c, proj), name, n)
101 return
102 })
103 if err != nil {
104 return nil, err
105 }
106 return pubSubMessageToLocal(msgs), nil
107 }
108
109 func (p *connectionImpl) Ack(c context.Context, s Subscription, ackIDs ...string ) error {
110 proj, name, err := s.SplitErr()
111 if err != nil {
112 return err
113 }
114
115 return contextAwareCall(c, func(c context.Context) error {
116 return pubsub.Ack(p.with(c, proj), name, ackIDs...)
117 })
118 }
119
120 func (p *connectionImpl) with(c context.Context, project string) context.Context {
121 return cloud.WithContext(c, project, p.client)
122 }
123
124 // contextAwareCall invokes the supplied function, f, and returns with either
125 // f's error code or the Context's finished error code, whichever happens
126 // first.
127 //
128 // Note that if f has side effects, they may still happen after this function
129 // has returned due to Context completion, since nothing can abort f's execution
130 // once executed. It is important to ensure that if this method returns an
131 // error value, it is checked immediately, and that any data that f touches is
132 // only consumed if this method returns nil.
133 func contextAwareCall(c context.Context, f func(context.Context) error) error {
134 errC := make(chan error, 1)
135
136 go func() {
137 defer close(errC)
138 errC <- f(c)
139 }()
140
141 select {
142 case <-c.Done():
143 // Return immediately. Our "f" will finish and have its error st atus
144 // ignored.
145 return c.Err()
146
147 case err := <-errC:
148 // We currently treat all Pub/Sub errors as transient.
149 return errors.WrapTransient(err)
150 }
151 }
OLDNEW
« no previous file with comments | « common/gcloud/pubsub/connection.go ('k') | common/gcloud/pubsub/pubsub.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698