Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(74)

Unified Diff: server/cmd/logdog_collector/main.go

Issue 1838303002: Use native Pub/Sub library primitives. (Closed) Base URL: https://github.com/luci/luci-go@logdog-go1.6
Patch Set: Use "Topic" instead of "NewTopic" ... don't want to create :) Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « common/tsmon/monitor/pubsub.go ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: server/cmd/logdog_collector/main.go
diff --git a/server/cmd/logdog_collector/main.go b/server/cmd/logdog_collector/main.go
index ffc2997769dab18317a4d5209fde8880aeaf29e2..9ea89b23c4264bf9b6f7acef50d9925341352c25 100644
--- a/server/cmd/logdog_collector/main.go
+++ b/server/cmd/logdog_collector/main.go
@@ -11,20 +11,25 @@ import (
"github.com/luci/luci-go/common/auth"
"github.com/luci/luci-go/common/clock"
"github.com/luci/luci-go/common/errors"
- "github.com/luci/luci-go/common/gcloud/pubsub"
- "github.com/luci/luci-go/common/gcloud/pubsub/ackbuffer"
- "github.com/luci/luci-go/common/gcloud/pubsub/subscriber"
+ gcps "github.com/luci/luci-go/common/gcloud/pubsub"
log "github.com/luci/luci-go/common/logging"
+ "github.com/luci/luci-go/common/parallel"
"github.com/luci/luci-go/server/internal/logdog/collector"
"github.com/luci/luci-go/server/internal/logdog/collector/coordinator"
"github.com/luci/luci-go/server/internal/logdog/service"
"golang.org/x/net/context"
+ "google.golang.org/cloud"
+ "google.golang.org/cloud/pubsub"
)
var (
// errInvalidConfig is returned when the loaded service configuration is
// missing required fields (see the checks in runCollector).
errInvalidConfig = errors.New("invalid configuration")
)
+const (
+ // pubsubPullErrorDelay is how long the pull loop sleeps after a failed
+ // Pub/Sub iterator fetch before trying again, to avoid a tight error loop.
+ pubsubPullErrorDelay = 10 * time.Second
+)
+
// application is the Collector application state.
type application struct {
service.Service
@@ -44,32 +49,31 @@ func (a *application) runCollector(c context.Context) error {
}
// Our Subscription must be a valid one.
- sub := pubsub.NewSubscription(pscfg.Project, pscfg.Subscription)
+ sub := gcps.NewSubscription(pscfg.Project, pscfg.Subscription)
if err := sub.Validate(); err != nil {
return fmt.Errorf("invalid Pub/Sub subscription %q: %v", sub, err)
}
// New PubSub instance with the authenticated client.
- psClient, err := a.AuthenticatedClient(func(o *auth.Options) {
- o.Scopes = pubsub.SubscriberScopes
+ psAuth, err := a.Authenticator(func(o *auth.Options) {
+ o.Scopes = gcps.SubscriberScopes
})
if err != nil {
- log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.")
+ log.WithError(err).Errorf(c, "Failed to create Pub/Sub token source.")
return err
}
- // Create a retrying Pub/Sub client.
- ps := &pubsub.Retry{
- Connection: pubsub.NewConnection(psClient),
- Callback: func(err error, d time.Duration) {
- log.Fields{
- log.ErrorKey: err,
- "delay": d,
- }.Warningf(c, "Transient error encountered; retrying...")
- },
+ psClient, err := pubsub.NewClient(c, pscfg.Project, cloud.WithTokenSource(psAuth.TokenSource()))
+ if err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "subscription": sub,
+ }.Errorf(c, "Failed to create Pub/Sub client.")
+ return err
}
- exists, err := ps.SubExists(c, sub)
+ psSub := psClient.Subscription(pscfg.Subscription)
+ exists, err := psSub.Exists(c)
if err != nil {
log.Fields{
log.ErrorKey: err,
@@ -98,14 +102,6 @@ func (a *application) runCollector(c context.Context) error {
shutdownCtx, shutdownFunc := context.WithCancel(c)
a.SetShutdownFunc(shutdownFunc)
- // Start an ACK buffer so that we can batch ACKs. Note that we do NOT use the
- // shutdown context here, as we want clean shutdowns to continue to ack any
- // buffered messages.
- ab := ackbuffer.New(c, ackbuffer.Config{
- Ack: ackbuffer.NewACK(ps, sub, 0),
- })
- defer ab.CloseAndFlush()
-
// Initialize our Collector service object using a caching Coordinator
// interface.
coord := coordinator.NewCoordinator(a.Coordinator())
@@ -118,57 +114,84 @@ func (a *application) runCollector(c context.Context) error {
}
defer coll.Close()
- // Execute our main Subscriber loop. It will run until the supplied Context
- // is cancelled.
- clk := clock.Get(c)
- engine := subscriber.Subscriber{
- S: subscriber.NewSource(ps, sub),
- A: ab,
- Workers: int(ccfg.MaxConcurrentMessages),
+ // Execute our main subscription pull loop. It will run until the supplied
+ // Context is cancelled.
+ psIterator, err := psSub.Pull(c)
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to create Pub/Sub iterator.")
+ return err
}
- engine.Run(shutdownCtx, func(msg *pubsub.Message) bool {
- c := log.SetField(c, "messageID", msg.ID)
- log.Fields{
- "ackID": msg.AckID,
- "size": len(msg.Data),
- }.Infof(c, "Received Pub/Sub Message.")
-
- startTime := clk.Now()
- err := coll.Process(c, msg.Data)
- duration := clk.Now().Sub(startTime)
-
- switch {
- case errors.IsTransient(err):
- // Do not consume
- log.Fields{
- log.ErrorKey: err,
- "duration": duration,
- }.Warningf(c, "TRANSIENT error ingesting Pub/Sub message.")
- return false
-
- case err == nil:
- log.Fields{
- "ackID": msg.AckID,
- "size": len(msg.Data),
- "duration": duration,
- }.Infof(c, "Message successfully processed; ACKing.")
- return true
-
- default:
- log.Fields{
- log.ErrorKey: err,
- "ackID": msg.AckID,
- "size": len(msg.Data),
- "duration": duration,
- }.Errorf(c, "Non-transient error ingesting Pub/Sub message; ACKing.")
- return true
+ defer func() {
+ log.Debugf(c, "Waiting for Pub/Sub subscription iterator to stop...")
+ psIterator.Stop()
+ log.Debugf(c, "Pub/Sub subscription iterator has stopped.")
+ }()
+
+ parallel.Ignore(parallel.Run(int(ccfg.MaxConcurrentMessages), func(taskC chan<- func() error) {
+ // Loop until shut down.
+ for shutdownCtx.Err() == nil {
+ msg, err := psIterator.Next()
+ if err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "delay": pubsubPullErrorDelay,
+ }.Errorf(c, "Failed to fetch Pub/Sub message, retry after delay...")
+ clock.Sleep(c, pubsubPullErrorDelay)
+ continue
+ }
+
+ taskC <- func() error {
+ c := log.SetField(c, "messageID", msg.ID)
+ msg.Done(a.processMessage(c, &coll, msg))
+ return nil
+ }
}
- })
+ }))
log.Debugf(c, "Collector finished.")
return nil
}
+// processMessage ingests a single Pub/Sub message through the Collector.
+// It returns true if the message should be ACK'd (deleted from Pub/Sub),
+// or false if the message should not be ACK'd. A non-ACK'd message is
+// presumably redelivered by Pub/Sub after its ACK deadline expires —
+// NOTE(review): confirm against the subscription's redelivery semantics.
+func (a *application) processMessage(c context.Context, coll *collector.Collector, msg *pubsub.Message) bool {
+ log.Fields{
+ "ackID": msg.AckID,
+ "size": len(msg.Data),
+ }.Infof(c, "Received Pub/Sub Message.")
+
+ // Time the ingest so the duration can be attached to the result log line.
+ startTime := clock.Now(c)
+ err := coll.Process(c, msg.Data)
+ duration := clock.Now(c).Sub(startTime)
+
+ switch {
+ case errors.IsTransient(err):
+ // Transient error: do not consume the message (no ACK), so it can be
+ // redelivered and processing retried.
+ log.Fields{
+ log.ErrorKey: err,
+ "duration": duration,
+ }.Warningf(c, "TRANSIENT error ingesting Pub/Sub message.")
+ return false
+
+ case err == nil:
+ // Success: ACK the message so it is not redelivered.
+ log.Fields{
+ "ackID": msg.AckID,
+ "size": len(msg.Data),
+ "duration": duration,
+ }.Infof(c, "Message successfully processed; ACKing.")
+ return true
+
+ default:
+ // Permanent failure: retrying would not help, so ACK the message to
+ // drop it rather than redeliver it forever.
+ log.Fields{
+ log.ErrorKey: err,
+ "ackID": msg.AckID,
+ "size": len(msg.Data),
+ "duration": duration,
+ }.Errorf(c, "Non-transient error ingesting Pub/Sub message; ACKing.")
+ return true
+ }
+}
+
// Entry point.
func main() {
a := application{}
« no previous file with comments | « common/tsmon/monitor/pubsub.go ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698