Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(19)

Unified Diff: common/gcloud/pubsub/subscriber/subscriber.go

Issue 1838303002: Use native Pub/Sub library primitives. (Closed) Base URL: https://github.com/luci/luci-go@logdog-go1.6
Patch Set: Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: common/gcloud/pubsub/subscriber/subscriber.go
diff --git a/common/gcloud/pubsub/subscriber/subscriber.go b/common/gcloud/pubsub/subscriber/subscriber.go
deleted file mode 100644
index c47116dfc85485d44ff0deda1bfb8c7b343bef08..0000000000000000000000000000000000000000
--- a/common/gcloud/pubsub/subscriber/subscriber.go
+++ /dev/null
@@ -1,159 +0,0 @@
-// Copyright 2016 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-package subscriber
-
-import (
- "sync"
- "time"
-
- "github.com/luci/luci-go/common/clock"
- "github.com/luci/luci-go/common/gcloud/pubsub"
- log "github.com/luci/luci-go/common/logging"
- "github.com/luci/luci-go/common/parallel"
- "golang.org/x/net/context"
-)
-
-const (
- // DefaultNoDataDelay is the default amount of time a worker will sleep if
- // there is no Pub/Sub data.
- DefaultNoDataDelay = 5 * time.Second
-)
-
-// ACK is a goroutine-safe interface that is capable of sending Pub/Sub ACKs.
-type ACK interface {
- // Ack ACKs a single Pub/Sub message ID.
- //
- // Note that this method cannot fail. The ACK instance is responsible for
- // making a best effort to perform the acknowledgement and buffering/retrying
- // as it sees fit. Applications must understand that ACKs can fail and plan
- // their ingest pipeline accordingly.
- //
- // This functions primarily as a hand-off of responsibility from Subscriber
- // (intent to acknowledge) to ACK (responsibility to acknowledge).
- Ack(string)
-}
-
-// Handler is a handler function that manages an individual message. It returns
-// true if the message should be consumed and false otherwise.
-type Handler func(*pubsub.Message) bool
-
-// Subscriber pulls messages from a Pub/Sub channel and processes them.
-type Subscriber struct {
- // S is used to pull Pub/Sub messages.
- S Source
- // A is used to send Pub/Sub message ACKs.
- A ACK
-
- // PullWorkers is the maximum number of simultaneous worker goroutines that a
- // Subscriber can have pulling Pub/Sub at any given moment.
- //
- // If <= 0, one worker will be used.
- PullWorkers int
-
- // HandlerWorkers is the maximum number of message processing workers that
- // the Subscriber will run at any given time. One worker is dispatched per
- // Pub/Sub message received.
- //
- // If <= 0, the number of handler workers will be unbounded.
- HandlerWorkers int
-
- // NoDataDelay is the amount of time to wait in between retries if there is
- // either an error or no data polling Pub/Sub.
- //
- // If <= 0, DefaultNoDataDelay will be used.
- NoDataDelay time.Duration
-
- // noDataMu is used to throttle retries if the subscription has no available
- // data.
- noDataMu sync.Mutex
-}
-
-// Run executes until the supplied Context is canceled. Each recieved message
-// is processed by a Handler.
-func (s *Subscriber) Run(c context.Context, h Handler) {
- pullWorkers := s.PullWorkers
- if pullWorkers <= 0 {
- pullWorkers = 1
- }
-
- runner := parallel.Runner{
- Sustained: s.HandlerWorkers,
- Maximum: s.HandlerWorkers,
- }
- defer runner.Close()
-
- parallel.WorkPool(pullWorkers, func(taskC chan<- func() error) {
- for {
- // Stop if our Context has been canceled.
- if err := c.Err(); err != nil {
- return
- }
-
- // Fetch and process another batch of messages.
- taskC <- func() error {
- switch msgs, err := s.S.Pull(c); err {
- case context.Canceled, context.DeadlineExceeded:
- break
-
- case nil:
- s.handleMessages(c, h, &runner, msgs)
-
- default:
- log.WithError(err).Errorf(c, "Failed to pull messages.")
- s.noDataSleep(c)
- }
-
- return nil
- }
- }
- })
-}
-
-func (s *Subscriber) handleMessages(c context.Context, h Handler, r *parallel.Runner, msgs []*pubsub.Message) {
- if len(msgs) == 0 {
- s.noDataSleep(c)
- return
- }
-
- // Handle all messages in parallel.
- parallel.Ignore(r.Run(func(taskC chan<- func() error) {
- for _, msg := range msgs {
- msg := msg
-
- // Handle an individual message. If the Handler returns true, ACK
- // it.
- taskC <- func() error {
- if h(msg) {
- s.A.Ack(msg.AckID)
- }
- return nil
- }
- }
- }))
-}
-
-// noDataSleep sleeps for the configured NoDataDelay. This sleep will terminate
-// immediately if the supplied Context is canceled.
-//
-// This method is called when a pull goroutine receives either an error or a
-// response with no messages from Pub/Sub. In order to smooth out retry spam
-// while we either wait for more messages or wait for Pub/Sub to work again, all
-// of the goroutines share a sleep mutex.
-//
-// This collapses their potentially-parallel sleep attempts into a serial chain
-// of sleeps. This is done by having all sleep attempts share a lock. Any
-// goroutine that wants to sleep will wait for the lock and hold it through its
-// sleep. This is a simple method to obtain the desired effect of avoiding
-// pointless burst Pub/Sub spam when the service has nothing useful to offer.
-func (s *Subscriber) noDataSleep(c context.Context) {
- s.noDataMu.Lock()
- defer s.noDataMu.Unlock()
-
- d := s.NoDataDelay
- if d <= 0 {
- d = DefaultNoDataDelay
- }
- clock.Sleep(c, d)
-}

Powered by Google App Engine
This is Rietveld 408576698