Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(682)

Unified Diff: server/internal/logdog/collector/coordinator.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Post-splitting rebase. Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: server/internal/logdog/collector/coordinator.go
diff --git a/server/internal/logdog/collector/coordinator.go b/server/internal/logdog/collector/coordinator.go
new file mode 100644
index 0000000000000000000000000000000000000000..44f3b1b8ff81ee6c4f1fdc97afb1d838b6c951ec
--- /dev/null
+++ b/server/internal/logdog/collector/coordinator.go
@@ -0,0 +1,81 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package collector
+
+import (
+ "time"
+
+ "github.com/luci/luci-go/common/logdog/types"
+ log "github.com/luci/luci-go/common/logging"
+ "github.com/luci/luci-go/common/retry"
+ cc "github.com/luci/luci-go/server/internal/logdog/coordinatorClient"
+ "golang.org/x/net/context"
+)
+
+// CoordinatorClient is an interface to the methods in the Coordiantor service
martiniss 2016/01/23 01:05:41 Typo
dnj (Google) 2016/01/26 05:06:05 Done.
+// that Collector uses.
+//
+// cc.Client implements this interface.
+type CoordinatorClient interface {
+ // RegisterStream registers a stream's state with the Coordinator. On success,
+ // it will return the Coordinator's view of the stream state.
+ //
+ // This operation is idempotent. It may return an errors.Transient error if
+ // a transient failure happened.
+ RegisterStream(context.Context, cc.State) (*cc.State, error)
+
+ // TerminateStream registers the stream's terminal index with the Coordinator.
+ // If successful, the terminal index was successfully registered.
+ //
+ // This operation is idempotent. It may return an errors.Transient error if
+ // a transient failure happened.
+ TerminateStream(ctx context.Context, p types.StreamPath, s []byte, idx types.MessageIndex) error
+}
+
+// RetryCoordinatorClient wraps a CoordinatorClient, retrying transient errors.
+type RetryCoordinatorClient struct {
+ // Client is the CoordinatorClient that is being wrapped.
+ Client CoordinatorClient
+
+ // G is the retry.Generator to use to generate retry.Iterator instances. If
+ // nil, retry.Default will be used.
+ G retry.Generator
+}
+
+// RegisterStream implements CoordinatorClient.
+func (c *RetryCoordinatorClient) RegisterStream(ctx context.Context, st cc.State) (rst *cc.State, err error) {
+ err = retry.Retry(ctx, c.retryIter(), func() (err error) {
+ rst, err = c.Client.RegisterStream(ctx, st)
+ return
+ }, func(err error, d time.Duration) {
+ log.Fields{
+ log.ErrorKey: err,
+ "delay": d,
+ }.Warningf(ctx, "Transient error registering stream. Retrying...")
+ })
+ return
+}
+
+// TerminateStream implements CoordinatorClient.
+func (c *RetryCoordinatorClient) TerminateStream(ctx context.Context, p types.StreamPath, s []byte, idx types.MessageIndex) error {
+ return retry.Retry(ctx, c.retryIter(), func() error {
+ return c.Client.TerminateStream(ctx, p, s, idx)
+ }, func(err error, d time.Duration) {
+ log.Fields{
+ log.ErrorKey: err,
+ "delay": d,
+ }.Warningf(ctx, "Transient error terminating stream. Retrying...")
+ })
+}
+
+func (c *RetryCoordinatorClient) retryIter() retry.Iterator {
+ var it retry.Iterator
+ if c.G != nil {
+ it = c.G()
+ } else {
+ it = retry.Default()
+ }
+ return retry.TransientOnly(it)
+}

Powered by Google App Engine
This is Rietveld 408576698