Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(500)

Unified Diff: common/gcloud/pubsub/connection_impl.go

Issue 1838303002: Use native Pub/Sub library primitives. (Closed) Base URL: https://github.com/luci/luci-go@logdog-go1.6
Patch Set: Use "Topic" instead of "NewTopic" ... don't want to create :) Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « common/gcloud/pubsub/connection.go ('k') | common/gcloud/pubsub/pubsub.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: common/gcloud/pubsub/connection_impl.go
diff --git a/common/gcloud/pubsub/connection_impl.go b/common/gcloud/pubsub/connection_impl.go
deleted file mode 100644
index 30e27a0670bbc155fb9e56bba884a370d2546cd3..0000000000000000000000000000000000000000
--- a/common/gcloud/pubsub/connection_impl.go
+++ /dev/null
@@ -1,151 +0,0 @@
-// Copyright 2015 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-package pubsub
-
-import (
- "net/http"
-
- "github.com/luci/luci-go/common/errors"
- "golang.org/x/net/context"
- "google.golang.org/cloud"
- "google.golang.org/cloud/pubsub"
-)
-
-// connectionImpl is an implementation of Connection that communicates directly to
-// the Google Cloud Pub/Sub system.
-//
-// Currently, all errors are regarded as transient.
-type connectionImpl struct {
- client *http.Client
-}
-
-// NewConnection instantiates a new Connection instance configured to use the
-// Google Cloud Pub/Sub system.
-//
-// The supplied Client must be properly authenticated to interface with the
-// named Pub/Sub system.
-func NewConnection(c *http.Client) Connection {
- return &connectionImpl{
- client: c,
- }
-}
-
-func (p *connectionImpl) TopicExists(c context.Context, t Topic) (bool, error) {
- proj, name, err := t.SplitErr()
- if err != nil {
- return false, err
- }
-
- var exists bool
- err = contextAwareCall(c, func(c context.Context) (err error) {
- exists, err = pubsub.TopicExists(p.with(c, proj), name)
- return
- })
- if err != nil {
- return false, err
- }
- return exists, nil
-}
-
-func (p *connectionImpl) SubExists(c context.Context, s Subscription) (bool, error) {
- proj, name, err := s.SplitErr()
- if err != nil {
- return false, err
- }
-
- var exists bool
- err = contextAwareCall(c, func(c context.Context) (err error) {
- exists, err = pubsub.SubExists(p.with(c, proj), name)
- return
- })
- if err != nil {
- return false, err
- }
- return exists, nil
-}
-
-func (p *connectionImpl) Publish(c context.Context, t Topic, msgs ...*Message) ([]string, error) {
- proj, name, err := t.SplitErr()
- if err != nil {
- return nil, err
- }
-
- // Check if our Context has finished. Currently, the pubsub library does not
- // interrupt calls on Context cancellation.
- if err := c.Err(); err != nil {
- return nil, err
- }
-
- var ids []string
- err = contextAwareCall(c, func(c context.Context) (err error) {
- ids, err = pubsub.Publish(p.with(c, proj), name, localMessageToPubSub(msgs)...)
- return
- })
- if err != nil {
- return nil, err
- }
- return ids, nil
-}
-
-func (p *connectionImpl) Pull(c context.Context, s Subscription, n int) ([]*Message, error) {
- proj, name, err := s.SplitErr()
- if err != nil {
- return nil, err
- }
-
- var msgs []*pubsub.Message
- err = contextAwareCall(c, func(c context.Context) (err error) {
- msgs, err = pubsub.Pull(p.with(c, proj), name, n)
- return
- })
- if err != nil {
- return nil, err
- }
- return pubSubMessageToLocal(msgs), nil
-}
-
-func (p *connectionImpl) Ack(c context.Context, s Subscription, ackIDs ...string) error {
- proj, name, err := s.SplitErr()
- if err != nil {
- return err
- }
-
- return contextAwareCall(c, func(c context.Context) error {
- return pubsub.Ack(p.with(c, proj), name, ackIDs...)
- })
-}
-
-func (p *connectionImpl) with(c context.Context, project string) context.Context {
- return cloud.WithContext(c, project, p.client)
-}
-
-// contextAwareCall invokes the supplied function, f, and returns with either
-// f's error code or the Context's finished error code, whichever happens
-// first.
-//
-// Note that if f has side effects, they may still happen after this function
-// has returned due to Context completion, since nothing can abort f's execution
-// once executed. It is important to ensure that if this method returns an
-// error value, it is checked immediately, and that any data that f touches is
-// only consumed if this method returns nil.
-func contextAwareCall(c context.Context, f func(context.Context) error) error {
- errC := make(chan error, 1)
-
- go func() {
- defer close(errC)
- errC <- f(c)
- }()
-
- select {
- case <-c.Done():
- // Return immediately. Our "f" will finish and have its error status
- // ignored.
- return c.Err()
-
- case err := <-errC:
- // We currently treat all Pub/Sub errors as transient.
- return errors.WrapTransient(err)
- }
-}
« no previous file with comments | « common/gcloud/pubsub/connection.go ('k') | common/gcloud/pubsub/pubsub.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698