Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(473)

Side by Side Diff: logdog/appengine/coordinator/archivalPublisher.go

Issue 2582253002: logdog: Use gRPC credentials when creating PubSub client, not http.Client. (Closed)
Patch Set: clear gRPC metadata as a precation Created 4 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | logdog/appengine/coordinator/coordinatorTest/archival.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The LUCI Authors. All rights reserved. 1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package coordinator 5 package coordinator
6 6
7 import ( 7 import (
8 "time" 8 "time"
9 9
10 gcps "cloud.google.com/go/pubsub" 10 gcps "cloud.google.com/go/pubsub"
11 "github.com/golang/protobuf/proto" 11 "github.com/golang/protobuf/proto"
12 log "github.com/luci/luci-go/common/logging" 12 log "github.com/luci/luci-go/common/logging"
13 "github.com/luci/luci-go/common/retry" 13 "github.com/luci/luci-go/common/retry"
14 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" 14 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1"
15 "golang.org/x/net/context" 15 "golang.org/x/net/context"
16 ) 16 )
17 17
18 // ArchivalPublisher is capable of publishing archival requests. 18 // ArchivalPublisher is capable of publishing archival requests.
19 type ArchivalPublisher interface { 19 type ArchivalPublisher interface {
20 // Close shutdowns this publisher, releasing all its resources.
21 Close() error
22
20 // Publish publishes the supplied ArchiveTask. 23 // Publish publishes the supplied ArchiveTask.
21 Publish(context.Context, *logdog.ArchiveTask) error 24 Publish(context.Context, *logdog.ArchiveTask) error
22 25
23 // NewPublishIndex returns a new publish index. Each publish index is un ique 26 // NewPublishIndex returns a new publish index. Each publish index is un ique
24 // within its request. 27 // within its request.
25 NewPublishIndex() uint64 28 NewPublishIndex() uint64
26 } 29 }
27 30
28 type pubsubArchivalPublisher struct { 31 type pubsubArchivalPublisher struct {
32 // client is Pub/Sub client used by the publisher.
33 client *gcps.Client
34
29 // topic is the authenticated Pub/Sub topic handle to publish to. 35 // topic is the authenticated Pub/Sub topic handle to publish to.
30 topic *gcps.Topic 36 topic *gcps.Topic
31 37
32 // publishIndexFunc is a function that will return a unique publish inde x 38 // publishIndexFunc is a function that will return a unique publish inde x
33 // for this request. 39 // for this request.
34 publishIndexFunc func() uint64 40 publishIndexFunc func() uint64
35 } 41 }
36 42
43 func (p *pubsubArchivalPublisher) Close() error {
44 return p.client.Close()
45 }
46
37 func (p *pubsubArchivalPublisher) Publish(c context.Context, t *logdog.ArchiveTa sk) error { 47 func (p *pubsubArchivalPublisher) Publish(c context.Context, t *logdog.ArchiveTa sk) error {
38 d, err := proto.Marshal(t) 48 d, err := proto.Marshal(t)
39 if err != nil { 49 if err != nil {
40 log.WithError(err).Errorf(c, "Failed to marshal task.") 50 log.WithError(err).Errorf(c, "Failed to marshal task.")
41 return err 51 return err
42 } 52 }
43 53
44 // TODO: Route this through some system (e.g., task queue, tumble) that can 54 // TODO: Route this through some system (e.g., task queue, tumble) that can
45 // impose a dispatch delay for the settle period. 55 // impose a dispatch delay for the settle period.
46 msg := gcps.Message{ 56 msg := gcps.Message{
(...skipping 13 matching lines...) Expand all
60 log.Fields{ 70 log.Fields{
61 log.ErrorKey: err, 71 log.ErrorKey: err,
62 "delay": d, 72 "delay": d,
63 }.Warningf(c, "Failed to publish task. Retrying...") 73 }.Warningf(c, "Failed to publish task. Retrying...")
64 }) 74 })
65 } 75 }
66 76
67 func (p *pubsubArchivalPublisher) NewPublishIndex() uint64 { 77 func (p *pubsubArchivalPublisher) NewPublishIndex() uint64 {
68 return p.publishIndexFunc() 78 return p.publishIndexFunc()
69 } 79 }
OLDNEW
« no previous file with comments | « no previous file | logdog/appengine/coordinator/coordinatorTest/archival.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698