| OLD | NEW |
| (Empty) |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package subscriber | |
| 6 | |
| 7 import ( | |
| 8 "time" | |
| 9 | |
| 10 "github.com/luci/luci-go/common/clock" | |
| 11 "github.com/luci/luci-go/common/gcloud/pubsub" | |
| 12 log "github.com/luci/luci-go/common/logging" | |
| 13 "github.com/luci/luci-go/common/parallel" | |
| 14 "golang.org/x/net/context" | |
| 15 ) | |
| 16 | |
| 17 const ( | |
| 18 // DefaultNoDataDelay is the default amount of time a worker will sleep
if | |
| 19 // there is no Pub/Sub data. | |
| 20 DefaultNoDataDelay = 5 * time.Second | |
| 21 ) | |
| 22 | |
| 23 // ACK is a goroutine-safe interface that is capable of sending Pub/Sub ACKs. | |
| 24 type ACK interface { | |
| 25 // Ack ACKs a single Pub/Sub message ID. | |
| 26 // | |
| 27 // Note that this method cannot fail. The ACK instance is responsible fo
r | |
| 28 // making a best effort to perform the acknowledgement and buffering/ret
rying | |
| 29 // as it sees fit. Applications must understand that ACKs can fail and p
lan | |
| 30 // their ingest pipeline accordingly. | |
| 31 // | |
| 32 // This functions primarily as a hand-off of responsibility from Subscri
ber | |
| 33 // (intent to acknowledge) to ACK (responsibility to acknowledge). | |
| 34 Ack(string) | |
| 35 } | |
| 36 | |
| 37 // Handler is a handler function that manages an individual message. It returns | |
| 38 // true if the message should be consumed and false otherwise. | |
| 39 type Handler func(*pubsub.Message) bool | |
| 40 | |
| 41 // Subscriber pulls messages from a Pub/Sub channel and processes them. | |
| 42 type Subscriber struct { | |
| 43 // S is used to pull Pub/Sub messages. | |
| 44 S Source | |
| 45 // A is used to send Pub/Sub message ACKs. | |
| 46 A ACK | |
| 47 | |
| 48 // Workers is the number of concurrent messages to be processed. If <= 0
, a | |
| 49 // default of pubsub.MaxSubscriptionPullSize will be used. | |
| 50 Workers int | |
| 51 | |
| 52 // NoDataDelay is the amount of time to wait in between retries if there
is | |
| 53 // either an error or no data polling Pub/Sub. | |
| 54 // | |
| 55 // If <= 0, DefaultNoDataDelay will be used. | |
| 56 NoDataDelay time.Duration | |
| 57 } | |
| 58 | |
| 59 // Run executes until the supplied Context is canceled. Each recieved message | |
| 60 // is processed by a Handler. | |
| 61 func (s *Subscriber) Run(c context.Context, h Handler) { | |
| 62 // Start a goroutine to continuously pull messages and put them into mes
sageC. | |
| 63 workers := s.Workers | |
| 64 if workers <= 0 { | |
| 65 workers = pubsub.MaxSubscriptionPullSize | |
| 66 } | |
| 67 messageC := make(chan *pubsub.Message, workers) | |
| 68 go func() { | |
| 69 defer close(messageC) | |
| 70 | |
| 71 // Adjust our Pull batch size based on limits. | |
| 72 batchSize := workers | |
| 73 if batchSize > pubsub.MaxSubscriptionPullSize { | |
| 74 batchSize = pubsub.MaxSubscriptionPullSize | |
| 75 } | |
| 76 | |
| 77 // Fetch and process another batch of messages. | |
| 78 for { | |
| 79 switch msgs, err := s.S.Pull(c, batchSize); err { | |
| 80 case context.Canceled, context.DeadlineExceeded: | |
| 81 return | |
| 82 | |
| 83 case nil: | |
| 84 // Write all messages into messageC. | |
| 85 for _, msg := range msgs { | |
| 86 select { | |
| 87 case messageC <- msg: | |
| 88 break | |
| 89 case <-c.Done(): | |
| 90 // Prefer messages for determini
sm. | |
| 91 select { | |
| 92 case messageC <- msg: | |
| 93 break | |
| 94 default: | |
| 95 break | |
| 96 } | |
| 97 | |
| 98 return | |
| 99 } | |
| 100 } | |
| 101 | |
| 102 default: | |
| 103 log.WithError(err).Errorf(c, "Failed to pull mes
sages.") | |
| 104 s.noDataSleep(c) | |
| 105 } | |
| 106 } | |
| 107 }() | |
| 108 | |
| 109 // Dispatch message handlers in parallel. | |
| 110 parallel.Ignore(parallel.Run(workers, func(taskC chan<- func() error) { | |
| 111 for msg := range messageC { | |
| 112 msg := msg | |
| 113 taskC <- func() error { | |
| 114 if h(msg) { | |
| 115 s.A.Ack(msg.AckID) | |
| 116 } | |
| 117 return nil | |
| 118 } | |
| 119 } | |
| 120 })) | |
| 121 } | |
| 122 | |
| 123 // noDataSleep sleeps for the configured NoDataDelay. This sleep will terminate | |
| 124 // immediately if the supplied Context is canceled. | |
| 125 // | |
| 126 // This method is called when a pull goroutine receives either an error or a | |
| 127 // response with no messages from Pub/Sub. In order to smooth out retry spam | |
| 128 // while we either wait for more messages or wait for Pub/Sub to work again, all | |
| 129 // of the goroutines share a sleep mutex. | |
| 130 // | |
| 131 // This collapses their potentially-parallel sleep attempts into a serial chain | |
| 132 // of sleeps. This is done by having all sleep attempts share a lock. Any | |
| 133 // goroutine that wants to sleep will wait for the lock and hold it through its | |
| 134 // sleep. This is a simple method to obtain the desired effect of avoiding | |
| 135 // pointless burst Pub/Sub spam when the service has nothing useful to offer. | |
| 136 func (s *Subscriber) noDataSleep(c context.Context) { | |
| 137 d := s.NoDataDelay | |
| 138 if d <= 0 { | |
| 139 d = DefaultNoDataDelay | |
| 140 } | |
| 141 clock.Sleep(c, d) | |
| 142 } | |
| OLD | NEW |