Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(4)

Side by Side Diff: server/cmd/logdog_collector/main.go

Issue 1838303002: Use native Pub/Sub library primitives. (Closed) Base URL: https://github.com/luci/luci-go@logdog-go1.6
Patch Set: Use "Topic" instead of "NewTopic" ... don't want to create :) Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « common/tsmon/monitor/pubsub.go ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package main 5 package main
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "time" 9 "time"
10 10
11 "github.com/luci/luci-go/common/auth" 11 "github.com/luci/luci-go/common/auth"
12 "github.com/luci/luci-go/common/clock" 12 "github.com/luci/luci-go/common/clock"
13 "github.com/luci/luci-go/common/errors" 13 "github.com/luci/luci-go/common/errors"
14 » "github.com/luci/luci-go/common/gcloud/pubsub" 14 » gcps "github.com/luci/luci-go/common/gcloud/pubsub"
15 » "github.com/luci/luci-go/common/gcloud/pubsub/ackbuffer"
16 » "github.com/luci/luci-go/common/gcloud/pubsub/subscriber"
17 log "github.com/luci/luci-go/common/logging" 15 log "github.com/luci/luci-go/common/logging"
16 "github.com/luci/luci-go/common/parallel"
18 "github.com/luci/luci-go/server/internal/logdog/collector" 17 "github.com/luci/luci-go/server/internal/logdog/collector"
19 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator" 18 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator"
20 "github.com/luci/luci-go/server/internal/logdog/service" 19 "github.com/luci/luci-go/server/internal/logdog/service"
21 "golang.org/x/net/context" 20 "golang.org/x/net/context"
21 "google.golang.org/cloud"
22 "google.golang.org/cloud/pubsub"
22 ) 23 )
23 24
24 var ( 25 var (
25 errInvalidConfig = errors.New("invalid configuration") 26 errInvalidConfig = errors.New("invalid configuration")
26 ) 27 )
27 28
29 const (
30 pubsubPullErrorDelay = 10 * time.Second
31 )
32
28 // application is the Collector application state. 33 // application is the Collector application state.
29 type application struct { 34 type application struct {
30 service.Service 35 service.Service
31 } 36 }
32 37
33 // run is the main execution function. 38 // run is the main execution function.
34 func (a *application) runCollector(c context.Context) error { 39 func (a *application) runCollector(c context.Context) error {
35 cfg := a.Config() 40 cfg := a.Config()
36 ccfg := cfg.GetCollector() 41 ccfg := cfg.GetCollector()
37 if ccfg == nil { 42 if ccfg == nil {
38 return errors.New("no collector configuration") 43 return errors.New("no collector configuration")
39 } 44 }
40 45
41 pscfg := cfg.GetTransport().GetPubsub() 46 pscfg := cfg.GetTransport().GetPubsub()
42 if pscfg == nil { 47 if pscfg == nil {
43 return errors.New("missing Pub/Sub configuration") 48 return errors.New("missing Pub/Sub configuration")
44 } 49 }
45 50
46 // Our Subscription must be a valid one. 51 // Our Subscription must be a valid one.
47 » sub := pubsub.NewSubscription(pscfg.Project, pscfg.Subscription) 52 » sub := gcps.NewSubscription(pscfg.Project, pscfg.Subscription)
48 if err := sub.Validate(); err != nil { 53 if err := sub.Validate(); err != nil {
49 return fmt.Errorf("invalid Pub/Sub subscription %q: %v", sub, err) 54 return fmt.Errorf("invalid Pub/Sub subscription %q: %v", sub, err)
50 } 55 }
51 56
52 // New PubSub instance with the authenticated client. 57 // New PubSub instance with the authenticated client.
53 » psClient, err := a.AuthenticatedClient(func(o *auth.Options) { 58 » psAuth, err := a.Authenticator(func(o *auth.Options) {
54 » » o.Scopes = pubsub.SubscriberScopes 59 » » o.Scopes = gcps.SubscriberScopes
55 }) 60 })
56 if err != nil { 61 if err != nil {
57 » » log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.") 62 » » log.WithError(err).Errorf(c, "Failed to create Pub/Sub token source.")
58 return err 63 return err
59 } 64 }
60 65
61 » // Create a retrying Pub/Sub client. 66 » psClient, err := pubsub.NewClient(c, pscfg.Project, cloud.WithTokenSource(psAuth.TokenSource()))
62 » ps := &pubsub.Retry{
63 » » Connection: pubsub.NewConnection(psClient),
64 » » Callback: func(err error, d time.Duration) {
65 » » » log.Fields{
66 » » » » log.ErrorKey: err,
67 » » » » "delay": d,
68 » » » }.Warningf(c, "Transient error encountered; retrying...")
69 » » },
70 » }
71
72 » exists, err := ps.SubExists(c, sub)
73 if err != nil { 67 if err != nil {
74 log.Fields{ 68 log.Fields{
75 log.ErrorKey: err, 69 log.ErrorKey: err,
70 "subscription": sub,
71 }.Errorf(c, "Failed to create Pub/Sub client.")
72 return err
73 }
74
75 psSub := psClient.Subscription(pscfg.Subscription)
76 exists, err := psSub.Exists(c)
77 if err != nil {
78 log.Fields{
79 log.ErrorKey: err,
76 "subscription": sub, 80 "subscription": sub,
77 }.Errorf(c, "Could not confirm Pub/Sub subscription.") 81 }.Errorf(c, "Could not confirm Pub/Sub subscription.")
78 return errInvalidConfig 82 return errInvalidConfig
79 } 83 }
80 if !exists { 84 if !exists {
81 log.Fields{ 85 log.Fields{
82 "subscription": sub, 86 "subscription": sub,
83 }.Errorf(c, "Subscription does not exist.") 87 }.Errorf(c, "Subscription does not exist.")
84 return errInvalidConfig 88 return errInvalidConfig
85 } 89 }
86 log.Fields{ 90 log.Fields{
87 "subscription": sub, 91 "subscription": sub,
88 }.Infof(c, "Successfully validated Pub/Sub subscription.") 92 }.Infof(c, "Successfully validated Pub/Sub subscription.")
89 93
90 st, err := a.IntermediateStorage(c) 94 st, err := a.IntermediateStorage(c)
91 if err != nil { 95 if err != nil {
92 return err 96 return err
93 } 97 }
94 defer st.Close() 98 defer st.Close()
95 99
96 // Application shutdown will now operate by cancelling the Collector's 100 // Application shutdown will now operate by cancelling the Collector's
97 // shutdown Context. 101 // shutdown Context.
98 shutdownCtx, shutdownFunc := context.WithCancel(c) 102 shutdownCtx, shutdownFunc := context.WithCancel(c)
99 a.SetShutdownFunc(shutdownFunc) 103 a.SetShutdownFunc(shutdownFunc)
100 104
101 // Start an ACK buffer so that we can batch ACKs. Note that we do NOT use the
102 // shutdown context here, as we want clean shutdowns to continue to ack any
103 // buffered messages.
104 ab := ackbuffer.New(c, ackbuffer.Config{
105 Ack: ackbuffer.NewACK(ps, sub, 0),
106 })
107 defer ab.CloseAndFlush()
108
109 // Initialize our Collector service object using a caching Coordinator 105 // Initialize our Collector service object using a caching Coordinator
110 // interface. 106 // interface.
111 coord := coordinator.NewCoordinator(a.Coordinator()) 107 coord := coordinator.NewCoordinator(a.Coordinator())
112 coord = coordinator.NewCache(coord, int(ccfg.StateCacheSize), ccfg.StateCacheExpiration.Duration()) 108 coord = coordinator.NewCache(coord, int(ccfg.StateCacheSize), ccfg.StateCacheExpiration.Duration())
113 109
114 coll := collector.Collector{ 110 coll := collector.Collector{
115 Coordinator: coord, 111 Coordinator: coord,
116 Storage: st, 112 Storage: st,
117 MaxMessageWorkers: int(ccfg.MaxMessageWorkers), 113 MaxMessageWorkers: int(ccfg.MaxMessageWorkers),
118 } 114 }
119 defer coll.Close() 115 defer coll.Close()
120 116
121 » // Execute our main Subscriber loop. It will run until the supplied Context 117 » // Execute our main subscription pull loop. It will run until the supplied
122 » // is cancelled. 118 » // Context is cancelled.
123 » clk := clock.Get(c) 119 » psIterator, err := psSub.Pull(c)
124 » engine := subscriber.Subscriber{ 120 » if err != nil {
125 » » S: subscriber.NewSource(ps, sub), 121 » » log.WithError(err).Errorf(c, "Failed to create Pub/Sub iterator.")
126 » » A: ab, 122 » » return err
127 » » Workers: int(ccfg.MaxConcurrentMessages),
128 } 123 }
129 » engine.Run(shutdownCtx, func(msg *pubsub.Message) bool { 124 » defer func() {
130 » » c := log.SetField(c, "messageID", msg.ID) 125 » » log.Debugf(c, "Waiting for Pub/Sub subscription iterator to stop ...")
131 » » log.Fields{ 126 » » psIterator.Stop()
132 » » » "ackID": msg.AckID, 127 » » log.Debugf(c, "Pub/Sub subscription iterator has stopped.")
133 » » » "size": len(msg.Data), 128 » }()
134 » » }.Infof(c, "Received Pub/Sub Message.")
135 129
136 » » startTime := clk.Now() 130 » parallel.Ignore(parallel.Run(int(ccfg.MaxConcurrentMessages), func(taskC chan<- func() error) {
137 » » err := coll.Process(c, msg.Data) 131 » » // Loop until shut down.
138 » » duration := clk.Now().Sub(startTime) 132 » » for shutdownCtx.Err() == nil {
133 » » » msg, err := psIterator.Next()
134 » » » if err != nil {
135 » » » » log.Fields{
136 » » » » » log.ErrorKey: err,
137 » » » » » "delay": pubsubPullErrorDelay,
138 » » » » }.Errorf(c, "Failed to fetch Pub/Sub message, retry after delay...")
139 » » » » clock.Sleep(c, pubsubPullErrorDelay)
140 » » » » continue
141 » » » }
139 142
140 » » switch { 143 » » » taskC <- func() error {
141 » » case errors.IsTransient(err): 144 » » » » c := log.SetField(c, "messageID", msg.ID)
142 » » » // Do not consume 145 » » » » msg.Done(a.processMessage(c, &coll, msg))
143 » » » log.Fields{ 146 » » » » return nil
144 » » » » log.ErrorKey: err, 147 » » » }
145 » » » » "duration": duration,
146 » » » }.Warningf(c, "TRANSIENT error ingesting Pub/Sub message.") 147 » » » }
147 » » » return false
148
149 » » case err == nil:
150 » » » log.Fields{
151 » » » » "ackID": msg.AckID,
152 » » » » "size": len(msg.Data),
153 » » » » "duration": duration,
154 » » » }.Infof(c, "Message successfully processed; ACKing.")
155 » » » return true
156
157 » » default:
158 » » » log.Fields{
159 » » » » log.ErrorKey: err,
160 » » » » "ackID": msg.AckID,
161 » » » » "size": len(msg.Data),
162 » » » » "duration": duration,
163 » » » }.Errorf(c, "Non-transient error ingesting Pub/Sub message; ACKing.")
164 » » » return true
165 } 148 }
166 » }) 149 » }))
167 150
168 log.Debugf(c, "Collector finished.") 151 log.Debugf(c, "Collector finished.")
169 return nil 152 return nil
170 } 153 }
171 154
155 // processMessage returns true if the message should be ACK'd (deleted from
156 // Pub/Sub) or false if the message should not be ACK'd.
157 func (a *application) processMessage(c context.Context, coll *collector.Collector, msg *pubsub.Message) bool {
158 log.Fields{
159 "ackID": msg.AckID,
160 "size": len(msg.Data),
161 }.Infof(c, "Received Pub/Sub Message.")
162
163 startTime := clock.Now(c)
164 err := coll.Process(c, msg.Data)
165 duration := clock.Now(c).Sub(startTime)
166
167 switch {
168 case errors.IsTransient(err):
169 // Do not consume
170 log.Fields{
171 log.ErrorKey: err,
172 "duration": duration,
173 }.Warningf(c, "TRANSIENT error ingesting Pub/Sub message.")
174 return false
175
176 case err == nil:
177 log.Fields{
178 "ackID": msg.AckID,
179 "size": len(msg.Data),
180 "duration": duration,
181 }.Infof(c, "Message successfully processed; ACKing.")
182 return true
183
184 default:
185 log.Fields{
186 log.ErrorKey: err,
187 "ackID": msg.AckID,
188 "size": len(msg.Data),
189 "duration": duration,
190 }.Errorf(c, "Non-transient error ingesting Pub/Sub message; ACKing.")
191 return true
192 }
193 }
194
172 // Entry point. 195 // Entry point.
173 func main() { 196 func main() {
174 a := application{} 197 a := application{}
175 a.Run(context.Background(), a.runCollector) 198 a.Run(context.Background(), a.runCollector)
176 } 199 }
OLDNEW
« no previous file with comments | « common/tsmon/monitor/pubsub.go ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698