Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(274)

Side by Side Diff: common/gcloud/gcps/subscriber/subscriber.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
dnj (Google) 2016/01/21 04:36:24 This changed a fair bit. Originally it dispatched
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 // Package subscriber implements the Subscriber, which orchestrates parallel
6 // Pub/Sub subscription pulls.
7 package subscriber 5 package subscriber
8 6
9 import ( 7 import (
10 "sync" 8 "sync"
11 "time" 9 "time"
12 10
13 "github.com/luci/luci-go/common/clock" 11 "github.com/luci/luci-go/common/clock"
14 "github.com/luci/luci-go/common/gcloud/gcps"
15 log "github.com/luci/luci-go/common/logging" 12 log "github.com/luci/luci-go/common/logging"
16 "github.com/luci/luci-go/common/parallel" 13 "github.com/luci/luci-go/common/parallel"
17 "github.com/luci/luci-go/common/retry"
18 "golang.org/x/net/context" 14 "golang.org/x/net/context"
19 "google.golang.org/cloud/pubsub" 15 "google.golang.org/cloud/pubsub"
20 ) 16 )
21 17
22 const ( 18 const (
23 » // DefaultWorkers is the number of subscription workers to use. 19 » // DefaultNoDataDelay is the default amount of time a worker will sleep if
24 » DefaultWorkers = 20 20 » // there is no Pub/Sub data.
25 21 » DefaultNoDataDelay = 5 * time.Second
26 » // noDataDelay is the amount of time a worker will sleep if there is no
27 » // Pub/Sub data.
28 » noDataDelay = 1 * time.Second
29 ) 22 )
30 23
31 // PubSubPull is an interface for something that can return Pub/Sub messages on 24 // ACK is a goroutine-safe interface that is capable of sending Pub/Sub ACKs.
32 // request. 25 type ACK interface {
33 // 26 » // Ack ACKs a single Pub/Sub message ID.
34 // gcps.PubSub naturally implements this interface. 27 » Ack(string)
35 type PubSubPull interface {
36 » // Pull pulls messages from the subscription. It returns up the requested
37 » // number of messages.
38 » Pull(gcps.Subscription, int) ([]*pubsub.Message, error)
39 } 28 }
40 29
41 // Callback is invoked for each received Pub/Sub message. 30 // Handler is a handler function that manages an individual message. It returns
42 type Callback func(*pubsub.Message) 31 // true if the message should be consumed and false otherwise.
32 type Handler func(*pubsub.Message) bool
43 33
44 // Subscriber is an interface to return Pub/Sub messages from Cloud Pub/Sub. 34 // Subscriber pulls messages from a Pub/Sub channel and processes them.
45 // It spawns several worker goroutines to poll a Pub/Sub interface and invoke
46 // a configured callback for each received message.
47 type Subscriber struct { 35 type Subscriber struct {
48 » // PubSub is the underlying Pub/Sub instance to pull from. 36 » // S is used to pull Pub/Sub messages.
49 » PubSub PubSubPull 37 » S Source
38 » // A is used to send Pub/Sub message ACKs.
39 » A ACK
50 40
51 » // Subscription is the name of the subscription to poll. 41 » // Workers is the maximum number of simultaneous workers that a Subscriber can
52 » Subscription gcps.Subscription 42 » // have at any given moment.
43 » //
44 » // If <= 0, one worker will be used.
45 » Workers int
53 46
54 » // BatchSize is the number of simultaneous messages to pull. If <= zero, 47 » // HandlerSem is the Semaphore to use to constrain the number of worker
55 » // gcps.MaxSubscriptionPullSize is used. 48 » // goroutines used to process individual messages. If nil, no constraint will
56 » BatchSize int 49 » // be applied.
50 » HandlerSem parallel.Semaphore
57 51
58 » // Workers is the number of Pub/Sub polling workers to simultaneously run. If 52 » // NoDataDelay is the amount of time to wait in between retries if there is
59 » // <= zero, DefaultWorkers will be used. 53 » // either an error or no data polling Pub/Sub.
60 » Workers int 54 » //
55 » // If <= 0, DefaultNoDataDelay will be used.
56 » NoDataDelay time.Duration
57
58 » // noDataMu is used to throttle retries if the subscription has no available
59 » // data.
60 » noDataMu sync.Mutex
61 » // handlerWG is the WaitGroup used to track outstanding message handlers.
62 » handlerWG sync.WaitGroup
61 } 63 }
62 64
63 // Run executes the Subscriber instance, spawning several workers and polling 65 // Run executes until the supplied Context is cancelled. Each received message
64 // for messages. The supplied callback will be invoked for each polled message. 66 // is processed by a Handler.
65 // 67 func (s *Subscriber) Run(c context.Context, h Handler) {
66 // Subscriber will run until the supplied Context is cancelled. 68 » workers := s.Workers
67 func (s *Subscriber) Run(ctx context.Context, cb Callback) { 69 » if workers <= 0 {
68 » batchSize := s.BatchSize 70 » » workers = 1
69 » if batchSize <= 0 {
70 » » batchSize = gcps.MaxSubscriptionPullSize
71 } 71 }
72 72
73 » // Set our base logging fields. 73 » parallel.WorkPool(workers, func(taskC chan<- func() error) {
74 » ctx = log.SetFields(ctx, log.Fields{ 74 » » for {
75 » » "subscription": s.Subscription, 75 » » » select {
76 » » "batchSize": batchSize, 76 » » » case <-c.Done():
77 » » » » return
78
79 » » » default:
80 » » » » // Fetch and process another batch of messages.
81 » » » » taskC <- func() error {
82 » » » » » msgs, err := s.S.Pull(c)
83 » » » » » switch err {
84 » » » » » case context.Canceled:
85 » » » » » » break
86
87 » » » » » case nil:
88 » » » » » » s.handleMessages(c, h, msgs)
89
90 » » » » » default:
91 » » » » » » log.WithError(err).Errorf(c, "Failed to pull messages.")
92 » » » » » » s.noDataSleep(c)
93 » » » » » }
94
95 » » » » » return nil
96 » » » » }
97 » » » }
98 » » }
77 }) 99 })
78 100
79 » // Mutex to protect Pub/Sub spamming when there are no available message s. 101 » // Wait for all of our Handlers to finish.
80 » noDataMu := sync.Mutex{} 102 » s.handlerWG.Wait()
103 }
81 104
82 » workers := s.Workers 105 func (s *Subscriber) handleMessages(c context.Context, h Handler, msgs []*pubsub.Message) {
83 » if workers <= 0 { 106 » if len(msgs) == 0 {
84 » » workers = DefaultWorkers 107 » » s.noDataSleep(c)
108 » » return
85 } 109 }
86 err := parallel.WorkPool(workers, func(taskC chan<- func() error) {
87 // Dispatch poll tasks until our Context is cancelled.
88 for active := true; active; {
89 // Check if we're cancelled.
90 select {
91 case <-ctx.Done():
92 log.WithError(ctx.Err()).Infof(ctx, "Context is finished.")
93 return
94 default:
95 break
96 }
97 110
98 » » » // Dispatch a poll task. Always return "nil", even if there is an error, 111 » // Handle all messages in parallel.
99 » » » // since error conditions are neither tracked nor fatal. 112 » parallel.Run(s.HandlerSem, func(taskC chan<- func() error) {
113 » » s.handlerWG.Add(len(msgs))
114 » » for _, msg := range msgs {
115 » » » msg := msg
116
117 » » » // Handle an individual message. If the Handler returns true, ACK
118 » » » // it.
100 taskC <- func() error { 119 taskC <- func() error {
101 » » » » if err := s.pullMessages(ctx, batchSize, &noDataMu, cb); err != nil { 120 » » » » defer s.handlerWG.Done()
102 » » » » » log.WithError(err).Errorf(ctx, "Failed to pull messages.") 121
122 » » » » if h(msg) {
123 » » » » » s.A.Ack(msg.AckID)
103 } 124 }
104 return nil 125 return nil
105 } 126 }
106 } 127 }
107 }) 128 })
108 if err != nil {
109 » » log.WithError(err).Errorf(ctx, "Failed to run Subscriber work pool.")
110 }
111 } 129 }
112 130
113 func (s *Subscriber) pullMessages(ctx context.Context, batchSize int, noDataMu sync.Locker, cb Callback) error { 131 func (s *Subscriber) noDataSleep(c context.Context) {
114 » // Pull a set of messages. 132 » s.noDataMu.Lock()
115 » var messages []*pubsub.Message 133 » defer s.noDataMu.Unlock()
116 » err := retry.Retry(ctx, retry.TransientOnly(retry.Default()), func() (ierr error) {
117 » » messages, ierr = s.PubSub.Pull(s.Subscription, batchSize)
118 » » return
119 » }, func(err error, d time.Duration) {
120 » » log.Fields{
121 » » » log.ErrorKey: err,
122 » » » "delay": d,
123 » » }.Warningf(ctx, "Transient error on Pull(). Retrying...")
124 » })
125 134
126 » if len(messages) > 0 { 135 » d := s.NoDataDelay
127 » » log.Fields{ 136 » if d <= 0 {
128 » » » "messageCount": len(messages), 137 » » d = DefaultNoDataDelay
129 » » }.Infof(ctx, "Pulled messages.")
130
131 » » for _, msg := range messages {
132 » » » cb(msg)
133 » » }
134 } 138 }
135 139 » cancellableSleep(c, d)
136 » if err != nil || len(messages) == 0 {
137 » » log.Fields{
138 » » » log.ErrorKey: err,
139 » » » "delay": noDataDelay,
140 » » }.Debugf(ctx, "Sleeping.")
141
142 » » noDataMu.Lock()
143 » » defer noDataMu.Unlock()
144 » » cancellableSleep(ctx, noDataDelay)
145 » }
146
147 » return err
148 } 140 }
149 141
150 // cancellableSleep sleeps, returning either when the sleep duration has expired 142 // cancellableSleep sleeps, returning either when the sleep duration has expired
151 // or the supplied context has been cancelled. 143 // or the supplied context has been cancelled.
152 func cancellableSleep(ctx context.Context, delay time.Duration) { 144 func cancellableSleep(c context.Context, delay time.Duration) {
153 // Sleep for "delay", stopping early if our Context is cancelled. 145 // Sleep for "delay", stopping early if our Context is cancelled.
154 select { 146 select {
155 » case <-clock.After(ctx, delay): 147 » case <-clock.After(c, delay):
156 break 148 break
157 149
158 » case <-ctx.Done(): 150 » case <-c.Done():
159 break 151 break
160 } 152 }
161 } 153 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698