| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package main |
| 6 |
| 7 import ( |
| 8 "flag" |
| 9 "fmt" |
| 10 "os" |
| 11 "time" |
| 12 |
| 13 "github.com/luci/luci-go/common/auth" |
| 14 "github.com/luci/luci-go/common/errors" |
| 15 "github.com/luci/luci-go/common/gcloud/gcps" |
| 16 "github.com/luci/luci-go/common/gcloud/gcps/ackbuffer" |
| 17 "github.com/luci/luci-go/common/gcloud/gcps/subscriber" |
| 18 log "github.com/luci/luci-go/common/logging" |
| 19 "github.com/luci/luci-go/common/parallel" |
| 20 "github.com/luci/luci-go/server/internal/logdog/collector" |
| 21 "github.com/luci/luci-go/server/internal/logdog/service" |
| 22 "golang.org/x/net/context" |
| 23 "google.golang.org/cloud/pubsub" |
| 24 ) |
| 25 |
| 26 var ( |
| 27 errInvalidConfig = errors.New("invalid configuration") |
| 28 ) |
| 29 |
| 30 // application is the Collector application state. |
| 31 type application struct { |
| 32 *service.Service |
| 33 |
| 34 // shutdownCtx is a Context that will be cancelled if our application |
| 35 // receives a shutdown signal. |
| 36 shutdownCtx context.Context |
| 37 } |
| 38 |
| 39 // run is the main execution function. |
| 40 func (a *application) runCollector() error { |
| 41 cfg := a.Config() |
| 42 ccfg := cfg.GetCollector() |
| 43 if ccfg == nil { |
| 44 return errors.New("no collector configuration") |
| 45 } |
| 46 |
| 47 pscfg := cfg.GetTransport().GetPubsub() |
| 48 switch { |
| 49 case pscfg == nil: |
| 50 return errors.New("missing Pub/Sub configuration") |
| 51 case pscfg.Project == "": |
| 52 return errors.New("missing required Pub/Sub project") |
| 53 case pscfg.Subscription == "": |
| 54 return errors.New("missing required subscription name") |
| 55 } |
| 56 |
| 57 // Our Subscription must be a valid one. |
| 58 sub := gcps.Subscription(pscfg.Subscription) |
| 59 if err := sub.Validate(); err != nil { |
| 60 return fmt.Errorf("invalid subscription name: %v", err) |
| 61 } |
| 62 |
| 63 // New PubSub instance with the authenticated client. |
| 64 psClient, err := a.AuthenticatedClient(func(o *auth.Options) { |
| 65 o.Scopes = gcps.SubscriberScopes |
| 66 }) |
| 67 if err != nil { |
| 68 log.WithError(err).Errorf(a, "Failed to create Pub/Sub client.") |
| 69 return err |
| 70 } |
| 71 |
| 72 // Create a retrying Pub/Sub client. |
| 73 ps := gcps.New(psClient, pscfg.Project) |
| 74 if err != nil { |
| 75 log.Errorf(log.SetError(a, err), "Failed to create Pub/Sub insta
nce.") |
| 76 return errInvalidConfig |
| 77 } |
| 78 ps = &gcps.Retry{ |
| 79 PS: ps, |
| 80 C: func(err error, d time.Duration) { |
| 81 log.Fields{ |
| 82 log.ErrorKey: err, |
| 83 "delay": d, |
| 84 }.Warningf(a, "Transient error encountered; retrying..."
) |
| 85 }, |
| 86 } |
| 87 |
| 88 exists, err := ps.SubExists(a, sub) |
| 89 if err != nil { |
| 90 log.Fields{ |
| 91 log.ErrorKey: err, |
| 92 "subscription": pscfg.Subscription, |
| 93 }.Errorf(a, "Could not confirm Pub/Sub subscription.") |
| 94 return errInvalidConfig |
| 95 } |
| 96 if !exists { |
| 97 log.Fields{ |
| 98 "subscription": pscfg.Subscription, |
| 99 }.Errorf(a, "Subscription does not exist.") |
| 100 return errInvalidConfig |
| 101 } |
| 102 log.Fields{ |
| 103 "subscription": sub, |
| 104 }.Infof(a, "Successfully validated Pub/Sub subscription.") |
| 105 |
| 106 // Initialize our Storage. |
| 107 s, err := a.Storage() |
| 108 if err != nil { |
| 109 log.WithError(err).Errorf(a, "Failed to get storage instance.") |
| 110 return err |
| 111 } |
| 112 defer s.Close() |
| 113 |
| 114 // Application shutdown will now operate by cancelling the Collector's |
| 115 // shutdown Context. |
| 116 shutdownCtx, shutdownFunc := context.WithCancel(a) |
| 117 a.SetShutdownFunc(shutdownFunc) |
| 118 defer a.SetShutdownFunc(nil) |
| 119 |
| 120 // Start an ACK buffer so that we can batch ACKs. |
| 121 ab := ackbuffer.New(a, ackbuffer.Config{ |
| 122 Ack: ackbuffer.NewACK(ps, sub, 0), |
| 123 }) |
| 124 defer ab.CloseAndFlush() |
| 125 |
| 126 // Initialize our Collector service object. |
| 127 coll := collector.New(collector.Options{ |
| 128 Coordinator: a.Coordinator(), |
| 129 Storage: s, |
| 130 StreamStateCacheExpire: ccfg.StateCacheExpiration.Duration(), |
| 131 Sem: make(parallel.Semaphore, int(ccfg.Workers)), |
| 132 }) |
| 133 |
| 134 // Execute our main Subscriber loop. It will run until the supplied Cont
ext |
| 135 // is cancelled. |
| 136 engine := subscriber.Subscriber{ |
| 137 S: subscriber.NewSource(ps, sub, 0), |
| 138 A: ab, |
| 139 |
| 140 Workers: int(ccfg.TransportWorkers), |
| 141 HandlerSem: make(parallel.Semaphore, int(ccfg.Workers)), |
| 142 } |
| 143 engine.Run(shutdownCtx, func(msg *pubsub.Message) bool { |
| 144 ctx := log.SetFields(a, log.Fields{ |
| 145 "messageID": msg.ID, |
| 146 "size": len(msg.Data), |
| 147 "ackID": msg.AckID, |
| 148 }) |
| 149 |
| 150 if err := coll.Process(ctx, msg.Data); err != nil { |
| 151 if errors.IsTransient(err) { |
| 152 // Do not consume |
| 153 log.Fields{ |
| 154 log.ErrorKey: err, |
| 155 "msgID": msg.ID, |
| 156 "size": len(msg.Data), |
| 157 }.Warningf(ctx, "TRANSIENT error ingesting Pub/S
ub message.") |
| 158 return false |
| 159 } |
| 160 |
| 161 log.Fields{ |
| 162 log.ErrorKey: err, |
| 163 "msgID": msg.ID, |
| 164 "size": len(msg.Data), |
| 165 }.Errorf(ctx, "Error ingesting Pub/Sub message.") |
| 166 } |
| 167 return true |
| 168 }) |
| 169 |
| 170 log.Debugf(a, "Collector finished.") |
| 171 return nil |
| 172 } |
| 173 |
| 174 // mainImpl is the Main implementaion, and returns the application return code |
| 175 // as an integer. |
| 176 func mainImpl() int { |
| 177 a := application{ |
| 178 Service: service.New(context.Background()), |
| 179 } |
| 180 |
| 181 fs := flag.FlagSet{} |
| 182 a.AddFlags(&fs) |
| 183 |
| 184 if err := fs.Parse(os.Args[1:]); err != nil { |
| 185 log.Errorf(log.SetError(a, err), "Failed to parse command-line."
) |
| 186 return 1 |
| 187 } |
| 188 |
| 189 // Run our configured application instance. |
| 190 var rc int |
| 191 if err := a.Run(a.runCollector); err != nil { |
| 192 log.Errorf(log.SetError(a, err), "Application execution failed."
) |
| 193 return 1 |
| 194 } |
| 195 log.Infof(log.SetField(a, "returnCode", rc), "Terminating.") |
| 196 return 0 |
| 197 } |
| 198 |
| 199 // Entry point. |
| 200 func main() { |
| 201 os.Exit(mainImpl()) |
| 202 } |
| OLD | NEW |