Chromium Code Reviews
| Index: common/gcloud/gcps/subscriber/subscriber.go |
| diff --git a/common/gcloud/gcps/subscriber/subscriber.go b/common/gcloud/gcps/subscriber/subscriber.go |
| index aafacc250d2944d7e1b7e8acd7809d683d40f291..cd5d343ec56418db4408f599e0b6336ce8111958 100644 |
| --- a/common/gcloud/gcps/subscriber/subscriber.go |
| +++ b/common/gcloud/gcps/subscriber/subscriber.go |
| @@ -1,9 +1,7 @@ |
| -// Copyright 2015 The Chromium Authors. All rights reserved. |
| +// Copyright 2016 The Chromium Authors. All rights reserved. |
|
dnj (Google)
2016/01/21 04:36:24
This changed a fair bit. Originally it dispatched
|
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| -// Package subscriber implements the Subscriber, which orchestrates parallel |
| -// Pub/Sub subscription pulls. |
| package subscriber |
| import ( |
| @@ -11,151 +9,145 @@ import ( |
| "time" |
| "github.com/luci/luci-go/common/clock" |
| - "github.com/luci/luci-go/common/gcloud/gcps" |
| log "github.com/luci/luci-go/common/logging" |
| "github.com/luci/luci-go/common/parallel" |
| - "github.com/luci/luci-go/common/retry" |
| "golang.org/x/net/context" |
| "google.golang.org/cloud/pubsub" |
| ) |
| const ( |
| - // DefaultWorkers is the number of subscription workers to use. |
| - DefaultWorkers = 20 |
| - |
| - // noDataDelay is the amount of time a worker will sleep if there is no |
| - // Pub/Sub data. |
| - noDataDelay = 1 * time.Second |
| + // DefaultNoDataDelay is the default amount of time a worker will sleep if |
| + // there is no Pub/Sub data. |
| + DefaultNoDataDelay = 5 * time.Second |
| ) |
| -// PubSubPull is an interface for something that can return Pub/Sub messages on |
| -// request. |
| -// |
| -// gcps.PubSub naturally implements this interface. |
| -type PubSubPull interface { |
| - // Pull pulls messages from the subscription. It returns up the requested |
| - // number of messages. |
| - Pull(gcps.Subscription, int) ([]*pubsub.Message, error) |
| +// ACK is a goroutine-safe interface that is capable of sending Pub/Sub ACKs. |
| +type ACK interface { |
| + // Ack ACKs a single Pub/Sub message ID. |
| + Ack(string) |
| } |
| -// Callback is invoked for each received Pub/Sub message. |
| -type Callback func(*pubsub.Message) |
| +// Handler is a handler function that manages an individual message. It returns |
| +// true if the message should be consumed and false otherwise. |
| +type Handler func(*pubsub.Message) bool |
| -// Subscriber is an interface to return Pub/Sub messages from Cloud Pub/Sub. |
| -// It spawns several worker goroutines to poll a Pub/Sub interface and invoke |
| -// a configured callback for each received message. |
| +// Subscriber pulls messages from a Pub/Sub channel and processes them. |
| type Subscriber struct { |
| - // PubSub is the underlying Pub/Sub instance to pull from. |
| - PubSub PubSubPull |
| - |
| - // Subscription is the name of the subscription to poll. |
| - Subscription gcps.Subscription |
| - |
| - // BatchSize is the number of simultaneous messages to pull. If <= zero, |
| - // gcps.MaxSubscriptionPullSize is used. |
| - BatchSize int |
| - |
| - // Workers is the number of Pub/Sub polling workers to simultaneously run. If |
| - // <= zero, DefaultWorkers will be used. |
| + // S is used to pull Pub/Sub messages. |
| + S Source |
| + // A is used to send Pub/Sub message ACKs. |
| + A ACK |
| + |
| + // Workers is the maximum number of simultaneous workers that a Subscriber can |
| + // have at any given moment. |
| + // |
| + // If <= 0, one worker will be used. |
| Workers int |
| -} |
| - |
| -// Run executes the Subscriber instance, spawning several workers and polling |
| -// for messages. The supplied callback will be invoked for each polled message. |
| -// |
| -// Subscriber will run until the supplied Context is cancelled. |
| -func (s *Subscriber) Run(ctx context.Context, cb Callback) { |
| - batchSize := s.BatchSize |
| - if batchSize <= 0 { |
| - batchSize = gcps.MaxSubscriptionPullSize |
| - } |
| - // Set our base logging fields. |
| - ctx = log.SetFields(ctx, log.Fields{ |
| - "subscription": s.Subscription, |
| - "batchSize": batchSize, |
| - }) |
| - |
| - // Mutex to protect Pub/Sub spamming when there are no available messages. |
| - noDataMu := sync.Mutex{} |
| + // HandlerSem is the Semaphore to use to constrain the number of worker |
| + // goroutines used to process individual messages. If nil, no constraint will |
| + // be applied. |
| + HandlerSem parallel.Semaphore |
| + |
| + // NoDataDelay is the amount of time to wait between retries when a |
| + // Pub/Sub poll either returns an error or yields no data. |
| + // |
| + // If <= 0, DefaultNoDataDelay will be used. |
| + NoDataDelay time.Duration |
| + |
| + // noDataMu is used to throttle retries if the subscription has no available |
| + // data. |
| + noDataMu sync.Mutex |
| + // handlerWG is the WaitGroup used to track outstanding message handlers. |
| + handlerWG sync.WaitGroup |
| +} |
| +// Run executes until the supplied Context is cancelled. Each received message |
| +// is processed by a Handler. |
| +func (s *Subscriber) Run(c context.Context, h Handler) { |
| workers := s.Workers |
| if workers <= 0 { |
| - workers = DefaultWorkers |
| + workers = 1 |
| } |
| - err := parallel.WorkPool(workers, func(taskC chan<- func() error) { |
| - // Dispatch poll tasks until our Context is cancelled. |
| - for active := true; active; { |
| - // Check if we're cancelled. |
| + |
| + parallel.WorkPool(workers, func(taskC chan<- func() error) { |
| + for { |
| select { |
| - case <-ctx.Done(): |
| - log.WithError(ctx.Err()).Infof(ctx, "Context is finished.") |
| + case <-c.Done(): |
| return |
| - default: |
| - break |
| - } |
| - // Dispatch a poll task. Always return "nil", even if there is an error, |
| - // since error conditions are neither tracked nor fatal. |
| - taskC <- func() error { |
| - if err := s.pullMessages(ctx, batchSize, &noDataMu, cb); err != nil { |
| - log.WithError(err).Errorf(ctx, "Failed to pull messages.") |
| + default: |
| + // Fetch and process another batch of messages. |
| + taskC <- func() error { |
| + msgs, err := s.S.Pull(c) |
| + switch err { |
| + case context.Canceled: |
| + break |
| + |
| + case nil: |
| + s.handleMessages(c, h, msgs) |
| + |
| + default: |
| + log.WithError(err).Errorf(c, "Failed to pull messages.") |
| + s.noDataSleep(c) |
| + } |
| + |
| + return nil |
| } |
| - return nil |
| } |
| } |
| }) |
| - if err != nil { |
| - log.WithError(err).Errorf(ctx, "Failed to run Subscriber work pool.") |
| - } |
| + |
| + // Wait for all of our Handlers to finish. |
| + s.handlerWG.Wait() |
| } |
| -func (s *Subscriber) pullMessages(ctx context.Context, batchSize int, noDataMu sync.Locker, cb Callback) error { |
| - // Pull a set of messages. |
| - var messages []*pubsub.Message |
| - err := retry.Retry(ctx, retry.TransientOnly(retry.Default()), func() (ierr error) { |
| - messages, ierr = s.PubSub.Pull(s.Subscription, batchSize) |
| +func (s *Subscriber) handleMessages(c context.Context, h Handler, msgs []*pubsub.Message) { |
| + if len(msgs) == 0 { |
| + s.noDataSleep(c) |
| return |
| - }, func(err error, d time.Duration) { |
| - log.Fields{ |
| - log.ErrorKey: err, |
| - "delay": d, |
| - }.Warningf(ctx, "Transient error on Pull(). Retrying...") |
| - }) |
| + } |
| + |
| + // Handle all messages in parallel. |
| + parallel.Run(s.HandlerSem, func(taskC chan<- func() error) { |
| + s.handlerWG.Add(len(msgs)) |
| + for _, msg := range msgs { |
| + msg := msg |
| - if len(messages) > 0 { |
| - log.Fields{ |
| - "messageCount": len(messages), |
| - }.Infof(ctx, "Pulled messages.") |
| + // Handle an individual message. If the Handler returns true, ACK |
| + // it. |
| + taskC <- func() error { |
| + defer s.handlerWG.Done() |
| - for _, msg := range messages { |
| - cb(msg) |
| + if h(msg) { |
| + s.A.Ack(msg.AckID) |
| + } |
| + return nil |
| + } |
| } |
| - } |
| + }) |
| +} |
| - if err != nil || len(messages) == 0 { |
| - log.Fields{ |
| - log.ErrorKey: err, |
| - "delay": noDataDelay, |
| - }.Debugf(ctx, "Sleeping.") |
| +func (s *Subscriber) noDataSleep(c context.Context) { |
| + s.noDataMu.Lock() |
| + defer s.noDataMu.Unlock() |
| - noDataMu.Lock() |
| - defer noDataMu.Unlock() |
| - cancellableSleep(ctx, noDataDelay) |
| + d := s.NoDataDelay |
| + if d <= 0 { |
| + d = DefaultNoDataDelay |
| } |
| - |
| - return err |
| + cancellableSleep(c, d) |
| } |
| // cancellableSleep sleeps, returning either when the sleep duration has expired |
| // or the supplied context has been cancelled. |
| -func cancellableSleep(ctx context.Context, delay time.Duration) { |
| +func cancellableSleep(c context.Context, delay time.Duration) { |
| // Sleep for "delay", stopping early if our Context is cancelled. |
| select { |
| - case <-clock.After(ctx, delay): |
| + case <-clock.After(c, delay): |
| break |
| - case <-ctx.Done(): |
| + case <-c.Done(): |
| break |
| } |
| } |