Index: common/prpc/client.go |
diff --git a/common/prpc/client.go b/common/prpc/client.go |
index 61e02574cf741abdd4db36738049d398408607a2..76ff12b19915e0fc4d7521696b858938023e1c57 100644 |
--- a/common/prpc/client.go |
+++ b/common/prpc/client.go |
@@ -4,9 +4,287 @@ |
package prpc |
+import ( |
+ "bytes" |
+ "fmt" |
+ "io" |
+ "io/ioutil" |
+ "net/http" |
+ "net/url" |
+ "strconv" |
+ "strings" |
+ "time" |
+ |
+ "github.com/golang/protobuf/proto" |
+ "golang.org/x/net/context" |
+ "golang.org/x/net/context/ctxhttp" |
+ "google.golang.org/grpc" |
+ "google.golang.org/grpc/codes" |
+ "google.golang.org/grpc/metadata" |
+ |
+ "github.com/luci/luci-go/common/clock" |
+ "github.com/luci/luci-go/common/errors" |
+ "github.com/luci/luci-go/common/grpcutil" |
+ "github.com/luci/luci-go/common/logging" |
+ "github.com/luci/luci-go/common/retry" |
+) |
+ |
const ( |
// HeaderGRPCCode is a name of the HTTP header that specifies the |
// gRPC code in the response. |
// A pRPC server must always specify it. |
HeaderGRPCCode = "X-Prpc-Grpc-Code" |
+ |
+ // HeaderTimeout is HTTP header used to set pRPC request timeout. |
+ // The single value should match regexp `\d+[HMSmun]`. |
+ HeaderTimeout = "X-Prpc-Grpc-Timeout" |
+ |
+ // DefaultMaxContentLength is the default maximum content length (in bytes) |
+ // for a Client. It is 32MiB. |
+ DefaultMaxContentLength = 32 * 1024 * 1024 |
) |
+ |
+var ( |
+ // DefaultUserAgent is default User-Agent HTTP header for pRPC requests. |
+ DefaultUserAgent = "pRPC Client 1.0" |
+ |
+ // ErrResponseTooBig is returned by Call when the Response's body size exceeds |
+ // the Client's soft limit, MaxContentLength. |
+ ErrResponseTooBig = errors.New("response too big") |
+) |
+ |
+// Client can make pRPC calls. |
+type Client struct { |
+ C *http.Client // if nil, uses http.DefaultClient |
+ |
+ // ErrBodySize is the number of bytes to read from a HTTP response |
+ // with error status and include in the error. |
+ // If non-positive, defaults to 256. |
+ ErrBodySize int |
+ |
+ // MaxContentLength, if > 0, is the maximum content length, in bytes, that a |
+ // pRPC is willing to read from the server. If a larger content length is |
+ // present in the response, ErrResponseTooBig will be returned. |
+ // |
+ // If <= 0, DefaultMaxContentLength will be used. |
+ MaxContentLength int |
+ |
+ Host string // host and optionally a port number of the target server. |
+ Options *Options // if nil, DefaultOptions() are used. |
+} |
+ |
+// renderOptions copies client options and applies opts. |
+func (c *Client) renderOptions(opts []grpc.CallOption) (*Options, error) { |
+ var options *Options |
+ if c.Options != nil { |
+ cpy := *c.Options |
+ options = &cpy |
+ } else { |
+ options = DefaultOptions() |
+ } |
+ if err := options.apply(opts); err != nil { |
+ return nil, err |
+ } |
+ return options, nil |
+} |
+ |
+func (c *Client) getHTTPClient() *http.Client { |
+ if c.C == nil { |
+ return http.DefaultClient |
+ } |
+ return c.C |
+} |
+ |
+// Call makes an RPC. |
+// Retries on transient errors according to retry options. |
+// Logs HTTP errors. |
+// |
+// opts must be created by this package. |
+// Calling from multiple goroutines concurrently is safe, unless Client is mutated. |
+// Called from generated code. |
+// |
+// If there is a Deadline applied to the Context, it will be forwarded to the |
+// server using the HeaderTimeout haeder. |
+func (c *Client) Call(ctx context.Context, serviceName, methodName string, in, out proto.Message, opts ...grpc.CallOption) error { |
+ options, err := c.renderOptions(opts) |
+ if err != nil { |
+ return err |
+ } |
+ |
+ reqBody, err := proto.Marshal(in) |
+ if err != nil { |
+ return err |
+ } |
+ |
+ req := prepareRequest(c.Host, serviceName, methodName, len(reqBody), options) |
+ ctx = logging.SetFields(ctx, logging.Fields{ |
+ "host": c.Host, |
+ "service": serviceName, |
+ "method": methodName, |
+ }) |
+ |
+ // Send the request in a retry loop. |
+ var buf bytes.Buffer |
+ err = retry.Retry( |
+ ctx, |
+ retry.TransientOnly(options.Retry), |
+ func() error { |
+ logging.Debugf(ctx, "RPC %s/%s.%s", c.Host, serviceName, methodName) |
+ |
+ // If there is a deadline on our Context, set the timeout header on the |
+ // request. |
+ if deadline, ok := ctx.Deadline(); ok { |
+ delta := deadline.Sub(clock.Now(ctx)) |
+ if delta <= 0 { |
+ // The request has already expired. This will likely never happen, |
+ // since the outer Retry loop will have expired, but there is a very |
+ // slight possibility of a race. |
+ return ctx.Err() |
+ } |
+ |
+ req.Header.Set(HeaderTimeout, EncodeTimeout(delta)) |
+ } |
+ |
+ // Send the request. |
+ req.Body = ioutil.NopCloser(bytes.NewReader(reqBody)) |
+ res, err := ctxhttp.Do(ctx, c.getHTTPClient(), req) |
+ if err != nil { |
+ return errors.WrapTransient(fmt.Errorf("failed to send request: %s", err)) |
+ } |
+ defer res.Body.Close() |
+ |
+ if options.resHeaderMetadata != nil { |
+ *options.resHeaderMetadata = metadata.MD(res.Header).Copy() |
+ } |
+ |
+ // Read the response body. |
+ buf.Reset() |
+ var body io.Reader = res.Body |
+ |
+ limit := c.MaxContentLength |
+ if limit <= 0 { |
+ limit = DefaultMaxContentLength |
+ } |
+ if l := res.ContentLength; l > 0 { |
+ if l > int64(limit) { |
+ logging.Fields{ |
+ "contentLength": l, |
+ "limit": limit, |
+ }.Errorf(ctx, "ContentLength header exceeds soft response body limit.") |
+ return ErrResponseTooBig |
+ } |
+ limit = int(l) |
+ buf.Grow(limit) |
+ } |
+ body = io.LimitReader(body, int64(limit)) |
+ if _, err = buf.ReadFrom(body); err != nil { |
+ return fmt.Errorf("failed to read response body: %s", err) |
+ } |
+ |
+ // If there is more data in the body Reader, it means that the response |
+ // size has exceeded our limit. |
+ var probeB [1]byte |
+ if amt, err := body.Read(probeB[:]); amt > 0 || err != io.EOF { |
+ logging.Fields{ |
+ "limit": limit, |
+ }.Errorf(ctx, "Soft response body limit exceeded.") |
+ return ErrResponseTooBig |
+ } |
+ |
+ if options.resTrailerMetadata != nil { |
+ *options.resTrailerMetadata = metadata.MD(res.Trailer).Copy() |
+ } |
+ |
+ codeHeader := res.Header.Get(HeaderGRPCCode) |
+ if codeHeader == "" { |
+ // Not a valid pRPC response. |
+ body := buf.String() |
+ bodySize := c.ErrBodySize |
+ if bodySize <= 0 { |
+ bodySize = 256 |
+ } |
+ if len(body) > bodySize { |
+ body = body[:bodySize] + "..." |
+ } |
+ return fmt.Errorf("HTTP %d: no gRPC code. Body: %q", res.StatusCode, body) |
+ } |
+ |
+ codeInt, err := strconv.Atoi(codeHeader) |
+ if err != nil { |
+ // Not a valid pRPC response. |
+ return fmt.Errorf("invalid grpc code %q: %s", codeHeader, err) |
+ } |
+ |
+ code := codes.Code(codeInt) |
+ if code != codes.OK { |
+ desc := strings.TrimSuffix(buf.String(), "\n") |
+ err := grpcutil.Errf(code, "%s", desc) |
+ if isTransientCode(code) { |
+ err = errors.WrapTransient(err) |
+ } |
+ return err |
+ } |
+ |
+ return proto.Unmarshal(buf.Bytes(), out) // non-transient error |
+ }, |
+ func(err error, sleepTime time.Duration) { |
+ logging.Fields{ |
+ logging.ErrorKey: err, |
+ "sleepTime": sleepTime, |
+ }.Warningf(ctx, "RPC failed transiently. Will retry in %s", sleepTime) |
+ }, |
+ ) |
+ |
+ if err != nil { |
+ logging.WithError(err).Warningf(ctx, "RPC failed permanently: %s", err) |
+ } |
+ |
+ // We have to unwrap gRPC errors because |
+ // grpc.Code and grpc.ErrorDesc functions do not work with error wrappers. |
+ // https://github.com/grpc/grpc-go/issues/494 |
+ return errors.UnwrapAll(err) |
+} |
+ |
+// prepareRequest creates an HTTP request for an RPC, |
+// except it does not set the request body. |
+func prepareRequest(host, serviceName, methodName string, contentLength int, options *Options) *http.Request { |
+ if host == "" { |
+ panic("Host is not set") |
+ } |
+ req := &http.Request{ |
+ Method: "POST", |
+ URL: &url.URL{ |
+ Scheme: "https", |
+ Host: host, |
+ Path: fmt.Sprintf("/prpc/%s/%s", serviceName, methodName), |
+ }, |
+ Header: http.Header{}, |
+ } |
+ if options.Insecure { |
+ req.URL.Scheme = "http" |
+ } |
+ |
+ // Set headers. |
+ const mediaType = "application/prpc" // binary |
+ req.Header.Set("Content-Type", mediaType) |
+ req.Header.Set("Accept", mediaType) |
+ userAgent := options.UserAgent |
+ if userAgent == "" { |
+ userAgent = DefaultUserAgent |
+ } |
+ req.Header.Set("User-Agent", userAgent) |
+ req.ContentLength = int64(contentLength) |
+ req.Header.Set("Content-Length", strconv.Itoa(contentLength)) |
+ // TODO(nodir): add "Accept-Encoding: gzip" when pRPC server supports it. |
+ return req |
+} |
+ |
+func isTransientCode(code codes.Code) bool { |
+ switch code { |
+ case codes.Internal, codes.Unknown, codes.Unavailable: |
+ return true |
+ |
+ default: |
+ return false |
+ } |
+} |