| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package main | 5 package main |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "time" | 9 "time" |
| 10 | 10 |
| 11 "github.com/luci/luci-go/common/auth" | 11 "github.com/luci/luci-go/common/auth" |
| 12 "github.com/luci/luci-go/common/clock" | 12 "github.com/luci/luci-go/common/clock" |
| 13 "github.com/luci/luci-go/common/errors" | 13 "github.com/luci/luci-go/common/errors" |
| 14 » "github.com/luci/luci-go/common/gcloud/pubsub" | 14 » gcps "github.com/luci/luci-go/common/gcloud/pubsub" |
| 15 » "github.com/luci/luci-go/common/gcloud/pubsub/ackbuffer" | |
| 16 » "github.com/luci/luci-go/common/gcloud/pubsub/subscriber" | |
| 17 log "github.com/luci/luci-go/common/logging" | 15 log "github.com/luci/luci-go/common/logging" |
| 16 "github.com/luci/luci-go/common/parallel" |
| 18 "github.com/luci/luci-go/server/internal/logdog/collector" | 17 "github.com/luci/luci-go/server/internal/logdog/collector" |
| 19 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator" | 18 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator" |
| 20 "github.com/luci/luci-go/server/internal/logdog/service" | 19 "github.com/luci/luci-go/server/internal/logdog/service" |
| 21 "golang.org/x/net/context" | 20 "golang.org/x/net/context" |
| 21 "google.golang.org/cloud" |
| 22 "google.golang.org/cloud/pubsub" |
| 22 ) | 23 ) |
| 23 | 24 |
| 24 var ( | 25 var ( |
| 25 errInvalidConfig = errors.New("invalid configuration") | 26 errInvalidConfig = errors.New("invalid configuration") |
| 26 ) | 27 ) |
| 27 | 28 |
| 29 const ( |
| 30 pubsubPullErrorDelay = 10 * time.Second |
| 31 ) |
| 32 |
| 28 // application is the Collector application state. It embeds service.Service. | 33 // application is the Collector application state. It embeds service.Service. |
| 29 type application struct { | 34 type application struct { |
| 30 service.Service | 35 service.Service |
| 31 } | 36 } |
| 32 | 37 |
| 33 // runCollector is the main execution function. It validates the configured Pub/Sub subscription, confirms that it exists, and processes incoming messages through a collector.Collector until shutdown. | 38 // runCollector is the main execution function. It validates the configured Pub/Sub subscription, confirms that it exists, and processes incoming messages through a collector.Collector until shutdown. |
| 34 func (a *application) runCollector(c context.Context) error { | 39 func (a *application) runCollector(c context.Context) error { |
| 35 cfg := a.Config() | 40 cfg := a.Config() |
| 36 ccfg := cfg.GetCollector() | 41 ccfg := cfg.GetCollector() |
| 37 if ccfg == nil { | 42 if ccfg == nil { |
| 38 return errors.New("no collector configuration") | 43 return errors.New("no collector configuration") |
| 39 } | 44 } |
| 40 | 45 |
| 41 pscfg := cfg.GetTransport().GetPubsub() | 46 pscfg := cfg.GetTransport().GetPubsub() |
| 42 if pscfg == nil { | 47 if pscfg == nil { |
| 43 return errors.New("missing Pub/Sub configuration") | 48 return errors.New("missing Pub/Sub configuration") |
| 44 } | 49 } |
| 45 | 50 |
| 46 // Our Subscription must be a valid one. | 51 // Our Subscription must be a valid one. |
| 47 » sub := pubsub.NewSubscription(pscfg.Project, pscfg.Subscription) | 52 » sub := gcps.NewSubscription(pscfg.Project, pscfg.Subscription) |
| 48 if err := sub.Validate(); err != nil { | 53 if err := sub.Validate(); err != nil { |
| 49 return fmt.Errorf("invalid Pub/Sub subscription %q: %v", sub, er
r) | 54 return fmt.Errorf("invalid Pub/Sub subscription %q: %v", sub, er
r) |
| 50 } | 55 } |
| 51 | 56 |
| 52 // New PubSub instance with the authenticated client. | 57 // New PubSub instance with the authenticated client. |
| 53 » psClient, err := a.AuthenticatedClient(func(o *auth.Options) { | 58 » psAuth, err := a.Authenticator(func(o *auth.Options) { |
| 54 » » o.Scopes = pubsub.SubscriberScopes | 59 » » o.Scopes = gcps.SubscriberScopes |
| 55 }) | 60 }) |
| 56 if err != nil { | 61 if err != nil { |
| 57 » » log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.") | 62 » » log.WithError(err).Errorf(c, "Failed to create Pub/Sub token sou
rce.") |
| 58 return err | 63 return err |
| 59 } | 64 } |
| 60 | 65 |
| 61 » // Create a retrying Pub/Sub client. | 66 » psClient, err := pubsub.NewClient(c, pscfg.Project, cloud.WithTokenSourc
e(psAuth.TokenSource())) |
| 62 » ps := &pubsub.Retry{ | |
| 63 » » Connection: pubsub.NewConnection(psClient), | |
| 64 » » Callback: func(err error, d time.Duration) { | |
| 65 » » » log.Fields{ | |
| 66 » » » » log.ErrorKey: err, | |
| 67 » » » » "delay": d, | |
| 68 » » » }.Warningf(c, "Transient error encountered; retrying..."
) | |
| 69 » » }, | |
| 70 » } | |
| 71 | |
| 72 » exists, err := ps.SubExists(c, sub) | |
| 73 if err != nil { | 67 if err != nil { |
| 74 log.Fields{ | 68 log.Fields{ |
| 75 log.ErrorKey: err, | 69 log.ErrorKey: err, |
| 70 "subscription": sub, |
| 71 }.Errorf(c, "Failed to create Pub/Sub client.") |
| 72 return err |
| 73 } |
| 74 |
| 75 psSub := psClient.Subscription(pscfg.Subscription) |
| 76 exists, err := psSub.Exists(c) |
| 77 if err != nil { |
| 78 log.Fields{ |
| 79 log.ErrorKey: err, |
| 76 "subscription": sub, | 80 "subscription": sub, |
| 77 }.Errorf(c, "Could not confirm Pub/Sub subscription.") | 81 }.Errorf(c, "Could not confirm Pub/Sub subscription.") |
| 78 return errInvalidConfig | 82 return errInvalidConfig |
| 79 } | 83 } |
| 80 if !exists { | 84 if !exists { |
| 81 log.Fields{ | 85 log.Fields{ |
| 82 "subscription": sub, | 86 "subscription": sub, |
| 83 }.Errorf(c, "Subscription does not exist.") | 87 }.Errorf(c, "Subscription does not exist.") |
| 84 return errInvalidConfig | 88 return errInvalidConfig |
| 85 } | 89 } |
| 86 log.Fields{ | 90 log.Fields{ |
| 87 "subscription": sub, | 91 "subscription": sub, |
| 88 }.Infof(c, "Successfully validated Pub/Sub subscription.") | 92 }.Infof(c, "Successfully validated Pub/Sub subscription.") |
| 89 | 93 |
| 90 st, err := a.IntermediateStorage(c) | 94 st, err := a.IntermediateStorage(c) |
| 91 if err != nil { | 95 if err != nil { |
| 92 return err | 96 return err |
| 93 } | 97 } |
| 94 defer st.Close() | 98 defer st.Close() |
| 95 | 99 |
| 96 // Application shutdown will now operate by cancelling the Collector's | 100 // Application shutdown will now operate by cancelling the Collector's |
| 97 // shutdown Context. | 101 // shutdown Context. |
| 98 shutdownCtx, shutdownFunc := context.WithCancel(c) | 102 shutdownCtx, shutdownFunc := context.WithCancel(c) |
| 99 a.SetShutdownFunc(shutdownFunc) | 103 a.SetShutdownFunc(shutdownFunc) |
| 100 | 104 |
| 101 // Start an ACK buffer so that we can batch ACKs. Note that we do NOT us
e the | |
| 102 // shutdown context here, as we want clean shutdowns to continue to ack
any | |
| 103 // buffered messages. | |
| 104 ab := ackbuffer.New(c, ackbuffer.Config{ | |
| 105 Ack: ackbuffer.NewACK(ps, sub, 0), | |
| 106 }) | |
| 107 defer ab.CloseAndFlush() | |
| 108 | |
| 109 // Initialize our Collector service object using a caching Coordinator | 105 // Initialize our Collector service object using a caching Coordinator |
| 110 // interface. | 106 // interface. |
| 111 coord := coordinator.NewCoordinator(a.Coordinator()) | 107 coord := coordinator.NewCoordinator(a.Coordinator()) |
| 112 coord = coordinator.NewCache(coord, int(ccfg.StateCacheSize), ccfg.State
CacheExpiration.Duration()) | 108 coord = coordinator.NewCache(coord, int(ccfg.StateCacheSize), ccfg.State
CacheExpiration.Duration()) |
| 113 | 109 |
| 114 coll := collector.Collector{ | 110 coll := collector.Collector{ |
| 115 Coordinator: coord, | 111 Coordinator: coord, |
| 116 Storage: st, | 112 Storage: st, |
| 117 MaxMessageWorkers: int(ccfg.MaxMessageWorkers), | 113 MaxMessageWorkers: int(ccfg.MaxMessageWorkers), |
| 118 } | 114 } |
| 119 defer coll.Close() | 115 defer coll.Close() |
| 120 | 116 |
| 121 » // Execute our main Subscriber loop. It will run until the supplied Cont
ext | 117 » // Execute our main subscription pull loop. It will run until the suppli
ed |
| 122 » // is cancelled. | 118 » // Context is cancelled. NOTE(review): shutdownCtx is only checked between loop iterations, while the iterator is created from c and the error delay sleeps on c -- confirm that shutdown cannot stall in Next() or the delay. |
| 123 » clk := clock.Get(c) | 119 » psIterator, err := psSub.Pull(c) |
| 124 » engine := subscriber.Subscriber{ | 120 » if err != nil { |
| 125 » » S: subscriber.NewSource(ps, sub), | 121 » » log.WithError(err).Errorf(c, "Failed to create Pub/Sub iterator.
") |
| 126 » » A: ab, | 122 » » return err |
| 127 » » Workers: int(ccfg.MaxConcurrentMessages), | |
| 128 } | 123 } |
| 129 » engine.Run(shutdownCtx, func(msg *pubsub.Message) bool { | 124 » defer func() { |
| 130 » » c := log.SetField(c, "messageID", msg.ID) | 125 » » log.Debugf(c, "Waiting for Pub/Sub subscription iterator to stop
...") |
| 131 » » log.Fields{ | 126 » » psIterator.Stop() |
| 132 » » » "ackID": msg.AckID, | 127 » » log.Debugf(c, "Pub/Sub subscription iterator has stopped.") |
| 133 » » » "size": len(msg.Data), | 128 » }() |
| 134 » » }.Infof(c, "Received Pub/Sub Message.") | |
| 135 | 129 |
| 136 » » startTime := clk.Now() | 130 » parallel.Ignore(parallel.Run(int(ccfg.MaxConcurrentMessages), func(taskC
chan<- func() error) { |
| 137 » » err := coll.Process(c, msg.Data) | 131 » » // Loop until shut down. |
| 138 » » duration := clk.Now().Sub(startTime) | 132 » » for shutdownCtx.Err() == nil { |
| 133 » » » msg, err := psIterator.Next() |
| 134 » » » if err != nil { |
| 135 » » » » log.Fields{ |
| 136 » » » » » log.ErrorKey: err, |
| 137 » » » » » "delay": pubsubPullErrorDelay, |
| 138 » » » » }.Errorf(c, "Failed to fetch Pub/Sub message, re
try after delay...") |
| 139 » » » » clock.Sleep(c, pubsubPullErrorDelay) |
| 140 » » » » continue |
| 141 » » » } |
| 139 | 142 |
| 140 » » switch { | 143 » » » taskC <- func() error { |
| 141 » » case errors.IsTransient(err): | 144 » » » » c := log.SetField(c, "messageID", msg.ID) |
| 142 » » » // Do not consume | 145 » » » » msg.Done(a.processMessage(c, &coll, msg)) |
| 143 » » » log.Fields{ | 146 » » » » return nil |
| 144 » » » » log.ErrorKey: err, | 147 » » » } |
| 145 » » » » "duration": duration, | |
| 146 » » » }.Warningf(c, "TRANSIENT error ingesting Pub/Sub message
.") | |
| 147 » » » return false | |
| 148 | |
| 149 » » case err == nil: | |
| 150 » » » log.Fields{ | |
| 151 » » » » "ackID": msg.AckID, | |
| 152 » » » » "size": len(msg.Data), | |
| 153 » » » » "duration": duration, | |
| 154 » » » }.Infof(c, "Message successfully processed; ACKing.") | |
| 155 » » » return true | |
| 156 | |
| 157 » » default: | |
| 158 » » » log.Fields{ | |
| 159 » » » » log.ErrorKey: err, | |
| 160 » » » » "ackID": msg.AckID, | |
| 161 » » » » "size": len(msg.Data), | |
| 162 » » » » "duration": duration, | |
| 163 » » » }.Errorf(c, "Non-transient error ingesting Pub/Sub messa
ge; ACKing.") | |
| 164 » » » return true | |
| 165 } | 148 } |
| 166 » }) | 149 » })) |
| 167 | 150 |
| 168 log.Debugf(c, "Collector finished.") | 151 log.Debugf(c, "Collector finished.") |
| 169 return nil | 152 return nil |
| 170 } | 153 } |
| 171 | 154 |
| 155 // processMessage returns true if the message should be ACK'd (deleted from |
| 156 // Pub/Sub) or false if the message should not be ACK'd. Only a transient error from coll.Process yields false; success and non-transient errors both yield true. |
| 157 func (a *application) processMessage(c context.Context, coll *collector.Collecto
r, msg *pubsub.Message) bool { |
| 158 log.Fields{ |
| 159 "ackID": msg.AckID, |
| 160 "size": len(msg.Data), |
| 161 }.Infof(c, "Received Pub/Sub Message.") |
| 162 |
| 163 startTime := clock.Now(c) |
| 164 err := coll.Process(c, msg.Data) |
| 165 duration := clock.Now(c).Sub(startTime) |
| 166 |
| 167 switch { |
| 168 case errors.IsTransient(err): |
| 169 // Do not consume |
| 170 log.Fields{ |
| 171 log.ErrorKey: err, |
| 172 "duration": duration, |
| 173 }.Warningf(c, "TRANSIENT error ingesting Pub/Sub message.") |
| 174 return false |
| 175 |
| 176 case err == nil: |
| 177 log.Fields{ |
| 178 "ackID": msg.AckID, |
| 179 "size": len(msg.Data), |
| 180 "duration": duration, |
| 181 }.Infof(c, "Message successfully processed; ACKing.") |
| 182 return true |
| 183 |
| 184 default: |
| 185 log.Fields{ |
| 186 log.ErrorKey: err, |
| 187 "ackID": msg.AckID, |
| 188 "size": len(msg.Data), |
| 189 "duration": duration, |
| 190 }.Errorf(c, "Non-transient error ingesting Pub/Sub message; ACKi
ng.") |
| 191 return true |
| 192 } |
| 193 } |
| 194 |
| 172 // Entry point: main passes runCollector to the application's Run with a background Context. | 195 // Entry point: main passes runCollector to the application's Run with a background Context. |
| 173 func main() { | 196 func main() {  |
| 174 a := application{} | 197 a := application{} |
| 175 a.Run(context.Background(), a.runCollector) | 198 a.Run(context.Background(), a.runCollector) |
| 176 } | 199 } |
| OLD | NEW |