| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package main | 5 package main |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "time" | 9 "time" |
| 10 | 10 |
| 11 "github.com/luci/luci-go/common/auth" | 11 "github.com/luci/luci-go/common/auth" |
| 12 "github.com/luci/luci-go/common/clock" | 12 "github.com/luci/luci-go/common/clock" |
| 13 "github.com/luci/luci-go/common/errors" | 13 "github.com/luci/luci-go/common/errors" |
| 14 » "github.com/luci/luci-go/common/gcloud/pubsub" | 14 » gcps "github.com/luci/luci-go/common/gcloud/pubsub" |
| 15 » "github.com/luci/luci-go/common/gcloud/pubsub/ackbuffer" | |
| 16 » "github.com/luci/luci-go/common/gcloud/pubsub/subscriber" | |
| 17 log "github.com/luci/luci-go/common/logging" | 15 log "github.com/luci/luci-go/common/logging" |
| 16 "github.com/luci/luci-go/common/parallel" |
| 18 "github.com/luci/luci-go/server/internal/logdog/collector" | 17 "github.com/luci/luci-go/server/internal/logdog/collector" |
| 19 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator" | 18 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator" |
| 20 "github.com/luci/luci-go/server/internal/logdog/service" | 19 "github.com/luci/luci-go/server/internal/logdog/service" |
| 21 "golang.org/x/net/context" | 20 "golang.org/x/net/context" |
| 21 "google.golang.org/cloud" |
| 22 "google.golang.org/cloud/pubsub" |
| 22 ) | 23 ) |
| 23 | 24 |
| 24 var ( | 25 var ( |
| 25 errInvalidConfig = errors.New("invalid configuration") | 26 errInvalidConfig = errors.New("invalid configuration") |
| 26 ) | 27 ) |
| 27 | 28 |
| 29 const ( |
| 30 pubsubPullErrorDelay = 10 * time.Second |
| 31 ) |
| 32 |
| 28 // application is the Collector application state. | 33 // application is the Collector application state. |
| 29 type application struct { | 34 type application struct { |
| 30 service.Service | 35 service.Service |
| 31 } | 36 } |
| 32 | 37 |
| 33 // run is the main execution function. | 38 // run is the main execution function. |
| 34 func (a *application) runCollector(c context.Context) error { | 39 func (a *application) runCollector(c context.Context) error { |
| 35 cfg := a.Config() | 40 cfg := a.Config() |
| 36 ccfg := cfg.GetCollector() | 41 ccfg := cfg.GetCollector() |
| 37 if ccfg == nil { | 42 if ccfg == nil { |
| 38 return errors.New("no collector configuration") | 43 return errors.New("no collector configuration") |
| 39 } | 44 } |
| 40 | 45 |
| 41 pscfg := cfg.GetTransport().GetPubsub() | 46 pscfg := cfg.GetTransport().GetPubsub() |
| 42 if pscfg == nil { | 47 if pscfg == nil { |
| 43 return errors.New("missing Pub/Sub configuration") | 48 return errors.New("missing Pub/Sub configuration") |
| 44 } | 49 } |
| 45 | 50 |
| 46 // Our Subscription must be a valid one. | 51 // Our Subscription must be a valid one. |
| 47 » sub := pubsub.NewSubscription(pscfg.Project, pscfg.Subscription) | 52 » sub := gcps.NewSubscription(pscfg.Project, pscfg.Subscription) |
| 48 if err := sub.Validate(); err != nil { | 53 if err := sub.Validate(); err != nil { |
| 49 return fmt.Errorf("invalid Pub/Sub subscription %q: %v", sub, er
r) | 54 return fmt.Errorf("invalid Pub/Sub subscription %q: %v", sub, er
r) |
| 50 } | 55 } |
| 51 | 56 |
| 52 // New PubSub instance with the authenticated client. | 57 // New PubSub instance with the authenticated client. |
| 53 » psClient, err := a.AuthenticatedClient(func(o *auth.Options) { | 58 » psAuth, err := a.Authenticator(func(o *auth.Options) { |
| 54 » » o.Scopes = pubsub.SubscriberScopes | 59 » » o.Scopes = gcps.SubscriberScopes |
| 55 }) | 60 }) |
| 56 if err != nil { | 61 if err != nil { |
| 57 » » log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.") | 62 » » log.WithError(err).Errorf(c, "Failed to create Pub/Sub token sou
rce.") |
| 58 return err | 63 return err |
| 59 } | 64 } |
| 60 | 65 |
| 61 » // Create a retrying Pub/Sub client. | 66 » psClient, err := pubsub.NewClient(c, pscfg.Project, cloud.WithTokenSourc
e(psAuth.TokenSource())) |
| 62 » ps := &pubsub.Retry{ | |
| 63 » » Connection: pubsub.NewConnection(psClient), | |
| 64 » » Callback: func(err error, d time.Duration) { | |
| 65 » » » log.Fields{ | |
| 66 » » » » log.ErrorKey: err, | |
| 67 » » » » "delay": d, | |
| 68 » » » }.Warningf(c, "Transient error encountered; retrying..."
) | |
| 69 » » }, | |
| 70 » } | |
| 71 | |
| 72 » exists, err := ps.SubExists(c, sub) | |
| 73 if err != nil { | 67 if err != nil { |
| 74 log.Fields{ | 68 log.Fields{ |
| 75 log.ErrorKey: err, | 69 log.ErrorKey: err, |
| 70 "subscription": sub, |
| 71 }.Errorf(c, "Failed to create Pub/Sub client.") |
| 72 return err |
| 73 } |
| 74 |
| 75 psSub := psClient.Subscription(pscfg.Subscription) |
| 76 exists, err := psSub.Exists(c) |
| 77 if err != nil { |
| 78 log.Fields{ |
| 79 log.ErrorKey: err, |
| 76 "subscription": sub, | 80 "subscription": sub, |
| 77 }.Errorf(c, "Could not confirm Pub/Sub subscription.") | 81 }.Errorf(c, "Could not confirm Pub/Sub subscription.") |
| 78 return errInvalidConfig | 82 return errInvalidConfig |
| 79 } | 83 } |
| 80 if !exists { | 84 if !exists { |
| 81 log.Fields{ | 85 log.Fields{ |
| 82 "subscription": sub, | 86 "subscription": sub, |
| 83 }.Errorf(c, "Subscription does not exist.") | 87 }.Errorf(c, "Subscription does not exist.") |
| 84 return errInvalidConfig | 88 return errInvalidConfig |
| 85 } | 89 } |
| 86 log.Fields{ | 90 log.Fields{ |
| 87 "subscription": sub, | 91 "subscription": sub, |
| 88 }.Infof(c, "Successfully validated Pub/Sub subscription.") | 92 }.Infof(c, "Successfully validated Pub/Sub subscription.") |
| 89 | 93 |
| 90 // Initialize our Storage. | 94 // Initialize our Storage. |
| 91 s, err := a.IntermediateStorage(c) | 95 s, err := a.IntermediateStorage(c) |
| 92 if err != nil { | 96 if err != nil { |
| 93 log.WithError(err).Errorf(c, "Failed to get storage instance.") | 97 log.WithError(err).Errorf(c, "Failed to get storage instance.") |
| 94 return err | 98 return err |
| 95 } | 99 } |
| 96 defer s.Close() | 100 defer s.Close() |
| 97 | 101 |
| 98 // Application shutdown will now operate by cancelling the Collector's | 102 // Application shutdown will now operate by cancelling the Collector's |
| 99 // shutdown Context. | 103 // shutdown Context. |
| 100 shutdownCtx, shutdownFunc := context.WithCancel(c) | 104 shutdownCtx, shutdownFunc := context.WithCancel(c) |
| 101 a.SetShutdownFunc(shutdownFunc) | 105 a.SetShutdownFunc(shutdownFunc) |
| 102 | 106 |
| 103 // Start an ACK buffer so that we can batch ACKs. Note that we do NOT us
e the | |
| 104 // shutdown context here, as we want clean shutdowns to continue to ack
any | |
| 105 // buffered messages. | |
| 106 ab := ackbuffer.New(c, ackbuffer.Config{ | |
| 107 Ack: ackbuffer.NewACK(ps, sub, 0), | |
| 108 }) | |
| 109 defer ab.CloseAndFlush() | |
| 110 | |
| 111 // Initialize our Collector service object using a caching Coordinator | 107 // Initialize our Collector service object using a caching Coordinator |
| 112 // interface. | 108 // interface. |
| 113 coord := coordinator.NewCoordinator(a.Coordinator()) | 109 coord := coordinator.NewCoordinator(a.Coordinator()) |
| 114 coord = coordinator.NewCache(coord, int(ccfg.StateCacheSize), ccfg.State
CacheExpiration.Duration()) | 110 coord = coordinator.NewCache(coord, int(ccfg.StateCacheSize), ccfg.State
CacheExpiration.Duration()) |
| 115 | 111 |
| 116 coll := collector.Collector{ | 112 coll := collector.Collector{ |
| 117 » » Coordinator: coord, | 113 » » Coordinator: coord, |
| 118 » » Storage: s, | 114 » » Storage: s, |
| 119 » » MaxParallelBundles: int(ccfg.Workers), | |
| 120 » » MaxIngestWorkers: int(ccfg.Workers), | |
| 121 } | 115 } |
| 122 defer coll.Close() | 116 defer coll.Close() |
| 123 | 117 |
| 124 » // Execute our main Subscriber loop. It will run until the supplied Cont
ext | 118 » // Execute our main subscription pull loop. It will run until the suppli
ed |
| 125 » // is cancelled. | 119 » // Context is cancelled. |
| 126 » clk := clock.Get(c) | 120 » psIterator, err := psSub.Pull(c) |
| 127 » engine := subscriber.Subscriber{ | 121 » if err != nil { |
| 128 » » S: subscriber.NewSource(ps, sub, 0), | 122 » » log.WithError(err).Errorf(c, "Failed to create Pub/Sub iterator.
") |
| 129 » » A: ab, | 123 » » return err |
| 124 » } |
| 125 » defer func() { |
| 126 » » log.Debugf(c, "Waiting for Pub/Sub subscription iterator to stop
...") |
| 127 » » psIterator.Stop() |
| 128 » » log.Debugf(c, "Pub/Sub subscription iterator has stopped.") |
| 129 » }() |
| 130 | 130 |
| 131 » » PullWorkers: int(ccfg.TransportWorkers), | 131 » parallel.Ignore(parallel.Run(int(ccfg.TransportConcurrentMessages), func
(taskC chan<- func() error) { |
| 132 » » HandlerWorkers: int(ccfg.Workers), | 132 » » // Loop until shut down. |
| 133 » } | 133 » » for shutdownCtx.Err() == nil { |
| 134 » engine.Run(shutdownCtx, func(msg *pubsub.Message) bool { | 134 » » » msg, err := psIterator.Next() |
| 135 » » c := log.SetField(c, "messageID", msg.ID) | 135 » » » if err != nil { |
| 136 » » log.Fields{ | 136 » » » » log.Fields{ |
| 137 » » » "ackID": msg.AckID, | 137 » » » » » log.ErrorKey: err, |
| 138 » » » "size": len(msg.Data), | 138 » » » » » "delay": pubsubPullErrorDelay, |
| 139 » » }.Infof(c, "Received Pub/Sub Message.") | 139 » » » » }.Errorf(c, "Failed to fetch Pub/Sub message, re
try after delay...") |
| 140 » » » » clock.Sleep(c, pubsubPullErrorDelay) |
| 141 » » » » continue |
| 142 » » » } |
| 140 | 143 |
| 141 » » startTime := clk.Now() | 144 » » » taskC <- func() error { |
| 142 » » err := coll.Process(c, msg.Data) | 145 » » » » c := log.SetField(c, "messageID", msg.ID) |
| 143 » » duration := clk.Now().Sub(startTime) | 146 » » » » msg.Done(a.processMessage(c, &coll, msg)) |
| 144 | 147 » » » » return nil |
| 145 » » switch { | 148 » » » } |
| 146 » » case errors.IsTransient(err): | |
| 147 » » » // Do not consume | |
| 148 » » » log.Fields{ | |
| 149 » » » » log.ErrorKey: err, | |
| 150 » » » » "duration": duration, | |
| 151 » » » }.Warningf(c, "TRANSIENT error ingesting Pub/Sub message
.") | |
| 152 » » » return false | |
| 153 | |
| 154 » » case err == nil: | |
| 155 » » » log.Fields{ | |
| 156 » » » » "ackID": msg.AckID, | |
| 157 » » » » "size": len(msg.Data), | |
| 158 » » » » "duration": duration, | |
| 159 » » » }.Infof(c, "Message successfully processed; ACKing.") | |
| 160 » » » return true | |
| 161 | |
| 162 » » default: | |
| 163 » » » log.Fields{ | |
| 164 » » » » log.ErrorKey: err, | |
| 165 » » » » "ackID": msg.AckID, | |
| 166 » » » » "size": len(msg.Data), | |
| 167 » » » » "duration": duration, | |
| 168 » » » }.Errorf(c, "Non-transient error ingesting Pub/Sub messa
ge; ACKing.") | |
| 169 » » » return true | |
| 170 } | 149 } |
| 171 » }) | 150 » })) |
| 172 | 151 |
| 173 log.Debugf(c, "Collector finished.") | 152 log.Debugf(c, "Collector finished.") |
| 174 return nil | 153 return nil |
| 175 } | 154 } |
| 176 | 155 |
| 156 // processMessage returns true if the message should be ACK'd (deleted from |
| 157 // Pub/Sub) or false if the message should not be ACK'd. |
| 158 func (a *application) processMessage(c context.Context, coll *collector.Collecto
r, msg *pubsub.Message) bool { |
| 159 log.Fields{ |
| 160 "ackID": msg.AckID, |
| 161 "size": len(msg.Data), |
| 162 }.Infof(c, "Received Pub/Sub Message.") |
| 163 |
| 164 startTime := clock.Now(c) |
| 165 err := coll.Process(c, msg.Data) |
| 166 duration := clock.Now(c).Sub(startTime) |
| 167 |
| 168 switch { |
| 169 case errors.IsTransient(err): |
| 170 // Do not consume |
| 171 log.Fields{ |
| 172 log.ErrorKey: err, |
| 173 "duration": duration, |
| 174 }.Warningf(c, "TRANSIENT error ingesting Pub/Sub message.") |
| 175 return false |
| 176 |
| 177 case err == nil: |
| 178 log.Fields{ |
| 179 "ackID": msg.AckID, |
| 180 "size": len(msg.Data), |
| 181 "duration": duration, |
| 182 }.Infof(c, "Message successfully processed; ACKing.") |
| 183 return true |
| 184 |
| 185 default: |
| 186 log.Fields{ |
| 187 log.ErrorKey: err, |
| 188 "ackID": msg.AckID, |
| 189 "size": len(msg.Data), |
| 190 "duration": duration, |
| 191 }.Errorf(c, "Non-transient error ingesting Pub/Sub message; ACKi
ng.") |
| 192 return true |
| 193 } |
| 194 } |
| 195 |
| 177 // Entry point. | 196 // Entry point. |
| 178 func main() { | 197 func main() { |
| 179 a := application{} | 198 a := application{} |
| 180 a.Run(context.Background(), a.runCollector) | 199 a.Run(context.Background(), a.runCollector) |
| 181 } | 200 } |
| OLD | NEW |