Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1052)

Side by Side Diff: common/gcloud/pubsub/subscriber/subscriber.go

Issue 1838803002: LogDog: BigTable batching schema. (Closed) Base URL: https://github.com/luci/luci-go@recordio-split
Patch Set: Minor comments and quality of code tweaks. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package subscriber 5 package subscriber
6 6
7 import ( 7 import (
8 "sync"
9 "time" 8 "time"
10 9
11 "github.com/luci/luci-go/common/clock" 10 "github.com/luci/luci-go/common/clock"
12 "github.com/luci/luci-go/common/gcloud/pubsub" 11 "github.com/luci/luci-go/common/gcloud/pubsub"
13 log "github.com/luci/luci-go/common/logging" 12 log "github.com/luci/luci-go/common/logging"
14 "github.com/luci/luci-go/common/parallel" 13 "github.com/luci/luci-go/common/parallel"
15 "golang.org/x/net/context" 14 "golang.org/x/net/context"
16 ) 15 )
17 16
18 const ( 17 const (
(...skipping 20 matching lines...) Expand all
39 // true if the message should be consumed and false otherwise. 38 // true if the message should be consumed and false otherwise.
40 type Handler func(*pubsub.Message) bool 39 type Handler func(*pubsub.Message) bool
41 40
42 // Subscriber pulls messages from a Pub/Sub channel and processes them. 41 // Subscriber pulls messages from a Pub/Sub channel and processes them.
43 type Subscriber struct { 42 type Subscriber struct {
44 // S is used to pull Pub/Sub messages. 43 // S is used to pull Pub/Sub messages.
45 S Source 44 S Source
46 // A is used to send Pub/Sub message ACKs. 45 // A is used to send Pub/Sub message ACKs.
47 A ACK 46 A ACK
48 47
49 » // PullWorkers is the maximum number of simultaneous worker goroutines that a 48 » // Workers is the number of concurrent messages to be processed. If <= 0, a
50 » // Subscriber can have pulling Pub/Sub at any given moment. 49 » // default of pubsub.MaxSubscriptionPullSize will be used.
51 » // 50 » Workers int
52 » // If <= 0, one worker will be used.
53 » PullWorkers int
54
55 » // HandlerWorkers is the maximum number of message processing workers that 
56 » // the Subscriber will run at any given time. One worker is dispatched per
57 » // Pub/Sub message received.
58 » //
59 » // If <= 0, the number of handler workers will be unbounded.
60 » HandlerWorkers int
61 51
62 // NoDataDelay is the amount of time to wait in between retries if there is 52 // NoDataDelay is the amount of time to wait in between retries if there is
63 // either an error or no data polling Pub/Sub. 53 // either an error or no data polling Pub/Sub.
64 // 54 //
65 // If <= 0, DefaultNoDataDelay will be used. 55 // If <= 0, DefaultNoDataDelay will be used.
66 NoDataDelay time.Duration 56 NoDataDelay time.Duration
67
68 // noDataMu is used to throttle retries if the subscription has no available
69 // data.
70 noDataMu sync.Mutex
71 } 57 }
72 58
73 // Run executes until the supplied Context is canceled. Each received message 59 // Run executes until the supplied Context is canceled. Each received message
74 // is processed by a Handler. 60 // is processed by a Handler.
75 func (s *Subscriber) Run(c context.Context, h Handler) { 61 func (s *Subscriber) Run(c context.Context, h Handler) {
76 » pullWorkers := s.PullWorkers 62 » // Start a goroutine to continuously pull messages and put them into messageC.
77 » if pullWorkers <= 0 { 63 » workers := s.Workers
78 » » pullWorkers = 1 64 » if workers <= 0 {
65 » » workers = pubsub.MaxSubscriptionPullSize
79 } 66 }
67 messageC := make(chan *pubsub.Message, workers)
68 go func() {
69 defer close(messageC)
80 70
81 » runner := parallel.Runner{ 71 » » // Adjust our Pull batch size based on limits.
82 » » Sustained: s.HandlerWorkers, 72 » » batchSize := workers
83 » » Maximum: s.HandlerWorkers, 73 » » if batchSize > pubsub.MaxSubscriptionPullSize {
84 » } 74 » » » batchSize = pubsub.MaxSubscriptionPullSize
85 » defer runner.Close() 75 » » }
86 76
87 » parallel.WorkPool(pullWorkers, func(taskC chan<- func() error) { 77 » » // Fetch and process another batch of messages.
88 for { 78 for {
89 » » » // Stop if our Context has been canceled. 79 » » » switch msgs, err := s.S.Pull(c, batchSize); err {
90 » » » if err := c.Err(); err != nil { 80 » » » case context.Canceled, context.DeadlineExceeded:
91 return 81 return
92 }
93 82
94 » » » // Fetch and process another batch of messages. 83 » » » case nil:
95 » » » taskC <- func() error { 84 » » » » // Write all messages into messageC.
96 » » » » switch msgs, err := s.S.Pull(c); err { 85 » » » » for _, msg := range msgs {
97 » » » » case context.Canceled, context.DeadlineExceeded: 86 » » » » » select {
98 » » » » » break 87 » » » » » case messageC <- msg:
88 » » » » » » break
89 » » » » » case <-c.Done():
90 » » » » » » // Prefer messages for determinism.
91 » » » » » » select {
92 » » » » » » case messageC <- msg:
93 » » » » » » » break
94 » » » » » » default:
95 » » » » » » » break
96 » » » » » » }
99 97
100 » » » » case nil: 98 » » » » » » return
101 » » » » » s.handleMessages(c, h, &runner, msgs) 99 » » » » » }
102
103 » » » » default:
104 » » » » » log.WithError(err).Errorf(c, "Failed to pull messages.")
105 » » » » » s.noDataSleep(c)
106 } 100 }
107 101
108 » » » » return nil 102 » » » default:
103 » » » » log.WithError(err).Errorf(c, "Failed to pull messages.")
104 » » » » s.noDataSleep(c)
109 } 105 }
110 } 106 }
111 » }) 107 » }()
112 }
113 108
114 func (s *Subscriber) handleMessages(c context.Context, h Handler, r *parallel.Runner, msgs []*pubsub.Message) { 109 » // Dispatch message handlers in parallel.
115 » if len(msgs) == 0 { 110 » parallel.Ignore(parallel.Run(workers, func(taskC chan<- func() error) {
116 » » s.noDataSleep(c) 111 » » for msg := range messageC {
117 » » return
118 » }
119
120 » // Handle all messages in parallel.
121 » parallel.Ignore(r.Run(func(taskC chan<- func() error) {
122 » » for _, msg := range msgs {
123 msg := msg 112 msg := msg
124
125 // Handle an individual message. If the Handler returns true, ACK
126 // it.
127 taskC <- func() error { 113 taskC <- func() error {
128 if h(msg) { 114 if h(msg) {
129 s.A.Ack(msg.AckID) 115 s.A.Ack(msg.AckID)
130 } 116 }
131 return nil 117 return nil
132 } 118 }
133 } 119 }
134 })) 120 }))
135 } 121 }
136 122
137 // noDataSleep sleeps for the configured NoDataDelay. This sleep will terminate 123 // noDataSleep sleeps for the configured NoDataDelay. This sleep will terminate
138 // immediately if the supplied Context is canceled. 124 // immediately if the supplied Context is canceled.
139 // 125 //
140 // This method is called when a pull goroutine receives either an error or a 126 // This method is called when a pull goroutine receives either an error or a
141 // response with no messages from Pub/Sub. In order to smooth out retry spam 127 // response with no messages from Pub/Sub. In order to smooth out retry spam
142 // while we either wait for more messages or wait for Pub/Sub to work again, all 128 // while we either wait for more messages or wait for Pub/Sub to work again, all
143 // of the goroutines share a sleep mutex. 129 // of the goroutines share a sleep mutex.
144 // 130 //
145 // This collapses their potentially-parallel sleep attempts into a serial chain 131 // This collapses their potentially-parallel sleep attempts into a serial chain
146 // of sleeps. This is done by having all sleep attempts share a lock. Any 132 // of sleeps. This is done by having all sleep attempts share a lock. Any
147 // goroutine that wants to sleep will wait for the lock and hold it through its 133 // goroutine that wants to sleep will wait for the lock and hold it through its
148 // sleep. This is a simple method to obtain the desired effect of avoiding 134 // sleep. This is a simple method to obtain the desired effect of avoiding
149 // pointless burst Pub/Sub spam when the service has nothing useful to offer. 135 // pointless burst Pub/Sub spam when the service has nothing useful to offer.
150 func (s *Subscriber) noDataSleep(c context.Context) { 136 func (s *Subscriber) noDataSleep(c context.Context) {
151 s.noDataMu.Lock()
152 defer s.noDataMu.Unlock()
153
154 d := s.NoDataDelay 137 d := s.NoDataDelay
155 if d <= 0 { 138 if d <= 0 {
156 d = DefaultNoDataDelay 139 d = DefaultNoDataDelay
157 } 140 }
158 clock.Sleep(c, d) 141 clock.Sleep(c, d)
159 } 142 }
OLDNEW
« no previous file with comments | « common/gcloud/pubsub/subscriber/source.go ('k') | common/gcloud/pubsub/subscriber/subscriber_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698