Index: go/src/infra/libs/epclient/epclient.go |
diff --git a/go/src/infra/libs/epclient/epclient.go b/go/src/infra/libs/epclient/epclient.go |
new file mode 100644 |
index 0000000000000000000000000000000000000000..3dbebe5ff140a727d9934e05cb9ddedf3d405b80 |
--- /dev/null |
+++ b/go/src/infra/libs/epclient/epclient.go |
@@ -0,0 +1,371 @@ |
+// Copyright 2015 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+// Package epclient provides a simple implementation of a Google Cloud Endpoints |
+// client (for services implemented in Go). |
+// |
+// Unlike a traditional endpoitns client, the interfaces and datatypes are |
+// statically built with the application, using the "infra/gae/libs/endpoints" |
+// library. |
+// |
+// Example usage: |
+// |
+// import "github.com/GoogleCloudPlatform/go-endpoints/endpoints" |
+// import infra_endpoints "infra/gae/libs/endpoints" |
Vadim Sh.
2015/06/03 18:13:39
I'm sort of against intermixing GAE and non-GAE li
|
+// import "some/service" |
+// |
+// // service.Info = *endpoints.ServiceInfo{...} |
+// // service.MethodInfo = *infra_endpoints.MethodInfoMap |
+// |
+// // service. |
+// |
+// func |
+// |
+package epclient |
+ |
+import ( |
+ "bytes" |
+ "encoding/json" |
+ "fmt" |
+ "golang.org/x/net/context" |
+ "io" |
+ "io/ioutil" |
+ "net" |
+ "net/http" |
+ "reflect" |
+ "regexp" |
+ "strings" |
+ "sync" |
+ "text/template" |
+ "time" |
+ |
+ "github.com/GoogleCloudPlatform/go-endpoints/endpoints" |
+ "github.com/luci/luci-go/common/logging" |
Vadim Sh.
2015/06/03 18:13:39
nit: seems weird to group luci-go import for go-en
|
+ |
+ "github.com/luci/luci-go/common/errors" |
+) |
+ |
+var backoffSlot = 50 * time.Millisecond |
+ |
+// Backoff calculates an amount of time to back off, depending on the number |
+// of failures which have occured so far. It is non-randomized to aid in |
+// determinism during tests. |
+func Backoff(failures uint) time.Duration { |
+ const failcap = 10 |
+ |
+ if failures > failcap { |
+ failures = failcap |
+ } |
+ return backoffSlot * time.Duration(1<<failures) |
+} |
+ |
+// Client is an endpoints-client which provides a very simple in/out RPC |
+// interface for a given endpoints server. |
+// |
+// Mapping is done statically using the infra/gae/libs/endpoints.MethodInfoMap |
+// of a given service. This means that you need to recompile the client any time |
+// you change the interface of your service (but honestly, you probably needed |
+// to do that anyway). Serialization and deserialization follow the |
+// encoding/json rules. The service implementation should expose the correct |
+// input and output types (along with any json field tags) necessary to interact |
+// with the service. |
+type Client interface { |
+ // DoWithRetries will try the given method up to `upto` times before returning |
+ // an error. An `upto` value of 0 will not do the API call at all. |
+ // |
+ // in and out must be the approprite concrete Go types which correspond to the |
+ // method. |
+ // |
+ // methodName is the service method name, not the REST name for the api (e.g. |
+ // it would be the Method portion of `Service.Method`, if you were hitting |
+ // the spi interface). |
+ // |
+ // DoWithRetries will calculate a time to back off between attempts using |
+ // the Backoff method. |
+ DoWithRetries(serviceName, methodName string, in interface{}, out interface{}, upto uint) error |
Vadim Sh.
2015/06/03 18:13:39
consider using https://github.com/luci/luci-go/tre
|
+ |
+ ForService(serviceName string) (ServiceClient, error) |
Vadim Sh.
2015/06/03 18:13:39
document
|
+} |
+ |
+// ServiceClient is like a Client, but restricted to a single service under a |
+// given endpoints server. |
+type ServiceClient interface { |
+ DoWithRetries(methodName string, in interface{}, out interface{}, upto uint) error |
+} |
+ |
+var mark = errors.MakeMarkFn("endpoints") |
+ |
+type client struct { |
+ sync.Mutex |
+ |
+ url string |
+ |
+ ctx context.Context |
+ serverDef *endpoints.Server |
+ templateMap map[string]map[string]*templateEntry |
+ |
+ // for testing. Returns equivalent of http.Do and a closer function which |
+ // should be defer'd to close the underlying client. |
+ mkHTTPDoer func() ( |
+ doer func(*http.Request) (*http.Response, error), |
+ closer func()) |
+} |
+ |
+var _ Client = (*client)(nil) |
Vadim Sh.
2015/06/03 18:13:39
why? NewClient does this type check already.
|
+ |
+// NewClient returns a new Client object bound to hit `url`, which should be |
+// the url to the service (e.g. "https://foo.appspot.com"). It will expect |
+// to find the REST-style APIs at "<url>/_ah/api/<service>/<version>/<api>". |
+func NewClient(ctx context.Context, url string, s *endpoints.Server) Client { |
+ // TODO(riannucci): provide an 'spi' mode. |
+ return &client{ |
+ url: url, |
+ ctx: ctx, |
+ serverDef: s, |
+ templateMap: map[string]map[string]*templateEntry{}, |
+ mkHTTPDoer: mkHTTPDoer, |
+ } |
+} |
+ |
+var getFieldsRE = regexp.MustCompile("{{\\.([^}]*)}}") |
+ |
+type templateEntry struct { |
+ *template.Template |
+ used map[string]struct{} |
+ si *endpoints.ServiceInfo |
+ mi *endpoints.MethodInfo |
+ |
+ reqType reflect.Type |
+ rspType reflect.Type |
+} |
+ |
+func (c *client) getTemplateMap(serviceName string) map[string]*templateEntry { |
+ c.Lock() |
+ if m, ok := c.templateMap[serviceName]; ok { |
+ c.Unlock() |
+ return m |
+ } |
+ c.Unlock() |
+ |
+ templateConvert := strings.NewReplacer("{", "{{.", "}", "}}") |
+ service := c.serverDef.ServiceByName(serviceName) |
+ if service == nil { |
+ c.Lock() |
+ defer c.Unlock() |
+ c.templateMap[serviceName] = nil |
+ return nil |
+ } |
+ |
+ methods := service.Methods() |
+ ret := make(map[string]*templateEntry, len(methods)) |
+ |
+ // due to a bug (feature?) in go-endpoints, method.Info().Name is always |
+ // lowercase, even when MethodByName expects the uppercase name. |
+ // So let's reflect and grab those keys :) |
+ methodNames := reflect.ValueOf(service).Elem().FieldByName("methods").MapKeys() |
+ for _, nameV := range methodNames { |
+ name := nameV.String() |
+ method := service.MethodByName(name) |
+ info := method.Info() |
+ path := templateConvert.Replace(info.Path) |
+ |
+ usedFields := map[string]struct{}{} |
+ for _, submatches := range getFieldsRE.FindAllStringSubmatch(path, -1) { |
+ usedFields[submatches[1]] = struct{}{} |
+ } |
+ |
+ ret[name] = &templateEntry{ |
+ template.Must(template.New(name).Parse(path)), |
+ usedFields, |
+ service.Info(), |
+ info, |
+ reflect.PtrTo(method.ReqType), |
+ reflect.PtrTo(method.RespType), |
+ } |
+ } |
+ |
+ c.Lock() |
+ defer c.Unlock() |
+ c.templateMap[serviceName] = ret // who cares if we overwrite it? |
+ return ret |
+} |
+ |
+func (c *client) getMethodInfo(serviceName, methodName string) (*templateEntry, error) { |
+ m := c.getTemplateMap(serviceName) |
+ if m == nil { |
+ return nil, mark(fmt.Errorf("getMethodInfo: no such service %s", serviceName)) |
+ } |
+ |
+ methodTmpl := m[methodName] |
+ if methodTmpl == nil { |
+ return nil, mark(fmt.Errorf("getMethodInfo: no such method %s.%s", serviceName, methodName)) |
+ } |
+ |
+ return methodTmpl, nil |
+} |
+ |
+func renderURL(base string, t *templateEntry, requestData interface{}) (string, error) { |
+ buf := &bytes.Buffer{} |
+ if err := t.Execute(buf, requestData); err != nil { |
+ return "", err |
+ } |
+ |
+ ret := fmt.Sprintf("%s/_ah/api/%s/%s/%s", base, |
+ t.si.Name, t.si.Version, buf.String()) |
+ |
+ addedQuestion := false |
+ |
+ if t.mi.HTTPMethod == "GET" && requestData != nil { |
+ val := reflect.ValueOf(requestData) |
+ stype := reflect.TypeOf(requestData) |
+ if stype.Kind() == reflect.Ptr { |
+ stype = stype.Elem() |
+ val = val.Elem() |
+ } |
+ for i := 0; i < stype.NumField(); i++ { |
+ f := stype.Field(i) |
+ if _, skip := t.used[f.Name]; !skip { |
+ connect := "?" |
+ if addedQuestion { |
+ connect = "&" |
+ } |
+ addedQuestion = true |
+ marshalled, err := json.Marshal(val.Field(i).Interface()) |
+ if err != nil { |
+ return "", err |
+ } |
+ ret = fmt.Sprintf("%s%s%s=%s", ret, connect, f.Name, string(marshalled)) |
+ } |
+ } |
+ } |
+ |
+ return ret, nil |
+} |
+ |
+func (c *client) DoWithRetries(serviceName, methodName string, in interface{}, out interface{}, upto uint) (err error) { |
+ for i := uint(0); i < upto; i++ { |
+ err = c.do(serviceName, methodName, in, out) |
+ if err != nil { |
+ sleepIval := Backoff(i + 1) |
+ ctx := logging.SetField(c.ctx, "retriesLeft", upto-i) |
+ ctx = logging.SetField(ctx, "sleepFor", sleepIval) |
+ if err != nil { |
+ ctx = logging.SetField(ctx, "prevErr", err) |
+ } |
+ logging.Get(ctx).Warningf("retrying") |
+ select { |
+ case <-ctx.Done(): |
+ return |
+ case <-time.After(sleepIval): |
+ } |
+ } else { |
+ break |
+ } |
+ } |
+ return |
+} |
+ |
+func mkHTTPDoer() (func(*http.Request) (*http.Response, error), func()) { |
Vadim Sh.
2015/06/03 18:13:39
consider using https://github.com/luci/luci-go/tre
|
+ // TODO(riannucci): Figure out why the default client isn't good enough. Was |
+ // seeing random hangups when running against appspot.com that the default |
+ // transport wasn't smart enough to deal with (e.g. would get Do calls which |
+ // died with 'Connection Closed' errors. |
+ transp := &http.Transport{ |
+ Dial: (&net.Dialer{ |
+ Timeout: 30 * time.Second, |
+ KeepAlive: 30 * time.Second, |
+ }).Dial, |
+ TLSHandshakeTimeout: 10 * time.Second, |
+ DisableKeepAlives: true, |
+ } |
+ ret := &http.Client{Transport: transp} |
+ return ret.Do, transp.CloseIdleConnections |
+} |
+ |
+func (c *client) do(serviceName, methodName string, in interface{}, out interface{}) error { |
+ do, closer := c.mkHTTPDoer() |
+ defer closer() |
+ |
+ tentry, err := c.getMethodInfo(serviceName, methodName) |
+ if err != nil { |
+ return err |
+ } |
+ |
+ if reflect.TypeOf(in) != tentry.reqType { |
+ return fmt.Errorf("do(%s) type mismatch for input. Got %T but expected %s", |
+ methodName, in, tentry.reqType) |
+ } |
+ // out must be a pointer to something. |
+ if reflect.TypeOf(out) != tentry.rspType { |
+ return fmt.Errorf("do(%s) type mismatch for output. Got %T but expected %s", |
+ methodName, out, tentry.rspType) |
+ } |
+ |
+ var inBuf io.ReadWriter |
+ if tentry.mi.HTTPMethod != "GET" { |
+ inBuf = &bytes.Buffer{} |
+ enc := json.NewEncoder(inBuf) |
+ err := enc.Encode(in) |
+ if err != nil { |
+ return err |
+ } |
+ } |
+ |
+ url, err := renderURL(c.url, tentry, in) |
+ if err != nil { |
+ return err |
+ } |
+ |
+ req, err := http.NewRequest(tentry.mi.HTTPMethod, url, inBuf) |
+ if err != nil { |
+ return err |
+ } |
+ if inBuf != nil { |
+ req.Header.Add("content-type", "application/json") |
+ } |
+ |
+ rsp, err := do(req) |
+ if err != nil { |
+ return err |
+ } |
+ defer rsp.Body.Close() |
+ |
+ data, err := ioutil.ReadAll(rsp.Body) |
+ if err != nil { |
+ return mark(fmt.Errorf("do(%s) failed (reading body): %s", methodName, err)) |
+ } |
+ |
+ if rsp.StatusCode/100 == 2 { |
Vadim Sh.
2015/06/03 18:13:39
wat? Is rsp.StatusCode >= 200 && rsp.StatusCode <
|
+ if out != nil { |
+ err = json.Unmarshal(data, out) |
+ } |
+ } else { |
+ msg := &map[string]map[string]interface{}{} |
+ err = json.Unmarshal(data, msg) |
+ if err != nil { |
+ return mark(fmt.Errorf("do(%s) failed (decoding json %q): %s", methodName, string(data), err)) |
+ } |
+ |
+ return mark(fmt.Errorf("do(%s) failed: %s", methodName, (*msg)["error"]["message"])) |
+ } |
+ return nil |
+} |
+ |
+func (c *client) ForService(serviceName string) (ServiceClient, error) { |
+ if c.serverDef.ServiceByName(serviceName) == nil { |
+ return nil, mark(fmt.Errorf("getMethodInfo: no such service %s", serviceName)) |
+ } |
+ return &serviceClient{c, serviceName}, nil |
+} |
+ |
+type serviceClient struct { |
+ c Client |
+ service string |
+} |
+ |
+var _ ServiceClient = (*serviceClient)(nil) |
+ |
+func (s *serviceClient) DoWithRetries(methodName string, in interface{}, out interface{}, upto uint) error { |
+ return s.c.DoWithRetries(s.service, methodName, in, out, upto) |
+} |