Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(159)

Side by Side Diff: server/internal/logdog/coordinatorClient/client.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package coordinatorClient
dnj (Google) 2016/01/21 04:36:25 You can mostly ignore this file. It will be replac
6
7 import (
8 "encoding/base64"
9 "fmt"
10 "net/http"
11
12 "github.com/golang/protobuf/proto"
13 "github.com/luci/luci-go/common/api/logdog_coordinator/service/v1"
14 "github.com/luci/luci-go/common/auth"
15 "github.com/luci/luci-go/common/errors"
16 "github.com/luci/luci-go/common/logdog/protocol"
17 "github.com/luci/luci-go/common/logdog/types"
18 "golang.org/x/net/context"
19 "google.golang.org/api/googleapi"
20 )
21
22 var (
23 // ServiceScopes is the set of OAuth scopes required to communicate with this
24 // service.
25 ServiceScopes = []string{
26 auth.OAuthScopeEmail,
27 }
28 )
29
30 // State is a representation of the remote Coordinator state.
31 type State struct {
32 // Path is the stream path.
33 Path types.StreamPath
34
35 // Secret is the stream secret value. It must be included if Descriptor is not
36 // nil.
37 Secret []byte
38
39 // ProtoVersion is the protobuf version string.
40 ProtoVersion string
41
42 // Descriptor is the new stream state to push. If nil, no registration w ill
43 // occur.
44 State *service.LogStreamState
45 // Descriptor is the new stream state to push. If nil, no registration w ill
46 // occur.
47 Descriptor *protocol.LogStreamDescriptor
48 }
49
50 func loadState(path string, secret string, desc string, state *service.LogStream State) (*State, error) {
51 s := State{
52 Path: types.StreamPath(path),
53 State: state,
54 }
55 if err := s.Path.Validate(); err != nil {
56 return nil, fmt.Errorf("failed to validate stream path: %v", err )
57 }
58
59 if secret != "" {
60 var err error
61 s.Secret, err = base64.StdEncoding.DecodeString(secret)
62 if err != nil {
63 return nil, fmt.Errorf("failed to decode secret: %v", er r)
64 }
65 }
66
67 if desc != "" {
68 d, err := base64.StdEncoding.DecodeString(desc)
69 if err != nil {
70 return nil, fmt.Errorf("failed to decode descriptor: %v" , err)
71 }
72
73 lsd := protocol.LogStreamDescriptor{}
74 if err := proto.Unmarshal(d, &lsd); err != nil {
75 return nil, fmt.Errorf("failed to unmarshal descriptor: %v", err)
76 }
77 s.Descriptor = &lsd
78 }
79
80 if state != nil {
81 s.ProtoVersion = state.ProtoVersion
82 }
83
84 return &s, nil
85 }
86
87 // clientSideValidate performs a set of basic sanity checks to not waste time
88 // on something the Coordinator is known to reject.
89 func (s *State) clientSideValidate() error {
90 // Let's do some client-side validation and not waste the server's time if
91 // something is obviously wrong!
92 if err := s.Path.Validate(); err != nil {
93 return err
94 }
95 if s.Secret == nil {
96 return errors.New("missing stream secret")
97 }
98 if s.ProtoVersion == "" {
99 return errors.New("missing protobuf version")
100 }
101 return nil
102 }
103
104 // ServiceConfig is the structure returned by the GetConfig service API call.
105 type ServiceConfig struct {
106 service.GetConfigResponse
107 }
108
109 // Archived returns true if the log stream is marked as archived.
110 func (s *State) Archived() bool {
111 if st := s.State; st != nil {
112 return !(st.ArchiveDataURL == "" && st.ArchiveIndexURL == "" && st.ArchiveStreamURL == "")
113 }
114 return false
115 }
116
117 // Options is the set of options to supply to a new Client instance.
118 type Options struct {
119 // Client is the authenticated HTTP client to use.
120 Client *http.Client
121
122 // BasePath is the API base path. If empty, the generated endpoint defau lt
123 // base path will be used.
124 //
125 // This should not include the service endpoint, e.g.:
126 // https://logdog.example.com/api/
127 BasePath string
128
129 // UserAgent, if supplied, will be included in the user agent string for
130 // endpoint requests.
131 UserAgent string
132 }
133
134 // Client is a LogDog Coordinator client.
135 //
136 // Client methods will return an errors.Transient error if the failure is
137 // considered transient.
138 type Client struct {
139 *Options
140
141 svc *service.Service
142 }
143
144 // New returns a new production Client using the supplied authenticated HTTP
145 // Client.
146 //
147 // If apiBase is not empty, it will be used to override the
148 func New(o Options) *Client {
149 svc, err := service.New(o.Client)
150 if err != nil {
151 // This will only happen if the supplied Client is nil, which is a bug.
152 panic(err)
153 }
154 if o.BasePath != "" {
155 svc.BasePath = fmt.Sprintf("%sservice/v1/", o.BasePath)
156 }
157 svc.UserAgent = o.UserAgent
158
159 return &Client{
160 Options: &o,
161 svc: svc,
162 }
163 }
164
165 // GetConfig loads the service configuration from the Coordinator.
166 func (c *Client) GetConfig(ctx context.Context) (*ServiceConfig, error) {
167 // Retrieve the global configuration.
168 gcfg, err := c.svc.GetConfig().Context(ctx).Do()
169 if err != nil {
170 return nil, translateError(err)
171 }
172
173 return &ServiceConfig{
174 GetConfigResponse: *gcfg,
175 }, nil
176 }
177
178 // LoadStream loads the named stream parameters.
179 func (c *Client) LoadStream(ctx context.Context, path types.StreamPath) (*State, error) {
180 if err := path.Validate(); err != nil {
181 return nil, err
182 }
183
184 resp, err := c.svc.LoadStream(string(path)).Context(ctx).Do()
185 if err != nil {
186 return nil, translateError(err)
187 }
188
189 s, err := loadState(resp.Path, resp.Secret, resp.Descriptor, resp.State)
190 if err != nil {
191 return nil, err
192 }
193 return s, nil
194 }
195
196 // RegisterStream registers stream metadata with the Coordiantor. The
197 // Coordinator will respond with its own version of that State on success.
198 // This is idempotent so long as the data is consistent, so it may be called
199 // multiple times.
200 func (c *Client) RegisterStream(ctx context.Context, s State) (*State, error) {
201 if err := s.clientSideValidate(); err != nil {
202 return nil, err
203 }
204
205 desc := []byte(nil)
206 if s.Descriptor != nil {
207 err := error(nil)
208 desc, err = proto.Marshal(s.Descriptor)
209 if err != nil {
210 return nil, err
211 }
212 }
213
214 // No point in including the Descriptor; clear it (if it's set).
215 resp, err := c.svc.RegisterStream(&service.RegisterStreamRequest{
216 ProtoVersion: s.ProtoVersion,
217 Descriptor: base64.StdEncoding.EncodeToString(desc),
218 Path: string(s.Path),
219 Secret: base64.StdEncoding.EncodeToString(s.Secret),
220 }).Context(ctx).Do()
221 if err != nil {
222 return nil, translateError(err)
223 }
224
225 rs, err := loadState(resp.Path, resp.Secret, "", resp.State)
226 if err != nil {
227 return nil, err
228 }
229 rs.Descriptor = s.Descriptor
230 return rs, nil
231 }
232
233 // TerminateStream registers the terminal index for the named stream.
234 func (c *Client) TerminateStream(ctx context.Context, p types.StreamPath, s []by te, tidx types.MessageIndex) error {
235 if tidx < 0 {
236 return errors.New("stream state has non-terminal index")
237 }
238
239 err := c.svc.TerminateStream(&service.TerminateStreamRequest{
240 Path: string(p),
241 Secret: base64.StdEncoding.EncodeToString(s),
242 TerminalIndex: int64(tidx),
243 }).Context(ctx).Do()
244 if err != nil {
245 return translateError(err)
246 }
247 return nil
248 }
249
250 func translateError(err error) error {
251 if gerr, ok := err.(*googleapi.Error); ok {
252 // Auth and server errors are considered transient.
253 switch {
254 case gerr.Code == http.StatusUnauthorized:
255 fallthrough
256 case gerr.Code == http.StatusForbidden:
257 fallthrough
258 case gerr.Code >= http.StatusInternalServerError:
259 return errors.WrapTransient(err)
260 }
261
262 return err
263 }
264
265 // Not a Google API error. Assume it's transient.
266 return errors.WrapTransient(err)
267 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698