OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package prpc | 5 package prpc |
6 | 6 |
| 7 import ( |
| 8 "bytes" |
| 9 "fmt" |
| 10 "io" |
| 11 "io/ioutil" |
| 12 "net/http" |
| 13 "net/url" |
| 14 "strconv" |
| 15 "strings" |
| 16 "time" |
| 17 |
| 18 "github.com/golang/protobuf/proto" |
| 19 "golang.org/x/net/context" |
| 20 "golang.org/x/net/context/ctxhttp" |
| 21 "google.golang.org/grpc" |
| 22 "google.golang.org/grpc/codes" |
| 23 "google.golang.org/grpc/metadata" |
| 24 |
| 25 "github.com/luci/luci-go/common/clock" |
| 26 "github.com/luci/luci-go/common/errors" |
| 27 "github.com/luci/luci-go/common/grpcutil" |
| 28 "github.com/luci/luci-go/common/logging" |
| 29 "github.com/luci/luci-go/common/retry" |
| 30 ) |
| 31 |
7 const ( | 32 const ( |
8 // HeaderGRPCCode is a name of the HTTP header that specifies the | 33 // HeaderGRPCCode is a name of the HTTP header that specifies the |
9 // gRPC code in the response. | 34 // gRPC code in the response. |
10 // A pRPC server must always specify it. | 35 // A pRPC server must always specify it. |
11 HeaderGRPCCode = "X-Prpc-Grpc-Code" | 36 HeaderGRPCCode = "X-Prpc-Grpc-Code" |
| 37 |
| 38 // HeaderTimeout is HTTP header used to set pRPC request timeout. |
| 39 // The single value should match regexp `\d+[HMSmun]`. |
| 40 HeaderTimeout = "X-Prpc-Grpc-Timeout" |
| 41 |
| 42 // DefaultMaxContentLength is the default maximum content length (in byt
es) |
| 43 // for a Client. It is 32MiB. |
| 44 DefaultMaxContentLength = 32 * 1024 * 1024 |
12 ) | 45 ) |
| 46 |
| 47 var ( |
| 48 // DefaultUserAgent is default User-Agent HTTP header for pRPC requests. |
| 49 DefaultUserAgent = "pRPC Client 1.0" |
| 50 |
| 51 // ErrResponseTooBig is returned by Call when the Response's body size e
xceeds |
| 52 // the Client's soft limit, MaxContentLength. |
| 53 ErrResponseTooBig = errors.New("response too big") |
| 54 ) |
| 55 |
| 56 // Client can make pRPC calls. |
| 57 type Client struct { |
| 58 C *http.Client // if nil, uses http.DefaultClient |
| 59 |
| 60 // ErrBodySize is the number of bytes to read from a HTTP response |
| 61 // with error status and include in the error. |
| 62 // If non-positive, defaults to 256. |
| 63 ErrBodySize int |
| 64 |
| 65 // MaxContentLength, if > 0, is the maximum content length, in bytes, th
at a |
| 66 // pRPC is willing to read from the server. If a larger content length i
s |
| 67 // present in the response, ErrResponseTooBig will be returned. |
| 68 // |
| 69 // If <= 0, DefaultMaxContentLength will be used. |
| 70 MaxContentLength int |
| 71 |
| 72 Host string // host and optionally a port number of the target serv
er. |
| 73 Options *Options // if nil, DefaultOptions() are used. |
| 74 } |
| 75 |
| 76 // renderOptions copies client options and applies opts. |
| 77 func (c *Client) renderOptions(opts []grpc.CallOption) (*Options, error) { |
| 78 var options *Options |
| 79 if c.Options != nil { |
| 80 cpy := *c.Options |
| 81 options = &cpy |
| 82 } else { |
| 83 options = DefaultOptions() |
| 84 } |
| 85 if err := options.apply(opts); err != nil { |
| 86 return nil, err |
| 87 } |
| 88 return options, nil |
| 89 } |
| 90 |
| 91 func (c *Client) getHTTPClient() *http.Client { |
| 92 if c.C == nil { |
| 93 return http.DefaultClient |
| 94 } |
| 95 return c.C |
| 96 } |
| 97 |
| 98 // Call makes an RPC. |
| 99 // Retries on transient errors according to retry options. |
| 100 // Logs HTTP errors. |
| 101 // |
| 102 // opts must be created by this package. |
| 103 // Calling from multiple goroutines concurrently is safe, unless Client is mutat
ed. |
| 104 // Called from generated code. |
| 105 // |
| 106 // If there is a Deadline applied to the Context, it will be forwarded to the |
| 107 // server using the HeaderTimeout haeder. |
| 108 func (c *Client) Call(ctx context.Context, serviceName, methodName string, in, o
ut proto.Message, opts ...grpc.CallOption) error { |
| 109 options, err := c.renderOptions(opts) |
| 110 if err != nil { |
| 111 return err |
| 112 } |
| 113 |
| 114 reqBody, err := proto.Marshal(in) |
| 115 if err != nil { |
| 116 return err |
| 117 } |
| 118 |
| 119 req := prepareRequest(c.Host, serviceName, methodName, len(reqBody), opt
ions) |
| 120 ctx = logging.SetFields(ctx, logging.Fields{ |
| 121 "host": c.Host, |
| 122 "service": serviceName, |
| 123 "method": methodName, |
| 124 }) |
| 125 |
| 126 // Send the request in a retry loop. |
| 127 var buf bytes.Buffer |
| 128 err = retry.Retry( |
| 129 ctx, |
| 130 retry.TransientOnly(options.Retry), |
| 131 func() error { |
| 132 logging.Debugf(ctx, "RPC %s/%s.%s", c.Host, serviceName,
methodName) |
| 133 |
| 134 // If there is a deadline on our Context, set the timeou
t header on the |
| 135 // request. |
| 136 if deadline, ok := ctx.Deadline(); ok { |
| 137 delta := deadline.Sub(clock.Now(ctx)) |
| 138 if delta <= 0 { |
| 139 // The request has already expired. This
will likely never happen, |
| 140 // since the outer Retry loop will have
expired, but there is a very |
| 141 // slight possibility of a race. |
| 142 return ctx.Err() |
| 143 } |
| 144 |
| 145 req.Header.Set(HeaderTimeout, EncodeTimeout(delt
a)) |
| 146 } |
| 147 |
| 148 // Send the request. |
| 149 req.Body = ioutil.NopCloser(bytes.NewReader(reqBody)) |
| 150 res, err := ctxhttp.Do(ctx, c.getHTTPClient(), req) |
| 151 if err != nil { |
| 152 return errors.WrapTransient(fmt.Errorf("failed t
o send request: %s", err)) |
| 153 } |
| 154 defer res.Body.Close() |
| 155 |
| 156 if options.resHeaderMetadata != nil { |
| 157 *options.resHeaderMetadata = metadata.MD(res.Hea
der).Copy() |
| 158 } |
| 159 |
| 160 // Read the response body. |
| 161 buf.Reset() |
| 162 var body io.Reader = res.Body |
| 163 |
| 164 limit := c.MaxContentLength |
| 165 if limit <= 0 { |
| 166 limit = DefaultMaxContentLength |
| 167 } |
| 168 if l := res.ContentLength; l > 0 { |
| 169 if l > int64(limit) { |
| 170 logging.Fields{ |
| 171 "contentLength": l, |
| 172 "limit": limit, |
| 173 }.Errorf(ctx, "ContentLength header exce
eds soft response body limit.") |
| 174 return ErrResponseTooBig |
| 175 } |
| 176 limit = int(l) |
| 177 buf.Grow(limit) |
| 178 } |
| 179 body = io.LimitReader(body, int64(limit)) |
| 180 if _, err = buf.ReadFrom(body); err != nil { |
| 181 return fmt.Errorf("failed to read response body:
%s", err) |
| 182 } |
| 183 |
| 184 // If there is more data in the body Reader, it means th
at the response |
| 185 // size has exceeded our limit. |
| 186 var probeB [1]byte |
| 187 if amt, err := body.Read(probeB[:]); amt > 0 || err != i
o.EOF { |
| 188 logging.Fields{ |
| 189 "limit": limit, |
| 190 }.Errorf(ctx, "Soft response body limit exceeded
.") |
| 191 return ErrResponseTooBig |
| 192 } |
| 193 |
| 194 if options.resTrailerMetadata != nil { |
| 195 *options.resTrailerMetadata = metadata.MD(res.Tr
ailer).Copy() |
| 196 } |
| 197 |
| 198 codeHeader := res.Header.Get(HeaderGRPCCode) |
| 199 if codeHeader == "" { |
| 200 // Not a valid pRPC response. |
| 201 body := buf.String() |
| 202 bodySize := c.ErrBodySize |
| 203 if bodySize <= 0 { |
| 204 bodySize = 256 |
| 205 } |
| 206 if len(body) > bodySize { |
| 207 body = body[:bodySize] + "..." |
| 208 } |
| 209 return fmt.Errorf("HTTP %d: no gRPC code. Body:
%q", res.StatusCode, body) |
| 210 } |
| 211 |
| 212 codeInt, err := strconv.Atoi(codeHeader) |
| 213 if err != nil { |
| 214 // Not a valid pRPC response. |
| 215 return fmt.Errorf("invalid grpc code %q: %s", co
deHeader, err) |
| 216 } |
| 217 |
| 218 code := codes.Code(codeInt) |
| 219 if code != codes.OK { |
| 220 desc := strings.TrimSuffix(buf.String(), "\n") |
| 221 err := grpcutil.Errf(code, "%s", desc) |
| 222 if isTransientCode(code) { |
| 223 err = errors.WrapTransient(err) |
| 224 } |
| 225 return err |
| 226 } |
| 227 |
| 228 return proto.Unmarshal(buf.Bytes(), out) // non-transien
t error |
| 229 }, |
| 230 func(err error, sleepTime time.Duration) { |
| 231 logging.Fields{ |
| 232 logging.ErrorKey: err, |
| 233 "sleepTime": sleepTime, |
| 234 }.Warningf(ctx, "RPC failed transiently. Will retry in %
s", sleepTime) |
| 235 }, |
| 236 ) |
| 237 |
| 238 if err != nil { |
| 239 logging.WithError(err).Warningf(ctx, "RPC failed permanently: %s
", err) |
| 240 } |
| 241 |
| 242 // We have to unwrap gRPC errors because |
| 243 // grpc.Code and grpc.ErrorDesc functions do not work with error wrapper
s. |
| 244 // https://github.com/grpc/grpc-go/issues/494 |
| 245 return errors.UnwrapAll(err) |
| 246 } |
| 247 |
| 248 // prepareRequest creates an HTTP request for an RPC, |
| 249 // except it does not set the request body. |
| 250 func prepareRequest(host, serviceName, methodName string, contentLength int, opt
ions *Options) *http.Request { |
| 251 if host == "" { |
| 252 panic("Host is not set") |
| 253 } |
| 254 req := &http.Request{ |
| 255 Method: "POST", |
| 256 URL: &url.URL{ |
| 257 Scheme: "https", |
| 258 Host: host, |
| 259 Path: fmt.Sprintf("/prpc/%s/%s", serviceName, methodNa
me), |
| 260 }, |
| 261 Header: http.Header{}, |
| 262 } |
| 263 if options.Insecure { |
| 264 req.URL.Scheme = "http" |
| 265 } |
| 266 |
| 267 // Set headers. |
| 268 const mediaType = "application/prpc" // binary |
| 269 req.Header.Set("Content-Type", mediaType) |
| 270 req.Header.Set("Accept", mediaType) |
| 271 userAgent := options.UserAgent |
| 272 if userAgent == "" { |
| 273 userAgent = DefaultUserAgent |
| 274 } |
| 275 req.Header.Set("User-Agent", userAgent) |
| 276 req.ContentLength = int64(contentLength) |
| 277 req.Header.Set("Content-Length", strconv.Itoa(contentLength)) |
| 278 // TODO(nodir): add "Accept-Encoding: gzip" when pRPC server supports it
. |
| 279 return req |
| 280 } |
| 281 |
| 282 func isTransientCode(code codes.Code) bool { |
| 283 switch code { |
| 284 case codes.Internal, codes.Unknown, codes.Unavailable: |
| 285 return true |
| 286 |
| 287 default: |
| 288 return false |
| 289 } |
| 290 } |
OLD | NEW |