Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | |
|
dnj (Google)
2016/01/21 04:36:24
One of the main files to review.
| |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package main | |
| 6 | |
| 7 import ( | |
| 8 "flag" | |
| 9 "fmt" | |
| 10 "os" | |
| 11 "time" | |
| 12 | |
| 13 "github.com/luci/luci-go/common/auth" | |
| 14 "github.com/luci/luci-go/common/errors" | |
| 15 "github.com/luci/luci-go/common/gcloud/gcps" | |
| 16 "github.com/luci/luci-go/common/gcloud/gcps/ackbuffer" | |
| 17 "github.com/luci/luci-go/common/gcloud/gcps/subscriber" | |
| 18 log "github.com/luci/luci-go/common/logging" | |
| 19 "github.com/luci/luci-go/common/parallel" | |
| 20 "github.com/luci/luci-go/server/internal/logdog/collector" | |
| 21 "github.com/luci/luci-go/server/internal/logdog/service" | |
| 22 "golang.org/x/net/context" | |
| 23 "google.golang.org/cloud/pubsub" | |
| 24 ) | |
| 25 | |
| 26 var ( | |
| 27 errInvalidConfig = errors.New("invalid configuration") | |
| 28 ) | |
| 29 | |
| 30 // application is the Collector application state. | |
| 31 type application struct { | |
| 32 *service.Service | |
| 33 | |
| 34 // shutdownCtx is a Context that will be cancelled if our application | |
| 35 // receives a shutdown signal. | |
| 36 shutdownCtx context.Context | |
| 37 } | |
| 38 | |
| 39 // run is the main execution function. | |
| 40 func (a *application) runCollector() error { | |
| 41 cfg := a.Config() | |
| 42 ccfg := cfg.GetCollector() | |
| 43 if ccfg == nil { | |
| 44 return errors.New("no collector configuration") | |
| 45 } | |
| 46 | |
| 47 pscfg := cfg.GetTransport().GetPubsub() | |
| 48 switch { | |
| 49 case pscfg == nil: | |
| 50 return errors.New("missing Pub/Sub configuration") | |
| 51 case pscfg.Project == "": | |
| 52 return errors.New("missing required Pub/Sub project") | |
| 53 case pscfg.Subscription == "": | |
| 54 return errors.New("missing required subscription name") | |
| 55 } | |
| 56 | |
| 57 // Our Subscription must be a valid one. | |
| 58 sub := gcps.Subscription(pscfg.Subscription) | |
| 59 if err := sub.Validate(); err != nil { | |
| 60 return fmt.Errorf("invalid subscription name: %v", err) | |
| 61 } | |
| 62 | |
| 63 // New PubSub instance with the authenticated client. | |
| 64 psClient, err := a.AuthenticatedClient(func(o *auth.Options) { | |
| 65 o.Scopes = gcps.SubscriberScopes | |
| 66 }) | |
| 67 if err != nil { | |
| 68 log.WithError(err).Errorf(a, "Failed to create Pub/Sub client.") | |
| 69 return err | |
| 70 } | |
| 71 | |
| 72 // Create a retrying Pub/Sub client. | |
| 73 ps := gcps.New(psClient, pscfg.Project) | |
| 74 if err != nil { | |
| 75 log.Errorf(log.SetError(a, err), "Failed to create Pub/Sub insta nce.") | |
| 76 return errInvalidConfig | |
| 77 } | |
| 78 ps = &gcps.Retry{ | |
| 79 PS: ps, | |
| 80 C: func(err error, d time.Duration) { | |
| 81 log.Fields{ | |
| 82 log.ErrorKey: err, | |
| 83 "delay": d, | |
| 84 }.Warningf(a, "Transient error encountered; retrying..." ) | |
| 85 }, | |
| 86 } | |
| 87 | |
| 88 exists, err := ps.SubExists(a, sub) | |
| 89 if err != nil { | |
| 90 log.Fields{ | |
| 91 log.ErrorKey: err, | |
| 92 "subscription": pscfg.Subscription, | |
| 93 }.Errorf(a, "Could not confirm Pub/Sub subscription.") | |
| 94 return errInvalidConfig | |
| 95 } | |
| 96 if !exists { | |
| 97 log.Fields{ | |
| 98 "subscription": pscfg.Subscription, | |
| 99 }.Errorf(a, "Subscription does not exist.") | |
| 100 return errInvalidConfig | |
| 101 } | |
| 102 log.Fields{ | |
| 103 "subscription": sub, | |
| 104 }.Infof(a, "Successfully validated Pub/Sub subscription.") | |
| 105 | |
| 106 // Initialize our Storage. | |
| 107 s, err := a.Storage() | |
| 108 if err != nil { | |
| 109 log.WithError(err).Errorf(a, "Failed to get storage instance.") | |
| 110 return err | |
| 111 } | |
| 112 defer s.Close() | |
| 113 | |
| 114 // Application shutdown will now operate by cancelling the Collector's | |
| 115 // shutdown Context. | |
| 116 shutdownCtx, shutdownFunc := context.WithCancel(a) | |
| 117 a.SetShutdownFunc(shutdownFunc) | |
| 118 defer a.SetShutdownFunc(nil) | |
| 119 | |
| 120 // Start an ACK buffer so that we can batch ACKs. | |
| 121 ab := ackbuffer.New(a, ackbuffer.Config{ | |
| 122 Ack: ackbuffer.NewACK(ps, sub, 0), | |
| 123 }) | |
| 124 defer ab.CloseAndFlush() | |
| 125 | |
| 126 // Initialize our Collector service object. | |
| 127 coll := collector.New(collector.Options{ | |
| 128 Coordinator: a.Coordinator(), | |
| 129 Storage: s, | |
| 130 StreamStateCacheExpire: ccfg.StateCacheExpiration.Duration(), | |
| 131 Sem: make(parallel.Semaphore, int(ccfg.Workers)), | |
| 132 }) | |
| 133 | |
| 134 // Execute our main Subscriber loop. It will run until the supplied Cont ext | |
| 135 // is cancelled. | |
| 136 engine := subscriber.Subscriber{ | |
| 137 S: subscriber.NewSource(ps, sub, 0), | |
| 138 A: ab, | |
| 139 | |
| 140 Workers: int(ccfg.TransportWorkers), | |
| 141 HandlerSem: make(parallel.Semaphore, int(ccfg.Workers)), | |
| 142 } | |
| 143 engine.Run(shutdownCtx, func(msg *pubsub.Message) bool { | |
| 144 ctx := log.SetFields(a, log.Fields{ | |
| 145 "messageID": msg.ID, | |
| 146 "size": len(msg.Data), | |
| 147 "ackID": msg.AckID, | |
| 148 }) | |
| 149 | |
| 150 if err := coll.Process(ctx, msg.Data); err != nil { | |
| 151 if errors.IsTransient(err) { | |
| 152 // Do not consume | |
| 153 log.Fields{ | |
| 154 log.ErrorKey: err, | |
| 155 "msgID": msg.ID, | |
| 156 "size": len(msg.Data), | |
| 157 }.Warningf(ctx, "TRANSIENT error ingesting Pub/S ub message.") | |
| 158 return false | |
| 159 } | |
| 160 | |
| 161 log.Fields{ | |
| 162 log.ErrorKey: err, | |
| 163 "msgID": msg.ID, | |
| 164 "size": len(msg.Data), | |
| 165 }.Errorf(ctx, "Error ingesting Pub/Sub message.") | |
| 166 } | |
| 167 return true | |
| 168 }) | |
| 169 | |
| 170 log.Debugf(a, "Collector finished.") | |
| 171 return nil | |
| 172 } | |
| 173 | |
| 174 // mainImpl is the Main implementaion, and returns the application return code | |
| 175 // as an integer. | |
| 176 func mainImpl() int { | |
| 177 a := application{ | |
| 178 Service: service.New(context.Background()), | |
| 179 } | |
| 180 | |
| 181 fs := flag.FlagSet{} | |
| 182 a.AddFlags(&fs) | |
| 183 | |
| 184 if err := fs.Parse(os.Args[1:]); err != nil { | |
| 185 log.Errorf(log.SetError(a, err), "Failed to parse command-line." ) | |
| 186 return 1 | |
| 187 } | |
| 188 | |
| 189 // Run our configured application instance. | |
| 190 var rc int | |
| 191 if err := a.Run(a.runCollector); err != nil { | |
| 192 log.Errorf(log.SetError(a, err), "Application execution failed." ) | |
| 193 return 1 | |
| 194 } | |
| 195 log.Infof(log.SetField(a, "returnCode", rc), "Terminating.") | |
| 196 return 0 | |
| 197 } | |
| 198 | |
| 199 // Entry point. | |
| 200 func main() { | |
| 201 os.Exit(mainImpl()) | |
| 202 } | |
| OLD | NEW |