OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package coordinator | 5 package coordinator |
6 | 6 |
7 import ( | 7 import ( |
8 "fmt" | 8 "fmt" |
9 | 9 |
10 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" | 10 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| 11 "github.com/luci/luci-go/common/config" |
11 "github.com/luci/luci-go/common/errors" | 12 "github.com/luci/luci-go/common/errors" |
12 "github.com/luci/luci-go/common/logdog/types" | 13 "github.com/luci/luci-go/common/logdog/types" |
13 "github.com/luci/luci-go/common/proto/logdog/logpb" | 14 "github.com/luci/luci-go/common/proto/logdog/logpb" |
14 "golang.org/x/net/context" | 15 "golang.org/x/net/context" |
15 ) | 16 ) |
16 | 17 |
17 // Coordinator is an interface to a remote LogDog Coordinator service. This is | 18 // Coordinator is an interface to a remote LogDog Coordinator service. This is |
18 // a simplified version of the Coordinator's Service API tailored specifically | 19 // a simplified version of the Coordinator's Service API tailored specifically |
19 // to the Collector's usage. | 20 // to the Collector's usage. |
20 // | 21 // |
21 // All Coordiantor methods will return transient-wrapped errors if appropriate. | 22 // All Coordiantor methods will return transient-wrapped errors if appropriate. |
22 type Coordinator interface { | 23 type Coordinator interface { |
23 // RegisterStream registers a log stream state. | 24 // RegisterStream registers a log stream state. |
24 RegisterStream(context.Context, *LogStreamState, *logpb.LogStreamDescrip
tor) (*LogStreamState, error) | 25 RegisterStream(context.Context, *LogStreamState, *logpb.LogStreamDescrip
tor) (*LogStreamState, error) |
25 // TerminateStream registers the terminal index of a log stream state. | 26 // TerminateStream registers the terminal index of a log stream state. |
26 TerminateStream(context.Context, *LogStreamState) error | 27 TerminateStream(context.Context, *LogStreamState) error |
27 } | 28 } |
28 | 29 |
29 // LogStreamState is a local representation of a remote stream's state. It is a | 30 // LogStreamState is a local representation of a remote stream's state. It is a |
30 // subset of the remote state with the necessary elements for the Collector to | 31 // subset of the remote state with the necessary elements for the Collector to |
31 // operate and update. | 32 // operate and update. |
32 type LogStreamState struct { | 33 type LogStreamState struct { |
| 34 Project config.ProjectName // Project name. |
33 Path types.StreamPath // Stream path. | 35 Path types.StreamPath // Stream path. |
34 ProtoVersion string // Stream protocol version string. | 36 ProtoVersion string // Stream protocol version string. |
35 » Secret []byte // Secret. | 37 » Secret types.PrefixSecret // Secret. |
36 TerminalIndex types.MessageIndex // Terminal index, <0 for unterminated. | 38 TerminalIndex types.MessageIndex // Terminal index, <0 for unterminated. |
37 Archived bool // Is the stream archived? | 39 Archived bool // Is the stream archived? |
38 Purged bool // Is the stream purged? | 40 Purged bool // Is the stream purged? |
39 } | 41 } |
40 | 42 |
41 type coordinatorImpl struct { | 43 type coordinatorImpl struct { |
42 c logdog.ServicesClient | 44 c logdog.ServicesClient |
43 } | 45 } |
44 | 46 |
45 // NewCoordinator returns a Coordinator implementation that uses a | 47 // NewCoordinator returns a Coordinator implementation that uses a |
46 // logdog.ServicesClient. | 48 // logdog.ServicesClient. |
47 func NewCoordinator(s logdog.ServicesClient) Coordinator { | 49 func NewCoordinator(s logdog.ServicesClient) Coordinator { |
48 return &coordinatorImpl{s} | 50 return &coordinatorImpl{s} |
49 } | 51 } |
50 | 52 |
51 func (*coordinatorImpl) clientSideValidate(s *LogStreamState) error { | 53 func (*coordinatorImpl) clientSideValidate(s *LogStreamState) error { |
| 54 // TODO(dnj): Force this validation when empty project is not accepted. |
| 55 if s.Project != "" { |
| 56 if err := s.Project.Validate(); err != nil { |
| 57 return fmt.Errorf("failed to validate project: %s", err) |
| 58 } |
| 59 } |
52 if err := s.Path.Validate(); err != nil { | 60 if err := s.Path.Validate(); err != nil { |
53 » » return err | 61 » » return fmt.Errorf("failed to validate path: %s", err) |
54 } | 62 } |
55 » if len(s.Secret) == 0 { | 63 » if err := s.Secret.Validate(); err != nil { |
56 » » return errors.New("missing stream secret") | 64 » » return fmt.Errorf("failed to validate secret: %s", err) |
57 } | 65 } |
58 return nil | 66 return nil |
59 } | 67 } |
60 | 68 |
61 func (c *coordinatorImpl) RegisterStream(ctx context.Context, s *LogStreamState,
d *logpb.LogStreamDescriptor) ( | 69 func (c *coordinatorImpl) RegisterStream(ctx context.Context, s *LogStreamState,
d *logpb.LogStreamDescriptor) ( |
62 *LogStreamState, error) { | 70 *LogStreamState, error) { |
63 if err := c.clientSideValidate(s); err != nil { | 71 if err := c.clientSideValidate(s); err != nil { |
64 return nil, err | 72 return nil, err |
65 } | 73 } |
66 if err := d.Validate(true); err != nil { | 74 if err := d.Validate(true); err != nil { |
67 return nil, fmt.Errorf("invalid descriptor: %s", err) | 75 return nil, fmt.Errorf("invalid descriptor: %s", err) |
68 } | 76 } |
69 | 77 |
70 req := logdog.RegisterStreamRequest{ | 78 req := logdog.RegisterStreamRequest{ |
| 79 Project: string(s.Project), |
71 Path: string(s.Path), | 80 Path: string(s.Path), |
72 » » Secret: s.Secret, | 81 » » Secret: []byte(s.Secret), |
73 ProtoVersion: s.ProtoVersion, | 82 ProtoVersion: s.ProtoVersion, |
74 Desc: d, | 83 Desc: d, |
75 } | 84 } |
76 | 85 |
77 resp, err := c.c.RegisterStream(ctx, &req) | 86 resp, err := c.c.RegisterStream(ctx, &req) |
78 switch { | 87 switch { |
79 case err != nil: | 88 case err != nil: |
80 return nil, err | 89 return nil, err |
81 case resp.State == nil: | 90 case resp.State == nil: |
82 return nil, errors.New("missing stream state") | 91 return nil, errors.New("missing stream state") |
83 } | 92 } |
84 | 93 |
85 return &LogStreamState{ | 94 return &LogStreamState{ |
| 95 Project: config.ProjectName(resp.State.Project), |
86 Path: types.StreamPath(resp.State.Path), | 96 Path: types.StreamPath(resp.State.Path), |
87 ProtoVersion: resp.State.ProtoVersion, | 97 ProtoVersion: resp.State.ProtoVersion, |
88 » » Secret: resp.Secret, | 98 » » Secret: types.PrefixSecret(resp.Secret), |
89 TerminalIndex: types.MessageIndex(resp.State.TerminalIndex), | 99 TerminalIndex: types.MessageIndex(resp.State.TerminalIndex), |
90 Archived: resp.State.Archived, | 100 Archived: resp.State.Archived, |
91 Purged: resp.State.Purged, | 101 Purged: resp.State.Purged, |
92 }, nil | 102 }, nil |
93 } | 103 } |
94 | 104 |
95 func (c *coordinatorImpl) TerminateStream(ctx context.Context, s *LogStreamState
) error { | 105 func (c *coordinatorImpl) TerminateStream(ctx context.Context, s *LogStreamState
) error { |
96 if err := c.clientSideValidate(s); err != nil { | 106 if err := c.clientSideValidate(s); err != nil { |
97 return err | 107 return err |
98 } | 108 } |
99 if s.TerminalIndex < 0 { | 109 if s.TerminalIndex < 0 { |
100 return errors.New("refusing to terminate with non-terminal state
") | 110 return errors.New("refusing to terminate with non-terminal state
") |
101 } | 111 } |
102 | 112 |
103 req := logdog.TerminateStreamRequest{ | 113 req := logdog.TerminateStreamRequest{ |
| 114 Project: string(s.Project), |
104 Path: string(s.Path), | 115 Path: string(s.Path), |
105 » » Secret: s.Secret, | 116 » » Secret: []byte(s.Secret), |
106 TerminalIndex: int64(s.TerminalIndex), | 117 TerminalIndex: int64(s.TerminalIndex), |
107 } | 118 } |
108 | 119 |
109 if _, err := c.c.TerminateStream(ctx, &req); err != nil { | 120 if _, err := c.c.TerminateStream(ctx, &req); err != nil { |
110 return err | 121 return err |
111 } | 122 } |
112 return nil | 123 return nil |
113 } | 124 } |
OLD | NEW |