Chromium Code Reviews| Index: common/prpc/client.go |
| diff --git a/common/prpc/client.go b/common/prpc/client.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..946af0c95edafda590754b0c27479fe3771e8e2c |
| --- /dev/null |
| +++ b/common/prpc/client.go |
| @@ -0,0 +1,200 @@ |
| +// Copyright 2016 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package prpc |
| + |
| +import ( |
| + "bytes" |
| + "fmt" |
| + "io" |
| + "io/ioutil" |
| + "net/http" |
| + "net/url" |
| + "strconv" |
| + "time" |
| + |
| + "github.com/golang/protobuf/proto" |
| + "golang.org/x/net/context" |
| + "google.golang.org/grpc" |
| + "google.golang.org/grpc/codes" |
| + "google.golang.org/grpc/metadata" |
| + |
| + "github.com/luci/luci-go/common/errors" |
| + "github.com/luci/luci-go/common/logging" |
| + "github.com/luci/luci-go/common/retry" |
| + "github.com/luci/luci-go/common/transport" |
| +) |
| + |
| +// Client can make pRPC calls. |
| +type Client struct { |
| + Host string // host and optionally a port number of the target server. |
| + Options *Options // if nil, DefaultOptions() are used. |
| +} |
| + |
| +// NewClient creates a new pRPC client with default options. |
| +func NewClient(host string) *Client { |
|
dnj
2016/01/21 07:44:27
This seems unnecessary, since "Client" has no priv
nodir
2016/01/22 00:47:24
Done.
|
| + return &Client{Host: host, Options: DefaultOptions()} |
| +} |
| + |
| +// renderOptions copies client options and applies opts. |
| +func (c *Client) renderOptions(opts []grpc.CallOption) *Options { |
| + var options Options |
| + if c.Options != nil { |
| + options = *c.Options |
| + } else { |
| + options = *DefaultOptions() |
|
dnj
2016/01/21 07:44:28
DefaultOptions generates a new instance anyway. Cl
nodir
2016/01/22 00:47:24
Done.
|
| + } |
| + options.apply(opts) |
| + return &options |
| +} |
| + |
| +// Call makes an RPC. |
| +// Retries on transient errors according to retry options. |
| +// Logs HTTP errors. |
| +// |
| +// opts must be created by this package. |
| +// Calling from multiple goroutines concurrently is safe, unless Client is mutated. |
| +// Called from generated code. |
| +func (c *Client) Call(ctx context.Context, serviceName, methodName string, in, out proto.Message, opts ...grpc.CallOption) error { |
| + options := c.renderOptions(opts) |
| + |
| + reqBody, err := proto.Marshal(in) |
| + if err != nil { |
| + return err |
| + } |
| + |
| + req := prepareRequest(c.Host, serviceName, methodName, len(reqBody), options) |
| + fields := logging.Fields{ |
|
dnj
2016/01/21 07:44:28
Fields layer naturally via "SetFields". ctx = logg
nodir
2016/01/22 00:47:23
ah, I see. Done, but I don't see much code-size s
dnj
2016/01/22 01:16:27
Looks like you are.
It's less about code size cha
nodir
2016/01/22 02:57:50
ok so there is no "a lot less code"
dnj (Google)
2016/01/22 04:06:23
Well, depends, each logging message where you don'
|
| + "Host": c.Host, |
| + "Service": serviceName, |
| + "Method": methodName, |
| + } |
| + |
| + // Send the request in a retry loop. |
| + onRetry := func(err error, sleepTime time.Duration) { |
|
dnj
2016/01/21 07:44:27
I personally like to inline this w/ the retry.Retr
nodir
2016/01/22 00:47:23
Done.
|
| + fields := fields.Copy(logging.Fields{ |
|
dnj
2016/01/21 07:44:27
(Layering fields is a lot cleaner than explicitly
nodir
2016/01/22 00:47:23
Done.
|
| + logging.ErrorKey: err, |
| + "SleepTime": sleepTime, |
| + }) |
| + fields.Warningf(ctx, "RPC failed transiently. Will retry in %s", sleepTime) |
| + } |
| + var client http.Client |
| + client.Transport = transport.Get(ctx) |
|
dnj
2016/01/21 07:44:27
I seriously dislike embedding Transport in context
nodir
2016/01/22 00:47:24
If it is that serious, please start a discussion i
|
| + err = retry.Retry(ctx, retry.TransientOnly(options.Retry()), func() error { |
| + fields.Infof(ctx, "RPC %s/%s.%s", c.Host, serviceName, methodName) |
|
dnj
2016/01/21 07:44:27
This might be a bit wordy even for info level. May
nodir
2016/01/22 00:47:24
Done. It is wordy indeed, we use info in https://g
|
| + |
| + // Send the request. |
| + req.Body = ioutil.NopCloser(bytes.NewBuffer(reqBody)) |
|
dnj
2016/01/21 07:44:27
bytes.NewReader is more lightweight and appropriat
nodir
2016/01/22 00:47:24
Done.
|
| + res, err := client.Do(req) |
| + if err != nil { |
| + return errors.WrapTransient(fmt.Errorf("failed to send request: %s", err)) |
|
dnj
2016/01/21 07:44:28
IMO you should handle transient errors more surgic
nodir
2016/01/22 00:47:24
status is not available if err != nil
dnj
2016/01/22 01:16:27
This sounds like a hard failure to me then, not a
nodir
2016/01/22 02:57:50
Client.Do may return an error on URLFetch proxy fa
dnj (Google)
2016/01/22 04:06:23
Ugh okay. I really dislike that they don't give us
|
| + } |
| + defer res.Body.Close() |
| + |
| + if options.resHeaderMetadata != nil { |
| + *options.resHeaderMetadata = metadata.MD(res.Header).Copy() |
| + } |
| + |
| + // Check response status code. |
| + if res.StatusCode >= 300 { |
|
dnj
2016/01/21 07:44:27
Maybe http.StatusMultipleChoices instead of 300?
nodir
2016/01/22 00:47:24
name "MultipleChoises" does not imply it is the fi
dnj
2016/01/22 01:16:27
Actually now that I think about it, you control th
nodir
2016/01/22 02:57:50
Why couple tightly? HTTP status codes < 300 are no
dnj (Google)
2016/01/22 04:06:23
If we fully control our server's response codes an
|
| + return responseErr(ctx, res) |
| + } |
| + |
| + // Read and parse response message. |
| + buf, err := ioutil.ReadAll(res.Body) |
|
dnj
2016/01/21 07:44:27
Since you're doing this in a retry loop, you could
nodir
2016/01/22 00:47:24
Done.
|
| + if err != nil { |
| + return errors.WrapTransient(fmt.Errorf("failed to read response body: %s", err)) |
|
dnj
2016/01/21 07:44:27
Why is a failure to read body transient? This woul
nodir
2016/01/22 00:47:24
Isn't TCP interruption transient? In other words,
dnj
2016/01/22 01:16:27
It depends why the interruption happened. e.g, if
nodir
2016/01/22 02:57:50
Done
|
| + } |
| + |
| + if options.resTrailerMetadata != nil { |
| + *options.resTrailerMetadata = metadata.MD(res.Trailer).Copy() |
| + } |
| + |
| + return proto.Unmarshal(buf, out) // non-transient error |
| + }, onRetry) |
| + |
| + if err != nil { |
| + fields[logging.ErrorKey] = err |
|
dnj
2016/01/21 07:44:27
If you set fields, you can make this:
logging.With
nodir
2016/01/22 00:47:24
Done
|
| + fields.Warningf(ctx, "RPC failed permanently: %s", err) |
| + } |
| + |
| + // We have to unwrap gRPC errors because |
| + // grpc.Code and grpc.ErrorDesc functions do not work with error wrappers. |
| + // https://github.com/grpc/grpc-go/issues/494 |
| + return errors.UnwrapAll(err) |
| +} |
| + |
| +// prepareRequest creates an HTTP request for an RPC, |
| +// except it does not set the request body. |
| +func prepareRequest(host, serviceName, methodName string, contentLength int, options *Options) *http.Request { |
| + if host == "" { |
| + panic("Host is not set") |
| + } |
| + req := &http.Request{ |
| + Method: "POST", |
| + URL: &url.URL{ |
| + Scheme: "https", |
| + Host: host, |
| + Path: fmt.Sprintf("/prpc/%s/%s", serviceName, methodName), |
| + }, |
| + Header: http.Header{}, |
| + } |
| + if options.Insecure { |
| + req.URL.Scheme = "http" |
| + } |
| + |
| + // Set headers. |
| + const mediaType = "application/prpc" // binary |
| + req.Header.Set("Content-Type", mediaType) |
| + req.Header.Set("Accept", mediaType) |
| + if options.UserAgent != "" { |
| + req.Header.Set("User-Agent", options.UserAgent) |
|
dnj
2016/01/21 07:44:27
Endpoints clients have a default user agent (e.g.,
nodir
2016/01/22 00:47:24
ah right, forgot about that
|
| + } |
| + req.ContentLength = int64(contentLength) |
| + req.Header.Set("Content-Length", strconv.Itoa(contentLength)) |
| + // TODO(nodir): add "Accept-Encoding: gzip" when pRPC server supports it. |
| + return req |
| +} |
| + |
| +// responseErr converts an HTTP response to a gRPC error. |
| +// The returned error may be wrapped with errors.WrapTransient. |
| +func responseErr(c context.Context, res *http.Response) error { |
| + // Read first 256 bytes of the body. Leave empty if cannot read. |
| + var body string |
| + const maxBody = 256 |
|
dnj
2016/01/21 07:44:27
Perhaps this could be a Client setting? That way,
nodir
2016/01/22 00:47:24
Done.
|
| + bodyBytes, err := ioutil.ReadAll(io.LimitReader(res.Body, maxBody)) |
| + if err == nil { |
| + if len(bodyBytes) == maxBody { |
| + bodyBytes = append(bodyBytes, []byte("...")...) |
| + } |
| + body = string(bodyBytes) |
| + } |
| + |
| + const noCode = 0xffffffff |
|
dnj
2016/01/21 07:44:27
Rather than use a sentinel value, just use a boole
nodir
2016/01/22 00:47:24
not relevant any more
|
| + code := codes.Code(noCode) |
| + // Read explicit gRPC code. |
| + const headerName = "X-Prpc-Grpc-Code" |
|
dnj
2016/01/21 07:44:28
Shared constant w/ server?
nodir
2016/01/22 00:47:23
Added a constant
|
| + if codeHeader := res.Header.Get(headerName); codeHeader != "" { |
| + if intCode, err := strconv.Atoi(codeHeader); err != nil { |
| + logging.Warningf(c, "could not parse %s header: %s", headerName, err) |
|
dnj
2016/01/21 07:44:28
Capitalize logging messages. Use fields:
logging.
nodir
2016/01/22 00:47:23
Done
dnj
2016/01/22 01:16:27
Acknowledged.
|
| + } else { |
| + code = codes.Code(intCode) |
| + } |
| + } |
| + if code == noCode { |
| + code = StatusCode(res.StatusCode) |
| + } |
| + |
| + // Return the error. |
| + err = grpc.Errorf(code, "HTTP %s: %s", res.Status, body) |
| + if isTransientStatus(res.StatusCode) { |
| + err = errors.WrapTransient(err) |
| + } |
| + return err |
| +} |
| + |
| +// isTransientStatus returns true if an HTTP status code indicates a transient error. |
| +func isTransientStatus(status int) bool { |
| + return status >= 500 || status == http.StatusRequestTimeout |
| +} |