Chromium Code Reviews| Index: server/internal/logdog/collector/coordinator.go |
| diff --git a/server/internal/logdog/collector/coordinator.go b/server/internal/logdog/collector/coordinator.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..44f3b1b8ff81ee6c4f1fdc97afb1d838b6c951ec |
| --- /dev/null |
| +++ b/server/internal/logdog/collector/coordinator.go |
| @@ -0,0 +1,81 @@ |
| +// Copyright 2016 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package collector |
| + |
| +import ( |
| + "time" |
| + |
| + "github.com/luci/luci-go/common/logdog/types" |
| + log "github.com/luci/luci-go/common/logging" |
| + "github.com/luci/luci-go/common/retry" |
| + cc "github.com/luci/luci-go/server/internal/logdog/coordinatorClient" |
| + "golang.org/x/net/context" |
| +) |
| + |
| +// CoordinatorClient is an interface to the methods in the Coordiantor service |
|
martiniss
2016/01/23 01:05:41
Typo
dnj (Google)
2016/01/26 05:06:05
Done.
|
| +// that Collector uses. |
| +// |
| +// cc.Client implements this interface. |
| +type CoordinatorClient interface { |
| + // RegisterStream registers a stream's state with the Coordinator. On success, |
| + // it will return the Coordinator's view of the stream state. |
| + // |
| + // This operation is idempotent. It may return an errors.Transient error if |
| + // a transient failure happened. |
| + RegisterStream(context.Context, cc.State) (*cc.State, error) |
| + |
| + // TerminateStream registers the stream's terminal index with the Coordinator. |
| + // If successful, the terminal index was successfully registered. |
| + // |
| + // This operation is idempotent. It may return an errors.Transient error if |
| + // a transient failure happened. |
| + TerminateStream(ctx context.Context, p types.StreamPath, s []byte, idx types.MessageIndex) error |
| +} |
| + |
| +// RetryCoordinatorClient wraps a CoordinatorClient, retrying transient errors. |
| +type RetryCoordinatorClient struct { |
| + // Client is the CoordinatorClient that is being wrapped. |
| + Client CoordinatorClient |
| + |
| + // G is the retry.Generator to use to generate retry.Iterator instances. If |
| + // nil, retry.Default will be used. |
| + G retry.Generator |
| +} |
| + |
| +// RegisterStream implements CoordinatorClient. |
| +func (c *RetryCoordinatorClient) RegisterStream(ctx context.Context, st cc.State) (rst *cc.State, err error) { |
| + err = retry.Retry(ctx, c.retryIter(), func() (err error) { |
| + rst, err = c.Client.RegisterStream(ctx, st) |
| + return |
| + }, func(err error, d time.Duration) { |
| + log.Fields{ |
| + log.ErrorKey: err, |
| + "delay": d, |
| + }.Warningf(ctx, "Transient error registering stream. Retrying...") |
| + }) |
| + return |
| +} |
| + |
| +// TerminateStream implements CoordinatorClient. |
| +func (c *RetryCoordinatorClient) TerminateStream(ctx context.Context, p types.StreamPath, s []byte, idx types.MessageIndex) error { |
| + return retry.Retry(ctx, c.retryIter(), func() error { |
| + return c.Client.TerminateStream(ctx, p, s, idx) |
| + }, func(err error, d time.Duration) { |
| + log.Fields{ |
| + log.ErrorKey: err, |
| + "delay": d, |
| + }.Warningf(ctx, "Transient error terminating stream. Retrying...") |
| + }) |
| +} |
| + |
| +func (c *RetryCoordinatorClient) retryIter() retry.Iterator { |
| + var it retry.Iterator |
| + if c.G != nil { |
| + it = c.G() |
| + } else { |
| + it = retry.Default() |
| + } |
| + return retry.TransientOnly(it) |
| +} |