OLD | NEW |
1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package prpc | 5 package prpc |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "fmt" | 9 "fmt" |
10 "io" | 10 "io" |
11 "io/ioutil" | 11 "io/ioutil" |
12 "net/http" | 12 "net/http" |
13 "net/url" | 13 "net/url" |
14 "strconv" | 14 "strconv" |
15 "strings" | 15 "strings" |
16 "time" | 16 "time" |
17 | 17 |
18 "github.com/golang/protobuf/proto" | 18 "github.com/golang/protobuf/proto" |
19 "golang.org/x/net/context" | 19 "golang.org/x/net/context" |
20 "golang.org/x/net/context/ctxhttp" | 20 "golang.org/x/net/context/ctxhttp" |
21 "google.golang.org/grpc" | 21 "google.golang.org/grpc" |
22 "google.golang.org/grpc/codes" | 22 "google.golang.org/grpc/codes" |
23 "google.golang.org/grpc/metadata" | 23 "google.golang.org/grpc/metadata" |
24 | 24 |
25 "github.com/luci/luci-go/common/clock" | 25 "github.com/luci/luci-go/common/clock" |
26 "github.com/luci/luci-go/common/errors" | 26 "github.com/luci/luci-go/common/errors" |
27 "github.com/luci/luci-go/common/logging" | 27 "github.com/luci/luci-go/common/logging" |
28 "github.com/luci/luci-go/common/retry" | 28 "github.com/luci/luci-go/common/retry" |
| 29 "github.com/luci/luci-go/common/retry/transient" |
29 "github.com/luci/luci-go/grpc/grpcutil" | 30 "github.com/luci/luci-go/grpc/grpcutil" |
30 ) | 31 ) |
31 | 32 |
32 const ( | 33 const ( |
33 // HeaderGRPCCode is a name of the HTTP header that specifies the | 34 // HeaderGRPCCode is a name of the HTTP header that specifies the |
34 // gRPC code in the response. | 35 // gRPC code in the response. |
35 // A pRPC server must always specify it. | 36 // A pRPC server must always specify it. |
36 HeaderGRPCCode = "X-Prpc-Grpc-Code" | 37 HeaderGRPCCode = "X-Prpc-Grpc-Code" |
37 | 38 |
38 // HeaderTimeout is HTTP header used to set pRPC request timeout. | 39 // HeaderTimeout is HTTP header used to set pRPC request timeout. |
(...skipping 107 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
146 "host": c.Host, | 147 "host": c.Host, |
147 "service": serviceName, | 148 "service": serviceName, |
148 "method": methodName, | 149 "method": methodName, |
149 }) | 150 }) |
150 | 151 |
151 // Send the request in a retry loop. | 152 // Send the request in a retry loop. |
152 var buf bytes.Buffer | 153 var buf bytes.Buffer |
153 var contentType string | 154 var contentType string |
154 err = retry.Retry( | 155 err = retry.Retry( |
155 ctx, | 156 ctx, |
156 » » retry.TransientOnly(options.Retry), | 157 » » transient.Only(options.Retry), |
157 func() error { | 158 func() error { |
158 ctx := ctx | 159 ctx := ctx |
159 logging.Debugf(ctx, "RPC %s/%s.%s", c.Host, serviceName,
methodName) | 160 logging.Debugf(ctx, "RPC %s/%s.%s", c.Host, serviceName,
methodName) |
160 | 161 |
161 // If there is a deadline on our Context, set the timeou
t header on the | 162 // If there is a deadline on our Context, set the timeou
t header on the |
162 // request. | 163 // request. |
163 now := clock.Now(ctx) | 164 now := clock.Now(ctx) |
164 var requestDeadline time.Time | 165 var requestDeadline time.Time |
165 if options.PerRPCTimeout > 0 { | 166 if options.PerRPCTimeout > 0 { |
166 requestDeadline = now.Add(options.PerRPCTimeout) | 167 requestDeadline = now.Add(options.PerRPCTimeout) |
(...skipping 29 matching lines...) Expand all Loading... |
196 req.Body = ioutil.NopCloser(bytes.NewReader(in)) | 197 req.Body = ioutil.NopCloser(bytes.NewReader(in)) |
197 res, err := ctxhttp.Do(ctx, c.getHTTPClient(), req) | 198 res, err := ctxhttp.Do(ctx, c.getHTTPClient(), req) |
198 if res != nil && res.Body != nil { | 199 if res != nil && res.Body != nil { |
199 defer res.Body.Close() | 200 defer res.Body.Close() |
200 } | 201 } |
201 if c.testPostHTTP != nil { | 202 if c.testPostHTTP != nil { |
202 err = c.testPostHTTP(ctx, err) | 203 err = c.testPostHTTP(ctx, err) |
203 } | 204 } |
204 if err != nil { | 205 if err != nil { |
205 // Treat all errors here as transient. | 206 // Treat all errors here as transient. |
206 » » » » return errors.WrapTransient(fmt.Errorf("failed t
o send request: %s", err)) | 207 » » » » return errors.Annotate(err).Reason("failed to se
nd request"). |
| 208 » » » » » Tag(transient.Tag).Err() |
207 } | 209 } |
208 | 210 |
209 if options.resHeaderMetadata != nil { | 211 if options.resHeaderMetadata != nil { |
210 *options.resHeaderMetadata = metadataFromHeaders
(res.Header) | 212 *options.resHeaderMetadata = metadataFromHeaders
(res.Header) |
211 } | 213 } |
212 contentType = res.Header.Get("Content-Type") | 214 contentType = res.Header.Get("Content-Type") |
213 | 215 |
214 // Read the response body. | 216 // Read the response body. |
215 buf.Reset() | 217 buf.Reset() |
216 var body io.Reader = res.Body | 218 var body io.Reader = res.Body |
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
259 } | 261 } |
260 if len(body) > bodySize { | 262 if len(body) > bodySize { |
261 body = body[:bodySize] + "..." | 263 body = body[:bodySize] + "..." |
262 } | 264 } |
263 err := fmt.Errorf("HTTP %d: no gRPC code. Body:
%q", res.StatusCode, body) | 265 err := fmt.Errorf("HTTP %d: no gRPC code. Body:
%q", res.StatusCode, body) |
264 | 266 |
265 // Some HTTP codes are returned directly by host
ing platforms (e.g., | 267 // Some HTTP codes are returned directly by host
ing platforms (e.g., |
266 // AppEngine), and should be automatically retri
ed even if a gRPC code | 268 // AppEngine), and should be automatically retri
ed even if a gRPC code |
267 // header is not supplied. | 269 // header is not supplied. |
268 if res.StatusCode >= http.StatusInternalServerEr
ror { | 270 if res.StatusCode >= http.StatusInternalServerEr
ror { |
269 » » » » » err = errors.WrapTransient(err) | 271 » » » » » err = transient.Tag.Apply(err) |
270 } | 272 } |
271 return err | 273 return err |
272 } | 274 } |
273 | 275 |
274 codeInt, err := strconv.Atoi(codeHeader) | 276 codeInt, err := strconv.Atoi(codeHeader) |
275 if err != nil { | 277 if err != nil { |
276 // Not a valid pRPC response. | 278 // Not a valid pRPC response. |
277 return fmt.Errorf("invalid grpc code %q: %s", co
deHeader, err) | 279 return fmt.Errorf("invalid grpc code %q: %s", co
deHeader, err) |
278 } | 280 } |
279 | 281 |
280 code := codes.Code(codeInt) | 282 code := codes.Code(codeInt) |
281 if code != codes.OK { | 283 if code != codes.OK { |
282 desc := strings.TrimSuffix(buf.String(), "\n") | 284 desc := strings.TrimSuffix(buf.String(), "\n") |
283 err := grpcutil.Errf(code, "%s", desc) | 285 err := grpcutil.Errf(code, "%s", desc) |
284 if grpcutil.IsTransientCode(code) { | 286 if grpcutil.IsTransientCode(code) { |
285 » » » » » err = errors.WrapTransient(err) | 287 » » » » » err = transient.Tag.Apply(err) |
286 } | 288 } |
287 return err | 289 return err |
288 } | 290 } |
289 return nil | 291 return nil |
290 }, | 292 }, |
291 func(err error, sleepTime time.Duration) { | 293 func(err error, sleepTime time.Duration) { |
292 logging.Fields{ | 294 logging.Fields{ |
293 logging.ErrorKey: err, | 295 logging.ErrorKey: err, |
294 "sleepTime": sleepTime, | 296 "sleepTime": sleepTime, |
295 }.Warningf(ctx, "RPC failed transiently. Will retry in %
s", sleepTime) | 297 }.Warningf(ctx, "RPC failed transiently. Will retry in %
s", sleepTime) |
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
362 if len(h) == 0 { | 364 if len(h) == 0 { |
363 return nil | 365 return nil |
364 } | 366 } |
365 | 367 |
366 md := make(metadata.MD, len(h)) | 368 md := make(metadata.MD, len(h)) |
367 for k, v := range h { | 369 for k, v := range h { |
368 md[strings.ToLower(k)] = v | 370 md[strings.ToLower(k)] = v |
369 } | 371 } |
370 return md | 372 return md |
371 } | 373 } |
OLD | NEW |