| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package main |
| 6 |
| 7 import ( |
| 8 "flag" |
| 9 "fmt" |
| 10 "os" |
| 11 "time" |
| 12 |
| 13 "github.com/luci/luci-go/common/auth" |
| 14 "github.com/luci/luci-go/common/errors" |
| 15 "github.com/luci/luci-go/common/gcloud/pubsub" |
| 16 "github.com/luci/luci-go/common/gcloud/pubsub/ackbuffer" |
| 17 "github.com/luci/luci-go/common/gcloud/pubsub/subscriber" |
| 18 log "github.com/luci/luci-go/common/logging" |
| 19 "github.com/luci/luci-go/common/parallel" |
| 20 "github.com/luci/luci-go/server/internal/logdog/collector" |
| 21 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator" |
| 22 "github.com/luci/luci-go/server/internal/logdog/service" |
| 23 "golang.org/x/net/context" |
| 24 ) |
| 25 |
| 26 var ( |
| 27 errInvalidConfig = errors.New("invalid configuration") |
| 28 ) |
| 29 |
| 30 // application is the Collector application state. |
| 31 type application struct { |
| 32 *service.Service |
| 33 |
| 34 // shutdownCtx is a Context that will be cancelled if our application |
| 35 // receives a shutdown signal. |
| 36 shutdownCtx context.Context |
| 37 } |
| 38 |
| 39 // run is the main execution function. |
| 40 func (a *application) runCollector() error { |
| 41 cfg := a.Config() |
| 42 ccfg := cfg.GetCollector() |
| 43 if ccfg == nil { |
| 44 return errors.New("no collector configuration") |
| 45 } |
| 46 |
| 47 pscfg := cfg.GetTransport().GetPubsub() |
| 48 if pscfg == nil { |
| 49 return errors.New("missing Pub/Sub configuration") |
| 50 } |
| 51 |
| 52 // Our Subscription must be a valid one. |
| 53 sub := pubsub.NewSubscription(pscfg.Project, pscfg.Subscription) |
| 54 if err := sub.Validate(); err != nil { |
| 55 return fmt.Errorf("invalid Pub/Sub subscription %q: %v", sub, er
r) |
| 56 } |
| 57 |
| 58 // New PubSub instance with the authenticated client. |
| 59 psClient, err := a.AuthenticatedClient(func(o *auth.Options) { |
| 60 o.Scopes = pubsub.SubscriberScopes |
| 61 }) |
| 62 if err != nil { |
| 63 log.WithError(err).Errorf(a, "Failed to create Pub/Sub client.") |
| 64 return err |
| 65 } |
| 66 |
| 67 // Create a retrying Pub/Sub client. |
| 68 ps := &pubsub.Retry{ |
| 69 Connection: pubsub.NewConnection(psClient), |
| 70 Callback: func(err error, d time.Duration) { |
| 71 log.Fields{ |
| 72 log.ErrorKey: err, |
| 73 "delay": d, |
| 74 }.Warningf(a, "Transient error encountered; retrying..."
) |
| 75 }, |
| 76 } |
| 77 |
| 78 exists, err := ps.SubExists(a, sub) |
| 79 if err != nil { |
| 80 log.Fields{ |
| 81 log.ErrorKey: err, |
| 82 "subscription": sub, |
| 83 }.Errorf(a, "Could not confirm Pub/Sub subscription.") |
| 84 return errInvalidConfig |
| 85 } |
| 86 if !exists { |
| 87 log.Fields{ |
| 88 "subscription": sub, |
| 89 }.Errorf(a, "Subscription does not exist.") |
| 90 return errInvalidConfig |
| 91 } |
| 92 log.Fields{ |
| 93 "subscription": sub, |
| 94 }.Infof(a, "Successfully validated Pub/Sub subscription.") |
| 95 |
| 96 // Initialize our Storage. |
| 97 s, err := a.Storage() |
| 98 if err != nil { |
| 99 log.WithError(err).Errorf(a, "Failed to get storage instance.") |
| 100 return err |
| 101 } |
| 102 defer s.Close() |
| 103 |
| 104 // Application shutdown will now operate by cancelling the Collector's |
| 105 // shutdown Context. |
| 106 shutdownCtx, shutdownFunc := context.WithCancel(a) |
| 107 a.SetShutdownFunc(shutdownFunc) |
| 108 defer a.SetShutdownFunc(nil) |
| 109 |
| 110 // Start an ACK buffer so that we can batch ACKs. |
| 111 ab := ackbuffer.New(a, ackbuffer.Config{ |
| 112 Ack: ackbuffer.NewACK(ps, sub, 0), |
| 113 }) |
| 114 defer ab.CloseAndFlush() |
| 115 |
| 116 // Initialize our Collector service object using a caching Coordinator |
| 117 // interface. |
| 118 coord := coordinator.NewCoordinator(a.Coordinator()) |
| 119 coord = coordinator.NewCache(coord, int(ccfg.StateCacheSize), ccfg.State
CacheExpiration.Duration()) |
| 120 coll := collector.Collector{ |
| 121 Coordinator: coord, |
| 122 Storage: s, |
| 123 Sem: make(parallel.Semaphore, int(ccfg.Workers)), |
| 124 } |
| 125 |
| 126 // Execute our main Subscriber loop. It will run until the supplied Cont
ext |
| 127 // is cancelled. |
| 128 engine := subscriber.Subscriber{ |
| 129 S: subscriber.NewSource(ps, sub, 0), |
| 130 A: ab, |
| 131 |
| 132 PullWorkers: int(ccfg.TransportWorkers), |
| 133 HandlerWorkers: int(ccfg.Workers), |
| 134 } |
| 135 engine.Run(shutdownCtx, func(msg *pubsub.Message) bool { |
| 136 ctx := log.SetFields(a, log.Fields{ |
| 137 "messageID": msg.ID, |
| 138 "size": len(msg.Data), |
| 139 "ackID": msg.AckID, |
| 140 }) |
| 141 |
| 142 if err := coll.Process(ctx, msg.Data); err != nil { |
| 143 if errors.IsTransient(err) { |
| 144 // Do not consume |
| 145 log.Fields{ |
| 146 log.ErrorKey: err, |
| 147 "msgID": msg.ID, |
| 148 "size": len(msg.Data), |
| 149 }.Warningf(ctx, "TRANSIENT error ingesting Pub/S
ub message.") |
| 150 return false |
| 151 } |
| 152 |
| 153 log.Fields{ |
| 154 log.ErrorKey: err, |
| 155 "msgID": msg.ID, |
| 156 "size": len(msg.Data), |
| 157 }.Errorf(ctx, "Error ingesting Pub/Sub message.") |
| 158 } |
| 159 return true |
| 160 }) |
| 161 |
| 162 log.Debugf(a, "Collector finished.") |
| 163 return nil |
| 164 } |
| 165 |
| 166 // mainImpl is the Main implementaion, and returns the application return code |
| 167 // as an integer. |
| 168 func mainImpl() int { |
| 169 a := application{ |
| 170 Service: service.New(context.Background()), |
| 171 } |
| 172 |
| 173 fs := flag.FlagSet{} |
| 174 a.AddFlags(&fs) |
| 175 |
| 176 if err := fs.Parse(os.Args[1:]); err != nil { |
| 177 log.Errorf(log.SetError(a, err), "Failed to parse command-line."
) |
| 178 return 1 |
| 179 } |
| 180 |
| 181 // Run our configured application instance. |
| 182 var rc int |
| 183 if err := a.Run(a.runCollector); err != nil { |
| 184 log.Errorf(log.SetError(a, err), "Application execution failed."
) |
| 185 return 1 |
| 186 } |
| 187 log.Infof(log.SetField(a, "returnCode", rc), "Terminating.") |
| 188 return 0 |
| 189 } |
| 190 |
| 191 // Entry point. |
| 192 func main() { |
| 193 os.Exit(mainImpl()) |
| 194 } |
| OLD | NEW |