| Index: server/internal/logdog/collector/coordinator/coordinator.go
|
| diff --git a/server/internal/logdog/collector/coordinator/coordinator.go b/server/internal/logdog/collector/coordinator/coordinator.go
|
| index f253fe15c0d86cfb93b7bedac69ab1f74ed47825..ddc391ba9a44fabc2b5f96cc42daf22980eb5f1d 100644
|
| --- a/server/internal/logdog/collector/coordinator/coordinator.go
|
| +++ b/server/internal/logdog/collector/coordinator/coordinator.go
|
| @@ -8,6 +8,7 @@ import (
|
| "fmt"
|
|
|
| "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
|
| + "github.com/luci/luci-go/common/config"
|
| "github.com/luci/luci-go/common/errors"
|
| "github.com/luci/luci-go/common/logdog/types"
|
| "github.com/luci/luci-go/common/proto/logdog/logpb"
|
| @@ -30,9 +31,10 @@ type Coordinator interface {
|
| // subset of the remote state with the necessary elements for the Collector to
|
| // operate and update.
|
| type LogStreamState struct {
|
| + Project config.ProjectName // Project name.
|
| Path types.StreamPath // Stream path.
|
| ProtoVersion string // Stream protocol version string.
|
| - Secret []byte // Secret.
|
| + Secret types.PrefixSecret // Secret.
|
| TerminalIndex types.MessageIndex // Terminal index, <0 for unterminated.
|
| Archived bool // Is the stream archived?
|
| Purged bool // Is the stream purged?
|
| @@ -49,11 +51,17 @@ func NewCoordinator(s logdog.ServicesClient) Coordinator {
|
| }
|
|
|
| func (*coordinatorImpl) clientSideValidate(s *LogStreamState) error {
|
| + // TODO(dnj): Force this validation when empty project is not accepted.
|
| + if s.Project != "" {
|
| + if err := s.Project.Validate(); err != nil {
|
| + return fmt.Errorf("failed to validate project: %s", err)
|
| + }
|
| + }
|
| if err := s.Path.Validate(); err != nil {
|
| - return err
|
| + return fmt.Errorf("failed to validate path: %s", err)
|
| }
|
| - if len(s.Secret) == 0 {
|
| - return errors.New("missing stream secret")
|
| + if err := s.Secret.Validate(); err != nil {
|
| + return fmt.Errorf("failed to validate secret: %s", err)
|
| }
|
| return nil
|
| }
|
| @@ -68,8 +76,9 @@ func (c *coordinatorImpl) RegisterStream(ctx context.Context, s *LogStreamState,
|
| }
|
|
|
| req := logdog.RegisterStreamRequest{
|
| + Project: string(s.Project),
|
| Path: string(s.Path),
|
| - Secret: s.Secret,
|
| + Secret: []byte(s.Secret),
|
| ProtoVersion: s.ProtoVersion,
|
| Desc: d,
|
| }
|
| @@ -83,9 +92,10 @@ func (c *coordinatorImpl) RegisterStream(ctx context.Context, s *LogStreamState,
|
| }
|
|
|
| return &LogStreamState{
|
| + Project: config.ProjectName(resp.State.Project),
|
| Path: types.StreamPath(resp.State.Path),
|
| ProtoVersion: resp.State.ProtoVersion,
|
| - Secret: resp.Secret,
|
| + Secret: types.PrefixSecret(resp.Secret),
|
| TerminalIndex: types.MessageIndex(resp.State.TerminalIndex),
|
| Archived: resp.State.Archived,
|
| Purged: resp.State.Purged,
|
| @@ -101,8 +111,9 @@ func (c *coordinatorImpl) TerminateStream(ctx context.Context, s *LogStreamState
|
| }
|
|
|
| req := logdog.TerminateStreamRequest{
|
| + Project: string(s.Project),
|
| Path: string(s.Path),
|
| - Secret: s.Secret,
|
| + Secret: []byte(s.Secret),
|
| TerminalIndex: int64(s.TerminalIndex),
|
| }
|
|
|
|
|