OLD | NEW |
1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package prpc | 5 package prpc |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "fmt" | 9 "fmt" |
10 "io" | 10 "io" |
(...skipping 185 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
196 req.Body = ioutil.NopCloser(bytes.NewReader(in)) | 196 req.Body = ioutil.NopCloser(bytes.NewReader(in)) |
197 res, err := ctxhttp.Do(ctx, c.getHTTPClient(), req) | 197 res, err := ctxhttp.Do(ctx, c.getHTTPClient(), req) |
198 if res != nil && res.Body != nil { | 198 if res != nil && res.Body != nil { |
199 defer res.Body.Close() | 199 defer res.Body.Close() |
200 } | 200 } |
201 if c.testPostHTTP != nil { | 201 if c.testPostHTTP != nil { |
202 err = c.testPostHTTP(ctx, err) | 202 err = c.testPostHTTP(ctx, err) |
203 } | 203 } |
204 if err != nil { | 204 if err != nil { |
205 // Treat all errors here as transient. | 205 // Treat all errors here as transient. |
206 » » » » return errors.WrapTransient(fmt.Errorf("failed t
o send request: %s", err)) | 206 » » » » return errors.Annotate(err).Reason("failed to se
nd request"). |
| 207 » » » » » Tag(retry.Tag).Err() |
207 } | 208 } |
208 | 209 |
209 if options.resHeaderMetadata != nil { | 210 if options.resHeaderMetadata != nil { |
210 *options.resHeaderMetadata = metadataFromHeaders
(res.Header) | 211 *options.resHeaderMetadata = metadataFromHeaders
(res.Header) |
211 } | 212 } |
212 contentType = res.Header.Get("Content-Type") | 213 contentType = res.Header.Get("Content-Type") |
213 | 214 |
214 // Read the response body. | 215 // Read the response body. |
215 buf.Reset() | 216 buf.Reset() |
216 var body io.Reader = res.Body | 217 var body io.Reader = res.Body |
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
259 } | 260 } |
260 if len(body) > bodySize { | 261 if len(body) > bodySize { |
261 body = body[:bodySize] + "..." | 262 body = body[:bodySize] + "..." |
262 } | 263 } |
263 err := fmt.Errorf("HTTP %d: no gRPC code. Body:
%q", res.StatusCode, body) | 264 err := fmt.Errorf("HTTP %d: no gRPC code. Body:
%q", res.StatusCode, body) |
264 | 265 |
265 // Some HTTP codes are returned directly by host
ing platforms (e.g., | 266 // Some HTTP codes are returned directly by host
ing platforms (e.g., |
266 // AppEngine), and should be automatically retri
ed even if a gRPC code | 267 // AppEngine), and should be automatically retri
ed even if a gRPC code |
267 // header is not supplied. | 268 // header is not supplied. |
268 if res.StatusCode >= http.StatusInternalServerEr
ror { | 269 if res.StatusCode >= http.StatusInternalServerEr
ror { |
269 » » » » » err = errors.WrapTransient(err) | 270 » » » » » err = retry.Tag.Apply(err) |
270 } | 271 } |
271 return err | 272 return err |
272 } | 273 } |
273 | 274 |
274 codeInt, err := strconv.Atoi(codeHeader) | 275 codeInt, err := strconv.Atoi(codeHeader) |
275 if err != nil { | 276 if err != nil { |
276 // Not a valid pRPC response. | 277 // Not a valid pRPC response. |
277 return fmt.Errorf("invalid grpc code %q: %s", co
deHeader, err) | 278 return fmt.Errorf("invalid grpc code %q: %s", co
deHeader, err) |
278 } | 279 } |
279 | 280 |
280 code := codes.Code(codeInt) | 281 code := codes.Code(codeInt) |
281 if code != codes.OK { | 282 if code != codes.OK { |
282 » » » » desc := strings.TrimSuffix(buf.String(), "\n") | 283 » » » » return grpcutil.Errf(code, "%s", strings.TrimSuf
fix(buf.String(), "\n")) |
283 » » » » err := grpcutil.Errf(code, "%s", desc) | |
284 » » » » if grpcutil.IsTransientCode(code) { | |
285 » » » » » err = errors.WrapTransient(err) | |
286 » » » » } | |
287 » » » » return err | |
288 } | 284 } |
289 return nil | 285 return nil |
290 }, | 286 }, |
291 func(err error, sleepTime time.Duration) { | 287 func(err error, sleepTime time.Duration) { |
292 logging.Fields{ | 288 logging.Fields{ |
293 logging.ErrorKey: err, | 289 logging.ErrorKey: err, |
294 "sleepTime": sleepTime, | 290 "sleepTime": sleepTime, |
295 }.Warningf(ctx, "RPC failed transiently. Will retry in %
s", sleepTime) | 291 }.Warningf(ctx, "RPC failed transiently. Will retry in %
s", sleepTime) |
296 }, | 292 }, |
297 ) | 293 ) |
(...skipping 64 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
362 if len(h) == 0 { | 358 if len(h) == 0 { |
363 return nil | 359 return nil |
364 } | 360 } |
365 | 361 |
366 md := make(metadata.MD, len(h)) | 362 md := make(metadata.MD, len(h)) |
367 for k, v := range h { | 363 for k, v := range h { |
368 md[strings.ToLower(k)] = v | 364 md[strings.ToLower(k)] = v |
369 } | 365 } |
370 return md | 366 return md |
371 } | 367 } |
OLD | NEW |