Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(186)

Unified Diff: server/internal/logdog/coordinatorClient/client.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: server/internal/logdog/coordinatorClient/client.go
diff --git a/server/internal/logdog/coordinatorClient/client.go b/server/internal/logdog/coordinatorClient/client.go
new file mode 100644
index 0000000000000000000000000000000000000000..735219eeca9017ce6df853fd03e93514ef8da93c
--- /dev/null
+++ b/server/internal/logdog/coordinatorClient/client.go
@@ -0,0 +1,267 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package coordinatorClient
dnj (Google) 2016/01/21 04:36:25 You can mostly ignore this file. It will be replac
+
+import (
+ "encoding/base64"
+ "fmt"
+ "net/http"
+
+ "github.com/golang/protobuf/proto"
+ "github.com/luci/luci-go/common/api/logdog_coordinator/service/v1"
+ "github.com/luci/luci-go/common/auth"
+ "github.com/luci/luci-go/common/errors"
+ "github.com/luci/luci-go/common/logdog/protocol"
+ "github.com/luci/luci-go/common/logdog/types"
+ "golang.org/x/net/context"
+ "google.golang.org/api/googleapi"
+)
+
+var (
+ // ServiceScopes is the set of OAuth scopes required to communicate with this
+ // service.
+ ServiceScopes = []string{
+ auth.OAuthScopeEmail,
+ }
+)
+
+// State is a representation of the remote Coordinator state.
+type State struct {
+ // Path is the stream path.
+ Path types.StreamPath
+
+ // Secret is the stream secret value. It must be included if Descriptor is not
+ // nil.
+ Secret []byte
+
+ // ProtoVersion is the protobuf version string.
+ ProtoVersion string
+
+ // Descriptor is the new stream state to push. If nil, no registration will
+ // occur.
+ State *service.LogStreamState
+ // Descriptor is the new stream state to push. If nil, no registration will
+ // occur.
+ Descriptor *protocol.LogStreamDescriptor
+}
+
+func loadState(path string, secret string, desc string, state *service.LogStreamState) (*State, error) {
+ s := State{
+ Path: types.StreamPath(path),
+ State: state,
+ }
+ if err := s.Path.Validate(); err != nil {
+ return nil, fmt.Errorf("failed to validate stream path: %v", err)
+ }
+
+ if secret != "" {
+ var err error
+ s.Secret, err = base64.StdEncoding.DecodeString(secret)
+ if err != nil {
+ return nil, fmt.Errorf("failed to decode secret: %v", err)
+ }
+ }
+
+ if desc != "" {
+ d, err := base64.StdEncoding.DecodeString(desc)
+ if err != nil {
+ return nil, fmt.Errorf("failed to decode descriptor: %v", err)
+ }
+
+ lsd := protocol.LogStreamDescriptor{}
+ if err := proto.Unmarshal(d, &lsd); err != nil {
+ return nil, fmt.Errorf("failed to unmarshal descriptor: %v", err)
+ }
+ s.Descriptor = &lsd
+ }
+
+ if state != nil {
+ s.ProtoVersion = state.ProtoVersion
+ }
+
+ return &s, nil
+}
+
+// clientSideValidate performs a set of basic sanity checks to not waste time
+// on something the Coordinator is known to reject.
+func (s *State) clientSideValidate() error {
+ // Let's do some client-side validation and not waste the server's time if
+ // something is obviously wrong!
+ if err := s.Path.Validate(); err != nil {
+ return err
+ }
+ if s.Secret == nil {
+ return errors.New("missing stream secret")
+ }
+ if s.ProtoVersion == "" {
+ return errors.New("missing protobuf version")
+ }
+ return nil
+}
+
+// ServiceConfig is the structure returned by the GetConfig service API call.
+type ServiceConfig struct {
+ service.GetConfigResponse
+}
+
+// Archived returns true if the log stream is marked as archived.
+func (s *State) Archived() bool {
+ if st := s.State; st != nil {
+ return !(st.ArchiveDataURL == "" && st.ArchiveIndexURL == "" && st.ArchiveStreamURL == "")
+ }
+ return false
+}
+
+// Options is the set of options to supply to a new Client instance.
+type Options struct {
+ // Client is the authenticated HTTP client to use.
+ Client *http.Client
+
+ // BasePath is the API base path. If empty, the generated endpoint default
+ // base path will be used.
+ //
+ // This should not include the service endpoint, e.g.:
+ // https://logdog.example.com/api/
+ BasePath string
+
+ // UserAgent, if supplied, will be included in the user agent string for
+ // endpoint requests.
+ UserAgent string
+}
+
+// Client is a LogDog Coordinator client.
+//
+// Client methods will return an errors.Transient error if the failure is
+// considered transient.
+type Client struct {
+ *Options
+
+ svc *service.Service
+}
+
+// New returns a new production Client using the supplied authenticated HTTP
+// Client.
+//
+// If apiBase is not empty, it will be used to override the
+func New(o Options) *Client {
+ svc, err := service.New(o.Client)
+ if err != nil {
+ // This will only happen if the supplied Client is nil, which is a bug.
+ panic(err)
+ }
+ if o.BasePath != "" {
+ svc.BasePath = fmt.Sprintf("%sservice/v1/", o.BasePath)
+ }
+ svc.UserAgent = o.UserAgent
+
+ return &Client{
+ Options: &o,
+ svc: svc,
+ }
+}
+
+// GetConfig loads the service configuration from the Coordinator.
+func (c *Client) GetConfig(ctx context.Context) (*ServiceConfig, error) {
+ // Retrieve the global configuration.
+ gcfg, err := c.svc.GetConfig().Context(ctx).Do()
+ if err != nil {
+ return nil, translateError(err)
+ }
+
+ return &ServiceConfig{
+ GetConfigResponse: *gcfg,
+ }, nil
+}
+
+// LoadStream loads the named stream parameters.
+func (c *Client) LoadStream(ctx context.Context, path types.StreamPath) (*State, error) {
+ if err := path.Validate(); err != nil {
+ return nil, err
+ }
+
+ resp, err := c.svc.LoadStream(string(path)).Context(ctx).Do()
+ if err != nil {
+ return nil, translateError(err)
+ }
+
+ s, err := loadState(resp.Path, resp.Secret, resp.Descriptor, resp.State)
+ if err != nil {
+ return nil, err
+ }
+ return s, nil
+}
+
+// RegisterStream registers stream metadata with the Coordiantor. The
+// Coordinator will respond with its own version of that State on success.
+// This is idempotent so long as the data is consistent, so it may be called
+// multiple times.
+func (c *Client) RegisterStream(ctx context.Context, s State) (*State, error) {
+ if err := s.clientSideValidate(); err != nil {
+ return nil, err
+ }
+
+ desc := []byte(nil)
+ if s.Descriptor != nil {
+ err := error(nil)
+ desc, err = proto.Marshal(s.Descriptor)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ // No point in including the Descriptor; clear it (if it's set).
+ resp, err := c.svc.RegisterStream(&service.RegisterStreamRequest{
+ ProtoVersion: s.ProtoVersion,
+ Descriptor: base64.StdEncoding.EncodeToString(desc),
+ Path: string(s.Path),
+ Secret: base64.StdEncoding.EncodeToString(s.Secret),
+ }).Context(ctx).Do()
+ if err != nil {
+ return nil, translateError(err)
+ }
+
+ rs, err := loadState(resp.Path, resp.Secret, "", resp.State)
+ if err != nil {
+ return nil, err
+ }
+ rs.Descriptor = s.Descriptor
+ return rs, nil
+}
+
+// TerminateStream registers the terminal index for the named stream.
+func (c *Client) TerminateStream(ctx context.Context, p types.StreamPath, s []byte, tidx types.MessageIndex) error {
+ if tidx < 0 {
+ return errors.New("stream state has non-terminal index")
+ }
+
+ err := c.svc.TerminateStream(&service.TerminateStreamRequest{
+ Path: string(p),
+ Secret: base64.StdEncoding.EncodeToString(s),
+ TerminalIndex: int64(tidx),
+ }).Context(ctx).Do()
+ if err != nil {
+ return translateError(err)
+ }
+ return nil
+}
+
+func translateError(err error) error {
+ if gerr, ok := err.(*googleapi.Error); ok {
+ // Auth and server errors are considered transient.
+ switch {
+ case gerr.Code == http.StatusUnauthorized:
+ fallthrough
+ case gerr.Code == http.StatusForbidden:
+ fallthrough
+ case gerr.Code >= http.StatusInternalServerError:
+ return errors.WrapTransient(err)
+ }
+
+ return err
+ }
+
+ // Not a Google API error. Assume it's transient.
+ return errors.WrapTransient(err)
+}

Powered by Google App Engine
This is Rietveld 408576698