Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1594)

Unified Diff: common/prpc/client.go

Issue 1637193002: common/prpc, tools/cmd/cproto: prpc client (Closed) Base URL: https://github.com/luci/luci-go@prpc-server
Patch Set: Add some content length tests. Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « common/logging/fields.go ('k') | common/prpc/client_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: common/prpc/client.go
diff --git a/common/prpc/client.go b/common/prpc/client.go
index 61e02574cf741abdd4db36738049d398408607a2..76ff12b19915e0fc4d7521696b858938023e1c57 100644
--- a/common/prpc/client.go
+++ b/common/prpc/client.go
@@ -4,9 +4,287 @@
package prpc
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net/http"
+ "net/url"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+ "golang.org/x/net/context"
+ "golang.org/x/net/context/ctxhttp"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/metadata"
+
+ "github.com/luci/luci-go/common/clock"
+ "github.com/luci/luci-go/common/errors"
+ "github.com/luci/luci-go/common/grpcutil"
+ "github.com/luci/luci-go/common/logging"
+ "github.com/luci/luci-go/common/retry"
+)
+
const (
// HeaderGRPCCode is a name of the HTTP header that specifies the
// gRPC code in the response.
// A pRPC server must always specify it.
HeaderGRPCCode = "X-Prpc-Grpc-Code"
+
+ // HeaderTimeout is HTTP header used to set pRPC request timeout.
+ // The single value should match regexp `\d+[HMSmun]`.
+ HeaderTimeout = "X-Prpc-Grpc-Timeout"
+
+ // DefaultMaxContentLength is the default maximum content length (in bytes)
+ // for a Client. It is 32MiB.
+ DefaultMaxContentLength = 32 * 1024 * 1024
)
+
+var (
+ // DefaultUserAgent is default User-Agent HTTP header for pRPC requests.
+ DefaultUserAgent = "pRPC Client 1.0"
+
+ // ErrResponseTooBig is returned by Call when the Response's body size exceeds
+ // the Client's soft limit, MaxContentLength.
+ ErrResponseTooBig = errors.New("response too big")
+)
+
+// Client can make pRPC calls.
+type Client struct {
+ C *http.Client // if nil, uses http.DefaultClient
+
+ // ErrBodySize is the number of bytes to read from a HTTP response
+ // with error status and include in the error.
+ // If non-positive, defaults to 256.
+ ErrBodySize int
+
+ // MaxContentLength, if > 0, is the maximum content length, in bytes, that a
+ // pRPC is willing to read from the server. If a larger content length is
+ // present in the response, ErrResponseTooBig will be returned.
+ //
+ // If <= 0, DefaultMaxContentLength will be used.
+ MaxContentLength int
+
+ Host string // host and optionally a port number of the target server.
+ Options *Options // if nil, DefaultOptions() are used.
+}
+
+// renderOptions copies client options and applies opts.
+func (c *Client) renderOptions(opts []grpc.CallOption) (*Options, error) {
+ var options *Options
+ if c.Options != nil {
+ cpy := *c.Options
+ options = &cpy
+ } else {
+ options = DefaultOptions()
+ }
+ if err := options.apply(opts); err != nil {
+ return nil, err
+ }
+ return options, nil
+}
+
+func (c *Client) getHTTPClient() *http.Client {
+ if c.C == nil {
+ return http.DefaultClient
+ }
+ return c.C
+}
+
+// Call makes an RPC.
+// Retries on transient errors according to retry options.
+// Logs HTTP errors.
+//
+// opts must be created by this package.
+// Calling from multiple goroutines concurrently is safe, unless Client is mutated.
+// Called from generated code.
+//
+// If there is a Deadline applied to the Context, it will be forwarded to the
+// server using the HeaderTimeout haeder.
+func (c *Client) Call(ctx context.Context, serviceName, methodName string, in, out proto.Message, opts ...grpc.CallOption) error {
+ options, err := c.renderOptions(opts)
+ if err != nil {
+ return err
+ }
+
+ reqBody, err := proto.Marshal(in)
+ if err != nil {
+ return err
+ }
+
+ req := prepareRequest(c.Host, serviceName, methodName, len(reqBody), options)
+ ctx = logging.SetFields(ctx, logging.Fields{
+ "host": c.Host,
+ "service": serviceName,
+ "method": methodName,
+ })
+
+ // Send the request in a retry loop.
+ var buf bytes.Buffer
+ err = retry.Retry(
+ ctx,
+ retry.TransientOnly(options.Retry),
+ func() error {
+ logging.Debugf(ctx, "RPC %s/%s.%s", c.Host, serviceName, methodName)
+
+ // If there is a deadline on our Context, set the timeout header on the
+ // request.
+ if deadline, ok := ctx.Deadline(); ok {
+ delta := deadline.Sub(clock.Now(ctx))
+ if delta <= 0 {
+ // The request has already expired. This will likely never happen,
+ // since the outer Retry loop will have expired, but there is a very
+ // slight possibility of a race.
+ return ctx.Err()
+ }
+
+ req.Header.Set(HeaderTimeout, EncodeTimeout(delta))
+ }
+
+ // Send the request.
+ req.Body = ioutil.NopCloser(bytes.NewReader(reqBody))
+ res, err := ctxhttp.Do(ctx, c.getHTTPClient(), req)
+ if err != nil {
+ return errors.WrapTransient(fmt.Errorf("failed to send request: %s", err))
+ }
+ defer res.Body.Close()
+
+ if options.resHeaderMetadata != nil {
+ *options.resHeaderMetadata = metadata.MD(res.Header).Copy()
+ }
+
+ // Read the response body.
+ buf.Reset()
+ var body io.Reader = res.Body
+
+ limit := c.MaxContentLength
+ if limit <= 0 {
+ limit = DefaultMaxContentLength
+ }
+ if l := res.ContentLength; l > 0 {
+ if l > int64(limit) {
+ logging.Fields{
+ "contentLength": l,
+ "limit": limit,
+ }.Errorf(ctx, "ContentLength header exceeds soft response body limit.")
+ return ErrResponseTooBig
+ }
+ limit = int(l)
+ buf.Grow(limit)
+ }
+ body = io.LimitReader(body, int64(limit))
+ if _, err = buf.ReadFrom(body); err != nil {
+ return fmt.Errorf("failed to read response body: %s", err)
+ }
+
+ // If there is more data in the body Reader, it means that the response
+ // size has exceeded our limit.
+ var probeB [1]byte
+ if amt, err := body.Read(probeB[:]); amt > 0 || err != io.EOF {
+ logging.Fields{
+ "limit": limit,
+ }.Errorf(ctx, "Soft response body limit exceeded.")
+ return ErrResponseTooBig
+ }
+
+ if options.resTrailerMetadata != nil {
+ *options.resTrailerMetadata = metadata.MD(res.Trailer).Copy()
+ }
+
+ codeHeader := res.Header.Get(HeaderGRPCCode)
+ if codeHeader == "" {
+ // Not a valid pRPC response.
+ body := buf.String()
+ bodySize := c.ErrBodySize
+ if bodySize <= 0 {
+ bodySize = 256
+ }
+ if len(body) > bodySize {
+ body = body[:bodySize] + "..."
+ }
+ return fmt.Errorf("HTTP %d: no gRPC code. Body: %q", res.StatusCode, body)
+ }
+
+ codeInt, err := strconv.Atoi(codeHeader)
+ if err != nil {
+ // Not a valid pRPC response.
+ return fmt.Errorf("invalid grpc code %q: %s", codeHeader, err)
+ }
+
+ code := codes.Code(codeInt)
+ if code != codes.OK {
+ desc := strings.TrimSuffix(buf.String(), "\n")
+ err := grpcutil.Errf(code, "%s", desc)
+ if isTransientCode(code) {
+ err = errors.WrapTransient(err)
+ }
+ return err
+ }
+
+ return proto.Unmarshal(buf.Bytes(), out) // non-transient error
+ },
+ func(err error, sleepTime time.Duration) {
+ logging.Fields{
+ logging.ErrorKey: err,
+ "sleepTime": sleepTime,
+ }.Warningf(ctx, "RPC failed transiently. Will retry in %s", sleepTime)
+ },
+ )
+
+ if err != nil {
+ logging.WithError(err).Warningf(ctx, "RPC failed permanently: %s", err)
+ }
+
+ // We have to unwrap gRPC errors because
+ // grpc.Code and grpc.ErrorDesc functions do not work with error wrappers.
+ // https://github.com/grpc/grpc-go/issues/494
+ return errors.UnwrapAll(err)
+}
+
+// prepareRequest creates an HTTP request for an RPC,
+// except it does not set the request body.
+func prepareRequest(host, serviceName, methodName string, contentLength int, options *Options) *http.Request {
+ if host == "" {
+ panic("Host is not set")
+ }
+ req := &http.Request{
+ Method: "POST",
+ URL: &url.URL{
+ Scheme: "https",
+ Host: host,
+ Path: fmt.Sprintf("/prpc/%s/%s", serviceName, methodName),
+ },
+ Header: http.Header{},
+ }
+ if options.Insecure {
+ req.URL.Scheme = "http"
+ }
+
+ // Set headers.
+ const mediaType = "application/prpc" // binary
+ req.Header.Set("Content-Type", mediaType)
+ req.Header.Set("Accept", mediaType)
+ userAgent := options.UserAgent
+ if userAgent == "" {
+ userAgent = DefaultUserAgent
+ }
+ req.Header.Set("User-Agent", userAgent)
+ req.ContentLength = int64(contentLength)
+ req.Header.Set("Content-Length", strconv.Itoa(contentLength))
+ // TODO(nodir): add "Accept-Encoding: gzip" when pRPC server supports it.
+ return req
+}
+
+func isTransientCode(code codes.Code) bool {
+ switch code {
+ case codes.Internal, codes.Unknown, codes.Unavailable:
+ return true
+
+ default:
+ return false
+ }
+}
« no previous file with comments | « common/logging/fields.go ('k') | common/prpc/client_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698