Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package prpc | 5 package prpc |
| 6 | 6 |
| 7 import ( | |
| 8 "bytes" | |
| 9 "fmt" | |
| 10 "io" | |
| 11 "io/ioutil" | |
| 12 "net/http" | |
| 13 "net/url" | |
| 14 "strconv" | |
| 15 "strings" | |
| 16 "time" | |
| 17 | |
| 18 "github.com/golang/protobuf/proto" | |
| 19 "golang.org/x/net/context" | |
| 20 "golang.org/x/net/context/ctxhttp" | |
| 21 "google.golang.org/grpc" | |
| 22 "google.golang.org/grpc/codes" | |
| 23 "google.golang.org/grpc/metadata" | |
| 24 | |
| 25 "github.com/luci/luci-go/common/errors" | |
| 26 "github.com/luci/luci-go/common/grpcutil" | |
| 27 "github.com/luci/luci-go/common/logging" | |
| 28 "github.com/luci/luci-go/common/retry" | |
| 29 ) | |
| 30 | |
| 7 const ( | 31 const ( |
| 8 // HeaderGRPCCode is a name of the HTTP header that specifies the | 32 // HeaderGRPCCode is a name of the HTTP header that specifies the |
| 9 // gRPC code in the response. | 33 // gRPC code in the response. |
| 10 // A pRPC server must always specify it. | 34 // A pRPC server must always specify it. |
| 11 HeaderGRPCCode = "X-Prpc-Grpc-Code" | 35 HeaderGRPCCode = "X-Prpc-Grpc-Code" |
| 12 ) | 36 ) |
| 37 | |
| 38 // DefaultUserAgent is default User-Agent HTTP header for pRPC requests. | |
| 39 var DefaultUserAgent = "pRPC Client 1.0" | |
| 40 | |
| 41 // Client can make pRPC calls. | |
| 42 type Client struct { | |
| 43 C *http.Client // if nil, uses http.DefaultClient | |
| 44 | |
| 45 // ErrBodySize is the number of bytes to read from a HTTP response | |
| 46 // with error status and include in the error. | |
| 47 // If non-positive, defaults to 256. | |
| 48 ErrBodySize int | |
| 49 | |
| 50 Host string // host and optionally a port number of the target serv er. | |
| 51 Options *Options // if nil, DefaultOptions() are used. | |
| 52 } | |
| 53 | |
| 54 // renderOptions copies client options and applies opts. | |
| 55 func (c *Client) renderOptions(opts []grpc.CallOption) *Options { | |
| 56 var options *Options | |
| 57 if c.Options != nil { | |
| 58 cpy := *c.Options | |
| 59 options = &cpy | |
| 60 } else { | |
| 61 options = DefaultOptions() | |
| 62 } | |
| 63 options.apply(opts) | |
| 64 return options | |
| 65 } | |
| 66 | |
| 67 func (c *Client) getHTTPClient() *http.Client { | |
| 68 if c.C == nil { | |
| 69 return http.DefaultClient | |
| 70 } | |
| 71 return c.C | |
| 72 } | |
| 73 | |
| 74 // Call makes an RPC. | |
| 75 // Retries on transient errors according to retry options. | |
| 76 // Logs HTTP errors. | |
| 77 // | |
| 78 // opts must be created by this package. | |
| 79 // Calling from multiple goroutines concurrently is safe, unless Client is mutat ed. | |
| 80 // Called from generated code. | |
| 81 func (c *Client) Call(ctx context.Context, serviceName, methodName string, in, o ut proto.Message, opts ...grpc.CallOption) error { | |
| 82 options := c.renderOptions(opts) | |
| 83 | |
| 84 reqBody, err := proto.Marshal(in) | |
| 85 if err != nil { | |
| 86 return err | |
| 87 } | |
| 88 | |
| 89 req := prepareRequest(c.Host, serviceName, methodName, len(reqBody), opt ions) | |
| 90 ctx = logging.SetFields(ctx, logging.Fields{ | |
| 91 "host": c.Host, | |
| 92 "service": serviceName, | |
| 93 "method": methodName, | |
| 94 }) | |
| 95 | |
| 96 // Send the request in a retry loop. | |
| 97 var buf bytes.Buffer | |
| 98 err = retry.Retry( | |
| 99 ctx, | |
| 100 retry.TransientOnly(options.Retry), | |
| 101 func() error { | |
| 102 logging.Debugf(ctx, "RPC %s/%s.%s", c.Host, serviceName, methodName) | |
|
Vadim Sh.
2016/01/27 01:13:54
host, serviceName and methodName is already in the
| |
| 103 | |
| 104 // Send the request. | |
| 105 req.Body = ioutil.NopCloser(bytes.NewReader(reqBody)) | |
| 106 res, err := ctxhttp.Do(ctx, c.getHTTPClient(), req) | |
| 107 if err != nil { | |
|
Vadim Sh.
2016/01/27 01:13:54
detect context cancelation here? We don't want to
| |
| 108 return errors.WrapTransient(fmt.Errorf("failed t o send request: %s", err)) | |
| 109 } | |
| 110 defer res.Body.Close() | |
| 111 | |
| 112 if options.resHeaderMetadata != nil { | |
| 113 *options.resHeaderMetadata = metadata.MD(res.Hea der).Copy() | |
| 114 } | |
| 115 | |
| 116 // Read the response body. | |
| 117 buf.Reset() | |
| 118 var body io.Reader = res.Body | |
| 119 if res.ContentLength > 0 { | |
| 120 buf.Grow(int(res.ContentLength)) | |
| 121 body = io.LimitReader(body, res.ContentLength) | |
| 122 } | |
| 123 if _, err = buf.ReadFrom(body); err != nil { | |
| 124 return fmt.Errorf("failed to read response body: %s", err) | |
|
Vadim Sh.
2016/01/27 01:13:54
this is most probably transient error too (e.g. co
| |
| 125 } | |
| 126 | |
| 127 if options.resTrailerMetadata != nil { | |
| 128 *options.resTrailerMetadata = metadata.MD(res.Tr ailer).Copy() | |
| 129 } | |
| 130 | |
| 131 codeHeader := res.Header.Get(HeaderGRPCCode) | |
| 132 if codeHeader == "" { | |
| 133 // Not a valid pRPC response. | |
| 134 body := buf.String() | |
| 135 bodySize := c.ErrBodySize | |
| 136 if bodySize <= 0 { | |
| 137 bodySize = 256 | |
| 138 } | |
| 139 if len(body) > bodySize { | |
| 140 body = body[:bodySize] + "..." | |
| 141 } | |
| 142 return fmt.Errorf("HTTP %d: no gRPC code. Body: %s", res.StatusCode, body) | |
| 143 } | |
| 144 | |
| 145 codeInt, err := strconv.Atoi(codeHeader) | |
| 146 if err != nil { | |
| 147 // Not a valid pRPC response. | |
| 148 return fmt.Errorf("invalid grpc code %q: %s", co deHeader, err) | |
| 149 } | |
| 150 | |
| 151 code := codes.Code(codeInt) | |
| 152 if code != codes.OK { | |
| 153 desc := strings.TrimSuffix(buf.String(), "\n") | |
| 154 err := grpcutil.Errf(code, "%s", desc) | |
| 155 if isTransientCode(code) { | |
| 156 err = errors.WrapTransient(err) | |
| 157 } | |
| 158 return err | |
| 159 } | |
| 160 | |
| 161 return proto.Unmarshal(buf.Bytes(), out) // non-transien t error | |
| 162 }, | |
| 163 func(err error, sleepTime time.Duration) { | |
| 164 logging.Fields{ | |
| 165 logging.ErrorKey: err, | |
| 166 "sleepTime": sleepTime, | |
| 167 }.Warningf(ctx, "RPC failed transiently. Will retry in % s", sleepTime) | |
| 168 }, | |
| 169 ) | |
| 170 | |
| 171 if err != nil { | |
| 172 logging.WithError(err).Warningf(ctx, "RPC failed permanently: %s ", err) | |
| 173 } | |
| 174 | |
| 175 // We have to unwrap gRPC errors because | |
| 176 // grpc.Code and grpc.ErrorDesc functions do not work with error wrapper s. | |
| 177 // https://github.com/grpc/grpc-go/issues/494 | |
| 178 return errors.UnwrapAll(err) | |
| 179 } | |
| 180 | |
| 181 // prepareRequest creates an HTTP request for an RPC, | |
| 182 // except it does not set the request body. | |
| 183 func prepareRequest(host, serviceName, methodName string, contentLength int, opt ions *Options) *http.Request { | |
| 184 if host == "" { | |
| 185 panic("Host is not set") | |
| 186 } | |
| 187 req := &http.Request{ | |
| 188 Method: "POST", | |
| 189 URL: &url.URL{ | |
| 190 Scheme: "https", | |
| 191 Host: host, | |
| 192 Path: fmt.Sprintf("/prpc/%s/%s", serviceName, methodNa me), | |
| 193 }, | |
| 194 Header: http.Header{}, | |
| 195 } | |
| 196 if options.Insecure { | |
|
Vadim Sh.
2016/01/27 01:13:54
can we also do it by default for "localhost"? Very
| |
| 197 req.URL.Scheme = "http" | |
| 198 } | |
| 199 | |
| 200 // Set headers. | |
| 201 const mediaType = "application/prpc" // binary | |
| 202 req.Header.Set("Content-Type", mediaType) | |
| 203 req.Header.Set("Accept", mediaType) | |
| 204 userAgent := options.UserAgent | |
| 205 if userAgent == "" { | |
| 206 userAgent = DefaultUserAgent | |
| 207 } | |
| 208 req.Header.Set("User-Agent", userAgent) | |
| 209 req.ContentLength = int64(contentLength) | |
| 210 req.Header.Set("Content-Length", strconv.Itoa(contentLength)) | |
| 211 // TODO(nodir): add "Accept-Encoding: gzip" when pRPC server supports it . | |
| 212 return req | |
| 213 } | |
| 214 | |
| 215 func isTransientCode(code codes.Code) bool { | |
| 216 switch code { | |
| 217 case codes.Internal, codes.Unknown, codes.Unavailable: | |
| 218 return true | |
| 219 | |
| 220 default: | |
| 221 return false | |
| 222 } | |
| 223 } | |
| OLD | NEW |