| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package subscriber | 5 package subscriber |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "sync" | |
| 9 "time" | 8 "time" |
| 10 | 9 |
| 11 "github.com/luci/luci-go/common/clock" | 10 "github.com/luci/luci-go/common/clock" |
| 12 "github.com/luci/luci-go/common/gcloud/pubsub" | 11 "github.com/luci/luci-go/common/gcloud/pubsub" |
| 13 log "github.com/luci/luci-go/common/logging" | 12 log "github.com/luci/luci-go/common/logging" |
| 14 "github.com/luci/luci-go/common/parallel" | 13 "github.com/luci/luci-go/common/parallel" |
| 15 "golang.org/x/net/context" | 14 "golang.org/x/net/context" |
| 16 ) | 15 ) |
| 17 | 16 |
| 18 const ( | 17 const ( |
| (...skipping 20 matching lines...) Expand all Loading... |
| 39 // true if the message should be consumed and false otherwise. | 38 // true if the message should be consumed and false otherwise. |
| 40 type Handler func(*pubsub.Message) bool | 39 type Handler func(*pubsub.Message) bool |
| 41 | 40 |
| 42 // Subscriber pulls messages from a Pub/Sub channel and processes them. | 41 // Subscriber pulls messages from a Pub/Sub channel and processes them. |
| 43 type Subscriber struct { | 42 type Subscriber struct { |
| 44 // S is used to pull Pub/Sub messages. | 43 // S is used to pull Pub/Sub messages. |
| 45 S Source | 44 S Source |
| 46 // A is used to send Pub/Sub message ACKs. | 45 // A is used to send Pub/Sub message ACKs. |
| 47 A ACK | 46 A ACK |
| 48 | 47 |
| 49 » // PullWorkers is the maximum number of simultaneous worker goroutines t
hat a | 48 » // Workers is the number of concurrent messages to be processed. If <= 0
, a |
| 50 » // Subscriber can have pulling Pub/Sub at any given moment. | 49 » // default of pubsub.MaxSubscriptionPullSize will be used. |
| 51 » // | 50 » Workers int |
| 52 » // If <= 0, one worker will be used. | |
| 53 » PullWorkers int | |
| 54 | |
| 55 » // HandlerWorkers is the maximum number of message processing workers th
at | |
| 56 » // the Subscriber will run at any given time. One worker is dispatched p
er | |
| 57 » // Pub/Sub message received. | |
| 58 » // | |
| 59 » // If <= 0, the number of handler workers will be unbounded. | |
| 60 » HandlerWorkers int | |
| 61 | 51 |
| 62 // NoDataDelay is the amount of time to wait in between retries if there
is | 52 // NoDataDelay is the amount of time to wait in between retries if there
is |
| 63 // either an error or no data polling Pub/Sub. | 53 // either an error or no data polling Pub/Sub. |
| 64 // | 54 // |
| 65 // If <= 0, DefaultNoDataDelay will be used. | 55 // If <= 0, DefaultNoDataDelay will be used. |
| 66 NoDataDelay time.Duration | 56 NoDataDelay time.Duration |
| 67 | |
| 68 // noDataMu is used to throttle retries if the subscription has no avail
able | |
| 69 // data. | |
| 70 noDataMu sync.Mutex | |
| 71 } | 57 } |
| 72 | 58 |
| 73 // Run executes until the supplied Context is canceled. Each recieved message | 59 // Run executes until the supplied Context is canceled. Each recieved message |
| 74 // is processed by a Handler. | 60 // is processed by a Handler. |
| 75 func (s *Subscriber) Run(c context.Context, h Handler) { | 61 func (s *Subscriber) Run(c context.Context, h Handler) { |
| 76 » pullWorkers := s.PullWorkers | 62 » // Start a goroutine to continuously pull messages and put them into mes
sageC. |
| 77 » if pullWorkers <= 0 { | 63 » workers := s.Workers |
| 78 » » pullWorkers = 1 | 64 » if workers <= 0 { |
| 65 » » workers = pubsub.MaxSubscriptionPullSize |
| 79 } | 66 } |
| 67 messageC := make(chan *pubsub.Message, workers) |
| 68 go func() { |
| 69 defer close(messageC) |
| 80 | 70 |
| 81 » runner := parallel.Runner{ | 71 » » // Adjust our Pull batch size based on limits. |
| 82 » » Sustained: s.HandlerWorkers, | 72 » » batchSize := workers |
| 83 » » Maximum: s.HandlerWorkers, | 73 » » if batchSize > pubsub.MaxSubscriptionPullSize { |
| 84 » } | 74 » » » batchSize = pubsub.MaxSubscriptionPullSize |
| 85 » defer runner.Close() | 75 » » } |
| 86 | 76 |
| 87 » parallel.WorkPool(pullWorkers, func(taskC chan<- func() error) { | 77 » » // Fetch and process another batch of messages. |
| 88 for { | 78 for { |
| 89 » » » // Stop if our Context has been canceled. | 79 » » » switch msgs, err := s.S.Pull(c, batchSize); err { |
| 90 » » » if err := c.Err(); err != nil { | 80 » » » case context.Canceled, context.DeadlineExceeded: |
| 91 return | 81 return |
| 92 } | |
| 93 | 82 |
| 94 » » » // Fetch and process another batch of messages. | 83 » » » case nil: |
| 95 » » » taskC <- func() error { | 84 » » » » // Write all messages into messageC. |
| 96 » » » » switch msgs, err := s.S.Pull(c); err { | 85 » » » » for _, msg := range msgs { |
| 97 » » » » case context.Canceled, context.DeadlineExceeded: | 86 » » » » » select { |
| 98 » » » » » break | 87 » » » » » case messageC <- msg: |
| 88 » » » » » » break |
| 89 » » » » » case <-c.Done(): |
| 90 » » » » » » // Prefer messages for determini
sm. |
| 91 » » » » » » select { |
| 92 » » » » » » case messageC <- msg: |
| 93 » » » » » » » break |
| 94 » » » » » » default: |
| 95 » » » » » » » break |
| 96 » » » » » » } |
| 99 | 97 |
| 100 » » » » case nil: | 98 » » » » » » return |
| 101 » » » » » s.handleMessages(c, h, &runner, msgs) | 99 » » » » » } |
| 102 | |
| 103 » » » » default: | |
| 104 » » » » » log.WithError(err).Errorf(c, "Failed to
pull messages.") | |
| 105 » » » » » s.noDataSleep(c) | |
| 106 } | 100 } |
| 107 | 101 |
| 108 » » » » return nil | 102 » » » default: |
| 103 » » » » log.WithError(err).Errorf(c, "Failed to pull mes
sages.") |
| 104 » » » » s.noDataSleep(c) |
| 109 } | 105 } |
| 110 } | 106 } |
| 111 » }) | 107 » }() |
| 112 } | |
| 113 | 108 |
| 114 func (s *Subscriber) handleMessages(c context.Context, h Handler, r *parallel.Ru
nner, msgs []*pubsub.Message) { | 109 » // Dispatch message handlers in parallel. |
| 115 » if len(msgs) == 0 { | 110 » parallel.Ignore(parallel.Run(workers, func(taskC chan<- func() error) { |
| 116 » » s.noDataSleep(c) | 111 » » for msg := range messageC { |
| 117 » » return | |
| 118 » } | |
| 119 | |
| 120 » // Handle all messages in parallel. | |
| 121 » parallel.Ignore(r.Run(func(taskC chan<- func() error) { | |
| 122 » » for _, msg := range msgs { | |
| 123 msg := msg | 112 msg := msg |
| 124 | |
| 125 // Handle an individual message. If the Handler returns
true, ACK | |
| 126 // it. | |
| 127 taskC <- func() error { | 113 taskC <- func() error { |
| 128 if h(msg) { | 114 if h(msg) { |
| 129 s.A.Ack(msg.AckID) | 115 s.A.Ack(msg.AckID) |
| 130 } | 116 } |
| 131 return nil | 117 return nil |
| 132 } | 118 } |
| 133 } | 119 } |
| 134 })) | 120 })) |
| 135 } | 121 } |
| 136 | 122 |
| 137 // noDataSleep sleeps for the configured NoDataDelay. This sleep will terminate | 123 // noDataSleep sleeps for the configured NoDataDelay. This sleep will terminate |
| 138 // immediately if the supplied Context is canceled. | 124 // immediately if the supplied Context is canceled. |
| 139 // | 125 // |
| 140 // This method is called when a pull goroutine receives either an error or a | 126 // This method is called when a pull goroutine receives either an error or a |
| 141 // response with no messages from Pub/Sub. In order to smooth out retry spam | 127 // response with no messages from Pub/Sub. In order to smooth out retry spam |
| 142 // while we either wait for more messages or wait for Pub/Sub to work again, all | 128 // while we either wait for more messages or wait for Pub/Sub to work again, all |
| 143 // of the goroutines share a sleep mutex. | 129 // of the goroutines share a sleep mutex. |
| 144 // | 130 // |
| 145 // This collapses their potentially-parallel sleep attempts into a serial chain | 131 // This collapses their potentially-parallel sleep attempts into a serial chain |
| 146 // of sleeps. This is done by having all sleep attempts share a lock. Any | 132 // of sleeps. This is done by having all sleep attempts share a lock. Any |
| 147 // goroutine that wants to sleep will wait for the lock and hold it through its | 133 // goroutine that wants to sleep will wait for the lock and hold it through its |
| 148 // sleep. This is a simple method to obtain the desired effect of avoiding | 134 // sleep. This is a simple method to obtain the desired effect of avoiding |
| 149 // pointless burst Pub/Sub spam when the service has nothing useful to offer. | 135 // pointless burst Pub/Sub spam when the service has nothing useful to offer. |
| 150 func (s *Subscriber) noDataSleep(c context.Context) { | 136 func (s *Subscriber) noDataSleep(c context.Context) { |
| 151 s.noDataMu.Lock() | |
| 152 defer s.noDataMu.Unlock() | |
| 153 | |
| 154 d := s.NoDataDelay | 137 d := s.NoDataDelay |
| 155 if d <= 0 { | 138 if d <= 0 { |
| 156 d = DefaultNoDataDelay | 139 d = DefaultNoDataDelay |
| 157 } | 140 } |
| 158 clock.Sleep(c, d) | 141 clock.Sleep(c, d) |
| 159 } | 142 } |
| OLD | NEW |