Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(864)

Unified Diff: server/internal/logdog/collector/coordinator/coordinator.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Comments, rebase. Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: server/internal/logdog/collector/coordinator/coordinator.go
diff --git a/server/internal/logdog/collector/coordinator/coordinator.go b/server/internal/logdog/collector/coordinator/coordinator.go
new file mode 100644
index 0000000000000000000000000000000000000000..f811be0546301285fa3a6a8aa307d9afbc921fa2
--- /dev/null
+++ b/server/internal/logdog/collector/coordinator/coordinator.go
@@ -0,0 +1,118 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package coordinator
+
+import (
+ "fmt"
+
+ "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
+ "github.com/luci/luci-go/common/errors"
+ "github.com/luci/luci-go/common/grpcutil"
+ "github.com/luci/luci-go/common/logdog/types"
+ "github.com/luci/luci-go/common/proto/logdog/logpb"
+ "golang.org/x/net/context"
+)
+
+// Coordinator is an interface to a remote LogDog Coordinator service. This is
+// a simplified version of the Coordinator's Service API tailored specifically
+// to the Collector's usage.
+//
+// All Coordiantor methods will return transient-wrapped errors if appropriate.
+type Coordinator interface {
+ // RegisterStream registers a log stream state.
+ RegisterStream(context.Context, *LogStreamState, *logpb.LogStreamDescriptor) (*LogStreamState, error)
+ // TerminateStream registers the terminal index of a log stream state.
+ TerminateStream(context.Context, *LogStreamState) error
+}
+
+// LogStreamState is a local representation of a remote stream's state. It is a
+// subset of the remote state with the necessary elements for the Collector to
+// operate and update.
+type LogStreamState struct {
+ Path types.StreamPath // Stream path.
+ ProtoVersion string // Stream protocol version string.
+ Secret []byte // Secret.
+ TerminalIndex types.MessageIndex // Terminal index, <0 for unterminated.
+ Archived bool // Is the stream archived?
+ Purged bool // Is the stream purged?
+}
+
+type coordinatorImpl struct {
+ c services.ServicesClient
+}
+
+// NewCoordinator returns a Coordinator implementation that uses a
+// services.ServicesClient.
+func NewCoordinator(s services.ServicesClient) Coordinator {
+ return &coordinatorImpl{s}
+}
+
+func (*coordinatorImpl) clientSideValidate(s *LogStreamState) error {
+ if err := s.Path.Validate(); err != nil {
+ return err
+ }
+ if len(s.Secret) == 0 {
+ return errors.New("missing stream secret")
+ }
+ return nil
+}
+
+func (c *coordinatorImpl) RegisterStream(ctx context.Context, s *LogStreamState, d *logpb.LogStreamDescriptor) (
+ *LogStreamState, error) {
+ if err := c.clientSideValidate(s); err != nil {
+ return nil, err
+ }
+ if err := d.Validate(true); err != nil {
+ return nil, fmt.Errorf("invalid descriptor: %s", err)
+ }
+
+ req := services.RegisterStreamRequest{
+ Path: string(s.Path),
+ Secret: s.Secret,
+ ProtoVersion: s.ProtoVersion,
+ Desc: d,
+ }
+
+ resp, err := c.c.RegisterStream(ctx, &req)
+ if err != nil {
+ return nil, err
+ }
+
+ return &LogStreamState{
+ Path: types.StreamPath(resp.Path),
+ ProtoVersion: resp.ProtoVersion,
+ Secret: resp.Secret,
+ TerminalIndex: types.MessageIndex(resp.TerminalIndex),
+ Archived: resp.Archived,
+ Purged: resp.Purged,
+ }, nil
+}
+
+func (c *coordinatorImpl) TerminateStream(ctx context.Context, s *LogStreamState) error {
+ if err := c.clientSideValidate(s); err != nil {
+ return err
+ }
+ if s.TerminalIndex < 0 {
+ return errors.New("refusing to terminate with non-terminal state")
+ }
+
+ req := services.TerminateStreamRequest{
+ Path: string(s.Path),
+ Secret: s.Secret,
+ TerminalIndex: int64(s.TerminalIndex),
+ }
+
+ if _, err := c.c.TerminateStream(ctx, &req); err != nil {
+ return err
+ }
+ return nil
+}
+
+func wrapIfTransient(err error) error {
+ if grpcutil.IsTransient(err) {
+ err = errors.WrapTransient(err)
+ }
+ return err
+}
« no previous file with comments | « server/internal/logdog/collector/coordinator/cache_test.go ('k') | server/internal/logdog/collector/coordinator/doc.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698