Chromium Code Reviews| Index: server/internal/logdog/coordinatorClient/client.go |
| diff --git a/server/internal/logdog/coordinatorClient/client.go b/server/internal/logdog/coordinatorClient/client.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..735219eeca9017ce6df853fd03e93514ef8da93c |
| --- /dev/null |
| +++ b/server/internal/logdog/coordinatorClient/client.go |
| @@ -0,0 +1,267 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package coordinatorClient |
|
dnj (Google)
2016/01/21 04:36:25
You can mostly ignore this file. It will be replac
|
| + |
| +import ( |
| + "encoding/base64" |
| + "fmt" |
| + "net/http" |
| + |
| + "github.com/golang/protobuf/proto" |
| + "github.com/luci/luci-go/common/api/logdog_coordinator/service/v1" |
| + "github.com/luci/luci-go/common/auth" |
| + "github.com/luci/luci-go/common/errors" |
| + "github.com/luci/luci-go/common/logdog/protocol" |
| + "github.com/luci/luci-go/common/logdog/types" |
| + "golang.org/x/net/context" |
| + "google.golang.org/api/googleapi" |
| +) |
| + |
| +var ( |
| + // ServiceScopes is the set of OAuth scopes required to communicate with this |
| + // service. |
| + ServiceScopes = []string{ |
| + auth.OAuthScopeEmail, |
| + } |
| +) |
| + |
| +// State is a representation of the remote Coordinator state. |
| +type State struct { |
| + // Path is the stream path. |
| + Path types.StreamPath |
| + |
| + // Secret is the stream secret value. It must be included if Descriptor is not |
| + // nil. |
| + Secret []byte |
| + |
| + // ProtoVersion is the protobuf version string. |
| + ProtoVersion string |
| + |
| + // Descriptor is the new stream state to push. If nil, no registration will |
| + // occur. |
| + State *service.LogStreamState |
| + // Descriptor is the new stream state to push. If nil, no registration will |
| + // occur. |
| + Descriptor *protocol.LogStreamDescriptor |
| +} |
| + |
| +func loadState(path string, secret string, desc string, state *service.LogStreamState) (*State, error) { |
| + s := State{ |
| + Path: types.StreamPath(path), |
| + State: state, |
| + } |
| + if err := s.Path.Validate(); err != nil { |
| + return nil, fmt.Errorf("failed to validate stream path: %v", err) |
| + } |
| + |
| + if secret != "" { |
| + var err error |
| + s.Secret, err = base64.StdEncoding.DecodeString(secret) |
| + if err != nil { |
| + return nil, fmt.Errorf("failed to decode secret: %v", err) |
| + } |
| + } |
| + |
| + if desc != "" { |
| + d, err := base64.StdEncoding.DecodeString(desc) |
| + if err != nil { |
| + return nil, fmt.Errorf("failed to decode descriptor: %v", err) |
| + } |
| + |
| + lsd := protocol.LogStreamDescriptor{} |
| + if err := proto.Unmarshal(d, &lsd); err != nil { |
| + return nil, fmt.Errorf("failed to unmarshal descriptor: %v", err) |
| + } |
| + s.Descriptor = &lsd |
| + } |
| + |
| + if state != nil { |
| + s.ProtoVersion = state.ProtoVersion |
| + } |
| + |
| + return &s, nil |
| +} |
| + |
| +// clientSideValidate performs a set of basic sanity checks to not waste time |
| +// on something the Coordinator is known to reject. |
| +func (s *State) clientSideValidate() error { |
| + // Let's do some client-side validation and not waste the server's time if |
| + // something is obviously wrong! |
| + if err := s.Path.Validate(); err != nil { |
| + return err |
| + } |
| + if s.Secret == nil { |
| + return errors.New("missing stream secret") |
| + } |
| + if s.ProtoVersion == "" { |
| + return errors.New("missing protobuf version") |
| + } |
| + return nil |
| +} |
| + |
| +// ServiceConfig is the structure returned by the GetConfig service API call. |
| +type ServiceConfig struct { |
| + service.GetConfigResponse |
| +} |
| + |
| +// Archived returns true if the log stream is marked as archived. |
| +func (s *State) Archived() bool { |
| + if st := s.State; st != nil { |
| + return !(st.ArchiveDataURL == "" && st.ArchiveIndexURL == "" && st.ArchiveStreamURL == "") |
| + } |
| + return false |
| +} |
| + |
| +// Options is the set of options to supply to a new Client instance. |
| +type Options struct { |
| + // Client is the authenticated HTTP client to use. |
| + Client *http.Client |
| + |
| + // BasePath is the API base path. If empty, the generated endpoint default |
| + // base path will be used. |
| + // |
| + // This should not include the service endpoint, e.g.: |
| + // https://logdog.example.com/api/ |
| + BasePath string |
| + |
| + // UserAgent, if supplied, will be included in the user agent string for |
| + // endpoint requests. |
| + UserAgent string |
| +} |
| + |
| +// Client is a LogDog Coordinator client. |
| +// |
| +// Client methods will return an errors.Transient error if the failure is |
| +// considered transient. |
| +type Client struct { |
| + *Options |
| + |
| + svc *service.Service |
| +} |
| + |
| +// New returns a new production Client using the supplied authenticated HTTP |
| +// Client. |
| +// |
| +// If apiBase is not empty, it will be used to override the |
| +func New(o Options) *Client { |
| + svc, err := service.New(o.Client) |
| + if err != nil { |
| + // This will only happen if the supplied Client is nil, which is a bug. |
| + panic(err) |
| + } |
| + if o.BasePath != "" { |
| + svc.BasePath = fmt.Sprintf("%sservice/v1/", o.BasePath) |
| + } |
| + svc.UserAgent = o.UserAgent |
| + |
| + return &Client{ |
| + Options: &o, |
| + svc: svc, |
| + } |
| +} |
| + |
| +// GetConfig loads the service configuration from the Coordinator. |
| +func (c *Client) GetConfig(ctx context.Context) (*ServiceConfig, error) { |
| + // Retrieve the global configuration. |
| + gcfg, err := c.svc.GetConfig().Context(ctx).Do() |
| + if err != nil { |
| + return nil, translateError(err) |
| + } |
| + |
| + return &ServiceConfig{ |
| + GetConfigResponse: *gcfg, |
| + }, nil |
| +} |
| + |
| +// LoadStream loads the named stream parameters. |
| +func (c *Client) LoadStream(ctx context.Context, path types.StreamPath) (*State, error) { |
| + if err := path.Validate(); err != nil { |
| + return nil, err |
| + } |
| + |
| + resp, err := c.svc.LoadStream(string(path)).Context(ctx).Do() |
| + if err != nil { |
| + return nil, translateError(err) |
| + } |
| + |
| + s, err := loadState(resp.Path, resp.Secret, resp.Descriptor, resp.State) |
| + if err != nil { |
| + return nil, err |
| + } |
| + return s, nil |
| +} |
| + |
| +// RegisterStream registers stream metadata with the Coordiantor. The |
| +// Coordinator will respond with its own version of that State on success. |
| +// This is idempotent so long as the data is consistent, so it may be called |
| +// multiple times. |
| +func (c *Client) RegisterStream(ctx context.Context, s State) (*State, error) { |
| + if err := s.clientSideValidate(); err != nil { |
| + return nil, err |
| + } |
| + |
| + desc := []byte(nil) |
| + if s.Descriptor != nil { |
| + err := error(nil) |
| + desc, err = proto.Marshal(s.Descriptor) |
| + if err != nil { |
| + return nil, err |
| + } |
| + } |
| + |
| + // No point in including the Descriptor; clear it (if it's set). |
| + resp, err := c.svc.RegisterStream(&service.RegisterStreamRequest{ |
| + ProtoVersion: s.ProtoVersion, |
| + Descriptor: base64.StdEncoding.EncodeToString(desc), |
| + Path: string(s.Path), |
| + Secret: base64.StdEncoding.EncodeToString(s.Secret), |
| + }).Context(ctx).Do() |
| + if err != nil { |
| + return nil, translateError(err) |
| + } |
| + |
| + rs, err := loadState(resp.Path, resp.Secret, "", resp.State) |
| + if err != nil { |
| + return nil, err |
| + } |
| + rs.Descriptor = s.Descriptor |
| + return rs, nil |
| +} |
| + |
| +// TerminateStream registers the terminal index for the named stream. |
| +func (c *Client) TerminateStream(ctx context.Context, p types.StreamPath, s []byte, tidx types.MessageIndex) error { |
| + if tidx < 0 { |
| + return errors.New("stream state has non-terminal index") |
| + } |
| + |
| + err := c.svc.TerminateStream(&service.TerminateStreamRequest{ |
| + Path: string(p), |
| + Secret: base64.StdEncoding.EncodeToString(s), |
| + TerminalIndex: int64(tidx), |
| + }).Context(ctx).Do() |
| + if err != nil { |
| + return translateError(err) |
| + } |
| + return nil |
| +} |
| + |
| +func translateError(err error) error { |
| + if gerr, ok := err.(*googleapi.Error); ok { |
| + // Auth and server errors are considered transient. |
| + switch { |
| + case gerr.Code == http.StatusUnauthorized: |
| + fallthrough |
| + case gerr.Code == http.StatusForbidden: |
| + fallthrough |
| + case gerr.Code >= http.StatusInternalServerError: |
| + return errors.WrapTransient(err) |
| + } |
| + |
| + return err |
| + } |
| + |
| + // Not a Google API error. Assume it's transient. |
| + return errors.WrapTransient(err) |
| +} |