Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(48)

Side by Side Diff: common/gcloud/gcps/pubsub_impl.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package gcps 5 package gcps
6 6
7 import ( 7 import (
8 "net/http"
9
8 "github.com/luci/luci-go/common/errors" 10 "github.com/luci/luci-go/common/errors"
9 "golang.org/x/net/context" 11 "golang.org/x/net/context"
12 "google.golang.org/cloud"
10 "google.golang.org/cloud/pubsub" 13 "google.golang.org/cloud/pubsub"
11 ) 14 )
12 15
13 // pubSubImpl is an implementation of PubSub that communicates directly to the 16 // pubSubImpl is an implementation of PubSub that communicates directly to the
14 // Google Cloud Pub/Sub system. 17 // Google Cloud Pub/Sub system.
15 // 18 //
16 // Currently, all errors are regarded as transient. 19 // Currently, all errors are regarded as transient.
17 type pubSubImpl struct { 20 type pubSubImpl struct {
18 » ctx context.Context 21 » client *http.Client
dnj (Google) 2016/01/21 04:36:24 Rearrange to accommodate the end-method Context ch
22 » project string
19 } 23 }
20 24
21 // New instantiates a new PubSub instance configured to use the Google Cloud 25 // New instantiates a new PubSub instance configured to use the Google Cloud
22 // Pub/Sub system. 26 // Pub/Sub system.
23 // 27 //
24 // The supplied context must be properly authenticated to interface with the 28 // The supplied Client must be properly authenticated to interface with the
25 // named Pub/Sub system. 29 // named Pub/Sub system.
26 func New(ctx context.Context) PubSub { 30 func New(c *http.Client, project string) PubSub {
27 return &pubSubImpl{ 31 return &pubSubImpl{
28 » » ctx: ctx, 32 » » client: c,
33 » » project: project,
29 } 34 }
30 } 35 }
31 36
32 func (p *pubSubImpl) TopicExists(t Topic) (bool, error) { 37 func (p *pubSubImpl) TopicExists(c context.Context, t Topic) (bool, error) {
33 » exists, err := pubsub.TopicExists(p.ctx, string(t)) 38 » exists, err := pubsub.TopicExists(p.with(c), string(t))
34 return exists, (err) 39 return exists, (err)
35 } 40 }
36 41
37 func (p *pubSubImpl) SubExists(s Subscription) (bool, error) { 42 func (p *pubSubImpl) SubExists(c context.Context, s Subscription) (bool, error) {
38 » exists, err := pubsub.SubExists(p.ctx, string(s)) 43 » exists, err := pubsub.SubExists(p.with(c), string(s))
39 return exists, errors.WrapTransient(err) 44 return exists, errors.WrapTransient(err)
40 } 45 }
41 46
42 func (p *pubSubImpl) Publish(t Topic, msgs ...*pubsub.Message) ([]string, error) { 47 func (p *pubSubImpl) Publish(c context.Context, t Topic, msgs ...*pubsub.Message ) ([]string, error) {
43 » ids, err := pubsub.Publish(p.ctx, string(t), msgs...) 48 » ids, err := pubsub.Publish(p.with(c), string(t), msgs...)
44 return ids, errors.WrapTransient(err) 49 return ids, errors.WrapTransient(err)
45 } 50 }
46 51
47 func (p *pubSubImpl) Pull(s Subscription, n int) ([]*pubsub.Message, error) { 52 func (p *pubSubImpl) Pull(c context.Context, s Subscription, n int) ([]*pubsub.M essage, error) {
48 » msgs, err := pubsub.Pull(p.ctx, string(s), n) 53 » msgs, err := pubsub.Pull(p.with(c), string(s), n)
49 return msgs, errors.WrapTransient(err) 54 return msgs, errors.WrapTransient(err)
50 } 55 }
51 56
52 func (p *pubSubImpl) Ack(s Subscription, ackIDs ...string) error { 57 func (p *pubSubImpl) Ack(c context.Context, s Subscription, ackIDs ...string) er ror {
53 » return errors.WrapTransient(pubsub.Ack(p.ctx, string(s), ackIDs...)) 58 » return errors.WrapTransient(pubsub.Ack(p.with(c), string(s), ackIDs...))
54 } 59 }
60
61 func (p *pubSubImpl) with(c context.Context) context.Context {
62 return cloud.WithContext(c, p.project, p.client)
63 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698