Index: server/cmd/logdog_collector/main.go
diff --git a/server/cmd/logdog_collector/main.go b/server/cmd/logdog_collector/main.go
index ffc2997769dab18317a4d5209fde8880aeaf29e2..9ea89b23c4264bf9b6f7acef50d9925341352c25 100644
--- a/server/cmd/logdog_collector/main.go
+++ b/server/cmd/logdog_collector/main.go
@@ -11,20 +11,27 @@ import (
 	"github.com/luci/luci-go/common/auth"
 	"github.com/luci/luci-go/common/clock"
 	"github.com/luci/luci-go/common/errors"
-	"github.com/luci/luci-go/common/gcloud/pubsub"
-	"github.com/luci/luci-go/common/gcloud/pubsub/ackbuffer"
-	"github.com/luci/luci-go/common/gcloud/pubsub/subscriber"
+	gcps "github.com/luci/luci-go/common/gcloud/pubsub"
 	log "github.com/luci/luci-go/common/logging"
+	"github.com/luci/luci-go/common/parallel"
 	"github.com/luci/luci-go/server/internal/logdog/collector"
 	"github.com/luci/luci-go/server/internal/logdog/collector/coordinator"
 	"github.com/luci/luci-go/server/internal/logdog/service"
 	"golang.org/x/net/context"
+	"google.golang.org/cloud"
+	"google.golang.org/cloud/pubsub"
 )
 
 var (
 	errInvalidConfig = errors.New("invalid configuration")
 )
 
+const (
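+	// pubsubPullErrorDelay is the amount of time to wait after a Pub/Sub pull
+	// error before trying another pull.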
+	pubsubPullErrorDelay = 10 * time.Second
+)
+
 // application is the Collector application state.
 type application struct {
 	service.Service
@@ -44,32 +51,33 @@ func (a *application) runCollector(c context.Context) error {
 	}
 
 	// Our Subscription must be a valid one.
-	sub := pubsub.NewSubscription(pscfg.Project, pscfg.Subscription)
+	sub := gcps.NewSubscription(pscfg.Project, pscfg.Subscription)
 	if err := sub.Validate(); err != nil {
 		return fmt.Errorf("invalid Pub/Sub subscription %q: %v", sub, err)
 	}
 
 	// New PubSub instance with the authenticated client.
-	psClient, err := a.AuthenticatedClient(func(o *auth.Options) {
-		o.Scopes = pubsub.SubscriberScopes
+	psAuth, err := a.Authenticator(func(o *auth.Options) {
+		o.Scopes = gcps.SubscriberScopes
 	})
 	if err != nil {
-		log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.")
+		log.WithError(err).Errorf(c, "Failed to create Pub/Sub token source.")
 		return err
 	}
 
-	// Create a retrying Pub/Sub client.
-	ps := &pubsub.Retry{
-		Connection: pubsub.NewConnection(psClient),
-		Callback: func(err error, d time.Duration) {
-			log.Fields{
-				log.ErrorKey: err,
-				"delay":      d,
-			}.Warningf(c, "Transient error encountered; retrying...")
-		},
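+	// Create a Pub/Sub client using our Authenticator's token source.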
+	psClient, err := pubsub.NewClient(c, pscfg.Project, cloud.WithTokenSource(psAuth.TokenSource()))
+	if err != nil {
+		log.Fields{
+			log.ErrorKey:   err,
+			"subscription": sub,
+		}.Errorf(c, "Failed to create Pub/Sub client.")
+		return err
 	}
 
-	exists, err := ps.SubExists(c, sub)
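+	// Open a handle to the subscription and ensure that it exists.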
+	psSub := psClient.Subscription(pscfg.Subscription)
+	exists, err := psSub.Exists(c)
 	if err != nil {
 		log.Fields{
 			log.ErrorKey: err,
@@ -98,14 +106,6 @@ func (a *application) runCollector(c context.Context) error {
 	shutdownCtx, shutdownFunc := context.WithCancel(c)
 	a.SetShutdownFunc(shutdownFunc)
 
-	// Start an ACK buffer so that we can batch ACKs. Note that we do NOT use the
-	// shutdown context here, as we want clean shutdowns to continue to ack any
-	// buffered messages.
-	ab := ackbuffer.New(c, ackbuffer.Config{
-		Ack: ackbuffer.NewACK(ps, sub, 0),
-	})
-	defer ab.CloseAndFlush()
-
 	// Initialize our Collector service object using a caching Coordinator
 	// interface.
 	coord := coordinator.NewCoordinator(a.Coordinator())
@@ -118,57 +118,86 @@ func (a *application) runCollector(c context.Context) error {
 	}
 	defer coll.Close()
 
-	// Execute our main Subscriber loop. It will run until the supplied Context
-	// is cancelled.
-	clk := clock.Get(c)
-	engine := subscriber.Subscriber{
-		S:       subscriber.NewSource(ps, sub),
-		A:       ab,
-		Workers: int(ccfg.MaxConcurrentMessages),
+	// Execute our main subscription pull loop. It will run until the supplied
+	// Context is cancelled.
+	psIterator, err := psSub.Pull(c)
+	if err != nil {
+		log.WithError(err).Errorf(c, "Failed to create Pub/Sub iterator.")
+		return err
 	}
-	engine.Run(shutdownCtx, func(msg *pubsub.Message) bool {
-		c := log.SetField(c, "messageID", msg.ID)
-		log.Fields{
-			"ackID": msg.AckID,
-			"size":  len(msg.Data),
-		}.Infof(c, "Received Pub/Sub Message.")
-
-		startTime := clk.Now()
-		err := coll.Process(c, msg.Data)
-		duration := clk.Now().Sub(startTime)
-
-		switch {
-		case errors.IsTransient(err):
-			// Do not consume
-			log.Fields{
-				log.ErrorKey: err,
-				"duration":   duration,
-			}.Warningf(c, "TRANSIENT error ingesting Pub/Sub message.")
-			return false
-
-		case err == nil:
-			log.Fields{
-				"ackID":    msg.AckID,
-				"size":     len(msg.Data),
-				"duration": duration,
-			}.Infof(c, "Message successfully processed; ACKing.")
-			return true
-
-		default:
-			log.Fields{
-				log.ErrorKey: err,
-				"ackID":      msg.AckID,
-				"size":       len(msg.Data),
-				"duration":   duration,
-			}.Errorf(c, "Non-transient error ingesting Pub/Sub message; ACKing.")
-			return true
+	defer func() {
+		log.Debugf(c, "Waiting for Pub/Sub subscription iterator to stop...")
+		psIterator.Stop()
+		log.Debugf(c, "Pub/Sub subscription iterator has stopped.")
+	}()
+
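+	// Pull messages and dispatch them to a bounded pool of worker goroutines.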
+	parallel.Ignore(parallel.Run(int(ccfg.MaxConcurrentMessages), func(taskC chan<- func() error) {
+		// Loop until shut down.
+		for shutdownCtx.Err() == nil {
+			msg, err := psIterator.Next()
+			if err != nil {
+				log.Fields{
+					log.ErrorKey: err,
+					"delay":      pubsubPullErrorDelay,
+				}.Errorf(c, "Failed to fetch Pub/Sub message; retrying after delay...")
+				clock.Sleep(c, pubsubPullErrorDelay)
+				continue
+			}
+
+			taskC <- func() error {
+				c := log.SetField(c, "messageID", msg.ID)
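+				// ACK the message if it was consumed; otherwise let it be redelivered.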
+				msg.Done(a.processMessage(c, &coll, msg))
+				return nil
+			}
 		}
-	})
+	}))
 
 	log.Debugf(c, "Collector finished.")
 	return nil
 }
 
+// processMessage returns true if the message should be ACK'd (deleted from
+// the subscription) or false if it should be left for redelivery.
+func (a *application) processMessage(c context.Context, coll *collector.Collector, msg *pubsub.Message) bool {
+	log.Fields{
+		"ackID": msg.AckID,
+		"size":  len(msg.Data),
+	}.Infof(c, "Received Pub/Sub Message.")
+
+	startTime := clock.Now(c)
+	err := coll.Process(c, msg.Data)
+	duration := clock.Now(c).Sub(startTime)
+
+	switch {
+	case errors.IsTransient(err):
+		// Do not consume the message; it will be redelivered and retried.
+		log.Fields{
+			log.ErrorKey: err,
+			"duration":   duration,
+		}.Warningf(c, "TRANSIENT error ingesting Pub/Sub message.")
+		return false
+
+	case err == nil:
+		log.Fields{
+			"ackID":    msg.AckID,
+			"size":     len(msg.Data),
+			"duration": duration,
+		}.Infof(c, "Message successfully processed; ACKing.")
+		return true
+
+	default:
+		log.Fields{
+			log.ErrorKey: err,
+			"ackID":      msg.AckID,
+			"size":       len(msg.Data),
+			"duration":   duration,
+		}.Errorf(c, "Non-transient error ingesting Pub/Sub message; ACKing.")
+		return true
+	}
+}
+
 // Entry point.
 func main() {
 	a := application{}
|