Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(59)

Unified Diff: common/gcloud/gcps/subscriber/subscriber.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: common/gcloud/gcps/subscriber/subscriber.go
diff --git a/common/gcloud/gcps/subscriber/subscriber.go b/common/gcloud/gcps/subscriber/subscriber.go
index aafacc250d2944d7e1b7e8acd7809d683d40f291..cd5d343ec56418db4408f599e0b6336ce8111958 100644
--- a/common/gcloud/gcps/subscriber/subscriber.go
+++ b/common/gcloud/gcps/subscriber/subscriber.go
@@ -1,9 +1,7 @@
-// Copyright 2015 The Chromium Authors. All rights reserved.
+// Copyright 2016 The Chromium Authors. All rights reserved.
dnj (Google) 2016/01/21 04:36:24 This changed a fair bit. Originally it dispatched
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-// Package subscriber implements the Subscriber, which orchestrates parallel
-// Pub/Sub subscription pulls.
package subscriber
import (
@@ -11,151 +9,145 @@ import (
"time"
"github.com/luci/luci-go/common/clock"
- "github.com/luci/luci-go/common/gcloud/gcps"
log "github.com/luci/luci-go/common/logging"
"github.com/luci/luci-go/common/parallel"
- "github.com/luci/luci-go/common/retry"
"golang.org/x/net/context"
"google.golang.org/cloud/pubsub"
)
const (
- // DefaultWorkers is the number of subscription workers to use.
- DefaultWorkers = 20
-
- // noDataDelay is the amount of time a worker will sleep if there is no
- // Pub/Sub data.
- noDataDelay = 1 * time.Second
+ // DefaultNoDataDelay is the default amount of time a worker will sleep if
+ // there is no Pub/Sub data.
+ DefaultNoDataDelay = 5 * time.Second
)
-// PubSubPull is an interface for something that can return Pub/Sub messages on
-// request.
-//
-// gcps.PubSub naturally implements this interface.
-type PubSubPull interface {
- // Pull pulls messages from the subscription. It returns up the requested
- // number of messages.
- Pull(gcps.Subscription, int) ([]*pubsub.Message, error)
+// ACK is a goroutine-safe interface that is capable of sending Pub/Sub ACKs.
+type ACK interface {
+ // Ack ACKs a single Pub/Sub message ID.
+ Ack(string)
}
-// Callback is invoked for each received Pub/Sub message.
-type Callback func(*pubsub.Message)
+// Handler is a handler function that manages an individual message. It returns
+// true if the message should be consumed and false otherwise.
+type Handler func(*pubsub.Message) bool
-// Subscriber is an interface to return Pub/Sub messages from Cloud Pub/Sub.
-// It spawns several worker goroutines to poll a Pub/Sub interface and invoke
-// a configured callback for each received message.
+// Subscriber pulls messages from a Pub/Sub channel and processes them.
type Subscriber struct {
- // PubSub is the underlying Pub/Sub instance to pull from.
- PubSub PubSubPull
-
- // Subscription is the name of the subscription to poll.
- Subscription gcps.Subscription
-
- // BatchSize is the number of simultaneous messages to pull. If <= zero,
- // gcps.MaxSubscriptionPullSize is used.
- BatchSize int
-
- // Workers is the number of Pub/Sub polling workers to simultaneously run. If
- // <= zero, DefaultWorkers will be used.
+ // S is used to pull Pub/Sub messages.
+ S Source
+ // A is used to send Pub/Sub message ACKs.
+ A ACK
+
+ // Workers is the maximum number of simultaneous workers that a Subscriber can
+ // have at any given moment.
+ //
+ // If <= 0, one worker will be used.
Workers int
-}
-
-// Run executes the Subscriber instance, spawning several workers and polling
-// for messages. The supplied callback will be invoked for each polled message.
-//
-// Subscriber will run until the supplied Context is cancelled.
-func (s *Subscriber) Run(ctx context.Context, cb Callback) {
- batchSize := s.BatchSize
- if batchSize <= 0 {
- batchSize = gcps.MaxSubscriptionPullSize
- }
- // Set our base logging fields.
- ctx = log.SetFields(ctx, log.Fields{
- "subscription": s.Subscription,
- "batchSize": batchSize,
- })
-
- // Mutex to protect Pub/Sub spamming when there are no available messages.
- noDataMu := sync.Mutex{}
+ // HandlerSem is the Semaphore to use to constrain the number of worker
+ // goroutines used to process individual messages. If nil, no constraint will
+ // be applied.
+ HandlerSem parallel.Semaphore
+
+	// NoDataDelay is the amount of time to wait in between retries if there is
+	// either an error or no data when polling Pub/Sub.
+ //
+ // If <= 0, DefaultNoDataDelay will be used.
+ NoDataDelay time.Duration
+
+ // noDataMu is used to throttle retries if the subscription has no available
+ // data.
+ noDataMu sync.Mutex
+ // handlerWG is the WaitGroup used to track outstanding message handlers.
+ handlerWG sync.WaitGroup
+}
+// Run executes until the supplied Context is cancelled. Each received message
+// is processed by a Handler.
+func (s *Subscriber) Run(c context.Context, h Handler) {
workers := s.Workers
if workers <= 0 {
- workers = DefaultWorkers
+ workers = 1
}
- err := parallel.WorkPool(workers, func(taskC chan<- func() error) {
- // Dispatch poll tasks until our Context is cancelled.
- for active := true; active; {
- // Check if we're cancelled.
+
+ parallel.WorkPool(workers, func(taskC chan<- func() error) {
+ for {
select {
- case <-ctx.Done():
- log.WithError(ctx.Err()).Infof(ctx, "Context is finished.")
+ case <-c.Done():
return
- default:
- break
- }
- // Dispatch a poll task. Always return "nil", even if there is an error,
- // since error conditions are neither tracked nor fatal.
- taskC <- func() error {
- if err := s.pullMessages(ctx, batchSize, &noDataMu, cb); err != nil {
- log.WithError(err).Errorf(ctx, "Failed to pull messages.")
+ default:
+ // Fetch and process another batch of messages.
+ taskC <- func() error {
+ msgs, err := s.S.Pull(c)
+ switch err {
+ case context.Canceled:
+ break
+
+ case nil:
+ s.handleMessages(c, h, msgs)
+
+ default:
+ log.WithError(err).Errorf(c, "Failed to pull messages.")
+ s.noDataSleep(c)
+ }
+
+ return nil
}
- return nil
}
}
})
- if err != nil {
- log.WithError(err).Errorf(ctx, "Failed to run Subscriber work pool.")
- }
+
+ // Wait for all of our Handlers to finish.
+ s.handlerWG.Wait()
}
-func (s *Subscriber) pullMessages(ctx context.Context, batchSize int, noDataMu sync.Locker, cb Callback) error {
- // Pull a set of messages.
- var messages []*pubsub.Message
- err := retry.Retry(ctx, retry.TransientOnly(retry.Default()), func() (ierr error) {
- messages, ierr = s.PubSub.Pull(s.Subscription, batchSize)
+func (s *Subscriber) handleMessages(c context.Context, h Handler, msgs []*pubsub.Message) {
+ if len(msgs) == 0 {
+ s.noDataSleep(c)
return
- }, func(err error, d time.Duration) {
- log.Fields{
- log.ErrorKey: err,
- "delay": d,
- }.Warningf(ctx, "Transient error on Pull(). Retrying...")
- })
+ }
+
+ // Handle all messages in parallel.
+ parallel.Run(s.HandlerSem, func(taskC chan<- func() error) {
+ s.handlerWG.Add(len(msgs))
+ for _, msg := range msgs {
+ msg := msg
- if len(messages) > 0 {
- log.Fields{
- "messageCount": len(messages),
- }.Infof(ctx, "Pulled messages.")
+ // Handle an individual message. If the Handler returns true, ACK
+ // it.
+ taskC <- func() error {
+ defer s.handlerWG.Done()
- for _, msg := range messages {
- cb(msg)
+ if h(msg) {
+ s.A.Ack(msg.AckID)
+ }
+ return nil
+ }
}
- }
+ })
+}
- if err != nil || len(messages) == 0 {
- log.Fields{
- log.ErrorKey: err,
- "delay": noDataDelay,
- }.Debugf(ctx, "Sleeping.")
+func (s *Subscriber) noDataSleep(c context.Context) {
+ s.noDataMu.Lock()
+ defer s.noDataMu.Unlock()
- noDataMu.Lock()
- defer noDataMu.Unlock()
- cancellableSleep(ctx, noDataDelay)
+ d := s.NoDataDelay
+ if d <= 0 {
+ d = DefaultNoDataDelay
}
-
- return err
+ cancellableSleep(c, d)
}
// cancellableSleep sleeps, returning either when the sleep duration has expired
// or the supplied context has been cancelled.
-func cancellableSleep(ctx context.Context, delay time.Duration) {
+func cancellableSleep(c context.Context, delay time.Duration) {
// Sleep for "delay", stopping early if our Context is cancelled.
select {
- case <-clock.After(ctx, delay):
+ case <-clock.After(c, delay):
break
- case <-ctx.Done():
+ case <-c.Done():
break
}
}

Powered by Google App Engine
This is Rietveld 408576698