Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(72)

Side by Side Diff: server/internal/logdog/collector/coordinator/coordinator.go

Issue 1906023002: LogDog: Add project namespace to Butler/Collector. (Closed) Base URL: https://github.com/luci/luci-go@logdog-project-archivist
Patch Set: Rebase? Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package coordinator 5 package coordinator
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 9
10 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" 10 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
11 "github.com/luci/luci-go/common/config"
11 "github.com/luci/luci-go/common/errors" 12 "github.com/luci/luci-go/common/errors"
12 "github.com/luci/luci-go/common/logdog/types" 13 "github.com/luci/luci-go/common/logdog/types"
13 "github.com/luci/luci-go/common/proto/logdog/logpb" 14 "github.com/luci/luci-go/common/proto/logdog/logpb"
14 "golang.org/x/net/context" 15 "golang.org/x/net/context"
15 ) 16 )
16 17
17 // Coordinator is an interface to a remote LogDog Coordinator service. This is 18 // Coordinator is an interface to a remote LogDog Coordinator service. This is
18 // a simplified version of the Coordinator's Service API tailored specifically 19 // a simplified version of the Coordinator's Service API tailored specifically
19 // to the Collector's usage. 20 // to the Collector's usage.
20 // 21 //
21 // All Coordiantor methods will return transient-wrapped errors if appropriate. 22 // All Coordiantor methods will return transient-wrapped errors if appropriate.
22 type Coordinator interface { 23 type Coordinator interface {
23 // RegisterStream registers a log stream state. 24 // RegisterStream registers a log stream state.
24 RegisterStream(context.Context, *LogStreamState, *logpb.LogStreamDescrip tor) (*LogStreamState, error) 25 RegisterStream(context.Context, *LogStreamState, *logpb.LogStreamDescrip tor) (*LogStreamState, error)
25 // TerminateStream registers the terminal index of a log stream state. 26 // TerminateStream registers the terminal index of a log stream state.
26 TerminateStream(context.Context, *LogStreamState) error 27 TerminateStream(context.Context, *LogStreamState) error
27 } 28 }
28 29
29 // LogStreamState is a local representation of a remote stream's state. It is a 30 // LogStreamState is a local representation of a remote stream's state. It is a
30 // subset of the remote state with the necessary elements for the Collector to 31 // subset of the remote state with the necessary elements for the Collector to
31 // operate and update. 32 // operate and update.
32 type LogStreamState struct { 33 type LogStreamState struct {
34 Project config.ProjectName // Project name.
33 Path types.StreamPath // Stream path. 35 Path types.StreamPath // Stream path.
34 ProtoVersion string // Stream protocol version string. 36 ProtoVersion string // Stream protocol version string.
35 » Secret []byte // Secret. 37 » Secret types.PrefixSecret // Secret.
36 TerminalIndex types.MessageIndex // Terminal index, <0 for unterminated. 38 TerminalIndex types.MessageIndex // Terminal index, <0 for unterminated.
37 Archived bool // Is the stream archived? 39 Archived bool // Is the stream archived?
38 Purged bool // Is the stream purged? 40 Purged bool // Is the stream purged?
39 } 41 }
40 42
41 type coordinatorImpl struct { 43 type coordinatorImpl struct {
42 c logdog.ServicesClient 44 c logdog.ServicesClient
43 } 45 }
44 46
45 // NewCoordinator returns a Coordinator implementation that uses a 47 // NewCoordinator returns a Coordinator implementation that uses a
46 // logdog.ServicesClient. 48 // logdog.ServicesClient.
47 func NewCoordinator(s logdog.ServicesClient) Coordinator { 49 func NewCoordinator(s logdog.ServicesClient) Coordinator {
48 return &coordinatorImpl{s} 50 return &coordinatorImpl{s}
49 } 51 }
50 52
51 func (*coordinatorImpl) clientSideValidate(s *LogStreamState) error { 53 func (*coordinatorImpl) clientSideValidate(s *LogStreamState) error {
54 // TODO(dnj): Force this validation when empty project is not accepted.
55 if s.Project != "" {
56 if err := s.Project.Validate(); err != nil {
57 return fmt.Errorf("failed to validate project: %s", err)
58 }
59 }
52 if err := s.Path.Validate(); err != nil { 60 if err := s.Path.Validate(); err != nil {
53 » » return err 61 » » return fmt.Errorf("failed to validate path: %s", err)
54 } 62 }
55 » if len(s.Secret) == 0 { 63 » if err := s.Secret.Validate(); err != nil {
56 » » return errors.New("missing stream secret") 64 » » return fmt.Errorf("failed to validate secret: %s", err)
57 } 65 }
58 return nil 66 return nil
59 } 67 }
60 68
61 func (c *coordinatorImpl) RegisterStream(ctx context.Context, s *LogStreamState, d *logpb.LogStreamDescriptor) ( 69 func (c *coordinatorImpl) RegisterStream(ctx context.Context, s *LogStreamState, d *logpb.LogStreamDescriptor) (
62 *LogStreamState, error) { 70 *LogStreamState, error) {
63 if err := c.clientSideValidate(s); err != nil { 71 if err := c.clientSideValidate(s); err != nil {
64 return nil, err 72 return nil, err
65 } 73 }
66 if err := d.Validate(true); err != nil { 74 if err := d.Validate(true); err != nil {
67 return nil, fmt.Errorf("invalid descriptor: %s", err) 75 return nil, fmt.Errorf("invalid descriptor: %s", err)
68 } 76 }
69 77
70 req := logdog.RegisterStreamRequest{ 78 req := logdog.RegisterStreamRequest{
79 Project: string(s.Project),
71 Path: string(s.Path), 80 Path: string(s.Path),
72 » » Secret: s.Secret, 81 » » Secret: []byte(s.Secret),
73 ProtoVersion: s.ProtoVersion, 82 ProtoVersion: s.ProtoVersion,
74 Desc: d, 83 Desc: d,
75 } 84 }
76 85
77 resp, err := c.c.RegisterStream(ctx, &req) 86 resp, err := c.c.RegisterStream(ctx, &req)
78 switch { 87 switch {
79 case err != nil: 88 case err != nil:
80 return nil, err 89 return nil, err
81 case resp.State == nil: 90 case resp.State == nil:
82 return nil, errors.New("missing stream state") 91 return nil, errors.New("missing stream state")
83 } 92 }
84 93
85 return &LogStreamState{ 94 return &LogStreamState{
95 Project: config.ProjectName(resp.State.Project),
86 Path: types.StreamPath(resp.State.Path), 96 Path: types.StreamPath(resp.State.Path),
87 ProtoVersion: resp.State.ProtoVersion, 97 ProtoVersion: resp.State.ProtoVersion,
88 » » Secret: resp.Secret, 98 » » Secret: types.PrefixSecret(resp.Secret),
89 TerminalIndex: types.MessageIndex(resp.State.TerminalIndex), 99 TerminalIndex: types.MessageIndex(resp.State.TerminalIndex),
90 Archived: resp.State.Archived, 100 Archived: resp.State.Archived,
91 Purged: resp.State.Purged, 101 Purged: resp.State.Purged,
92 }, nil 102 }, nil
93 } 103 }
94 104
95 func (c *coordinatorImpl) TerminateStream(ctx context.Context, s *LogStreamState ) error { 105 func (c *coordinatorImpl) TerminateStream(ctx context.Context, s *LogStreamState ) error {
96 if err := c.clientSideValidate(s); err != nil { 106 if err := c.clientSideValidate(s); err != nil {
97 return err 107 return err
98 } 108 }
99 if s.TerminalIndex < 0 { 109 if s.TerminalIndex < 0 {
100 return errors.New("refusing to terminate with non-terminal state ") 110 return errors.New("refusing to terminate with non-terminal state ")
101 } 111 }
102 112
103 req := logdog.TerminateStreamRequest{ 113 req := logdog.TerminateStreamRequest{
114 Project: string(s.Project),
104 Path: string(s.Path), 115 Path: string(s.Path),
105 » » Secret: s.Secret, 116 » » Secret: []byte(s.Secret),
106 TerminalIndex: int64(s.TerminalIndex), 117 TerminalIndex: int64(s.TerminalIndex),
107 } 118 }
108 119
109 if _, err := c.c.TerminateStream(ctx, &req); err != nil { 120 if _, err := c.c.TerminateStream(ctx, &req); err != nil {
110 return err 121 return err
111 } 122 }
112 return nil 123 return nil
113 } 124 }
OLDNEW
« no previous file with comments | « server/internal/logdog/collector/coordinator/cache_test.go ('k') | server/internal/logdog/collector/utils_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698