Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(530)

Side by Side Diff: server/internal/logdog/collector/coordinator.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Post-splitting rebase. Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package collector
6
7 import (
8 "time"
9
10 "github.com/luci/luci-go/common/logdog/types"
11 log "github.com/luci/luci-go/common/logging"
12 "github.com/luci/luci-go/common/retry"
13 cc "github.com/luci/luci-go/server/internal/logdog/coordinatorClient"
14 "golang.org/x/net/context"
15 )
16
17 // CoordinatorClient is an interface to the methods in the Coordiantor service
martiniss 2016/01/23 01:05:41 Typo
dnj (Google) 2016/01/26 05:06:05 Done.
18 // that Collector uses.
19 //
20 // cc.Client implements this interface.
21 type CoordinatorClient interface {
22 // RegisterStream registers a stream's state with the Coordinator. On su ccess,
23 // it will return the Coordinator's view of the stream state.
24 //
25 // This operation is idempotent. It may return an errors.Transient error if
26 // a transient failure happened.
27 RegisterStream(context.Context, cc.State) (*cc.State, error)
28
29 // TerminateStream registers the stream's terminal index with the Coordi nator.
30 // If successful, the terminal index was successfully registered.
31 //
32 // This operation is idempotent. It may return an errors.Transient error if
33 // a transient failure happened.
34 TerminateStream(ctx context.Context, p types.StreamPath, s []byte, idx t ypes.MessageIndex) error
35 }
36
37 // RetryCoordinatorClient wraps a CoordinatorClient, retrying transient errors.
38 type RetryCoordinatorClient struct {
39 // Client is the CoordinatorClient that is being wrapped.
40 Client CoordinatorClient
41
42 // G is the retry.Generator to use to generate retry.Iterator instances. If
43 // nil, retry.Default will be used.
44 G retry.Generator
45 }
46
47 // RegisterStream implements CoordinatorClient.
48 func (c *RetryCoordinatorClient) RegisterStream(ctx context.Context, st cc.State ) (rst *cc.State, err error) {
49 err = retry.Retry(ctx, c.retryIter(), func() (err error) {
50 rst, err = c.Client.RegisterStream(ctx, st)
51 return
52 }, func(err error, d time.Duration) {
53 log.Fields{
54 log.ErrorKey: err,
55 "delay": d,
56 }.Warningf(ctx, "Transient error registering stream. Retrying... ")
57 })
58 return
59 }
60
61 // TerminateStream implements CoordinatorClient.
62 func (c *RetryCoordinatorClient) TerminateStream(ctx context.Context, p types.St reamPath, s []byte, idx types.MessageIndex) error {
63 return retry.Retry(ctx, c.retryIter(), func() error {
64 return c.Client.TerminateStream(ctx, p, s, idx)
65 }, func(err error, d time.Duration) {
66 log.Fields{
67 log.ErrorKey: err,
68 "delay": d,
69 }.Warningf(ctx, "Transient error terminating stream. Retrying... ")
70 })
71 }
72
73 func (c *RetryCoordinatorClient) retryIter() retry.Iterator {
74 var it retry.Iterator
75 if c.G != nil {
76 it = c.G()
77 } else {
78 it = retry.Default()
79 }
80 return retry.TransientOnly(it)
81 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698