Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
|
dnj (Google)
2016/01/21 04:36:24
This changed a fair bit. Originally it dispatched
| |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 // Package subscriber implements the Subscriber, which orchestrates parallel | |
| 6 // Pub/Sub subscription pulls. | |
| 7 package subscriber | 5 package subscriber |
| 8 | 6 |
| 9 import ( | 7 import ( |
| 10 "sync" | 8 "sync" |
| 11 "time" | 9 "time" |
| 12 | 10 |
| 13 "github.com/luci/luci-go/common/clock" | 11 "github.com/luci/luci-go/common/clock" |
| 14 "github.com/luci/luci-go/common/gcloud/gcps" | |
| 15 log "github.com/luci/luci-go/common/logging" | 12 log "github.com/luci/luci-go/common/logging" |
| 16 "github.com/luci/luci-go/common/parallel" | 13 "github.com/luci/luci-go/common/parallel" |
| 17 "github.com/luci/luci-go/common/retry" | |
| 18 "golang.org/x/net/context" | 14 "golang.org/x/net/context" |
| 19 "google.golang.org/cloud/pubsub" | 15 "google.golang.org/cloud/pubsub" |
| 20 ) | 16 ) |
| 21 | 17 |
| 22 const ( | 18 const ( |
| 23 » // DefaultWorkers is the number of subscription workers to use. | 19 » // DefaultNoDataDelay is the default amount of time a worker will sleep if |
| 24 » DefaultWorkers = 20 | 20 » // there is no Pub/Sub data. |
| 25 | 21 » DefaultNoDataDelay = 5 * time.Second |
| 26 » // noDataDelay is the amount of time a worker will sleep if there is no | |
| 27 » // Pub/Sub data. | |
| 28 » noDataDelay = 1 * time.Second | |
| 29 ) | 22 ) |
| 30 | 23 |
| 31 // PubSubPull is an interface for something that can return Pub/Sub messages on | 24 // ACK is a goroutine-safe interface that is capable of sending Pub/Sub ACKs. |
| 32 // request. | 25 type ACK interface { |
| 33 // | 26 » // Ack ACKs a single Pub/Sub message ID. |
| 34 // gcps.PubSub naturally implements this interface. | 27 » Ack(string) |
| 35 type PubSubPull interface { | |
| 36 » // Pull pulls messages from the subscription. It returns up the requested | |
| 37 » // number of messages. | |
| 38 » Pull(gcps.Subscription, int) ([]*pubsub.Message, error) | |
| 39 } | 28 } |
| 40 | 29 |
| 41 // Callback is invoked for each received Pub/Sub message. | 30 // Handler is a handler function that manages an individual message. It returns |
| 42 type Callback func(*pubsub.Message) | 31 // true if the message should be consumed and false otherwise. |
| 32 type Handler func(*pubsub.Message) bool | |
| 43 | 33 |
| 44 // Subscriber is an interface to return Pub/Sub messages from Cloud Pub/Sub. | 34 // Subscriber pulls messages from a Pub/Sub channel and processes them. |
| 45 // It spawns several worker goroutines to poll a Pub/Sub interface and invoke | |
| 46 // a configured callback for each received message. | |
| 47 type Subscriber struct { | 35 type Subscriber struct { |
| 48 » // PubSub is the underlying Pub/Sub instance to pull from. | 36 » // S is used to pull Pub/Sub messages. |
| 49 » PubSub PubSubPull | 37 » S Source |
| 38 » // A is used to send Pub/Sub message ACKs. | |
| 39 » A ACK | |
| 50 | 40 |
| 51 » // Subscription is the name of the subscription to poll. | 41 » // Workers is the maximum number of simultaneous workers that a Subscriber can |
| 52 » Subscription gcps.Subscription | 42 » // have at any given moment. |
| 43 » // | |
| 44 » // If <= 0, one worker will be used. | |
| 45 » Workers int | |
| 53 | 46 |
| 54 » // BatchSize is the number of simultaneous messages to pull. If <= zero, | 47 » // HandlerSem is the Semaphore to use to constrain the number of worker |
| 55 » // gcps.MaxSubscriptionPullSize is used. | 48 » // goroutines used to process individual messages. If nil, no constraint will |
| 56 » BatchSize int | 49 » // be applied. |
| 50 » HandlerSem parallel.Semaphore | |
| 57 | 51 |
| 58 » // Workers is the number of Pub/Sub polling workers to simultaneously run. If | 52 » // NoDataDelay is the amount of time to wait in between retries if there is |
| 59 » // <= zero, DefaultWorkers will be used. | 53 » // either an error or no data polling Pub/Sub. |
| 60 » Workers int | 54 » // |
| 55 » // If <= 0, DefaultNoDataDelay will be used. | |
| 56 » NoDataDelay time.Duration | |
| 57 | |
| 58 » // noDataMu is used to throttle retries if the subscription has no available | |
| 59 » // data. | |
| 60 » noDataMu sync.Mutex | |
| 61 » // handlerWG is the WaitGroup used to track outstanding message handlers. | |
| 62 » handlerWG sync.WaitGroup | |
| 61 } | 63 } |
| 62 | 64 |
| 63 // Run executes the Subscriber instance, spawning several workers and polling | 65 // Run executes until the supplied Context is cancelled. Each received message |
| 64 // for messages. The supplied callback will be invoked for each polled message. | 66 // is processed by a Handler. |
| 65 // | 67 func (s *Subscriber) Run(c context.Context, h Handler) { |
| 66 // Subscriber will run until the supplied Context is cancelled. | 68 » workers := s.Workers |
| 67 func (s *Subscriber) Run(ctx context.Context, cb Callback) { | 69 » if workers <= 0 { |
| 68 » batchSize := s.BatchSize | 70 » » workers = 1 |
| 69 » if batchSize <= 0 { | |
| 70 » » batchSize = gcps.MaxSubscriptionPullSize | |
| 71 } | 71 } |
| 72 | 72 |
| 73 » // Set our base logging fields. | 73 » parallel.WorkPool(workers, func(taskC chan<- func() error) { |
| 74 » ctx = log.SetFields(ctx, log.Fields{ | 74 » » for { |
| 75 » » "subscription": s.Subscription, | 75 » » » select { |
| 76 » » "batchSize": batchSize, | 76 » » » case <-c.Done(): |
| 77 » » » » return | |
| 78 | |
| 79 » » » default: | |
| 80 » » » » // Fetch and process another batch of messages. | |
| 81 » » » » taskC <- func() error { | |
| 82 » » » » » msgs, err := s.S.Pull(c) | |
| 83 » » » » » switch err { | |
| 84 » » » » » case context.Canceled: | |
| 85 » » » » » » break | |
| 86 | |
| 87 » » » » » case nil: | |
| 88 » » » » » » s.handleMessages(c, h, msgs) | |
| 89 | |
| 90 » » » » » default: | |
| 91 » » » » » » log.WithError(err).Errorf(c, "Failed to pull messages.") | |
| 92 » » » » » » s.noDataSleep(c) | |
| 93 » » » » » } | |
| 94 | |
| 95 » » » » » return nil | |
| 96 » » » » } | |
| 97 » » » } | |
| 98 » » } | |
| 77 }) | 99 }) |
| 78 | 100 |
| 79 » // Mutex to protect Pub/Sub spamming when there are no available messages. | 101 » // Wait for all of our Handlers to finish. |
| 80 » noDataMu := sync.Mutex{} | 102 » s.handlerWG.Wait() |
| 103 } | |
| 81 | 104 |
| 82 » workers := s.Workers | 105 func (s *Subscriber) handleMessages(c context.Context, h Handler, msgs []*pubsub.Message) {
| 83 » if workers <= 0 { | 106 » if len(msgs) == 0 { |
| 84 » » workers = DefaultWorkers | 107 » » s.noDataSleep(c) |
| 108 » » return | |
| 85 } | 109 } |
| 86 err := parallel.WorkPool(workers, func(taskC chan<- func() error) { | |
| 87 // Dispatch poll tasks until our Context is cancelled. | |
| 88 for active := true; active; { | |
| 89 // Check if we're cancelled. | |
| 90 select { | |
| 91 case <-ctx.Done(): | |
| 92 log.WithError(ctx.Err()).Infof(ctx, "Context is finished.") | |
| 93 return | |
| 94 default: | |
| 95 break | |
| 96 } | |
| 97 | 110 |
| 98 » » » // Dispatch a poll task. Always return "nil", even if there is an error, | 111 » // Handle all messages in parallel. |
| 99 » » » // since error conditions are neither tracked nor fatal. | 112 » parallel.Run(s.HandlerSem, func(taskC chan<- func() error) { |
| 113 » » s.handlerWG.Add(len(msgs)) | |
| 114 » » for _, msg := range msgs { | |
| 115 » » » msg := msg | |
| 116 | |
| 117 » » » // Handle an individual message. If the Handler returns true, ACK | |
| 118 » » » // it. | |
| 100 taskC <- func() error { | 119 taskC <- func() error { |
| 101 » » » » if err := s.pullMessages(ctx, batchSize, &noDataMu, cb); err != nil { | 120 » » » » defer s.handlerWG.Done() |
| 102 » » » » » log.WithError(err).Errorf(ctx, "Failed to pull messages.") | 121 |
| 122 » » » » if h(msg) { | |
| 123 » » » » » s.A.Ack(msg.AckID) | |
| 103 } | 124 } |
| 104 return nil | 125 return nil |
| 105 } | 126 } |
| 106 } | 127 } |
| 107 }) | 128 }) |
| 108 if err != nil { | |
| 109 » » log.WithError(err).Errorf(ctx, "Failed to run Subscriber work pool.") | |
| 110 } | |
| 111 } | 129 } |
| 112 | 130 |
| 113 func (s *Subscriber) pullMessages(ctx context.Context, batchSize int, noDataMu sync.Locker, cb Callback) error { | 131 func (s *Subscriber) noDataSleep(c context.Context) {
| 114 » // Pull a set of messages. | 132 » s.noDataMu.Lock() |
| 115 » var messages []*pubsub.Message | 133 » defer s.noDataMu.Unlock() |
| 116 » err := retry.Retry(ctx, retry.TransientOnly(retry.Default()), func() (ierr error) { | |
| 117 » » messages, ierr = s.PubSub.Pull(s.Subscription, batchSize) | |
| 118 » » return | |
| 119 » }, func(err error, d time.Duration) { | |
| 120 » » log.Fields{ | |
| 121 » » » log.ErrorKey: err, | |
| 122 » » » "delay": d, | |
| 123 » » }.Warningf(ctx, "Transient error on Pull(). Retrying...") | |
| 124 » }) | |
| 125 | 134 |
| 126 » if len(messages) > 0 { | 135 » d := s.NoDataDelay |
| 127 » » log.Fields{ | 136 » if d <= 0 { |
| 128 » » » "messageCount": len(messages), | 137 » » d = DefaultNoDataDelay |
| 129 » » }.Infof(ctx, "Pulled messages.") | |
| 130 | |
| 131 » » for _, msg := range messages { | |
| 132 » » » cb(msg) | |
| 133 » » } | |
| 134 } | 138 } |
| 135 | 139 » cancellableSleep(c, d) |
| 136 » if err != nil || len(messages) == 0 { | |
| 137 » » log.Fields{ | |
| 138 » » » log.ErrorKey: err, | |
| 139 » » » "delay": noDataDelay, | |
| 140 » » }.Debugf(ctx, "Sleeping.") | |
| 141 | |
| 142 » » noDataMu.Lock() | |
| 143 » » defer noDataMu.Unlock() | |
| 144 » » cancellableSleep(ctx, noDataDelay) | |
| 145 » } | |
| 146 | |
| 147 » return err | |
| 148 } | 140 } |
| 149 | 141 |
| 150 // cancellableSleep sleeps, returning either when the sleep duration has expired | 142 // cancellableSleep sleeps, returning either when the sleep duration has expired |
| 151 // or the supplied context has been cancelled. | 143 // or the supplied context has been cancelled. |
| 152 func cancellableSleep(ctx context.Context, delay time.Duration) { | 144 func cancellableSleep(c context.Context, delay time.Duration) { |
| 153 // Sleep for "delay", stopping early if our Context is cancelled. | 145 // Sleep for "delay", stopping early if our Context is cancelled. |
| 154 select { | 146 select { |
| 155 » case <-clock.After(ctx, delay): | 147 » case <-clock.After(c, delay): |
| 156 break | 148 break |
| 157 | 149 |
| 158 » case <-ctx.Done(): | 150 » case <-c.Done(): |
| 159 break | 151 break |
| 160 } | 152 } |
| 161 } | 153 } |
| OLD | NEW |