| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package prpc | 5 package prpc |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "fmt" | 9 "fmt" |
| 10 "io" | 10 "io" |
| 11 "io/ioutil" | 11 "io/ioutil" |
| 12 "net/http" | 12 "net/http" |
| 13 "net/url" | 13 "net/url" |
| 14 "strconv" | 14 "strconv" |
| 15 "strings" | 15 "strings" |
| 16 "time" | 16 "time" |
| 17 | 17 |
| 18 "github.com/golang/protobuf/proto" | 18 "github.com/golang/protobuf/proto" |
| 19 "golang.org/x/net/context" | 19 "golang.org/x/net/context" |
| 20 "golang.org/x/net/context/ctxhttp" | 20 "golang.org/x/net/context/ctxhttp" |
| 21 "google.golang.org/grpc" | 21 "google.golang.org/grpc" |
| 22 "google.golang.org/grpc/codes" | 22 "google.golang.org/grpc/codes" |
| 23 "google.golang.org/grpc/metadata" | 23 "google.golang.org/grpc/metadata" |
| 24 | 24 |
| 25 "github.com/luci/luci-go/common/clock" | 25 "github.com/luci/luci-go/common/clock" |
| 26 "github.com/luci/luci-go/common/errors" | 26 "github.com/luci/luci-go/common/errors" |
| 27 "github.com/luci/luci-go/common/logging" | 27 "github.com/luci/luci-go/common/logging" |
| 28 "github.com/luci/luci-go/common/retry" | 28 "github.com/luci/luci-go/common/retry" |
| 29 "github.com/luci/luci-go/common/retry/transient" |
| 29 "github.com/luci/luci-go/grpc/grpcutil" | 30 "github.com/luci/luci-go/grpc/grpcutil" |
| 30 ) | 31 ) |
| 31 | 32 |
| 32 const ( | 33 const ( |
| 33 // HeaderGRPCCode is a name of the HTTP header that specifies the | 34 // HeaderGRPCCode is a name of the HTTP header that specifies the |
| 34 // gRPC code in the response. | 35 // gRPC code in the response. |
| 35 // A pRPC server must always specify it. | 36 // A pRPC server must always specify it. |
| 36 HeaderGRPCCode = "X-Prpc-Grpc-Code" | 37 HeaderGRPCCode = "X-Prpc-Grpc-Code" |
| 37 | 38 |
| 38 // HeaderTimeout is HTTP header used to set pRPC request timeout. | 39 // HeaderTimeout is HTTP header used to set pRPC request timeout. |
| (...skipping 107 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 146 "host": c.Host, | 147 "host": c.Host, |
| 147 "service": serviceName, | 148 "service": serviceName, |
| 148 "method": methodName, | 149 "method": methodName, |
| 149 }) | 150 }) |
| 150 | 151 |
| 151 // Send the request in a retry loop. | 152 // Send the request in a retry loop. |
| 152 var buf bytes.Buffer | 153 var buf bytes.Buffer |
| 153 var contentType string | 154 var contentType string |
| 154 err = retry.Retry( | 155 err = retry.Retry( |
| 155 ctx, | 156 ctx, |
| 156 » » retry.TransientOnly(options.Retry), | 157 » » transient.Only(options.Retry), |
| 157 func() error { | 158 func() error { |
| 158 ctx := ctx | 159 ctx := ctx |
| 159 logging.Debugf(ctx, "RPC %s/%s.%s", c.Host, serviceName,
methodName) | 160 logging.Debugf(ctx, "RPC %s/%s.%s", c.Host, serviceName,
methodName) |
| 160 | 161 |
| 161 // If there is a deadline on our Context, set the timeou
t header on the | 162 // If there is a deadline on our Context, set the timeou
t header on the |
| 162 // request. | 163 // request. |
| 163 now := clock.Now(ctx) | 164 now := clock.Now(ctx) |
| 164 var requestDeadline time.Time | 165 var requestDeadline time.Time |
| 165 if options.PerRPCTimeout > 0 { | 166 if options.PerRPCTimeout > 0 { |
| 166 requestDeadline = now.Add(options.PerRPCTimeout) | 167 requestDeadline = now.Add(options.PerRPCTimeout) |
| (...skipping 29 matching lines...) Expand all Loading... |
| 196 req.Body = ioutil.NopCloser(bytes.NewReader(in)) | 197 req.Body = ioutil.NopCloser(bytes.NewReader(in)) |
| 197 res, err := ctxhttp.Do(ctx, c.getHTTPClient(), req) | 198 res, err := ctxhttp.Do(ctx, c.getHTTPClient(), req) |
| 198 if res != nil && res.Body != nil { | 199 if res != nil && res.Body != nil { |
| 199 defer res.Body.Close() | 200 defer res.Body.Close() |
| 200 } | 201 } |
| 201 if c.testPostHTTP != nil { | 202 if c.testPostHTTP != nil { |
| 202 err = c.testPostHTTP(ctx, err) | 203 err = c.testPostHTTP(ctx, err) |
| 203 } | 204 } |
| 204 if err != nil { | 205 if err != nil { |
| 205 // Treat all errors here as transient. | 206 // Treat all errors here as transient. |
| 206 » » » » return errors.WrapTransient(fmt.Errorf("failed t
o send request: %s", err)) | 207 » » » » return errors.Annotate(err).Reason("failed to se
nd request"). |
| 208 » » » » » Tag(transient.Tag).Err() |
| 207 } | 209 } |
| 208 | 210 |
| 209 if options.resHeaderMetadata != nil { | 211 if options.resHeaderMetadata != nil { |
| 210 *options.resHeaderMetadata = metadataFromHeaders
(res.Header) | 212 *options.resHeaderMetadata = metadataFromHeaders
(res.Header) |
| 211 } | 213 } |
| 212 contentType = res.Header.Get("Content-Type") | 214 contentType = res.Header.Get("Content-Type") |
| 213 | 215 |
| 214 // Read the response body. | 216 // Read the response body. |
| 215 buf.Reset() | 217 buf.Reset() |
| 216 var body io.Reader = res.Body | 218 var body io.Reader = res.Body |
| (...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 259 } | 261 } |
| 260 if len(body) > bodySize { | 262 if len(body) > bodySize { |
| 261 body = body[:bodySize] + "..." | 263 body = body[:bodySize] + "..." |
| 262 } | 264 } |
| 263 err := fmt.Errorf("HTTP %d: no gRPC code. Body:
%q", res.StatusCode, body) | 265 err := fmt.Errorf("HTTP %d: no gRPC code. Body:
%q", res.StatusCode, body) |
| 264 | 266 |
| 265 // Some HTTP codes are returned directly by host
ing platforms (e.g., | 267 // Some HTTP codes are returned directly by host
ing platforms (e.g., |
| 266 // AppEngine), and should be automatically retri
ed even if a gRPC code | 268 // AppEngine), and should be automatically retri
ed even if a gRPC code |
| 267 // header is not supplied. | 269 // header is not supplied. |
| 268 if res.StatusCode >= http.StatusInternalServerEr
ror { | 270 if res.StatusCode >= http.StatusInternalServerEr
ror { |
| 269 » » » » » err = errors.WrapTransient(err) | 271 » » » » » err = transient.Tag.Apply(err) |
| 270 } | 272 } |
| 271 return err | 273 return err |
| 272 } | 274 } |
| 273 | 275 |
| 274 codeInt, err := strconv.Atoi(codeHeader) | 276 codeInt, err := strconv.Atoi(codeHeader) |
| 275 if err != nil { | 277 if err != nil { |
| 276 // Not a valid pRPC response. | 278 // Not a valid pRPC response. |
| 277 return fmt.Errorf("invalid grpc code %q: %s", co
deHeader, err) | 279 return fmt.Errorf("invalid grpc code %q: %s", co
deHeader, err) |
| 278 } | 280 } |
| 279 | 281 |
| 280 code := codes.Code(codeInt) | 282 code := codes.Code(codeInt) |
| 281 if code != codes.OK { | 283 if code != codes.OK { |
| 282 desc := strings.TrimSuffix(buf.String(), "\n") | 284 desc := strings.TrimSuffix(buf.String(), "\n") |
| 283 err := grpcutil.Errf(code, "%s", desc) | 285 err := grpcutil.Errf(code, "%s", desc) |
| 284 if grpcutil.IsTransientCode(code) { | 286 if grpcutil.IsTransientCode(code) { |
| 285 » » » » » err = errors.WrapTransient(err) | 287 » » » » » err = transient.Tag.Apply(err) |
| 286 } | 288 } |
| 287 return err | 289 return err |
| 288 } | 290 } |
| 289 return nil | 291 return nil |
| 290 }, | 292 }, |
| 291 func(err error, sleepTime time.Duration) { | 293 func(err error, sleepTime time.Duration) { |
| 292 logging.Fields{ | 294 logging.Fields{ |
| 293 logging.ErrorKey: err, | 295 logging.ErrorKey: err, |
| 294 "sleepTime": sleepTime, | 296 "sleepTime": sleepTime, |
| 295 }.Warningf(ctx, "RPC failed transiently. Will retry in %
s", sleepTime) | 297 }.Warningf(ctx, "RPC failed transiently. Will retry in %
s", sleepTime) |
| (...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 362 if len(h) == 0 { | 364 if len(h) == 0 { |
| 363 return nil | 365 return nil |
| 364 } | 366 } |
| 365 | 367 |
| 366 md := make(metadata.MD, len(h)) | 368 md := make(metadata.MD, len(h)) |
| 367 for k, v := range h { | 369 for k, v := range h { |
| 368 md[strings.ToLower(k)] = v | 370 md[strings.ToLower(k)] = v |
| 369 } | 371 } |
| 370 return md | 372 return md |
| 371 } | 373 } |
| OLD | NEW |