Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(310)

Side by Side Diff: common/gcloud/pubsub/subscriber/subscriber.go

Issue 1838303002: Use native Pub/Sub library primitives. (Closed) Base URL: https://github.com/luci/luci-go@logdog-go1.6
Patch Set: Use "Topic" instead of "NewTopic" ... don't want to create :) Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package subscriber
6
7 import (
8 "time"
9
10 "github.com/luci/luci-go/common/clock"
11 "github.com/luci/luci-go/common/gcloud/pubsub"
12 log "github.com/luci/luci-go/common/logging"
13 "github.com/luci/luci-go/common/parallel"
14 "golang.org/x/net/context"
15 )
16
17 const (
18 // DefaultNoDataDelay is the default amount of time a worker will sleep if
19 // there is no Pub/Sub data.
20 DefaultNoDataDelay = 5 * time.Second
21 )
22
23 // ACK is a goroutine-safe interface that is capable of sending Pub/Sub ACKs.
24 type ACK interface {
25 // Ack ACKs a single Pub/Sub message ID.
26 //
27 // Note that this method cannot fail. The ACK instance is responsible fo r
28 // making a best effort to perform the acknowledgement and buffering/ret rying
29 // as it sees fit. Applications must understand that ACKs can fail and p lan
30 // their ingest pipeline accordingly.
31 //
32 // This functions primarily as a hand-off of responsibility from Subscri ber
33 // (intent to acknowledge) to ACK (responsibility to acknowledge).
34 Ack(string)
35 }
36
37 // Handler is a handler function that manages an individual message. It returns
38 // true if the message should be consumed and false otherwise.
39 type Handler func(*pubsub.Message) bool
40
41 // Subscriber pulls messages from a Pub/Sub channel and processes them.
42 type Subscriber struct {
43 // S is used to pull Pub/Sub messages.
44 S Source
45 // A is used to send Pub/Sub message ACKs.
46 A ACK
47
48 // Workers is the number of concurrent messages to be processed. If <= 0 , a
49 // default of pubsub.MaxSubscriptionPullSize will be used.
50 Workers int
51
52 // NoDataDelay is the amount of time to wait in between retries if there is
53 // either an error or no data polling Pub/Sub.
54 //
55 // If <= 0, DefaultNoDataDelay will be used.
56 NoDataDelay time.Duration
57 }
58
59 // Run executes until the supplied Context is canceled. Each recieved message
60 // is processed by a Handler.
61 func (s *Subscriber) Run(c context.Context, h Handler) {
62 // Start a goroutine to continuously pull messages and put them into mes sageC.
63 workers := s.Workers
64 if workers <= 0 {
65 workers = pubsub.MaxSubscriptionPullSize
66 }
67 messageC := make(chan *pubsub.Message, workers)
68 go func() {
69 defer close(messageC)
70
71 // Adjust our Pull batch size based on limits.
72 batchSize := workers
73 if batchSize > pubsub.MaxSubscriptionPullSize {
74 batchSize = pubsub.MaxSubscriptionPullSize
75 }
76
77 // Fetch and process another batch of messages.
78 for {
79 switch msgs, err := s.S.Pull(c, batchSize); err {
80 case context.Canceled, context.DeadlineExceeded:
81 return
82
83 case nil:
84 // Write all messages into messageC.
85 for _, msg := range msgs {
86 select {
87 case messageC <- msg:
88 break
89 case <-c.Done():
90 // Prefer messages for determini sm.
91 select {
92 case messageC <- msg:
93 break
94 default:
95 break
96 }
97
98 return
99 }
100 }
101
102 default:
103 log.WithError(err).Errorf(c, "Failed to pull mes sages.")
104 s.noDataSleep(c)
105 }
106 }
107 }()
108
109 // Dispatch message handlers in parallel.
110 parallel.Ignore(parallel.Run(workers, func(taskC chan<- func() error) {
111 for msg := range messageC {
112 msg := msg
113 taskC <- func() error {
114 if h(msg) {
115 s.A.Ack(msg.AckID)
116 }
117 return nil
118 }
119 }
120 }))
121 }
122
123 // noDataSleep sleeps for the configured NoDataDelay. This sleep will terminate
124 // immediately if the supplied Context is canceled.
125 //
126 // This method is called when a pull goroutine receives either an error or a
127 // response with no messages from Pub/Sub. In order to smooth out retry spam
128 // while we either wait for more messages or wait for Pub/Sub to work again, all
129 // of the goroutines share a sleep mutex.
130 //
131 // This collapses their potentially-parallel sleep attempts into a serial chain
132 // of sleeps. This is done by having all sleep attempts share a lock. Any
133 // goroutine that wants to sleep will wait for the lock and hold it through its
134 // sleep. This is a simple method to obtain the desired effect of avoiding
135 // pointless burst Pub/Sub spam when the service has nothing useful to offer.
136 func (s *Subscriber) noDataSleep(c context.Context) {
137 d := s.NoDataDelay
138 if d <= 0 {
139 d = DefaultNoDataDelay
140 }
141 clock.Sleep(c, d)
142 }
OLDNEW
« no previous file with comments | « common/gcloud/pubsub/subscriber/source.go ('k') | common/gcloud/pubsub/subscriber/subscriber_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698