| OLD | NEW |
| (Empty) |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package subscriber | |
| 6 | |
| 7 import ( | |
| 8 "sync" | |
| 9 "time" | |
| 10 | |
| 11 "github.com/luci/luci-go/common/clock" | |
| 12 "github.com/luci/luci-go/common/gcloud/pubsub" | |
| 13 log "github.com/luci/luci-go/common/logging" | |
| 14 "github.com/luci/luci-go/common/parallel" | |
| 15 "golang.org/x/net/context" | |
| 16 ) | |
| 17 | |
// DefaultNoDataDelay is how long a worker sleeps by default when Pub/Sub has
// no data to offer. It is used whenever Subscriber.NoDataDelay is <= 0.
const DefaultNoDataDelay = 5 * time.Second
| 23 | |
// ACK sends Pub/Sub acknowledgements. Implementations must be safe for
// concurrent use by multiple goroutines.
type ACK interface {
	// Ack acknowledges a single Pub/Sub message, identified by ackID.
	//
	// Ack has no way to report failure. The implementation makes a best
	// effort to deliver the acknowledgement, buffering and retrying however
	// it chooses. Because an ACK can still be lost, applications must design
	// their ingest pipeline to tolerate that.
	//
	// Calling Ack is primarily a hand-off: the Subscriber expresses the
	// intent to acknowledge, and from then on the ACK owns the
	// responsibility of actually acknowledging.
	Ack(ackID string)
}
| 37 | |
| 38 // Handler is a handler function that manages an individual message. It returns | |
| 39 // true if the message should be consumed and false otherwise. | |
| 40 type Handler func(*pubsub.Message) bool | |
| 41 | |
| 42 // Subscriber pulls messages from a Pub/Sub channel and processes them. | |
| 43 type Subscriber struct { | |
| 44 // S is used to pull Pub/Sub messages. | |
| 45 S Source | |
| 46 // A is used to send Pub/Sub message ACKs. | |
| 47 A ACK | |
| 48 | |
| 49 // PullWorkers is the maximum number of simultaneous worker goroutines t
hat a | |
| 50 // Subscriber can have pulling Pub/Sub at any given moment. | |
| 51 // | |
| 52 // If <= 0, one worker will be used. | |
| 53 PullWorkers int | |
| 54 | |
| 55 // HandlerWorkers is the maximum number of message processing workers th
at | |
| 56 // the Subscriber will run at any given time. One worker is dispatched p
er | |
| 57 // Pub/Sub message received. | |
| 58 // | |
| 59 // If <= 0, the number of handler workers will be unbounded. | |
| 60 HandlerWorkers int | |
| 61 | |
| 62 // NoDataDelay is the amount of time to wait in between retries if there
is | |
| 63 // either an error or no data polling Pub/Sub. | |
| 64 // | |
| 65 // If <= 0, DefaultNoDataDelay will be used. | |
| 66 NoDataDelay time.Duration | |
| 67 | |
| 68 // noDataMu is used to throttle retries if the subscription has no avail
able | |
| 69 // data. | |
| 70 noDataMu sync.Mutex | |
| 71 } | |
| 72 | |
| 73 // Run executes until the supplied Context is canceled. Each recieved message | |
| 74 // is processed by a Handler. | |
| 75 func (s *Subscriber) Run(c context.Context, h Handler) { | |
| 76 pullWorkers := s.PullWorkers | |
| 77 if pullWorkers <= 0 { | |
| 78 pullWorkers = 1 | |
| 79 } | |
| 80 | |
| 81 runner := parallel.Runner{ | |
| 82 Sustained: s.HandlerWorkers, | |
| 83 Maximum: s.HandlerWorkers, | |
| 84 } | |
| 85 defer runner.Close() | |
| 86 | |
| 87 parallel.WorkPool(pullWorkers, func(taskC chan<- func() error) { | |
| 88 for { | |
| 89 // Stop if our Context has been canceled. | |
| 90 if err := c.Err(); err != nil { | |
| 91 return | |
| 92 } | |
| 93 | |
| 94 // Fetch and process another batch of messages. | |
| 95 taskC <- func() error { | |
| 96 switch msgs, err := s.S.Pull(c); err { | |
| 97 case context.Canceled, context.DeadlineExceeded: | |
| 98 break | |
| 99 | |
| 100 case nil: | |
| 101 s.handleMessages(c, h, &runner, msgs) | |
| 102 | |
| 103 default: | |
| 104 log.WithError(err).Errorf(c, "Failed to
pull messages.") | |
| 105 s.noDataSleep(c) | |
| 106 } | |
| 107 | |
| 108 return nil | |
| 109 } | |
| 110 } | |
| 111 }) | |
| 112 } | |
| 113 | |
| 114 func (s *Subscriber) handleMessages(c context.Context, h Handler, r *parallel.Ru
nner, msgs []*pubsub.Message) { | |
| 115 if len(msgs) == 0 { | |
| 116 s.noDataSleep(c) | |
| 117 return | |
| 118 } | |
| 119 | |
| 120 // Handle all messages in parallel. | |
| 121 parallel.Ignore(r.Run(func(taskC chan<- func() error) { | |
| 122 for _, msg := range msgs { | |
| 123 msg := msg | |
| 124 | |
| 125 // Handle an individual message. If the Handler returns
true, ACK | |
| 126 // it. | |
| 127 taskC <- func() error { | |
| 128 if h(msg) { | |
| 129 s.A.Ack(msg.AckID) | |
| 130 } | |
| 131 return nil | |
| 132 } | |
| 133 } | |
| 134 })) | |
| 135 } | |
| 136 | |
| 137 // noDataSleep sleeps for the configured NoDataDelay. This sleep will terminate | |
| 138 // immediately if the supplied Context is canceled. | |
| 139 // | |
| 140 // This method is called when a pull goroutine receives either an error or a | |
| 141 // response with no messages from Pub/Sub. In order to smooth out retry spam | |
| 142 // while we either wait for more messages or wait for Pub/Sub to work again, all | |
| 143 // of the goroutines share a sleep mutex. | |
| 144 // | |
| 145 // This collapses their potentially-parallel sleep attempts into a serial chain | |
| 146 // of sleeps. This is done by having all sleep attempts share a lock. Any | |
| 147 // goroutine that wants to sleep will wait for the lock and hold it through its | |
| 148 // sleep. This is a simple method to obtain the desired effect of avoiding | |
| 149 // pointless burst Pub/Sub spam when the service has nothing useful to offer. | |
| 150 func (s *Subscriber) noDataSleep(c context.Context) { | |
| 151 s.noDataMu.Lock() | |
| 152 defer s.noDataMu.Unlock() | |
| 153 | |
| 154 d := s.NoDataDelay | |
| 155 if d <= 0 { | |
| 156 d = DefaultNoDataDelay | |
| 157 } | |
| 158 clock.Sleep(c, d) | |
| 159 } | |
| OLD | NEW |