| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package coordinator | 5 package coordinator |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 | 9 |
| 10 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" | 10 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| 11 "github.com/luci/luci-go/common/config" |
| 11 "github.com/luci/luci-go/common/errors" | 12 "github.com/luci/luci-go/common/errors" |
| 12 "github.com/luci/luci-go/common/logdog/types" | 13 "github.com/luci/luci-go/common/logdog/types" |
| 13 "github.com/luci/luci-go/common/proto/logdog/logpb" | 14 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| 14 "golang.org/x/net/context" | 15 "golang.org/x/net/context" |
| 15 ) | 16 ) |
| 16 | 17 |
| 17 // Coordinator is an interface to a remote LogDog Coordinator service. This is | 18 // Coordinator is an interface to a remote LogDog Coordinator service. This is |
| 18 // a simplified version of the Coordinator's Service API tailored specifically | 19 // a simplified version of the Coordinator's Service API tailored specifically |
| 19 // to the Collector's usage. | 20 // to the Collector's usage. |
| 20 // | 21 // |
| 21 // All Coordiantor methods will return transient-wrapped errors if appropriate. | 22 // All Coordiantor methods will return transient-wrapped errors if appropriate. |
| 22 type Coordinator interface { | 23 type Coordinator interface { |
| 23 // RegisterStream registers a log stream state. | 24 // RegisterStream registers a log stream state. |
| 24 RegisterStream(context.Context, *LogStreamState, *logpb.LogStreamDescrip
tor) (*LogStreamState, error) | 25 RegisterStream(context.Context, *LogStreamState, *logpb.LogStreamDescrip
tor) (*LogStreamState, error) |
| 25 // TerminateStream registers the terminal index of a log stream state. | 26 // TerminateStream registers the terminal index of a log stream state. |
| 26 TerminateStream(context.Context, *LogStreamState) error | 27 TerminateStream(context.Context, *LogStreamState) error |
| 27 } | 28 } |
| 28 | 29 |
| 29 // LogStreamState is a local representation of a remote stream's state. It is a | 30 // LogStreamState is a local representation of a remote stream's state. It is a |
| 30 // subset of the remote state with the necessary elements for the Collector to | 31 // subset of the remote state with the necessary elements for the Collector to |
| 31 // operate and update. | 32 // operate and update. |
| 32 type LogStreamState struct { | 33 type LogStreamState struct { |
| 34 Project config.ProjectName // Project name. |
| 33 Path types.StreamPath // Stream path. | 35 Path types.StreamPath // Stream path. |
| 34 ProtoVersion string // Stream protocol version string. | 36 ProtoVersion string // Stream protocol version string. |
| 35 » Secret []byte // Secret. | 37 » Secret types.PrefixSecret // Secret. |
| 36 TerminalIndex types.MessageIndex // Terminal index, <0 for unterminated. | 38 TerminalIndex types.MessageIndex // Terminal index, <0 for unterminated. |
| 37 Archived bool // Is the stream archived? | 39 Archived bool // Is the stream archived? |
| 38 Purged bool // Is the stream purged? | 40 Purged bool // Is the stream purged? |
| 39 } | 41 } |
| 40 | 42 |
| 41 type coordinatorImpl struct { | 43 type coordinatorImpl struct { |
| 42 c logdog.ServicesClient | 44 c logdog.ServicesClient |
| 43 } | 45 } |
| 44 | 46 |
| 45 // NewCoordinator returns a Coordinator implementation that uses a | 47 // NewCoordinator returns a Coordinator implementation that uses a |
| 46 // logdog.ServicesClient. | 48 // logdog.ServicesClient. |
| 47 func NewCoordinator(s logdog.ServicesClient) Coordinator { | 49 func NewCoordinator(s logdog.ServicesClient) Coordinator { |
| 48 return &coordinatorImpl{s} | 50 return &coordinatorImpl{s} |
| 49 } | 51 } |
| 50 | 52 |
| 51 func (*coordinatorImpl) clientSideValidate(s *LogStreamState) error { | 53 func (*coordinatorImpl) clientSideValidate(s *LogStreamState) error { |
| 54 // TODO(dnj): Force this validation when empty project is not accepted. |
| 55 if s.Project != "" { |
| 56 if err := s.Project.Validate(); err != nil { |
| 57 return fmt.Errorf("failed to validate project: %s", err) |
| 58 } |
| 59 } |
| 52 if err := s.Path.Validate(); err != nil { | 60 if err := s.Path.Validate(); err != nil { |
| 53 » » return err | 61 » » return fmt.Errorf("failed to validate path: %s", err) |
| 54 } | 62 } |
| 55 » if len(s.Secret) == 0 { | 63 » if err := s.Secret.Validate(); err != nil { |
| 56 » » return errors.New("missing stream secret") | 64 » » return fmt.Errorf("failed to validate secret: %s", err) |
| 57 } | 65 } |
| 58 return nil | 66 return nil |
| 59 } | 67 } |
| 60 | 68 |
| 61 func (c *coordinatorImpl) RegisterStream(ctx context.Context, s *LogStreamState,
d *logpb.LogStreamDescriptor) ( | 69 func (c *coordinatorImpl) RegisterStream(ctx context.Context, s *LogStreamState,
d *logpb.LogStreamDescriptor) ( |
| 62 *LogStreamState, error) { | 70 *LogStreamState, error) { |
| 63 if err := c.clientSideValidate(s); err != nil { | 71 if err := c.clientSideValidate(s); err != nil { |
| 64 return nil, err | 72 return nil, err |
| 65 } | 73 } |
| 66 if err := d.Validate(true); err != nil { | 74 if err := d.Validate(true); err != nil { |
| 67 return nil, fmt.Errorf("invalid descriptor: %s", err) | 75 return nil, fmt.Errorf("invalid descriptor: %s", err) |
| 68 } | 76 } |
| 69 | 77 |
| 70 req := logdog.RegisterStreamRequest{ | 78 req := logdog.RegisterStreamRequest{ |
| 79 Project: string(s.Project), |
| 71 Path: string(s.Path), | 80 Path: string(s.Path), |
| 72 » » Secret: s.Secret, | 81 » » Secret: []byte(s.Secret), |
| 73 ProtoVersion: s.ProtoVersion, | 82 ProtoVersion: s.ProtoVersion, |
| 74 Desc: d, | 83 Desc: d, |
| 75 } | 84 } |
| 76 | 85 |
| 77 resp, err := c.c.RegisterStream(ctx, &req) | 86 resp, err := c.c.RegisterStream(ctx, &req) |
| 78 switch { | 87 switch { |
| 79 case err != nil: | 88 case err != nil: |
| 80 return nil, err | 89 return nil, err |
| 81 case resp.State == nil: | 90 case resp.State == nil: |
| 82 return nil, errors.New("missing stream state") | 91 return nil, errors.New("missing stream state") |
| 83 } | 92 } |
| 84 | 93 |
| 85 return &LogStreamState{ | 94 return &LogStreamState{ |
| 95 Project: config.ProjectName(resp.State.Project), |
| 86 Path: types.StreamPath(resp.State.Path), | 96 Path: types.StreamPath(resp.State.Path), |
| 87 ProtoVersion: resp.State.ProtoVersion, | 97 ProtoVersion: resp.State.ProtoVersion, |
| 88 » » Secret: resp.Secret, | 98 » » Secret: types.PrefixSecret(resp.Secret), |
| 89 TerminalIndex: types.MessageIndex(resp.State.TerminalIndex), | 99 TerminalIndex: types.MessageIndex(resp.State.TerminalIndex), |
| 90 Archived: resp.State.Archived, | 100 Archived: resp.State.Archived, |
| 91 Purged: resp.State.Purged, | 101 Purged: resp.State.Purged, |
| 92 }, nil | 102 }, nil |
| 93 } | 103 } |
| 94 | 104 |
| 95 func (c *coordinatorImpl) TerminateStream(ctx context.Context, s *LogStreamState
) error { | 105 func (c *coordinatorImpl) TerminateStream(ctx context.Context, s *LogStreamState
) error { |
| 96 if err := c.clientSideValidate(s); err != nil { | 106 if err := c.clientSideValidate(s); err != nil { |
| 97 return err | 107 return err |
| 98 } | 108 } |
| 99 if s.TerminalIndex < 0 { | 109 if s.TerminalIndex < 0 { |
| 100 return errors.New("refusing to terminate with non-terminal state
") | 110 return errors.New("refusing to terminate with non-terminal state
") |
| 101 } | 111 } |
| 102 | 112 |
| 103 req := logdog.TerminateStreamRequest{ | 113 req := logdog.TerminateStreamRequest{ |
| 114 Project: string(s.Project), |
| 104 Path: string(s.Path), | 115 Path: string(s.Path), |
| 105 » » Secret: s.Secret, | 116 » » Secret: []byte(s.Secret), |
| 106 TerminalIndex: int64(s.TerminalIndex), | 117 TerminalIndex: int64(s.TerminalIndex), |
| 107 } | 118 } |
| 108 | 119 |
| 109 if _, err := c.c.TerminateStream(ctx, &req); err != nil { | 120 if _, err := c.c.TerminateStream(ctx, &req); err != nil { |
| 110 return err | 121 return err |
| 111 } | 122 } |
| 112 return nil | 123 return nil |
| 113 } | 124 } |
| OLD | NEW |