Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(95)

Unified Diff: common/gcloud/pubsub/subscriber/subscriber.go

Issue 1838803002: LogDog: BigTable batching schema. (Closed) Base URL: https://github.com/luci/luci-go@recordio-split
Patch Set: Minor comments and quality of code tweaks. Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « common/gcloud/pubsub/subscriber/source.go ('k') | common/gcloud/pubsub/subscriber/subscriber_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: common/gcloud/pubsub/subscriber/subscriber.go
diff --git a/common/gcloud/pubsub/subscriber/subscriber.go b/common/gcloud/pubsub/subscriber/subscriber.go
index c47116dfc85485d44ff0deda1bfb8c7b343bef08..de7b47cc8df244e85064fd8e321661a24cf04993 100644
--- a/common/gcloud/pubsub/subscriber/subscriber.go
+++ b/common/gcloud/pubsub/subscriber/subscriber.go
@@ -5,7 +5,6 @@
package subscriber
import (
- "sync"
"time"
"github.com/luci/luci-go/common/clock"
@@ -46,84 +45,71 @@ type Subscriber struct {
// A is used to send Pub/Sub message ACKs.
A ACK
- // PullWorkers is the maximum number of simultaneous worker goroutines that a
- // Subscriber can have pulling Pub/Sub at any given moment.
- //
- // If <= 0, one worker will be used.
- PullWorkers int
-
- // HandlerWorkers is the maximum number of message processing workers that
- // the Subscriber will run at any given time. One worker is dispatched per
- // Pub/Sub message received.
- //
- // If <= 0, the number of handler workers will be unbounded.
- HandlerWorkers int
+ // Workers is the number of concurrent messages to be processed. If <= 0, a
+ // default of pubsub.MaxSubscriptionPullSize will be used.
+ Workers int
// NoDataDelay is the amount of time to wait in between retries if there is
// either an error or no data polling Pub/Sub.
//
// If <= 0, DefaultNoDataDelay will be used.
NoDataDelay time.Duration
-
- // noDataMu is used to throttle retries if the subscription has no available
- // data.
- noDataMu sync.Mutex
}
// Run executes until the supplied Context is canceled. Each received message
// is processed by a Handler.
func (s *Subscriber) Run(c context.Context, h Handler) {
- pullWorkers := s.PullWorkers
- if pullWorkers <= 0 {
- pullWorkers = 1
+ // Start a goroutine to continuously pull messages and put them into messageC.
+ workers := s.Workers
+ if workers <= 0 {
+ workers = pubsub.MaxSubscriptionPullSize
}
+ messageC := make(chan *pubsub.Message, workers)
+ go func() {
+ defer close(messageC)
+
+ // Adjust our Pull batch size based on limits.
+ batchSize := workers
+ if batchSize > pubsub.MaxSubscriptionPullSize {
+ batchSize = pubsub.MaxSubscriptionPullSize
+ }
- runner := parallel.Runner{
- Sustained: s.HandlerWorkers,
- Maximum: s.HandlerWorkers,
- }
- defer runner.Close()
-
- parallel.WorkPool(pullWorkers, func(taskC chan<- func() error) {
+ // Fetch and process another batch of messages.
for {
- // Stop if our Context has been canceled.
- if err := c.Err(); err != nil {
+ switch msgs, err := s.S.Pull(c, batchSize); err {
+ case context.Canceled, context.DeadlineExceeded:
return
- }
- // Fetch and process another batch of messages.
- taskC <- func() error {
- switch msgs, err := s.S.Pull(c); err {
- case context.Canceled, context.DeadlineExceeded:
- break
-
- case nil:
- s.handleMessages(c, h, &runner, msgs)
-
- default:
- log.WithError(err).Errorf(c, "Failed to pull messages.")
- s.noDataSleep(c)
+ case nil:
+ // Write all messages into messageC.
+ for _, msg := range msgs {
+ select {
+ case messageC <- msg:
+ break
+ case <-c.Done():
+ // Prefer messages for determinism.
+ select {
+ case messageC <- msg:
+ break
+ default:
+ break
+ }
+
+ return
+ }
}
- return nil
+ default:
+ log.WithError(err).Errorf(c, "Failed to pull messages.")
+ s.noDataSleep(c)
}
}
- })
-}
+ }()
-func (s *Subscriber) handleMessages(c context.Context, h Handler, r *parallel.Runner, msgs []*pubsub.Message) {
- if len(msgs) == 0 {
- s.noDataSleep(c)
- return
- }
-
- // Handle all messages in parallel.
- parallel.Ignore(r.Run(func(taskC chan<- func() error) {
- for _, msg := range msgs {
+ // Dispatch message handlers in parallel.
+ parallel.Ignore(parallel.Run(workers, func(taskC chan<- func() error) {
+ for msg := range messageC {
msg := msg
-
- // Handle an individual message. If the Handler returns true, ACK
- // it.
taskC <- func() error {
if h(msg) {
s.A.Ack(msg.AckID)
@@ -148,9 +134,6 @@ func (s *Subscriber) handleMessages(c context.Context, h Handler, r *parallel.Ru
// sleep. This is a simple method to obtain the desired effect of avoiding
// pointless burst Pub/Sub spam when the service has nothing useful to offer.
func (s *Subscriber) noDataSleep(c context.Context) {
- s.noDataMu.Lock()
- defer s.noDataMu.Unlock()
-
d := s.NoDataDelay
if d <= 0 {
d = DefaultNoDataDelay
« no previous file with comments | « common/gcloud/pubsub/subscriber/source.go ('k') | common/gcloud/pubsub/subscriber/subscriber_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698