| Index: common/gcloud/pubsub/subscriber/subscriber.go
|
| diff --git a/common/gcloud/pubsub/subscriber/subscriber.go b/common/gcloud/pubsub/subscriber/subscriber.go
|
| index c47116dfc85485d44ff0deda1bfb8c7b343bef08..de7b47cc8df244e85064fd8e321661a24cf04993 100644
|
| --- a/common/gcloud/pubsub/subscriber/subscriber.go
|
| +++ b/common/gcloud/pubsub/subscriber/subscriber.go
|
| @@ -5,7 +5,6 @@
|
| package subscriber
|
|
|
| import (
|
| - "sync"
|
| "time"
|
|
|
| "github.com/luci/luci-go/common/clock"
|
| @@ -46,84 +45,71 @@ type Subscriber struct {
|
| // A is used to send Pub/Sub message ACKs.
|
| A ACK
|
|
|
| - // PullWorkers is the maximum number of simultaneous worker goroutines that a
|
| - // Subscriber can have pulling Pub/Sub at any given moment.
|
| - //
|
| - // If <= 0, one worker will be used.
|
| - PullWorkers int
|
| -
|
| - // HandlerWorkers is the maximum number of message processing workers that
|
| - // the Subscriber will run at any given time. One worker is dispatched per
|
| - // Pub/Sub message received.
|
| - //
|
| - // If <= 0, the number of handler workers will be unbounded.
|
| - HandlerWorkers int
|
| + // Workers is the number of concurrent messages to be processed. If <= 0, a
|
| + // default of pubsub.MaxSubscriptionPullSize will be used.
|
| + Workers int
|
|
|
| // NoDataDelay is the amount of time to wait in between retries if there is
|
| // either an error or no data polling Pub/Sub.
|
| //
|
| // If <= 0, DefaultNoDataDelay will be used.
|
| NoDataDelay time.Duration
|
| -
|
| - // noDataMu is used to throttle retries if the subscription has no available
|
| - // data.
|
| - noDataMu sync.Mutex
|
| }
|
|
|
| // Run executes until the supplied Context is canceled. Each received message
|
| // is processed by a Handler.
|
| func (s *Subscriber) Run(c context.Context, h Handler) {
|
| - pullWorkers := s.PullWorkers
|
| - if pullWorkers <= 0 {
|
| - pullWorkers = 1
|
| + // Start a goroutine to continuously pull messages and put them into messageC.
|
| + workers := s.Workers
|
| + if workers <= 0 {
|
| + workers = pubsub.MaxSubscriptionPullSize
|
| }
|
| + messageC := make(chan *pubsub.Message, workers)
|
| + go func() {
|
| + defer close(messageC)
|
| +
|
| + // Adjust our Pull batch size based on limits.
|
| + batchSize := workers
|
| + if batchSize > pubsub.MaxSubscriptionPullSize {
|
| + batchSize = pubsub.MaxSubscriptionPullSize
|
| + }
|
|
|
| - runner := parallel.Runner{
|
| - Sustained: s.HandlerWorkers,
|
| - Maximum: s.HandlerWorkers,
|
| - }
|
| - defer runner.Close()
|
| -
|
| - parallel.WorkPool(pullWorkers, func(taskC chan<- func() error) {
|
| + // Fetch another batch of messages and forward each one to messageC.
|
| for {
|
| - // Stop if our Context has been canceled.
|
| - if err := c.Err(); err != nil {
|
| + switch msgs, err := s.S.Pull(c, batchSize); err {
|
| + case context.Canceled, context.DeadlineExceeded:
|
| return
|
| - }
|
|
|
| - // Fetch and process another batch of messages.
|
| - taskC <- func() error {
|
| - switch msgs, err := s.S.Pull(c); err {
|
| - case context.Canceled, context.DeadlineExceeded:
|
| - break
|
| -
|
| - case nil:
|
| - s.handleMessages(c, h, &runner, msgs)
|
| -
|
| - default:
|
| - log.WithError(err).Errorf(c, "Failed to pull messages.")
|
| - s.noDataSleep(c)
|
| + case nil:
|
| + // Write all messages into messageC.
|
| + for _, msg := range msgs {
|
| + select {
|
| + case messageC <- msg:
|
| + break
|
| + case <-c.Done():
|
| + // Context is done, but still prefer delivering msg (non-blocking) for determinism.
|
| + select {
|
| + case messageC <- msg:
|
| + break
|
| + default:
|
| + break
|
| + }
|
| +
|
| + return
|
| + }
|
| }
|
|
|
| - return nil
|
| + default:
|
| + log.WithError(err).Errorf(c, "Failed to pull messages.")
|
| + s.noDataSleep(c)
|
| }
|
| }
|
| - })
|
| -}
|
| + }()
|
|
|
| -func (s *Subscriber) handleMessages(c context.Context, h Handler, r *parallel.Runner, msgs []*pubsub.Message) {
|
| - if len(msgs) == 0 {
|
| - s.noDataSleep(c)
|
| - return
|
| - }
|
| -
|
| - // Handle all messages in parallel.
|
| - parallel.Ignore(r.Run(func(taskC chan<- func() error) {
|
| - for _, msg := range msgs {
|
| + // Dispatch message handlers in parallel.
|
| + parallel.Ignore(parallel.Run(workers, func(taskC chan<- func() error) {
|
| + for msg := range messageC {
|
| msg := msg
|
| -
|
| - // Handle an individual message. If the Handler returns true, ACK
|
| - // it.
|
| taskC <- func() error {
|
| if h(msg) {
|
| s.A.Ack(msg.AckID)
|
| @@ -148,9 +134,6 @@ func (s *Subscriber) handleMessages(c context.Context, h Handler, r *parallel.Ru
|
| // sleep. This is a simple method to obtain the desired effect of avoiding
|
| // pointless burst Pub/Sub spam when the service has nothing useful to offer.
|
| func (s *Subscriber) noDataSleep(c context.Context) {
|
| - s.noDataMu.Lock()
|
| - defer s.noDataMu.Unlock()
|
| -
|
| d := s.NoDataDelay
|
| if d <= 0 {
|
| d = DefaultNoDataDelay
|
|
|