Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(152)

Side by Side Diff: common/gcloud/pubsub/subscriber/subscriber.go

Issue 1838303002: Use native Pub/Sub library primitives. (Closed) Base URL: https://github.com/luci/luci-go@logdog-go1.6
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package subscriber
6
7 import (
8 "sync"
9 "time"
10
11 "github.com/luci/luci-go/common/clock"
12 "github.com/luci/luci-go/common/gcloud/pubsub"
13 log "github.com/luci/luci-go/common/logging"
14 "github.com/luci/luci-go/common/parallel"
15 "golang.org/x/net/context"
16 )
17
// DefaultNoDataDelay is how long a worker sleeps, by default, when Pub/Sub
// has no data to offer. It is used whenever Subscriber.NoDataDelay is <= 0.
const DefaultNoDataDelay = 5 * time.Second
23
// ACK is a goroutine-safe interface that is capable of sending Pub/Sub ACKs.
type ACK interface {
	// Ack acknowledges the single Pub/Sub message identified by ackID.
	//
	// This method has no way to report failure. The implementation is
	// expected to make a best effort to deliver the acknowledgement,
	// buffering and retrying as it sees fit. Applications must understand
	// that ACKs can still fail and plan their ingest pipeline accordingly.
	//
	// Calling Ack is primarily a hand-off of responsibility: the Subscriber
	// expresses the intent to acknowledge, and the ACK implementation takes
	// on the responsibility of actually acknowledging.
	Ack(ackID string)
}
37
38 // Handler is a handler function that manages an individual message. It returns
39 // true if the message should be consumed and false otherwise.
40 type Handler func(*pubsub.Message) bool
41
42 // Subscriber pulls messages from a Pub/Sub channel and processes them.
43 type Subscriber struct {
44 // S is used to pull Pub/Sub messages.
45 S Source
46 // A is used to send Pub/Sub message ACKs.
47 A ACK
48
49 // PullWorkers is the maximum number of simultaneous worker goroutines t hat a
50 // Subscriber can have pulling Pub/Sub at any given moment.
51 //
52 // If <= 0, one worker will be used.
53 PullWorkers int
54
55 // HandlerWorkers is the maximum number of message processing workers th at
56 // the Subscriber will run at any given time. One worker is dispatched p er
57 // Pub/Sub message received.
58 //
59 // If <= 0, the number of handler workers will be unbounded.
60 HandlerWorkers int
61
62 // NoDataDelay is the amount of time to wait in between retries if there is
63 // either an error or no data polling Pub/Sub.
64 //
65 // If <= 0, DefaultNoDataDelay will be used.
66 NoDataDelay time.Duration
67
68 // noDataMu is used to throttle retries if the subscription has no avail able
69 // data.
70 noDataMu sync.Mutex
71 }
72
73 // Run executes until the supplied Context is canceled. Each recieved message
74 // is processed by a Handler.
75 func (s *Subscriber) Run(c context.Context, h Handler) {
76 pullWorkers := s.PullWorkers
77 if pullWorkers <= 0 {
78 pullWorkers = 1
79 }
80
81 runner := parallel.Runner{
82 Sustained: s.HandlerWorkers,
83 Maximum: s.HandlerWorkers,
84 }
85 defer runner.Close()
86
87 parallel.WorkPool(pullWorkers, func(taskC chan<- func() error) {
88 for {
89 // Stop if our Context has been canceled.
90 if err := c.Err(); err != nil {
91 return
92 }
93
94 // Fetch and process another batch of messages.
95 taskC <- func() error {
96 switch msgs, err := s.S.Pull(c); err {
97 case context.Canceled, context.DeadlineExceeded:
98 break
99
100 case nil:
101 s.handleMessages(c, h, &runner, msgs)
102
103 default:
104 log.WithError(err).Errorf(c, "Failed to pull messages.")
105 s.noDataSleep(c)
106 }
107
108 return nil
109 }
110 }
111 })
112 }
113
114 func (s *Subscriber) handleMessages(c context.Context, h Handler, r *parallel.Ru nner, msgs []*pubsub.Message) {
115 if len(msgs) == 0 {
116 s.noDataSleep(c)
117 return
118 }
119
120 // Handle all messages in parallel.
121 parallel.Ignore(r.Run(func(taskC chan<- func() error) {
122 for _, msg := range msgs {
123 msg := msg
124
125 // Handle an individual message. If the Handler returns true, ACK
126 // it.
127 taskC <- func() error {
128 if h(msg) {
129 s.A.Ack(msg.AckID)
130 }
131 return nil
132 }
133 }
134 }))
135 }
136
137 // noDataSleep sleeps for the configured NoDataDelay. This sleep will terminate
138 // immediately if the supplied Context is canceled.
139 //
140 // This method is called when a pull goroutine receives either an error or a
141 // response with no messages from Pub/Sub. In order to smooth out retry spam
142 // while we either wait for more messages or wait for Pub/Sub to work again, all
143 // of the goroutines share a sleep mutex.
144 //
145 // This collapses their potentially-parallel sleep attempts into a serial chain
146 // of sleeps. This is done by having all sleep attempts share a lock. Any
147 // goroutine that wants to sleep will wait for the lock and hold it through its
148 // sleep. This is a simple method to obtain the desired effect of avoiding
149 // pointless burst Pub/Sub spam when the service has nothing useful to offer.
150 func (s *Subscriber) noDataSleep(c context.Context) {
151 s.noDataMu.Lock()
152 defer s.noDataMu.Unlock()
153
154 d := s.NoDataDelay
155 if d <= 0 {
156 d = DefaultNoDataDelay
157 }
158 clock.Sleep(c, d)
159 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698