Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(168)

Side by Side Diff: logdog/appengine/coordinator/archivalPublisher.go

Issue 2583033002: LogDog: Re-use Pub/Sub gRPC clients. (Closed)
Patch Set: close under lock Created 4 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | logdog/appengine/coordinator/service.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The LUCI Authors. All rights reserved. 1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package coordinator 5 package coordinator
6 6
7 import ( 7 import (
8 "time" 8 "time"
9 9
10 gcps "cloud.google.com/go/pubsub" 10 gcps "cloud.google.com/go/pubsub"
11 "github.com/golang/protobuf/proto" 11 "github.com/golang/protobuf/proto"
12 log "github.com/luci/luci-go/common/logging" 12 log "github.com/luci/luci-go/common/logging"
13 "github.com/luci/luci-go/common/retry" 13 "github.com/luci/luci-go/common/retry"
14 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" 14 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1"
15 "golang.org/x/net/context" 15 "golang.org/x/net/context"
16 ) 16 )
17 17
18 // ArchivalPublisher is capable of publishing archival requests. 18 // ArchivalPublisher is capable of publishing archival requests.
19 type ArchivalPublisher interface { 19 type ArchivalPublisher interface {
20 » // Close shutdowns this publisher, releasing all its resources. 20 » // Close shutdowns this publisher instance, releasing all its resources.
21 Close() error 21 Close() error
22 22
23 // Publish publishes the supplied ArchiveTask. 23 // Publish publishes the supplied ArchiveTask.
24 Publish(context.Context, *logdog.ArchiveTask) error 24 Publish(context.Context, *logdog.ArchiveTask) error
25 25
26 // NewPublishIndex returns a new publish index. Each publish index is un ique 26 // NewPublishIndex returns a new publish index. Each publish index is un ique
27 // within its request. 27 // within its request.
28 NewPublishIndex() uint64 28 NewPublishIndex() uint64
29 } 29 }
30 30
31 type pubsubArchivalPublisher struct { 31 type pubsubArchivalPublisher struct {
32 // client is Pub/Sub client used by the publisher. 32 // client is Pub/Sub client used by the publisher.
33 //
34 // This client is owned by the prodServicesInst that created this instna ce,
35 // and should not be closed on shutdown here.
33 client *gcps.Client 36 client *gcps.Client
34 37
35 // topic is the authenticated Pub/Sub topic handle to publish to. 38 // topic is the authenticated Pub/Sub topic handle to publish to.
36 topic *gcps.Topic 39 topic *gcps.Topic
37 40
38 // publishIndexFunc is a function that will return a unique publish inde x 41 // publishIndexFunc is a function that will return a unique publish inde x
39 // for this request. 42 // for this request.
40 publishIndexFunc func() uint64 43 publishIndexFunc func() uint64
41 } 44 }
42 45
43 func (p *pubsubArchivalPublisher) Close() error { 46 func (p *pubsubArchivalPublisher) Close() error { return nil }
44 » return p.client.Close()
45 }
46 47
47 func (p *pubsubArchivalPublisher) Publish(c context.Context, t *logdog.ArchiveTa sk) error { 48 func (p *pubsubArchivalPublisher) Publish(c context.Context, t *logdog.ArchiveTa sk) error {
48 d, err := proto.Marshal(t) 49 d, err := proto.Marshal(t)
49 if err != nil { 50 if err != nil {
50 log.WithError(err).Errorf(c, "Failed to marshal task.") 51 log.WithError(err).Errorf(c, "Failed to marshal task.")
51 return err 52 return err
52 } 53 }
53 54
54 // TODO: Route this through some system (e.g., task queue, tumble) that can 55 // TODO: Route this through some system (e.g., task queue, tumble) that can
55 // impose a dispatch delay for the settle period. 56 // impose a dispatch delay for the settle period.
(...skipping 14 matching lines...) Expand all
70 log.Fields{ 71 log.Fields{
71 log.ErrorKey: err, 72 log.ErrorKey: err,
72 "delay": d, 73 "delay": d,
73 }.Warningf(c, "Failed to publish task. Retrying...") 74 }.Warningf(c, "Failed to publish task. Retrying...")
74 }) 75 })
75 } 76 }
76 77
77 func (p *pubsubArchivalPublisher) NewPublishIndex() uint64 { 78 func (p *pubsubArchivalPublisher) NewPublishIndex() uint64 {
78 return p.publishIndexFunc() 79 return p.publishIndexFunc()
79 } 80 }
OLDNEW
« no previous file with comments | « no previous file | logdog/appengine/coordinator/service.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698