OLD | NEW |
(Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package prpc |
| 6 |
| 7 import ( |
| 8 "bytes" |
| 9 "fmt" |
| 10 "io" |
| 11 "io/ioutil" |
| 12 "net/http" |
| 13 "net/url" |
| 14 "strconv" |
| 15 "time" |
| 16 |
| 17 "github.com/golang/protobuf/proto" |
| 18 "golang.org/x/net/context" |
| 19 "golang.org/x/net/context/ctxhttp" |
| 20 "google.golang.org/grpc" |
| 21 "google.golang.org/grpc/codes" |
| 22 "google.golang.org/grpc/metadata" |
| 23 |
| 24 "github.com/luci/luci-go/common/errors" |
| 25 "github.com/luci/luci-go/common/logging" |
| 26 "github.com/luci/luci-go/common/retry" |
| 27 ) |
| 28 |
| 29 const ( |
| 30 // HeaderGRPCCode is a name of the HTTP header that specifies the |
| 31 // gRPC code in the response. If response does not specify it, |
| 32 // the gRPC code is derived from HTTP status code. |
| 33 HeaderGRPCCode = "X-Prpc-Grpc-Code" |
| 34 ) |
| 35 |
| 36 // DefaultUserAgent is default User-Agent HTTP header for pRPC requests. |
| 37 var DefaultUserAgent = "pRPC Client 1.0" |
| 38 |
| 39 // Client can make pRPC calls. |
| 40 type Client struct { |
| 41 C *http.Client // if nil, uses http.DefaultClient |
| 42 |
| 43 // ErrBodySize is the number of bytes to read from a HTTP response |
| 44 // with status >= 300 and include in the error. |
| 45 // If non-positive, defaults to 256. |
| 46 ErrBodySize int |
| 47 |
| 48 Host string // host and optionally a port number of the target serv
er. |
| 49 Options *Options // if nil, DefaultOptions() are used. |
| 50 } |
| 51 |
| 52 // renderOptions copies client options and applies opts. |
| 53 func (c *Client) renderOptions(opts []grpc.CallOption) *Options { |
| 54 var options *Options |
| 55 if c.Options != nil { |
| 56 cpy := *c.Options |
| 57 options = &cpy |
| 58 } else { |
| 59 options = DefaultOptions() |
| 60 } |
| 61 options.apply(opts) |
| 62 return options |
| 63 } |
| 64 |
| 65 func (c *Client) getHTTPClient() *http.Client { |
| 66 if c.C == nil { |
| 67 return http.DefaultClient |
| 68 } |
| 69 return c.C |
| 70 } |
| 71 |
| 72 // Call makes an RPC. |
| 73 // Retries on transient errors according to retry options. |
| 74 // Logs HTTP errors. |
| 75 // |
| 76 // opts must be created by this package. |
| 77 // Calling from multiple goroutines concurrently is safe, unless Client is mutat
ed. |
| 78 // Called from generated code. |
| 79 func (c *Client) Call(ctx context.Context, serviceName, methodName string, in, o
ut proto.Message, opts ...grpc.CallOption) error { |
| 80 options := c.renderOptions(opts) |
| 81 |
| 82 reqBody, err := proto.Marshal(in) |
| 83 if err != nil { |
| 84 return err |
| 85 } |
| 86 |
| 87 req := prepareRequest(c.Host, serviceName, methodName, len(reqBody), opt
ions) |
| 88 ctx = logging.SetFields(ctx, logging.Fields{ |
| 89 "Host": c.Host, |
| 90 "Service": serviceName, |
| 91 "Method": methodName, |
| 92 }) |
| 93 |
| 94 // Send the request in a retry loop. |
| 95 var iter retry.Iterator |
| 96 if options.Retry != nil { |
| 97 iter = retry.TransientOnly(options.Retry()) |
| 98 } |
| 99 var buf bytes.Buffer |
| 100 err = retry.Retry( |
| 101 ctx, |
| 102 iter, |
| 103 func() error { |
| 104 logging.Debugf(ctx, "RPC %s/%s.%s", c.Host, serviceName,
methodName) |
| 105 |
| 106 // Send the request. |
| 107 req.Body = ioutil.NopCloser(bytes.NewReader(reqBody)) |
| 108 res, err := ctxhttp.Do(ctx, c.getHTTPClient(), req) |
| 109 if err != nil { |
| 110 return errors.WrapTransient(fmt.Errorf("failed t
o send request: %s", err)) |
| 111 } |
| 112 defer res.Body.Close() |
| 113 |
| 114 if options.resHeaderMetadata != nil { |
| 115 *options.resHeaderMetadata = metadata.MD(res.Hea
der).Copy() |
| 116 } |
| 117 |
| 118 // Check response status code. |
| 119 if res.StatusCode >= 300 { |
| 120 return c.responseErr(ctx, res) |
| 121 } |
| 122 |
| 123 // Read the response body. |
| 124 buf.Reset() |
| 125 var body io.Reader = res.Body |
| 126 if res.ContentLength > 0 { |
| 127 buf.Grow(int(res.ContentLength)) |
| 128 body = io.LimitReader(body, res.ContentLength) |
| 129 } |
| 130 if _, err = buf.ReadFrom(body); err != nil { |
| 131 return fmt.Errorf("failed to read response body:
%s", err) |
| 132 } |
| 133 |
| 134 // Unmarshal the response message. |
| 135 if options.resTrailerMetadata != nil { |
| 136 *options.resTrailerMetadata = metadata.MD(res.Tr
ailer).Copy() |
| 137 } |
| 138 |
| 139 return proto.Unmarshal(buf.Bytes(), out) // non-transien
t error |
| 140 }, |
| 141 func(err error, sleepTime time.Duration) { |
| 142 logging.Fields{ |
| 143 logging.ErrorKey: err, |
| 144 "SleepTime": sleepTime, |
| 145 }.Warningf(ctx, "RPC failed transiently. Will retry in %
s", sleepTime) |
| 146 }, |
| 147 ) |
| 148 |
| 149 if err != nil { |
| 150 logging.WithError(err).Warningf(ctx, "RPC failed permanently: %s
", err) |
| 151 } |
| 152 |
| 153 // We have to unwrap gRPC errors because |
| 154 // grpc.Code and grpc.ErrorDesc functions do not work with error wrapper
s. |
| 155 // https://github.com/grpc/grpc-go/issues/494 |
| 156 return errors.UnwrapAll(err) |
| 157 } |
| 158 |
| 159 // responseErr converts an HTTP response to a gRPC error. |
| 160 // The returned error may be wrapped with errors.WrapTransient. |
| 161 func (c *Client) responseErr(ctx context.Context, res *http.Response) error { |
| 162 // Read first N bytes of the body. Leave empty if cannot read. |
| 163 bodySize := c.ErrBodySize |
| 164 if bodySize <= 0 { |
| 165 bodySize = 256 |
| 166 } |
| 167 var body string |
| 168 bodyBytes, err := ioutil.ReadAll(io.LimitReader(res.Body, int64(bodySize
))) |
| 169 if err == nil { |
| 170 if len(bodyBytes) == bodySize { |
| 171 bodyBytes = append(bodyBytes, []byte("...")...) |
| 172 } |
| 173 body = string(bodyBytes) |
| 174 } |
| 175 |
| 176 code := codes.Unknown |
| 177 // Read explicit gRPC code. |
| 178 if codeHeader := res.Header.Get(HeaderGRPCCode); codeHeader != "" { |
| 179 if intCode, err := strconv.Atoi(codeHeader); err != nil { |
| 180 logging.WithError(err).Warningf(ctx, "Could not parse %s
header: %s", HeaderGRPCCode, err) |
| 181 } else { |
| 182 code = codes.Code(intCode) |
| 183 } |
| 184 } else { |
| 185 // No explicit gRPC code provided, derive it from status code. |
| 186 code = StatusCode(res.StatusCode) |
| 187 } |
| 188 |
| 189 // Return the error. |
| 190 err = grpc.Errorf(code, "HTTP %s: %s", res.Status, body) |
| 191 if isTransientStatus(res.StatusCode) { |
| 192 err = errors.WrapTransient(err) |
| 193 } |
| 194 return err |
| 195 } |
| 196 |
| 197 // prepareRequest creates an HTTP request for an RPC, |
| 198 // except it does not set the request body. |
| 199 func prepareRequest(host, serviceName, methodName string, contentLength int, opt
ions *Options) *http.Request { |
| 200 if host == "" { |
| 201 panic("Host is not set") |
| 202 } |
| 203 req := &http.Request{ |
| 204 Method: "POST", |
| 205 URL: &url.URL{ |
| 206 Scheme: "https", |
| 207 Host: host, |
| 208 Path: fmt.Sprintf("/prpc/%s/%s", serviceName, methodNa
me), |
| 209 }, |
| 210 Header: http.Header{}, |
| 211 } |
| 212 if options.Insecure { |
| 213 req.URL.Scheme = "http" |
| 214 } |
| 215 |
| 216 // Set headers. |
| 217 const mediaType = "application/prpc" // binary |
| 218 req.Header.Set("Content-Type", mediaType) |
| 219 req.Header.Set("Accept", mediaType) |
| 220 userAgent := options.UserAgent |
| 221 if userAgent == "" { |
| 222 userAgent = DefaultUserAgent |
| 223 } |
| 224 req.Header.Set("User-Agent", userAgent) |
| 225 req.ContentLength = int64(contentLength) |
| 226 req.Header.Set("Content-Length", strconv.Itoa(contentLength)) |
| 227 // TODO(nodir): add "Accept-Encoding: gzip" when pRPC server supports it
. |
| 228 return req |
| 229 } |
| 230 |
| 231 // isTransientStatus returns true if an HTTP status code indicates a transient e
rror. |
| 232 func isTransientStatus(status int) bool { |
| 233 return status >= 500 || status == http.StatusRequestTimeout |
| 234 } |
OLD | NEW |