Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package coordinatorClient | |
|
dnj (Google)
2016/01/21 04:36:25
You can mostly ignore this file. It will be replac
| |
| 6 | |
| 7 import ( | |
| 8 "encoding/base64" | |
| 9 "fmt" | |
| 10 "net/http" | |
| 11 | |
| 12 "github.com/golang/protobuf/proto" | |
| 13 "github.com/luci/luci-go/common/api/logdog_coordinator/service/v1" | |
| 14 "github.com/luci/luci-go/common/auth" | |
| 15 "github.com/luci/luci-go/common/errors" | |
| 16 "github.com/luci/luci-go/common/logdog/protocol" | |
| 17 "github.com/luci/luci-go/common/logdog/types" | |
| 18 "golang.org/x/net/context" | |
| 19 "google.golang.org/api/googleapi" | |
| 20 ) | |
| 21 | |
| 22 var ( | |
| 23 // ServiceScopes is the set of OAuth scopes required to communicate with this | |
| 24 // service. | |
| 25 ServiceScopes = []string{ | |
| 26 auth.OAuthScopeEmail, | |
| 27 } | |
| 28 ) | |
| 29 | |
| 30 // State is a representation of the remote Coordinator state. | |
| 31 type State struct { | |
| 32 // Path is the stream path. | |
| 33 Path types.StreamPath | |
| 34 | |
| 35 // Secret is the stream secret value. It must be included if Descriptor is not | |
| 36 // nil. | |
| 37 Secret []byte | |
| 38 | |
| 39 // ProtoVersion is the protobuf version string. | |
| 40 ProtoVersion string | |
| 41 | |
| 42 // Descriptor is the new stream state to push. If nil, no registration w ill | |
| 43 // occur. | |
| 44 State *service.LogStreamState | |
| 45 // Descriptor is the new stream state to push. If nil, no registration w ill | |
| 46 // occur. | |
| 47 Descriptor *protocol.LogStreamDescriptor | |
| 48 } | |
| 49 | |
| 50 func loadState(path string, secret string, desc string, state *service.LogStream State) (*State, error) { | |
| 51 s := State{ | |
| 52 Path: types.StreamPath(path), | |
| 53 State: state, | |
| 54 } | |
| 55 if err := s.Path.Validate(); err != nil { | |
| 56 return nil, fmt.Errorf("failed to validate stream path: %v", err ) | |
| 57 } | |
| 58 | |
| 59 if secret != "" { | |
| 60 var err error | |
| 61 s.Secret, err = base64.StdEncoding.DecodeString(secret) | |
| 62 if err != nil { | |
| 63 return nil, fmt.Errorf("failed to decode secret: %v", er r) | |
| 64 } | |
| 65 } | |
| 66 | |
| 67 if desc != "" { | |
| 68 d, err := base64.StdEncoding.DecodeString(desc) | |
| 69 if err != nil { | |
| 70 return nil, fmt.Errorf("failed to decode descriptor: %v" , err) | |
| 71 } | |
| 72 | |
| 73 lsd := protocol.LogStreamDescriptor{} | |
| 74 if err := proto.Unmarshal(d, &lsd); err != nil { | |
| 75 return nil, fmt.Errorf("failed to unmarshal descriptor: %v", err) | |
| 76 } | |
| 77 s.Descriptor = &lsd | |
| 78 } | |
| 79 | |
| 80 if state != nil { | |
| 81 s.ProtoVersion = state.ProtoVersion | |
| 82 } | |
| 83 | |
| 84 return &s, nil | |
| 85 } | |
| 86 | |
| 87 // clientSideValidate performs a set of basic sanity checks to not waste time | |
| 88 // on something the Coordinator is known to reject. | |
| 89 func (s *State) clientSideValidate() error { | |
| 90 // Let's do some client-side validation and not waste the server's time if | |
| 91 // something is obviously wrong! | |
| 92 if err := s.Path.Validate(); err != nil { | |
| 93 return err | |
| 94 } | |
| 95 if s.Secret == nil { | |
| 96 return errors.New("missing stream secret") | |
| 97 } | |
| 98 if s.ProtoVersion == "" { | |
| 99 return errors.New("missing protobuf version") | |
| 100 } | |
| 101 return nil | |
| 102 } | |
| 103 | |
| 104 // ServiceConfig is the structure returned by the GetConfig service API call. | |
| 105 type ServiceConfig struct { | |
| 106 service.GetConfigResponse | |
| 107 } | |
| 108 | |
| 109 // Archived returns true if the log stream is marked as archived. | |
| 110 func (s *State) Archived() bool { | |
| 111 if st := s.State; st != nil { | |
| 112 return !(st.ArchiveDataURL == "" && st.ArchiveIndexURL == "" && st.ArchiveStreamURL == "") | |
| 113 } | |
| 114 return false | |
| 115 } | |
| 116 | |
| 117 // Options is the set of options to supply to a new Client instance. | |
| 118 type Options struct { | |
| 119 // Client is the authenticated HTTP client to use. | |
| 120 Client *http.Client | |
| 121 | |
| 122 // BasePath is the API base path. If empty, the generated endpoint defau lt | |
| 123 // base path will be used. | |
| 124 // | |
| 125 // This should not include the service endpoint, e.g.: | |
| 126 // https://logdog.example.com/api/ | |
| 127 BasePath string | |
| 128 | |
| 129 // UserAgent, if supplied, will be included in the user agent string for | |
| 130 // endpoint requests. | |
| 131 UserAgent string | |
| 132 } | |
| 133 | |
| 134 // Client is a LogDog Coordinator client. | |
| 135 // | |
| 136 // Client methods will return an errors.Transient error if the failure is | |
| 137 // considered transient. | |
| 138 type Client struct { | |
| 139 *Options | |
| 140 | |
| 141 svc *service.Service | |
| 142 } | |
| 143 | |
| 144 // New returns a new production Client using the supplied authenticated HTTP | |
| 145 // Client. | |
| 146 // | |
| 147 // If apiBase is not empty, it will be used to override the | |
| 148 func New(o Options) *Client { | |
| 149 svc, err := service.New(o.Client) | |
| 150 if err != nil { | |
| 151 // This will only happen if the supplied Client is nil, which is a bug. | |
| 152 panic(err) | |
| 153 } | |
| 154 if o.BasePath != "" { | |
| 155 svc.BasePath = fmt.Sprintf("%sservice/v1/", o.BasePath) | |
| 156 } | |
| 157 svc.UserAgent = o.UserAgent | |
| 158 | |
| 159 return &Client{ | |
| 160 Options: &o, | |
| 161 svc: svc, | |
| 162 } | |
| 163 } | |
| 164 | |
| 165 // GetConfig loads the service configuration from the Coordinator. | |
| 166 func (c *Client) GetConfig(ctx context.Context) (*ServiceConfig, error) { | |
| 167 // Retrieve the global configuration. | |
| 168 gcfg, err := c.svc.GetConfig().Context(ctx).Do() | |
| 169 if err != nil { | |
| 170 return nil, translateError(err) | |
| 171 } | |
| 172 | |
| 173 return &ServiceConfig{ | |
| 174 GetConfigResponse: *gcfg, | |
| 175 }, nil | |
| 176 } | |
| 177 | |
| 178 // LoadStream loads the named stream parameters. | |
| 179 func (c *Client) LoadStream(ctx context.Context, path types.StreamPath) (*State, error) { | |
| 180 if err := path.Validate(); err != nil { | |
| 181 return nil, err | |
| 182 } | |
| 183 | |
| 184 resp, err := c.svc.LoadStream(string(path)).Context(ctx).Do() | |
| 185 if err != nil { | |
| 186 return nil, translateError(err) | |
| 187 } | |
| 188 | |
| 189 s, err := loadState(resp.Path, resp.Secret, resp.Descriptor, resp.State) | |
| 190 if err != nil { | |
| 191 return nil, err | |
| 192 } | |
| 193 return s, nil | |
| 194 } | |
| 195 | |
| 196 // RegisterStream registers stream metadata with the Coordiantor. The | |
| 197 // Coordinator will respond with its own version of that State on success. | |
| 198 // This is idempotent so long as the data is consistent, so it may be called | |
| 199 // multiple times. | |
| 200 func (c *Client) RegisterStream(ctx context.Context, s State) (*State, error) { | |
| 201 if err := s.clientSideValidate(); err != nil { | |
| 202 return nil, err | |
| 203 } | |
| 204 | |
| 205 desc := []byte(nil) | |
| 206 if s.Descriptor != nil { | |
| 207 err := error(nil) | |
| 208 desc, err = proto.Marshal(s.Descriptor) | |
| 209 if err != nil { | |
| 210 return nil, err | |
| 211 } | |
| 212 } | |
| 213 | |
| 214 // No point in including the Descriptor; clear it (if it's set). | |
| 215 resp, err := c.svc.RegisterStream(&service.RegisterStreamRequest{ | |
| 216 ProtoVersion: s.ProtoVersion, | |
| 217 Descriptor: base64.StdEncoding.EncodeToString(desc), | |
| 218 Path: string(s.Path), | |
| 219 Secret: base64.StdEncoding.EncodeToString(s.Secret), | |
| 220 }).Context(ctx).Do() | |
| 221 if err != nil { | |
| 222 return nil, translateError(err) | |
| 223 } | |
| 224 | |
| 225 rs, err := loadState(resp.Path, resp.Secret, "", resp.State) | |
| 226 if err != nil { | |
| 227 return nil, err | |
| 228 } | |
| 229 rs.Descriptor = s.Descriptor | |
| 230 return rs, nil | |
| 231 } | |
| 232 | |
| 233 // TerminateStream registers the terminal index for the named stream. | |
| 234 func (c *Client) TerminateStream(ctx context.Context, p types.StreamPath, s []by te, tidx types.MessageIndex) error { | |
| 235 if tidx < 0 { | |
| 236 return errors.New("stream state has non-terminal index") | |
| 237 } | |
| 238 | |
| 239 err := c.svc.TerminateStream(&service.TerminateStreamRequest{ | |
| 240 Path: string(p), | |
| 241 Secret: base64.StdEncoding.EncodeToString(s), | |
| 242 TerminalIndex: int64(tidx), | |
| 243 }).Context(ctx).Do() | |
| 244 if err != nil { | |
| 245 return translateError(err) | |
| 246 } | |
| 247 return nil | |
| 248 } | |
| 249 | |
| 250 func translateError(err error) error { | |
| 251 if gerr, ok := err.(*googleapi.Error); ok { | |
| 252 // Auth and server errors are considered transient. | |
| 253 switch { | |
| 254 case gerr.Code == http.StatusUnauthorized: | |
| 255 fallthrough | |
| 256 case gerr.Code == http.StatusForbidden: | |
| 257 fallthrough | |
| 258 case gerr.Code >= http.StatusInternalServerError: | |
| 259 return errors.WrapTransient(err) | |
| 260 } | |
| 261 | |
| 262 return err | |
| 263 } | |
| 264 | |
| 265 // Not a Google API error. Assume it's transient. | |
| 266 return errors.WrapTransient(err) | |
| 267 } | |
| OLD | NEW |