Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1739)

Unified Diff: appengine/logdog/coordinator/archivalPublisher.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Fix proto comment. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « appengine/logdog/coordinator/archival.go ('k') | appengine/logdog/coordinator/auth.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: appengine/logdog/coordinator/archivalPublisher.go
diff --git a/appengine/logdog/coordinator/archivalPublisher.go b/appengine/logdog/coordinator/archivalPublisher.go
new file mode 100644
index 0000000000000000000000000000000000000000..8e7c8d17dea75d089ed8bd873950c2300b419b87
--- /dev/null
+++ b/appengine/logdog/coordinator/archivalPublisher.go
@@ -0,0 +1,56 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package coordinator
+
+import (
+ "time"
+
+ "github.com/golang/protobuf/proto"
+ "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
+ log "github.com/luci/luci-go/common/logging"
+ "github.com/luci/luci-go/common/retry"
+ "golang.org/x/net/context"
+ gcps "google.golang.org/cloud/pubsub"
+)
+
+// ArchivalPublisher is capable of publishing archival requests.
+type ArchivalPublisher interface {
+ // Publish publishes the supplied ArchiveTask.
+ Publish(context.Context, *logdog.ArchiveTask) error
+}
+
+type pubsubArchivalPublisher struct {
+ // topic is the authenticated Pub/Sub topic handle to publish to.
+ topic *gcps.TopicHandle
+}
+
+func (p *pubsubArchivalPublisher) Publish(c context.Context, t *logdog.ArchiveTask) error {
+ d, err := proto.Marshal(t)
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to marshal task.")
+ return err
+ }
+
+ // TODO: Route this through some system (e.g., task queue, tumble) that can
+ // impose a dispatch delay for the settle period.
+ msg := gcps.Message{
+ Data: d,
+ }
+
+ return retry.Retry(c, retry.Default, func() error {
+ log.Fields{
+ "path": t.Path,
+ "key": t.Key,
+ }.Infof(c, "Publishing archival message for stream.")
+
+ _, err := p.topic.Publish(c, &msg)
+ return err
+ }, func(err error, d time.Duration) {
+ log.Fields{
+ log.ErrorKey: err,
+ "delay": d,
+ }.Warningf(c, "Failed to publish task. Retrying...")
+ })
+}
« no previous file with comments | « appengine/logdog/coordinator/archival.go ('k') | appengine/logdog/coordinator/auth.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698