Chromium Code Reviews| Index: common/prpc/client.go |
| diff --git a/common/prpc/client.go b/common/prpc/client.go |
| index 61e02574cf741abdd4db36738049d398408607a2..657234ea8b14ac7f2e6be29088665ad068ee8837 100644 |
| --- a/common/prpc/client.go |
| +++ b/common/prpc/client.go |
| @@ -4,9 +4,242 @@ |
| package prpc |
| +import ( |
| + "bytes" |
| + "fmt" |
| + "io" |
| + "io/ioutil" |
| + "net/http" |
| + "net/url" |
| + "strconv" |
| + "strings" |
| + "time" |
| + |
| + "github.com/golang/protobuf/proto" |
| + "golang.org/x/net/context" |
| + "golang.org/x/net/context/ctxhttp" |
| + "google.golang.org/grpc" |
| + "google.golang.org/grpc/codes" |
| + "google.golang.org/grpc/metadata" |
| + |
| + "github.com/luci/luci-go/common/clock" |
| + "github.com/luci/luci-go/common/errors" |
| + "github.com/luci/luci-go/common/grpcutil" |
| + "github.com/luci/luci-go/common/logging" |
| + "github.com/luci/luci-go/common/retry" |
| +) |
| + |
| const ( |
| // HeaderGRPCCode is a name of the HTTP header that specifies the |
| // gRPC code in the response. |
| // A pRPC server must always specify it. |
| HeaderGRPCCode = "X-Prpc-Grpc-Code" |
| + |
| + // HeaderTimeout is HTTP header used to set pRPC request timeout. |
| + // The single value should match regexp `\d+[HMSmun]`. |
| + HeaderTimeout = "X-Prpc-Grpc-Deadline" |
|
nodir
2016/01/27 03:04:18
was accidentally renamed to deadline?
fwiu, usual
dnj (Google)
2016/01/27 03:25:53
Done.
|
| ) |
| + |
| +// DefaultUserAgent is default User-Agent HTTP header for pRPC requests. |
| +var DefaultUserAgent = "pRPC Client 1.0" |
| + |
| +// Client can make pRPC calls. |
| +type Client struct { |
| + C *http.Client // if nil, uses http.DefaultClient |
| + |
| + // ErrBodySize is the number of bytes to read from a HTTP response |
| + // with error status and include in the error. |
| + // If non-positive, defaults to 256. |
| + ErrBodySize int |
| + |
| + Host string // host and optionally a port number of the target server. |
| + Options *Options // if nil, DefaultOptions() are used. |
| +} |
| + |
| +// renderOptions copies client options and applies opts. |
| +func (c *Client) renderOptions(opts []grpc.CallOption) *Options { |
| + var options *Options |
| + if c.Options != nil { |
| + cpy := *c.Options |
| + options = &cpy |
| + } else { |
| + options = DefaultOptions() |
| + } |
| + options.apply(opts) |
| + return options |
| +} |
| + |
| +func (c *Client) getHTTPClient() *http.Client { |
| + if c.C == nil { |
| + return http.DefaultClient |
| + } |
| + return c.C |
| +} |
| + |
| +// Call makes an RPC. |
| +// Retries on transient errors according to retry options. |
| +// Logs HTTP errors. |
| +// |
| +// opts must be created by this package. |
| +// Calling from multiple goroutines concurrently is safe, unless Client is mutated. |
| +// Called from generated code. |
| +// |
| +// If there is a Deadline applied to the Context, it will be forwarded to the |
| +// server using the HeaderTimeout heaer. |
|
dnj (Google)
2016/01/27 02:42:54
typo will fix.
iannucci
2016/01/27 02:55:50
heaer typo
|
| +func (c *Client) Call(ctx context.Context, serviceName, methodName string, in, out proto.Message, opts ...grpc.CallOption) error { |
|
iannucci
2016/01/27 02:55:49
this should probably just be CallOption: if we s/p
dnj (Google)
2016/01/27 03:24:26
I don't really mind this. The fall-through is nice
|
| + options := c.renderOptions(opts) |
| + |
| + reqBody, err := proto.Marshal(in) |
| + if err != nil { |
| + return err |
| + } |
| + |
| + req := prepareRequest(c.Host, serviceName, methodName, len(reqBody), options) |
| + ctx = logging.SetFields(ctx, logging.Fields{ |
| + "host": c.Host, |
| + "service": serviceName, |
| + "method": methodName, |
| + }) |
| + |
| + // Send the request in a retry loop. |
| + var buf bytes.Buffer |
| + err = retry.Retry( |
| + ctx, |
| + retry.TransientOnly(options.Retry), |
| + func() error { |
| + logging.Debugf(ctx, "RPC %s/%s.%s", c.Host, serviceName, methodName) |
| + |
| + // If there is a deadline on our Context, set the timeout header on the |
| + // request. |
| + if deadline, ok := ctx.Deadline(); ok { |
| + delta := deadline.Sub(clock.Now(ctx)) |
| + if delta <= 0 { |
| + // The request has already expired. This will likely never happen, |
| + // since the outer Retry loop will have expired, but there is a very |
| + // slight possibility of a race. |
| + return ctx.Err() |
| + } |
| + |
| + req.Header.Set(HeaderTimeout, EncodeTimeout(delta)) |
| + } |
| + |
| + // Send the request. |
| + req.Body = ioutil.NopCloser(bytes.NewReader(reqBody)) |
| + res, err := ctxhttp.Do(ctx, c.getHTTPClient(), req) |
| + if err != nil { |
| + return errors.WrapTransient(fmt.Errorf("failed to send request: %s", err)) |
| + } |
| + defer res.Body.Close() |
| + |
| + if options.resHeaderMetadata != nil { |
| + *options.resHeaderMetadata = metadata.MD(res.Header).Copy() |
| + } |
| + |
| + // Read the response body. |
| + buf.Reset() |
| + var body io.Reader = res.Body |
| + if res.ContentLength > 0 { |
| + buf.Grow(int(res.ContentLength)) |
|
iannucci
2016/01/27 02:55:49
hm... what if this number is enormous? Can we cap
dnj (Google)
2016/01/27 03:24:25
I've added a soft limit, default limits, and check
|
| + body = io.LimitReader(body, res.ContentLength) |
| + } |
| + if _, err = buf.ReadFrom(body); err != nil { |
| + return fmt.Errorf("failed to read response body: %s", err) |
| + } |
| + |
| + if options.resTrailerMetadata != nil { |
| + *options.resTrailerMetadata = metadata.MD(res.Trailer).Copy() |
| + } |
| + |
| + codeHeader := res.Header.Get(HeaderGRPCCode) |
| + if codeHeader == "" { |
| + // Not a valid pRPC response. |
| + body := buf.String() |
| + bodySize := c.ErrBodySize |
| + if bodySize <= 0 { |
| + bodySize = 256 |
| + } |
| + if len(body) > bodySize { |
| + body = body[:bodySize] + "..." |
| + } |
| + return fmt.Errorf("HTTP %d: no gRPC code. Body: %s", res.StatusCode, body) |
|
iannucci
2016/01/27 02:55:49
what if it's not valid utf8? can we quote it?
dnj (Google)
2016/01/27 03:24:26
Done.
|
| + } |
| + |
| + codeInt, err := strconv.Atoi(codeHeader) |
| + if err != nil { |
| + // Not a valid pRPC response. |
| + return fmt.Errorf("invalid grpc code %q: %s", codeHeader, err) |
| + } |
| + |
| + code := codes.Code(codeInt) |
| + if code != codes.OK { |
| + desc := strings.TrimSuffix(buf.String(), "\n") |
| + err := grpcutil.Errf(code, "%s", desc) |
| + if isTransientCode(code) { |
| + err = errors.WrapTransient(err) |
| + } |
| + return err |
| + } |
| + |
| + return proto.Unmarshal(buf.Bytes(), out) // non-transient error |
| + }, |
| + func(err error, sleepTime time.Duration) { |
| + logging.Fields{ |
| + logging.ErrorKey: err, |
| + "sleepTime": sleepTime, |
| + }.Warningf(ctx, "RPC failed transiently. Will retry in %s", sleepTime) |
| + }, |
| + ) |
| + |
| + if err != nil { |
| + logging.WithError(err).Warningf(ctx, "RPC failed permanently: %s", err) |
| + } |
| + |
| + // We have to unwrap gRPC errors because |
| + // grpc.Code and grpc.ErrorDesc functions do not work with error wrappers. |
| + // https://github.com/grpc/grpc-go/issues/494 |
| + return errors.UnwrapAll(err) |
| +} |
| + |
| +// prepareRequest creates an HTTP request for an RPC, |
| +// except it does not set the request body. |
| +func prepareRequest(host, serviceName, methodName string, contentLength int, options *Options) *http.Request { |
| + if host == "" { |
| + panic("Host is not set") |
| + } |
| + req := &http.Request{ |
| + Method: "POST", |
| + URL: &url.URL{ |
| + Scheme: "https", |
| + Host: host, |
| + Path: fmt.Sprintf("/prpc/%s/%s", serviceName, methodName), |
| + }, |
| + Header: http.Header{}, |
| + } |
| + if options.Insecure { |
| + req.URL.Scheme = "http" |
| + } |
| + |
| + // Set headers. |
| + const mediaType = "application/prpc" // binary |
| + req.Header.Set("Content-Type", mediaType) |
| + req.Header.Set("Accept", mediaType) |
| + userAgent := options.UserAgent |
| + if userAgent == "" { |
| + userAgent = DefaultUserAgent |
| + } |
| + req.Header.Set("User-Agent", userAgent) |
| + req.ContentLength = int64(contentLength) |
| + req.Header.Set("Content-Length", strconv.Itoa(contentLength)) |
| + // TODO(nodir): add "Accept-Encoding: gzip" when pRPC server supports it. |
| + return req |
| +} |
| + |
| +func isTransientCode(code codes.Code) bool { |
| + switch code { |
| + case codes.Internal, codes.Unknown, codes.Unavailable: |
| + return true |
| + |
| + default: |
| + return false |
| + } |
| +} |