Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(82)

Side by Side Diff: server/cmd/logdog_collector/main.go

Issue 1838303002: Use native Pub/Sub library primitives. (Closed) Base URL: https://github.com/luci/luci-go@logdog-go1.6
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package main 5 package main
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "time" 9 "time"
10 10
11 "github.com/luci/luci-go/common/auth" 11 "github.com/luci/luci-go/common/auth"
12 "github.com/luci/luci-go/common/clock" 12 "github.com/luci/luci-go/common/clock"
13 "github.com/luci/luci-go/common/errors" 13 "github.com/luci/luci-go/common/errors"
14 » "github.com/luci/luci-go/common/gcloud/pubsub" 14 » gcps "github.com/luci/luci-go/common/gcloud/pubsub"
15 » "github.com/luci/luci-go/common/gcloud/pubsub/ackbuffer"
16 » "github.com/luci/luci-go/common/gcloud/pubsub/subscriber"
17 log "github.com/luci/luci-go/common/logging" 15 log "github.com/luci/luci-go/common/logging"
16 "github.com/luci/luci-go/common/parallel"
18 "github.com/luci/luci-go/server/internal/logdog/collector" 17 "github.com/luci/luci-go/server/internal/logdog/collector"
19 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator" 18 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator"
20 "github.com/luci/luci-go/server/internal/logdog/service" 19 "github.com/luci/luci-go/server/internal/logdog/service"
21 "golang.org/x/net/context" 20 "golang.org/x/net/context"
21 "google.golang.org/cloud"
22 "google.golang.org/cloud/pubsub"
22 ) 23 )
23 24
24 var ( 25 var (
25 errInvalidConfig = errors.New("invalid configuration") 26 errInvalidConfig = errors.New("invalid configuration")
26 ) 27 )
27 28
29 const (
30 pubsubPullErrorDelay = 10 * time.Second
31 )
32
28 // application is the Collector application state. 33 // application is the Collector application state.
29 type application struct { 34 type application struct {
30 service.Service 35 service.Service
31 } 36 }
32 37
33 // run is the main execution function. 38 // run is the main execution function.
34 func (a *application) runCollector(c context.Context) error { 39 func (a *application) runCollector(c context.Context) error {
35 cfg := a.Config() 40 cfg := a.Config()
36 ccfg := cfg.GetCollector() 41 ccfg := cfg.GetCollector()
37 if ccfg == nil { 42 if ccfg == nil {
38 return errors.New("no collector configuration") 43 return errors.New("no collector configuration")
39 } 44 }
40 45
41 pscfg := cfg.GetTransport().GetPubsub() 46 pscfg := cfg.GetTransport().GetPubsub()
42 if pscfg == nil { 47 if pscfg == nil {
43 return errors.New("missing Pub/Sub configuration") 48 return errors.New("missing Pub/Sub configuration")
44 } 49 }
45 50
46 // Our Subscription must be a valid one. 51 // Our Subscription must be a valid one.
47 » sub := pubsub.NewSubscription(pscfg.Project, pscfg.Subscription) 52 » sub := gcps.NewSubscription(pscfg.Project, pscfg.Subscription)
48 if err := sub.Validate(); err != nil { 53 if err := sub.Validate(); err != nil {
49 return fmt.Errorf("invalid Pub/Sub subscription %q: %v", sub, er r) 54 return fmt.Errorf("invalid Pub/Sub subscription %q: %v", sub, er r)
50 } 55 }
51 56
52 // New PubSub instance with the authenticated client. 57 // New PubSub instance with the authenticated client.
53 » psClient, err := a.AuthenticatedClient(func(o *auth.Options) { 58 » psAuth, err := a.Authenticator(func(o *auth.Options) {
54 » » o.Scopes = pubsub.SubscriberScopes 59 » » o.Scopes = gcps.SubscriberScopes
55 }) 60 })
56 if err != nil { 61 if err != nil {
57 » » log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.") 62 » » log.WithError(err).Errorf(c, "Failed to create Pub/Sub token sou rce.")
58 return err 63 return err
59 } 64 }
60 65
61 » // Create a retrying Pub/Sub client. 66 » psClient, err := pubsub.NewClient(c, pscfg.Project, cloud.WithTokenSourc e(psAuth.TokenSource()))
62 » ps := &pubsub.Retry{
63 » » Connection: pubsub.NewConnection(psClient),
64 » » Callback: func(err error, d time.Duration) {
65 » » » log.Fields{
66 » » » » log.ErrorKey: err,
67 » » » » "delay": d,
68 » » » }.Warningf(c, "Transient error encountered; retrying..." )
69 » » },
70 » }
71
72 » exists, err := ps.SubExists(c, sub)
73 if err != nil { 67 if err != nil {
74 log.Fields{ 68 log.Fields{
75 log.ErrorKey: err, 69 log.ErrorKey: err,
70 "subscription": sub,
71 }.Errorf(c, "Failed to create Pub/Sub client.")
72 return err
73 }
74
75 psSub := psClient.Subscription(pscfg.Subscription)
76 exists, err := psSub.Exists(c)
77 if err != nil {
78 log.Fields{
79 log.ErrorKey: err,
76 "subscription": sub, 80 "subscription": sub,
77 }.Errorf(c, "Could not confirm Pub/Sub subscription.") 81 }.Errorf(c, "Could not confirm Pub/Sub subscription.")
78 return errInvalidConfig 82 return errInvalidConfig
79 } 83 }
80 if !exists { 84 if !exists {
81 log.Fields{ 85 log.Fields{
82 "subscription": sub, 86 "subscription": sub,
83 }.Errorf(c, "Subscription does not exist.") 87 }.Errorf(c, "Subscription does not exist.")
84 return errInvalidConfig 88 return errInvalidConfig
85 } 89 }
86 log.Fields{ 90 log.Fields{
87 "subscription": sub, 91 "subscription": sub,
88 }.Infof(c, "Successfully validated Pub/Sub subscription.") 92 }.Infof(c, "Successfully validated Pub/Sub subscription.")
89 93
90 // Initialize our Storage. 94 // Initialize our Storage.
91 s, err := a.IntermediateStorage(c) 95 s, err := a.IntermediateStorage(c)
92 if err != nil { 96 if err != nil {
93 log.WithError(err).Errorf(c, "Failed to get storage instance.") 97 log.WithError(err).Errorf(c, "Failed to get storage instance.")
94 return err 98 return err
95 } 99 }
96 defer s.Close() 100 defer s.Close()
97 101
98 // Application shutdown will now operate by cancelling the Collector's 102 // Application shutdown will now operate by cancelling the Collector's
99 // shutdown Context. 103 // shutdown Context.
100 shutdownCtx, shutdownFunc := context.WithCancel(c) 104 shutdownCtx, shutdownFunc := context.WithCancel(c)
101 a.SetShutdownFunc(shutdownFunc) 105 a.SetShutdownFunc(shutdownFunc)
102 106
103 // Start an ACK buffer so that we can batch ACKs. Note that we do NOT us e the
104 // shutdown context here, as we want clean shutdowns to continue to ack any
105 // buffered messages.
106 ab := ackbuffer.New(c, ackbuffer.Config{
107 Ack: ackbuffer.NewACK(ps, sub, 0),
108 })
109 defer ab.CloseAndFlush()
110
111 // Initialize our Collector service object using a caching Coordinator 107 // Initialize our Collector service object using a caching Coordinator
112 // interface. 108 // interface.
113 coord := coordinator.NewCoordinator(a.Coordinator()) 109 coord := coordinator.NewCoordinator(a.Coordinator())
114 coord = coordinator.NewCache(coord, int(ccfg.StateCacheSize), ccfg.State CacheExpiration.Duration()) 110 coord = coordinator.NewCache(coord, int(ccfg.StateCacheSize), ccfg.State CacheExpiration.Duration())
115 111
116 coll := collector.Collector{ 112 coll := collector.Collector{
117 » » Coordinator: coord, 113 » » Coordinator: coord,
118 » » Storage: s, 114 » » Storage: s,
119 » » MaxParallelBundles: int(ccfg.Workers),
120 » » MaxIngestWorkers: int(ccfg.Workers),
121 } 115 }
122 defer coll.Close() 116 defer coll.Close()
123 117
124 » // Execute our main Subscriber loop. It will run until the supplied Cont ext 118 » // Execute our main subscription pull loop. It will run until the suppli ed
125 » // is cancelled. 119 » // Context is cancelled.
126 » clk := clock.Get(c) 120 » psIterator, err := psSub.Pull(c)
127 » engine := subscriber.Subscriber{ 121 » if err != nil {
128 » » S: subscriber.NewSource(ps, sub, 0), 122 » » log.WithError(err).Errorf(c, "Failed to create Pub/Sub iterator. ")
129 » » A: ab, 123 » » return err
124 » }
125 » defer func() {
126 » » log.Debugf(c, "Waiting for Pub/Sub subscription iterator to stop ...")
127 » » psIterator.Stop()
128 » » log.Debugf(c, "Pub/Sub subscription iterator has stopped.")
129 » }()
130 130
131 » » PullWorkers: int(ccfg.TransportWorkers), 131 » parallel.Ignore(parallel.Run(int(ccfg.TransportConcurrentMessages), func (taskC chan<- func() error) {
132 » » HandlerWorkers: int(ccfg.Workers), 132 » » // Loop until shut down.
133 » } 133 » » for shutdownCtx.Err() == nil {
134 » engine.Run(shutdownCtx, func(msg *pubsub.Message) bool { 134 » » » msg, err := psIterator.Next()
135 » » c := log.SetField(c, "messageID", msg.ID) 135 » » » if err != nil {
136 » » log.Fields{ 136 » » » » log.Fields{
137 » » » "ackID": msg.AckID, 137 » » » » » log.ErrorKey: err,
138 » » » "size": len(msg.Data), 138 » » » » » "delay": pubsubPullErrorDelay,
139 » » }.Infof(c, "Received Pub/Sub Message.") 139 » » » » }.Errorf(c, "Failed to fetch Pub/Sub message, re try after delay...")
140 » » » » clock.Sleep(c, pubsubPullErrorDelay)
141 » » » » continue
142 » » » }
140 143
141 » » startTime := clk.Now() 144 » » » taskC <- func() error {
142 » » err := coll.Process(c, msg.Data) 145 » » » » c := log.SetField(c, "messageID", msg.ID)
143 » » duration := clk.Now().Sub(startTime) 146 » » » » msg.Done(a.processMessage(c, &coll, msg))
144 147 » » » » return nil
145 » » switch { 148 » » » }
146 » » case errors.IsTransient(err):
147 » » » // Do not consume
148 » » » log.Fields{
149 » » » » log.ErrorKey: err,
150 » » » » "duration": duration,
151 » » » }.Warningf(c, "TRANSIENT error ingesting Pub/Sub message .")
152 » » » return false
153
154 » » case err == nil:
155 » » » log.Fields{
156 » » » » "ackID": msg.AckID,
157 » » » » "size": len(msg.Data),
158 » » » » "duration": duration,
159 » » » }.Infof(c, "Message successfully processed; ACKing.")
160 » » » return true
161
162 » » default:
163 » » » log.Fields{
164 » » » » log.ErrorKey: err,
165 » » » » "ackID": msg.AckID,
166 » » » » "size": len(msg.Data),
167 » » » » "duration": duration,
168 » » » }.Errorf(c, "Non-transient error ingesting Pub/Sub messa ge; ACKing.")
169 » » » return true
170 } 149 }
171 » }) 150 » }))
172 151
173 log.Debugf(c, "Collector finished.") 152 log.Debugf(c, "Collector finished.")
174 return nil 153 return nil
175 } 154 }
176 155
156 // processMessage returns true if the message should be ACK'd (deleted from
157 // Pub/Sub) or false if the message should not be ACK'd.
158 func (a *application) processMessage(c context.Context, coll *collector.Collecto r, msg *pubsub.Message) bool {
159 log.Fields{
160 "ackID": msg.AckID,
161 "size": len(msg.Data),
162 }.Infof(c, "Received Pub/Sub Message.")
163
164 startTime := clock.Now(c)
165 err := coll.Process(c, msg.Data)
166 duration := clock.Now(c).Sub(startTime)
167
168 switch {
169 case errors.IsTransient(err):
170 // Do not consume
171 log.Fields{
172 log.ErrorKey: err,
173 "duration": duration,
174 }.Warningf(c, "TRANSIENT error ingesting Pub/Sub message.")
175 return false
176
177 case err == nil:
178 log.Fields{
179 "ackID": msg.AckID,
180 "size": len(msg.Data),
181 "duration": duration,
182 }.Infof(c, "Message successfully processed; ACKing.")
183 return true
184
185 default:
186 log.Fields{
187 log.ErrorKey: err,
188 "ackID": msg.AckID,
189 "size": len(msg.Data),
190 "duration": duration,
191 }.Errorf(c, "Non-transient error ingesting Pub/Sub message; ACKi ng.")
192 return true
193 }
194 }
195
177 // Entry point. 196 // Entry point.
178 func main() { 197 func main() {
179 a := application{} 198 a := application{}
180 a.Run(context.Background(), a.runCollector) 199 a.Run(context.Background(), a.runCollector)
181 } 200 }
OLDNEW
« client/internal/logdog/butler/output/pubsub/pubsubOutput.go ('K') | « common/tsmon/monitor/pubsub.go ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698